roundrobin/archive connection fix; job cleanup fix; mirror fix

tim
2025-02-25 19:00:31 -04:00
parent afb1ee49a4
commit 979f31dfe0
5 changed files with 97 additions and 108 deletions

View File

@@ -34,7 +34,7 @@ def alert_pagerduty(title, message, dedup_key, log_level):
         if pagerduty_session is None:
             pagerduty_session = pdpyras.EventsAPISession(config.pagerduty)
         hostname = socket.gethostname()
-        sev = 'error' if log_level >= logging.ERROR else 'warning'
+        sev = 'critical' if log_level >= logging.ERROR else 'info'
         pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key)
     except Exception:
         log.warning('Could not notify PagerDuty!', exc_info=True)
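Note on the severity change: PagerDuty's Events API v2 accepts 'critical', 'error', 'warning', and 'info', so this remaps log levels at ERROR and above to a 'critical' page and everything else to a low-urgency 'info' event. A minimal sketch of the same mapping in isolation (the helper name is illustrative, not part of this codebase):

import logging

def pagerduty_severity(log_level: int) -> str:
    # ERROR (40) and CRITICAL (50) escalate to a 'critical' page; lower levels stay informational
    return 'critical' if log_level >= logging.ERROR else 'info'

assert pagerduty_severity(logging.CRITICAL) == 'critical'
assert pagerduty_severity(logging.ERROR) == 'critical'
assert pagerduty_severity(logging.WARNING) == 'info'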

View File

@@ -20,7 +20,7 @@ from dexorder.memcache.memcache_state import RedisState, publish_all
 from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
 from dexorder.order.triggers import activate_orders, end_trigger_updates
 from dexorder.runner import BlockStateRunner
-from dexorder.transactions import handle_transaction_receipts
+from dexorder.transactions import handle_transaction_receipts, cleanup_jobs
 from dexorder.vaultcreationhandler import handle_vault_creation_requests

 log = logging.getLogger('dexorder')

@@ -61,6 +61,7 @@ def setup_logevent_triggers(runner):
     runner.add_callback(check_activate_orders)
     runner.add_callback(init)
+    runner.add_event_trigger(handle_transaction_receipts)
     runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated'))
     runner.add_event_trigger(handle_vault_impl_changed, get_contract_event('Vault', 'VaultImplChanged'))
     runner.add_event_trigger(handle_order_placed, get_contract_event('VaultImpl', 'DexorderSwapPlaced'))

@@ -70,12 +71,12 @@ def setup_logevent_triggers(runner):
     runner.add_event_trigger(handle_order_canceled, get_contract_event('VaultImpl', 'DexorderSwapCanceled'))
     runner.add_event_trigger(handle_order_cancel_all, get_contract_event('VaultImpl', 'DexorderCancelAll'))
-    runner.add_event_trigger(handle_transaction_receipts)
     runner.add_event_trigger(handle_dexorderexecutions, executions)
     runner.add_event_trigger(handle_vault_creation_requests)
     runner.add_callback(end_trigger_updates)
     runner.add_callback(execute_tranches)
+    runner.add_callback(cleanup_jobs)
     # fee adjustments are handled offline by batch jobs
     # runner.add_event_trigger(handle_fee_limits_changed, get_contract_event('IFeeManager', 'FeeLimitsChanged'))
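The reordering above registers handle_transaction_receipts ahead of the per-contract event triggers and adds cleanup_jobs as the last callback after execute_tranches, so receipts are processed before this block's contract events are handled and ended jobs are only deleted once every other handler has run. A hypothetical sketch of that per-block ordering (the loop is illustrative; BlockStateRunner's actual dispatch is not shown in this diff):

import inspect

# Hypothetical per-block loop standing in for BlockStateRunner.
async def process_block(event_triggers, callbacks):
    for handler in (*event_triggers, *callbacks):
        result = handler()               # triggers first, then callbacks, in registration order
        if inspect.isawaitable(result):
            await result                 # handlers may be sync (cleanup_jobs) or async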

View File

@@ -119,7 +119,7 @@ async def main():
     delay = max(0.010, config.polling)
     update_once = config.polling <= 0
     global source_w3
-    source_w3 = await create_w3(config.mirror_source_rpc_url, name='source', autosign=False)
+    source_w3 = await create_w3(config.mirror_source_rpc_url, name='source', autosign=False, archive_url=[])
     pools = (config.mirror_pools or [])
     if not pools:
         log.error('must configure mirror_pools')

View File

@@ -2,7 +2,7 @@ import asyncio
 import itertools
 import logging
 from random import random
-from typing import Any, Optional, Union, Callable
+from typing import Any, Optional, Union

 # noinspection PyPackageRequirements
 from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector

@@ -10,6 +10,7 @@ from eth_typing import URI
 from hexbytes import HexBytes
 from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
 from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
+from web3.providers.async_base import AsyncJSONBaseProvider
 from web3.types import RPCEndpoint, RPCResponse

 from .. import current_w3, Blockchain, config, Account, NARG
@@ -54,40 +55,22 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
     if not rpc_urls:
         raise ValueError("No rpc_url configured")
-    w3_instances = []
-    archive_instances = []
-    for (url, archive) in itertools.chain(((url, False) for url in rpc_urls), ((url, True) for url in archive_urls)):
-        connector = TCPConnector(limit=config.concurrent_rpc_connections)
-        session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
-        http_provider = RetryHTTPProvider(url)
-        await http_provider.cache_async_session(session)
-        w3 = AsyncWeb3(http_provider)
-        if archive_urls:
-            w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
-        w3.middleware_onion.remove('attrdict')
-        w3.middleware_onion.add(clean_input_async, 'clean_input')
-        w3.eth.Contract = _make_contract(w3.eth)
-        # Highest block number that has reported a -32000 error indicating a lack of history that far back
-        w3.archive_fault_height = -1
-        if autosign:
-            if account is NARG:
-                account = Account.get()
-            if account is not None:
-                # noinspection PyTypeChecker
-                w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
-                w3.eth.default_account = account.address
-        if archive:
-            archive_instances.append(w3)
-        else:
-            w3_instances.append(w3)
-    # Ensure all instances share the same chain ID
-    chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
-    if len(set(chain_ids)) != 1:
-        raise RuntimeError("All RPC URLs must belong to the same blockchain")
-
-    # noinspection PyTypeChecker
-    return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]
+    provider = await RoundRobinHTTPProvider.construct(rpc_urls, archive_urls) if len(rpc_urls) > 1 or archive_urls else await RetryHTTPProvider.construct(rpc_urls[0])
+    w3 = AsyncWeb3(provider)
+    if archive_urls:
+        w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
+    w3.middleware_onion.remove('attrdict')
+    w3.middleware_onion.add(clean_input_async, 'clean_input')
+    w3.eth.Contract = _make_contract(w3.eth)
+    if autosign:
+        if account is NARG:
+            account = Account.get()
+        if account is not None:
+            # noinspection PyTypeChecker
+            w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
+            w3.eth.default_account = account.address
+    return w3


 async def create_w3_ws(ws_url=None) -> AsyncWeb3:
@@ -209,7 +192,7 @@ async def archive_intercept_middleware(make_request, w3):
         block_height = None
         if is_archive_method:
             block_identifier = params[-1]
-            if block_identifier != 'latest':
+            if block_identifier not in ('latest', 'pending',):
                 block_height = int(block_identifier, 16) if type(block_identifier) is str else int(params[-1])
                 if block_height <= w3.archive_fault_height:
                     # this block is at least as old as another block that already failed to fetch history from this RPC

@@ -222,7 +205,6 @@ async def archive_intercept_middleware(make_request, w3):
                     # noinspection PyTypeChecker
                     w3.archive_fault_height = max(w3.archive_fault_height, block_height)
                     raise ArchiveException(method, block_height)
-        resp = await make_request(method, params)
         return resp
     return middleware
@@ -235,77 +217,27 @@ class ArchiveException (Exception):
         self.block_number = block_number


-class RoundRobinWebProxy:
-
-    def __init__(self, w3_instances, archive_instances):
-        if not w3_instances:
-            raise ValueError("At least one w3 instance is required")
-        self._w3_instances = w3_instances
-        self._archive_instances = archive_instances
-        self._index = 0
-        self._archive_index = 0
-        for w3 in self._w3_instances:
-            w3.manager.coro_request = self.make_coro_request_function(w3)
-
-    def __getattr__(self, name):
-        # proxy in a round-robin fashion
-        return getattr(self._current(), name)
-
-    def _current(self):
-        if self._index >= len(self._w3_instances):
-            self._index = 0
-        current_instance = self._w3_instances[self._index]
-        self._index += 1
-        return current_instance
-
-    def _current_archive(self):
-        if self._archive_index >= len(self._archive_instances):
-            self._archive_index = 0
-        current_instance = self._archive_instances[self._archive_index]
-        self._archive_index += 1
-        return current_instance
-
-    def make_coro_request_function(rrwp, w3):
-        # This replaces w3.manager.coro_request with our own version that catches ArchiveExceptions and retries them
-        # on an archive w3
-        ### NOTE!!! ###
-        # we use `self` to mean the RequestManager so we can copy that code directly over here.
-        # instead we rename the RoundRobinWebProxy rrwp and name the w3 instance too
-        self = w3.manager
-
-        async def RequestManager__coro_request(
-            method: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
-            params: Any,
-            error_formatters: Optional[Callable[..., Any]] = None,
-            null_result_formatters: Optional[Callable[..., Any]] = None,
-        ) -> Any:
-            """
-            Coroutine for making a request using the provider
-            """
-            try:
-                response = await self._coro_make_request(method, params)
-                return self.formatted_response(
-                    response, params, error_formatters, null_result_formatters
-                )
-            except ArchiveException as e:
-                w3.archive_fault_height = max(w3.archive_fault_height, e.block_number)
-                return await rrwp._current_archive().manager.coro_request(method, params, error_formatters, null_result_formatters)
-
-        return RequestManager__coro_request
-
-
 class RetryHTTPProvider (AsyncHTTPProvider):
-    def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
+
+    @staticmethod
+    async def construct(endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None, retries: int = 10):
+        result = RetryHTTPProvider(endpoint_uri, request_kwargs, retries)
+        connector = TCPConnector(limit=config.concurrent_rpc_connections)
+        session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
+        await result.cache_async_session(session)
+        return result
+
+    def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None, retries: int = 10) -> None:
         super().__init__(endpoint_uri, request_kwargs)
         self.in_flight = asyncio.Semaphore(config.concurrent_rpc_connections)
         self.rate_allowed = asyncio.Event()
         self.rate_allowed.set()
+        self.retries = retries

     async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
-        MAX_TRIES = 10
         wait = 0
-        for _ in range(MAX_TRIES):
+        for _ in range(self.retries):
             try:
                 async with self.in_flight:
                     await self.rate_allowed.wait()
@@ -324,6 +256,55 @@ class RetryHTTPProvider (AsyncHTTPProvider):
                 await asyncio.sleep(wait)
             finally:
                 self.rate_allowed.set()
-        # finally:
-        #     log.debug(f'Ended request of RPC call {method}')
-        raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}')
+        raise IOError(f'Could not query rpc server after {self.retries} tries: {method} {params}')
+
+
+class RoundRobinHTTPProvider (AsyncJSONBaseProvider):
+
+    @staticmethod
+    async def construct(endpoint_uris: list[str], archive_uris: list[str]):
+        providers = [RetryHTTPProvider.construct(uri, retries=1) for uri in endpoint_uris]
+        archive_providers = [RetryHTTPProvider.construct(uri, retries=1) for uri in archive_uris]
+        providers = await asyncio.gather(*providers)
+        archive_providers = await asyncio.gather(*archive_providers)
+        # Ensure all instances share the same chain ID
+        chain_ids = await asyncio.gather(*(AsyncWeb3(provider).eth.chain_id for provider in itertools.chain(providers, archive_providers)))
+        if len(set(chain_ids)) != 1:
+            raise RuntimeError("All RPC URLs must belong to the same blockchain")
+        return RoundRobinHTTPProvider(providers, archive_providers)
+
+    def __init__(self, providers: list[RetryHTTPProvider], archive_providers: list[RetryHTTPProvider]):
+        super().__init__()
+        self.providers = providers
+        self.archive_providers = archive_providers
+        for provider in self.providers:
+            provider.archive_fault_height = 0
+        self.index = 0
+        self.archive_index = 0
+
+    async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
+        provider = self._current()
+        try:
+            return await provider.make_request(method, params)
+        except ArchiveException as e:
+            provider.archive_fault_height = max(provider.archive_fault_height, e.block_number)
+            if not self.archive_providers:
+                raise
+            return await self._current_archive().make_request(method, params)
+
+    def _current(self) -> RetryHTTPProvider:
+        if self.index >= len(self.providers):
+            self.index = 0
+        current_provider = self.providers[self.index]
+        self.index += 1
+        return current_provider
+
+    def _current_archive(self) -> RetryHTTPProvider:
+        if self.archive_index >= len(self.archive_providers):
+            self.archive_index = 0
+        current_provider = self.archive_providers[self.archive_index]
+        self.archive_index += 1
+        return current_provider
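Net effect of this file: the old RoundRobinWebProxy, which rotated whole AsyncWeb3 instances and patched their request managers, is replaced by a single AsyncWeb3 over a RoundRobinHTTPProvider that rotates RetryHTTPProvider instances and falls back to an archive provider when an ArchiveException reports that a request needs deeper history than a provider can serve. A hedged usage sketch (the URLs are placeholders and create_w3's full signature is not shown in this diff):

# Illustrative only; endpoint URLs are placeholders.
async def example():
    w3 = await create_w3(
        ['https://rpc-a.example', 'https://rpc-b.example'],   # requests rotate across these
        archive_url=['https://archive.example'],              # used when a provider lacks deep history
        autosign=False,
    )
    latest = await w3.eth.get_block('latest')     # served by whichever provider is next in rotation
    old = await w3.eth.get_block(1_000_000)       # may be retried against the archive endpoint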

View File

@@ -106,7 +106,7 @@ async def create_and_send_transactions():
         w3 = current_w3.get()
         account = await handler.acquire_account()
         if account is None:
-            log.warning(f'No account available for job {job.id} type "{handler.tag}"')
+            warningAlert(f'No account available for job {job.id} type "{handler.tag}"', 'no account available')
             continue
         await ctx.sign(account)
         log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}')
@@ -140,6 +140,7 @@ async def create_and_send_transactions():
         job.tx_data = ctx.data
         assert sent == job.tx_id

+ended_jobs = []

 async def handle_transaction_receipts():
     # log.debug('handle_transaction_receipts')
@@ -174,6 +175,12 @@ async def handle_transaction_receipts():

 def end_job(job):
-    in_flight.discard((job.request.type, job.request.key))
-    db.session.delete(job)
+    ended_jobs.append(job)
+
+
+def cleanup_jobs():
+    for job in ended_jobs:
+        log.debug(f'ending job {job.id}')
+        in_flight.discard((job.request.type, job.request.key))
+        db.session.delete(job)
+    ended_jobs.clear()
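The job-cleanup fix turns end_job into a deferral: it only appends to the module-level ended_jobs list, and cleanup_jobs, registered as the final per-block callback in the runner setup earlier in this commit, performs the in_flight bookkeeping and database deletes once everything else for the block has run. A generic sketch of the same defer-then-flush pattern (names below are illustrative, not from the codebase):

# Illustrative defer-then-flush pattern: handlers only mark work as finished,
# and a single flush step at the end of the batch does the destructive part.
finished = []

def mark_finished(item):
    finished.append(item)              # safe to call from any handler mid-batch

def flush_finished(active_set, storage):
    for item in finished:
        active_set.discard(item)       # mirrors in_flight.discard(...)
        storage.remove(item)           # mirrors db.session.delete(job)
    finished.clear()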