fills; line calculation fixes; execution transaction handling fixes

This commit is contained in:
tim
2024-09-11 00:59:47 -04:00
parent ba165c2aff
commit fa710db1ce
8 changed files with 125 additions and 83 deletions

View File

@@ -4,6 +4,8 @@ from dataclasses import dataclass
from enum import Enum
from typing import Optional
from numpy.ma.core import filled
from dexorder.util import hexbytes
from dexorder.util.convert import decode_IEEE754
@@ -117,6 +119,23 @@ SwapOrder
return msg
@dataclass
class Fill:
tx: str
time: int
filledIn: int
filledOut: int
fee: int
def dump(self):
return self.tx, self.time, str(self.filledIn), str(self.filledOut), str(self.fee)
@staticmethod
def load(obj):
tx, time, filledIn, filledOut, fee = obj
return Fill(tx, time, int(filledIn), int(filledOut), int(fee))
@dataclass
class ElaboratedTrancheStatus:
filledIn: int
@@ -124,6 +143,7 @@ class ElaboratedTrancheStatus:
activationTime: int
startTime: int
endTime: int
fills: list[Fill]
@staticmethod
def load_from_chain(obj: tuple[int,int,int,int]):
@@ -132,17 +152,20 @@ class ElaboratedTrancheStatus:
# we do NOT grab the filled amount from the chain, because our process will handle the fill events
# separately by incrementing these status values as fills arrive.
0, 0,
activationTime, startTime, endTime,
activationTime, startTime, endTime, []
)
def dump(self):
# filled fields can be larger than JSON-able ints, so we use strings.
return str(self.filledIn), str(self.filledOut), self.activationTime, self.startTime, self.endTime
return str(self.filledIn), str(self.filledOut), self.activationTime, self.startTime, self.endTime, [
f.dump() for f in self.fills
]
@staticmethod
def load(obj: tuple[str,str,int,int,int]):
filledIn, filledOut, activationTime, startTime, endTime = obj
return ElaboratedTrancheStatus(int(filledIn), int(filledOut), activationTime, startTime, endTime)
filledIn, filledOut, activationTime, startTime, endTime, fillsObj = obj
fills = [Fill.load(f) for f in fillsObj]
return ElaboratedTrancheStatus(int(filledIn), int(filledOut), activationTime, startTime, endTime, fills)
@@ -324,11 +347,11 @@ class Tranche:
if self.minLine.intercept or self.minLine.slope:
msg += f' >{self.minLine.intercept:.5g}'
if self.minLine.slope:
msg += f'{self.minLine.slope:+.5g}'
msg += f'{self.minLine.slope:+.5g}/s'
if self.maxLine.intercept or self.maxLine.slope:
msg += f' <{self.maxLine.intercept:.5g}'
if self.maxLine.slope:
msg += f'{self.maxLine.slope:+.5g}'
msg += f'{self.maxLine.slope:+.5g}/s'
if self.rateLimitPeriod:
msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes'
return msg

View File

@@ -166,6 +166,11 @@ class BlockDict(Generic[K,V], BlockData[V]):
def get(self, item: K, default: V = None) -> V:
return self.getitem(item, default)
def pop(self, item: K, default: V = None):
result = self.getitem(item, default)
self.delitem(item)
return result
def setdefault(self, item: K, value: V):
return self.setitem(item, value, overwrite=False)

View File

@@ -231,6 +231,7 @@ class BlockState:
diff = DiffEntry(value, branch.height, branch.id)
diffs.add(diff)
self.diffs_by_branch[branch.id].append(DiffEntryItem(series, key, diff))
return old_value
def unload(self, fork: Optional[Fork], series, key):
self.unloads[fork.branch_id].append((series, key))

View File

@@ -7,7 +7,7 @@ from dexorder import current_pub, minutely
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
from dexorder.blocks import get_block_timestamp
from dexorder.blocks import get_block_timestamp, current_block
from dexorder.contract.dexorder import vault_address, VaultContract, get_factory_contract
from dexorder.impls import get_impl_version
from dexorder.ohlc import ohlcs
@@ -15,6 +15,7 @@ from dexorder.order.orderstate import Order
from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates,
update_price_triggers)
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.util import hexstr
from dexorder.vault_blockdata import vault_owners, adjust_balance, MAX_VAULTS, verify_vault
log = logging.getLogger(__name__)
@@ -55,7 +56,7 @@ async def handle_order_placed(event: EventData):
log.debug(f'new order{order}')
def handle_swap_filled(event: EventData):
async def handle_swap_filled(event: EventData):
log.debug('handle_swap_filled')
# event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut);
log.debug(f'DexorderSwapFilled {event}')
@@ -65,7 +66,6 @@ def handle_swap_filled(event: EventData):
tranche_index = args['trancheIndex']
amount_in = args['amountIn']
amount_out = args['amountOut']
# todo accounting
fill_fee = args['fillFee']
next_execution_time = args['nextExecutionTime']
try:
@@ -79,7 +79,8 @@ def handle_swap_filled(event: EventData):
except KeyError:
log.warning(f'No order triggers for fill of {TrancheKey(order.key.vault, order.key.order_index, tranche_index)}')
else:
triggers.fill(tranche_index, amount_in, amount_out)
time = await get_block_timestamp(event['blockHash'])
triggers.fill(hexstr(event['transactionHash']), time, tranche_index, amount_in, amount_out, fill_fee)
async def handle_order_canceled(event: EventData):
# event DexorderCanceled (uint64 orderIndex);

View File

@@ -6,6 +6,7 @@ from web3.exceptions import ContractPanicError, ContractLogicError
from web3.types import EventData
from dexorder import db
from dexorder.base import TransactionReceiptDict
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState, PriceProof
from dexorder.contract.dexorder import get_dexorder_contract
@@ -40,12 +41,15 @@ class TrancheExecutionHandler (TransactionHandler):
finish_execution_request(tk, errcode)
raise exception
async def complete_transaction(self, job: TransactionJob) -> None:
async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None:
# we handle execution results using the DexorderExecution event, so there's nothing to do here.
pass
async def transaction_exception(self, job: TransactionJob, e: Exception) -> None:
log.error('Could not build execution transaction due to exception', exc_info=e)
# noinspection PyTypeChecker
req: TrancheExecutionRequest = job.request
tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
log.debug(f'completing execution request {tk}')
finish_execution_request(tk)
finish_execution_request(req.tranche_key, '')
TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler
@@ -66,7 +70,7 @@ def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
def get_trigger():
try:
return OrderTriggers.instances[order.key].triggers[tk.tranche_index]
return OrderTriggers.instances[order_key].triggers[tk.tranche_index]
except KeyError:
return None

View File

@@ -6,7 +6,7 @@ from typing import overload
from dexorder import DELETE, db, order_log
from dexorder.base.chain import current_chain
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus
from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus, Fill
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.database.model.orderindex import OrderIndex
from dexorder.routing import pool_address
@@ -18,31 +18,24 @@ log = logging.getLogger(__name__)
# We split off the fill information for efficient communication to clients.
@dataclass
class Filled:
filled_in: int
filled_out: int
@staticmethod
def load(obj: tuple[str,str]):
return Filled(*map(int,obj))
def dump(self):
return str(self.filled_in), str(self.filled_out)
@dataclass
class OrderFilled:
filled: Filled
tranche_filled: list[Filled]
tranche_fills: list[list[Fill]]
@property
def filledIn(self):
return sum(tf.filledIn for tfs in self.tranche_fills for tf in tfs)
@property
def filledOut(self):
return sum(tf.filledOut for tfs in self.tranche_fills for tf in tfs)
@staticmethod
def load(obj):
f, tfs = obj
return OrderFilled(Filled.load(f), [Filled.load(tf) for tf in tfs])
return OrderFilled([[Fill.load(tf) for tf in tfs] for tfs in obj])
def dump(self):
return [self.filled.dump(), [tf.dump() for tf in self.tranche_filled]]
return [[tf.dump() for tf in tfs] for tfs in self.tranche_fills]
# todo oco groups
@@ -88,10 +81,10 @@ class Order:
if order.is_open:
Order.open_orders.add(key)
Order.vault_open_orders.listappend(key.vault, key.order_index)
# Start with a filled value of 0 even if the chain says otherwise, because we will process the fill events later and add them in
tranche_filled = [Filled(0, 0) for _ in range(len(status.trancheStatus))]
# Start with an empty set of fills, even if the chain says otherwise, because we will process the fill
# events later and add them in
Order.order_filled[key] = OrderFilled([[] for _ in range(len(order.order.tranches))])
order_log.debug(f'initialized order_filled[{key}]')
Order.order_filled[key] = OrderFilled(Filled(0, 0), tranche_filled)
order_log.debug(f'order created {key}')
return order
@@ -127,18 +120,18 @@ class Order:
@property
def filled_in(self):
return Order.order_filled[self.key].filled.filled_in if self.is_open else self.status.filledIn
return Order.order_filled[self.key].filledIn if self.is_open else self.status.filledIn
@property
def filled_out(self):
return Order.order_filled[self.key].filled.filled_out if self.is_open else self.status.filledOut
return Order.order_filled[self.key].filledOut if self.is_open else self.status.filledOut
def tranche_filled_in(self, tranche_index: int):
return Order.order_filled[self.key].tranche_filled[tranche_index].filled_in if self.is_open \
return sum(tf.filledIn for tf in Order.order_filled[self.key].tranche_fills[tranche_index]) if self.is_open \
else self.status.trancheStatus[tranche_index].filledIn
def tranche_filled_out(self, tranche_index: int):
return Order.order_filled[self.key].tranche_filled[tranche_index].filled_out if self.is_open \
return sum(tf.filledOut for tf in Order.order_filled[self.key].tranche_fills[tranche_index]) if self.is_open \
else self.status.trancheStatus[tranche_index].filledOut
def tranche_filled(self, tranche_index: int):
@@ -159,17 +152,14 @@ class Order:
def is_open(self):
return self.state.is_open
def add_fill(self, tranche_index: int, filled_in: int, filled_out: int):
def add_fill(self, tx: str, time: int, tranche_index: int, filled_in: int, filled_out: int, fee: int):
order_log.debug(f'tranche fill {self.key}|{tranche_index} in:{filled_in} out:{filled_out}')
try:
old = Order.order_filled[self.key]
except KeyError:
raise
new = copy.deepcopy(old)
new.filled.filled_in += filled_in
new.filled.filled_out += filled_out
new.tranche_filled[tranche_index].filled_in += filled_in
new.tranche_filled[tranche_index].filled_out += filled_out
new.tranche_fills[tranche_index].append(Fill(tx, time, filled_in, filled_out, fee))
order_log.debug(f'updated order_filled: {new}')
Order.order_filled[self.key] = new
@@ -188,13 +178,22 @@ class Order:
except KeyError:
log.warning(f'While completing with status {final_state}, no order fills found for {self.key}')
else:
order_log.debug(f'deleting order_filled[{self.key}]')
del Order.order_filled[self.key]
status.filledIn = of.filled.filled_in
status.filledOut = of.filled.filled_out
for i, tf in enumerate(of.tranche_filled):
status.trancheStatus[i].filledIn = of.tranche_filled[i].filled_in
status.trancheStatus[i].filledOut = of.tranche_filled[i].filled_out
# order_log.debug(f'deleting order_filled[{self.key}]')
filledIn = filledOut = 0
for (i,fills) in enumerate(of.tranche_fills):
fi = fo = 0
for fill in fills:
fill: Fill
fi += fill.filledIn
fo += fill.filledOut
filledIn += fi
filledOut += fo
ts = status.trancheStatus[i]
ts.fills = copy.deepcopy(fills)
ts.filledIn = fi
ts.filledOut = fo
status.filledIn = filledIn
status.filledOut = filledOut
Order.order_statuses[self.key] = status # set the status in order to save it
Order.order_statuses.unload(self.key) # but then unload from memory after root promotion
order_log.debug(f'order completed {status}')

View File

@@ -39,6 +39,9 @@ execution should be attempted on the tranche.
# tranches which have passed all constraints and should be executed
active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at')
# tranches which have an execute() transaction sent but not completed
inflight_execution_requests: set[TrancheKey] = set()
class OrderTriggers:
instances: dict[OrderKey, 'OrderTriggers'] = {}
@@ -74,8 +77,8 @@ class OrderTriggers:
final_state = SwapOrderState.Filled if self.order.remaining == 0 or self.order.remaining < self.order.min_fill_amount else SwapOrderState.Expired
close_order_and_disable_triggers(self.order, final_state)
def fill(self, tranche_index, amount_in, amount_out):
self.order.add_fill(tranche_index, amount_in, amount_out)
def fill(self, tx: str, time: int, tranche_index, amount_in, amount_out, fee):
self.order.add_fill(tx, time, tranche_index, amount_in, amount_out, fee)
if self.triggers[tranche_index].fill(amount_in, amount_out):
self.check_complete()
@@ -84,6 +87,10 @@ class OrderTriggers:
self.check_complete()
#
# Client Interface
#
def start_trigger_updates():
"""
Called near the beginning of block handling to initialize any per-block trigger data structures
@@ -92,10 +99,6 @@ def start_trigger_updates():
PriceLineTrigger.clear_data()
#
# Client Interface
#
async def update_balance_triggers(vault: str, token: str, balance: int):
updates = [bt.update(balance) for bt in BalanceTrigger.by_vault_token.get((vault, token), [])]
await asyncio.gather(*updates)
@@ -108,8 +111,6 @@ async def update_price_triggers(pool: OldPoolDict, price: dec):
pt.update(price)
inflight_execution_requests: set[TrancheKey] = set()
async def end_trigger_updates():
"""
Call once after all updates have been handled. This updates the active_tranches array based on final trigger state.
@@ -178,9 +179,11 @@ class Trigger:
@value.setter
def value(self, value):
if value != self.value:
state = _trigger_state.get(self.tk,0)
old = state & (1 << self.position) == 0 # NOTE: inverted
if value != old:
_dirty.add(self.tk)
old = _trigger_state.get(self.tk,0)
if not value: # this conditional is inverted
_trigger_state[self.tk] = old | (1 << self.position) # set
else:
@@ -311,14 +314,16 @@ class PriceLineTrigger (Trigger):
return None # no constraint (deactivated)
pool = await get_pool(Order.of(tk).pool_address)
await ensure_pool_price(pool)
price_now = pool_prices[pool['address']]
price_now = pool_prices[pool['address']] * dec(10) ** dec(-pool['decimals']) # adjust for pool decimals to get onchain price
return PriceLineTrigger(tk, line, is_min, is_barrier, price_now)
def __init__(self, tk: TrancheKey, line: Line, is_min: bool, is_barrier: bool, price_now: dec):
if is_barrier:
log.warning('Barriers not supported')
price_above = price_now > line.intercept + line.slope * current_clock.get().timestamp
super().__init__(3 if is_min else 4, tk, is_min is price_above)
value_now = line.intercept + line.slope * current_clock.get().timestamp
price_above = price_now > value_now
# log.debug(f'initial price line {value_now} {">" if is_min else "<"} {price_now} {is_min is not price_above}')
super().__init__(3 if is_min else 4, tk, is_min is not price_above)
self.line = line
self.is_min = is_min
self.is_barrier = is_barrier
@@ -328,9 +333,10 @@ class PriceLineTrigger (Trigger):
# lines that need evaluating add their data to these arrays, which are then sent to SIMD for evaluation. each
# array must always have the same size as the others.
y = []
y = [] # the current price
m = []
b = []
sign = [] # 1 or -1 to represent a min line or max line respectively
triggers = [] # 1-for-1 with line_data
triggers_set = set()
@@ -339,6 +345,7 @@ class PriceLineTrigger (Trigger):
PriceLineTrigger.y.clear()
PriceLineTrigger.m.clear()
PriceLineTrigger.b.clear()
PriceLineTrigger.sign.clear()
PriceLineTrigger.triggers.clear()
PriceLineTrigger.triggers_set.clear()
@@ -348,6 +355,7 @@ class PriceLineTrigger (Trigger):
PriceLineTrigger.y.append(price)
PriceLineTrigger.m.append(self.line.slope)
PriceLineTrigger.b.append(self.line.intercept)
PriceLineTrigger.sign.append(-1 if self.is_min else 1)
PriceLineTrigger.triggers.append(self)
PriceLineTrigger.triggers_set.add(self)
else:
@@ -357,14 +365,18 @@ class PriceLineTrigger (Trigger):
@staticmethod
def end_updates(time: int):
# here we use numpy to compute all dirty lines using SIMD
y, m, b = map(np.array, (PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b))
y, m, b, sign = map(np.array,
(PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b, PriceLineTrigger.sign))
line_value = m * time + b
price_diff = y - line_value
for t, pd in zip(PriceLineTrigger.triggers, price_diff):
t.handle_result(pd)
price_diff = sign * (y - line_value)
activated = price_diff > 0
# for price, line, s, a in zip(y, line_value, sign, activated):
# log.debug(f'price: {line} {">" if s == -1 else "<"} {price} {a}')
for t, activated in zip(PriceLineTrigger.triggers, activated):
t.handle_result(activated)
PriceLineTrigger.clear_data()
def handle_result(self, price_diff: float):
value = self.is_min and price_diff > 0 or not self.is_min and price_diff < 0
def handle_result(self, value: bool):
if not self.is_barrier or value: # barriers that are False do not update their values to False
self.value = value
@@ -478,7 +490,7 @@ class TrancheTrigger:
def check_expire(self):
# if the expiration constraint has become False then the tranche can never execute again
if not self.expiration_trigger:
if self.expiration_trigger is not None and self.expiration_trigger:
OrderTriggers.instances[self.tk.order_key].expire_tranche(self.tk.tranche_index)
def expire(self):

View File

@@ -12,7 +12,6 @@ from dexorder.base import TransactionReceiptDict, TransactionRequest, transactio
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import PriceProof
from dexorder.blockstate import BlockDict
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.contract.contract_proxy import ContractTransaction
@@ -36,8 +35,10 @@ class TransactionHandler:
async def build_transaction(self, job_id: int, tr: TransactionRequest) -> ContractTransaction: ...
@abstractmethod
async def complete_transaction(self, job: TransactionJob) -> None: ...
async def complete_transaction(self, job: TransactionJob, receipt: TransactionReceiptDict) -> None: ...
@abstractmethod
async def transaction_exception(self, job: TransactionJob, e: Exception) -> None: ...
@dataclass
class TrancheExecutionRequest (TransactionRequest):
@@ -102,12 +103,12 @@ async def create_and_send_transactions():
return
try:
ctx: ContractTransaction = await handler.build_transaction(job.id, job.request)
except (ContractPanicError, ContractLogicError):
except (ContractPanicError, ContractLogicError) as x:
# these errors can be thrown immediately when the tx is tested for gas
log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}')
job.state = TransactionJobState.Error
db.session.add(job)
await handler.complete_transaction(job)
await handler.transaction_exception(job, x)
return
except Exception as x:
log.warning(f'unable to send transaction for job {job.id}', exc_info=x)
@@ -154,15 +155,13 @@ async def handle_transaction_receipts():
assert fork is not None
if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \
fork.branch.disjoint and receipt['blockNumber'] <= fork.height:
# don't set the database yet because we could get reorged
completed_transactions[job.tx_id] = receipt
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
await handler.complete_transaction(job)
await handler.complete_transaction(job, receipt)
def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]):
@@ -179,5 +178,3 @@ def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]):
job.receipt = diff.value
db.session.add(job)
completed_transactions = BlockDict[bytes, dict]('mined_txs') # stores the transaction receipt