From 979f31dfe02e49c6301373319773e12074cf5d0f Mon Sep 17 00:00:00 2001
From: tim
Date: Tue, 25 Feb 2025 19:00:31 -0400
Subject: [PATCH] roundrobin/archive connection fix; job cleanup fix; mirror fix

---
 src/dexorder/alert.py                 |   2 +-
 src/dexorder/bin/main.py              |   5 +-
 src/dexorder/bin/mirror.py            |   2 +-
 src/dexorder/blockchain/connection.py | 183 ++++++++++++--------
 src/dexorder/transactions.py          |  13 +-
 5 files changed, 97 insertions(+), 108 deletions(-)

diff --git a/src/dexorder/alert.py b/src/dexorder/alert.py
index d46ac07..1c0af2e 100644
--- a/src/dexorder/alert.py
+++ b/src/dexorder/alert.py
@@ -34,7 +34,7 @@ def alert_pagerduty(title, message, dedup_key, log_level):
         if pagerduty_session is None:
             pagerduty_session = pdpyras.EventsAPISession(config.pagerduty)
         hostname = socket.gethostname()
-        sev = 'error' if log_level >= logging.ERROR else 'warning'
+        sev = 'critical' if log_level >= logging.ERROR else 'info'
         pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key)
     except Exception:
         log.warning('Could not notify PagerDuty!', exc_info=True)
diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py
index 1e980aa..50645fc 100644
--- a/src/dexorder/bin/main.py
+++ b/src/dexorder/bin/main.py
@@ -20,7 +20,7 @@ from dexorder.memcache.memcache_state import RedisState, publish_all
 from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
 from dexorder.order.triggers import activate_orders, end_trigger_updates
 from dexorder.runner import BlockStateRunner
-from dexorder.transactions import handle_transaction_receipts
+from dexorder.transactions import handle_transaction_receipts, cleanup_jobs
 from dexorder.vaultcreationhandler import handle_vault_creation_requests
 
 log = logging.getLogger('dexorder')
@@ -61,6 +61,7 @@ def setup_logevent_triggers(runner):
     runner.add_callback(check_activate_orders)
     runner.add_callback(init)
 
+    runner.add_event_trigger(handle_transaction_receipts)
     runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated'))
     runner.add_event_trigger(handle_vault_impl_changed, get_contract_event('Vault', 'VaultImplChanged'))
     runner.add_event_trigger(handle_order_placed, get_contract_event('VaultImpl', 'DexorderSwapPlaced'))
@@ -70,12 +71,12 @@ def setup_logevent_triggers(runner):
     runner.add_event_trigger(handle_order_canceled, get_contract_event('VaultImpl', 'DexorderSwapCanceled'))
     runner.add_event_trigger(handle_order_cancel_all, get_contract_event('VaultImpl', 'DexorderCancelAll'))
 
-    runner.add_event_trigger(handle_transaction_receipts)
     runner.add_event_trigger(handle_dexorderexecutions, executions)
     runner.add_event_trigger(handle_vault_creation_requests)
 
     runner.add_callback(end_trigger_updates)
     runner.add_callback(execute_tranches)
+    runner.add_callback(cleanup_jobs)
 
     # fee adjustments are handled offline by batch jobs
     # runner.add_event_trigger(handle_fee_limits_changed, get_contract_event('IFeeManager', 'FeeLimitsChanged'))
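Note: the reordering above assumes BlockStateRunner fires its registered triggers and callbacks in registration order, so transaction receipts are now processed before any order/vault events in the same block, and cleanup_jobs runs only after execute_tranches. A minimal sketch of that assumed per-block dispatch (RunnerSketch and its loop are illustrative stand-ins, not the real BlockStateRunner):

    import inspect

    class RunnerSketch:
        # Hypothetical stand-in for BlockStateRunner's per-block dispatch.
        def __init__(self):
            self.callbacks = []                  # run once per block, in registration order

        def add_callback(self, cb):
            self.callbacks.append(cb)

        async def on_block(self):
            for cb in self.callbacks:            # registration order == execution order
                result = cb()
                if inspect.isawaitable(result):  # cleanup_jobs is sync; most handlers are async
                    await result
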
diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py
index 88b678a..98ee58b 100644
--- a/src/dexorder/bin/mirror.py
+++ b/src/dexorder/bin/mirror.py
@@ -119,7 +119,7 @@ async def main():
     delay = max(0.010, config.polling)
     update_once = config.polling <= 0
     global source_w3
-    source_w3 = await create_w3(config.mirror_source_rpc_url, name='source', autosign=False)
+    source_w3 = await create_w3(config.mirror_source_rpc_url, name='source', autosign=False, archive_url=[])
     pools = (config.mirror_pools or [])
     if not pools:
         log.error('must configure mirror_pools')
diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py
index 25aedb5..caba6e0 100644
--- a/src/dexorder/blockchain/connection.py
+++ b/src/dexorder/blockchain/connection.py
@@ -2,7 +2,7 @@ import asyncio
 import itertools
 import logging
 from random import random
-from typing import Any, Optional, Union, Callable
+from typing import Any, Optional, Union
 
 # noinspection PyPackageRequirements
 from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector
@@ -10,6 +10,7 @@ from eth_typing import URI
 from hexbytes import HexBytes
 from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
 from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
+from web3.providers.async_base import AsyncJSONBaseProvider
 from web3.types import RPCEndpoint, RPCResponse
 
 from .. import current_w3, Blockchain, config, Account, NARG
@@ -54,40 +55,24 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
     if not rpc_urls:
         raise ValueError("No rpc_url configured")
 
-    w3_instances = []
-    archive_instances = []
-    for (url, archive) in itertools.chain(((url, False) for url in rpc_urls), ((url, True) for url in archive_urls)):
-        connector = TCPConnector(limit=config.concurrent_rpc_connections)
-        session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
-        http_provider = RetryHTTPProvider(url)
-        await http_provider.cache_async_session(session)
-        w3 = AsyncWeb3(http_provider)
-        if archive_urls:
-            w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
-        w3.middleware_onion.remove('attrdict')
-        w3.middleware_onion.add(clean_input_async, 'clean_input')
-        w3.eth.Contract = _make_contract(w3.eth)
-        # Highest block number that has reported a -32000 error indicating a lack of history that far back
-        w3.archive_fault_height = -1
-        if autosign:
-            if account is NARG:
-                account = Account.get()
-            if account is not None:
-                # noinspection PyTypeChecker
-                w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
-                w3.eth.default_account = account.address
-        if archive:
-            archive_instances.append(w3)
-        else:
-            w3_instances.append(w3)
-    # Ensure all instances share the same chain ID
-    chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
-    if len(set(chain_ids)) != 1:
-        raise RuntimeError("All RPC URLs must belong to the same blockchain")
-
-    # noinspection PyTypeChecker
-    return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]
+    provider = await RoundRobinHTTPProvider.construct(rpc_urls, archive_urls) if len(rpc_urls) > 1 or archive_urls else await RetryHTTPProvider.construct(rpc_urls[0])
+    w3 = AsyncWeb3(provider)
+    if archive_urls:
+        w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
+        # Highest block number that has reported a -32000 error indicating a lack of history that far back;
+        # the middleware still reads this attribute, so it must stay initialized on the w3 instance
+        w3.archive_fault_height = -1
+    w3.middleware_onion.remove('attrdict')
+    w3.middleware_onion.add(clean_input_async, 'clean_input')
+    w3.eth.Contract = _make_contract(w3.eth)
+    if autosign:
+        if account is NARG:
+            account = Account.get()
+        if account is not None:
+            # noinspection PyTypeChecker
+            w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
+            w3.eth.default_account = account.address
+    return w3
 
 
 async def create_w3_ws(ws_url=None) -> AsyncWeb3:
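With this change a caller always receives one AsyncWeb3: a single URL yields a plain RetryHTTPProvider, while multiple URLs or any archive URLs yield a RoundRobinHTTPProvider. A usage sketch (the endpoints below are hypothetical; the archive_url keyword is taken from the mirror.py call above):

    import asyncio
    from dexorder.blockchain.connection import create_w3

    async def demo():
        w3 = await create_w3(
            ['https://rpc-a.example', 'https://rpc-b.example'],  # hypothetical endpoints
            archive_url=['https://archive.example'],             # hypothetical archive node
        )
        # Requests rotate across rpc-a/rpc-b; historical queries that hit a
        # pruned node fall back to the archive endpoint.
        print(await w3.eth.chain_id)

    asyncio.run(demo())
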
@@ -209,7 +192,7 @@ async def archive_intercept_middleware(make_request, w3):
         block_height = None
         if is_archive_method:
             block_identifier = params[-1]
-            if block_identifier != 'latest':
+            if block_identifier not in ('latest', 'pending',):
                 block_height = int(block_identifier, 16) if type(block_identifier) is str else int(params[-1])
                 if block_height <= w3.archive_fault_height:
                     # this block is at least as old as another block that already failed to fetch history from this RPC
@@ -222,7 +205,6 @@ async def archive_intercept_middleware(make_request, w3):
                     # noinspection PyTypeChecker
                     w3.archive_fault_height = max(w3.archive_fault_height, block_height)
                     raise ArchiveException(method, block_height)
-        resp = await make_request(method, params)
         resp = await make_request(method, params)
         return resp
     return middleware
@@ -235,77 +217,27 @@ class ArchiveException (Exception):
     def __init__(self, method, block_number):
         self.block_number = block_number
 
 
-class RoundRobinWebProxy:
-    def __init__(self, w3_instances, archive_instances):
-        if not w3_instances:
-            raise ValueError("At least one w3 instance is required")
-        self._w3_instances = w3_instances
-        self._archive_instances = archive_instances
-        self._index = 0
-        self._archive_index = 0
-        for w3 in self._w3_instances:
-            w3.manager.coro_request = self.make_coro_request_function(w3)
-
-    def __getattr__(self, name):
-        # proxy in a round-robin fashion
-        return getattr(self._current(), name)
-
-    def _current(self):
-        if self._index >= len(self._w3_instances):
-            self._index = 0
-        current_instance = self._w3_instances[self._index]
-        self._index += 1
-        return current_instance
-
-    def _current_archive(self):
-        if self._archive_index >= len(self._archive_instances):
-            self._archive_index = 0
-        current_instance = self._archive_instances[self._archive_index]
-        self._archive_index += 1
-        return current_instance
-
-
-    def make_coro_request_function(rrwp, w3):
-        # This replaces w3.manager.coro_request with our own version that catches ArchiveExceptions and retries them
-        # on an archive w3
-
-        ### NOTE!!! ###
-        # we use `self` to mean the RequestManager so we can copy that code directly over here.
-        # instead we rename the RoundRobinWebProxy rrwp and name the w3 instance too
-        self = w3.manager
-
-        async def RequestManager__coro_request(
-            method: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
-            params: Any,
-            error_formatters: Optional[Callable[..., Any]] = None,
-            null_result_formatters: Optional[Callable[..., Any]] = None,
-        ) -> Any:
-            """
-            Coroutine for making a request using the provider
-            """
-            try:
-                response = await self._coro_make_request(method, params)
-                return self.formatted_response(
-                    response, params, error_formatters, null_result_formatters
-                )
-            except ArchiveException as e:
-                w3.archive_fault_height = max(w3.archive_fault_height, e.block_number)
-                return await rrwp._current_archive().manager.coro_request(method, params, error_formatters, null_result_formatters)
-        return RequestManager__coro_request
-
-
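The removed proxy monkey-patched RequestManager.coro_request on every w3 instance; the replacement keeps the fallback at the provider layer instead. Conceptually the flow is as follows (a simplified sketch; the real routing lives in archive_intercept_middleware plus RoundRobinHTTPProvider.make_request below, and the -32000 error shape is assumed from the comment above):

    # Sketch only: how a request is rerouted once a node proves to lack history.
    async def fetch_with_fallback(primary, archive, method, params, block_number):
        if block_number <= primary.archive_fault_height:
            # Known-pruned range: skip the primary entirely.
            return await archive.make_request(method, params)
        response = await primary.make_request(method, params)
        error = response.get('error') or {}
        if error.get('code') == -32000:              # assumed "missing history" error
            primary.archive_fault_height = max(primary.archive_fault_height, block_number)
            return await archive.make_request(method, params)
        return response
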
 class RetryHTTPProvider (AsyncHTTPProvider):
-    def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
+    @staticmethod
+    async def construct(endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None, retries: int = 10):
+        result = RetryHTTPProvider(endpoint_uri, request_kwargs, retries)
+        connector = TCPConnector(limit=config.concurrent_rpc_connections)
+        session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
+        await result.cache_async_session(session)
+        return result
+
+    def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None, retries: int = 10) -> None:
         super().__init__(endpoint_uri, request_kwargs)
+        self.in_flight = asyncio.Semaphore(config.concurrent_rpc_connections)
         self.rate_allowed = asyncio.Event()
         self.rate_allowed.set()
+        self.retries = retries
 
     async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
-        MAX_TRIES = 10
         wait = 0
-        for _ in range(MAX_TRIES):
+        for _ in range(self.retries):
             try:
                 async with self.in_flight:
                     await self.rate_allowed.wait()
@@ -324,6 +256,55 @@ class RetryHTTPProvider (AsyncHTTPProvider):
                     await asyncio.sleep(wait)
                 finally:
                     self.rate_allowed.set()
-        # finally:
-        #     log.debug(f'Ended request of RPC call {method}')
-        raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}')
+        raise IOError(f'Could not query rpc server after {self.retries} tries: {method} {params}')
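The in_flight semaphore caps concurrent requests per endpoint, while rate_allowed is a shared gate that parks all callers during backoff. The gating pattern in isolation (a sketch of the same asyncio idea, not the provider itself; the backoff trigger elided from the hunk above is assumed to clear the event):

    import asyncio

    class ThrottleSketch:
        def __init__(self, limit: int):
            self.in_flight = asyncio.Semaphore(limit)  # cap concurrent requests
            self.rate_allowed = asyncio.Event()        # cleared while backing off
            self.rate_allowed.set()

        async def run(self, coro_fn):
            async with self.in_flight:
                await self.rate_allowed.wait()         # all callers park here during a pause
                return await coro_fn()

        async def pause(self, seconds: float):
            self.rate_allowed.clear()
            try:
                await asyncio.sleep(seconds)
            finally:
                self.rate_allowed.set()                # release parked callers

Note also that RoundRobinHTTPProvider.construct (below) passes retries=1, so a failing endpoint surfaces its error after a single attempt; since _current() rotates on every call, the next request lands on a different URL anyway.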
+
+
+class RoundRobinHTTPProvider (AsyncJSONBaseProvider):
+
+    @staticmethod
+    async def construct(endpoint_uris: list[str], archive_uris: list[str]):
+        providers = [RetryHTTPProvider.construct(uri, retries=1) for uri in endpoint_uris]
+        archive_providers = [RetryHTTPProvider.construct(uri, retries=1) for uri in archive_uris]
+        providers = await asyncio.gather(*providers)
+        archive_providers = await asyncio.gather(*archive_providers)
+
+        # Ensure all instances share the same chain ID
+        chain_ids = await asyncio.gather(*(AsyncWeb3(provider).eth.chain_id for provider in itertools.chain(providers, archive_providers)))
+        if len(set(chain_ids)) != 1:
+            raise RuntimeError("All RPC URLs must belong to the same blockchain")
+
+        return RoundRobinHTTPProvider(providers, archive_providers)
+
+
+    def __init__(self, providers: list[RetryHTTPProvider], archive_providers: list[RetryHTTPProvider]):
+        super().__init__()
+        self.providers = providers
+        self.archive_providers = archive_providers
+        for provider in self.providers:
+            provider.archive_fault_height = 0
+        self.index = 0
+        self.archive_index = 0
+
+    async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
+        provider = self._current()
+        try:
+            return await provider.make_request(method, params)
+        except ArchiveException as e:
+            provider.archive_fault_height = max(provider.archive_fault_height, e.block_number)
+            if not self.archive_providers:
+                raise
+            return await self._current_archive().make_request(method, params)
+
+    def _current(self) -> RetryHTTPProvider:
+        if self.index >= len(self.providers):
+            self.index = 0
+        current_provider = self.providers[self.index]
+        self.index += 1
+        return current_provider
+
+    def _current_archive(self) -> RetryHTTPProvider:
+        if self.archive_index >= len(self.archive_providers):
+            self.archive_index = 0
+        current_provider = self.archive_providers[self.archive_index]
+        self.archive_index += 1
+        return current_provider
diff --git a/src/dexorder/transactions.py b/src/dexorder/transactions.py
index 8318004..5fb8686 100644
--- a/src/dexorder/transactions.py
+++ b/src/dexorder/transactions.py
@@ -106,7 +106,7 @@ async def create_and_send_transactions():
             w3 = current_w3.get()
             account = await handler.acquire_account()
             if account is None:
-                log.warning(f'No account available for job {job.id} type "{handler.tag}"')
+                warningAlert(f'No account available for job {job.id} type "{handler.tag}"', 'no account available')
                 continue
             await ctx.sign(account)
             log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}')
@@ -140,6 +140,7 @@ async def create_and_send_transactions():
                 job.tx_data = ctx.data
             assert sent == job.tx_id
 
+ended_jobs = []
 
 async def handle_transaction_receipts():
     # log.debug('handle_transaction_receipts')
@@ -174,6 +175,12 @@ async def handle_transaction_receipts():
 
 
 def end_job(job):
-    in_flight.discard((job.request.type, job.request.key))
-    db.session.delete(job)
+    ended_jobs.append(job)
+
+def cleanup_jobs():
+    for job in ended_jobs:
+        log.debug(f'ending job {job.id}')
+        in_flight.discard((job.request.type, job.request.key))
+        db.session.delete(job)
+    ended_jobs.clear()
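The end_job/cleanup_jobs split defers the actual deletion until the end-of-block cleanup_jobs callback registered in main.py, apparently so that a job ended by handle_transaction_receipts remains visible to later handlers in the same block. A self-contained illustration of that contract (FakeSession and the dict jobs are in-memory stand-ins for db.session and the real Job rows):

    # Sketch: end_job only queues; cleanup_jobs performs the removal.
    class FakeSession:
        def __init__(self):
            self.deleted = []

        def delete(self, obj):
            self.deleted.append(obj)

    ended_jobs, in_flight, session = [], set(), FakeSession()

    def end_job(job):
        ended_jobs.append(job)                 # no side effects yet

    def cleanup_jobs():
        for job in ended_jobs:
            in_flight.discard((job['type'], job['key']))
            session.delete(job)
        ended_jobs.clear()

    in_flight.add(('swap', 'k1'))
    end_job({'type': 'swap', 'key': 'k1'})
    assert ('swap', 'k1') in in_flight         # untouched until cleanup runs
    cleanup_jobs()
    assert ('swap', 'k1') not in in_flight and session.deleted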