From eccf81c3c88149d654afc2c2ef8a0b89f8c16c52 Mon Sep 17 00:00:00 2001 From: tim Date: Wed, 26 Feb 2025 16:58:57 -0400 Subject: [PATCH] bugfixes; pagerduty client lib change; requirements bump --- requirements-lock.txt | 2 +- requirements.txt | 2 +- src/dexorder/alert.py | 18 ++++++++++-------- src/dexorder/bin/main.py | 6 +++--- src/dexorder/order/executionhandler.py | 26 ++++++++++---------------- src/dexorder/order/triggers.py | 8 +------- src/dexorder/transactions.py | 4 ++-- src/dexorder/vault_blockdata.py | 12 ++++++++++++ 8 files changed, 40 insertions(+), 38 deletions(-) diff --git a/requirements-lock.txt b/requirements-lock.txt index 9be035a..d238c85 100644 --- a/requirements-lock.txt +++ b/requirements-lock.txt @@ -49,8 +49,8 @@ numpy==2.2.2 oauthlib==3.2.2 omegaconf==2.3.0 orjson==3.10.15 +pagerduty==1.0.0 parsimonious==0.10.0 -pdpyras==5.4.0 prometheus_client==0.21.1 propcache==0.2.0 protobuf==5.26.1 diff --git a/requirements.txt b/requirements.txt index 71c59c9..f769dc1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,7 +21,7 @@ eth-keys eth-account eth-utils eth-typing -pdpyras # pagerduty +pagerduty numpy bitarray typing_extensions diff --git a/src/dexorder/alert.py b/src/dexorder/alert.py index 1c0af2e..c6e22dd 100644 --- a/src/dexorder/alert.py +++ b/src/dexorder/alert.py @@ -1,30 +1,33 @@ import logging import socket -import pdpyras +import pagerduty from dexorder import NARG, config log = logging.getLogger(__name__) -def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True): +def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True, severity='critical'): if dedup_key is NARG: dedup_key = str(hash(title)) if do_log: msg = f'{title}: {message}' log.log(log_level, msg) # if log_level=CRITICAL for example, make sure this does not re-alert! - alert_pagerduty(title, message, dedup_key, log_level) + alert_pagerduty(title, message, dedup_key, severity) def warningAlert(title, message, dedup_key=NARG, log_level=logging.WARNING): - return alert(title, message, dedup_key, log_level) + return alert(title, message, dedup_key, log_level, severity='warning') + +def infoAlert(title, message, dedup_key=NARG, log_level=logging.INFO): + return alert(title, message, dedup_key, log_level, severity='info') pagerduty_session = None hostname = None -def alert_pagerduty(title, message, dedup_key, log_level): +def alert_pagerduty(title, message, dedup_key, severity): if not config.pagerduty: return # noinspection PyBroadException @@ -32,10 +35,9 @@ def alert_pagerduty(title, message, dedup_key, log_level): global pagerduty_session global hostname if pagerduty_session is None: - pagerduty_session = pdpyras.EventsAPISession(config.pagerduty) + pagerduty_session = pagerduty.EventsApiV2Client(config.pagerduty) hostname = socket.gethostname() - sev = 'critical' if log_level >= logging.ERROR else 'info' - pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key) + pagerduty_session.trigger(title, hostname, severity=severity, custom_details={'message': message}, dedup_key=dedup_key, payload=dict(severity=severity)) except Exception: log.warning('Could not notify PagerDuty!', exc_info=True) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 50645fc..66b67d2 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -3,7 +3,7 @@ from asyncio import CancelledError from dexorder import db, blockchain from dexorder.accounting import initialize_accounting -from dexorder.alert import warningAlert +from dexorder.alert import infoAlert from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate import current_blockstate @@ -76,19 +76,19 @@ def setup_logevent_triggers(runner): runner.add_callback(end_trigger_updates) runner.add_callback(execute_tranches) - runner.add_callback(cleanup_jobs) # fee adjustments are handled offline by batch jobs # runner.add_event_trigger(handle_fee_limits_changed, get_contract_event('IFeeManager', 'FeeLimitsChanged')) # runner.add_event_trigger(handle_fees_changed, get_contract_event('IFeeManager', 'FeesChanged')) # runner.add_callback(adjust_gas) + runner.add_callback(cleanup_jobs) runner.add_callback(update_metrics) # noinspection DuplicatedCode async def main(): - warningAlert('Started', 'backend has started', log_level=logging.INFO) + infoAlert('Started', 'backend has started', log_level=logging.INFO) await blockchain.connect(autosign=False) # the transaction manager checks out accounts and releases them. redis_state = None state = None diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 9a92189..f7fdb94 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -15,10 +15,11 @@ from dexorder.contract.dexorder import get_dexorder_contract from dexorder.database.model.accounting import AccountingSubcategory from dexorder.database.model.transaction import TransactionJob from dexorder.order.orderstate import Order -from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers, +from dexorder.order.triggers import (OrderTriggers, TrancheState, active_tranches, order_error) from dexorder.transactions import TransactionHandler, submit_transaction_request from dexorder.util import hexbytes +from dexorder.vault_blockdata import refresh_vault_balances log = logging.getLogger(__name__) @@ -79,7 +80,7 @@ class TrancheExecutionHandler (TransactionHandler): errcode = hexbytes(x.args[1]).decode('utf-8') log.error(f'While building execution for tranche {tk}: {errcode}') # if there's a logic error we shouldn't keep trying - finish_execution_request(tk, errcode) + await finish_execution_request(tk, errcode) raise exception async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: @@ -90,13 +91,13 @@ class TrancheExecutionHandler (TransactionHandler): log.error('Could not build execution transaction due to exception', exc_info=e) # noinspection PyTypeChecker req: TrancheExecutionRequest = job.request - finish_execution_request(req.tranche_key, '') + await finish_execution_request(req.tranche_key, '') TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler -def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): +async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): order_key = OrderKey(tk.vault, tk.order_index) try: order: Order = Order.of(order_key) @@ -104,11 +105,6 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): log.error(f'Could not get order {order_key}') return - try: - inflight_execution_requests.remove(tk) - except KeyError: - pass - def get_trigger(): try: return OrderTriggers.instances[order_key].triggers[tk.tranche_index] @@ -139,7 +135,9 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): # Insufficient Input Amount token = order.order.tokenIn log.debug(f'insufficient funds {tk.vault} {token} ') + slash() retry() + await refresh_vault_balances(tk.vault, order.order.tokenIn, order.order.tokenOut) elif error == 'SPL': # todo tight slippage can cause excessive executions as the backend repeatedly retries the remainder. The symptom is error 'SPL'. # Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of @@ -199,24 +197,20 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): def execute_tranches(): new_execution_requests = [] for tk, proof in active_tranches.items(): - if tk not in inflight_execution_requests: - new_execution_requests.append((tk, proof)) - else: - log.debug(f'execute {tk} already in flight') + new_execution_requests.append((tk, proof)) # todo order requests and batch for tk, proof in new_execution_requests: create_execution_request(tk, proof) def create_execution_request(tk: TrancheKey, proof: PriceProof): - inflight_execution_requests.add(tk) job = submit_transaction_request(new_tranche_execution_request(tk, proof)) if job is not None: log.debug(f'Executing {tk} as job {job.id}') return job -def handle_dexorderexecutions(event: EventData): +async def handle_dexorderexecutions(event: EventData): log.debug(f'executions {event}') exe_id = UUID(bytes=event['args']['id']) try: @@ -236,4 +230,4 @@ def handle_dexorderexecutions(event: EventData): # noinspection PyTypeChecker req: TrancheExecutionRequest = job.request tk = TrancheKey(req.vault, req.order_index, req.tranche_index) - finish_execution_request(tk, None if errors[0] == '' else errors[0]) + await finish_execution_request(tk, None if errors[0] == '' else errors[0]) diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 4c3099f..05da67a 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -39,9 +39,6 @@ execution should be attempted on the tranche. # tranches which have passed all constraints and should be executed active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') -# tranches which have an execute() transaction sent but not completed -inflight_execution_requests: set[TrancheKey] = set() - class OrderTriggers: instances: dict[OrderKey, 'OrderTriggers'] = {} @@ -307,10 +304,7 @@ class TimeTrigger (Trigger): if time == self._time: return self._time = time - if self.active: - # remove old trigger - TimeTrigger.all.remove(self) - self.active = False + self.remove() self.update_active(time_now) def update_active(self, time_now: int = None, time: int = None): diff --git a/src/dexorder/transactions.py b/src/dexorder/transactions.py index 4b6a738..93566e3 100644 --- a/src/dexorder/transactions.py +++ b/src/dexorder/transactions.py @@ -159,12 +159,12 @@ async def handle_transaction_receipts(): receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id) except TransactionNotFound: return - job.state = TransactionJobState.Mined - job.receipt = receipt fork = current_fork.get() assert fork is not None if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \ fork.branch.disjoint and receipt['blockNumber'] <= fork.height: + job.state = TransactionJobState.Mined + job.receipt = receipt try: handler = TransactionHandler.of(job.request.type) except KeyError: diff --git a/src/dexorder/vault_blockdata.py b/src/dexorder/vault_blockdata.py index bfc2ce4..c82cdec 100644 --- a/src/dexorder/vault_blockdata.py +++ b/src/dexorder/vault_blockdata.py @@ -1,3 +1,4 @@ +import asyncio import functools import logging @@ -90,3 +91,14 @@ def publish_vaults(chain_id, owner): break log.debug(f'publish_vaults {chain_id} {owner} {vaults}') current_pub.get()(f'{chain_id}|{owner}', 'vaults', chain_id, owner, vaults) + + +async def refresh_vault_balances(vault, *tokens): + amounts = await asyncio.gather(*(ERC20(token).balanceOf(vault) for token in tokens)) + + def _adjust(vaddr, toks, amts, old_balances): + result = dict(old_balances) # copy + for t, a in zip(toks, amts): + result[t] = a + return result + vault_balances.modify(vault, functools.partial(_adjust, vault, tokens, amounts))