From 2a95dd26df98b90ba8ba89e0fb6e22e6d57834dd Mon Sep 17 00:00:00 2001 From: tim Date: Wed, 23 Oct 2024 16:45:43 -0400 Subject: [PATCH] rpc connection timeouts --- requirements.txt | 12 +++++++++++- src/dexorder/blockchain/connection.py | 15 ++++++++++++--- src/dexorder/configuration/schema.py | 1 + 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index edda26c..927247b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ sqlalchemy alembic omegaconf -web3<7 +web3 psycopg2-binary orjson sortedcontainers @@ -16,5 +16,15 @@ eth-bloom python-dateutil eth_abi eth_utils +eth_typing +eth-keys +eth-account +eth-utils +eth-typing pdpyras # pagerduty numpy +bitarray +typing_extensions +requests +aiohttp +charset-normalizer diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index e4aac87..14a468d 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -3,8 +3,11 @@ import logging from random import random from typing import Any, Optional, Union +import requests +import requests.adapters + # noinspection PyPackageRequirements -from aiohttp import ClientResponseError +from aiohttp import ClientResponseError, ClientSession, BaseConnector, ClientTimeout, TCPConnector from eth_typing import URI from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider @@ -35,7 +38,11 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'): # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain # self.w3iter = itertools.cycle(self.w3s) url = resolve_rpc_url(rpc_url) - w3 = AsyncWeb3(RetryHTTPProvider(url)) + connector = TCPConnector(limit=config.concurrent_rpc_connections) + session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout)) + http_provider = RetryHTTPProvider(url) + await http_provider.cache_async_session(session) + w3 = AsyncWeb3(http_provider) # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? # w3.middleware_onion.add(simple_cache_middleware) # log.debug(f'middleware {list(w3.middleware_onion.middlewares)}') @@ -129,8 +136,9 @@ class RetryHTTPProvider (AsyncHTTPProvider): self.rate_allowed.set() async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: + MAX_TRIES = 10 wait = 0 - while True: + for _ in range(MAX_TRIES): try: async with self.in_flight: await self.rate_allowed.wait() @@ -152,3 +160,4 @@ class RetryHTTPProvider (AsyncHTTPProvider): self.rate_allowed.set() # finally: # log.debug(f'Ended request of RPC call {method}') + raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}') diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 9feecb6..d70f92e 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -25,6 +25,7 @@ class Config: concurrent_rpc_connections: int = 4 parallel_logevent_queries: bool = True + rpc_timeout: float = 3 polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present.