disabled time tick to fix expiries
This commit is contained in:
@@ -26,5 +26,9 @@ class TrancheKey (OrderKey):
|
||||
vault, order_index, tranche_index = keystring.split('|')
|
||||
return TrancheKey(vault, int(order_index), int(tranche_index))
|
||||
|
||||
@property
|
||||
def order_key(self):
|
||||
return OrderKey(self.vault, self.order_index)
|
||||
|
||||
def __str__(self):
|
||||
return f'{self.vault}|{self.order_index}|{self.tranche_index}'
|
||||
|
||||
@@ -24,9 +24,16 @@ log = logging.getLogger('dexorder')
|
||||
LOG_ALL_EVENTS = False # for debug todo config
|
||||
|
||||
|
||||
#
|
||||
# These event callbacks are basically the run loop for every block
|
||||
#
|
||||
activate_orders_needed = False
|
||||
|
||||
async def check_activate_orders():
|
||||
# we must delay order activation until we are inside the handling of the first block, so that any state updates
|
||||
# due to time triggers are applied to that block.
|
||||
global activate_orders_needed
|
||||
if activate_orders_needed:
|
||||
await activate_orders()
|
||||
activate_orders_needed = False
|
||||
|
||||
|
||||
def setup_logevent_triggers(runner):
|
||||
# the triggers for each log events are triggered in the order of event registry, so the
|
||||
@@ -44,9 +51,11 @@ def setup_logevent_triggers(runner):
|
||||
else:
|
||||
executions = dexorder.events.DexorderExecutions()
|
||||
|
||||
# the callbacks are run even if there's no blocks and the regular timer triggers. event triggers only run when
|
||||
# a block is received.
|
||||
#
|
||||
# These event callbacks are basically the run loop for every block
|
||||
#
|
||||
|
||||
runner.add_callback(check_activate_orders)
|
||||
runner.add_callback(init)
|
||||
runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated'))
|
||||
runner.add_event_trigger(handle_vault_impl_changed, get_contract_event('Vault', 'VaultImplChanged'))
|
||||
@@ -86,9 +95,8 @@ async def main():
|
||||
else:
|
||||
current_blockstate.set(state)
|
||||
current_fork.set(state.root_fork)
|
||||
await activate_orders() # activate orders first before pushing data to redis
|
||||
if redis_state:
|
||||
await redis_state.init(state, state.root_fork)
|
||||
global activate_orders_needed
|
||||
activate_orders_needed = True
|
||||
log.info(f'loaded state from db for root block {state.root_branch.height}')
|
||||
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
|
||||
setup_logevent_triggers(runner)
|
||||
|
||||
@@ -12,7 +12,8 @@ from dexorder.contract.dexorder import get_dexorder_contract
|
||||
from dexorder.database.model.transaction import TransactionJob
|
||||
from dexorder.order.orderstate import Order
|
||||
from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers,
|
||||
close_order_and_disable_triggers, TrancheState, active_tranches)
|
||||
close_order_and_disable_triggers, TrancheState, active_tranches, TrancheTrigger,
|
||||
order_error)
|
||||
from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \
|
||||
new_tranche_execution_request
|
||||
from dexorder.util import hexbytes
|
||||
@@ -112,7 +113,10 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
pass
|
||||
elif error == 'TL':
|
||||
log.debug(f'tranche {tk} execution failed due to "TL" too late')
|
||||
pass
|
||||
try:
|
||||
OrderTriggers.instances[tk.order_key].expire_tranche(tk.tranche_index)
|
||||
except KeyError:
|
||||
pass
|
||||
elif error == 'LL':
|
||||
log.debug(f'tranche {tk} execution failed due to "LL" lower limit')
|
||||
pass
|
||||
@@ -121,14 +125,14 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
pass
|
||||
elif error == 'OVR':
|
||||
log.warning(f'tranche {tk} execution failed due to "OVR" overfilled')
|
||||
# this should never happen. Shut down the order.
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
# this should never happen. Shut down the entire order.
|
||||
order_error(order, error)
|
||||
elif error == 'K':
|
||||
log.error(f'vault killed')
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
elif error == 'STF':
|
||||
log.error(f'tranche {tk} execution failed due to "STF" safe transfer failure')
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
order_error(order, error)
|
||||
else:
|
||||
slash()
|
||||
msg = '<unspecified>' if not error else error
|
||||
|
||||
@@ -115,27 +115,41 @@ async def end_trigger_updates():
|
||||
Call once after all updates have been handled. This updates the active_tranches array based on final trigger state.
|
||||
"""
|
||||
PriceLineTrigger.end_updates(current_clock.get().timestamp)
|
||||
for tk in _dirty:
|
||||
if _trigger_state.get(tk,0) == 0:
|
||||
# all clear for execution. add to active list with any necessary proofs
|
||||
active_tranches[tk] = PriceProof(0)
|
||||
else:
|
||||
# blocked by one or more triggers. delete from active list.
|
||||
try:
|
||||
del active_tranches[tk]
|
||||
except KeyError:
|
||||
pass
|
||||
_dirty.clear()
|
||||
# dirty can change
|
||||
global _dirty
|
||||
while _dirty:
|
||||
working_set = _dirty
|
||||
_dirty = set()
|
||||
for tk in working_set:
|
||||
if _trigger_state.get(tk,0) == 0:
|
||||
# all clear for execution. add to active list with any necessary proofs
|
||||
active_tranches[tk] = PriceProof(0)
|
||||
else:
|
||||
# blocked by one or more triggers being False (nonzero mask)
|
||||
# check expiry constraint
|
||||
try:
|
||||
TrancheTrigger.all[tk].check_expire()
|
||||
except KeyError:
|
||||
pass
|
||||
# delete from active list.
|
||||
try:
|
||||
del active_tranches[tk]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
||||
def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState):
|
||||
order.complete(final_state)
|
||||
try:
|
||||
triggers = OrderTriggers.instances[order.key]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
triggers.disable()
|
||||
order.complete(final_state)
|
||||
|
||||
|
||||
def order_error(order: Order, message:str=None):
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
|
||||
|
||||
# NOTE: we store the INVERSE of each trigger's value! this causes the test for "All True" to be comparison with 0
|
||||
@@ -155,6 +169,9 @@ class Trigger:
|
||||
self.value = value
|
||||
_dirty.add(self.tk)
|
||||
|
||||
def __bool__(self):
|
||||
return self.value
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
return _trigger_state.get(self.tk,0) & (1 << self.position) == 0 # NOTE: inverted
|
||||
@@ -213,14 +230,12 @@ class BalanceTrigger (Trigger):
|
||||
super().__init__(0, tk, value)
|
||||
self.order = Order.of(self.tk)
|
||||
self.vault_token = self.tk.vault, self.order.status.order.tokenIn
|
||||
log.debug(f'adding balanc trigger {id(self)}')
|
||||
BalanceTrigger.by_vault_token[self.vault_token].add(self)
|
||||
|
||||
async def update(self, balance):
|
||||
self.value = await input_amount_is_sufficient(self.order, balance)
|
||||
|
||||
def remove(self):
|
||||
log.debug(f'removing balanc trigger {id(self)}')
|
||||
try:
|
||||
BalanceTrigger.by_vault_token[self.vault_token].remove(self)
|
||||
except KeyError:
|
||||
@@ -266,6 +281,8 @@ class TimeTrigger (Trigger):
|
||||
# called when our self.time has been reached
|
||||
self.value = self.is_start
|
||||
self.active = False
|
||||
if not self.is_start:
|
||||
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
|
||||
# we are popped off the stack by update_all()
|
||||
|
||||
def remove(self):
|
||||
@@ -361,12 +378,14 @@ class PriceLineTrigger (Trigger):
|
||||
async def activate_orders():
|
||||
log.debug('activating orders')
|
||||
# this is a state init callback, called only once after the state has been loaded from the db or created fresh
|
||||
start_trigger_updates()
|
||||
keys = list(Order.open_orders)
|
||||
for key in keys:
|
||||
order = Order.of(key)
|
||||
# setup triggers
|
||||
await activate_order(order) # too many to really parallelize, and it's startup anyway
|
||||
# log.debug(f'activated {order}')
|
||||
await end_trigger_updates()
|
||||
log.debug(f'activated {len(keys)} orders')
|
||||
|
||||
|
||||
@@ -392,6 +411,7 @@ class TrancheState (Enum):
|
||||
|
||||
|
||||
class TrancheTrigger:
|
||||
all: dict[TrancheKey,'TrancheTrigger'] = {}
|
||||
|
||||
@staticmethod
|
||||
async def create(order: Order, tk: TrancheKey) -> 'TrancheTrigger':
|
||||
@@ -440,6 +460,7 @@ class TrancheTrigger:
|
||||
TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
|
||||
TrancheState.Active
|
||||
_dirty.add(tk)
|
||||
TrancheTrigger.all[tk] = self
|
||||
log.debug(f'Tranche {tk} initial status {self.status} {self}')
|
||||
|
||||
|
||||
@@ -455,7 +476,14 @@ class TrancheTrigger:
|
||||
self.slash_count = 0 # reset slash count
|
||||
return filled
|
||||
|
||||
def check_expire(self):
|
||||
# if the expiration constraint has become False then the tranche can never execute again
|
||||
if not self.expiration_trigger:
|
||||
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
|
||||
|
||||
def expire(self):
|
||||
if self.status == TrancheState.Expired:
|
||||
return
|
||||
order_log.debug(f'tranche expired {self.tk}')
|
||||
self.status = TrancheState.Expired
|
||||
self.disable()
|
||||
@@ -494,6 +522,10 @@ class TrancheTrigger:
|
||||
self.min_trigger.remove()
|
||||
if self.max_trigger is not None:
|
||||
self.max_trigger.remove()
|
||||
try:
|
||||
del TrancheTrigger.all[self.tk]
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
del _trigger_state[self.tk]
|
||||
except KeyError:
|
||||
|
||||
@@ -221,8 +221,9 @@ class BlockStateRunner(BlockProgressor):
|
||||
try:
|
||||
await asyncio.wait_for(self.new_head_event.wait(), timeout=1) # todo configure
|
||||
except TimeoutError:
|
||||
if fork is not None:
|
||||
await self.handle_time_tick(fork)
|
||||
# DISABLED. See note on handle_time_tick()
|
||||
# if fork is not None:
|
||||
# await self.handle_time_tick(fork)
|
||||
continue
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
@@ -357,6 +358,13 @@ class BlockStateRunner(BlockProgressor):
|
||||
|
||||
|
||||
async def handle_time_tick(self, fork: Fork):
|
||||
# WARNING: BROKEN!
|
||||
# Time ticks do not occur during blocks and therefore violate the basics of the blockstate system. Any changes
|
||||
# to blockstate during a time tick are made against the state of the head block, but the processing of that
|
||||
# block has already finished. Any changes from the time tick are made in Python memory only and do not
|
||||
# propragate to the DB or Redis.
|
||||
# TIME TICKS ARE DISABLED FOR THIS REASON
|
||||
return
|
||||
current_fork.set(fork)
|
||||
session = db.session
|
||||
session.begin()
|
||||
|
||||
Reference in New Issue
Block a user