diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 1e79f2b..c54d804 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -1,65 +1,94 @@ import asyncio +import itertools import logging from random import random -from typing import Any, Optional, Union +from typing import Any, Optional, Union, Callable # noinspection PyPackageRequirements from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector from eth_typing import URI from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider +from web3.exceptions import Web3Exception from web3.middleware.signing import async_construct_sign_and_send_raw_middleware from web3.types import RPCEndpoint, RPCResponse from .. import current_w3, Blockchain, config, Account, NARG from ..base.chain import current_chain -from ..configuration import resolve_rpc_url -from ..configuration.resolve import resolve_ws_url from ..contract import get_contract_data -async def connect(rpc_url=None, account=NARG, autosign=True, name='default'): +log = logging.getLogger(__name__) + + +async def connect(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=True, name='default', *, archive_url: Union[str,list[str]] = None): """ connects to the rpc_url and configures context vars """ - w3 = await create_w3(rpc_url, account, autosign, name) + w3 = await create_w3(rpc_url, account, autosign, name, archive_url=archive_url) current_w3.set(w3) chain = Blockchain.get(await w3.eth.chain_id) current_chain.set(chain) return w3 -async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'): - # todo create a proxy w3 that rotates among rpc urls - # self.w3s = tuple(await create_w3(url) for url in rpc_url_or_tag) - # chain_id = self.w3s[0].eth.chain_id - # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain - # self.w3iter = itertools.cycle(self.w3s) - url = resolve_rpc_url(rpc_url) - connector = TCPConnector(limit=config.concurrent_rpc_connections) - session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout)) - http_provider = RetryHTTPProvider(url) - await http_provider.cache_async_session(session) - w3 = AsyncWeb3(http_provider) - # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? 
-    # w3.middleware_onion.add(simple_cache_middleware)
-    # log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
-    w3.middleware_onion.remove('attrdict')
-    w3.middleware_onion.add(clean_input_async, 'clean_input')
-    w3.eth.Contract = _make_contract(w3.eth)
-    has_account = False
-    if autosign:
-        if account is NARG:
-            account = Account.get()
-        if account is not None:
-            # noinspection PyTypeChecker
-            w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
-            w3.eth.default_account = account.address
-            has_account = True
-            log.info(f'{name} w3 configured to autosign as {account.address}')
-    if not has_account:
-        log.info(f'No account set for {name} w3')
-    return w3
+async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=True, name='default', *, archive_url: Union[str,list[str]] = None) -> AsyncWeb3:
+    if rpc_url is None:
+        rpc_url = config.rpc_url
+    if isinstance(rpc_url, str):
+        rpc_urls = [resolve_rpc_url(s) for url in rpc_url.split(',') if (s:=url.strip()) != '']
+    elif isinstance(rpc_url, list):
+        rpc_urls = [resolve_rpc_url(s) for url in rpc_url if (s:=url.strip()) != '']
+    else:
+        raise ValueError("rpc_url must be a string or list of strings")
+
+    if archive_url is None:
+        archive_url = config.archive_url
+    if not archive_url:
+        archive_urls = []
+    elif isinstance(archive_url, str):
+        archive_urls = [resolve_rpc_url(s) for url in archive_url.split(',') if (s:=url.strip()) != '']
+    elif isinstance(archive_url, list):
+        archive_urls = [resolve_rpc_url(s) for url in archive_url if (s:=url.strip()) != '']
+    else:
+        raise ValueError("archive_url must be a string or list of strings")
+
+    if not rpc_urls:
+        raise ValueError("No rpc_url configured")
+
+    w3_instances = []
+    archive_instances = []
+    for (url, archive) in itertools.chain(((url, False) for url in rpc_urls), ((url, True) for url in archive_urls)):
+        connector = TCPConnector(limit=config.concurrent_rpc_connections)
+        session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
+        http_provider = RetryHTTPProvider(url)
+        await http_provider.cache_async_session(session)
+        w3 = AsyncWeb3(http_provider)
+        w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
+        w3.middleware_onion.remove('attrdict')
+        w3.middleware_onion.add(clean_input_async, 'clean_input')
+        w3.eth.Contract = _make_contract(w3.eth)
+        # Highest block number that has reported a -32000 error indicating a lack of history that far back
+        w3.archive_fault_height = -1
+        if autosign:
+            if account is NARG:
+                account = Account.get()
+            if account is not None:
+                # noinspection PyTypeChecker
+                w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
+                w3.eth.default_account = account.address
+        if archive:
+            archive_instances.append(w3)
+        else:
+            w3_instances.append(w3)
+
+    # Ensure all instances share the same chain ID
+    chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
+    if len(set(chain_ids)) != 1:
+        raise RuntimeError("All RPC URLs must belong to the same blockchain")
+
+    # noinspection PyTypeChecker
+    return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]
 
 
 async def create_w3_ws(ws_url=None) -> AsyncWeb3:
@@ -80,6 +109,30 @@ async def create_w3_ws(ws_url=None) -> AsyncWeb3:
     return w3
 
 
+def resolve_rpc_url(rpc_url=None):
+    if rpc_url is None:
+        rpc_url = config.rpc_url
+    if rpc_url == 'test':
+        return 'http://localhost:8545'
+    try:
+        return config.rpc_urls[rpc_url] # look up aliases
+    except KeyError:
+        pass
+    return rpc_url
+
+
+def resolve_ws_url(ws_url=None):
+    if ws_url is None:
+        ws_url = config.ws_url
+    if ws_url == 'test':
+        return 'ws://localhost:8545'
+    try:
+        return config.rpc_urls[ws_url] # look up aliases
+    except KeyError:
+        pass
+    return ws_url
+
+
 def _clean(obj):
     if type(obj) is HexBytes:
         return bytes(obj)
@@ -123,7 +176,123 @@ def _make_contract(w3_eth):
     return f
 
 
-log = logging.getLogger(__name__)
+#
+# ARCHIVE NODE MANAGEMENT
+#
+
+# Regular RPC nodes do not necessarily have full state history available. The code below tracks the block heights
+# of historical-data lookup failures on regular RPC nodes and automatically retries the failed history requests on
+# an archive RPC node. Archive RPC nodes are otherwise not used unless they are also listed in the rpc_url config.
+
+# Methods that may carry a `block_identifier` parameter,
+# along with the number of arguments present when it is included.
+ARCHIVE_METHODS = {
+    # Examples:
+    "eth_getBalance": 2,    # e.g., get_balance(address, block_identifier)
+    "eth_call": 2,          # e.g., contract.call(params, block_identifier)
+    "eth_getStorageAt": 3,  # e.g., get_storage_at(address, position, block_identifier)
+    # Add more methods as needed
+}
+
+ARCHIVE_ERRORS = {
+    -32000,
+}
+
+async def archive_intercept_middleware(make_request, w3):
+    """
+    Middleware that intercepts any call carrying a block identifier and maintains the instance's archive_fault_height.
+    """
+    async def middleware(method, params):
+        # Only intercept relevant methods
+        expected_args = ARCHIVE_METHODS.get(method, -1)
+        is_archive_method = len(params) == expected_args
+        if is_archive_method:
+            block_identifier = params[-1]
+            if isinstance(block_identifier, str):
+                # hex-encoded heights are decoded; tags such as 'latest' or 'pending' never need archive history
+                block_identifier = int(block_identifier, 16) if block_identifier.startswith('0x') else -1
+            else:
+                block_identifier = int(block_identifier)
+            if 0 <= block_identifier <= w3.archive_fault_height:
+                # this block is at least as old as another block that already failed to fetch history from this RPC
+                raise ArchiveException(method, block_identifier)
+        resp = await make_request(method, params)
+        if is_archive_method and 'error' in resp and resp['error']['code'] in ARCHIVE_ERRORS and block_identifier >= 0:
+            # noinspection PyUnboundLocalVariable
+            w3.archive_fault_height = max(w3.archive_fault_height, block_identifier)
+            raise ArchiveException(method, block_identifier)
+        return resp
+
+
+    return middleware
+
+
+class ArchiveException (Exception):
+    def __init__(self, method, block_number):
+        super().__init__(f"Archive fault for method {method} at block {block_number}", block_number)
+        self.method = method
+        self.block_number = block_number
+
+
+class RoundRobinWebProxy:
+    def __init__(self, w3_instances, archive_instances):
+        if not w3_instances:
+            raise ValueError("At least one w3 instance is required")
+        self._w3_instances = w3_instances
+        self._archive_instances = archive_instances
+        self._index = 0
+        self._archive_index = 0
+        for w3 in self._w3_instances:
+            w3.manager.coro_request = self.make_coro_request_function(w3)
+
+    def __getattr__(self, name):
+        # proxy in a round-robin fashion
+        return getattr(self._current(), name)
+
+    def _current(self):
+        if self._index >= len(self._w3_instances):
+            self._index = 0
+        current_instance = self._w3_instances[self._index]
+        self._index += 1
+        return current_instance
+
+    def _current_archive(self):
+        if self._archive_index >= len(self._archive_instances):
+            self._archive_index = 0
+        current_instance = self._archive_instances[self._archive_index]
+        self._archive_index += 1
+        return current_instance
+
+
+    def make_coro_request_function(rrwp, w3):
+        # This replaces w3.manager.coro_request with our own version that catches ArchiveExceptions and retries them
+        # on an archive w3.
+
+        ### NOTE!!! ###
+        # Inside this factory, `self` refers to the RequestManager so the upstream coro_request code can be copied
+        # over verbatim; the RoundRobinWebProxy is therefore named `rrwp` and the target AsyncWeb3 instance `w3`.
+        self = w3.manager
+
+        async def RequestManager__coro_request(
+            method: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
+            params: Any,
+            error_formatters: Optional[Callable[..., Any]] = None,
+            null_result_formatters: Optional[Callable[..., Any]] = None,
+        ) -> Any:
+            """
+            Coroutine for making a request using the provider
+            """
+            try:
+                response = await self._coro_make_request(method, params)
+                return self.formatted_response(
+                    response, params, error_formatters, null_result_formatters
+                )
+            except ArchiveException as e:
+                w3.archive_fault_height = max(w3.archive_fault_height, e.block_number)
+                if not rrwp._archive_instances:
+                    raise  # no archive nodes configured; surface the failure to the caller
+                return await rrwp._current_archive().manager.coro_request(method, params, error_formatters, null_result_formatters)
+        return RequestManager__coro_request
 
 
 class RetryHTTPProvider (AsyncHTTPProvider):
@@ -141,7 +310,7 @@ class RetryHTTPProvider (AsyncHTTPProvider):
         try:
             async with self.in_flight:
                 await self.rate_allowed.wait()
-                # log.debug(f'Requesting RPC call {method}')
+                log.debug(f'Requesting RPC call {method}')
                 return await super().make_request(method, params)
         except ClientResponseError as e:
             if e.status != 429:
diff --git a/src/dexorder/configuration/__init__.py b/src/dexorder/configuration/__init__.py
index a319d02..64e0501 100644
--- a/src/dexorder/configuration/__init__.py
+++ b/src/dexorder/configuration/__init__.py
@@ -1,3 +1,2 @@
 from .standard_accounts import test_accounts
 from .load import config, parse_args
-from .resolve import resolve_rpc_url
diff --git a/src/dexorder/configuration/load.py b/src/dexorder/configuration/load.py
index dd387ac..c0814b7 100644
--- a/src/dexorder/configuration/load.py
+++ b/src/dexorder/configuration/load.py
@@ -69,7 +69,7 @@ class ConfigDict (Config, DictConfig): # give type hints from Config plus method
     pass
 
 # Special command-line argument handling to get a config file. The -c/--config flag MUST BE FIRST.
-if sys.argv[1] == '-c' or sys.argv[1] == '--config':
+if len(sys.argv) > 1 and (sys.argv[1] == '-c' or sys.argv[1] == '--config'):
     if len(sys.argv) < 3:
         raise ConfigException('Missing config file argument')
     else:
diff --git a/src/dexorder/configuration/resolve.py b/src/dexorder/configuration/resolve.py
index 90d8eff..e57a115 100644
--- a/src/dexorder/configuration/resolve.py
+++ b/src/dexorder/configuration/resolve.py
@@ -1,25 +1,3 @@
 from .load import config
 
 
-def resolve_rpc_url(rpc_url=None):
-    if rpc_url is None:
-        rpc_url = config.rpc_url
-    if rpc_url == 'test':
-        return 'http://localhost:8545'
-    try:
-        return config.rpc_urls[rpc_url] # look up aliases
-    except KeyError:
-        pass
-    return rpc_url
-
-
-def resolve_ws_url(ws_url=None):
-    if ws_url is None:
-        ws_url = config.ws_url
-    if ws_url == 'test':
-        return 'ws://localhost:8545'
-    try:
-        return config.rpc_urls[ws_url] # look up aliases
-    except KeyError:
-        pass
-    return ws_url
diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py
index 8529096..3c3bc3c 100644
--- a/src/dexorder/configuration/schema.py
+++ b/src/dexorder/configuration/schema.py
@@ -11,14 +11,15 @@ from typing import Optional
 class Config:
     confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used
     batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request
-    rpc_url: str = 'http://localhost:8545'
+    rpc_url: str = 'http://localhost:8545' # may be a comma-separated list and may include names of entries defined in rpc_urls
+    archive_url: str = '' # these RPC URLs are only used when a query references a block number older than what the currently-assigned rpc_url nodes can provide
     ws_url: Optional[str] = 'ws://localhost:8545'
     rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
     db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
     dump_sql: bool = False
     redis_url: Optional[str] = 'redis://localhost:6379'
-    metrics_port: int = 9090
+    metrics_port: Optional[int] = None
     cache_blocks_in_db: bool = False
     metadata: Optional[str] = None
diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py
index 3a7d683..4c82aeb 100644
--- a/src/dexorder/tokens.py
+++ b/src/dexorder/tokens.py
@@ -7,7 +7,7 @@ from web3.exceptions import BadFunctionCallOutput
 from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3
 from dexorder.addrmeta import address_metadata
 from dexorder.base.chain import current_chain
-from dexorder.blockstate.fork import current_fork
+from dexorder.blocks import current_block
 from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
 from dexorder.database.model import Token
 from dexorder.database.model.token import OldTokenDict
@@ -36,7 +36,7 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
 # noinspection PyShadowingNames
 async def get_native_balance(addr, *, adjust_decimals=True) -> dec:
     try:
-        block_id = current_fork.get().height
+        block_id = current_block.get().height
     except LookupError:
         block_id = 'latest'
     value = dec(await current_w3.get().eth.get_balance(addr, block_identifier=block_id))
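
Usage sketch for the new connection path (illustrative, not part of the patch). The endpoint URLs, the address, and the block number below are placeholders; the import path follows this diff's file layout, and a loaded config (concurrent_rpc_connections, rpc_timeout, optional rpc_urls aliases) is assumed. Regular nodes are rotated per attribute access; the archive node is only consulted when a regular node cannot serve state that far back.

    import asyncio

    from dexorder.blockchain.connection import connect


    async def main():
        # Two regular nodes are rotated round-robin; the archive node is only used when a
        # regular node reports a -32000 "missing history" error for the requested block.
        w3 = await connect(
            rpc_url='https://rpc-a.example.com,https://rpc-b.example.com',  # placeholder endpoints
            archive_url='https://archive.example.com',                      # placeholder archive endpoint
        )
        latest = await w3.eth.block_number       # served by one of the regular nodes
        old_balance = await w3.eth.get_balance(  # transparently retried on the archive node if needed
            '0x0000000000000000000000000000000000000000',
            block_identifier=1_000_000,
        )
        print(latest, old_balance)


    asyncio.run(main())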