diff --git a/alembic/versions/516b55c83144_initial_schema.py b/alembic/versions/516b55c83144_initial_schema.py index c1e3266..6cb53b3 100644 --- a/alembic/versions/516b55c83144_initial_schema.py +++ b/alembic/versions/516b55c83144_initial_schema.py @@ -72,7 +72,7 @@ def upgrade() -> None: sa.Column('id', sa.UUID(), nullable=False), sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('height', sa.Integer(), nullable=False), - sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), + sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', 'Error', name='transactionjobstate'), nullable=False), sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False), sa.Column('tx_id', postgresql.BYTEA(), nullable=True), sa.Column('tx_data', postgresql.BYTEA(), nullable=True), @@ -83,10 +83,24 @@ def upgrade() -> None: op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) op.create_index(op.f('ix_transactionjob_tx_id'), 'transactionjob', ['tx_id'], unique=False) - # ### end Alembic commands ### + + op.create_table('dbblock', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('hash', postgresql.BYTEA(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + sa.Column('timestamp', sa.INTEGER(), nullable=False), + sa.Column('confirmed', sa.Boolean(), nullable=False), + sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint('chain', 'hash') + ) + op.create_index(op.f('ix_dbblock_height'), 'dbblock', ['height'], unique=False) + op.create_index(op.f('ix_dbblock_timestamp'), 'dbblock', ['timestamp'], unique=False) def downgrade() -> None: + op.drop_index(op.f('ix_dbblock_timestamp'), table_name='dbblock') + op.drop_index(op.f('ix_dbblock_height'), table_name='dbblock') + op.drop_table('dbblock') op.drop_index(op.f('ix_transactionjob_tx_id'), table_name='transactionjob') op.drop_index(op.f('ix_transactionjob_state'), table_name='transactionjob') op.drop_index(op.f('ix_transactionjob_height'), table_name='transactionjob') diff --git a/alembic/versions/ee22683693a5_blockindex.py b/alembic/versions/ee22683693a5_blockindex.py deleted file mode 100644 index 4e83a03..0000000 --- a/alembic/versions/ee22683693a5_blockindex.py +++ /dev/null @@ -1,44 +0,0 @@ -"""BlockIndex - -Revision ID: ee22683693a5 -Revises: 516b55c83144 -Create Date: 2024-07-19 18:52:04.933167 - -""" -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa -import dexorder.database -import dexorder.database.column_types -from sqlalchemy.dialects import postgresql - -# revision identifiers, used by Alembic. -revision: str = 'ee22683693a5' -down_revision: Union[str, None] = '516b55c83144' -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('dbblock', - sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), - sa.Column('hash', postgresql.BYTEA(), nullable=False), - sa.Column('height', sa.Integer(), nullable=False), - sa.Column('timestamp', sa.INTEGER(), nullable=False), - sa.Column('confirmed', sa.Boolean(), nullable=False), - sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), - sa.PrimaryKeyConstraint('chain', 'hash') - ) - op.create_index(op.f('ix_dbblock_height'), 'dbblock', ['height'], unique=False) - op.create_index(op.f('ix_dbblock_timestamp'), 'dbblock', ['timestamp'], unique=False) - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.drop_index(op.f('ix_dbblock_timestamp'), table_name='dbblock') - op.drop_index(op.f('ix_dbblock_height'), table_name='dbblock') - op.drop_table('dbblock') - # ### end Alembic commands ### diff --git a/requirements.txt b/requirements.txt index 7601947..d436931 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ eth-bloom python-dateutil eth_abi pdpyras # pagerduty +numpy diff --git a/src/dexorder/alert.py b/src/dexorder/alert.py index b697de1..fb287aa 100644 --- a/src/dexorder/alert.py +++ b/src/dexorder/alert.py @@ -21,16 +21,13 @@ def warningAlert(title, message, dedup_key=NARG, log_level=logging.WARNING): return alert(title, message, dedup_key, log_level) -async def spawn_alert(title, message, dedup_key): - alert_pagerduty(title,message,dedup_key) - - pagerduty_session = None hostname = None def alert_pagerduty(title, message, dedup_key, log_level): if not config.pagerduty: return + # noinspection PyBroadException try: global pagerduty_session global hostname diff --git a/src/dexorder/base/__init__.py b/src/dexorder/base/__init__.py index 2356a13..5535b7f 100644 --- a/src/dexorder/base/__init__.py +++ b/src/dexorder/base/__init__.py @@ -1,8 +1,22 @@ -from typing import TypedDict, Union +from dataclasses import dataclass +from typing import TypedDict, Union, Type Address = str Quantity = Union[str,int] + +@dataclass +class TransactionRequest: + """ + All members of TransactionRequest and its subclasses must be JSON-serializable. They get stored in the database + TransactionJob in a JSONB field, as handled by the DataclassDict column type. + """ + type: str + +# subclasses of TransactionRequest must register their type code here so the appropriate dataclass may be constructed +transaction_request_registry: dict[str, Type[TransactionRequest]] = {} + + TransactionDict = TypedDict( 'TransactionDict', { 'from': Address, 'to': Address, diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 906c3b5..37d26fc 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -1,6 +1,4 @@ import math -from abc import ABC, abstractmethod -# noinspection PyPackageRequirements from contextvars import ContextVar import dexorder diff --git a/src/dexorder/base/order.py b/src/dexorder/base/order.py index 9b85a92..7aa5175 100644 --- a/src/dexorder/base/order.py +++ b/src/dexorder/base/order.py @@ -1,6 +1,5 @@ import logging from dataclasses import dataclass -from typing import Optional, Type, Union log = logging.getLogger(__name__) @@ -29,37 +28,3 @@ class TrancheKey (OrderKey): def __str__(self): return f'{self.vault}|{self.order_index}|{self.tranche_index}' - - -@dataclass -class ExecutionRequest: - height: int - proof: None - - -@dataclass -class TransactionRequest: - type: str # 'te' for tranche execution - -@dataclass -class TrancheExecutionRequest (TransactionRequest): - # type: str # 'te' for tranche execution - vault: str - order_index: int - tranche_index: int - price_proof: Union[None,dict,tuple[int]] - -def new_tranche_execution_request(tk: TrancheKey, _proof: Optional[dict]) -> TrancheExecutionRequest: - return TrancheExecutionRequest('te', tk.vault, tk.order_index, tk.tranche_index, (0,)) # todo proof - -def deserialize_transaction_request(**d): - t = d['type'] - Class = transaction_request_registry.get(t) - if Class is None: - raise ValueError(f'No TransactionRequest for type "{t}"') - # noinspection PyArgumentList - return Class(**d) - -transaction_request_registry: dict[str, Type[TransactionRequest]] = dict( - te = TrancheExecutionRequest, -) diff --git a/src/dexorder/base/orderlib.py b/src/dexorder/base/orderlib.py index 1c55d96..bf0e893 100644 --- a/src/dexorder/base/orderlib.py +++ b/src/dexorder/base/orderlib.py @@ -4,14 +4,21 @@ from dataclasses import dataclass from enum import Enum from typing import Optional -from dexorder.util.convert import decode_IEEE754, encode_IEEE754 +from dexorder.util import hexbytes +from dexorder.util.convert import decode_IEEE754 log = logging.getLogger(__name__) +""" +These dataclasses are meant to closely mirror the raw data on-chain, using native Python types but serializing to +something JSON-able. +""" + class SwapOrderState (Enum): + # This includes on-chain codes as well as additional codes Unknown = -1 - Signing = 0 # only used by the web but here for completeness todo rework OrderLib.sol to remove offchain statuses + Signing = 0 # only used by the web but here for completeness Underfunded = 1 Open = 2 Canceled = 3 @@ -45,6 +52,26 @@ class Route: def dump(self): return self.exchange.value, self.fee +@dataclass +class Line: + intercept: float + slope: float + + def value(self, timestamp): + return self.intercept + self.slope * timestamp + + @staticmethod + def load_from_chain(obj: tuple[int,int]): + return Line(decode_IEEE754(obj[0]), decode_IEEE754(obj[1])) + + @staticmethod + def load(obj: tuple[float,float]): + return Line(*obj) + + def dump(self): + return self.intercept, self.slope + + @dataclass class SwapOrder: tokenIn: str @@ -57,6 +84,10 @@ class SwapOrder: conditionalOrder: int tranches: list['Tranche'] + @property + def min_input_amount(self): + return self.minFillAmount if self.amountIsInput else 0 + @staticmethod def load(obj): return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7], @@ -72,7 +103,7 @@ SwapOrder in: {self.tokenIn} out: {self.tokenOut} exchange: {self.route.exchange, self.route.fee} - amount: {"input" if self.amountIsInput else "output"} {self.amount} {"to owner" if self.outputDirectlyToOwner else ""} + amount: {"input" if self.amountIsInput else "output"} {self.amount}{" to owner" if self.outputDirectlyToOwner else ""} minFill: {self.minFillAmount} tranches: ''' @@ -80,118 +111,110 @@ SwapOrder msg += f' {tranche}\n' return msg + @dataclass -class SwapStatus: - # this is an elaborated version of the on-chain status +class ElaboratedTrancheStatus: + filledIn: int + filledOut: int + activationTime: int + startTime: int + endTime: int + + @staticmethod + def load_from_chain(obj: tuple[int,int,int,int]): + filled, activationTime, startTime, endTime = obj + return ElaboratedTrancheStatus( + # we do NOT grab the filled amount from the chain, because our process will handle the fill events + # separately by incrementing these status values as fills arrive. + 0, 0, + activationTime, startTime, endTime, + ) + + def dump(self): + # filled fields can be larger than JSON-able ints, so we use strings. + return str(self.filledIn), str(self.filledOut), self.activationTime, self.startTime, self.endTime + + @staticmethod + def load(obj: tuple[str,str,int,int,int]): + filledIn, filledOut, activationTime, startTime, endTime = obj + return ElaboratedTrancheStatus(int(filledIn), int(filledOut), activationTime, startTime, endTime) + + + +@dataclass +class ElaboratedSwapOrderStatus: + tx_id: bytes + order: SwapOrder fillFeeHalfBps: int state: SwapOrderState startTime: int startPrice: int ocoGroup: Optional[int] - filledIn: Optional[int] # if None then look in the order_filled blockstate - filledOut: Optional[int] # if None then look in the order_filled blockstate - trancheFilledIn: Optional[list[int]] # if None then look in the tranche_filled blockstate - trancheFilledOut: Optional[list[int]] # if None then look in the tranche_filled blockstate - trancheActivationTime: list[int] - - -@dataclass -class SwapOrderStatus(SwapStatus): - order: SwapOrder - - def __init__(self, order: SwapOrder, *swapstatus_args): - """ init with order object first followed by the swap status args""" - super().__init__(*swapstatus_args) - self.order = order + filledIn: int + filledOut: int + trancheStatus: list[ElaboratedTrancheStatus] @staticmethod - def load(obj, *, Class=None): - if Class is None: - Class = SwapOrderStatus - order = SwapOrder.load(obj[0]) - fillFeeHalfBps = int(obj[1]) - state = SwapOrderState(obj[2]) - startTime = obj[3] - startPrice = obj[4] - ocoGroup = None if obj[5] == NO_OCO else obj[5] - filledIn = int(obj[6]) - filledOut = int(obj[7]) - trancheFilledIn = [int(f) for f in obj[8]] - trancheFilledOut = [int(f) for f in obj[9]] - trancheActivationTime = [int(f) for f in obj[10]] - return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup, - filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime) - - @staticmethod - def load_from_chain(obj, *, Class=None): - if Class is None: - Class = SwapOrderStatus - # 0 SwapOrder order; + def load_from_chain(tx_id: bytes, obj): + # 0 SwapOrder order # 1 int fillFeeHalfBps - # 2 bool canceled; - # 3 uint32 startTime; - # 4 uint32 startPrice; - # 5 uint64 ocoGroup; - # 6 uint256 filled; // total - # 7 uint256[] trancheFilled; // sum(trancheFilled) == filled - # 8 uint32[] trancheActivationTime; + # 2 bool canceled + # 3 uint32 startTime + # 4 uint32 startPrice + # 5 uint64 ocoGroup + # 6 uint256 filled + # 7 ElaboratedTrancheStatus[] trancheStatus - order = SwapOrder.load(obj[0]) - fillFeeHalfBps = obj[1] - state = SwapOrderState.Canceled if obj[2] else SwapOrderState.Open - startTime = obj[3] - startPrice = obj[4] - ocoGroup = None if obj[5] == NO_OCO else obj[5] - # we ignore any fill values from the on-chain struct, because we will subsequently detect the DexorderSwapFilled events and add them in - filledIn = 0 - filledOut = 0 - trancheFilledIn = [0 for _ in range(len(obj[7]))] - trancheFilledOut = [0 for _ in range(len(obj[7]))] - trancheActivationTime = [int(i) for i in obj[8]] - return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup, - filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime) + item = iter(obj) + + order = SwapOrder.load(next(item)) + fillFeeHalfBps = int(next(item)) + canceled = next(item) + state = SwapOrderState.Canceled if canceled else SwapOrderState.Open + startTime = next(item) + startPrice = next(item) + ocoGroup = next(item) + if ocoGroup == NO_OCO: + ocoGroup = None + # we ignore any fill values from the on-chain struct, because we will subsequently detect the + # DexorderSwapFilled events and add them in + _ignore_filled = next(item) + trancheStatuses = [ElaboratedTrancheStatus.load_from_chain(ts) for ts in next(item)] + for ts in trancheStatuses: + ts.filledIn = 0 + ts.filledOut = 0 + + return ElaboratedSwapOrderStatus(tx_id, order, fillFeeHalfBps, state, startTime, startPrice, + ocoGroup, 0, 0, trancheStatuses) + + @staticmethod + def load(obj): + item = iter(obj) + tx_id = hexbytes(next(item)) + order = SwapOrder.load(next(item)) + fillFeeHalfBps = int(next(item)) + state = SwapOrderState(next(item)) + startTime = next(item) + startPrice = int(next(item)) + ocoGroup = next(item) + if ocoGroup == NO_OCO: + ocoGroup = None + filledIn = int(next(item)) # convert from str + filledOut = int(next(item)) + trancheStatus = [ElaboratedTrancheStatus.load(ts) for ts in next(item)] + return ElaboratedSwapOrderStatus(tx_id, order, fillFeeHalfBps, state, startTime, startPrice, + ocoGroup, filledIn, filledOut, trancheStatus) def dump(self): return ( - self.order.dump(), self.fillFeeHalfBps, self.state.value, self.startTime, self.startPrice, self.ocoGroup, - str(self.filledIn), str(self.filledOut), - [str(f) for f in self.trancheFilledIn], [str(f) for f in self.trancheFilledOut], - [int(i) for i in self.trancheActivationTime] + self.tx_id, self.order.dump(), self.fillFeeHalfBps, self.state.value, self.startTime, str(self.startPrice), + self.ocoGroup, str(self.filledIn), str(self.filledOut), [ts.dump() for ts in self.trancheStatus] ) def copy(self): return copy.deepcopy(self) -@dataclass -class ElaboratedSwapOrderStatus (SwapOrderStatus): - @staticmethod - def load_from_tx(tx_id: bytes, obj): - # noinspection PyTypeChecker - status: ElaboratedSwapOrderStatus = SwapOrderStatus.load_from_chain(obj, Class=ElaboratedSwapOrderStatus) - status.tx_id = tx_id - return status - - # noinspection PyMethodOverriding - @staticmethod - def load(obj): - tx_id, *swaporder_args = obj - result = SwapOrderStatus.load(obj[1:], Class=ElaboratedSwapOrderStatus) - result.tx_id = tx_id - return result - - # noinspection PyMissingConstructor - def __init__(self, order: SwapOrder, *swapstatus_args, tx_id=b''): - super().__init__(order, *swapstatus_args) - self.tx_id: bytes = tx_id - - def dump(self): - return self.tx_id, *super().dump() - - def copy(self): - return super().copy() - - NO_OCO = 18446744073709551615 # max uint64 @@ -224,10 +247,8 @@ class Tranche: startTime: int endTime: int - minIntercept: float - minSlope: float - maxIntercept: float - maxSlope: float + minLine: Line + maxLine: Line def fraction_of(self, amount): @@ -250,40 +271,34 @@ class Tranche: obj[10], # rateLimitPeriod obj[11], # startTime obj[12], # endTime - decode_IEEE754(obj[13]), # minIntercept - decode_IEEE754(obj[14]), # minSlope - decode_IEEE754(obj[15]), # maxIntercept - decode_IEEE754(obj[16]), # maxSlope + Line.load(obj[13]), # minLine + Line.load(obj[14]), # maxLine ) return result def dump(self): - minB = encode_IEEE754(self.minIntercept) - minM = encode_IEEE754(self.minSlope) - maxB = encode_IEEE754(self.maxIntercept) - maxM = encode_IEEE754(self.maxSlope) return ( self.fraction, self.startTimeIsRelative, self.endTimeIsRelative, self.minIsBarrier, self.maxIsBarrier, self.marketOrder, self.minIsRatio, self.maxIsRatio, False, # _reserved7 self.rateLimitFraction, self.rateLimitPeriod, - self.startTime, self.endTime, minB, minM, maxB, maxM, + self.startTime, self.endTime, self.minLine.dump(), self.maxLine.dump(), ) def __str__(self): msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{self.startTime} to {"start+" if self.startTimeIsRelative else ""}{self.endTime}' if self.marketOrder: - # for marketOrders, minIntercept is the slippage - msg += f' market order slippage {self.minIntercept:.2%}' + # for marketOrders, minLine.intercept is the slippage + msg += f' market order slippage {self.minLine.intercept:.2%}' else: - if self.minIntercept or self.minSlope: - msg += f' >{self.minIntercept:.5g}' - if self.minSlope: - msg += f'{self.minSlope:+.5g}' - if self.maxIntercept or self.maxSlope: - msg += f' <{self.maxIntercept:.5g}' - if self.maxSlope: - msg += f'{self.maxSlope:+.5g}' + if self.minLine.intercept or self.minLine.slope: + msg += f' >{self.minLine.intercept:.5g}' + if self.minLine.slope: + msg += f'{self.minLine.slope:+.5g}' + if self.maxLine.intercept or self.maxLine.slope: + msg += f' <{self.maxLine.intercept:.5g}' + if self.maxLine.slope: + msg += f'{self.maxLine.slope:+.5g}' if self.rateLimitPeriod: msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes' return msg @@ -293,6 +308,9 @@ class Tranche: class PriceProof: proof: int + def dump(self): + return (self.proof,) + class OcoMode (Enum): NO_OCO = 0 diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index f20def7..65ee4dd 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -58,7 +58,7 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) - runner.postprocess_cbs.append(check_ohlc_rollover) + runner.add_callback(check_ohlc_rollover) runner.on_promotion.append(finalize_callback) if db: # noinspection PyUnboundLocalVariable diff --git a/src/dexorder/bin/dice_seed.py b/src/dexorder/bin/dice_seed.py index 4471436..b489c40 100644 --- a/src/dexorder/bin/dice_seed.py +++ b/src/dexorder/bin/dice_seed.py @@ -23,6 +23,8 @@ while True: def bits(b0, b1): bit(b0); bit(b1) + + # noinspection PyBroadException try: i = int(i) assert 1 <= i <= 6 diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index fb5ef2b..b2a7bd1 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -1,7 +1,7 @@ import logging from asyncio import CancelledError -from dexorder import db, blockchain, config +from dexorder import db, blockchain from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate import current_blockstate @@ -9,16 +9,16 @@ from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState from dexorder.blockstate.fork import current_fork from dexorder.contract import get_contract_event -from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract -from dexorder.event_handler import init, dump_log, handle_vault_created, handle_order_placed, \ - handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, \ - activate_time_triggers, activate_price_triggers, \ - process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps, handle_vault_logic_changed +from dexorder.contract.dexorder import get_dexorder_contract +from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed, + handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, + handle_uniswap_swaps, handle_vault_logic_changed) from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all -from dexorder.order.triggers import activate_orders +from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches +from dexorder.order.triggers import activate_orders, end_trigger_updates from dexorder.runner import BlockStateRunner -from dexorder.transaction import handle_transaction_receipts, finalize_transactions +from dexorder.transactions import handle_transaction_receipts, finalize_transactions log = logging.getLogger('dexorder') LOG_ALL_EVENTS = False # for debug todo config @@ -44,7 +44,10 @@ def setup_logevent_triggers(runner): else: executions = dexorder.events.DexorderExecutions() - runner.add_event_trigger(init) + # the callbacks are run even if there's no blocks and the regular timer triggers. event triggers only run when + # a block is received. + + runner.add_callback(init) runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated')) runner.add_event_trigger(handle_vault_logic_changed, get_contract_event('Vault', 'VaultLogicChanged')) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) @@ -53,15 +56,12 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled')) runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) + runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block runner.add_event_trigger(handle_dexorderexecutions, executions) - # these callbacks run after the ones above on each block, plus these also run every second - runner.postprocess_cbs.append(check_ohlc_rollover) - runner.postprocess_cbs.append(activate_time_triggers) - runner.postprocess_cbs.append(activate_price_triggers) - runner.postprocess_cbs.append(process_active_tranches) - runner.postprocess_cbs.append(process_execution_requests) + runner.add_callback(end_trigger_updates) + runner.add_callback(execute_tranches) # noinspection DuplicatedCode diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py index 26751a0..9ff331e 100644 --- a/src/dexorder/bin/mirror.py +++ b/src/dexorder/bin/mirror.py @@ -146,6 +146,7 @@ async def main(): log.debug(f'Mirroring tokens') txs = [] for t in tokens: + # noinspection PyBroadException try: info = await get_token_info(t) # anvil had trouble estimating the gas, so we hardcode it. @@ -163,6 +164,7 @@ async def main(): log.debug(f'Mirroring pools {", ".join(pools)}') txs = [] for pool, info in zip(pools, pool_infos): + # noinspection PyBroadException try: # anvil had trouble estimating the gas, so we hardcode it. tx = await mirrorenv.transact.mirrorPool(info, gas=5_500_000) diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index cf622ba..7c416d1 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -12,7 +12,6 @@ from typing import Union, Optional from cachetools import LRUCache from sqlalchemy import select -from sqlalchemy.dialects.postgresql import insert from dexorder import current_w3, config, db, Blockchain from dexorder.base.block import Block, BlockInfo, latest_block @@ -56,18 +55,11 @@ async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> return Block(chain_id, found.data) # fetch from RPC - try: - if type(block_id) is int: - fetch.result = await fetch_block_by_number(block_id, chain_id=chain_id) - else: - fetch.result = await fetch_block(block_id, chain_id=chain_id) - return fetch.result - except Exception as e: - fetch.exception = e - fetch.result = None - raise - finally: - fetch.ready.set() + if type(block_id) is int: + fetch.result = await fetch_block_by_number(block_id, chain_id=chain_id) + else: + fetch.result = await fetch_block(block_id, chain_id=chain_id) + return fetch.result _lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256) @@ -78,9 +70,9 @@ def cache_block(block: Block, confirmed=False): _lru[block.chain_id, block.hash] = block _lru[block.chain_id, block.height] = block if db: - db.session.execute(insert(DbBlock).values( + db.session.add(DbBlock( chain=block.chain_id, hash=block.hash, height=block.height, timestamp=block.timestamp, - confirmed=confirmed, data=block.data).on_conflict_do_nothing()) + confirmed=confirmed, data=block.data)) async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: @@ -91,7 +83,6 @@ async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: key = chain_id, block_id # try LRU cache synchronously first try: - # log.debug(f'\thit LRU') return _lru[key] except KeyError: pass @@ -118,6 +109,7 @@ async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: finally: # log.debug(f'fetch.result {fetch.result}') del _fetch_locks[key] + fetch.ready.set() # log.debug(f'\t{fetch.result}') return fetch.result diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 414f3a5..0e7f869 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -13,7 +13,6 @@ from .diff import DiffEntry, DELETE, DiffEntryItem from ..base.block import Block from ..base.chain import current_chain from ..blocks import promotion_height -from ..util import hexstr log = logging.getLogger(__name__) state_log = logging.getLogger('dexorder.state') @@ -158,14 +157,16 @@ class BlockState: def remove_branch(self, branch: Branch, *, remove_series_diffs=True): if branch.height == self.height and len(self.branches_by_height[branch.height]) == 1: # this is the only branch at this height: compute the new lower height - self.height = max(0, *[b.height for b in self.branches_by_id.values() if b is not branch]) + other_heights = [b.height for b in self.branches_by_id.values() if b is not branch] + self.height = 0 if not other_heights else max(0, *other_heights) del self.branches_by_id[branch.id] - by_height = self.branches_by_height.get(branch.height) - if by_height is not None: - by_height.remove(branch) - if len(by_height) == 0: - # garbage collect empty arrays - del self.branches_by_height[branch.height] + if self.height: + by_height = self.branches_by_height.get(branch.height) + if by_height is not None: + by_height.remove(branch) + if len(by_height) == 0: + # garbage collect empty arrays + del self.branches_by_height[branch.height] try: del self.unloads[branch.id] except KeyError: diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 6f85ac4..265b342 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -31,6 +31,12 @@ class Config: accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases min_gas: str = '0' + # Order slashing + slash_kill_count: int = 5 + slash_delay_base: float = 60 # one minute + slash_delay_mul: float = 2 # double the delay each time + slash_delay_max: int = 15 * 60 + walker_name: str = 'default' walker_flush_interval: float = 300 walker_stop: Optional[int] = None # block number of the last block the walker should process diff --git a/src/dexorder/configuration/standard_accounts.py b/src/dexorder/configuration/standard_accounts.py index 29d4b65..41c281c 100644 --- a/src/dexorder/configuration/standard_accounts.py +++ b/src/dexorder/configuration/standard_accounts.py @@ -1,14 +1,15 @@ test_accounts = { - 'test0': '0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80', - 'test1': '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', - 'test2': '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', - 'test3': '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', - 'test4': '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', - 'test5': '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', - 'test6': '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', - 'test7': '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', - 'test8': '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', - 'test9': '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', + # 'account_name': '0x_private_key', # public address + 'test0': '0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80', # 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 + 'test1': '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8 + 'test2': '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC + 'test3': '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906 + 'test4': '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65 + 'test5': '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc + 'test6': '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9 + 'test7': '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955 + 'test8': '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f + 'test9': '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720 } default_accounts_config = {} diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index a26acdd..b8e98b6 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -1,8 +1,10 @@ +import glob import json import os from eth_abi.exceptions import InsufficientDataBytes from eth_utils import to_checksum_address +from typing_extensions import Union from web3.exceptions import BadFunctionCallOutput, ContractLogicError from .abi import abis @@ -13,20 +15,31 @@ from ..base.chain import current_chain CONTRACT_ERRORS = (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput) -def get_abi(name, filename=None): - return get_contract_data(name, filename)['abi'] +# set initially to the string filename, then loaded on demand and set to the parsed JSON result +_contract_data: dict[str,Union[str,dict]] = {} + +# finds all .sol files and sets _contract_data with their pathname +for _file in glob.glob('../contract/out/**/*.sol/*.json', recursive=True): + if os.path.isfile(_file): + _contract_data[os.path.basename(_file)[:-5]] = _file -def get_contract_data(name, filename=None): - if filename is None and name in abis: +def get_abi(name): + return get_contract_data(name)['abi'] + + +def get_contract_data(name): + try: return {'abi':abis[name]} - if filename is None and name == "Vault" and os.path.exists(f'../contract/out/IVault.sol/IVault.json') : - # logging.debug("getting abi from IVault.json instead of Vault.json") - name = "IVault" # Special case for proxy Vault - if filename is None: - filename = name - with open(f'../contract/out/{filename}.sol/{name}.json', 'rt') as file: - return json.load(file) + except KeyError: + pass + if name == 'Vault': + name = 'IVault' # special exception due to use of a proxy + entry = _contract_data[name] + if type(entry) is str: + with open(entry, 'rt') as file: + entry = _contract_data[name] = json.load(file) + return entry def get_deployment_address(deployment_name, contract_name, *, chain_id=None): diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 0513eca..5df3a4f 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -7,7 +7,6 @@ from web3.exceptions import Web3Exception from web3.types import TxReceipt, TxData from dexorder import current_w3, Account -from dexorder.base.account import current_account from dexorder.blockstate.fork import current_fork from dexorder.util import hexstr @@ -91,7 +90,7 @@ def transact_wrapper(addr, name, func): return f -def build_wrapper(addr, name, func): +def build_wrapper(_addr, _name, func): async def f(*args, **kwargs): tx = await func(*args).build_transaction(kwargs) return ContractTransaction(tx) diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index 6b114f5..bfc42f0 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -32,6 +32,7 @@ class Kv: found.value = value def __delitem__(self, key: str): + # noinspection PyTypeChecker db.session.query(KeyValue).filter(KeyValue.key == key).delete() def get(self, key: str, default=None): diff --git a/src/dexorder/database/model/transaction.py b/src/dexorder/database/model/transaction.py index 4a6f48a..b30eb27 100644 --- a/src/dexorder/database/model/transaction.py +++ b/src/dexorder/database/model/transaction.py @@ -3,11 +3,10 @@ from enum import Enum from typing import Optional import sqlalchemy as sa -from sqlalchemy import ForeignKey -from sqlalchemy.orm import mapped_column, Mapped, relationship +from sqlalchemy.orm import mapped_column, Mapped -from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request -from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID +from dexorder.base import TransactionRequest, transaction_request_registry +from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain from dexorder.database.column_types import DataclassDict from dexorder.database.model import Base @@ -18,6 +17,7 @@ class TransactionJobState (Enum): Signed = 'n' # tx has been signed Sent = 's' # tx has been delivered to a node Mined = 'z' # mined on at least one fork, whether reverted or not. todo handle forks that didnt confirm: receipts are per-fork! + Error = 'x' # an exception has prevented this job from sending a transaction # noinspection PyProtectedMember @@ -25,12 +25,21 @@ class TransactionJobState (Enum): TransactionJobStateColumnType = sa.Enum(TransactionJobState) +def deserialize_transaction_request(**d): + t = d['type'] + Class = transaction_request_registry.get(t) + if Class is None: + raise ValueError(f'No TransactionRequest for type "{t}"') + # noinspection PyArgumentList + return Class(**d) + + class TransactionJob (Base): id: Mapped[UUID_PK] chain: Mapped[Blockchain] = mapped_column(index=True) height: Mapped[int] = mapped_column(index=True) # the height at which the job was created, to be used for timeout/ data rolloff and/or by Timescale state: Mapped[TransactionJobState] = mapped_column(TransactionJobStateColumnType, default=TransactionJobState.Requested, index=True) - request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request)) + request: Mapped[TransactionRequest] = mapped_column(DataclassDict(deserialize_transaction_request)) tx_id: Mapped[Optional[Bytes]] = mapped_column(index=True) tx_data: Mapped[Optional[Bytes]] receipt: Mapped[Optional[Dict]] diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index ff636f6..7611ae2 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,30 +1,21 @@ import asyncio -import itertools import logging -from uuid import UUID from web3.types import EventData -from dexorder import current_pub, db, from_timestamp, minutely -from dexorder.base.chain import current_chain, current_clock -from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, \ - OrderKey +from dexorder import current_pub, minutely +from dexorder.base.chain import current_chain +from dexorder.base.order import TrancheKey, OrderKey from dexorder.base.orderlib import SwapOrderState from dexorder.blocks import get_block_timestamp -from dexorder.blockstate.fork import current_fork -from dexorder.contract import ERC20 from dexorder.contract.dexorder import vault_address, VaultContract, get_factory_contract -from dexorder.database.model.transaction import TransactionJob from dexorder.logics import get_logic_version -from dexorder.ohlc import ohlcs, recent_ohlcs +from dexorder.ohlc import ohlcs from dexorder.order.orderstate import Order -from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \ - unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, \ - new_price_triggers, activate_order, close_order_and_disable_triggers +from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates, + update_price_triggers) from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data -from dexorder.transaction import submit_transaction_request -from dexorder.util.async_util import maywait -from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance, MAX_VAULTS, verify_vault +from dexorder.vault_blockdata import vault_owners, adjust_balance, MAX_VAULTS, verify_vault log = logging.getLogger(__name__) @@ -35,7 +26,7 @@ def dump_log(eventlog): def init(): new_pool_prices.clear() - new_price_triggers.clear() + start_trigger_updates() async def handle_order_placed(event: EventData): @@ -44,6 +35,9 @@ async def handle_order_placed(event: EventData): addr = event['address'] start_index = int(event['args']['startOrderIndex']) num_orders = int(event['args']['numOrders']) + # todo accounting + order_fee = int(event['args']['orderFee']) + gas_fee = int(event['args']['gasFee']) log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') if not await verify_vault(addr): log.warning(f'Discarding order from rogue vault {addr}.') @@ -62,6 +56,7 @@ async def handle_order_placed(event: EventData): def handle_swap_filled(event: EventData): + log.debug('handle_swap_filled') # event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut); log.debug(f'DexorderSwapFilled {event}') args = event['args'] @@ -70,6 +65,7 @@ def handle_swap_filled(event: EventData): tranche_index = args['trancheIndex'] amount_in = args['amountIn'] amount_out = args['amountOut'] + # todo accounting fill_fee = args['fillFee'] next_execution_time = args['nextExecutionTime'] try: @@ -77,7 +73,7 @@ def handle_swap_filled(event: EventData): except KeyError: log.warning(f'DexorderSwapFilled IGNORED due to missing order {vault} {order_index}') return - order.status.trancheActivationTime[tranche_index] = next_execution_time # update rate limit + order.status.trancheStatus[tranche_index].activationTime = next_execution_time # update rate limit try: triggers = OrderTriggers.instances[order.key] except KeyError: @@ -118,28 +114,27 @@ async def handle_transfer(transfer: EventData): # log.debug(f'Transfer {transfer}') from_address = transfer['args']['from'] to_address = transfer['args']['to'] + if to_address == from_address: + return amount = int(transfer['args']['value']) - if to_address in vault_owners and to_address != from_address: + if to_address in vault_owners: log.debug(f'deposit {to_address} {amount}') vault = to_address - token_address = transfer['address'] - await adjust_balance(vault, token_address, amount) - if from_address in vault_owners and to_address != from_address: + elif from_address in vault_owners: log.debug(f'withdraw {to_address} {amount}') vault = from_address + else: + vault = None + if vault is not None: token_address = transfer['address'] await adjust_balance(vault, token_address, amount) - # if to_address not in vault_owners and from_address not in vault_owners: - # vaults = vault_owners.keys() - # log.debug(f'vaults: {list(vaults)}') + await update_balance_triggers(vault, token_address, amount) async def handle_uniswap_swaps(swaps: list[EventData]): # asynchronously prefetch the block timestamps we'll need - block_ids = set(swap['blockHash'] for swap in swaps) - for batch in itertools.batched(block_ids, 4): - await asyncio.gather(*[get_block_timestamp(h) for h in batch]) - + hashes = set(swap['blockHash'] for swap in swaps) + await asyncio.gather(*[get_block_timestamp(h) for h in hashes]) # now execute the swaps synchronously for swap in swaps: await handle_uniswap_swap(swap) @@ -153,6 +148,7 @@ async def handle_uniswap_swap(swap: EventData): addr = pool['address'] pool_prices[addr] = price await ohlcs.update_all(addr, time, price) + await update_price_triggers(pool, price) log.debug(f'pool {addr} {minutely(time)} {price}') @@ -198,155 +194,3 @@ async def handle_vault_logic_changed(upgrade: EventData): version = await get_logic_version(logic) log.debug(f'Vault {addr} upgraded to logic version {version}') - -async def activate_time_triggers(): - now = current_clock.get().timestamp - # log.debug(f'activating time triggers at {now}') - # time triggers - for tt in tuple(time_triggers): - await maywait(tt(now)) - - -async def activate_price_triggers(): - # log.debug(f'activating price triggers') - pools_triggered = set() - for pool, price in new_pool_prices.items(): - pools_triggered.add(pool) - for pt in tuple(price_triggers[pool]): - await maywait(pt(price)) - for pool, triggers in new_price_triggers.items(): - if pool not in pools_triggered: - price = pool_prices[pool] - for pt in triggers: - await maywait(pt(price)) - for t in tuple(unconstrained_price_triggers): - await maywait(t(None)) - - -async def process_active_tranches(): - for tk, proof in active_tranches.items(): - old_req = execution_requests.get(tk) - height = current_fork.get().height - if old_req is None or old_req.height <= height: # '<=' is used so proof is updated with more recent values - if await has_funds(tk): - log.info(f'execution request for {tk}') - execution_requests[tk] = ExecutionRequest(height, proof) - # else: - # log.debug(f'underfunded tranche {tk}') - - -async def has_funds(tk: TrancheKey): - # log.debug(f'has funds? {tk.vault}') - order = Order.of(tk) - minimum = order.status.order.minFillAmount if order.amount_is_input else 0 - balances = vault_balances.get(tk.vault, {}) - token_addr = order.status.order.tokenIn - token_balance = balances.get(token_addr) - if token_balance is None: - # unknown balance - token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault) - log.debug(f'queried token balance {token_addr}.balanceOf({tk.vault}) = {token_balance}') - await adjust_balance(tk.vault, token_addr, token_balance) - # log.debug(f'minimum {minimum} balances {token_addr} {balances}') - return token_balance > minimum - - -async def process_execution_requests(): - height = current_fork.get().height - execs = {} # which requests to act on - for tk, er in execution_requests.items(): - tk: TrancheKey - er: ExecutionRequest - pending = inflight_execution_requests.get(tk) - if pending is None or height-pending >= 30: - # todo execution timeout => retry ; should we use timestamps? configure per-chain. - # todo check balances - log.warning(f're-sending unconfirmed transaction {tk} is pending execution') - execs[tk] = er - else: - log.debug(f'tranche {tk} is pending execution') - - # execute the list - # todo batch execution - for tk, er in execs.items(): - job = submit_transaction_request(new_tranche_execution_request(tk, er.proof)) - inflight_execution_requests[tk] = height - log.info(f'created job {job.id} to execute tranche {tk}') - - -def handle_dexorderexecutions(event: EventData): - log.debug(f'executions {event}') - exe_id = UUID(bytes=event['args']['id']) - errors = event['args']['errors'] - if len(errors) == 0: - log.warning(f'No errors found in DexorderExecutions event: {event}') - return - if len(errors) > 1: - log.warning(f'Multiple executions not yet implemented') - job: TransactionJob = db.session.get(TransactionJob, exe_id) - if job is None: - log.warning(f'Job {exe_id} not found!') - return - finish_execution_request(job.request, errors[0]) - - -def finish_execution_request(req: TrancheExecutionRequest, error: str): - try: - order: Order = Order.of(req.vault, req.order_index) - except KeyError: - log.error(f'Could not get order {OrderKey(req.vault, req.order_index)}') - return - tk = TrancheKey(req.vault, req.order_index, req.tranche_index) - try: - del execution_requests[tk] - except KeyError: - pass - if error != '': - log.debug(f'execution request for tranche {tk} had error "{error}"') - if error == '': - log.debug(f'execution request for tranche {tk} was successful!') - elif error == 'IIA': - # Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent - # todo vault balance checks - token = order.order.tokenIn - log.debug(f'insufficient funds {req.vault} {token} ') - elif error == 'SPL': - # Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of - # vault logic if it happens. - log.warning(f'SPL when executing tranche {tk}') - close_order_and_disable_triggers(order, SwapOrderState.Error) - elif error == 'NO': - # order is not open - log.warning(f'order {OrderKey(tk.vault,tk.order_index)} was closed, undetected!') - close_order_and_disable_triggers(order, SwapOrderState.Error) - elif error == 'TF': - # Tranche Filled - log.warning(f'tranche already filled {tk}') - try: - triggers = OrderTriggers.instances[order.key] - tranche_trigger = triggers.triggers[tk.tranche_index] - except KeyError: - pass - else: - tranche_trigger.status = TrancheStatus.Filled - tranche_trigger.disable() - elif error == 'Too little received': - # from UniswapV3 SwapRouter when not even 1 satoshi of output was gained - log.debug('warning: de minimis liquidity in pool') - # todo dont keep trying - else: - # todo slash and backoff - log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') - - -last_ohlc_rollover = 0 -async def check_ohlc_rollover(): - global last_ohlc_rollover - time = await get_block_timestamp(current_fork.get().head_identifier) - dt = from_timestamp(time) - diff = time - last_ohlc_rollover - if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute: - for (symbol, period) in recent_ohlcs.keys(): - await ohlcs.update(symbol, period, dt) - last_ohlc_rollover = time - diff --git a/src/dexorder/logics.py b/src/dexorder/logics.py index b0e8fc3..139f6ef 100644 --- a/src/dexorder/logics.py +++ b/src/dexorder/logics.py @@ -12,6 +12,6 @@ async def get_logic_version(addr): try: return logics[addr] except KeyError: - version = await ContractProxy(addr, abi=get_abi('IVaultLogic', 'IVault')).version() + version = await ContractProxy(addr, abi=get_abi('IVaultLogic')).version() logics[addr] = version return version diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 5473187..67cb179 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -1,37 +1,171 @@ import logging +from typing import Optional from uuid import UUID from web3.exceptions import ContractPanicError, ContractLogicError +from web3.types import EventData from dexorder import db -from dexorder.base.order import TrancheExecutionRequest, TrancheKey -from dexorder.transaction import TransactionHandler +from dexorder.base.order import TrancheKey, OrderKey +from dexorder.base.orderlib import SwapOrderState, PriceProof from dexorder.contract.dexorder import get_dexorder_contract from dexorder.database.model.transaction import TransactionJob -from dexorder.order.triggers import inflight_execution_requests +from dexorder.order.orderstate import Order +from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers, + close_order_and_disable_triggers, TrancheState, active_tranches) +from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \ + new_tranche_execution_request +from dexorder.util import hexbytes log = logging.getLogger(__name__) + class TrancheExecutionHandler (TransactionHandler): def __init__(self): super().__init__('te') async def build_transaction(self, job_id: UUID, req: TrancheExecutionRequest) -> dict: - # noinspection PyBroadException + tk = req.tranche_key try: return await get_dexorder_contract().build.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof)) - except (ContractPanicError, ContractLogicError) as x: - # todo if there's a logic error we shouldn't keep trying - log.error(f'While executing job {job_id}: {x}') - await self.complete_transaction(db.session.get(TransactionJob, job_id)) - except Exception: - log.exception(f'Could not send execution request {req}') + except ContractPanicError as x: + exception = x + errcode = '' + except ContractLogicError as x: + exception = x + errcode = hexbytes(x.args[1]).decode('utf-8') + log.error(f'While building execution for tranche {tk}: {errcode}') + # if there's a logic error we shouldn't keep trying + finish_execution_request(tk, errcode) + raise exception async def complete_transaction(self, job: TransactionJob) -> None: + # noinspection PyTypeChecker req: TrancheExecutionRequest = job.request tk = TrancheKey(req.vault, req.order_index, req.tranche_index) log.debug(f'completing execution request {tk}') - del inflight_execution_requests[tk] + finish_execution_request(tk) TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler + + +def finish_execution_request(tk: TrancheKey, error: Optional[str]=None): + order_key = OrderKey(tk.vault, tk.order_index) + try: + order: Order = Order.of(order_key) + except KeyError: + log.error(f'Could not get order {order_key}') + return + + try: + inflight_execution_requests.remove(tk) + except KeyError: + pass + + def get_trigger(): + try: + return OrderTriggers.instances[order.key].triggers[tk.tranche_index] + except KeyError: + return None + + def slash(): + trig = get_trigger() + if trig is not None: + trig.slash() + + # + # execute() error handling + # + if error is None: + log.debug(f'execution request for tranche {tk} was successful!') + elif error == 'IIA': + # Insufficient Input Amount + token = order.order.tokenIn + log.debug(f'insufficient funds {tk.vault} {token} ') + elif error == 'SPL': + # todo tight slippage can cause excessive executions as the backend repeatedly retries the remainder. The symptom is error 'SPL'. + # Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of + # vault logic if it happens. + log.warning(f'SPL when executing tranche {tk}') + close_order_and_disable_triggers(order, SwapOrderState.Error) + elif error == 'NO': + # order is not open + log.warning(f'order {order_key} was closed, undetected!') + close_order_and_disable_triggers(order, SwapOrderState.Error) # We do not know if it was filled or not so only Error status can be given + elif error == 'TF': + # Tranche Filled + log.warning(f'tranche already filled {tk}') + tranche_trigger = get_trigger() + if tranche_trigger is not None: + tranche_trigger.status = TrancheState.Filled + tranche_trigger.disable() + elif error == 'Too little received': + # from UniswapV3 SwapRouter when not even 1 satoshi of output was gained + log.debug('warning: de minimis liquidity in pool') + slash() + elif error == 'RL': + log.debug(f'tranche {tk} execution failed due to "RL" rate limit') + pass + elif error == 'TE': + log.debug(f'tranche {tk} execution failed due to "TE" too early') + pass + elif error == 'TL': + log.debug(f'tranche {tk} execution failed due to "TL" too late') + pass + elif error == 'LL': + log.debug(f'tranche {tk} execution failed due to "LL" lower limit') + pass + elif error == 'LU': + log.debug(f'tranche {tk} execution failed due to "LU" upper limit') + pass + elif error == 'OVR': + log.warning(f'tranche {tk} execution failed due to "OVR" overfilled') + # this should never happen. Shut down the order. + close_order_and_disable_triggers(order, SwapOrderState.Error) + elif error == 'K': + log.error(f'vault killed') + close_order_and_disable_triggers(order, SwapOrderState.Error) + elif error == 'STF': + log.error(f'tranche {tk} execution failed due to "STF" safe transfer failure') + close_order_and_disable_triggers(order, SwapOrderState.Error) + else: + slash() + msg = '' if not error else error + log.error(f'Unhandled execution error for tranche {tk} ERROR: {msg}') + + +def execute_tranches(): + new_execution_requests = [] + for tk, proof in active_tranches.items(): + if tk not in inflight_execution_requests: + new_execution_requests.append((tk, proof)) + # todo order requests and batch + for tk, proof in new_execution_requests: + create_execution_request(tk, proof) + + +def create_execution_request(tk: TrancheKey, proof: PriceProof): + inflight_execution_requests.add(tk) + job = submit_transaction_request(new_tranche_execution_request(tk, proof)) + log.debug(f'Executing {tk} as job {job.id}') + return job + + +def handle_dexorderexecutions(event: EventData): + log.debug(f'executions {event}') + exe_id = UUID(bytes=event['args']['id']) + errors = event['args']['errors'] + if len(errors) == 0: + log.warning(f'No errors found in DexorderExecutions event: {event}') + return + if len(errors) > 1: + log.warning(f'Multiple executions not yet implemented') + job: TransactionJob = db.session.get(TransactionJob, exe_id) + if job is None: + log.warning(f'Job {exe_id} not found!') + return + # noinspection PyTypeChecker + req: TrancheExecutionRequest = job.request + tk = TrancheKey(req.vault, req.order_index, req.tranche_index) + finish_execution_request(tk, None if errors[0] == '' else errors[0]) diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index b03e4e0..24349fa 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -6,7 +6,7 @@ from typing import overload from dexorder import DELETE, db, order_log from dexorder.base.chain import current_chain from dexorder.base.order import OrderKey, TrancheKey -from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState, ElaboratedSwapOrderStatus +from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus from dexorder.blockstate import BlockDict, BlockSet from dexorder.database.model.orderindex import OrderIndex from dexorder.routing import pool_address @@ -16,6 +16,8 @@ from dexorder.vault_blockdata import vault_owners log = logging.getLogger(__name__) +# We split off the fill information for efficient communication to clients. + @dataclass class Filled: filled_in: int @@ -79,16 +81,16 @@ class Order: key = OrderKey(vault, order_index) if key in Order.instances: raise ValueError - status = ElaboratedSwapOrderStatus.load_from_tx(tx_id, obj) + status = ElaboratedSwapOrderStatus.load_from_chain(tx_id, obj) Order.order_statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable order = Order(key) if order.is_open: Order.open_orders.add(key) Order.vault_open_orders.listappend(key.vault, key.order_index) # Start with a filled value of 0 even if the chain says otherwise, because we will process the fill events later and add them in - tranche_filled = [Filled(0,0) for _ in range(len(status.trancheFilledIn))] + tranche_filled = [Filled(0, 0) for _ in range(len(status.trancheStatus))] order_log.debug(f'initialized order_filled[{key}]') - Order.order_filled[key] = OrderFilled(Filled(0,0), tranche_filled) + Order.order_filled[key] = OrderFilled(Filled(0, 0), tranche_filled) order_log.debug(f'order created {key}') return order @@ -103,9 +105,9 @@ class Order: key = a if b is None else OrderKey(a, b) assert key not in Order.instances self.key = key - self.status: SwapOrderStatus = Order.order_statuses[key].copy() + self.status: ElaboratedSwapOrderStatus = Order.order_statuses[key].copy() self.pool_address: str = pool_address(self.status.order) - self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheFilledIn))] + self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheStatus))] # flattenings of various static data self.order = self.status.order self.amount = self.status.order.amount @@ -132,11 +134,11 @@ class Order: def tranche_filled_in(self, tranche_index: int): return Order.order_filled[self.key].tranche_filled[tranche_index].filled_in if self.is_open \ - else self.status.trancheFilledIn[tranche_index] + else self.status.trancheStatus[tranche_index].filledIn def tranche_filled_out(self, tranche_index: int): return Order.order_filled[self.key].tranche_filled[tranche_index].filled_out if self.is_open \ - else self.status.trancheFilledIn[tranche_index] + else self.status.trancheStatus[tranche_index].filledOut def tranche_filled(self, tranche_index: int): return self.tranche_filled_in(tranche_index) if self.amount_is_input \ @@ -146,18 +148,16 @@ class Order: return self.tranche_amounts[tranche_index] - self.tranche_filled(tranche_index) def activation_time(self, tranche_index: int): - return self.status.trancheActivationTime[tranche_index] + return self.status.trancheStatus[tranche_index].activationTime @property def filled(self): return self.filled_in if self.amount_is_input else self.filled_out - @property def is_open(self): return self.state.is_open - def add_fill(self, tranche_index: int, filled_in: int, filled_out: int): order_log.debug(f'tranche fill {self.key}|{tranche_index} in:{filled_in} out:{filled_out}') try: @@ -192,8 +192,8 @@ class Order: status.filledIn = of.filled.filled_in status.filledOut = of.filled.filled_out for i, tf in enumerate(of.tranche_filled): - status.trancheFilledIn[i] += of.tranche_filled[i].filled_in - status.trancheFilledOut[i] += of.tranche_filled[i].filled_out + status.trancheStatus[i].filledIn = of.tranche_filled[i].filled_in + status.trancheStatus[i].filledOut = of.tranche_filled[i].filled_out Order.order_statuses[self.key] = status # set the status in order to save it Order.order_statuses.unload(self.key) # but then unload from memory after root promotion order_log.debug(f'order completed {status}') @@ -229,7 +229,7 @@ class Order: return None @staticmethod - def save_order_index(key: OrderKey, status: SwapOrderStatus): + def save_order_index(key: OrderKey, status: ElaboratedSwapOrderStatus): if status is DELETE: sess = db.session oi = sess.get(OrderIndex, (current_chain.get(), key.vault, key.order_index)) @@ -255,7 +255,7 @@ class Order: # this is the main order table. # it holds "everything" about an order in the canonical format specified by the contract orderlib, except that # the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series. - order_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict( + order_statuses: BlockDict[OrderKey, ElaboratedSwapOrderStatus] = BlockDict( 'o', db='lazy', redis=True, pub=pub_order_status, finalize_cb=save_order_index, str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=lambda s:ElaboratedSwapOrderStatus.load(json.loads(s)), diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 4ba508c..589c9c5 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -1,229 +1,57 @@ import asyncio import logging +from abc import abstractmethod from collections import defaultdict from enum import Enum, auto -from typing import Callable, Optional, Union, Awaitable +from typing import Optional, Sequence -from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST -from dexorder.blockstate import BlockSet, BlockDict -from dexorder.util import defaultdictk +import numpy as np +from sortedcontainers import SortedList + +from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line +from dexorder.blockstate import BlockDict from .orderstate import Order -from .. import dec, order_log, now, timestamp, from_timestamp +from .. import dec, order_log, timestamp, from_timestamp, config from ..base.chain import current_clock -from ..base.order import OrderKey, TrancheKey, ExecutionRequest +from ..base.order import OrderKey, TrancheKey +from ..contract import ERC20 +from ..database.model.pool import OldPoolDict from ..pools import ensure_pool_price, pool_prices, get_pool from ..routing import pool_address +from ..vault_blockdata import vault_balances, adjust_balance log = logging.getLogger(__name__) -# todo time and price triggers should be BlockSortedSets that support range queries for efficient lookup of triggers -TimeTrigger = Callable[[int], None] # func(timestamp) -time_triggers:BlockSet[TimeTrigger] = BlockSet('tt') -PriceTrigger = Callable[[dec], Union[Awaitable[None],None]] # [async] func(pool_price) -UnconstrainedPriceTrigger = Callable[[Optional[dec]], Union[Awaitable[None],None]] # [async] func(pool_price) -price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address -new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price triggers are first set, they must be tested against the current price even if it didnt change this block -unconstrained_price_triggers: BlockSet[UnconstrainedPriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled -active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed -execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches -# todo should this really be blockdata? -inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent +""" +Each tranche can have up to two time constraints: activation time and expiration time, and two price constraints: +min line and max line. Line constraints may either be barriers or not. + +Additionally, each order can be blocked based on available funds in the vault. + +In order to handle chain reorganizations without re-evaluating every trigger for every head, the boolean state of each +constraint is saved in BlockState as a bitarray. When a time or price is changed, only the triggers sensitive to that +input are updated, and then checked along with the cached values from unchanged constraints to determine if an +execution should be attempted on the tranche. +""" -async def activate_orders(): - log.debug('activating orders') - # this is a state init callback, called only once after the state has been loaded from the db or created fresh - keys = list(Order.open_orders) - orders = [Order.of(key) for key in keys] - for order in orders: - # setup triggers - await activate_order(order) # too many to really parallelize, and it's startup anyway - log.debug(f'activated {len(keys)} orders') - - -async def activate_order(order: Order): - """ - Call this to enable triggers on an order which is already in the state. - """ - address = pool_address(order.status.order) - pool = await get_pool(address) - await ensure_pool_price(pool) - triggers = OrderTriggers(order) - if triggers.closed: - log.debug(f'order {order.key} was immediately closed') - close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired) - - -def intersect_ranges( a_low, a_high, b_low, b_high): - low, high = max(a_low,b_low), min(a_high,b_high) - if high <= low: - low, high = None, None - return low, high - - -async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool: - b, m = lc - if b == 0 and m == 0: - return True - limit = m * current_clock.get().timestamp + b - # log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}') - # todo ratios - # prices AT the limit get zero volume, so we only trigger on >, not >= - return is_min and limit < price or not is_min and limit > price - - -class TrancheStatus (Enum): - Early = auto() # first time trigger hasnt happened yet - Pricing = auto() # we are inside the time window and checking prices - Filled = auto() # tranche has no more available amount - Expired = auto() # time deadline has past and this tranche cannot be filled - -class TrancheTrigger: - def __init__(self, order: Order, tranche_key: TrancheKey): - assert order.key.vault == tranche_key.vault and order.key.order_index == tranche_key.order_index - self.order = order - self.tk = tranche_key - self.status = TrancheStatus.Early - - tranche = order.order.tranches[self.tk.tranche_index] - tranche_amount = tranche.fraction_of(order.amount) - tranche_filled = order.tranche_filled(self.tk.tranche_index) - tranche_remaining = tranche_amount - tranche_filled - - # time and price constraints - self.time_constraint = [tranche.startTime, tranche.endTime] - if tranche.startTimeIsRelative: - self.time_constraint[0] += self.order.status.start - if tranche.endTimeIsRelative: - self.time_constraint[1] += self.order.status.start - if self.time_constraint[0] <= DISTANT_PAST and self.time_constraint[1] >= DISTANT_FUTURE: - self.time_constraint = None - self.min_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.minIntercept, tranche.minSlope) - self.max_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.maxIntercept, tranche.maxSlope) - self.has_line_constraint = any( a or b for a,b in (self.min_line_constraint, self.max_line_constraint)) - self.has_sloped_line_constraint = any(m!=0 for b,m in (self.min_line_constraint, self.max_line_constraint)) - self.slippage = tranche.minIntercept if tranche.marketOrder else 0 - self.pool_price_multiplier = None - - # compute status and set relevant triggers - if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount: # min_fill_amount could be 0 (disabled) so we also check for the 0 case separately - self.status = TrancheStatus.Filled - return - timestamp = current_clock.get().timestamp - self.status = \ - TrancheStatus.Pricing if self.time_constraint is None else \ - TrancheStatus.Early if timestamp < self.time_constraint[0] else \ - TrancheStatus.Expired if timestamp > self.time_constraint[1] else \ - TrancheStatus.Pricing - self.enable_time_trigger() - if self.status == TrancheStatus.Pricing: - self.enable_price_trigger() - - def enable_time_trigger(self): - if self.time_constraint: - log.debug(f'enable_time_trigger') - time_triggers.add(self.time_trigger) - - def disable_time_trigger(self): - if self.time_constraint: - time_triggers.remove(self.time_trigger) - - def time_trigger(self, now): - # log.debug(f'time_trigger {now} {self.status} {self.time_constraint}') - if self.closed: - log.debug(f'price trigger ignored because trigger status is {self.status}') - return - if not self.check_expired(now) and self.status == TrancheStatus.Early and now >= self.time_constraint[0]: - order_log.debug(f'tranche time enabled {self.tk}') - self.status = TrancheStatus.Pricing - self.enable_price_trigger() - - def enable_price_trigger(self): - if self.has_line_constraint and not self.has_sloped_line_constraint: # sloped constraints must be triggered every tick, not just on pool price changes - price_triggers[self.order.pool_address].add(self.price_trigger) - new_price_triggers[self.order.pool_address].add(self.price_trigger) - else: - unconstrained_price_triggers.add(self.price_trigger) - - def disable_price_trigger(self): - if self.has_line_constraint and not self.has_sloped_line_constraint: - price_triggers[self.order.pool_address].remove(self.price_trigger) - else: - unconstrained_price_triggers.remove(self.price_trigger) - - async def price_trigger(self, cur: dec): - # must be idempotent. could be called twice when first activated: once for the initial price lookup then once again if that price was changed in the current block - if self.closed: - log.debug(f'price trigger ignored because trigger status is {self.status}') - return - activation_time = self.order.activation_time(self.tk.tranche_index) - if activation_time != 0 and timestamp() < activation_time: - log.debug(f'{self.tk} is rate limited until {from_timestamp(activation_time)}') - return # rate limited - # log.debug(f'price trigger {cur}') - addr = pool_address(self.order.order) - pool = await get_pool(addr) - if cur is None and self.has_line_constraint: - await ensure_pool_price(pool) - cur = pool_prices[addr] - if cur is not None: - if self.pool_price_multiplier is None: - self.pool_price_multiplier = dec(10) ** dec(-pool['decimals']) - # log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}') - cur *= self.pool_price_multiplier - if cur is None or not self.has_line_constraint or all(await asyncio.gather( - line_passes(self.min_line_constraint, True, cur), - line_passes(self.max_line_constraint, False, cur))): - # setting active_tranches[] with a PriceProof causes an execute() invocation - active_tranches[self.tk] = PriceProof(0) # todo PriceProof - - def fill(self, _amount_in, _amount_out ): - remaining = self.order.tranche_remaining(self.tk.tranche_index) - filled = remaining == 0 or remaining < self.order.min_fill_amount - if filled: - order_log.debug(f'tranche filled {self.tk}') - self.status = TrancheStatus.Filled - self.disable() - else: - order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}') - return filled - - def check_expired(self, now): - expired = now >= self.time_constraint[1] - if expired: - self.expire() - return expired - - def expire(self): - order_log.debug(f'tranche expired {self.tk}') - self.status = TrancheStatus.Expired - self.disable() - - def disable(self): - try: - del active_tranches[self.tk] - except KeyError: - pass - self.disable_time_trigger() - self.disable_price_trigger() - - @property - def closed(self): - return self.status in (TrancheStatus.Filled, TrancheStatus.Expired) - - @property - def open(self): - return not self.closed +# tranches which have passed all constraints and should be executed +active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') class OrderTriggers: instances: dict[OrderKey, 'OrderTriggers'] = {} - def __init__(self, order: Order): + @staticmethod + async def create(order: Order): + triggers = await asyncio.gather(*[TrancheTrigger.create(order, tk) for tk in order.tranche_keys]) + return OrderTriggers(order, triggers) + + def __init__(self, order: Order, triggers: Sequence['TrancheTrigger']): assert order.key not in OrderTriggers.instances self.order = order - self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys] + self.triggers = triggers OrderTriggers.instances[order.key] = self log.debug(f'created OrderTriggers for {order.key}') @@ -256,6 +84,50 @@ class OrderTriggers: self.check_complete() +def start_trigger_updates(): + """ + Called near the beginning of block handling to initialize any per-block trigger data structures + """ + TimeTrigger.update_all(current_clock.get().timestamp) + PriceLineTrigger.clear_data() + + +# +# Client Interface +# + +async def update_balance_triggers(vault: str, token: str, balance: int): + updates = [bt.update(balance) for bt in BalanceTrigger.by_vault_token.get((vault, token), [])] + await asyncio.gather(*updates) + + +async def update_price_triggers(pool: OldPoolDict, price: dec): + price = price * dec(10) ** dec(-pool['decimals']) # adjust for pool decimals to get onchain price + price = float(price) # since we use SIMD operations to evaluate lines, we must convert to float + updates = [pt.update(price) for pt in PriceLineTrigger.by_pool.get(pool['address'], [])] + await asyncio.gather(*updates) + + +inflight_execution_requests: set[TrancheKey] = set() + +async def end_trigger_updates(): + """ + Call once after all updates have been handled. This updates the active_tranches array based on final trigger state. + """ + PriceLineTrigger.end_updates(current_clock.get().timestamp) + for tk in _dirty: + if _trigger_state.get(tk,0) == 0: + # all clear for execution. add to active list with any necessary proofs + active_tranches[tk] = PriceProof(0) + else: + # blocked by one or more triggers. delete from active list. + try: + del active_tranches[tk] + except KeyError: + pass + _dirty.clear() + + def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState): order.complete(final_state) try: @@ -265,3 +137,387 @@ def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState): else: triggers.disable() + +# NOTE: we store the INVERSE of each trigger's value! this causes the test for "All True" to be comparison with 0 +# instead of comparison with a set of 1's the correct size. By storing inverted values, the group does not +# need to know the number of child triggers, only that no falses have been reported. +_trigger_state: BlockDict[TrancheKey, int] = BlockDict('trig', str2key=TrancheKey.str2key, db=True) +_dirty:set[TrancheKey] = set() + + +class Trigger: + def __init__(self, position: int, tk: TrancheKey, value: bool): + """ + position is the bit position of the boolean result in the tranche's constraint bitfield. + """ + self.position = position + self.tk = tk + self.value = value + + @property + def value(self): + return _trigger_state.get(self.tk,0) & (1 << self.position) == 0 # NOTE: inverted + + @value.setter + def value(self, value): + if value != self.value: + _dirty.add(self.tk) + if not value: # this conditional is inverted + _trigger_state[self.tk] |= 1 << self.position # set + else: + _trigger_state[self.tk] &= ~(1 << self.position) # clear + + @abstractmethod + def remove(self): ... + + +async def has_funds(tk: TrancheKey): + # log.debug(f'has funds? {tk.vault}') + order = Order.of(tk) + balances = vault_balances.get(tk.vault, {}) + token_addr = order.status.order.tokenIn + token_balance = balances.get(token_addr) + if token_balance is None: + # unknown balance + token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault) + log.debug(f'queried token balance {token_addr}.balanceOf({tk.vault}) = {token_balance}') + await adjust_balance(tk.vault, token_addr, token_balance) + return await input_amount_is_sufficient(order, token_balance) + + +async def input_amount_is_sufficient(order, token_balance): + if order.amount_is_input: + return token_balance >= order.status.order.minFillAmount + # amount is an output amount, so we need to know the price + price = pool_prices.get(order.pool_address) + if price is None: + return token_balance > 0 # we don't know the price so we allow any nonzero amount to be sufficient + pool = await get_pool(order.pool_address) + inverted = order.order.tokenIn != pool['base'] + minimum = dec(order.min_fill_amount)*price if inverted else dec(order.min_fill_amount)/price + log.debug(f'order minimum amount is {order.min_fill_amount} '+ ("input" if order.amount_is_input else f"output @ {price} = {minimum} ")+f'< {token_balance} balance') + return token_balance > minimum + + +class BalanceTrigger (Trigger): + by_vault_token: dict[tuple[str,str],set['BalanceTrigger']] = defaultdict(set) + + @staticmethod + async def create(tk: TrancheKey): + value = await has_funds(tk) + return BalanceTrigger(tk, value) + + def __init__(self, tk: TrancheKey, value: bool): + super().__init__(0, tk, value) + self.order = Order.of(self.tk) + self.vault_token = self.tk.vault, self.order.status.order.tokenIn + log.debug(f'adding balanc trigger {id(self)}') + BalanceTrigger.by_vault_token[self.vault_token].add(self) + + async def update(self, balance): + self.value = await input_amount_is_sufficient(self.order, balance) + + def remove(self): + log.debug(f'removing balanc trigger {id(self)}') + try: + BalanceTrigger.by_vault_token[self.vault_token].remove(self) + except KeyError: + pass + + +class TimeTrigger (Trigger): + + all = SortedList(key=lambda t: (t.time, 0 if t.is_start else 1)) # start before end even if the same time + + @staticmethod + def create(is_start: bool, tk: TrancheKey, time: int, time_now: int = None): + if is_start and time == DISTANT_PAST or not is_start and time == DISTANT_FUTURE: + return None + if time_now is None: + time_now = current_clock.get().timestamp + return TimeTrigger(is_start, tk, time, time_now) + + def __init__(self, is_start: bool, tk: TrancheKey, time: int, time_now: int): + triggered = time_now >= time + super().__init__(1 if is_start else 2, tk, triggered is is_start) + self.is_start = is_start + self._time = time + self.active = not triggered + if self.active: + TimeTrigger.all.add(self) + + @property + def time(self): + return self._time + + @time.setter + def time(self, time: int): + self.set_time(time, current_clock.get().timestamp) + + def set_time(self, time: int, time_now: int): + self._time = time + self.active = (time_now > time) is self.is_start + TimeTrigger.all.remove(self) + TimeTrigger.all.add(self) + + def update(self): + # called when our self.time has been reached + self.value = self.is_start + self.active = False + # we are popped off the stack by update_all() + + def remove(self): + if self.active: + TimeTrigger.all.remove(self) + self.active = False + + @staticmethod + def update_all(time): + while TimeTrigger.all and TimeTrigger.all[0].time <= time: + # todo this doesnt work across reorgs. we need to keep a BlockState cursor of the last time handled, + # then activate any time triggers from that past time through the present. time triggers may only + # be popped off the stack after their times are older than the latest finalized block + # todo what if an order is placed on a reorg'd branch but never hits main branch? we have triggers going + # for a nonexistent order! + t = TimeTrigger.all.pop(0) + t.update() + + +class PriceLineTrigger (Trigger): + by_pool: dict[str,set['PriceLineTrigger']] = defaultdict(set) + + @staticmethod + async def create(tk: TrancheKey, line: Line, is_min: bool, is_barrier: bool): + if line.intercept == 0 and line.slope == 0: + return None # no constraint (deactivated) + pool = await get_pool(Order.of(tk).pool_address) + await ensure_pool_price(pool) + price_now = pool_prices[pool['address']] + return PriceLineTrigger(tk, line, is_min, is_barrier, price_now) + + def __init__(self, tk: TrancheKey, line: Line, is_min: bool, is_barrier: bool, price_now: dec): + if is_barrier: + log.warning('Barriers not supported') + price_above = price_now > line.intercept + line.slope * current_clock.get().timestamp + super().__init__(3 if is_min else 4, tk, is_min is price_above) + self.line = line + self.is_min = is_min + self.is_barrier = is_barrier + self.pool_address = Order.of(tk).pool_address + self.index: Optional[int] = None + PriceLineTrigger.by_pool[self.pool_address].add(self) + + # lines that need evaluating add their data to these arrays, which are then sent to SIMD for evaluation. each + # array must always have the same size as the others. + y = [] + m = [] + b = [] + triggers = [] # 1-for-1 with line_data + triggers_set = set() + + @staticmethod + def clear_data(): + PriceLineTrigger.y.clear() + PriceLineTrigger.m.clear() + PriceLineTrigger.b.clear() + PriceLineTrigger.triggers.clear() + PriceLineTrigger.triggers_set.clear() + + def update(self, price: float): + if self not in PriceLineTrigger.triggers_set: + self.index = len(PriceLineTrigger.y) + PriceLineTrigger.y.append(price) + PriceLineTrigger.m.append(self.line.slope) + PriceLineTrigger.b.append(self.line.intercept) + PriceLineTrigger.triggers.append(self) + PriceLineTrigger.triggers_set.add(self) + else: + # update an existing equation's price + PriceLineTrigger.y[self.index] = price + + @staticmethod + def end_updates(time: int): + # here we use numpy to compute all dirty lines using SIMD + y, m, b = map(np.array, (PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b)) + line_value = m * time + b + price_diff = y - line_value + for t, pd in zip(PriceLineTrigger.triggers, price_diff): + t.handle_result(pd) + + def handle_result(self, price_diff: float): + value = self.is_min and price_diff > 0 or not self.is_min and price_diff < 0 + if not self.is_barrier or value: # barriers that are False do not update their values to False + self.value = value + + def remove(self): + PriceLineTrigger.by_pool[self.pool_address].remove(self) + + +async def activate_orders(): + log.debug('activating orders') + # this is a state init callback, called only once after the state has been loaded from the db or created fresh + keys = list(Order.open_orders) + orders = [Order.of(key) for key in keys] + for order in orders: + # setup triggers + await activate_order(order) # too many to really parallelize, and it's startup anyway + log.debug(f'activated {len(keys)} orders') + + +async def activate_order(order: Order): + """ + Call this to enable triggers on an order which is already in the state. + """ + address = pool_address(order.status.order) + pool = await get_pool(address) + await ensure_pool_price(pool) + triggers = await OrderTriggers.create(order) + if triggers.closed: + log.debug(f'order {order.key} was immediately closed') + close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired) + + +class TrancheState (Enum): + Early = auto() # first time trigger hasnt happened yet + Active = auto() # we are inside the time window and checking prices + Filled = auto() # tranche has no more available amount + Expired = auto() # time deadline has past and this tranche cannot be filled + Error = auto() # the tranche was slashed and killed due to reverts during execute() + + +class TrancheTrigger: + + @staticmethod + async def create(order: Order, tk: TrancheKey) -> 'TrancheTrigger': + time = current_clock.get().timestamp + tranche = order.order.tranches[tk.tranche_index] + ts = order.status.trancheStatus[tk.tranche_index] + balance_trigger = await BalanceTrigger.create(tk) + activation_trigger = TimeTrigger.create(True, tk, ts.activationTime, time) + expiration_trigger = TimeTrigger.create(False, tk, ts.endTime, time) + if tranche.marketOrder: + min_trigger = max_trigger = None + else: + min_trigger, max_trigger = await asyncio.gather( + PriceLineTrigger.create(tk, tranche.minLine, True, tranche.minIsBarrier), + PriceLineTrigger.create(tk, tranche.maxLine, True, tranche.maxIsBarrier)) + return TrancheTrigger(order, tk, balance_trigger, activation_trigger, expiration_trigger, min_trigger, max_trigger) + + def __init__(self, order: Order, tk: TrancheKey, + balance_trigger: BalanceTrigger, + activation_trigger: Optional[TimeTrigger], + expiration_trigger: Optional[TimeTrigger], + min_trigger: Optional[PriceLineTrigger], + max_trigger: Optional[PriceLineTrigger], + ): + assert order.key.vault == tk.vault and order.key.order_index == tk.order_index + tranche = order.order.tranches[tk.tranche_index] + + self.order = order + self.tk = tk + + self.balance_trigger = balance_trigger + self.activation_trigger = activation_trigger + self.expiration_trigger = expiration_trigger + self.min_trigger = min_trigger + self.max_trigger = max_trigger + + self.slippage = tranche.minLine.intercept if tranche.marketOrder else 0 + self.slash_count = 0 + + tranche_remaining = tranche.fraction_of(order.amount) - order.tranche_filled(self.tk.tranche_index) + self.status = \ + TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \ + TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \ + TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \ + TrancheState.Active + _dirty.add(tk) + log.debug(f'Tranche {tk} initial status {self.status} {self}') + + + def fill(self, _amount_in, _amount_out ): + remaining = self.order.tranche_remaining(self.tk.tranche_index) + filled = remaining == 0 or remaining < self.order.min_fill_amount + if filled: + order_log.debug(f'tranche filled {self.tk}') + self.status = TrancheState.Filled + self.disable() + else: + order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}') + self.slash_count = 0 # reset slash count + return filled + + def expire(self): + order_log.debug(f'tranche expired {self.tk}') + self.status = TrancheState.Expired + self.disable() + + def kill(self): + order_log.warning(f'tranche KILLED {self.tk}') + self.status = TrancheState.Error + self.disable() + + def slash(self): + # slash() is called when an execute() transaction on this tranche reverts without a recognized reason. + self.slash_count += 1 + log.debug(f'slashed tranche x{self.slash_count} {self.tk}') + if self.slash_count >= config.slash_kill_count: + self.kill() + else: + delay = round(config.slash_delay_base * config.slash_delay_mul ** (self.slash_count-1)) + self.deactivate(timestamp()+delay) + + def deactivate(self, until): + # Temporarily deactivate the tranche due to a rate limit. Use disable() to permanently halt the trigger. + log.debug(f'deactivating tranche {self.tk} until {from_timestamp(until)}') + if self.activation_trigger is None: + self.activation_trigger = TimeTrigger.create(True, self.tk, until) + else: + self.activation_trigger.time = until + + def disable(self): + # permanently stop this trigger and deconstruct + self.balance_trigger.remove() + if self.activation_trigger is not None: + self.activation_trigger.remove() + if self.expiration_trigger is not None: + self.expiration_trigger.remove() + if self.min_trigger is not None: + self.min_trigger.remove() + if self.max_trigger is not None: + self.max_trigger.remove() + try: + del _trigger_state[self.tk] + except KeyError: + pass + try: + _dirty.remove(self.tk) + except KeyError: + pass + try: + del active_tranches[self.tk] + except KeyError: + pass + + @property + def closed(self): + return self.status in (TrancheState.Filled, TrancheState.Expired, TrancheState.Error) + + @property + def open(self): + return not self.closed + + def __str__(self): + trigs = [] + if self.balance_trigger is not None: + trigs.append(f'balance {self.balance_trigger.value}') + if self.activation_trigger is not None: + trigs.append(f'activation {self.activation_trigger.value}') + if self.expiration_trigger is not None: + trigs.append(f'expiration {self.expiration_trigger.value}') + if self.min_trigger is not None: + trigs.append(f'min line {self.min_trigger.value}') + if self.max_trigger is not None: + trigs.append(f'max line {self.max_trigger.value}') + return f'TrancheTrigger[{",".join(str(t) for t in trigs)}]' + diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index f2c9c21..4cb210a 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -3,7 +3,6 @@ import logging from datetime import datetime from typing import Optional -from sqlalchemy.exc import NoResultFound from web3.exceptions import ContractLogicError from web3.types import EventData @@ -11,12 +10,11 @@ from dexorder import dec, ADDRESS_0, from_timestamp, db from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.base.orderlib import Exchange +from dexorder.blocks import get_block_timestamp from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V -from dexorder.blocks import get_block_timestamp from dexorder.database.model import Pool from dexorder.database.model.pool import OldPoolDict -from dexorder.metadata import is_generating_metadata from dexorder.tokens import get_token from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address diff --git a/src/dexorder/progressor.py b/src/dexorder/progressor.py index 8170366..86e1fd9 100644 --- a/src/dexorder/progressor.py +++ b/src/dexorder/progressor.py @@ -26,7 +26,16 @@ class BlockProgressor(metaclass=ABCMeta): # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = [] # these callbacks are invoked after every block and also every second if there wasnt a block - self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = [] + self.callbacks:list[tuple[Callable[[],Maywaitable[None]],bool]] = [] + self.combined = [] # a mix of both event handlers and callbacks + + def add_callback(self, callback: Callable[[], Maywaitable[None]], trigger_on_timer=True): + """ + If trigger_on_timer is True, then the callback is also invoked on a regular timer if there is a lull in blocks. + """ + item = (callback, trigger_on_timer) + self.callbacks.append(item) + self.combined.append(item) def add_event_trigger(self, # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range @@ -50,7 +59,9 @@ class BlockProgressor(metaclass=ABCMeta): for e in events: await maywait(func(e)) cb = callback if event is None or multi else functools.partial(_map, callback) - self.events.append((cb, event, log_filter)) + item = (cb, event, log_filter) + self.events.append(item) + self.combined.append(item) @abstractmethod def run(self): @@ -61,24 +72,29 @@ class BlockProgressor(metaclass=ABCMeta): if w3 is None: w3 = current_w3.get() batches = [] - for callback, event, log_filter in self.events: - if log_filter is None: - batches.append((None, callback, event, None)) + for entry in self.combined: + if len(entry) == 2: + # plain callback + callback, on_timer = entry + batches.append((None, callback, None, None)) else: - lf = dict(log_filter) - lf['fromBlock'] = from_height - lf['toBlock'] = to_height - get_logs = w3.eth.get_logs(lf) - if not config.parallel_logevent_queries: - get_logs = await get_logs - batches.append((get_logs, callback, event, lf)) - for callback in self.postprocess_cbs: - batches.append((None, callback, None, None)) + # event callback + callback, event, log_filter = entry + if log_filter is None: + batches.append((None, callback, event, None)) + else: + lf = dict(log_filter) + lf['fromBlock'] = from_height + lf['toBlock'] = to_height + get_logs = w3.eth.get_logs(lf) + if not config.parallel_logevent_queries: + get_logs = await get_logs + batches.append((get_logs, callback, event, lf)) return batches @staticmethod - async def invoke_callbacks(batches, chain=None): + async def invoke_callback_batches(batches, chain=None): if chain is None: chain = current_chain.get() # logevent callbacks diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 87d46c9..d55d754 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -18,7 +18,7 @@ from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import current_fork, Fork from dexorder.progressor import BlockProgressor -from dexorder.transaction import create_and_send_transactions +from dexorder.transactions import create_and_send_transactions from dexorder.util import hexstr, hexbytes from dexorder.util.async_util import maywait, Maywaitable from dexorder.util.shutdown import fatal @@ -270,31 +270,34 @@ class BlockStateRunner(BlockProgressor): block = await get_block(blockhash) current_block.set(block) bloom = BloomFilter(int.from_bytes(hexbytes(block.data['logsBloom']))) - for callback, event, log_filter in self.events: - if log_filter is None: - batches.append((None, callback, event, None)) + for item in self.combined: + if len(item) == 2: + callback, on_timer = item + batches.append((None, callback, None, None)) else: - lf = dict(log_filter) - lf['blockHash'] = hexstr(block.hash) - has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics']) - # log.debug(f'has {event.__class__.__name__}? {has_logs}') - if not has_logs: - get_logs = None + callback, event, log_filter = item + if log_filter is None: + batches.append((None, callback, event, None)) else: - # log.debug(f'has {event.__class__.__name__}') - get_logs = w3.eth.get_logs(lf) - if not config.parallel_logevent_queries: - get_logs = await get_logs - batches.append((get_logs, callback, event, log_filter)) - for callback in self.postprocess_cbs: - batches.append((None, callback, None, None)) + lf = dict(log_filter) + lf['blockHash'] = hexstr(block.hash) + has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics']) + # log.debug(f'has {event.__class__.__name__}? {has_logs}') + if not has_logs: + get_logs = None + else: + # log.debug(f'has {event.__class__.__name__}') + get_logs = w3.eth.get_logs(lf) + if not config.parallel_logevent_queries: + get_logs = await get_logs + batches.append((get_logs, callback, event, log_filter)) # set up for callbacks current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created if not self.state_initialized: await self.do_state_init_cbs() # log.debug(f'invoking callbacks with fork {current_fork.get()}') - await self.invoke_callbacks(batches) + await self.invoke_callback_batches(batches) # todo # IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either @@ -358,9 +361,10 @@ class BlockStateRunner(BlockProgressor): session = db.session session.begin() try: - for callback in self.postprocess_cbs: - # noinspection PyCallingNonCallable - await maywait(callback()) + for callback, on_timer in self.callbacks: + if on_timer: + # noinspection PyCallingNonCallable + await maywait(callback()) except BaseException: session.rollback() raise diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index a9e94a8..17c0da6 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -2,7 +2,7 @@ import logging from typing import Optional from eth_abi.exceptions import InsufficientDataBytes -from web3.exceptions import ContractLogicError, BadFunctionCallOutput +from web3.exceptions import BadFunctionCallOutput from dexorder import ADDRESS_0, config, db from dexorder.addrmeta import address_metadata @@ -11,7 +11,6 @@ from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS from dexorder.database.model import Token from dexorder.database.model.token import OldTokenDict from dexorder.metadata import get_metadata -from dexorder.util import hexstr log = logging.getLogger(__name__) diff --git a/src/dexorder/transaction.py b/src/dexorder/transactions.py similarity index 54% rename from src/dexorder/transaction.py rename to src/dexorder/transactions.py index 1c2f006..9de9570 100644 --- a/src/dexorder/transaction.py +++ b/src/dexorder/transactions.py @@ -1,21 +1,22 @@ import logging from abc import abstractmethod -from typing import Optional +from dataclasses import dataclass +from typing import Union, Optional from uuid import uuid4 from sqlalchemy import select -from web3.exceptions import TransactionNotFound +from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError from dexorder import db, current_w3, Account -from dexorder.base import TransactionReceiptDict +from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_registry from dexorder.base.chain import current_chain -from dexorder.base.order import TransactionRequest +from dexorder.base.order import TrancheKey, OrderKey +from dexorder.base.orderlib import PriceProof from dexorder.blockstate import BlockDict from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import current_fork, Fork from dexorder.contract.contract_proxy import ContractTransaction from dexorder.database.model.transaction import TransactionJob, TransactionJobState -from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) @@ -38,7 +39,47 @@ class TransactionHandler: async def complete_transaction(self, job: TransactionJob) -> None: ... +@dataclass +class TrancheExecutionRequest (TransactionRequest): + TYPE = 'te' + + # type='te' for tranche execution + vault: str + order_index: int + tranche_index: int + price_proof: Union[None,dict,tuple[int]] + + def __init__(self, vault: str, order_index: int, tranche_index: int, price_proof: Union[None,dict,tuple[int]], **_): + super().__init__(TrancheExecutionRequest.TYPE) + self.vault = vault + self.order_index = order_index + self.tranche_index = tranche_index + self.price_proof = price_proof + + @property + def order_key(self): + return OrderKey(self.vault, self.order_index) + + @property + def tranche_key(self): + return TrancheKey(self.vault, self.order_index, self.tranche_index) + +# Must register the class for deserialization +transaction_request_registry[TrancheExecutionRequest.TYPE] = TrancheExecutionRequest + + +def new_tranche_execution_request(tk: TrancheKey, proof: Optional[PriceProof]=None) -> TrancheExecutionRequest: + if proof is None: + proof = PriceProof(0) + return TrancheExecutionRequest(tk.vault, tk.order_index, tk.tranche_index, proof.dump()) + + def submit_transaction_request(tr: TransactionRequest): + """ + Once a transaction request has been submitted, it is this module's responsibility to see that it gets mined, at + which point `tr.complete_transaction()` is called with the transaction receipt. + The building of a transaction can also fail, + """ job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, state=TransactionJobState.Requested, request=tr) db.session.add(job) @@ -58,37 +99,46 @@ async def create_and_send_transactions(): # todo remove bad request? log.warning('ignoring transaction request with bad type ' f'"{job.request.type}": ' + ",".join(TransactionHandler.instances.keys())) - else: + return + try: ctx: ContractTransaction = await handler.build_transaction(job.id, job.request) - if ctx is None: - log.warning(f'unable to send transaction for job {job.id}') - return - w3 = current_w3.get() - account = Account.get_named(handler.tag) - if account is None: - account = Account.get() - if account is None: - log.error(f'No account available for transaction request type "{handler.tag}"') - continue - await ctx.sign(account) - job.state = TransactionJobState.Signed - job.tx_id = ctx.id_bytes - job.tx_data = ctx.data + except (ContractPanicError, ContractLogicError): + # these errors can be thrown immediately when the tx is tested for gas + log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}') + job.state = TransactionJobState.Error + db.session.add(job) + await handler.complete_transaction(job) + return + except Exception as x: + log.warning(f'unable to send transaction for job {job.id}', exc_info=x) + return + w3 = current_w3.get() + account = Account.get_named(handler.tag) + if account is None: + account = Account.get() + if account is None: + log.error(f'No account available for transaction request type "{handler.tag}"') + continue + await ctx.sign(account) + job.state = TransactionJobState.Signed + job.tx_id = ctx.id_bytes + job.tx_data = ctx.data + db.session.add(job) + log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}') + try: + sent = await w3.eth.send_raw_transaction(job.tx_data) + except: + log.exception(f'Failure sending transaction for job {job.id}') + # todo pager + # todo send state unknown! + else: + assert sent == job.tx_id + job.state = TransactionJobState.Sent db.session.add(job) - log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}') - try: - sent = await w3.eth.send_raw_transaction(job.tx_data) - except: - log.exception(f'Failure sending transaction for job {job.id}') - # todo pager - # todo send state unknown! - else: - assert sent == job.tx_id - job.state = TransactionJobState.Sent - db.session.add(job) async def handle_transaction_receipts(): + log.debug('handle_transaction_receipts') w3 = current_w3.get() for job in db.session.query(TransactionJob).filter( TransactionJob.chain == current_chain.get(), @@ -116,6 +166,7 @@ async def handle_transaction_receipts(): def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): + # noinspection PyTypeChecker open_jobs = db.session.scalars(select(TransactionJob).where( TransactionJob.chain == current_chain.get(), TransactionJob.state == TransactionJobState.Sent diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index 967d0f0..ecf6107 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -128,5 +128,5 @@ class BlockWalker (BlockProgressor): fork = Fork([branch]) current_fork.set(fork) batches = await self.get_backfill_batches(from_height, to_height, w3=w3) - await self.invoke_callbacks(batches, chain) + await self.invoke_callback_batches(batches, chain) log.info(f'completed through block {to_height}')