diff --git a/requirements.txt b/requirements.txt index 9c3b04c..7601947 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ sqlalchemy alembic omegaconf -web3>=6.11.1 +web3 psycopg2-binary orjson sortedcontainers @@ -14,3 +14,5 @@ cachetools async-lru eth-bloom python-dateutil +eth_abi +pdpyras # pagerduty diff --git a/src/dexorder/alert.py b/src/dexorder/alert.py new file mode 100644 index 0000000..b697de1 --- /dev/null +++ b/src/dexorder/alert.py @@ -0,0 +1,72 @@ +import logging +import socket + +import pdpyras + +from dexorder import NARG, config + +log = logging.getLogger(__name__) + + +def alert(title, message, dedup_key=NARG, log_level=logging.ERROR, do_log=True): + if dedup_key is NARG: + dedup_key = str(hash(title + '|' + message)) + if do_log: + msg = f'{title}: {message}' + log.log(log_level, msg) # if log_level=CRITICAL for example, make sure this does not re-alert! + alert_pagerduty(title, message, dedup_key, log_level) + + +def warningAlert(title, message, dedup_key=NARG, log_level=logging.WARNING): + return alert(title, message, dedup_key, log_level) + + +async def spawn_alert(title, message, dedup_key): + alert_pagerduty(title,message,dedup_key) + + +pagerduty_session = None +hostname = None + +def alert_pagerduty(title, message, dedup_key, log_level): + if not config.pagerduty: + return + try: + global pagerduty_session + global hostname + if pagerduty_session is None: + pagerduty_session = pdpyras.EventsAPISession(config.pagerduty) + hostname = socket.gethostname() + sev = 'error' if log_level >= logging.ERROR else 'warning' + pagerduty_session.trigger(title, hostname, severity=sev, custom_details={'message': message}, dedup_key=dedup_key) + except Exception: + log.warning('Could not notify PagerDuty!', exc_info=True) + + +class AlertHandler(logging.Handler): + + def __init__(self, level=logging.NOTSET): + super().__init__(level) + self.paused = False + + def emit(self, record): + if not self.paused: + msg = self.format(record) + if msg is not None and type(msg) is str: + msg = msg.strip() + msg_last_line = msg.split('\n')[-1] + if msg_last_line: + msg_last_line = ': ' + msg_last_line[:100] + else: + msg_last_line = '' + self.paused = True + try: + alert(record.levelname + msg_last_line, msg, log_level=record.levelno, do_log=False) + finally: + self.paused = False + + +def init_alerts(): + if config.pagerduty: + logging.getLogger('dexorder').addHandler(AlertHandler(logging.WARNING)) + diff --git a/src/dexorder/base/orderlib.py b/src/dexorder/base/orderlib.py index 112c1a8..1c55d96 100644 --- a/src/dexorder/base/orderlib.py +++ b/src/dexorder/base/orderlib.py @@ -105,7 +105,9 @@ class SwapOrderStatus(SwapStatus): self.order = order @staticmethod - def load(obj): + def load(obj, *, Class=None): + if Class is None: + Class = SwapOrderStatus order = SwapOrder.load(obj[0]) fillFeeHalfBps = int(obj[1]) state = SwapOrderState(obj[2]) @@ -117,11 +119,13 @@ class SwapOrderStatus(SwapStatus): trancheFilledIn = [int(f) for f in obj[8]] trancheFilledOut = [int(f) for f in obj[9]] trancheActivationTime = [int(f) for f in obj[10]] - return SwapOrderStatus(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup, - filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime) + return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup, + filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime) @staticmethod - def load_from_chain(obj): + def load_from_chain(obj, *, Class=None): + if Class is None: + Class = SwapOrderStatus # 0 SwapOrder order; # 1 int fillFeeHalfBps # 2 bool canceled; @@ -144,8 +148,8 @@ class SwapOrderStatus(SwapStatus): trancheFilledIn = [0 for _ in range(len(obj[7]))] trancheFilledOut = [0 for _ in range(len(obj[7]))] trancheActivationTime = [int(i) for i in obj[8]] - return SwapOrderStatus(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup, - filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime) + return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup, + filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime) def dump(self): return ( @@ -158,6 +162,36 @@ class SwapOrderStatus(SwapStatus): def copy(self): return copy.deepcopy(self) + +@dataclass +class ElaboratedSwapOrderStatus (SwapOrderStatus): + @staticmethod + def load_from_tx(tx_id: bytes, obj): + # noinspection PyTypeChecker + status: ElaboratedSwapOrderStatus = SwapOrderStatus.load_from_chain(obj, Class=ElaboratedSwapOrderStatus) + status.tx_id = tx_id + return status + + # noinspection PyMethodOverriding + @staticmethod + def load(obj): + tx_id, *swaporder_args = obj + result = SwapOrderStatus.load(obj[1:], Class=ElaboratedSwapOrderStatus) + result.tx_id = tx_id + return result + + # noinspection PyMissingConstructor + def __init__(self, order: SwapOrder, *swapstatus_args, tx_id=b''): + super().__init__(order, *swapstatus_args) + self.tx_id: bytes = tx_id + + def dump(self): + return self.tx_id, *super().dump() + + def copy(self): + return super().copy() + + NO_OCO = 18446744073709551615 # max uint64 @@ -239,7 +273,8 @@ class Tranche: def __str__(self): msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{self.startTime} to {"start+" if self.startTimeIsRelative else ""}{self.endTime}' if self.marketOrder: - msg += ' market order' + # for marketOrders, minIntercept is the slippage + msg += f' market order slippage {self.minIntercept:.2%}' else: if self.minIntercept or self.minSlope: msg += f' >{self.minIntercept:.5g}' diff --git a/src/dexorder/bin/dice_seed.py b/src/dexorder/bin/dice_seed.py index 379af44..4471436 100644 --- a/src/dexorder/bin/dice_seed.py +++ b/src/dexorder/bin/dice_seed.py @@ -2,7 +2,7 @@ # This script generates a BIP-39 24-word key from physical dice rolls (1-6) # -KEY_LENGTH=128 # use this to select the size of your seed: 128 (12-word), 160, 192, 224, 256 (24-word) +KEY_LENGTH=256 # use this to select the size of your seed: 128 (12-word), 160, 192, 224, 256 (24-word) from bip_utils import Bip39MnemonicEncoder from bitarray import bitarray diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index 5d3bd4f..e7a982c 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -11,6 +11,7 @@ from typing import Coroutine import sys from dexorder import configuration +from dexorder.alert import init_alerts if __name__ == '__main__': raise Exception('this file is meant to be imported not executed') @@ -43,6 +44,7 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru log.info('Logging configured to default') if parse_args: configuration.parse_args() + init_alerts() # loop setup loop = asyncio.get_event_loop() diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index a8312e9..fb5ef2b 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -77,7 +77,7 @@ async def main(): db.connect() if db: db_state = DbState(BlockData.by_opt('db')) - with db.session: + with db.transaction(): state = await db_state.load() if state is None: log.info('no state in database') diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 1fed0e6..6f85ac4 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -38,3 +38,5 @@ class Config: mirror_source_rpc_url: Optional[str] = None # source RPC for original pools mirror_pools: list[str] = field(default_factory=list) mirror_env: Optional[str] = None + + pagerduty: Optional[str] = None diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index 958c417..6b114f5 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -77,13 +77,15 @@ class Db: @staticmethod def make_session(**kwargs) -> Session: + # if _session.get(None) is not None: + # log.warning('Creating new session when old one isn\'t closed', stack_info=True) engine = _engine.get() if engine is None: raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first') kwargs.setdefault('expire_on_commit', False) s = Session(engine, **kwargs) _session.set(s) - # log.debug('created new DB session') + # log.debug(f'Created new DB session {s}', stack_info=True) return s @staticmethod @@ -91,6 +93,7 @@ class Db: s = _session.get() if s is not None: s.close() + # log.debug(f'Closing DB session {s}') # noinspection PyTypeChecker _session.set(None) diff --git a/src/dexorder/database/model/token.py b/src/dexorder/database/model/token.py index 48c503f..762d57a 100644 --- a/src/dexorder/database/model/token.py +++ b/src/dexorder/database/model/token.py @@ -41,7 +41,7 @@ class Token (Base): name: Mapped[str] symbol: Mapped[str] decimals: Mapped[Uint8] - approved: Mapped[bool] + approved: Mapped[bool] = mapped_column(index=True) @staticmethod diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 77c2b27..ff636f6 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -56,7 +56,7 @@ async def handle_order_placed(event: EventData): contract = VaultContract(addr) obj = await contract.swapOrderStatus(index) log.debug(f'raw order status {obj}') - order = Order.create(addr, index, obj) + order = Order.create(addr, index, event['transactionHash'], obj) await activate_order(order) log.debug(f'new order {order} {order.order}') @@ -70,6 +70,7 @@ def handle_swap_filled(event: EventData): tranche_index = args['trancheIndex'] amount_in = args['amountIn'] amount_out = args['amountOut'] + fill_fee = args['fillFee'] next_execution_time = args['nextExecutionTime'] try: order: Order = Order.of(vault, order_index) @@ -304,14 +305,19 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): log.debug(f'execution request for tranche {tk} had error "{error}"') if error == '': log.debug(f'execution request for tranche {tk} was successful!') - elif error in ('IIA', 'STF'): # todo not STF + elif error == 'IIA': # Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent # todo vault balance checks token = order.order.tokenIn log.debug(f'insufficient funds {req.vault} {token} ') + elif error == 'SPL': + # Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of + # vault logic if it happens. + log.warning(f'SPL when executing tranche {tk}') + close_order_and_disable_triggers(order, SwapOrderState.Error) elif error == 'NO': # order is not open - log.error(f'order {OrderKey(tk.vault,tk.order_index)} was closed, undetected!') + log.warning(f'order {OrderKey(tk.vault,tk.order_index)} was closed, undetected!') close_order_and_disable_triggers(order, SwapOrderState.Error) elif error == 'TF': # Tranche Filled @@ -329,6 +335,7 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): log.debug('warning: de minimis liquidity in pool') # todo dont keep trying else: + # todo slash and backoff log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index e8c1551..b03e4e0 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -6,7 +6,7 @@ from typing import overload from dexorder import DELETE, db, order_log from dexorder.base.chain import current_chain from dexorder.base.order import OrderKey, TrancheKey -from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState +from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState, ElaboratedSwapOrderStatus from dexorder.blockstate import BlockDict, BlockSet from dexorder.database.model.orderindex import OrderIndex from dexorder.routing import pool_address @@ -67,16 +67,19 @@ class Order: @staticmethod def of(a, b=None) -> 'Order': key = (OrderKey(a.vault, a.order_index) if type(a) is TrancheKey else a) if b is None else OrderKey(a, b) - return Order.instances[key] + try: + return Order.instances[key] + except KeyError: + return Order(key) @staticmethod - def create(vault: str, order_index: int, obj): + def create(vault: str, order_index: int, tx_id: bytes, obj): """ use when a brand new order is detected by the system """ key = OrderKey(vault, order_index) if key in Order.instances: raise ValueError - status = SwapOrderStatus.load_from_chain(obj) + status = ElaboratedSwapOrderStatus.load_from_tx(tx_id, obj) Order.order_statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable order = Order(key) if order.is_open: @@ -255,7 +258,7 @@ class Order: order_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict( 'o', db='lazy', redis=True, pub=pub_order_status, finalize_cb=save_order_index, str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), - str2value=lambda s:SwapOrderStatus.load(json.loads(s)), + str2value=lambda s:ElaboratedSwapOrderStatus.load(json.loads(s)), ) # open orders = the set of unfilled, not-canceled orders diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 4e8526f..4ba508c 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -134,11 +134,7 @@ class TrancheTrigger: if self.closed: log.debug(f'price trigger ignored because trigger status is {self.status}') return - if now >= self.time_constraint[1]: - order_log.debug(f'tranche expired {self.tk}') - self.status = TrancheStatus.Expired - self.disable() - elif self.status == TrancheStatus.Early and now >= self.time_constraint[0]: + if not self.check_expired(now) and self.status == TrancheStatus.Early and now >= self.time_constraint[0]: order_log.debug(f'tranche time enabled {self.tk}') self.status = TrancheStatus.Pricing self.enable_price_trigger() @@ -193,6 +189,17 @@ class TrancheTrigger: order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}') return filled + def check_expired(self, now): + expired = now >= self.time_constraint[1] + if expired: + self.expire() + return expired + + def expire(self): + order_log.debug(f'tranche expired {self.tk}') + self.status = TrancheStatus.Expired + self.disable() + def disable(self): try: del active_tranches[self.tk] @@ -244,6 +251,10 @@ class OrderTriggers: if self.triggers[tranche_index].fill(amount_in, amount_out): self.check_complete() + def expire_tranche(self, tranche_index): + self.triggers[tranche_index].expire() + self.check_complete() + def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState): order.complete(final_state) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 0017569..87d46c9 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -253,9 +253,10 @@ class BlockStateRunner(BlockProgressor): w3 = current_w3.get() current_blockstate.set(self.state) current_fork.set(fork) - session = None batches = [] pubs = [] + session = db.make_session(autocommit=False) + session.begin() try: branch = fork.branch if branch.disjoint: @@ -289,8 +290,6 @@ class BlockStateRunner(BlockProgressor): batches.append((None, callback, None, None)) # set up for callbacks - session = db.make_session(autocommit=False) - session.begin() current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created if not self.state_initialized: await self.do_state_init_cbs() @@ -337,16 +336,12 @@ class BlockStateRunner(BlockProgressor): db.session.commit() # manage transactions in a separate database session - # todo separate out the transaction manager completely from runner + # noinspection PyBroadException try: - await create_and_send_transactions() - except BaseException: - db.session.rollback() - raise - else: - db.session.commit() - finally: - db.close_session() + with db.transaction(): + await create_and_send_transactions() + except Exception: + log.exception('Exception in transaction manager') # publish messages if pubs and self.publish_all: @@ -372,7 +367,7 @@ class BlockStateRunner(BlockProgressor): else: session.commit() finally: - session.close() + db.close_session() async def do_state_init_cbs(self):