rpc connection timeouts
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
sqlalchemy
|
sqlalchemy
|
||||||
alembic
|
alembic
|
||||||
omegaconf
|
omegaconf
|
||||||
web3<7
|
web3
|
||||||
psycopg2-binary
|
psycopg2-binary
|
||||||
orjson
|
orjson
|
||||||
sortedcontainers
|
sortedcontainers
|
||||||
@@ -16,5 +16,15 @@ eth-bloom
|
|||||||
python-dateutil
|
python-dateutil
|
||||||
eth_abi
|
eth_abi
|
||||||
eth_utils
|
eth_utils
|
||||||
|
eth_typing
|
||||||
|
eth-keys
|
||||||
|
eth-account
|
||||||
|
eth-utils
|
||||||
|
eth-typing
|
||||||
pdpyras # pagerduty
|
pdpyras # pagerduty
|
||||||
numpy
|
numpy
|
||||||
|
bitarray
|
||||||
|
typing_extensions
|
||||||
|
requests
|
||||||
|
aiohttp
|
||||||
|
charset-normalizer
|
||||||
|
|||||||
@@ -3,8 +3,11 @@ import logging
|
|||||||
from random import random
|
from random import random
|
||||||
from typing import Any, Optional, Union
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import requests.adapters
|
||||||
|
|
||||||
# noinspection PyPackageRequirements
|
# noinspection PyPackageRequirements
|
||||||
from aiohttp import ClientResponseError
|
from aiohttp import ClientResponseError, ClientSession, BaseConnector, ClientTimeout, TCPConnector
|
||||||
from eth_typing import URI
|
from eth_typing import URI
|
||||||
from hexbytes import HexBytes
|
from hexbytes import HexBytes
|
||||||
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
|
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
|
||||||
@@ -35,7 +38,11 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'):
|
|||||||
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
|
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
|
||||||
# self.w3iter = itertools.cycle(self.w3s)
|
# self.w3iter = itertools.cycle(self.w3s)
|
||||||
url = resolve_rpc_url(rpc_url)
|
url = resolve_rpc_url(rpc_url)
|
||||||
w3 = AsyncWeb3(RetryHTTPProvider(url))
|
connector = TCPConnector(limit=config.concurrent_rpc_connections)
|
||||||
|
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
|
||||||
|
http_provider = RetryHTTPProvider(url)
|
||||||
|
await http_provider.cache_async_session(session)
|
||||||
|
w3 = AsyncWeb3(http_provider)
|
||||||
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
|
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
|
||||||
# w3.middleware_onion.add(simple_cache_middleware)
|
# w3.middleware_onion.add(simple_cache_middleware)
|
||||||
# log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
|
# log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
|
||||||
@@ -129,8 +136,9 @@ class RetryHTTPProvider (AsyncHTTPProvider):
|
|||||||
self.rate_allowed.set()
|
self.rate_allowed.set()
|
||||||
|
|
||||||
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
|
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
|
||||||
|
MAX_TRIES = 10
|
||||||
wait = 0
|
wait = 0
|
||||||
while True:
|
for _ in range(MAX_TRIES):
|
||||||
try:
|
try:
|
||||||
async with self.in_flight:
|
async with self.in_flight:
|
||||||
await self.rate_allowed.wait()
|
await self.rate_allowed.wait()
|
||||||
@@ -152,3 +160,4 @@ class RetryHTTPProvider (AsyncHTTPProvider):
|
|||||||
self.rate_allowed.set()
|
self.rate_allowed.set()
|
||||||
# finally:
|
# finally:
|
||||||
# log.debug(f'Ended request of RPC call {method}')
|
# log.debug(f'Ended request of RPC call {method}')
|
||||||
|
raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}')
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ class Config:
|
|||||||
|
|
||||||
concurrent_rpc_connections: int = 4
|
concurrent_rpc_connections: int = 4
|
||||||
parallel_logevent_queries: bool = True
|
parallel_logevent_queries: bool = True
|
||||||
|
rpc_timeout: float = 3
|
||||||
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
|
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
|
||||||
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present.
|
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height. Use negative numbers to indicate a number of blocks before the present.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user