trigger bugfixes
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import logging
|
||||
import math
|
||||
from contextvars import ContextVar
|
||||
from datetime import datetime, timezone
|
||||
|
||||
@@ -73,7 +72,7 @@ class BlockClock (Clock):
|
||||
def update(self, block_timestamp: int):
|
||||
now = datetime.now().timestamp()
|
||||
self.adjustment = block_timestamp - now
|
||||
# logging.getLogger(__name__).debug(f'blocktime {datetime.fromtimestamp(block_timestamp,tz=timezone.utc)} {block_timestamp} - now {now} = {self.adjustment}')
|
||||
logging.getLogger(__name__).debug(f'blocktime {datetime.fromtimestamp(block_timestamp,tz=timezone.utc)} {block_timestamp} - now {now} = {self.adjustment}')
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
|
||||
@@ -8,13 +8,12 @@ from web3.types import EventData
|
||||
from dexorder import db
|
||||
from dexorder.base import TransactionReceiptDict
|
||||
from dexorder.base.order import TrancheKey, OrderKey
|
||||
from dexorder.base.orderlib import SwapOrderState, PriceProof
|
||||
from dexorder.base.orderlib import PriceProof
|
||||
from dexorder.contract.dexorder import get_dexorder_contract
|
||||
from dexorder.database.model.transaction import TransactionJob
|
||||
from dexorder.order.orderstate import Order
|
||||
from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers,
|
||||
close_order_and_disable_triggers, TrancheState, active_tranches, TrancheTrigger,
|
||||
order_error)
|
||||
TrancheState, active_tranches, order_error)
|
||||
from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \
|
||||
new_tranche_execution_request
|
||||
from dexorder.util import hexbytes
|
||||
@@ -81,9 +80,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
|
||||
def retry():
|
||||
trig = get_trigger()
|
||||
if trig is None:
|
||||
log.warning(f'Trying to touch a nonexistent trigger for tranche {tk}')
|
||||
else:
|
||||
if trig is not None:
|
||||
trig.touch()
|
||||
|
||||
#
|
||||
@@ -101,11 +98,11 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
# Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of
|
||||
# vault impl if it happens.
|
||||
log.warning(f'SPL when executing tranche {tk}')
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
order_error(order, error)
|
||||
elif error == 'NO':
|
||||
# order is not open
|
||||
log.warning(f'order {order_key} was closed, undetected!')
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error) # We do not know if it was filled or not so only Error status can be given
|
||||
order_error(order, error) # We do not know if it was filled or not so only Error status can be given
|
||||
elif error == 'TF':
|
||||
# Tranche Filled
|
||||
log.warning(f'tranche already filled {tk}')
|
||||
@@ -142,7 +139,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
|
||||
order_error(order, error)
|
||||
elif error == 'K':
|
||||
log.error(f'vault killed')
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
order_error(order, error)
|
||||
elif error == 'STF':
|
||||
log.error(f'tranche {tk} execution failed due to "STF" safe transfer failure')
|
||||
order_error(order, error)
|
||||
|
||||
@@ -75,9 +75,9 @@ class OrderTriggers:
|
||||
return not self.closed
|
||||
|
||||
def check_complete(self):
|
||||
if all(t.closed for t in self.triggers):
|
||||
if self.closed:
|
||||
final_state = SwapOrderState.Filled if self.order.remaining == 0 or self.order.remaining < self.order.min_fill_amount else SwapOrderState.Expired
|
||||
close_order_and_disable_triggers(self.order, final_state)
|
||||
self.order.complete(final_state)
|
||||
|
||||
def fill(self, tx: str, time: int, tranche_index, amount_in, amount_out, fee, next_activation_time):
|
||||
self.order.add_fill(tx, time, tranche_index, amount_in, amount_out, fee, next_activation_time)
|
||||
@@ -97,7 +97,7 @@ def start_trigger_updates():
|
||||
"""
|
||||
Called near the beginning of block handling to initialize any per-block trigger data structures
|
||||
"""
|
||||
# log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
|
||||
log.debug(f'current clock is {current_clock.get().timestamp} vs {timestamp()} => Δ{current_clock.get().timestamp-timestamp():.1f}s')
|
||||
TimeTrigger.update_all(current_clock.get().timestamp)
|
||||
PriceLineTrigger.clear_data()
|
||||
|
||||
@@ -142,11 +142,8 @@ async def end_trigger_updates():
|
||||
pass
|
||||
|
||||
|
||||
def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState):
|
||||
order.complete(final_state) # this will call _disable_triggers(order)
|
||||
|
||||
|
||||
def _disable_triggers(order):
|
||||
def _Order__disable_triggers(order):
|
||||
""" implementation of Order.disable_triggers() """
|
||||
try:
|
||||
triggers = OrderTriggers.instances[order.key]
|
||||
except KeyError:
|
||||
@@ -154,15 +151,16 @@ def _disable_triggers(order):
|
||||
else:
|
||||
triggers.disable()
|
||||
|
||||
Order._disable_triggers = _disable_triggers
|
||||
Order._disable_triggers = _Order__disable_triggers
|
||||
|
||||
|
||||
def order_error(order: Order, message:str=None):
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Error)
|
||||
def order_error(order: Order, _message:str=None):
|
||||
# todo record message and propagate to client
|
||||
order.complete(SwapOrderState.Error)
|
||||
|
||||
|
||||
# NOTE: we store the INVERSE of each trigger's value! this causes the test for "All True" to be comparison with 0
|
||||
# instead of comparison with a set of 1's the correct size. By storing inverted values, the group does not
|
||||
# NOTE: _trigger_state stores the INVERSE of each trigger's value! this causes the test for "All True" to be comparison
|
||||
# with 0 instead of comparison with a set of 1's the correct size. By storing inverted values, the group does not
|
||||
# need to know the number of child triggers, only that no falses have been reported.
|
||||
_trigger_state: BlockDict[TrancheKey, int] = BlockDict('trig', str2key=TrancheKey.str2key, db=True)
|
||||
_dirty:set[TrancheKey] = set()
|
||||
@@ -183,6 +181,7 @@ class Trigger:
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
""" A value of True means trading is currently permitted. """
|
||||
return _trigger_state.get(self.tk,0) & (1 << self.position) == 0 # NOTE: inverted
|
||||
|
||||
@value.setter
|
||||
@@ -193,20 +192,27 @@ class Trigger:
|
||||
if value != old:
|
||||
_dirty.add(self.tk)
|
||||
if not value: # this conditional is inverted
|
||||
_trigger_state[self.tk] = old | (1 << self.position) # set
|
||||
_trigger_state[self.tk] = state | (1 << self.position) # set
|
||||
else:
|
||||
_trigger_state[self.tk] = old & ~(1 << self.position) # clear
|
||||
_trigger_state[self.tk] = state & ~(1 << self.position) # clear
|
||||
self._value_changed()
|
||||
|
||||
|
||||
def _value_changed(self): pass
|
||||
|
||||
|
||||
@abstractmethod
|
||||
def remove(self): ...
|
||||
|
||||
|
||||
async def has_funds(tk: TrancheKey):
|
||||
# log.debug(f'has funds? {tk.vault}')
|
||||
log.debug(f'has funds? {tk.vault}')
|
||||
order = Order.of(tk)
|
||||
balances = vault_balances.get(tk.vault, {})
|
||||
log.debug(f'balances {balances}')
|
||||
token_addr = order.status.order.tokenIn
|
||||
token_balance = balances.get(token_addr)
|
||||
log.debug(f'amount of {token_addr} = {token_balance}')
|
||||
if token_balance is None:
|
||||
# unknown balance
|
||||
token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
|
||||
@@ -216,10 +222,13 @@ async def has_funds(tk: TrancheKey):
|
||||
|
||||
|
||||
async def input_amount_is_sufficient(order, token_balance):
|
||||
log.debug(f'input is sufficient? {order.min_fill_amount}')
|
||||
if order.amount_is_input:
|
||||
return token_balance >= order.status.order.minFillAmount
|
||||
log.debug(f'amount is input: {token_balance} >= {order.min_fill_amount}')
|
||||
return token_balance >= order.min_fill_amount
|
||||
# amount is an output amount, so we need to know the price
|
||||
price = pool_prices.get(order.pool_address)
|
||||
log.debug(f'amount is output amount. price={price}')
|
||||
if price is None:
|
||||
return token_balance > 0 # we don't know the price so we allow any nonzero amount to be sufficient
|
||||
pool = await get_pool(order.pool_address)
|
||||
@@ -227,7 +236,7 @@ async def input_amount_is_sufficient(order, token_balance):
|
||||
inverted = order.order.tokenIn != pool['base']
|
||||
minimum = dec(order.min_fill_amount)*price if inverted else dec(order.min_fill_amount)/price
|
||||
log.debug(f'order minimum amount is {order.min_fill_amount} '+ ("input" if order.amount_is_input else f"output @ {price} = {minimum} ")+f'< {token_balance} balance')
|
||||
return token_balance > minimum
|
||||
return token_balance >= minimum
|
||||
|
||||
|
||||
class BalanceTrigger (Trigger):
|
||||
@@ -235,22 +244,26 @@ class BalanceTrigger (Trigger):
|
||||
|
||||
@staticmethod
|
||||
async def create(tk: TrancheKey):
|
||||
value = await has_funds(tk)
|
||||
return BalanceTrigger(tk, value)
|
||||
value = await has_funds(tk)
|
||||
bt = BalanceTrigger(tk, value)
|
||||
log.debug(f'got bt {id(bt)}')
|
||||
return bt
|
||||
|
||||
def __init__(self, tk: TrancheKey, value: bool):
|
||||
super().__init__(0, tk, value)
|
||||
self.order = Order.of(self.tk)
|
||||
self.vault_token = self.tk.vault, self.order.status.order.tokenIn
|
||||
BalanceTrigger.by_vault_token[self.vault_token].add(self)
|
||||
log.debug(f'initializing Balance Trigger {id(self)} {tk} {value} {self.value}')
|
||||
|
||||
async def update(self, balance):
|
||||
self.value = await input_amount_is_sufficient(self.order, balance)
|
||||
log.debug(f'update balance {balance} was sufficient? {self.value}')
|
||||
|
||||
def remove(self):
|
||||
try:
|
||||
BalanceTrigger.by_vault_token[self.vault_token].remove(self)
|
||||
except KeyError:
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
|
||||
|
||||
@@ -267,13 +280,14 @@ class TimeTrigger (Trigger):
|
||||
return TimeTrigger(is_start, tk, time, time_now)
|
||||
|
||||
def __init__(self, is_start: bool, tk: TrancheKey, time: int, time_now: int):
|
||||
triggered = time_now >= time
|
||||
super().__init__(1 if is_start else 2, tk, triggered is is_start)
|
||||
trigger_type = 1 if is_start else 2
|
||||
in_future = time_now >= time
|
||||
value = in_future is is_start
|
||||
self.is_start = is_start
|
||||
self._time = time
|
||||
self.active = not triggered
|
||||
if self.active:
|
||||
TimeTrigger.all.add(self)
|
||||
self.active = False # whether this trigger is in the `all` list or not (has the time passed?)
|
||||
super().__init__(trigger_type, tk, value)
|
||||
log.debug(f'created time trigger {self} {from_timestamp(time)} now={from_timestamp(time_now)} {value}')
|
||||
|
||||
@property
|
||||
def time(self):
|
||||
@@ -290,25 +304,43 @@ class TimeTrigger (Trigger):
|
||||
if self.active:
|
||||
# remove old trigger
|
||||
TimeTrigger.all.remove(self)
|
||||
self.active = (time_now > time) is self.is_start
|
||||
if self.active:
|
||||
self.update_active(time_now)
|
||||
|
||||
def update_active(self, time_now: int = None, time: int = None):
|
||||
if time_now is None:
|
||||
time_now = current_clock.get().timestamp
|
||||
if time is None:
|
||||
time = self._time
|
||||
next_active = (time_now > time) is self.is_start
|
||||
activate = not self.active and next_active
|
||||
if activate:
|
||||
log.debug(f'adding time trigger {self}')
|
||||
TimeTrigger.all.add(self)
|
||||
self.active = next_active
|
||||
|
||||
def _value_changed(self):
|
||||
self.update_active()
|
||||
|
||||
def update(self):
|
||||
# called when our self.time has been reached
|
||||
log.debug(f'update time trigger {self}')
|
||||
self.value = self.is_start
|
||||
self.active = False
|
||||
if not self.is_start:
|
||||
if not self.is_start and not self.value:
|
||||
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
|
||||
# we are popped off the stack by update_all()
|
||||
|
||||
def remove(self):
|
||||
if self.active:
|
||||
TimeTrigger.all.remove(self)
|
||||
try:
|
||||
log.debug(f'remove time trigger {self}')
|
||||
TimeTrigger.all.remove(self)
|
||||
except ValueError:
|
||||
pass
|
||||
self.active = False
|
||||
|
||||
@staticmethod
|
||||
def update_all(time):
|
||||
log.debug(f'update time triggers {time}')
|
||||
while TimeTrigger.all and TimeTrigger.all[0].time <= time:
|
||||
# todo this doesnt work across reorgs. we need to keep a BlockState cursor of the last time handled,
|
||||
# then activate any time triggers from that past time through the present. time triggers may only
|
||||
@@ -318,6 +350,9 @@ class TimeTrigger (Trigger):
|
||||
t = TimeTrigger.all.pop(0)
|
||||
t.update()
|
||||
|
||||
def __str__(self):
|
||||
return f'{self.tk} {"start" if self.is_start else "end"}'
|
||||
|
||||
|
||||
class PriceLineTrigger (Trigger):
|
||||
by_pool: dict[str,set['PriceLineTrigger']] = defaultdict(set)
|
||||
@@ -430,7 +465,9 @@ async def activate_order(order: Order):
|
||||
triggers = await OrderTriggers.create(order)
|
||||
if triggers.closed:
|
||||
log.debug(f'order {order.key} was immediately closed')
|
||||
close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired)
|
||||
final_state = SwapOrderState.Filled if order.remaining == 0 or order.remaining < order.min_fill_amount \
|
||||
else SwapOrderState.Expired
|
||||
order.complete(final_state)
|
||||
|
||||
|
||||
class TrancheState (Enum):
|
||||
@@ -520,7 +557,7 @@ class TrancheTrigger:
|
||||
|
||||
def check_expire(self):
|
||||
# if the expiration constraint has become False then the tranche can never execute again
|
||||
if self.expiration_trigger is not None and self.expiration_trigger:
|
||||
if self.expiration_trigger is not None and not self.expiration_trigger:
|
||||
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
|
||||
|
||||
def expire(self):
|
||||
@@ -555,6 +592,7 @@ class TrancheTrigger:
|
||||
|
||||
def disable(self):
|
||||
# permanently stop this trigger and deconstruct
|
||||
log.debug(f'removing bt {id(self.balance_trigger)}')
|
||||
self.balance_trigger.remove()
|
||||
if self.activation_trigger is not None:
|
||||
self.activation_trigger.remove()
|
||||
@@ -592,7 +630,7 @@ class TrancheTrigger:
|
||||
def __str__(self):
|
||||
trigs = []
|
||||
if self.balance_trigger is not None:
|
||||
trigs.append(f'balance {self.balance_trigger.value}')
|
||||
trigs.append(f'balance {self.balance_trigger.value} {id(self.balance_trigger)}')
|
||||
if self.activation_trigger is not None:
|
||||
trigs.append(f'activation {self.activation_trigger.value}')
|
||||
if self.expiration_trigger is not None:
|
||||
|
||||
Reference in New Issue
Block a user