Bug fixes; PagerDuty client library change (pdpyras → pagerduty); requirements bump

Author: tim
Date: 2025-02-26 16:58:57 -04:00
Parent: 61ab34a9f7
Commit: eccf81c3c8
8 changed files with 40 additions and 38 deletions

View File

@@ -49,8 +49,8 @@ numpy==2.2.2
 oauthlib==3.2.2
 omegaconf==2.3.0
 orjson==3.10.15
+pagerduty==1.0.0
 parsimonious==0.10.0
-pdpyras==5.4.0
 prometheus_client==0.21.1
 propcache==0.2.0
 protobuf==5.26.1

View File

@@ -21,7 +21,7 @@ eth-keys
 eth-account
 eth-utils
 eth-typing
-pdpyras # pagerduty
+pagerduty
 numpy
 bitarray
 typing_extensions

View File

@@ -1,30 +1,33 @@
 import logging
 import socket
-import pdpyras
+import pagerduty

 from dexorder import NARG, config

 log = logging.getLogger(__name__)


-def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True):
+def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True, severity='critical'):
     if dedup_key is NARG:
         dedup_key = str(hash(title))
     if do_log:
         msg = f'{title}: {message}'
         log.log(log_level, msg)  # if log_level=CRITICAL for example, make sure this does not re-alert!
-    alert_pagerduty(title, message, dedup_key, log_level)
+    alert_pagerduty(title, message, dedup_key, severity)


 def warningAlert(title, message, dedup_key=NARG, log_level=logging.WARNING):
-    return alert(title, message, dedup_key, log_level)
+    return alert(title, message, dedup_key, log_level, severity='warning')
+
+
+def infoAlert(title, message, dedup_key=NARG, log_level=logging.INFO):
+    return alert(title, message, dedup_key, log_level, severity='info')


 pagerduty_session = None
 hostname = None


-def alert_pagerduty(title, message, dedup_key, log_level):
+def alert_pagerduty(title, message, dedup_key, severity):
     if not config.pagerduty:
         return
     # noinspection PyBroadException

@@ -32,10 +35,9 @@ def alert_pagerduty(title, message, dedup_key, log_level):
     global pagerduty_session
     global hostname
     if pagerduty_session is None:
-        pagerduty_session = pdpyras.EventsAPISession(config.pagerduty)
+        pagerduty_session = pagerduty.EventsApiV2Client(config.pagerduty)
         hostname = socket.gethostname()
-    sev = 'critical' if log_level >= logging.ERROR else 'info'
-    pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key)
+    pagerduty_session.trigger(title, hostname, severity=severity, custom_details={'message': message}, dedup_key=dedup_key, payload=dict(severity=severity))
     except Exception:
         log.warning('Could not notify PagerDuty!', exc_info=True)

View File

@@ -3,7 +3,7 @@ from asyncio import CancelledError
 from dexorder import db, blockchain
 from dexorder.accounting import initialize_accounting
-from dexorder.alert import warningAlert
+from dexorder.alert import infoAlert
 from dexorder.base.chain import current_chain
 from dexorder.bin.executable import execute
 from dexorder.blockstate import current_blockstate

@@ -76,19 +76,19 @@ def setup_logevent_triggers(runner):
     runner.add_callback(end_trigger_updates)
     runner.add_callback(execute_tranches)
-    runner.add_callback(cleanup_jobs)
     # fee adjustments are handled offline by batch jobs
     # runner.add_event_trigger(handle_fee_limits_changed, get_contract_event('IFeeManager', 'FeeLimitsChanged'))
     # runner.add_event_trigger(handle_fees_changed, get_contract_event('IFeeManager', 'FeesChanged'))
     # runner.add_callback(adjust_gas)
+    runner.add_callback(cleanup_jobs)
     runner.add_callback(update_metrics)


 # noinspection DuplicatedCode
 async def main():
-    warningAlert('Started', 'backend has started', log_level=logging.INFO)
+    infoAlert('Started', 'backend has started', log_level=logging.INFO)
     await blockchain.connect(autosign=False)  # the transaction manager checks out accounts and releases them.
     redis_state = None
     state = None

View File

@@ -15,10 +15,11 @@ from dexorder.contract.dexorder import get_dexorder_contract
 from dexorder.database.model.accounting import AccountingSubcategory
 from dexorder.database.model.transaction import TransactionJob
 from dexorder.order.orderstate import Order
-from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers,
+from dexorder.order.triggers import (OrderTriggers,
                                      TrancheState, active_tranches, order_error)
 from dexorder.transactions import TransactionHandler, submit_transaction_request
 from dexorder.util import hexbytes
+from dexorder.vault_blockdata import refresh_vault_balances

 log = logging.getLogger(__name__)

@@ -79,7 +80,7 @@ class TrancheExecutionHandler (TransactionHandler):
             errcode = hexbytes(x.args[1]).decode('utf-8')
             log.error(f'While building execution for tranche {tk}: {errcode}')
             # if there's a logic error we shouldn't keep trying
-            finish_execution_request(tk, errcode)
+            await finish_execution_request(tk, errcode)
             raise exception

     async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None:

@@ -90,13 +91,13 @@ class TrancheExecutionHandler (TransactionHandler):
             log.error('Could not build execution transaction due to exception', exc_info=e)
             # noinspection PyTypeChecker
             req: TrancheExecutionRequest = job.request
-            finish_execution_request(req.tranche_key, '')
+            await finish_execution_request(req.tranche_key, '')


 TrancheExecutionHandler()  # map 'te' to a TrancheExecutionHandler


-def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
+async def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
     order_key = OrderKey(tk.vault, tk.order_index)
     try:
         order: Order = Order.of(order_key)

@@ -104,11 +105,6 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
         log.error(f'Could not get order {order_key}')
         return
-    try:
-        inflight_execution_requests.remove(tk)
-    except KeyError:
-        pass

     def get_trigger():
         try:
             return OrderTriggers.instances[order_key].triggers[tk.tranche_index]

@@ -139,7 +135,9 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
         # Insufficient Input Amount
         token = order.order.tokenIn
         log.debug(f'insufficient funds {tk.vault} {token} ')
-        slash()
         retry()
+        await refresh_vault_balances(tk.vault, order.order.tokenIn, order.order.tokenOut)
     elif error == 'SPL':
         # todo tight slippage can cause excessive executions as the backend repeatedly retries the remainder. The symptom is error 'SPL'.
         # Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of

@@ -199,24 +197,20 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
 def execute_tranches():
     new_execution_requests = []
     for tk, proof in active_tranches.items():
-        if tk not in inflight_execution_requests:
-            new_execution_requests.append((tk, proof))
-        else:
-            log.debug(f'execute {tk} already in flight')
+        new_execution_requests.append((tk, proof))
     # todo order requests and batch
     for tk, proof in new_execution_requests:
         create_execution_request(tk, proof)


 def create_execution_request(tk: TrancheKey, proof: PriceProof):
-    inflight_execution_requests.add(tk)
     job = submit_transaction_request(new_tranche_execution_request(tk, proof))
     if job is not None:
         log.debug(f'Executing {tk} as job {job.id}')
     return job


-def handle_dexorderexecutions(event: EventData):
+async def handle_dexorderexecutions(event: EventData):
     log.debug(f'executions {event}')
     exe_id = UUID(bytes=event['args']['id'])
     try:

@@ -236,4 +230,4 @@ def handle_dexorderexecutions(event: EventData):
     # noinspection PyTypeChecker
     req: TrancheExecutionRequest = job.request
     tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
-    finish_execution_request(tk, None if errors[0] == '' else errors[0])
+    await finish_execution_request(tk, None if errors[0] == '' else errors[0])
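
Note the knock-on effect in this file: once finish_execution_request awaits the refresh_vault_balances coroutine, it must itself become async, and every caller up the chain has to await it, which is why the build-failure path, complete_transaction, and handle_dexorderexecutions all change together. A self-contained sketch of that propagation pattern (the names mirror this diff, but the bodies are stand-ins):

import asyncio

async def refresh_vault_balances(vault, *tokens):
    await asyncio.sleep(0)  # stand-in for the async on-chain balanceOf() reads

async def finish_execution_request(tk, error=None):
    # now a coroutine: callers can no longer invoke it as a plain function
    await refresh_vault_balances('0xVault', '0xTokenIn', '0xTokenOut')

async def handle_event(tk):
    await finish_execution_request(tk)  # the await propagates to each caller

asyncio.run(handle_event('tranche-key'))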

View File

@@ -39,9 +39,6 @@ execution should be attempted on the tranche.
 # tranches which have passed all constraints and should be executed
 active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at')

-# tranches which have an execute() transaction sent but not completed
-inflight_execution_requests: set[TrancheKey] = set()


 class OrderTriggers:
     instances: dict[OrderKey, 'OrderTriggers'] = {}

@@ -307,10 +304,7 @@ class TimeTrigger (Trigger):
         if time == self._time:
             return
         self._time = time
-        if self.active:
-            # remove old trigger
-            TimeTrigger.all.remove(self)
-            self.active = False
+        self.remove()
         self.update_active(time_now)

     def update_active(self, time_now: int = None, time: int = None):
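
The TimeTrigger hunk replaces inline deregistration with a call to self.remove(), presumably the class's existing cleanup method, so this path cannot drift from the canonical removal logic. A hedged sketch of what that encapsulation looks like (the class skeleton is assumed, not taken from this repo):

class TimeTrigger:
    all = set()

    def __init__(self):
        self.active = False
        self._time = None

    def remove(self):
        # single place that knows how to deregister a trigger
        if self.active:
            TimeTrigger.all.remove(self)
            self.active = False

    def set_time(self, time):
        self.remove()  # instead of repeating the removal logic inline
        self._time = time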

View File

@@ -159,12 +159,12 @@ async def handle_transaction_receipts():
         receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id)
     except TransactionNotFound:
         return
-    job.state = TransactionJobState.Mined
-    job.receipt = receipt
     fork = current_fork.get()
     assert fork is not None
     if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \
             fork.branch.disjoint and receipt['blockNumber'] <= fork.height:
+        job.state = TransactionJobState.Mined
+        job.receipt = receipt
         try:
             handler = TransactionHandler.of(job.request.type)
         except KeyError:
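
This reorder is the receipt-handling bugfix: previously the job was marked Mined before checking that the receipt's block is actually on the fork being processed, so a receipt from an orphaned block could flip job state. A runnable sketch of the guard (field names follow the diff; the types are stand-ins):

from types import SimpleNamespace

def on_receipt(job, receipt, branch, height):
    # accept the receipt only if its block belongs to the fork we track
    on_fork = (branch.contiguous and receipt['blockHash'] in branch.path) or \
              (branch.disjoint and receipt['blockNumber'] <= height)
    if not on_fork:
        return  # leave the job pending; a reorg may have orphaned this block
    job.state = 'Mined'  # stand-in for TransactionJobState.Mined
    job.receipt = receipt

job = SimpleNamespace(state='Pending', receipt=None)
branch = SimpleNamespace(contiguous=True, disjoint=False, path={'0xabc'})
on_receipt(job, {'blockHash': '0xdead', 'blockNumber': 7}, branch, height=10)
assert job.state == 'Pending'  # the orphaned-block receipt was ignored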

View File

@@ -1,3 +1,4 @@
+import asyncio
 import functools
 import logging

@@ -90,3 +91,14 @@ def publish_vaults(chain_id, owner):
             break
     log.debug(f'publish_vaults {chain_id} {owner} {vaults}')
     current_pub.get()(f'{chain_id}|{owner}', 'vaults', chain_id, owner, vaults)
+
+
+async def refresh_vault_balances(vault, *tokens):
+    amounts = await asyncio.gather(*(ERC20(token).balanceOf(vault) for token in tokens))
+
+    def _adjust(vaddr, toks, amts, old_balances):
+        result = dict(old_balances)  # copy
+        for t, a in zip(toks, amts):
+            result[t] = a
+        return result
+
+    vault_balances.modify(vault, functools.partial(_adjust, vault, tokens, amounts))
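
A hypothetical usage sketch of the new helper, matching the call site added in the execution handler; the addresses are placeholders and the import assumes this module's path within the repo:

import asyncio
from dexorder.vault_blockdata import refresh_vault_balances

async def demo():
    # after an insufficient-funds execution error, re-read both sides of the pair
    await refresh_vault_balances(
        '0x1111111111111111111111111111111111111111',  # vault
        '0x2222222222222222222222222222222222222222',  # tokenIn
        '0x3333333333333333333333333333333333333333',  # tokenOut
    )

asyncio.run(demo())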