From ba165c2aff09edfb769a440c8ef34e37ac9cc90c Mon Sep 17 00:00:00 2001 From: tim Date: Mon, 9 Sep 2024 00:41:11 -0400 Subject: [PATCH] disabled time tick to fix expiries --- src/dexorder/base/order.py | 4 ++ src/dexorder/bin/main.py | 24 +++++++---- src/dexorder/order/executionhandler.py | 14 +++--- src/dexorder/order/triggers.py | 60 ++++++++++++++++++++------ src/dexorder/runner.py | 12 +++++- 5 files changed, 85 insertions(+), 29 deletions(-) diff --git a/src/dexorder/base/order.py b/src/dexorder/base/order.py index 7aa5175..33025ac 100644 --- a/src/dexorder/base/order.py +++ b/src/dexorder/base/order.py @@ -26,5 +26,9 @@ class TrancheKey (OrderKey): vault, order_index, tranche_index = keystring.split('|') return TrancheKey(vault, int(order_index), int(tranche_index)) + @property + def order_key(self): + return OrderKey(self.vault, self.order_index) + def __str__(self): return f'{self.vault}|{self.order_index}|{self.tranche_index}' diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 074384d..6bb2c8c 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -24,9 +24,16 @@ log = logging.getLogger('dexorder') LOG_ALL_EVENTS = False # for debug todo config -# -# These event callbacks are basically the run loop for every block -# +activate_orders_needed = False + +async def check_activate_orders(): + # we must delay order activation until we are inside the handling of the first block, so that any state updates + # due to time triggers are applied to that block. + global activate_orders_needed + if activate_orders_needed: + await activate_orders() + activate_orders_needed = False + def setup_logevent_triggers(runner): # the triggers for each log events are triggered in the order of event registry, so the @@ -44,9 +51,11 @@ def setup_logevent_triggers(runner): else: executions = dexorder.events.DexorderExecutions() - # the callbacks are run even if there's no blocks and the regular timer triggers. event triggers only run when - # a block is received. + # + # These event callbacks are basically the run loop for every block + # + runner.add_callback(check_activate_orders) runner.add_callback(init) runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated')) runner.add_event_trigger(handle_vault_impl_changed, get_contract_event('Vault', 'VaultImplChanged')) @@ -86,9 +95,8 @@ async def main(): else: current_blockstate.set(state) current_fork.set(state.root_fork) - await activate_orders() # activate orders first before pushing data to redis - if redis_state: - await redis_state.init(state, state.root_fork) + global activate_orders_needed + activate_orders_needed = True log.info(f'loaded state from db for root block {state.root_branch.height}') runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 0832d71..9f11526 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -12,7 +12,8 @@ from dexorder.contract.dexorder import get_dexorder_contract from dexorder.database.model.transaction import TransactionJob from dexorder.order.orderstate import Order from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers, - close_order_and_disable_triggers, TrancheState, active_tranches) + close_order_and_disable_triggers, TrancheState, active_tranches, TrancheTrigger, + order_error) from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \ new_tranche_execution_request from dexorder.util import hexbytes @@ -112,7 +113,10 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): pass elif error == 'TL': log.debug(f'tranche {tk} execution failed due to "TL" too late') - pass + try: + OrderTriggers.instances[tk.order_key].expire_tranche(tk.tranche_index) + except KeyError: + pass elif error == 'LL': log.debug(f'tranche {tk} execution failed due to "LL" lower limit') pass @@ -121,14 +125,14 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): pass elif error == 'OVR': log.warning(f'tranche {tk} execution failed due to "OVR" overfilled') - # this should never happen. Shut down the order. - close_order_and_disable_triggers(order, SwapOrderState.Error) + # this should never happen. Shut down the entire order. + order_error(order, error) elif error == 'K': log.error(f'vault killed') close_order_and_disable_triggers(order, SwapOrderState.Error) elif error == 'STF': log.error(f'tranche {tk} execution failed due to "STF" safe transfer failure') - close_order_and_disable_triggers(order, SwapOrderState.Error) + order_error(order, error) else: slash() msg = '' if not error else error diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index e80a280..11382c2 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -115,27 +115,41 @@ async def end_trigger_updates(): Call once after all updates have been handled. This updates the active_tranches array based on final trigger state. """ PriceLineTrigger.end_updates(current_clock.get().timestamp) - for tk in _dirty: - if _trigger_state.get(tk,0) == 0: - # all clear for execution. add to active list with any necessary proofs - active_tranches[tk] = PriceProof(0) - else: - # blocked by one or more triggers. delete from active list. - try: - del active_tranches[tk] - except KeyError: - pass - _dirty.clear() + # dirty can change + global _dirty + while _dirty: + working_set = _dirty + _dirty = set() + for tk in working_set: + if _trigger_state.get(tk,0) == 0: + # all clear for execution. add to active list with any necessary proofs + active_tranches[tk] = PriceProof(0) + else: + # blocked by one or more triggers being False (nonzero mask) + # check expiry constraint + try: + TrancheTrigger.all[tk].check_expire() + except KeyError: + pass + # delete from active list. + try: + del active_tranches[tk] + except KeyError: + pass def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState): - order.complete(final_state) try: triggers = OrderTriggers.instances[order.key] except KeyError: pass else: triggers.disable() + order.complete(final_state) + + +def order_error(order: Order, message:str=None): + close_order_and_disable_triggers(order, SwapOrderState.Error) # NOTE: we store the INVERSE of each trigger's value! this causes the test for "All True" to be comparison with 0 @@ -155,6 +169,9 @@ class Trigger: self.value = value _dirty.add(self.tk) + def __bool__(self): + return self.value + @property def value(self): return _trigger_state.get(self.tk,0) & (1 << self.position) == 0 # NOTE: inverted @@ -213,14 +230,12 @@ class BalanceTrigger (Trigger): super().__init__(0, tk, value) self.order = Order.of(self.tk) self.vault_token = self.tk.vault, self.order.status.order.tokenIn - log.debug(f'adding balanc trigger {id(self)}') BalanceTrigger.by_vault_token[self.vault_token].add(self) async def update(self, balance): self.value = await input_amount_is_sufficient(self.order, balance) def remove(self): - log.debug(f'removing balanc trigger {id(self)}') try: BalanceTrigger.by_vault_token[self.vault_token].remove(self) except KeyError: @@ -266,6 +281,8 @@ class TimeTrigger (Trigger): # called when our self.time has been reached self.value = self.is_start self.active = False + if not self.is_start: + OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index) # we are popped off the stack by update_all() def remove(self): @@ -361,12 +378,14 @@ class PriceLineTrigger (Trigger): async def activate_orders(): log.debug('activating orders') # this is a state init callback, called only once after the state has been loaded from the db or created fresh + start_trigger_updates() keys = list(Order.open_orders) for key in keys: order = Order.of(key) # setup triggers await activate_order(order) # too many to really parallelize, and it's startup anyway # log.debug(f'activated {order}') + await end_trigger_updates() log.debug(f'activated {len(keys)} orders') @@ -392,6 +411,7 @@ class TrancheState (Enum): class TrancheTrigger: + all: dict[TrancheKey,'TrancheTrigger'] = {} @staticmethod async def create(order: Order, tk: TrancheKey) -> 'TrancheTrigger': @@ -440,6 +460,7 @@ class TrancheTrigger: TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \ TrancheState.Active _dirty.add(tk) + TrancheTrigger.all[tk] = self log.debug(f'Tranche {tk} initial status {self.status} {self}') @@ -455,7 +476,14 @@ class TrancheTrigger: self.slash_count = 0 # reset slash count return filled + def check_expire(self): + # if the expiration constraint has become False then the tranche can never execute again + if not self.expiration_trigger: + OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index) + def expire(self): + if self.status == TrancheState.Expired: + return order_log.debug(f'tranche expired {self.tk}') self.status = TrancheState.Expired self.disable() @@ -494,6 +522,10 @@ class TrancheTrigger: self.min_trigger.remove() if self.max_trigger is not None: self.max_trigger.remove() + try: + del TrancheTrigger.all[self.tk] + except KeyError: + pass try: del _trigger_state[self.tk] except KeyError: diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index d55d754..d89ce12 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -221,8 +221,9 @@ class BlockStateRunner(BlockProgressor): try: await asyncio.wait_for(self.new_head_event.wait(), timeout=1) # todo configure except TimeoutError: - if fork is not None: - await self.handle_time_tick(fork) + # DISABLED. See note on handle_time_tick() + # if fork is not None: + # await self.handle_time_tick(fork) continue except asyncio.CancelledError: break @@ -357,6 +358,13 @@ class BlockStateRunner(BlockProgressor): async def handle_time_tick(self, fork: Fork): + # WARNING: BROKEN! + # Time ticks do not occur during blocks and therefore violate the basics of the blockstate system. Any changes + # to blockstate during a time tick are made against the state of the head block, but the processing of that + # block has already finished. Any changes from the time tick are made in Python memory only and do not + # propragate to the DB or Redis. + # TIME TICKS ARE DISABLED FOR THIS REASON + return current_fork.set(fork) session = db.session session.begin()