ElaboratedSwapOrderStatus with tx_id; alerts; db connection fixes; order recovery fixes

This commit is contained in:
tim
2024-08-13 21:45:25 -04:00
parent d7cccf3da3
commit d9bc031f72
13 changed files with 170 additions and 38 deletions

View File

@@ -1,7 +1,7 @@
sqlalchemy
alembic
omegaconf
web3>=6.11.1
web3
psycopg2-binary
orjson
sortedcontainers
@@ -14,3 +14,5 @@ cachetools
async-lru
eth-bloom
python-dateutil
eth_abi
pdpyras # pagerduty

72
src/dexorder/alert.py Normal file
View File

@@ -0,0 +1,72 @@
import logging
import socket
import pdpyras
from dexorder import NARG, config
log = logging.getLogger(__name__)
def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True):
if dedup_key is NARG:
dedup_key = str(hash(title + '|' + message))
if do_log:
msg = f'{title}: {message}'
log.log(log_level, msg) # if log_level=CRITICAL for example, make sure this does not re-alert!
alert_pagerduty(title, message, dedup_key, log_level)
def warningAlert(title, message, dedup_key=NARG, log_level=logging.WARNING):
return alert(title, message, dedup_key, log_level)
async def spawn_alert(title, message, dedup_key):
alert_pagerduty(title,message,dedup_key)
pagerduty_session = None
hostname = None
def alert_pagerduty(title, message, dedup_key, log_level):
if not config.pagerduty:
return
try:
global pagerduty_session
global hostname
if pagerduty_session is None:
pagerduty_session = pdpyras.EventsAPISession(config.pagerduty)
hostname = socket.gethostname()
sev = 'error' if log_level >= logging.ERROR else 'warning'
pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key)
except Exception:
log.warning('Could not notify PagerDuty!', exc_info=True)
class AlertHandler(logging.Handler):
def __init__(self, level=logging.NOTSET):
super().__init__(level)
self.paused = False
def emit(self, record):
if not self.paused:
msg = self.format(record)
if msg is not None and type(msg) is str:
msg = msg.strip()
msg_last_line = msg.split('\n')[-1]
if msg_last_line:
msg_last_line = ': ' + msg_last_line[:100]
else:
msg_last_line = ''
self.paused = True
try:
alert(record.levelname + msg_last_line, msg, log_level=record.levelno, do_log=False)
finally:
self.paused = False
def init_alerts():
if config.pagerduty:
logging.getLogger('dexorder').addHandler(AlertHandler(logging.WARNING))

View File

@@ -105,7 +105,9 @@ class SwapOrderStatus(SwapStatus):
self.order = order
@staticmethod
def load(obj):
def load(obj, *, Class=None):
if Class is None:
Class = SwapOrderStatus
order = SwapOrder.load(obj[0])
fillFeeHalfBps = int(obj[1])
state = SwapOrderState(obj[2])
@@ -117,11 +119,13 @@ class SwapOrderStatus(SwapStatus):
trancheFilledIn = [int(f) for f in obj[8]]
trancheFilledOut = [int(f) for f in obj[9]]
trancheActivationTime = [int(f) for f in obj[10]]
return SwapOrderStatus(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup,
filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime)
return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup,
filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime)
@staticmethod
def load_from_chain(obj):
def load_from_chain(obj, *, Class=None):
if Class is None:
Class = SwapOrderStatus
# 0 SwapOrder order;
# 1 int fillFeeHalfBps
# 2 bool canceled;
@@ -144,8 +148,8 @@ class SwapOrderStatus(SwapStatus):
trancheFilledIn = [0 for _ in range(len(obj[7]))]
trancheFilledOut = [0 for _ in range(len(obj[7]))]
trancheActivationTime = [int(i) for i in obj[8]]
return SwapOrderStatus(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup,
filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime)
return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup,
filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime)
def dump(self):
return (
@@ -158,6 +162,36 @@ class SwapOrderStatus(SwapStatus):
def copy(self):
return copy.deepcopy(self)
@dataclass
class ElaboratedSwapOrderStatus (SwapOrderStatus):
@staticmethod
def load_from_tx(tx_id: bytes, obj):
# noinspection PyTypeChecker
status: ElaboratedSwapOrderStatus = SwapOrderStatus.load_from_chain(obj, Class=ElaboratedSwapOrderStatus)
status.tx_id = tx_id
return status
# noinspection PyMethodOverriding
@staticmethod
def load(obj):
tx_id, *swaporder_args = obj
result = SwapOrderStatus.load(obj[1:], Class=ElaboratedSwapOrderStatus)
result.tx_id = tx_id
return result
# noinspection PyMissingConstructor
def __init__(self, order: SwapOrder, *swapstatus_args, tx_id=b''):
super().__init__(order, *swapstatus_args)
self.tx_id: bytes = tx_id
def dump(self):
return self.tx_id, *super().dump()
def copy(self):
return super().copy()
NO_OCO = 18446744073709551615 # max uint64
@@ -239,7 +273,8 @@ class Tranche:
def __str__(self):
msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{self.startTime} to {"start+" if self.startTimeIsRelative else ""}{self.endTime}'
if self.marketOrder:
msg += ' market order'
# for marketOrders, minIntercept is the slippage
msg += f' market order slippage {self.minIntercept:.2%}'
else:
if self.minIntercept or self.minSlope:
msg += f' >{self.minIntercept:.5g}'

View File

@@ -2,7 +2,7 @@
# This script generates a BIP-39 24-word key from physical dice rolls (1-6)
#
KEY_LENGTH=128 # use this to select the size of your seed: 128 (12-word), 160, 192, 224, 256 (24-word)
KEY_LENGTH=256 # use this to select the size of your seed: 128 (12-word), 160, 192, 224, 256 (24-word)
from bip_utils import Bip39MnemonicEncoder
from bitarray import bitarray

View File

@@ -11,6 +11,7 @@ from typing import Coroutine
import sys
from dexorder import configuration
from dexorder.alert import init_alerts
if __name__ == '__main__':
raise Exception('this file is meant to be imported not executed')
@@ -43,6 +44,7 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
log.info('Logging configured to default')
if parse_args:
configuration.parse_args()
init_alerts()
# loop setup
loop = asyncio.get_event_loop()

View File

@@ -77,7 +77,7 @@ async def main():
db.connect()
if db:
db_state = DbState(BlockData.by_opt('db'))
with db.session:
with db.transaction():
state = await db_state.load()
if state is None:
log.info('no state in database')

View File

@@ -38,3 +38,5 @@ class Config:
mirror_source_rpc_url: Optional[str] = None # source RPC for original pools
mirror_pools: list[str] = field(default_factory=list)
mirror_env: Optional[str] = None
pagerduty: Optional[str] = None

View File

@@ -77,13 +77,15 @@ class Db:
@staticmethod
def make_session(**kwargs) -> Session:
# if _session.get(None) is not None:
# log.warning('Creating new session when old one isn\'t closed', stack_info=True)
engine = _engine.get()
if engine is None:
raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first')
kwargs.setdefault('expire_on_commit', False)
s = Session(engine, **kwargs)
_session.set(s)
# log.debug('created new DB session')
# log.debug(f'Created new DB session {s}', stack_info=True)
return s
@staticmethod
@@ -91,6 +93,7 @@ class Db:
s = _session.get()
if s is not None:
s.close()
# log.debug(f'Closing DB session {s}')
# noinspection PyTypeChecker
_session.set(None)

View File

@@ -41,7 +41,7 @@ class Token (Base):
name: Mapped[str]
symbol: Mapped[str]
decimals: Mapped[Uint8]
approved: Mapped[bool]
approved: Mapped[bool] = mapped_column(index=True)
@staticmethod

View File

@@ -56,7 +56,7 @@ async def handle_order_placed(event: EventData):
contract = VaultContract(addr)
obj = await contract.swapOrderStatus(index)
log.debug(f'raw order status {obj}')
order = Order.create(addr, index, obj)
order = Order.create(addr, index, event['transactionHash'], obj)
await activate_order(order)
log.debug(f'new order {order} {order.order}')
@@ -70,6 +70,7 @@ def handle_swap_filled(event: EventData):
tranche_index = args['trancheIndex']
amount_in = args['amountIn']
amount_out = args['amountOut']
fill_fee = args['fillFee']
next_execution_time = args['nextExecutionTime']
try:
order: Order = Order.of(vault, order_index)
@@ -304,14 +305,19 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
log.debug(f'execution request for tranche {tk} had error "{error}"')
if error == '':
log.debug(f'execution request for tranche {tk} was successful!')
elif error in ('IIA', 'STF'): # todo not STF
elif error == 'IIA':
# Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent
# todo vault balance checks
token = order.order.tokenIn
log.debug(f'insufficient funds {req.vault} {token} ')
elif error == 'SPL':
# Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of
# vault logic if it happens.
log.warning(f'SPL when executing tranche {tk}')
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'NO':
# order is not open
log.error(f'order {OrderKey(tk.vault,tk.order_index)} was closed, undetected!')
log.warning(f'order {OrderKey(tk.vault,tk.order_index)} was closed, undetected!')
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'TF':
# Tranche Filled
@@ -329,6 +335,7 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
log.debug('warning: de minimis liquidity in pool')
# todo dont keep trying
else:
# todo slash and backoff
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')

View File

@@ -6,7 +6,7 @@ from typing import overload
from dexorder import DELETE, db, order_log
from dexorder.base.chain import current_chain
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState
from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState, ElaboratedSwapOrderStatus
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.database.model.orderindex import OrderIndex
from dexorder.routing import pool_address
@@ -67,16 +67,19 @@ class Order:
@staticmethod
def of(a, b=None) -> 'Order':
key = (OrderKey(a.vault, a.order_index) if type(a) is TrancheKey else a) if b is None else OrderKey(a, b)
return Order.instances[key]
try:
return Order.instances[key]
except KeyError:
return Order(key)
@staticmethod
def create(vault: str, order_index: int, obj):
def create(vault: str, order_index: int, tx_id: bytes, obj):
""" use when a brand new order is detected by the system """
key = OrderKey(vault, order_index)
if key in Order.instances:
raise ValueError
status = SwapOrderStatus.load_from_chain(obj)
status = ElaboratedSwapOrderStatus.load_from_tx(tx_id, obj)
Order.order_statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable
order = Order(key)
if order.is_open:
@@ -255,7 +258,7 @@ class Order:
order_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict(
'o', db='lazy', redis=True, pub=pub_order_status, finalize_cb=save_order_index,
str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()),
str2value=lambda s:SwapOrderStatus.load(json.loads(s)),
str2value=lambda s:ElaboratedSwapOrderStatus.load(json.loads(s)),
)
# open orders = the set of unfilled, not-canceled orders

View File

@@ -134,11 +134,7 @@ class TrancheTrigger:
if self.closed:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
if now >= self.time_constraint[1]:
order_log.debug(f'tranche expired {self.tk}')
self.status = TrancheStatus.Expired
self.disable()
elif self.status == TrancheStatus.Early and now >= self.time_constraint[0]:
if not self.check_expired(now) and self.status == TrancheStatus.Early and now >= self.time_constraint[0]:
order_log.debug(f'tranche time enabled {self.tk}')
self.status = TrancheStatus.Pricing
self.enable_price_trigger()
@@ -193,6 +189,17 @@ class TrancheTrigger:
order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}')
return filled
def check_expired(self, now):
expired = now >= self.time_constraint[1]
if expired:
self.expire()
return expired
def expire(self):
order_log.debug(f'tranche expired {self.tk}')
self.status = TrancheStatus.Expired
self.disable()
def disable(self):
try:
del active_tranches[self.tk]
@@ -244,6 +251,10 @@ class OrderTriggers:
if self.triggers[tranche_index].fill(amount_in, amount_out):
self.check_complete()
def expire_tranche(self, tranche_index):
self.triggers[tranche_index].expire()
self.check_complete()
def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState):
order.complete(final_state)

View File

@@ -253,9 +253,10 @@ class BlockStateRunner(BlockProgressor):
w3 = current_w3.get()
current_blockstate.set(self.state)
current_fork.set(fork)
session = None
batches = []
pubs = []
session = db.make_session(autocommit=False)
session.begin()
try:
branch = fork.branch
if branch.disjoint:
@@ -289,8 +290,6 @@ class BlockStateRunner(BlockProgressor):
batches.append((None, callback, None, None))
# set up for callbacks
session = db.make_session(autocommit=False)
session.begin()
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created
if not self.state_initialized:
await self.do_state_init_cbs()
@@ -337,16 +336,12 @@ class BlockStateRunner(BlockProgressor):
db.session.commit()
# manage transactions in a separate database session
# todo separate out the transaction manager completely from runner
# noinspection PyBroadException
try:
await create_and_send_transactions()
except BaseException:
db.session.rollback()
raise
else:
db.session.commit()
finally:
db.close_session()
with db.transaction():
await create_and_send_transactions()
except Exception:
log.exception('Exception in transaction manager')
# publish messages
if pubs and self.publish_all:
@@ -372,7 +367,7 @@ class BlockStateRunner(BlockProgressor):
else:
session.commit()
finally:
session.close()
db.close_session()
async def do_state_init_cbs(self):