Compare commits

..

6 Commits

Author SHA1 Message Date
tim
473e0ec516 log tweak 2025-02-25 19:08:13 -04:00
tim
979f31dfe0 roundrobin/archive connection fix; job cleanup fix; mirror fix 2025-02-25 19:00:31 -04:00
tim
afb1ee49a4 transaction handling touchups 2025-02-25 09:57:56 -04:00
tim
8b541bd76d metrics fixes 2025-02-24 22:15:05 -04:00
tim
04d7686c30 backend metrics port/svc 2025-02-24 21:31:17 -04:00
tim
603dd64dc4 log tweak 2025-02-24 21:00:53 -04:00
10 changed files with 130 additions and 124 deletions

View File

@@ -2,5 +2,5 @@ rpc_url = 'arbitrum_dxod'
ws_url = 'arbitrum_dxod_ws' ws_url = 'arbitrum_dxod_ws'
archive_url = 'arbitrum_alchemy' archive_url = 'arbitrum_alchemy'
concurrent_rpc_connections=8 concurrent_rpc_connections=8
metrics_port=9001 metrics_port=9090
metadata = '' # this setting approves no tokens metadata = '' # this setting approves no tokens

View File

@@ -34,7 +34,7 @@ def alert_pagerduty(title, message, dedup_key, log_level):
if pagerduty_session is None: if pagerduty_session is None:
pagerduty_session = pdpyras.EventsAPISession(config.pagerduty) pagerduty_session = pdpyras.EventsAPISession(config.pagerduty)
hostname = socket.gethostname() hostname = socket.gethostname()
sev = 'error' if log_level >= logging.ERROR else 'warning' sev = 'critical' if log_level >= logging.ERROR else 'info'
pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key) pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key)
except Exception: except Exception:
log.warning('Could not notify PagerDuty!', exc_info=True) log.warning('Could not notify PagerDuty!', exc_info=True)

View File

@@ -14,13 +14,13 @@ from dexorder.contract import get_contract_event
from dexorder.contract.dexorder import get_dexorder_contract from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed, from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
handle_uniswap_swaps, handle_vault_impl_changed, initialize_metrics) handle_uniswap_swaps, handle_vault_impl_changed, update_metrics)
from dexorder.memcache import memcache from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
from dexorder.order.triggers import activate_orders, end_trigger_updates from dexorder.order.triggers import activate_orders, end_trigger_updates
from dexorder.runner import BlockStateRunner from dexorder.runner import BlockStateRunner
from dexorder.transactions import handle_transaction_receipts from dexorder.transactions import handle_transaction_receipts, cleanup_jobs
from dexorder.vaultcreationhandler import handle_vault_creation_requests from dexorder.vaultcreationhandler import handle_vault_creation_requests
log = logging.getLogger('dexorder') log = logging.getLogger('dexorder')
@@ -60,6 +60,8 @@ def setup_logevent_triggers(runner):
runner.add_callback(check_activate_orders) runner.add_callback(check_activate_orders)
runner.add_callback(init) runner.add_callback(init)
runner.add_event_trigger(handle_transaction_receipts)
runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated')) runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated'))
runner.add_event_trigger(handle_vault_impl_changed, get_contract_event('Vault', 'VaultImplChanged')) runner.add_event_trigger(handle_vault_impl_changed, get_contract_event('Vault', 'VaultImplChanged'))
runner.add_event_trigger(handle_order_placed, get_contract_event('VaultImpl', 'DexorderSwapPlaced')) runner.add_event_trigger(handle_order_placed, get_contract_event('VaultImpl', 'DexorderSwapPlaced'))
@@ -69,18 +71,20 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_order_canceled, get_contract_event('VaultImpl', 'DexorderSwapCanceled')) runner.add_event_trigger(handle_order_canceled, get_contract_event('VaultImpl', 'DexorderSwapCanceled'))
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('VaultImpl', 'DexorderCancelAll')) runner.add_event_trigger(handle_order_cancel_all, get_contract_event('VaultImpl', 'DexorderCancelAll'))
runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block
runner.add_event_trigger(handle_dexorderexecutions, executions) runner.add_event_trigger(handle_dexorderexecutions, executions)
runner.add_event_trigger(handle_vault_creation_requests) runner.add_event_trigger(handle_vault_creation_requests)
runner.add_callback(end_trigger_updates) runner.add_callback(end_trigger_updates)
runner.add_callback(execute_tranches) runner.add_callback(execute_tranches)
runner.add_callback(cleanup_jobs)
# fee adjustments are handled offline by batch jobs # fee adjustments are handled offline by batch jobs
# runner.add_event_trigger(handle_fee_limits_changed, get_contract_event('IFeeManager', 'FeeLimitsChanged')) # runner.add_event_trigger(handle_fee_limits_changed, get_contract_event('IFeeManager', 'FeeLimitsChanged'))
# runner.add_event_trigger(handle_fees_changed, get_contract_event('IFeeManager', 'FeesChanged')) # runner.add_event_trigger(handle_fees_changed, get_contract_event('IFeeManager', 'FeesChanged'))
# runner.add_callback(adjust_gas) # runner.add_callback(adjust_gas)
runner.add_callback(update_metrics)
# noinspection DuplicatedCode # noinspection DuplicatedCode
async def main(): async def main():
@@ -114,7 +118,6 @@ async def main():
await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id]) await redis_state.save(state.root_fork, state.diffs_by_branch[state.root_branch.id])
await initialize_accounting() await initialize_accounting()
initialize_metrics()
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner) setup_logevent_triggers(runner)

View File

@@ -119,7 +119,7 @@ async def main():
delay = max(0.010, config.polling) delay = max(0.010, config.polling)
update_once = config.polling <= 0 update_once = config.polling <= 0
global source_w3 global source_w3
source_w3 = await create_w3(config.mirror_source_rpc_url, name='source', autosign=False) source_w3 = await create_w3(config.mirror_source_rpc_url, name='source', autosign=False, archive_url=[])
pools = (config.mirror_pools or []) pools = (config.mirror_pools or [])
if not pools: if not pools:
log.error('must configure mirror_pools') log.error('must configure mirror_pools')

View File

@@ -2,7 +2,7 @@ import asyncio
import itertools import itertools
import logging import logging
from random import random from random import random
from typing import Any, Optional, Union, Callable from typing import Any, Optional, Union
# noinspection PyPackageRequirements # noinspection PyPackageRequirements
from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector
@@ -10,6 +10,7 @@ from eth_typing import URI
from hexbytes import HexBytes from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
from web3.providers.async_base import AsyncJSONBaseProvider
from web3.types import RPCEndpoint, RPCResponse from web3.types import RPCEndpoint, RPCResponse
from .. import current_w3, Blockchain, config, Account, NARG from .. import current_w3, Blockchain, config, Account, NARG
@@ -54,40 +55,22 @@ async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=T
if not rpc_urls: if not rpc_urls:
raise ValueError("No rpc_url configured") raise ValueError("No rpc_url configured")
w3_instances = []
archive_instances = []
for (url, archive) in itertools.chain(((url, False) for url in rpc_urls), ((url, True) for url in archive_urls)):
connector = TCPConnector(limit=config.concurrent_rpc_connections)
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
http_provider = RetryHTTPProvider(url)
await http_provider.cache_async_session(session)
w3 = AsyncWeb3(http_provider)
if archive_urls:
w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
w3.middleware_onion.remove('attrdict')
w3.middleware_onion.add(clean_input_async, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth)
# Highest block number that has reported a -32000 error indicating a lack of history that far back
w3.archive_fault_height = -1
if autosign:
if account is NARG:
account = Account.get()
if account is not None:
# noinspection PyTypeChecker
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
w3.eth.default_account = account.address
if archive:
archive_instances.append(w3)
else:
w3_instances.append(w3)
# Ensure all instances share the same chain ID provider = await RoundRobinHTTPProvider.construct(rpc_urls, archive_urls) if len(rpc_urls) > 1 or archive_urls else await RetryHTTPProvider.construct(rpc_urls[0])
chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances))) w3 = AsyncWeb3(provider)
if len(set(chain_ids)) != 1: if archive_urls:
raise RuntimeError("All RPC URLs must belong to the same blockchain") w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
w3.middleware_onion.remove('attrdict')
# noinspection PyTypeChecker w3.middleware_onion.add(clean_input_async, 'clean_input')
return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0] w3.eth.Contract = _make_contract(w3.eth)
if autosign:
if account is NARG:
account = Account.get()
if account is not None:
# noinspection PyTypeChecker
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
w3.eth.default_account = account.address
return w3
async def create_w3_ws(ws_url=None) -> AsyncWeb3: async def create_w3_ws(ws_url=None) -> AsyncWeb3:
@@ -209,7 +192,7 @@ async def archive_intercept_middleware(make_request, w3):
block_height = None block_height = None
if is_archive_method: if is_archive_method:
block_identifier = params[-1] block_identifier = params[-1]
if block_identifier != 'latest': if block_identifier not in ('latest', 'pending',):
block_height = int(block_identifier, 16) if type(block_identifier) is str else int(params[-1]) block_height = int(block_identifier, 16) if type(block_identifier) is str else int(params[-1])
if block_height <= w3.archive_fault_height: if block_height <= w3.archive_fault_height:
# this block is at least as old as another block that already failed to fetch history from this RPC # this block is at least as old as another block that already failed to fetch history from this RPC
@@ -222,7 +205,6 @@ async def archive_intercept_middleware(make_request, w3):
# noinspection PyTypeChecker # noinspection PyTypeChecker
w3.archive_fault_height = max(w3.archive_fault_height, block_height) w3.archive_fault_height = max(w3.archive_fault_height, block_height)
raise ArchiveException(method, block_height) raise ArchiveException(method, block_height)
resp = await make_request(method, params)
return resp return resp
return middleware return middleware
@@ -235,77 +217,27 @@ class ArchiveException (Exception):
self.block_number = block_number self.block_number = block_number
class RoundRobinWebProxy:
def __init__(self, w3_instances, archive_instances):
if not w3_instances:
raise ValueError("At least one w3 instance is required")
self._w3_instances = w3_instances
self._archive_instances = archive_instances
self._index = 0
self._archive_index = 0
for w3 in self._w3_instances:
w3.manager.coro_request = self.make_coro_request_function(w3)
def __getattr__(self, name):
# proxy in a round-robin fashion
return getattr(self._current(), name)
def _current(self):
if self._index >= len(self._w3_instances):
self._index = 0
current_instance = self._w3_instances[self._index]
self._index += 1
return current_instance
def _current_archive(self):
if self._archive_index >= len(self._archive_instances):
self._archive_index = 0
current_instance = self._archive_instances[self._archive_index]
self._archive_index += 1
return current_instance
def make_coro_request_function(rrwp, w3):
# This replaces w3.manager.coro_request with our own version that catches ArchiveExceptions and retries them
# on an archive w3
### NOTE!!! ###
# we use `self` to mean the RequestManager so we can copy that code directly over here.
# instead we rename the RoundRobinWebProxy rrwp and name the w3 instance too
self = w3.manager
async def RequestManager__coro_request(
method: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
params: Any,
error_formatters: Optional[Callable[..., Any]] = None,
null_result_formatters: Optional[Callable[..., Any]] = None,
) -> Any:
"""
Coroutine for making a request using the provider
"""
try:
response = await self._coro_make_request(method, params)
return self.formatted_response(
response, params, error_formatters, null_result_formatters
)
except ArchiveException as e:
w3.archive_fault_height = max(w3.archive_fault_height, e.block_number)
return await rrwp._current_archive().manager.coro_request(method, params, error_formatters, null_result_formatters)
return RequestManager__coro_request
class RetryHTTPProvider (AsyncHTTPProvider): class RetryHTTPProvider (AsyncHTTPProvider):
def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None: @staticmethod
async def construct(endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None, retries: int = 10):
result = RetryHTTPProvider(endpoint_uri, request_kwargs, retries)
connector = TCPConnector(limit=config.concurrent_rpc_connections)
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
await result.cache_async_session(session)
return result
def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None, retries: int = 10) -> None:
super().__init__(endpoint_uri, request_kwargs) super().__init__(endpoint_uri, request_kwargs)
self.in_flight = asyncio.Semaphore(config.concurrent_rpc_connections) self.in_flight = asyncio.Semaphore(config.concurrent_rpc_connections)
self.rate_allowed = asyncio.Event() self.rate_allowed = asyncio.Event()
self.rate_allowed.set() self.rate_allowed.set()
self.retries = retries
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
MAX_TRIES = 10
wait = 0 wait = 0
for _ in range(MAX_TRIES): for _ in range(self.retries):
try: try:
async with self.in_flight: async with self.in_flight:
await self.rate_allowed.wait() await self.rate_allowed.wait()
@@ -324,6 +256,55 @@ class RetryHTTPProvider (AsyncHTTPProvider):
await asyncio.sleep(wait) await asyncio.sleep(wait)
finally: finally:
self.rate_allowed.set() self.rate_allowed.set()
# finally: raise IOError(f'Could not query rpc server after {self.retries} tries: {method} {params}')
# log.debug(f'Ended request of RPC call {method}')
raise IOError(f'Could not query rpc server after {MAX_TRIES} tries: {method} {params}')
class RoundRobinHTTPProvider (AsyncJSONBaseProvider):
    """
    JSON-RPC provider that rotates requests over several RetryHTTPProvider instances.

    Every request is sent to the next entry of `providers`. If that request raises an
    ArchiveException, the provider's `archive_fault_height` is raised to the exception's
    block number and, when archive providers are configured, the request is re-issued once
    against the next entry of `archive_providers`.
    """

    @staticmethod
    async def construct(endpoint_uris: list[str], archive_uris: list[str]):
        """
        Build one RetryHTTPProvider (with retries=1) per URI and wrap them in a RoundRobinHTTPProvider.

        :param endpoint_uris: URIs of the regular RPC endpoints, used in rotation for every request
        :param archive_uris: URIs of the archive endpoints, used only as a fallback; may be empty
        :raises RuntimeError: if the endpoints do not all report the same chain id
        :raises ValueError: (from the constructor) if `endpoint_uris` is empty
        """
        endpoint_uris = list(endpoint_uris)
        archive_uris = list(archive_uris)
        # Construct everything in a single gather so that no coroutine object is left un-awaited if one of
        # the constructions raises.
        built = await asyncio.gather(*(
            RetryHTTPProvider.construct(uri, retries=1)
            for uri in itertools.chain(endpoint_uris, archive_uris)
        ))
        providers = built[:len(endpoint_uris)]
        archive_providers = built[len(endpoint_uris):]
        # Ensure all instances share the same chain ID
        chain_ids = await asyncio.gather(*(AsyncWeb3(provider).eth.chain_id for provider in built))
        if len(set(chain_ids)) != 1:
            raise RuntimeError("All RPC URLs must belong to the same blockchain")
        return RoundRobinHTTPProvider(providers, archive_providers)

    def __init__(self, providers: list[RetryHTTPProvider], archive_providers: list[RetryHTTPProvider]):
        """
        :param providers: regular providers, used in rotation; must not be empty
        :param archive_providers: fallback providers for requests that raised ArchiveException; may be empty
        :raises ValueError: if `providers` is empty
        """
        super().__init__()
        if not providers:
            # Without this guard the first call to _current() would fail with a bare IndexError.
            raise ValueError("At least one provider is required")
        self.providers = providers
        self.archive_providers = archive_providers
        for provider in self.providers:
            # per-provider high-water mark; raised in make_request() whenever an ArchiveException is caught
            provider.archive_fault_height = 0
        self.index = 0
        self.archive_index = 0

    async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
        """
        Send the request to the next regular provider, falling back to an archive provider on ArchiveException.

        :raises ArchiveException: if the regular provider raised it and no archive providers are configured
        """
        # NOTE(review): the providers are constructed with retries=1 and this method does not move on to the
        # next provider when one fails with IOError -- confirm that failing fast is the intended behavior.
        provider = self._current()
        try:
            return await provider.make_request(method, params)
        except ArchiveException as e:
            provider.archive_fault_height = max(provider.archive_fault_height, e.block_number)
            if not self.archive_providers:
                raise
            return await self._current_archive().make_request(method, params)

    def _current(self) -> RetryHTTPProvider:
        """Return the next regular provider in the rotation and advance the rotation index."""
        if self.index >= len(self.providers):
            self.index = 0
        current_provider = self.providers[self.index]
        self.index += 1
        return current_provider

    def _current_archive(self) -> RetryHTTPProvider:
        """Return the next archive provider in the rotation; callers must ensure the list is non-empty."""
        if self.archive_index >= len(self.archive_providers):
            self.archive_index = 0
        current_provider = self.archive_providers[self.archive_index]
        self.archive_index += 1
        return current_provider

View File

@@ -3,7 +3,7 @@ import logging
from web3.types import EventData from web3.types import EventData
from dexorder import db, metric, current_w3 from dexorder import db, metric, current_w3, timestamp
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \ from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \
accounting_lock accounting_lock
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
@@ -214,13 +214,21 @@ async def handle_vault_impl_changed(upgrade: EventData):
log.debug(f'Vault {addr} upgraded to impl version {version}') log.debug(f'Vault {addr} upgraded to impl version {version}')
async def get_gas_price(): slow_metric_update = 0
return await current_w3.get().eth.gas_price async def update_metrics():
# called at the end of the runloop in the worker context
metric.vaults.set(vault_owners.upper_len())
metric.open_orders.set(Order.open_orders.upper_len())
metric.triggers_time.set(len(TimeTrigger.all))
metric.triggers_line.set(len(PriceLineTrigger.triggers_set))
def initialize_metrics(): # slow updates
metric.vaults.set_function(vault_owners.upper_len) global slow_metric_update
metric.open_orders.set_function(Order.open_orders.upper_len) now = timestamp()
metric.triggers_time.set_function(lambda: len(TimeTrigger.all)) if now - slow_metric_update >= 60:
metric.triggers_line.set_function(lambda: len(PriceLineTrigger.triggers_set)) slow_metric_update = now
metric.gas_price.set_function(get_gas_price)
# put slow updates here
price = await current_w3.get().eth.gas_price
metric.gas_price.observe(price)

View File

@@ -22,4 +22,4 @@ volume = Counter("volume", "Total volume of successful executions in USD")
account_total = Gauge('account_total', 'Total number of accounts configured') account_total = Gauge('account_total', 'Total number of accounts configured')
account_available = Gauge('account_available', 'Number of accounts that do not have any pending transactions') account_available = Gauge('account_available', 'Number of accounts that do not have any pending transactions')
gas_price = Gauge('gas_price', 'Gas price in wei') gas_price = Summary('gas_price', 'Gas price in wei')

View File

@@ -152,7 +152,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
order_error(order, error) # We do not know if it was filled or not so only Error status can be given order_error(order, error) # We do not know if it was filled or not so only Error status can be given
elif error == 'TF': elif error == 'TF':
# Tranche Filled # Tranche Filled
log.warning(f'tranche already filled {tk}') log.debug(f'tranche already filled {tk}')
tranche_trigger = get_trigger() tranche_trigger = get_trigger()
if tranche_trigger is not None: if tranche_trigger is not None:
tranche_trigger.status = TrancheState.Filled tranche_trigger.status = TrancheState.Filled

View File

@@ -90,7 +90,7 @@ class BlockStateRunner(BlockProgressor):
self.new_head_event.set() self.new_head_event.set()
log.debug(f'new head {block}') log.debug(f'new head {block}')
if abs(block.timestamp-timestamp()) > 3: if abs(block.timestamp-timestamp()) > 3:
log.warning(f'Blockchain {chain_id} time is off by {block.timestamp-timestamp():.1f}s') log.info(f'Blockchain {chain_id} time is off by {block.timestamp-timestamp():.1f}s')
if not self.running: if not self.running:
break break
await async_yield() await async_yield()

View File

@@ -7,6 +7,7 @@ from uuid import uuid4
from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError
from dexorder import db, current_w3, Account from dexorder import db, current_w3, Account
from dexorder.alert import warningAlert
from dexorder.base import TransactionReceiptDict, TransactionRequest from dexorder.base import TransactionReceiptDict, TransactionRequest
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.blockstate.fork import current_fork from dexorder.blockstate.fork import current_fork
@@ -105,7 +106,7 @@ async def create_and_send_transactions():
w3 = current_w3.get() w3 = current_w3.get()
account = await handler.acquire_account() account = await handler.acquire_account()
if account is None: if account is None:
log.warning(f'No account available for job {job.id} type "{handler.tag}"') warningAlert(f'No account available for job {job.id} type "{handler.tag}"', 'no account available')
continue continue
await ctx.sign(account) await ctx.sign(account)
log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}') log.info(f'servicing job {job.request.__class__.__name__} {job.id} with account {account.address} nonce {ctx.tx["nonce"]} tx {ctx.id}')
@@ -113,10 +114,16 @@ async def create_and_send_transactions():
try: try:
sent = await w3.eth.send_raw_transaction(ctx.data) sent = await w3.eth.send_raw_transaction(ctx.data)
except ValueError as e: except ValueError as e:
if e.args[0]['code'] == -32003: try:
msg = e.args[0].get('message','')
except IndexError:
msg = ''
if msg.startswith('nonce too low'):
# Nonce too low # Nonce too low
log.warning(f'Account {account.address} nonce too low') log.warning(f'Account {account.address} nonce too low')
account.reset_nonce() account.reset_nonce()
elif msg.startswith('insufficient funds'):
warningAlert('Account Empty', f'Account {account.address} is out of funds!')
else: else:
log.exception(f'Failure sending transaction for job {job.id}') log.exception(f'Failure sending transaction for job {job.id}')
await handler.release_account(account) await handler.release_account(account)
@@ -133,6 +140,7 @@ async def create_and_send_transactions():
job.tx_data = ctx.data job.tx_data = ctx.data
assert sent == job.tx_id assert sent == job.tx_id
ended_jobs = []
async def handle_transaction_receipts(): async def handle_transaction_receipts():
# log.debug('handle_transaction_receipts') # log.debug('handle_transaction_receipts')
@@ -167,6 +175,12 @@ async def handle_transaction_receipts():
def end_job(job): def end_job(job):
in_flight.discard((job.request.type, job.request.key)) ended_jobs.append(job)
db.session.delete(job)
def cleanup_jobs():
    """
    Flush the jobs queued in `ended_jobs`.

    For each queued job, in order: log it, discard its (request type, request key) pair from
    `in_flight`, and delete the job through `db.session`. The queue is emptied afterwards.
    """
    for job in ended_jobs:
        log.debug(f'ending job {job.id}')
        in_flight_key = (job.request.type, job.request.key)
        in_flight.discard(in_flight_key)
        db.session.delete(job)
    # empty the shared list in place so other references to it see the cleared queue
    del ended_jobs[:]