diff --git a/src/dexorder/base/orderlib.py b/src/dexorder/base/orderlib.py index f3a2dde..01c516e 100644 --- a/src/dexorder/base/orderlib.py +++ b/src/dexorder/base/orderlib.py @@ -4,6 +4,8 @@ from dataclasses import dataclass from enum import Enum from typing import Optional +from numpy.ma.core import filled + from dexorder.util import hexbytes from dexorder.util.convert import decode_IEEE754 @@ -117,6 +119,23 @@ SwapOrder return msg +@dataclass +class Fill: + tx: str + time: int + filledIn: int + filledOut: int + fee: int + + def dump(self): + return self.tx, self.time, str(self.filledIn), str(self.filledOut), str(self.fee) + + @staticmethod + def load(obj): + tx, time, filledIn, filledOut, fee = obj + return Fill(tx, time, int(filledIn), int(filledOut), int(fee)) + + @dataclass class ElaboratedTrancheStatus: filledIn: int @@ -124,6 +143,7 @@ class ElaboratedTrancheStatus: activationTime: int startTime: int endTime: int + fills: list[Fill] @staticmethod def load_from_chain(obj: tuple[int,int,int,int]): @@ -132,17 +152,20 @@ class ElaboratedTrancheStatus: # we do NOT grab the filled amount from the chain, because our process will handle the fill events # separately by incrementing these status values as fills arrive. 0, 0, - activationTime, startTime, endTime, + activationTime, startTime, endTime, [] ) def dump(self): # filled fields can be larger than JSON-able ints, so we use strings. - return str(self.filledIn), str(self.filledOut), self.activationTime, self.startTime, self.endTime + return str(self.filledIn), str(self.filledOut), self.activationTime, self.startTime, self.endTime, [ + f.dump() for f in self.fills + ] @staticmethod def load(obj: tuple[str,str,int,int,int]): - filledIn, filledOut, activationTime, startTime, endTime = obj - return ElaboratedTrancheStatus(int(filledIn), int(filledOut), activationTime, startTime, endTime) + filledIn, filledOut, activationTime, startTime, endTime, fillsObj = obj + fills = [Fill.load(f) for f in fillsObj] + return ElaboratedTrancheStatus(int(filledIn), int(filledOut), activationTime, startTime, endTime, fills) @@ -324,11 +347,11 @@ class Tranche: if self.minLine.intercept or self.minLine.slope: msg += f' >{self.minLine.intercept:.5g}' if self.minLine.slope: - msg += f'{self.minLine.slope:+.5g}' + msg += f'{self.minLine.slope:+.5g}/s' if self.maxLine.intercept or self.maxLine.slope: msg += f' <{self.maxLine.intercept:.5g}' if self.maxLine.slope: - msg += f'{self.maxLine.slope:+.5g}' + msg += f'{self.maxLine.slope:+.5g}/s' if self.rateLimitPeriod: msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes' return msg diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index b5d8c9d..9a2e875 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -166,6 +166,11 @@ class BlockDict(Generic[K,V], BlockData[V]): def get(self, item: K, default: V = None) -> V: return self.getitem(item, default) + def pop(self, item: K, default: V = None): + result = self.getitem(item, default) + self.delitem(item) + return result + def setdefault(self, item: K, value: V): return self.setitem(item, value, overwrite=False) diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 0e7f869..cc28f10 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -231,6 +231,7 @@ class BlockState: diff = DiffEntry(value, branch.height, branch.id) diffs.add(diff) self.diffs_by_branch[branch.id].append(DiffEntryItem(series, key, diff)) + return old_value def unload(self, fork: Optional[Fork], series, key): self.unloads[fork.branch_id].append((series, key)) diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 0e1e739..de82ab6 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -7,7 +7,7 @@ from dexorder import current_pub, minutely from dexorder.base.chain import current_chain from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import SwapOrderState -from dexorder.blocks import get_block_timestamp +from dexorder.blocks import get_block_timestamp, current_block from dexorder.contract.dexorder import vault_address, VaultContract, get_factory_contract from dexorder.impls import get_impl_version from dexorder.ohlc import ohlcs @@ -15,6 +15,7 @@ from dexorder.order.orderstate import Order from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates, update_price_triggers) from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data +from dexorder.util import hexstr from dexorder.vault_blockdata import vault_owners, adjust_balance, MAX_VAULTS, verify_vault log = logging.getLogger(__name__) @@ -55,7 +56,7 @@ async def handle_order_placed(event: EventData): log.debug(f'new order{order}') -def handle_swap_filled(event: EventData): +async def handle_swap_filled(event: EventData): log.debug('handle_swap_filled') # event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut); log.debug(f'DexorderSwapFilled {event}') @@ -65,7 +66,6 @@ def handle_swap_filled(event: EventData): tranche_index = args['trancheIndex'] amount_in = args['amountIn'] amount_out = args['amountOut'] - # todo accounting fill_fee = args['fillFee'] next_execution_time = args['nextExecutionTime'] try: @@ -79,7 +79,8 @@ def handle_swap_filled(event: EventData): except KeyError: log.warning(f'No order triggers for fill of {TrancheKey(order.key.vault, order.key.order_index, tranche_index)}') else: - triggers.fill(tranche_index, amount_in, amount_out) + time = await get_block_timestamp(event['blockHash']) + triggers.fill(hexstr(event['transactionHash']), time, tranche_index, amount_in, amount_out, fill_fee) async def handle_order_canceled(event: EventData): # event DexorderCanceled (uint64 orderIndex); diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 9f11526..9eebd8e 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -6,6 +6,7 @@ from web3.exceptions import ContractPanicError, ContractLogicError from web3.types import EventData from dexorder import db +from dexorder.base import TransactionReceiptDict from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import SwapOrderState, PriceProof from dexorder.contract.dexorder import get_dexorder_contract @@ -40,12 +41,15 @@ class TrancheExecutionHandler (TransactionHandler): finish_execution_request(tk, errcode) raise exception - async def complete_transaction(self, job: TransactionJob) -> None: + async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: + # we handle execution results using the DexorderExecution event, so there's nothing to do here. + pass + + async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: + log.error('Could not build execution transaction due to exception', exc_info=e) # noinspection PyTypeChecker req: TrancheExecutionRequest = job.request - tk = TrancheKey(req.vault, req.order_index, req.tranche_index) - log.debug(f'completing execution request {tk}') - finish_execution_request(tk) + finish_execution_request(req.tranche_key, '') TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler @@ -66,7 +70,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): def get_trigger(): try: - return OrderTriggers.instances[order.key].triggers[tk.tranche_index] + return OrderTriggers.instances[order_key].triggers[tk.tranche_index] except KeyError: return None diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index a19b5c3..d6f5efc 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -6,7 +6,7 @@ from typing import overload from dexorder import DELETE, db, order_log from dexorder.base.chain import current_chain from dexorder.base.order import OrderKey, TrancheKey -from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus +from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus, Fill from dexorder.blockstate import BlockDict, BlockSet from dexorder.database.model.orderindex import OrderIndex from dexorder.routing import pool_address @@ -18,31 +18,24 @@ log = logging.getLogger(__name__) # We split off the fill information for efficient communication to clients. -@dataclass -class Filled: - filled_in: int - filled_out: int - - @staticmethod - def load(obj: tuple[str,str]): - return Filled(*map(int,obj)) - - def dump(self): - return str(self.filled_in), str(self.filled_out) - - @dataclass class OrderFilled: - filled: Filled - tranche_filled: list[Filled] + tranche_fills: list[list[Fill]] + + @property + def filledIn(self): + return sum(tf.filledIn for tfs in self.tranche_fills for tf in tfs) + + @property + def filledOut(self): + return sum(tf.filledOut for tfs in self.tranche_fills for tf in tfs) @staticmethod def load(obj): - f, tfs = obj - return OrderFilled(Filled.load(f), [Filled.load(tf) for tf in tfs]) + return OrderFilled([[Fill.load(tf) for tf in tfs] for tfs in obj]) def dump(self): - return [self.filled.dump(), [tf.dump() for tf in self.tranche_filled]] + return [[tf.dump() for tf in tfs] for tfs in self.tranche_fills] # todo oco groups @@ -88,10 +81,10 @@ class Order: if order.is_open: Order.open_orders.add(key) Order.vault_open_orders.listappend(key.vault, key.order_index) - # Start with a filled value of 0 even if the chain says otherwise, because we will process the fill events later and add them in - tranche_filled = [Filled(0, 0) for _ in range(len(status.trancheStatus))] + # Start with an empty set of fills, even if the chain says otherwise, because we will process the fill + # events later and add them in + Order.order_filled[key] = OrderFilled([[] for _ in range(len(order.order.tranches))]) order_log.debug(f'initialized order_filled[{key}]') - Order.order_filled[key] = OrderFilled(Filled(0, 0), tranche_filled) order_log.debug(f'order created {key}') return order @@ -127,18 +120,18 @@ class Order: @property def filled_in(self): - return Order.order_filled[self.key].filled.filled_in if self.is_open else self.status.filledIn + return Order.order_filled[self.key].filledIn if self.is_open else self.status.filledIn @property def filled_out(self): - return Order.order_filled[self.key].filled.filled_out if self.is_open else self.status.filledOut + return Order.order_filled[self.key].filledOut if self.is_open else self.status.filledOut def tranche_filled_in(self, tranche_index: int): - return Order.order_filled[self.key].tranche_filled[tranche_index].filled_in if self.is_open \ + return sum(tf.filledIn for tf in Order.order_filled[self.key].tranche_fills[tranche_index]) if self.is_open \ else self.status.trancheStatus[tranche_index].filledIn def tranche_filled_out(self, tranche_index: int): - return Order.order_filled[self.key].tranche_filled[tranche_index].filled_out if self.is_open \ + return sum(tf.filledOut for tf in Order.order_filled[self.key].tranche_fills[tranche_index]) if self.is_open \ else self.status.trancheStatus[tranche_index].filledOut def tranche_filled(self, tranche_index: int): @@ -159,17 +152,14 @@ class Order: def is_open(self): return self.state.is_open - def add_fill(self, tranche_index: int, filled_in: int, filled_out: int): + def add_fill(self, tx: str, time: int, tranche_index: int, filled_in: int, filled_out: int, fee: int): order_log.debug(f'tranche fill {self.key}|{tranche_index} in:{filled_in} out:{filled_out}') try: old = Order.order_filled[self.key] except KeyError: raise new = copy.deepcopy(old) - new.filled.filled_in += filled_in - new.filled.filled_out += filled_out - new.tranche_filled[tranche_index].filled_in += filled_in - new.tranche_filled[tranche_index].filled_out += filled_out + new.tranche_fills[tranche_index].append(Fill(tx, time, filled_in, filled_out, fee)) order_log.debug(f'updated order_filled: {new}') Order.order_filled[self.key] = new @@ -188,13 +178,22 @@ class Order: except KeyError: log.warning(f'While completing with status {final_state}, no order fills found for {self.key}') else: - order_log.debug(f'deleting order_filled[{self.key}]') - del Order.order_filled[self.key] - status.filledIn = of.filled.filled_in - status.filledOut = of.filled.filled_out - for i, tf in enumerate(of.tranche_filled): - status.trancheStatus[i].filledIn = of.tranche_filled[i].filled_in - status.trancheStatus[i].filledOut = of.tranche_filled[i].filled_out + # order_log.debug(f'deleting order_filled[{self.key}]') + filledIn = filledOut = 0 + for (i,fills) in enumerate(of.tranche_fills): + fi = fo = 0 + for fill in fills: + fill: Fill + fi += fill.filledIn + fo += fill.filledOut + filledIn += fi + filledOut += fo + ts = status.trancheStatus[i] + ts.fills = copy.deepcopy(fills) + ts.filledIn = fi + ts.filledOut = fo + status.filledIn = filledIn + status.filledOut = filledOut Order.order_statuses[self.key] = status # set the status in order to save it Order.order_statuses.unload(self.key) # but then unload from memory after root promotion order_log.debug(f'order completed {status}') diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 11382c2..eb1be8a 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -39,6 +39,9 @@ execution should be attempted on the tranche. # tranches which have passed all constraints and should be executed active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') +# tranches which have an execute() transaction sent but not completed +inflight_execution_requests: set[TrancheKey] = set() + class OrderTriggers: instances: dict[OrderKey, 'OrderTriggers'] = {} @@ -74,8 +77,8 @@ class OrderTriggers: final_state = SwapOrderState.Filled if self.order.remaining == 0 or self.order.remaining < self.order.min_fill_amount else SwapOrderState.Expired close_order_and_disable_triggers(self.order, final_state) - def fill(self, tranche_index, amount_in, amount_out): - self.order.add_fill(tranche_index, amount_in, amount_out) + def fill(self, tx: str, time: int, tranche_index, amount_in, amount_out, fee): + self.order.add_fill(tx, time, tranche_index, amount_in, amount_out, fee) if self.triggers[tranche_index].fill(amount_in, amount_out): self.check_complete() @@ -84,6 +87,10 @@ class OrderTriggers: self.check_complete() +# +# Client Interface +# + def start_trigger_updates(): """ Called near the beginning of block handling to initialize any per-block trigger data structures @@ -92,10 +99,6 @@ def start_trigger_updates(): PriceLineTrigger.clear_data() -# -# Client Interface -# - async def update_balance_triggers(vault: str, token: str, balance: int): updates = [bt.update(balance) for bt in BalanceTrigger.by_vault_token.get((vault, token), [])] await asyncio.gather(*updates) @@ -108,8 +111,6 @@ async def update_price_triggers(pool: OldPoolDict, price: dec): pt.update(price) -inflight_execution_requests: set[TrancheKey] = set() - async def end_trigger_updates(): """ Call once after all updates have been handled. This updates the active_tranches array based on final trigger state. @@ -178,9 +179,11 @@ class Trigger: @value.setter def value(self, value): - if value != self.value: + state = _trigger_state.get(self.tk,0) + old = state & (1 << self.position) == 0 # NOTE: inverted + + if value != old: _dirty.add(self.tk) - old = _trigger_state.get(self.tk,0) if not value: # this conditional is inverted _trigger_state[self.tk] = old | (1 << self.position) # set else: @@ -311,14 +314,16 @@ class PriceLineTrigger (Trigger): return None # no constraint (deactivated) pool = await get_pool(Order.of(tk).pool_address) await ensure_pool_price(pool) - price_now = pool_prices[pool['address']] + price_now = pool_prices[pool['address']] * dec(10) ** dec(-pool['decimals']) # adjust for pool decimals to get onchain price return PriceLineTrigger(tk, line, is_min, is_barrier, price_now) def __init__(self, tk: TrancheKey, line: Line, is_min: bool, is_barrier: bool, price_now: dec): if is_barrier: log.warning('Barriers not supported') - price_above = price_now > line.intercept + line.slope * current_clock.get().timestamp - super().__init__(3 if is_min else 4, tk, is_min is price_above) + value_now = line.intercept + line.slope * current_clock.get().timestamp + price_above = price_now > value_now + # log.debug(f'initial price line {value_now} {">" if is_min else "<"} {price_now} {is_min is not price_above}') + super().__init__(3 if is_min else 4, tk, is_min is not price_above) self.line = line self.is_min = is_min self.is_barrier = is_barrier @@ -328,9 +333,10 @@ class PriceLineTrigger (Trigger): # lines that need evaluating add their data to these arrays, which are then sent to SIMD for evaluation. each # array must always have the same size as the others. - y = [] + y = [] # the current price m = [] b = [] + sign = [] # 1 or -1 to represent a min line or max line respectively triggers = [] # 1-for-1 with line_data triggers_set = set() @@ -339,6 +345,7 @@ class PriceLineTrigger (Trigger): PriceLineTrigger.y.clear() PriceLineTrigger.m.clear() PriceLineTrigger.b.clear() + PriceLineTrigger.sign.clear() PriceLineTrigger.triggers.clear() PriceLineTrigger.triggers_set.clear() @@ -348,6 +355,7 @@ class PriceLineTrigger (Trigger): PriceLineTrigger.y.append(price) PriceLineTrigger.m.append(self.line.slope) PriceLineTrigger.b.append(self.line.intercept) + PriceLineTrigger.sign.append(-1 if self.is_min else 1) PriceLineTrigger.triggers.append(self) PriceLineTrigger.triggers_set.add(self) else: @@ -357,14 +365,18 @@ class PriceLineTrigger (Trigger): @staticmethod def end_updates(time: int): # here we use numpy to compute all dirty lines using SIMD - y, m, b = map(np.array, (PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b)) + y, m, b, sign = map(np.array, + (PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b, PriceLineTrigger.sign)) line_value = m * time + b - price_diff = y - line_value - for t, pd in zip(PriceLineTrigger.triggers, price_diff): - t.handle_result(pd) + price_diff = sign * (y - line_value) + activated = price_diff > 0 + # for price, line, s, a in zip(y, line_value, sign, activated): + # log.debug(f'price: {line} {">" if s == -1 else "<"} {price} {a}') + for t, activated in zip(PriceLineTrigger.triggers, activated): + t.handle_result(activated) + PriceLineTrigger.clear_data() - def handle_result(self, price_diff: float): - value = self.is_min and price_diff > 0 or not self.is_min and price_diff < 0 + def handle_result(self, value: bool): if not self.is_barrier or value: # barriers that are False do not update their values to False self.value = value @@ -478,7 +490,7 @@ class TrancheTrigger: def check_expire(self): # if the expiration constraint has become False then the tranche can never execute again - if not self.expiration_trigger: + if self.expiration_trigger is not None and self.expiration_trigger: OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index) def expire(self): diff --git a/src/dexorder/transactions.py b/src/dexorder/transactions.py index 6f3540d..2ef16c9 100644 --- a/src/dexorder/transactions.py +++ b/src/dexorder/transactions.py @@ -12,7 +12,6 @@ from dexorder.base import TransactionReceiptDict, TransactionRequest, transactio from dexorder.base.chain import current_chain from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import PriceProof -from dexorder.blockstate import BlockDict from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import current_fork, Fork from dexorder.contract.contract_proxy import ContractTransaction @@ -36,8 +35,10 @@ class TransactionHandler: async def build_transaction(self, job_id: int, tr: TransactionRequest) -> ContractTransaction: ... @abstractmethod - async def complete_transaction(self, job: TransactionJob) -> None: ... + async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: ... + @abstractmethod + async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: ... @dataclass class TrancheExecutionRequest (TransactionRequest): @@ -102,12 +103,12 @@ async def create_and_send_transactions(): return try: ctx: ContractTransaction = await handler.build_transaction(job.id, job.request) - except (ContractPanicError, ContractLogicError): + except (ContractPanicError, ContractLogicError) as x: # these errors can be thrown immediately when the tx is tested for gas log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}') job.state = TransactionJobState.Error db.session.add(job) - await handler.complete_transaction(job) + await handler.transaction_exception(job, x) return except Exception as x: log.warning(f'unable to send transaction for job {job.id}', exc_info=x) @@ -154,15 +155,13 @@ async def handle_transaction_receipts(): assert fork is not None if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \ fork.branch.disjoint and receipt['blockNumber'] <= fork.height: - # don't set the database yet because we could get reorged - completed_transactions[job.tx_id] = receipt try: handler = TransactionHandler.of(job.request.type) except KeyError: # todo remove bad request? log.warning(f'ignoring transaction request with bad type "{job.request.type}"') else: - await handler.complete_transaction(job) + await handler.complete_transaction(job, receipt) def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): @@ -179,5 +178,3 @@ def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): job.receipt = diff.value db.session.add(job) - -completed_transactions = BlockDict[bytes, dict]('mined_txs') # stores the transaction receipt