complete trigger rework; update SwapOrderStatus with Andrew's changes; not fully debugged

This commit is contained in:
tim
2024-08-25 19:21:05 -04:00
parent f1492d9326
commit 750b4bcd65
31 changed files with 1051 additions and 760 deletions

View File

@@ -72,7 +72,7 @@ def upgrade() -> None:
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False),
sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', 'Error', name='transactionjobstate'), nullable=False),
sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False),
sa.Column('tx_id', postgresql.BYTEA(), nullable=True),
sa.Column('tx_data', postgresql.BYTEA(), nullable=True),
@@ -83,10 +83,24 @@ def upgrade() -> None:
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False)
op.create_index(op.f('ix_transactionjob_tx_id'), 'transactionjob', ['tx_id'], unique=False)
# ### end Alembic commands ###
op.create_table('dbblock',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('hash', postgresql.BYTEA(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('timestamp', sa.INTEGER(), nullable=False),
sa.Column('confirmed', sa.Boolean(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('chain', 'hash')
)
op.create_index(op.f('ix_dbblock_height'), 'dbblock', ['height'], unique=False)
op.create_index(op.f('ix_dbblock_timestamp'), 'dbblock', ['timestamp'], unique=False)
def downgrade() -> None:
op.drop_index(op.f('ix_dbblock_timestamp'), table_name='dbblock')
op.drop_index(op.f('ix_dbblock_height'), table_name='dbblock')
op.drop_table('dbblock')
op.drop_index(op.f('ix_transactionjob_tx_id'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_state'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_height'), table_name='transactionjob')

View File

@@ -1,44 +0,0 @@
"""BlockIndex
Revision ID: ee22683693a5
Revises: 516b55c83144
Create Date: 2024-07-19 18:52:04.933167
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import dexorder.database
import dexorder.database.column_types
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'ee22683693a5'
down_revision: Union[str, None] = '516b55c83144'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('dbblock',
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('hash', postgresql.BYTEA(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('timestamp', sa.INTEGER(), nullable=False),
sa.Column('confirmed', sa.Boolean(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('chain', 'hash')
)
op.create_index(op.f('ix_dbblock_height'), 'dbblock', ['height'], unique=False)
op.create_index(op.f('ix_dbblock_timestamp'), 'dbblock', ['timestamp'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_dbblock_timestamp'), table_name='dbblock')
op.drop_index(op.f('ix_dbblock_height'), table_name='dbblock')
op.drop_table('dbblock')
# ### end Alembic commands ###

View File

@@ -16,3 +16,4 @@ eth-bloom
python-dateutil
eth_abi
pdpyras # pagerduty
numpy

View File

@@ -21,16 +21,13 @@ def warningAlert(title, message, dedup_key=NARG, log_level=logging.WARNING):
return alert(title, message, dedup_key, log_level)
async def spawn_alert(title, message, dedup_key):
alert_pagerduty(title,message,dedup_key)
pagerduty_session = None
hostname = None
def alert_pagerduty(title, message, dedup_key, log_level):
if not config.pagerduty:
return
# noinspection PyBroadException
try:
global pagerduty_session
global hostname

View File

@@ -1,8 +1,22 @@
from typing import TypedDict, Union
from dataclasses import dataclass
from typing import TypedDict, Union, Type
Address = str
Quantity = Union[str,int]
@dataclass
class TransactionRequest:
"""
All members of TransactionRequest and its subclasses must be JSON-serializable. They get stored in the database
TransactionJob in a JSONB field, as handled by the DataclassDict column type.
"""
type: str
# subclasses of TransactionRequest must register their type code here so the appropriate dataclass may be constructed
transaction_request_registry: dict[str, Type[TransactionRequest]] = {}
TransactionDict = TypedDict( 'TransactionDict', {
'from': Address,
'to': Address,

View File

@@ -1,6 +1,4 @@
import math
from abc import ABC, abstractmethod
# noinspection PyPackageRequirements
from contextvars import ContextVar
import dexorder

View File

@@ -1,6 +1,5 @@
import logging
from dataclasses import dataclass
from typing import Optional, Type, Union
log = logging.getLogger(__name__)
@@ -29,37 +28,3 @@ class TrancheKey (OrderKey):
def __str__(self):
return f'{self.vault}|{self.order_index}|{self.tranche_index}'
@dataclass
class ExecutionRequest:
height: int
proof: None
@dataclass
class TransactionRequest:
type: str # 'te' for tranche execution
@dataclass
class TrancheExecutionRequest (TransactionRequest):
# type: str # 'te' for tranche execution
vault: str
order_index: int
tranche_index: int
price_proof: Union[None,dict,tuple[int]]
def new_tranche_execution_request(tk: TrancheKey, _proof: Optional[dict]) -> TrancheExecutionRequest:
return TrancheExecutionRequest('te', tk.vault, tk.order_index, tk.tranche_index, (0,)) # todo proof
def deserialize_transaction_request(**d):
t = d['type']
Class = transaction_request_registry.get(t)
if Class is None:
raise ValueError(f'No TransactionRequest for type "{t}"')
# noinspection PyArgumentList
return Class(**d)
transaction_request_registry: dict[str, Type[TransactionRequest]] = dict(
te = TrancheExecutionRequest,
)

View File

@@ -4,14 +4,21 @@ from dataclasses import dataclass
from enum import Enum
from typing import Optional
from dexorder.util.convert import decode_IEEE754, encode_IEEE754
from dexorder.util import hexbytes
from dexorder.util.convert import decode_IEEE754
log = logging.getLogger(__name__)
"""
These dataclasses are meant to closely mirror the raw data on-chain, using native Python types but serializing to
something JSON-able.
"""
class SwapOrderState (Enum):
# This includes on-chain codes as well as additional codes
Unknown = -1
Signing = 0 # only used by the web but here for completeness todo rework OrderLib.sol to remove offchain statuses
Signing = 0 # only used by the web but here for completeness
Underfunded = 1
Open = 2
Canceled = 3
@@ -45,6 +52,26 @@ class Route:
def dump(self):
return self.exchange.value, self.fee
@dataclass
class Line:
intercept: float
slope: float
def value(self, timestamp):
return self.intercept + self.slope * timestamp
@staticmethod
def load_from_chain(obj: tuple[int,int]):
return Line(decode_IEEE754(obj[0]), decode_IEEE754(obj[1]))
@staticmethod
def load(obj: tuple[float,float]):
return Line(*obj)
def dump(self):
return self.intercept, self.slope
@dataclass
class SwapOrder:
tokenIn: str
@@ -57,6 +84,10 @@ class SwapOrder:
conditionalOrder: int
tranches: list['Tranche']
@property
def min_input_amount(self):
return self.minFillAmount if self.amountIsInput else 0
@staticmethod
def load(obj):
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7],
@@ -72,7 +103,7 @@ SwapOrder
in: {self.tokenIn}
out: {self.tokenOut}
exchange: {self.route.exchange, self.route.fee}
amount: {"input" if self.amountIsInput else "output"} {self.amount} {"to owner" if self.outputDirectlyToOwner else ""}
amount: {"input" if self.amountIsInput else "output"} {self.amount}{" to owner" if self.outputDirectlyToOwner else ""}
minFill: {self.minFillAmount}
tranches:
'''
@@ -80,118 +111,110 @@ SwapOrder
msg += f' {tranche}\n'
return msg
@dataclass
class SwapStatus:
# this is an elaborated version of the on-chain status
class ElaboratedTrancheStatus:
filledIn: int
filledOut: int
activationTime: int
startTime: int
endTime: int
@staticmethod
def load_from_chain(obj: tuple[int,int,int,int]):
filled, activationTime, startTime, endTime = obj
return ElaboratedTrancheStatus(
# we do NOT grab the filled amount from the chain, because our process will handle the fill events
# separately by incrementing these status values as fills arrive.
0, 0,
activationTime, startTime, endTime,
)
def dump(self):
# filled fields can be larger than JSON-able ints, so we use strings.
return str(self.filledIn), str(self.filledOut), self.activationTime, self.startTime, self.endTime
@staticmethod
def load(obj: tuple[str,str,int,int,int]):
filledIn, filledOut, activationTime, startTime, endTime = obj
return ElaboratedTrancheStatus(int(filledIn), int(filledOut), activationTime, startTime, endTime)
@dataclass
class ElaboratedSwapOrderStatus:
tx_id: bytes
order: SwapOrder
fillFeeHalfBps: int
state: SwapOrderState
startTime: int
startPrice: int
ocoGroup: Optional[int]
filledIn: Optional[int] # if None then look in the order_filled blockstate
filledOut: Optional[int] # if None then look in the order_filled blockstate
trancheFilledIn: Optional[list[int]] # if None then look in the tranche_filled blockstate
trancheFilledOut: Optional[list[int]] # if None then look in the tranche_filled blockstate
trancheActivationTime: list[int]
@dataclass
class SwapOrderStatus(SwapStatus):
order: SwapOrder
def __init__(self, order: SwapOrder, *swapstatus_args):
""" init with order object first followed by the swap status args"""
super().__init__(*swapstatus_args)
self.order = order
filledIn: int
filledOut: int
trancheStatus: list[ElaboratedTrancheStatus]
@staticmethod
def load(obj, *, Class=None):
if Class is None:
Class = SwapOrderStatus
order = SwapOrder.load(obj[0])
fillFeeHalfBps = int(obj[1])
state = SwapOrderState(obj[2])
startTime = obj[3]
startPrice = obj[4]
ocoGroup = None if obj[5] == NO_OCO else obj[5]
filledIn = int(obj[6])
filledOut = int(obj[7])
trancheFilledIn = [int(f) for f in obj[8]]
trancheFilledOut = [int(f) for f in obj[9]]
trancheActivationTime = [int(f) for f in obj[10]]
return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup,
filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime)
@staticmethod
def load_from_chain(obj, *, Class=None):
if Class is None:
Class = SwapOrderStatus
# 0 SwapOrder order;
def load_from_chain(tx_id: bytes, obj):
# 0 SwapOrder order
# 1 int fillFeeHalfBps
# 2 bool canceled;
# 3 uint32 startTime;
# 4 uint32 startPrice;
# 5 uint64 ocoGroup;
# 6 uint256 filled; // total
# 7 uint256[] trancheFilled; // sum(trancheFilled) == filled
# 8 uint32[] trancheActivationTime;
# 2 bool canceled
# 3 uint32 startTime
# 4 uint32 startPrice
# 5 uint64 ocoGroup
# 6 uint256 filled
# 7 ElaboratedTrancheStatus[] trancheStatus
order = SwapOrder.load(obj[0])
fillFeeHalfBps = obj[1]
state = SwapOrderState.Canceled if obj[2] else SwapOrderState.Open
startTime = obj[3]
startPrice = obj[4]
ocoGroup = None if obj[5] == NO_OCO else obj[5]
# we ignore any fill values from the on-chain struct, because we will subsequently detect the DexorderSwapFilled events and add them in
filledIn = 0
filledOut = 0
trancheFilledIn = [0 for _ in range(len(obj[7]))]
trancheFilledOut = [0 for _ in range(len(obj[7]))]
trancheActivationTime = [int(i) for i in obj[8]]
return Class(order, fillFeeHalfBps, state, startTime, startPrice, ocoGroup,
filledIn, filledOut, trancheFilledIn, trancheFilledOut, trancheActivationTime)
item = iter(obj)
order = SwapOrder.load(next(item))
fillFeeHalfBps = int(next(item))
canceled = next(item)
state = SwapOrderState.Canceled if canceled else SwapOrderState.Open
startTime = next(item)
startPrice = next(item)
ocoGroup = next(item)
if ocoGroup == NO_OCO:
ocoGroup = None
# we ignore any fill values from the on-chain struct, because we will subsequently detect the
# DexorderSwapFilled events and add them in
_ignore_filled = next(item)
trancheStatuses = [ElaboratedTrancheStatus.load_from_chain(ts) for ts in next(item)]
for ts in trancheStatuses:
ts.filledIn = 0
ts.filledOut = 0
return ElaboratedSwapOrderStatus(tx_id, order, fillFeeHalfBps, state, startTime, startPrice,
ocoGroup, 0, 0, trancheStatuses)
@staticmethod
def load(obj):
item = iter(obj)
tx_id = hexbytes(next(item))
order = SwapOrder.load(next(item))
fillFeeHalfBps = int(next(item))
state = SwapOrderState(next(item))
startTime = next(item)
startPrice = int(next(item))
ocoGroup = next(item)
if ocoGroup == NO_OCO:
ocoGroup = None
filledIn = int(next(item)) # convert from str
filledOut = int(next(item))
trancheStatus = [ElaboratedTrancheStatus.load(ts) for ts in next(item)]
return ElaboratedSwapOrderStatus(tx_id, order, fillFeeHalfBps, state, startTime, startPrice,
ocoGroup, filledIn, filledOut, trancheStatus)
def dump(self):
return (
self.order.dump(), self.fillFeeHalfBps, self.state.value, self.startTime, self.startPrice, self.ocoGroup,
str(self.filledIn), str(self.filledOut),
[str(f) for f in self.trancheFilledIn], [str(f) for f in self.trancheFilledOut],
[int(i) for i in self.trancheActivationTime]
self.tx_id, self.order.dump(), self.fillFeeHalfBps, self.state.value, self.startTime, str(self.startPrice),
self.ocoGroup, str(self.filledIn), str(self.filledOut), [ts.dump() for ts in self.trancheStatus]
)
def copy(self):
return copy.deepcopy(self)
@dataclass
class ElaboratedSwapOrderStatus (SwapOrderStatus):
@staticmethod
def load_from_tx(tx_id: bytes, obj):
# noinspection PyTypeChecker
status: ElaboratedSwapOrderStatus = SwapOrderStatus.load_from_chain(obj, Class=ElaboratedSwapOrderStatus)
status.tx_id = tx_id
return status
# noinspection PyMethodOverriding
@staticmethod
def load(obj):
tx_id, *swaporder_args = obj
result = SwapOrderStatus.load(obj[1:], Class=ElaboratedSwapOrderStatus)
result.tx_id = tx_id
return result
# noinspection PyMissingConstructor
def __init__(self, order: SwapOrder, *swapstatus_args, tx_id=b''):
super().__init__(order, *swapstatus_args)
self.tx_id: bytes = tx_id
def dump(self):
return self.tx_id, *super().dump()
def copy(self):
return super().copy()
NO_OCO = 18446744073709551615 # max uint64
@@ -224,10 +247,8 @@ class Tranche:
startTime: int
endTime: int
minIntercept: float
minSlope: float
maxIntercept: float
maxSlope: float
minLine: Line
maxLine: Line
def fraction_of(self, amount):
@@ -250,40 +271,34 @@ class Tranche:
obj[10], # rateLimitPeriod
obj[11], # startTime
obj[12], # endTime
decode_IEEE754(obj[13]), # minIntercept
decode_IEEE754(obj[14]), # minSlope
decode_IEEE754(obj[15]), # maxIntercept
decode_IEEE754(obj[16]), # maxSlope
Line.load(obj[13]), # minLine
Line.load(obj[14]), # maxLine
)
return result
def dump(self):
minB = encode_IEEE754(self.minIntercept)
minM = encode_IEEE754(self.minSlope)
maxB = encode_IEEE754(self.maxIntercept)
maxM = encode_IEEE754(self.maxSlope)
return (
self.fraction, self.startTimeIsRelative, self.endTimeIsRelative, self.minIsBarrier, self.maxIsBarrier, self.marketOrder,
self.minIsRatio, self.maxIsRatio,
False, # _reserved7
self.rateLimitFraction, self.rateLimitPeriod,
self.startTime, self.endTime, minB, minM, maxB, maxM,
self.startTime, self.endTime, self.minLine.dump(), self.maxLine.dump(),
)
def __str__(self):
msg = f'{self.fraction/MAX_FRACTION:.1%} {"start+" if self.startTimeIsRelative else ""}{self.startTime} to {"start+" if self.startTimeIsRelative else ""}{self.endTime}'
if self.marketOrder:
# for marketOrders, minIntercept is the slippage
msg += f' market order slippage {self.minIntercept:.2%}'
# for marketOrders, minLine.intercept is the slippage
msg += f' market order slippage {self.minLine.intercept:.2%}'
else:
if self.minIntercept or self.minSlope:
msg += f' >{self.minIntercept:.5g}'
if self.minSlope:
msg += f'{self.minSlope:+.5g}'
if self.maxIntercept or self.maxSlope:
msg += f' <{self.maxIntercept:.5g}'
if self.maxSlope:
msg += f'{self.maxSlope:+.5g}'
if self.minLine.intercept or self.minLine.slope:
msg += f' >{self.minLine.intercept:.5g}'
if self.minLine.slope:
msg += f'{self.minLine.slope:+.5g}'
if self.maxLine.intercept or self.maxLine.slope:
msg += f' <{self.maxLine.intercept:.5g}'
if self.maxLine.slope:
msg += f'{self.maxLine.slope:+.5g}'
if self.rateLimitPeriod:
msg += f' {self.rateLimitFraction/MAX_FRACTION:.1%} every {self.rateLimitPeriod/60:.0} minutes'
return msg
@@ -293,6 +308,9 @@ class Tranche:
class PriceProof:
proof: int
def dump(self):
return (self.proof,)
class OcoMode (Enum):
NO_OCO = 0

View File

@@ -58,7 +58,7 @@ async def main():
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0)
runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
runner.postprocess_cbs.append(check_ohlc_rollover)
runner.add_callback(check_ohlc_rollover)
runner.on_promotion.append(finalize_callback)
if db:
# noinspection PyUnboundLocalVariable

View File

@@ -23,6 +23,8 @@ while True:
def bits(b0, b1):
bit(b0); bit(b1)
# noinspection PyBroadException
try:
i = int(i)
assert 1 <= i <= 6

View File

@@ -1,7 +1,7 @@
import logging
from asyncio import CancelledError
from dexorder import db, blockchain, config
from dexorder import db, blockchain
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blockstate import current_blockstate
@@ -9,16 +9,16 @@ from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.blockstate.fork import current_fork
from dexorder.contract import get_contract_event
from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract
from dexorder.event_handler import init, dump_log, handle_vault_created, handle_order_placed, \
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, \
activate_time_triggers, activate_price_triggers, \
process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps, handle_vault_logic_changed
from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
handle_uniswap_swaps, handle_vault_logic_changed)
from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.order.triggers import activate_orders
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
from dexorder.order.triggers import activate_orders, end_trigger_updates
from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts, finalize_transactions
from dexorder.transactions import handle_transaction_receipts, finalize_transactions
log = logging.getLogger('dexorder')
LOG_ALL_EVENTS = False # for debug todo config
@@ -44,7 +44,10 @@ def setup_logevent_triggers(runner):
else:
executions = dexorder.events.DexorderExecutions()
runner.add_event_trigger(init)
# the callbacks are run even if there's no blocks and the regular timer triggers. event triggers only run when
# a block is received.
runner.add_callback(init)
runner.add_event_trigger(handle_vault_created, get_contract_event('Vault', 'VaultCreated'))
runner.add_event_trigger(handle_vault_logic_changed, get_contract_event('Vault', 'VaultLogicChanged'))
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
@@ -53,15 +56,12 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled'))
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll'))
runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block
runner.add_event_trigger(handle_dexorderexecutions, executions)
# these callbacks run after the ones above on each block, plus these also run every second
runner.postprocess_cbs.append(check_ohlc_rollover)
runner.postprocess_cbs.append(activate_time_triggers)
runner.postprocess_cbs.append(activate_price_triggers)
runner.postprocess_cbs.append(process_active_tranches)
runner.postprocess_cbs.append(process_execution_requests)
runner.add_callback(end_trigger_updates)
runner.add_callback(execute_tranches)
# noinspection DuplicatedCode

View File

@@ -146,6 +146,7 @@ async def main():
log.debug(f'Mirroring tokens')
txs = []
for t in tokens:
# noinspection PyBroadException
try:
info = await get_token_info(t)
# anvil had trouble estimating the gas, so we hardcode it.
@@ -163,6 +164,7 @@ async def main():
log.debug(f'Mirroring pools {", ".join(pools)}')
txs = []
for pool, info in zip(pools, pool_infos):
# noinspection PyBroadException
try:
# anvil had trouble estimating the gas, so we hardcode it.
tx = await mirrorenv.transact.mirrorPool(info, gas=5_500_000)

View File

@@ -12,7 +12,6 @@ from typing import Union, Optional
from cachetools import LRUCache
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from dexorder import current_w3, config, db, Blockchain
from dexorder.base.block import Block, BlockInfo, latest_block
@@ -56,18 +55,11 @@ async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) ->
return Block(chain_id, found.data)
# fetch from RPC
try:
if type(block_id) is int:
fetch.result = await fetch_block_by_number(block_id, chain_id=chain_id)
else:
fetch.result = await fetch_block(block_id, chain_id=chain_id)
return fetch.result
except Exception as e:
fetch.exception = e
fetch.result = None
raise
finally:
fetch.ready.set()
if type(block_id) is int:
fetch.result = await fetch_block_by_number(block_id, chain_id=chain_id)
else:
fetch.result = await fetch_block(block_id, chain_id=chain_id)
return fetch.result
_lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256)
@@ -78,9 +70,9 @@ def cache_block(block: Block, confirmed=False):
_lru[block.chain_id, block.hash] = block
_lru[block.chain_id, block.height] = block
if db:
db.session.execute(insert(DbBlock).values(
db.session.add(DbBlock(
chain=block.chain_id, hash=block.hash, height=block.height, timestamp=block.timestamp,
confirmed=confirmed, data=block.data).on_conflict_do_nothing())
confirmed=confirmed, data=block.data))
async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block:
@@ -91,7 +83,6 @@ async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block:
key = chain_id, block_id
# try LRU cache synchronously first
try:
# log.debug(f'\thit LRU')
return _lru[key]
except KeyError:
pass
@@ -118,6 +109,7 @@ async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block:
finally:
# log.debug(f'fetch.result {fetch.result}')
del _fetch_locks[key]
fetch.ready.set()
# log.debug(f'\t{fetch.result}')
return fetch.result

View File

@@ -13,7 +13,6 @@ from .diff import DiffEntry, DELETE, DiffEntryItem
from ..base.block import Block
from ..base.chain import current_chain
from ..blocks import promotion_height
from ..util import hexstr
log = logging.getLogger(__name__)
state_log = logging.getLogger('dexorder.state')
@@ -158,14 +157,16 @@ class BlockState:
def remove_branch(self, branch: Branch, *, remove_series_diffs=True):
if branch.height == self.height and len(self.branches_by_height[branch.height]) == 1:
# this is the only branch at this height: compute the new lower height
self.height = max(0, *[b.height for b in self.branches_by_id.values() if b is not branch])
other_heights = [b.height for b in self.branches_by_id.values() if b is not branch]
self.height = 0 if not other_heights else max(0, *other_heights)
del self.branches_by_id[branch.id]
by_height = self.branches_by_height.get(branch.height)
if by_height is not None:
by_height.remove(branch)
if len(by_height) == 0:
# garbage collect empty arrays
del self.branches_by_height[branch.height]
if self.height:
by_height = self.branches_by_height.get(branch.height)
if by_height is not None:
by_height.remove(branch)
if len(by_height) == 0:
# garbage collect empty arrays
del self.branches_by_height[branch.height]
try:
del self.unloads[branch.id]
except KeyError:

View File

@@ -31,6 +31,12 @@ class Config:
accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases
min_gas: str = '0'
# Order slashing
slash_kill_count: int = 5
slash_delay_base: float = 60 # one minute
slash_delay_mul: float = 2 # double the delay each time
slash_delay_max: int = 15 * 60
walker_name: str = 'default'
walker_flush_interval: float = 300
walker_stop: Optional[int] = None # block number of the last block the walker should process

View File

@@ -1,14 +1,15 @@
test_accounts = {
'test0': '0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80',
'test1': '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d',
'test2': '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a',
'test3': '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6',
'test4': '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a',
'test5': '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba',
'test6': '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e',
'test7': '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356',
'test8': '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97',
'test9': '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6',
# 'account_name': '0x_private_key', # public address
'test0': '0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80', # 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266
'test1': '0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d', # 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
'test2': '0x5de4111afa1a4b94908f83103eb1f1706367c2e68ca870fc3fb9a804cdab365a', # 0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC
'test3': '0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6', # 0x90F79bf6EB2c4f870365E785982E1f101E93b906
'test4': '0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a', # 0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65
'test5': '0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba', # 0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc
'test6': '0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e', # 0x976EA74026E726554dB657fA54763abd0C3a0aa9
'test7': '0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356', # 0x14dC79964da2C08b23698B3D3cc7Ca32193d9955
'test8': '0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97', # 0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f
'test9': '0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6', # 0xa0Ee7A142d267C1f36714E4a8F75612F20a79720
}
default_accounts_config = {}

View File

@@ -1,8 +1,10 @@
import glob
import json
import os
from eth_abi.exceptions import InsufficientDataBytes
from eth_utils import to_checksum_address
from typing_extensions import Union
from web3.exceptions import BadFunctionCallOutput, ContractLogicError
from .abi import abis
@@ -13,20 +15,31 @@ from ..base.chain import current_chain
CONTRACT_ERRORS = (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput)
def get_abi(name, filename=None):
return get_contract_data(name, filename)['abi']
# set initially to the string filename, then loaded on demand and set to the parsed JSON result
_contract_data: dict[str,Union[str,dict]] = {}
# finds all .sol files and sets _contract_data with their pathname
for _file in glob.glob('../contract/out/**/*.sol/*.json', recursive=True):
if os.path.isfile(_file):
_contract_data[os.path.basename(_file)[:-5]] = _file
def get_contract_data(name, filename=None):
if filename is None and name in abis:
def get_abi(name):
return get_contract_data(name)['abi']
def get_contract_data(name):
try:
return {'abi':abis[name]}
if filename is None and name == "Vault" and os.path.exists(f'../contract/out/IVault.sol/IVault.json') :
# logging.debug("getting abi from IVault.json instead of Vault.json")
name = "IVault" # Special case for proxy Vault
if filename is None:
filename = name
with open(f'../contract/out/{filename}.sol/{name}.json', 'rt') as file:
return json.load(file)
except KeyError:
pass
if name == 'Vault':
name = 'IVault' # special exception due to use of a proxy
entry = _contract_data[name]
if type(entry) is str:
with open(entry, 'rt') as file:
entry = _contract_data[name] = json.load(file)
return entry
def get_deployment_address(deployment_name, contract_name, *, chain_id=None):

View File

@@ -7,7 +7,6 @@ from web3.exceptions import Web3Exception
from web3.types import TxReceipt, TxData
from dexorder import current_w3, Account
from dexorder.base.account import current_account
from dexorder.blockstate.fork import current_fork
from dexorder.util import hexstr
@@ -91,7 +90,7 @@ def transact_wrapper(addr, name, func):
return f
def build_wrapper(addr, name, func):
def build_wrapper(_addr, _name, func):
async def f(*args, **kwargs):
tx = await func(*args).build_transaction(kwargs)
return ContractTransaction(tx)

View File

@@ -32,6 +32,7 @@ class Kv:
found.value = value
def __delitem__(self, key: str):
# noinspection PyTypeChecker
db.session.query(KeyValue).filter(KeyValue.key == key).delete()
def get(self, key: str, default=None):

View File

@@ -3,11 +3,10 @@ from enum import Enum
from typing import Optional
import sqlalchemy as sa
from sqlalchemy import ForeignKey
from sqlalchemy.orm import mapped_column, Mapped, relationship
from sqlalchemy.orm import mapped_column, Mapped
from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request
from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID
from dexorder.base import TransactionRequest, transaction_request_registry
from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain
from dexorder.database.column_types import DataclassDict
from dexorder.database.model import Base
@@ -18,6 +17,7 @@ class TransactionJobState (Enum):
Signed = 'n' # tx has been signed
Sent = 's' # tx has been delivered to a node
Mined = 'z' # mined on at least one fork, whether reverted or not. todo handle forks that didnt confirm: receipts are per-fork!
Error = 'x' # an exception has prevented this job from sending a transaction
# noinspection PyProtectedMember
@@ -25,12 +25,21 @@ class TransactionJobState (Enum):
TransactionJobStateColumnType = sa.Enum(TransactionJobState)
def deserialize_transaction_request(**d):
t = d['type']
Class = transaction_request_registry.get(t)
if Class is None:
raise ValueError(f'No TransactionRequest for type "{t}"')
# noinspection PyArgumentList
return Class(**d)
class TransactionJob (Base):
id: Mapped[UUID_PK]
chain: Mapped[Blockchain] = mapped_column(index=True)
height: Mapped[int] = mapped_column(index=True) # the height at which the job was created, to be used for timeout/ data rolloff and/or by Timescale
state: Mapped[TransactionJobState] = mapped_column(TransactionJobStateColumnType, default=TransactionJobState.Requested, index=True)
request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request))
request: Mapped[TransactionRequest] = mapped_column(DataclassDict(deserialize_transaction_request))
tx_id: Mapped[Optional[Bytes]] = mapped_column(index=True)
tx_data: Mapped[Optional[Bytes]]
receipt: Mapped[Optional[Dict]]

View File

@@ -1,30 +1,21 @@
import asyncio
import itertools
import logging
from uuid import UUID
from web3.types import EventData
from dexorder import current_pub, db, from_timestamp, minutely
from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, \
OrderKey
from dexorder import current_pub, minutely
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate.fork import current_fork
from dexorder.contract import ERC20
from dexorder.contract.dexorder import vault_address, VaultContract, get_factory_contract
from dexorder.database.model.transaction import TransactionJob
from dexorder.logics import get_logic_version
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.ohlc import ohlcs
from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \
unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, \
new_price_triggers, activate_order, close_order_and_disable_triggers
from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates,
update_price_triggers)
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.transaction import submit_transaction_request
from dexorder.util.async_util import maywait
from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance, MAX_VAULTS, verify_vault
from dexorder.vault_blockdata import vault_owners, adjust_balance, MAX_VAULTS, verify_vault
log = logging.getLogger(__name__)
@@ -35,7 +26,7 @@ def dump_log(eventlog):
def init():
new_pool_prices.clear()
new_price_triggers.clear()
start_trigger_updates()
async def handle_order_placed(event: EventData):
@@ -44,6 +35,9 @@ async def handle_order_placed(event: EventData):
addr = event['address']
start_index = int(event['args']['startOrderIndex'])
num_orders = int(event['args']['numOrders'])
# todo accounting
order_fee = int(event['args']['orderFee'])
gas_fee = int(event['args']['gasFee'])
log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
if not await verify_vault(addr):
log.warning(f'Discarding order from rogue vault {addr}.')
@@ -62,6 +56,7 @@ async def handle_order_placed(event: EventData):
def handle_swap_filled(event: EventData):
log.debug('handle_swap_filled')
# event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut);
log.debug(f'DexorderSwapFilled {event}')
args = event['args']
@@ -70,6 +65,7 @@ def handle_swap_filled(event: EventData):
tranche_index = args['trancheIndex']
amount_in = args['amountIn']
amount_out = args['amountOut']
# todo accounting
fill_fee = args['fillFee']
next_execution_time = args['nextExecutionTime']
try:
@@ -77,7 +73,7 @@ def handle_swap_filled(event: EventData):
except KeyError:
log.warning(f'DexorderSwapFilled IGNORED due to missing order {vault} {order_index}')
return
order.status.trancheActivationTime[tranche_index] = next_execution_time # update rate limit
order.status.trancheStatus[tranche_index].activationTime = next_execution_time # update rate limit
try:
triggers = OrderTriggers.instances[order.key]
except KeyError:
@@ -118,28 +114,27 @@ async def handle_transfer(transfer: EventData):
# log.debug(f'Transfer {transfer}')
from_address = transfer['args']['from']
to_address = transfer['args']['to']
if to_address == from_address:
return
amount = int(transfer['args']['value'])
if to_address in vault_owners and to_address != from_address:
if to_address in vault_owners:
log.debug(f'deposit {to_address} {amount}')
vault = to_address
token_address = transfer['address']
await adjust_balance(vault, token_address, amount)
if from_address in vault_owners and to_address != from_address:
elif from_address in vault_owners:
log.debug(f'withdraw {to_address} {amount}')
vault = from_address
else:
vault = None
if vault is not None:
token_address = transfer['address']
await adjust_balance(vault, token_address, amount)
# if to_address not in vault_owners and from_address not in vault_owners:
# vaults = vault_owners.keys()
# log.debug(f'vaults: {list(vaults)}')
await update_balance_triggers(vault, token_address, amount)
async def handle_uniswap_swaps(swaps: list[EventData]):
# asynchronously prefetch the block timestamps we'll need
block_ids = set(swap['blockHash'] for swap in swaps)
for batch in itertools.batched(block_ids, 4):
await asyncio.gather(*[get_block_timestamp(h) for h in batch])
hashes = set(swap['blockHash'] for swap in swaps)
await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
# now execute the swaps synchronously
for swap in swaps:
await handle_uniswap_swap(swap)
@@ -153,6 +148,7 @@ async def handle_uniswap_swap(swap: EventData):
addr = pool['address']
pool_prices[addr] = price
await ohlcs.update_all(addr, time, price)
await update_price_triggers(pool, price)
log.debug(f'pool {addr} {minutely(time)} {price}')
@@ -198,155 +194,3 @@ async def handle_vault_logic_changed(upgrade: EventData):
version = await get_logic_version(logic)
log.debug(f'Vault {addr} upgraded to logic version {version}')
async def activate_time_triggers():
now = current_clock.get().timestamp
# log.debug(f'activating time triggers at {now}')
# time triggers
for tt in tuple(time_triggers):
await maywait(tt(now))
async def activate_price_triggers():
# log.debug(f'activating price triggers')
pools_triggered = set()
for pool, price in new_pool_prices.items():
pools_triggered.add(pool)
for pt in tuple(price_triggers[pool]):
await maywait(pt(price))
for pool, triggers in new_price_triggers.items():
if pool not in pools_triggered:
price = pool_prices[pool]
for pt in triggers:
await maywait(pt(price))
for t in tuple(unconstrained_price_triggers):
await maywait(t(None))
async def process_active_tranches():
for tk, proof in active_tranches.items():
old_req = execution_requests.get(tk)
height = current_fork.get().height
if old_req is None or old_req.height <= height: # '<=' is used so proof is updated with more recent values
if await has_funds(tk):
log.info(f'execution request for {tk}')
execution_requests[tk] = ExecutionRequest(height, proof)
# else:
# log.debug(f'underfunded tranche {tk}')
async def has_funds(tk: TrancheKey):
# log.debug(f'has funds? {tk.vault}')
order = Order.of(tk)
minimum = order.status.order.minFillAmount if order.amount_is_input else 0
balances = vault_balances.get(tk.vault, {})
token_addr = order.status.order.tokenIn
token_balance = balances.get(token_addr)
if token_balance is None:
# unknown balance
token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
log.debug(f'queried token balance {token_addr}.balanceOf({tk.vault}) = {token_balance}')
await adjust_balance(tk.vault, token_addr, token_balance)
# log.debug(f'minimum {minimum} balances {token_addr} {balances}')
return token_balance > minimum
async def process_execution_requests():
height = current_fork.get().height
execs = {} # which requests to act on
for tk, er in execution_requests.items():
tk: TrancheKey
er: ExecutionRequest
pending = inflight_execution_requests.get(tk)
if pending is None or height-pending >= 30:
# todo execution timeout => retry ; should we use timestamps? configure per-chain.
# todo check balances
log.warning(f're-sending unconfirmed transaction {tk} is pending execution')
execs[tk] = er
else:
log.debug(f'tranche {tk} is pending execution')
# execute the list
# todo batch execution
for tk, er in execs.items():
job = submit_transaction_request(new_tranche_execution_request(tk, er.proof))
inflight_execution_requests[tk] = height
log.info(f'created job {job.id} to execute tranche {tk}')
def handle_dexorderexecutions(event: EventData):
log.debug(f'executions {event}')
exe_id = UUID(bytes=event['args']['id'])
errors = event['args']['errors']
if len(errors) == 0:
log.warning(f'No errors found in DexorderExecutions event: {event}')
return
if len(errors) > 1:
log.warning(f'Multiple executions not yet implemented')
job: TransactionJob = db.session.get(TransactionJob, exe_id)
if job is None:
log.warning(f'Job {exe_id} not found!')
return
finish_execution_request(job.request, errors[0])
def finish_execution_request(req: TrancheExecutionRequest, error: str):
try:
order: Order = Order.of(req.vault, req.order_index)
except KeyError:
log.error(f'Could not get order {OrderKey(req.vault, req.order_index)}')
return
tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
try:
del execution_requests[tk]
except KeyError:
pass
if error != '':
log.debug(f'execution request for tranche {tk} had error "{error}"')
if error == '':
log.debug(f'execution request for tranche {tk} was successful!')
elif error == 'IIA':
# Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent
# todo vault balance checks
token = order.order.tokenIn
log.debug(f'insufficient funds {req.vault} {token} ')
elif error == 'SPL':
# Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of
# vault logic if it happens.
log.warning(f'SPL when executing tranche {tk}')
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'NO':
# order is not open
log.warning(f'order {OrderKey(tk.vault,tk.order_index)} was closed, undetected!')
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'TF':
# Tranche Filled
log.warning(f'tranche already filled {tk}')
try:
triggers = OrderTriggers.instances[order.key]
tranche_trigger = triggers.triggers[tk.tranche_index]
except KeyError:
pass
else:
tranche_trigger.status = TrancheStatus.Filled
tranche_trigger.disable()
elif error == 'Too little received':
# from UniswapV3 SwapRouter when not even 1 satoshi of output was gained
log.debug('warning: de minimis liquidity in pool')
# todo dont keep trying
else:
# todo slash and backoff
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')
last_ohlc_rollover = 0
async def check_ohlc_rollover():
global last_ohlc_rollover
time = await get_block_timestamp(current_fork.get().head_identifier)
dt = from_timestamp(time)
diff = time - last_ohlc_rollover
if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute:
for (symbol, period) in recent_ohlcs.keys():
await ohlcs.update(symbol, period, dt)
last_ohlc_rollover = time

View File

@@ -12,6 +12,6 @@ async def get_logic_version(addr):
try:
return logics[addr]
except KeyError:
version = await ContractProxy(addr, abi=get_abi('IVaultLogic', 'IVault')).version()
version = await ContractProxy(addr, abi=get_abi('IVaultLogic')).version()
logics[addr] = version
return version

View File

@@ -1,37 +1,171 @@
import logging
from typing import Optional
from uuid import UUID
from web3.exceptions import ContractPanicError, ContractLogicError
from web3.types import EventData
from dexorder import db
from dexorder.base.order import TrancheExecutionRequest, TrancheKey
from dexorder.transaction import TransactionHandler
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState, PriceProof
from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.database.model.transaction import TransactionJob
from dexorder.order.triggers import inflight_execution_requests
from dexorder.order.orderstate import Order
from dexorder.order.triggers import (inflight_execution_requests, OrderTriggers,
close_order_and_disable_triggers, TrancheState, active_tranches)
from dexorder.transactions import TransactionHandler, TrancheExecutionRequest, submit_transaction_request, \
new_tranche_execution_request
from dexorder.util import hexbytes
log = logging.getLogger(__name__)
class TrancheExecutionHandler (TransactionHandler):
def __init__(self):
super().__init__('te')
async def build_transaction(self, job_id: UUID, req: TrancheExecutionRequest) -> dict:
# noinspection PyBroadException
tk = req.tranche_key
try:
return await get_dexorder_contract().build.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof))
except (ContractPanicError, ContractLogicError) as x:
# todo if there's a logic error we shouldn't keep trying
log.error(f'While executing job {job_id}: {x}')
await self.complete_transaction(db.session.get(TransactionJob, job_id))
except Exception:
log.exception(f'Could not send execution request {req}')
except ContractPanicError as x:
exception = x
errcode = ''
except ContractLogicError as x:
exception = x
errcode = hexbytes(x.args[1]).decode('utf-8')
log.error(f'While building execution for tranche {tk}: {errcode}')
# if there's a logic error we shouldn't keep trying
finish_execution_request(tk, errcode)
raise exception
async def complete_transaction(self, job: TransactionJob) -> None:
# noinspection PyTypeChecker
req: TrancheExecutionRequest = job.request
tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
log.debug(f'completing execution request {tk}')
del inflight_execution_requests[tk]
finish_execution_request(tk)
TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler
def finish_execution_request(tk: TrancheKey, error: Optional[str]=None):
order_key = OrderKey(tk.vault, tk.order_index)
try:
order: Order = Order.of(order_key)
except KeyError:
log.error(f'Could not get order {order_key}')
return
try:
inflight_execution_requests.remove(tk)
except KeyError:
pass
def get_trigger():
try:
return OrderTriggers.instances[order.key].triggers[tk.tranche_index]
except KeyError:
return None
def slash():
trig = get_trigger()
if trig is not None:
trig.slash()
#
# execute() error handling
#
if error is None:
log.debug(f'execution request for tranche {tk} was successful!')
elif error == 'IIA':
# Insufficient Input Amount
token = order.order.tokenIn
log.debug(f'insufficient funds {tk.vault} {token} ')
elif error == 'SPL':
# todo tight slippage can cause excessive executions as the backend repeatedly retries the remainder. The symptom is error 'SPL'.
# Square-root price limit from Uniswap means we asked for a limit price that isn't met. This is a fault of
# vault logic if it happens.
log.warning(f'SPL when executing tranche {tk}')
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'NO':
# order is not open
log.warning(f'order {order_key} was closed, undetected!')
close_order_and_disable_triggers(order, SwapOrderState.Error) # We do not know if it was filled or not so only Error status can be given
elif error == 'TF':
# Tranche Filled
log.warning(f'tranche already filled {tk}')
tranche_trigger = get_trigger()
if tranche_trigger is not None:
tranche_trigger.status = TrancheState.Filled
tranche_trigger.disable()
elif error == 'Too little received':
# from UniswapV3 SwapRouter when not even 1 satoshi of output was gained
log.debug('warning: de minimis liquidity in pool')
slash()
elif error == 'RL':
log.debug(f'tranche {tk} execution failed due to "RL" rate limit')
pass
elif error == 'TE':
log.debug(f'tranche {tk} execution failed due to "TE" too early')
pass
elif error == 'TL':
log.debug(f'tranche {tk} execution failed due to "TL" too late')
pass
elif error == 'LL':
log.debug(f'tranche {tk} execution failed due to "LL" lower limit')
pass
elif error == 'LU':
log.debug(f'tranche {tk} execution failed due to "LU" upper limit')
pass
elif error == 'OVR':
log.warning(f'tranche {tk} execution failed due to "OVR" overfilled')
# this should never happen. Shut down the order.
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'K':
log.error(f'vault killed')
close_order_and_disable_triggers(order, SwapOrderState.Error)
elif error == 'STF':
log.error(f'tranche {tk} execution failed due to "STF" safe transfer failure')
close_order_and_disable_triggers(order, SwapOrderState.Error)
else:
slash()
msg = '<unspecified>' if not error else error
log.error(f'Unhandled execution error for tranche {tk} ERROR: {msg}')
def execute_tranches():
new_execution_requests = []
for tk, proof in active_tranches.items():
if tk not in inflight_execution_requests:
new_execution_requests.append((tk, proof))
# todo order requests and batch
for tk, proof in new_execution_requests:
create_execution_request(tk, proof)
def create_execution_request(tk: TrancheKey, proof: PriceProof):
inflight_execution_requests.add(tk)
job = submit_transaction_request(new_tranche_execution_request(tk, proof))
log.debug(f'Executing {tk} as job {job.id}')
return job
def handle_dexorderexecutions(event: EventData):
log.debug(f'executions {event}')
exe_id = UUID(bytes=event['args']['id'])
errors = event['args']['errors']
if len(errors) == 0:
log.warning(f'No errors found in DexorderExecutions event: {event}')
return
if len(errors) > 1:
log.warning(f'Multiple executions not yet implemented')
job: TransactionJob = db.session.get(TransactionJob, exe_id)
if job is None:
log.warning(f'Job {exe_id} not found!')
return
# noinspection PyTypeChecker
req: TrancheExecutionRequest = job.request
tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
finish_execution_request(tk, None if errors[0] == '' else errors[0])

View File

@@ -6,7 +6,7 @@ from typing import overload
from dexorder import DELETE, db, order_log
from dexorder.base.chain import current_chain
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState, ElaboratedSwapOrderStatus
from dexorder.base.orderlib import SwapOrderState, ElaboratedSwapOrderStatus
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.database.model.orderindex import OrderIndex
from dexorder.routing import pool_address
@@ -16,6 +16,8 @@ from dexorder.vault_blockdata import vault_owners
log = logging.getLogger(__name__)
# We split off the fill information for efficient communication to clients.
@dataclass
class Filled:
filled_in: int
@@ -79,16 +81,16 @@ class Order:
key = OrderKey(vault, order_index)
if key in Order.instances:
raise ValueError
status = ElaboratedSwapOrderStatus.load_from_tx(tx_id, obj)
status = ElaboratedSwapOrderStatus.load_from_chain(tx_id, obj)
Order.order_statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable
order = Order(key)
if order.is_open:
Order.open_orders.add(key)
Order.vault_open_orders.listappend(key.vault, key.order_index)
# Start with a filled value of 0 even if the chain says otherwise, because we will process the fill events later and add them in
tranche_filled = [Filled(0,0) for _ in range(len(status.trancheFilledIn))]
tranche_filled = [Filled(0, 0) for _ in range(len(status.trancheStatus))]
order_log.debug(f'initialized order_filled[{key}]')
Order.order_filled[key] = OrderFilled(Filled(0,0), tranche_filled)
Order.order_filled[key] = OrderFilled(Filled(0, 0), tranche_filled)
order_log.debug(f'order created {key}')
return order
@@ -103,9 +105,9 @@ class Order:
key = a if b is None else OrderKey(a, b)
assert key not in Order.instances
self.key = key
self.status: SwapOrderStatus = Order.order_statuses[key].copy()
self.status: ElaboratedSwapOrderStatus = Order.order_statuses[key].copy()
self.pool_address: str = pool_address(self.status.order)
self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheFilledIn))]
self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheStatus))]
# flattenings of various static data
self.order = self.status.order
self.amount = self.status.order.amount
@@ -132,11 +134,11 @@ class Order:
def tranche_filled_in(self, tranche_index: int):
return Order.order_filled[self.key].tranche_filled[tranche_index].filled_in if self.is_open \
else self.status.trancheFilledIn[tranche_index]
else self.status.trancheStatus[tranche_index].filledIn
def tranche_filled_out(self, tranche_index: int):
return Order.order_filled[self.key].tranche_filled[tranche_index].filled_out if self.is_open \
else self.status.trancheFilledIn[tranche_index]
else self.status.trancheStatus[tranche_index].filledOut
def tranche_filled(self, tranche_index: int):
return self.tranche_filled_in(tranche_index) if self.amount_is_input \
@@ -146,18 +148,16 @@ class Order:
return self.tranche_amounts[tranche_index] - self.tranche_filled(tranche_index)
def activation_time(self, tranche_index: int):
return self.status.trancheActivationTime[tranche_index]
return self.status.trancheStatus[tranche_index].activationTime
@property
def filled(self):
return self.filled_in if self.amount_is_input else self.filled_out
@property
def is_open(self):
return self.state.is_open
def add_fill(self, tranche_index: int, filled_in: int, filled_out: int):
order_log.debug(f'tranche fill {self.key}|{tranche_index} in:{filled_in} out:{filled_out}')
try:
@@ -192,8 +192,8 @@ class Order:
status.filledIn = of.filled.filled_in
status.filledOut = of.filled.filled_out
for i, tf in enumerate(of.tranche_filled):
status.trancheFilledIn[i] += of.tranche_filled[i].filled_in
status.trancheFilledOut[i] += of.tranche_filled[i].filled_out
status.trancheStatus[i].filledIn = of.tranche_filled[i].filled_in
status.trancheStatus[i].filledOut = of.tranche_filled[i].filled_out
Order.order_statuses[self.key] = status # set the status in order to save it
Order.order_statuses.unload(self.key) # but then unload from memory after root promotion
order_log.debug(f'order completed {status}')
@@ -229,7 +229,7 @@ class Order:
return None
@staticmethod
def save_order_index(key: OrderKey, status: SwapOrderStatus):
def save_order_index(key: OrderKey, status: ElaboratedSwapOrderStatus):
if status is DELETE:
sess = db.session
oi = sess.get(OrderIndex, (current_chain.get(), key.vault, key.order_index))
@@ -255,7 +255,7 @@ class Order:
# this is the main order table.
# it holds "everything" about an order in the canonical format specified by the contract orderlib, except that
# the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series.
order_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict(
order_statuses: BlockDict[OrderKey, ElaboratedSwapOrderStatus] = BlockDict(
'o', db='lazy', redis=True, pub=pub_order_status, finalize_cb=save_order_index,
str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()),
str2value=lambda s:ElaboratedSwapOrderStatus.load(json.loads(s)),

View File

@@ -1,229 +1,57 @@
import asyncio
import logging
from abc import abstractmethod
from collections import defaultdict
from enum import Enum, auto
from typing import Callable, Optional, Union, Awaitable
from typing import Optional, Sequence
from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST
from dexorder.blockstate import BlockSet, BlockDict
from dexorder.util import defaultdictk
import numpy as np
from sortedcontainers import SortedList
from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, DISTANT_PAST, Line
from dexorder.blockstate import BlockDict
from .orderstate import Order
from .. import dec, order_log, now, timestamp, from_timestamp
from .. import dec, order_log, timestamp, from_timestamp, config
from ..base.chain import current_clock
from ..base.order import OrderKey, TrancheKey, ExecutionRequest
from ..base.order import OrderKey, TrancheKey
from ..contract import ERC20
from ..database.model.pool import OldPoolDict
from ..pools import ensure_pool_price, pool_prices, get_pool
from ..routing import pool_address
from ..vault_blockdata import vault_balances, adjust_balance
log = logging.getLogger(__name__)
# todo time and price triggers should be BlockSortedSets that support range queries for efficient lookup of triggers
TimeTrigger = Callable[[int], None] # func(timestamp)
time_triggers:BlockSet[TimeTrigger] = BlockSet('tt')
PriceTrigger = Callable[[dec], Union[Awaitable[None],None]] # [async] func(pool_price)
UnconstrainedPriceTrigger = Callable[[Optional[dec]], Union[Awaitable[None],None]] # [async] func(pool_price)
price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address
new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price triggers are first set, they must be tested against the current price even if it didnt change this block
unconstrained_price_triggers: BlockSet[UnconstrainedPriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed
execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches
# todo should this really be blockdata?
inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent
"""
Each tranche can have up to two time constraints: activation time and expiration time, and two price constraints:
min line and max line. Line constraints may either be barriers or not.
Additionally, each order can be blocked based on available funds in the vault.
In order to handle chain reorganizations without re-evaluating every trigger for every head, the boolean state of each
constraint is saved in BlockState as a bitarray. When a time or price is changed, only the triggers sensitive to that
input are updated, and then checked along with the cached values from unchanged constraints to determine if an
execution should be attempted on the tranche.
"""
async def activate_orders():
log.debug('activating orders')
# this is a state init callback, called only once after the state has been loaded from the db or created fresh
keys = list(Order.open_orders)
orders = [Order.of(key) for key in keys]
for order in orders:
# setup triggers
await activate_order(order) # too many to really parallelize, and it's startup anyway
log.debug(f'activated {len(keys)} orders')
async def activate_order(order: Order):
"""
Call this to enable triggers on an order which is already in the state.
"""
address = pool_address(order.status.order)
pool = await get_pool(address)
await ensure_pool_price(pool)
triggers = OrderTriggers(order)
if triggers.closed:
log.debug(f'order {order.key} was immediately closed')
close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired)
def intersect_ranges( a_low, a_high, b_low, b_high):
low, high = max(a_low,b_low), min(a_high,b_high)
if high <= low:
low, high = None, None
return low, high
async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool:
b, m = lc
if b == 0 and m == 0:
return True
limit = m * current_clock.get().timestamp + b
# log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}')
# todo ratios
# prices AT the limit get zero volume, so we only trigger on >, not >=
return is_min and limit < price or not is_min and limit > price
class TrancheStatus (Enum):
Early = auto() # first time trigger hasnt happened yet
Pricing = auto() # we are inside the time window and checking prices
Filled = auto() # tranche has no more available amount
Expired = auto() # time deadline has past and this tranche cannot be filled
class TrancheTrigger:
def __init__(self, order: Order, tranche_key: TrancheKey):
assert order.key.vault == tranche_key.vault and order.key.order_index == tranche_key.order_index
self.order = order
self.tk = tranche_key
self.status = TrancheStatus.Early
tranche = order.order.tranches[self.tk.tranche_index]
tranche_amount = tranche.fraction_of(order.amount)
tranche_filled = order.tranche_filled(self.tk.tranche_index)
tranche_remaining = tranche_amount - tranche_filled
# time and price constraints
self.time_constraint = [tranche.startTime, tranche.endTime]
if tranche.startTimeIsRelative:
self.time_constraint[0] += self.order.status.start
if tranche.endTimeIsRelative:
self.time_constraint[1] += self.order.status.start
if self.time_constraint[0] <= DISTANT_PAST and self.time_constraint[1] >= DISTANT_FUTURE:
self.time_constraint = None
self.min_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.minIntercept, tranche.minSlope)
self.max_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.maxIntercept, tranche.maxSlope)
self.has_line_constraint = any( a or b for a,b in (self.min_line_constraint, self.max_line_constraint))
self.has_sloped_line_constraint = any(m!=0 for b,m in (self.min_line_constraint, self.max_line_constraint))
self.slippage = tranche.minIntercept if tranche.marketOrder else 0
self.pool_price_multiplier = None
# compute status and set relevant triggers
if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount: # min_fill_amount could be 0 (disabled) so we also check for the 0 case separately
self.status = TrancheStatus.Filled
return
timestamp = current_clock.get().timestamp
self.status = \
TrancheStatus.Pricing if self.time_constraint is None else \
TrancheStatus.Early if timestamp < self.time_constraint[0] else \
TrancheStatus.Expired if timestamp > self.time_constraint[1] else \
TrancheStatus.Pricing
self.enable_time_trigger()
if self.status == TrancheStatus.Pricing:
self.enable_price_trigger()
def enable_time_trigger(self):
if self.time_constraint:
log.debug(f'enable_time_trigger')
time_triggers.add(self.time_trigger)
def disable_time_trigger(self):
if self.time_constraint:
time_triggers.remove(self.time_trigger)
def time_trigger(self, now):
# log.debug(f'time_trigger {now} {self.status} {self.time_constraint}')
if self.closed:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
if not self.check_expired(now) and self.status == TrancheStatus.Early and now >= self.time_constraint[0]:
order_log.debug(f'tranche time enabled {self.tk}')
self.status = TrancheStatus.Pricing
self.enable_price_trigger()
def enable_price_trigger(self):
if self.has_line_constraint and not self.has_sloped_line_constraint: # sloped constraints must be triggered every tick, not just on pool price changes
price_triggers[self.order.pool_address].add(self.price_trigger)
new_price_triggers[self.order.pool_address].add(self.price_trigger)
else:
unconstrained_price_triggers.add(self.price_trigger)
def disable_price_trigger(self):
if self.has_line_constraint and not self.has_sloped_line_constraint:
price_triggers[self.order.pool_address].remove(self.price_trigger)
else:
unconstrained_price_triggers.remove(self.price_trigger)
async def price_trigger(self, cur: dec):
# must be idempotent. could be called twice when first activated: once for the initial price lookup then once again if that price was changed in the current block
if self.closed:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
activation_time = self.order.activation_time(self.tk.tranche_index)
if activation_time != 0 and timestamp() < activation_time:
log.debug(f'{self.tk} is rate limited until {from_timestamp(activation_time)}')
return # rate limited
# log.debug(f'price trigger {cur}')
addr = pool_address(self.order.order)
pool = await get_pool(addr)
if cur is None and self.has_line_constraint:
await ensure_pool_price(pool)
cur = pool_prices[addr]
if cur is not None:
if self.pool_price_multiplier is None:
self.pool_price_multiplier = dec(10) ** dec(-pool['decimals'])
# log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}')
cur *= self.pool_price_multiplier
if cur is None or not self.has_line_constraint or all(await asyncio.gather(
line_passes(self.min_line_constraint, True, cur),
line_passes(self.max_line_constraint, False, cur))):
# setting active_tranches[] with a PriceProof causes an execute() invocation
active_tranches[self.tk] = PriceProof(0) # todo PriceProof
def fill(self, _amount_in, _amount_out ):
remaining = self.order.tranche_remaining(self.tk.tranche_index)
filled = remaining == 0 or remaining < self.order.min_fill_amount
if filled:
order_log.debug(f'tranche filled {self.tk}')
self.status = TrancheStatus.Filled
self.disable()
else:
order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}')
return filled
def check_expired(self, now):
expired = now >= self.time_constraint[1]
if expired:
self.expire()
return expired
def expire(self):
order_log.debug(f'tranche expired {self.tk}')
self.status = TrancheStatus.Expired
self.disable()
def disable(self):
try:
del active_tranches[self.tk]
except KeyError:
pass
self.disable_time_trigger()
self.disable_price_trigger()
@property
def closed(self):
return self.status in (TrancheStatus.Filled, TrancheStatus.Expired)
@property
def open(self):
return not self.closed
# tranches which have passed all constraints and should be executed
active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at')
class OrderTriggers:
instances: dict[OrderKey, 'OrderTriggers'] = {}
def __init__(self, order: Order):
@staticmethod
async def create(order: Order):
triggers = await asyncio.gather(*[TrancheTrigger.create(order, tk) for tk in order.tranche_keys])
return OrderTriggers(order, triggers)
def __init__(self, order: Order, triggers: Sequence['TrancheTrigger']):
assert order.key not in OrderTriggers.instances
self.order = order
self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys]
self.triggers = triggers
OrderTriggers.instances[order.key] = self
log.debug(f'created OrderTriggers for {order.key}')
@@ -256,6 +84,50 @@ class OrderTriggers:
self.check_complete()
def start_trigger_updates():
"""
Called near the beginning of block handling to initialize any per-block trigger data structures
"""
TimeTrigger.update_all(current_clock.get().timestamp)
PriceLineTrigger.clear_data()
#
# Client Interface
#
async def update_balance_triggers(vault: str, token: str, balance: int):
updates = [bt.update(balance) for bt in BalanceTrigger.by_vault_token.get((vault, token), [])]
await asyncio.gather(*updates)
async def update_price_triggers(pool: OldPoolDict, price: dec):
price = price * dec(10) ** dec(-pool['decimals']) # adjust for pool decimals to get onchain price
price = float(price) # since we use SIMD operations to evaluate lines, we must convert to float
updates = [pt.update(price) for pt in PriceLineTrigger.by_pool.get(pool['address'], [])]
await asyncio.gather(*updates)
inflight_execution_requests: set[TrancheKey] = set()
async def end_trigger_updates():
"""
Call once after all updates have been handled. This updates the active_tranches array based on final trigger state.
"""
PriceLineTrigger.end_updates(current_clock.get().timestamp)
for tk in _dirty:
if _trigger_state.get(tk,0) == 0:
# all clear for execution. add to active list with any necessary proofs
active_tranches[tk] = PriceProof(0)
else:
# blocked by one or more triggers. delete from active list.
try:
del active_tranches[tk]
except KeyError:
pass
_dirty.clear()
def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState):
order.complete(final_state)
try:
@@ -265,3 +137,387 @@ def close_order_and_disable_triggers(order: Order, final_state: SwapOrderState):
else:
triggers.disable()
# NOTE: we store the INVERSE of each trigger's value! this causes the test for "All True" to be comparison with 0
# instead of comparison with a set of 1's the correct size. By storing inverted values, the group does not
# need to know the number of child triggers, only that no falses have been reported.
_trigger_state: BlockDict[TrancheKey, int] = BlockDict('trig', str2key=TrancheKey.str2key, db=True)
_dirty:set[TrancheKey] = set()
class Trigger:
def __init__(self, position: int, tk: TrancheKey, value: bool):
"""
position is the bit position of the boolean result in the tranche's constraint bitfield.
"""
self.position = position
self.tk = tk
self.value = value
@property
def value(self):
return _trigger_state.get(self.tk,0) & (1 << self.position) == 0 # NOTE: inverted
@value.setter
def value(self, value):
if value != self.value:
_dirty.add(self.tk)
if not value: # this conditional is inverted
_trigger_state[self.tk] |= 1 << self.position # set
else:
_trigger_state[self.tk] &= ~(1 << self.position) # clear
@abstractmethod
def remove(self): ...
async def has_funds(tk: TrancheKey):
# log.debug(f'has funds? {tk.vault}')
order = Order.of(tk)
balances = vault_balances.get(tk.vault, {})
token_addr = order.status.order.tokenIn
token_balance = balances.get(token_addr)
if token_balance is None:
# unknown balance
token_balance = balances[token_addr] = await ERC20(token_addr).balanceOf(tk.vault)
log.debug(f'queried token balance {token_addr}.balanceOf({tk.vault}) = {token_balance}')
await adjust_balance(tk.vault, token_addr, token_balance)
return await input_amount_is_sufficient(order, token_balance)
async def input_amount_is_sufficient(order, token_balance):
if order.amount_is_input:
return token_balance >= order.status.order.minFillAmount
# amount is an output amount, so we need to know the price
price = pool_prices.get(order.pool_address)
if price is None:
return token_balance > 0 # we don't know the price so we allow any nonzero amount to be sufficient
pool = await get_pool(order.pool_address)
inverted = order.order.tokenIn != pool['base']
minimum = dec(order.min_fill_amount)*price if inverted else dec(order.min_fill_amount)/price
log.debug(f'order minimum amount is {order.min_fill_amount} '+ ("input" if order.amount_is_input else f"output @ {price} = {minimum} ")+f'< {token_balance} balance')
return token_balance > minimum
class BalanceTrigger (Trigger):
by_vault_token: dict[tuple[str,str],set['BalanceTrigger']] = defaultdict(set)
@staticmethod
async def create(tk: TrancheKey):
value = await has_funds(tk)
return BalanceTrigger(tk, value)
def __init__(self, tk: TrancheKey, value: bool):
super().__init__(0, tk, value)
self.order = Order.of(self.tk)
self.vault_token = self.tk.vault, self.order.status.order.tokenIn
log.debug(f'adding balanc trigger {id(self)}')
BalanceTrigger.by_vault_token[self.vault_token].add(self)
async def update(self, balance):
self.value = await input_amount_is_sufficient(self.order, balance)
def remove(self):
log.debug(f'removing balanc trigger {id(self)}')
try:
BalanceTrigger.by_vault_token[self.vault_token].remove(self)
except KeyError:
pass
class TimeTrigger (Trigger):
all = SortedList(key=lambda t: (t.time, 0 if t.is_start else 1)) # start before end even if the same time
@staticmethod
def create(is_start: bool, tk: TrancheKey, time: int, time_now: int = None):
if is_start and time == DISTANT_PAST or not is_start and time == DISTANT_FUTURE:
return None
if time_now is None:
time_now = current_clock.get().timestamp
return TimeTrigger(is_start, tk, time, time_now)
def __init__(self, is_start: bool, tk: TrancheKey, time: int, time_now: int):
triggered = time_now >= time
super().__init__(1 if is_start else 2, tk, triggered is is_start)
self.is_start = is_start
self._time = time
self.active = not triggered
if self.active:
TimeTrigger.all.add(self)
@property
def time(self):
return self._time
@time.setter
def time(self, time: int):
self.set_time(time, current_clock.get().timestamp)
def set_time(self, time: int, time_now: int):
self._time = time
self.active = (time_now > time) is self.is_start
TimeTrigger.all.remove(self)
TimeTrigger.all.add(self)
def update(self):
# called when our self.time has been reached
self.value = self.is_start
self.active = False
# we are popped off the stack by update_all()
def remove(self):
if self.active:
TimeTrigger.all.remove(self)
self.active = False
@staticmethod
def update_all(time):
while TimeTrigger.all and TimeTrigger.all[0].time <= time:
# todo this doesnt work across reorgs. we need to keep a BlockState cursor of the last time handled,
# then activate any time triggers from that past time through the present. time triggers may only
# be popped off the stack after their times are older than the latest finalized block
# todo what if an order is placed on a reorg'd branch but never hits main branch? we have triggers going
# for a nonexistent order!
t = TimeTrigger.all.pop(0)
t.update()
class PriceLineTrigger (Trigger):
by_pool: dict[str,set['PriceLineTrigger']] = defaultdict(set)
@staticmethod
async def create(tk: TrancheKey, line: Line, is_min: bool, is_barrier: bool):
if line.intercept == 0 and line.slope == 0:
return None # no constraint (deactivated)
pool = await get_pool(Order.of(tk).pool_address)
await ensure_pool_price(pool)
price_now = pool_prices[pool['address']]
return PriceLineTrigger(tk, line, is_min, is_barrier, price_now)
def __init__(self, tk: TrancheKey, line: Line, is_min: bool, is_barrier: bool, price_now: dec):
if is_barrier:
log.warning('Barriers not supported')
price_above = price_now > line.intercept + line.slope * current_clock.get().timestamp
super().__init__(3 if is_min else 4, tk, is_min is price_above)
self.line = line
self.is_min = is_min
self.is_barrier = is_barrier
self.pool_address = Order.of(tk).pool_address
self.index: Optional[int] = None
PriceLineTrigger.by_pool[self.pool_address].add(self)
# lines that need evaluating add their data to these arrays, which are then sent to SIMD for evaluation. each
# array must always have the same size as the others.
y = []
m = []
b = []
triggers = [] # 1-for-1 with line_data
triggers_set = set()
@staticmethod
def clear_data():
PriceLineTrigger.y.clear()
PriceLineTrigger.m.clear()
PriceLineTrigger.b.clear()
PriceLineTrigger.triggers.clear()
PriceLineTrigger.triggers_set.clear()
def update(self, price: float):
if self not in PriceLineTrigger.triggers_set:
self.index = len(PriceLineTrigger.y)
PriceLineTrigger.y.append(price)
PriceLineTrigger.m.append(self.line.slope)
PriceLineTrigger.b.append(self.line.intercept)
PriceLineTrigger.triggers.append(self)
PriceLineTrigger.triggers_set.add(self)
else:
# update an existing equation's price
PriceLineTrigger.y[self.index] = price
@staticmethod
def end_updates(time: int):
# here we use numpy to compute all dirty lines using SIMD
y, m, b = map(np.array, (PriceLineTrigger.y, PriceLineTrigger.m, PriceLineTrigger.b))
line_value = m * time + b
price_diff = y - line_value
for t, pd in zip(PriceLineTrigger.triggers, price_diff):
t.handle_result(pd)
def handle_result(self, price_diff: float):
value = self.is_min and price_diff > 0 or not self.is_min and price_diff < 0
if not self.is_barrier or value: # barriers that are False do not update their values to False
self.value = value
def remove(self):
PriceLineTrigger.by_pool[self.pool_address].remove(self)
async def activate_orders():
log.debug('activating orders')
# this is a state init callback, called only once after the state has been loaded from the db or created fresh
keys = list(Order.open_orders)
orders = [Order.of(key) for key in keys]
for order in orders:
# setup triggers
await activate_order(order) # too many to really parallelize, and it's startup anyway
log.debug(f'activated {len(keys)} orders')
async def activate_order(order: Order):
"""
Call this to enable triggers on an order which is already in the state.
"""
address = pool_address(order.status.order)
pool = await get_pool(address)
await ensure_pool_price(pool)
triggers = await OrderTriggers.create(order)
if triggers.closed:
log.debug(f'order {order.key} was immediately closed')
close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired)
class TrancheState (Enum):
Early = auto() # first time trigger hasnt happened yet
Active = auto() # we are inside the time window and checking prices
Filled = auto() # tranche has no more available amount
Expired = auto() # time deadline has past and this tranche cannot be filled
Error = auto() # the tranche was slashed and killed due to reverts during execute()
class TrancheTrigger:
@staticmethod
async def create(order: Order, tk: TrancheKey) -> 'TrancheTrigger':
time = current_clock.get().timestamp
tranche = order.order.tranches[tk.tranche_index]
ts = order.status.trancheStatus[tk.tranche_index]
balance_trigger = await BalanceTrigger.create(tk)
activation_trigger = TimeTrigger.create(True, tk, ts.activationTime, time)
expiration_trigger = TimeTrigger.create(False, tk, ts.endTime, time)
if tranche.marketOrder:
min_trigger = max_trigger = None
else:
min_trigger, max_trigger = await asyncio.gather(
PriceLineTrigger.create(tk, tranche.minLine, True, tranche.minIsBarrier),
PriceLineTrigger.create(tk, tranche.maxLine, True, tranche.maxIsBarrier))
return TrancheTrigger(order, tk, balance_trigger, activation_trigger, expiration_trigger, min_trigger, max_trigger)
def __init__(self, order: Order, tk: TrancheKey,
balance_trigger: BalanceTrigger,
activation_trigger: Optional[TimeTrigger],
expiration_trigger: Optional[TimeTrigger],
min_trigger: Optional[PriceLineTrigger],
max_trigger: Optional[PriceLineTrigger],
):
assert order.key.vault == tk.vault and order.key.order_index == tk.order_index
tranche = order.order.tranches[tk.tranche_index]
self.order = order
self.tk = tk
self.balance_trigger = balance_trigger
self.activation_trigger = activation_trigger
self.expiration_trigger = expiration_trigger
self.min_trigger = min_trigger
self.max_trigger = max_trigger
self.slippage = tranche.minLine.intercept if tranche.marketOrder else 0
self.slash_count = 0
tranche_remaining = tranche.fraction_of(order.amount) - order.tranche_filled(self.tk.tranche_index)
self.status = \
TrancheState.Filled if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount else \
TrancheState.Expired if self.expiration_trigger is not None and not self.expiration_trigger else \
TrancheState.Early if self.activation_trigger is None and not self.activation_trigger else \
TrancheState.Active
_dirty.add(tk)
log.debug(f'Tranche {tk} initial status {self.status} {self}')
def fill(self, _amount_in, _amount_out ):
remaining = self.order.tranche_remaining(self.tk.tranche_index)
filled = remaining == 0 or remaining < self.order.min_fill_amount
if filled:
order_log.debug(f'tranche filled {self.tk}')
self.status = TrancheState.Filled
self.disable()
else:
order_log.debug(f'tranche part-filled {self.tk} in:{_amount_in} out:{_amount_out} remaining:{remaining}')
self.slash_count = 0 # reset slash count
return filled
def expire(self):
order_log.debug(f'tranche expired {self.tk}')
self.status = TrancheState.Expired
self.disable()
def kill(self):
order_log.warning(f'tranche KILLED {self.tk}')
self.status = TrancheState.Error
self.disable()
def slash(self):
# slash() is called when an execute() transaction on this tranche reverts without a recognized reason.
self.slash_count += 1
log.debug(f'slashed tranche x{self.slash_count} {self.tk}')
if self.slash_count >= config.slash_kill_count:
self.kill()
else:
delay = round(config.slash_delay_base * config.slash_delay_mul ** (self.slash_count-1))
self.deactivate(timestamp()+delay)
def deactivate(self, until):
# Temporarily deactivate the tranche due to a rate limit. Use disable() to permanently halt the trigger.
log.debug(f'deactivating tranche {self.tk} until {from_timestamp(until)}')
if self.activation_trigger is None:
self.activation_trigger = TimeTrigger.create(True, self.tk, until)
else:
self.activation_trigger.time = until
def disable(self):
# permanently stop this trigger and deconstruct
self.balance_trigger.remove()
if self.activation_trigger is not None:
self.activation_trigger.remove()
if self.expiration_trigger is not None:
self.expiration_trigger.remove()
if self.min_trigger is not None:
self.min_trigger.remove()
if self.max_trigger is not None:
self.max_trigger.remove()
try:
del _trigger_state[self.tk]
except KeyError:
pass
try:
_dirty.remove(self.tk)
except KeyError:
pass
try:
del active_tranches[self.tk]
except KeyError:
pass
@property
def closed(self):
return self.status in (TrancheState.Filled, TrancheState.Expired, TrancheState.Error)
@property
def open(self):
return not self.closed
def __str__(self):
trigs = []
if self.balance_trigger is not None:
trigs.append(f'balance {self.balance_trigger.value}')
if self.activation_trigger is not None:
trigs.append(f'activation {self.activation_trigger.value}')
if self.expiration_trigger is not None:
trigs.append(f'expiration {self.expiration_trigger.value}')
if self.min_trigger is not None:
trigs.append(f'min line {self.min_trigger.value}')
if self.max_trigger is not None:
trigs.append(f'max line {self.max_trigger.value}')
return f'TrancheTrigger[{",".join(str(t) for t in trigs)}]'

View File

@@ -3,7 +3,6 @@ import logging
from datetime import datetime
from typing import Optional
from sqlalchemy.exc import NoResultFound
from web3.exceptions import ContractLogicError
from web3.types import EventData
@@ -11,12 +10,11 @@ from dexorder import dec, ADDRESS_0, from_timestamp, db
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.base.orderlib import Exchange
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V
from dexorder.blocks import get_block_timestamp
from dexorder.database.model import Pool
from dexorder.database.model.pool import OldPoolDict
from dexorder.metadata import is_generating_metadata
from dexorder.tokens import get_token
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address

View File

@@ -26,7 +26,16 @@ class BlockProgressor(metaclass=ABCMeta):
# items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = []
# these callbacks are invoked after every block and also every second if there wasnt a block
self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = []
self.callbacks:list[tuple[Callable[[],Maywaitable[None]],bool]] = []
self.combined = [] # a mix of both event handlers and callbacks
def add_callback(self, callback: Callable[[], Maywaitable[None]], trigger_on_timer=True):
"""
If trigger_on_timer is True, then the callback is also invoked on a regular timer if there is a lull in blocks.
"""
item = (callback, trigger_on_timer)
self.callbacks.append(item)
self.combined.append(item)
def add_event_trigger(self,
# callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
@@ -50,7 +59,9 @@ class BlockProgressor(metaclass=ABCMeta):
for e in events:
await maywait(func(e))
cb = callback if event is None or multi else functools.partial(_map, callback)
self.events.append((cb, event, log_filter))
item = (cb, event, log_filter)
self.events.append(item)
self.combined.append(item)
@abstractmethod
def run(self):
@@ -61,24 +72,29 @@ class BlockProgressor(metaclass=ABCMeta):
if w3 is None:
w3 = current_w3.get()
batches = []
for callback, event, log_filter in self.events:
if log_filter is None:
batches.append((None, callback, event, None))
for entry in self.combined:
if len(entry) == 2:
# plain callback
callback, on_timer = entry
batches.append((None, callback, None, None))
else:
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
get_logs = w3.eth.get_logs(lf)
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, lf))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
# event callback
callback, event, log_filter = entry
if log_filter is None:
batches.append((None, callback, event, None))
else:
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
get_logs = w3.eth.get_logs(lf)
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, lf))
return batches
@staticmethod
async def invoke_callbacks(batches, chain=None):
async def invoke_callback_batches(batches, chain=None):
if chain is None:
chain = current_chain.get()
# logevent callbacks

View File

@@ -18,7 +18,7 @@ from dexorder.blockstate.branch import Branch
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.progressor import BlockProgressor
from dexorder.transaction import create_and_send_transactions
from dexorder.transactions import create_and_send_transactions
from dexorder.util import hexstr, hexbytes
from dexorder.util.async_util import maywait, Maywaitable
from dexorder.util.shutdown import fatal
@@ -270,31 +270,34 @@ class BlockStateRunner(BlockProgressor):
block = await get_block(blockhash)
current_block.set(block)
bloom = BloomFilter(int.from_bytes(hexbytes(block.data['logsBloom'])))
for callback, event, log_filter in self.events:
if log_filter is None:
batches.append((None, callback, event, None))
for item in self.combined:
if len(item) == 2:
callback, on_timer = item
batches.append((None, callback, None, None))
else:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
# log.debug(f'has {event.__class__.__name__}? {has_logs}')
if not has_logs:
get_logs = None
callback, event, log_filter = item
if log_filter is None:
batches.append((None, callback, event, None))
else:
# log.debug(f'has {event.__class__.__name__}')
get_logs = w3.eth.get_logs(lf)
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, log_filter))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
# log.debug(f'has {event.__class__.__name__}? {has_logs}')
if not has_logs:
get_logs = None
else:
# log.debug(f'has {event.__class__.__name__}')
get_logs = w3.eth.get_logs(lf)
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, log_filter))
# set up for callbacks
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created
if not self.state_initialized:
await self.do_state_init_cbs()
# log.debug(f'invoking callbacks with fork {current_fork.get()}')
await self.invoke_callbacks(batches)
await self.invoke_callback_batches(batches)
# todo
# IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either
@@ -358,9 +361,10 @@ class BlockStateRunner(BlockProgressor):
session = db.session
session.begin()
try:
for callback in self.postprocess_cbs:
# noinspection PyCallingNonCallable
await maywait(callback())
for callback, on_timer in self.callbacks:
if on_timer:
# noinspection PyCallingNonCallable
await maywait(callback())
except BaseException:
session.rollback()
raise

View File

@@ -2,7 +2,7 @@ import logging
from typing import Optional
from eth_abi.exceptions import InsufficientDataBytes
from web3.exceptions import ContractLogicError, BadFunctionCallOutput
from web3.exceptions import BadFunctionCallOutput
from dexorder import ADDRESS_0, config, db
from dexorder.addrmeta import address_metadata
@@ -11,7 +11,6 @@ from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
from dexorder.database.model import Token
from dexorder.database.model.token import OldTokenDict
from dexorder.metadata import get_metadata
from dexorder.util import hexstr
log = logging.getLogger(__name__)

View File

@@ -1,21 +1,22 @@
import logging
from abc import abstractmethod
from typing import Optional
from dataclasses import dataclass
from typing import Union, Optional
from uuid import uuid4
from sqlalchemy import select
from web3.exceptions import TransactionNotFound
from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError
from dexorder import db, current_w3, Account
from dexorder.base import TransactionReceiptDict
from dexorder.base import TransactionReceiptDict, TransactionRequest, transaction_request_registry
from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import PriceProof
from dexorder.blockstate import BlockDict
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.database.model.transaction import TransactionJob, TransactionJobState
from dexorder.util.shutdown import fatal
log = logging.getLogger(__name__)
@@ -38,7 +39,47 @@ class TransactionHandler:
async def complete_transaction(self, job: TransactionJob) -> None: ...
@dataclass
class TrancheExecutionRequest (TransactionRequest):
TYPE = 'te'
# type='te' for tranche execution
vault: str
order_index: int
tranche_index: int
price_proof: Union[None,dict,tuple[int]]
def __init__(self, vault: str, order_index: int, tranche_index: int, price_proof: Union[None,dict,tuple[int]], **_):
super().__init__(TrancheExecutionRequest.TYPE)
self.vault = vault
self.order_index = order_index
self.tranche_index = tranche_index
self.price_proof = price_proof
@property
def order_key(self):
return OrderKey(self.vault, self.order_index)
@property
def tranche_key(self):
return TrancheKey(self.vault, self.order_index, self.tranche_index)
# Must register the class for deserialization
transaction_request_registry[TrancheExecutionRequest.TYPE] = TrancheExecutionRequest
def new_tranche_execution_request(tk: TrancheKey, proof: Optional[PriceProof]=None) -> TrancheExecutionRequest:
if proof is None:
proof = PriceProof(0)
return TrancheExecutionRequest(tk.vault, tk.order_index, tk.tranche_index, proof.dump())
def submit_transaction_request(tr: TransactionRequest):
"""
Once a transaction request has been submitted, it is this module's responsibility to see that it gets mined, at
which point `tr.complete_transaction()` is called with the transaction receipt.
The building of a transaction can also fail,
"""
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height,
state=TransactionJobState.Requested, request=tr)
db.session.add(job)
@@ -58,37 +99,46 @@ async def create_and_send_transactions():
# todo remove bad request?
log.warning('ignoring transaction request with bad type '
f'"{job.request.type}": ' + ",".join(TransactionHandler.instances.keys()))
else:
return
try:
ctx: ContractTransaction = await handler.build_transaction(job.id, job.request)
if ctx is None:
log.warning(f'unable to send transaction for job {job.id}')
return
w3 = current_w3.get()
account = Account.get_named(handler.tag)
if account is None:
account = Account.get()
if account is None:
log.error(f'No account available for transaction request type "{handler.tag}"')
continue
await ctx.sign(account)
job.state = TransactionJobState.Signed
job.tx_id = ctx.id_bytes
job.tx_data = ctx.data
except (ContractPanicError, ContractLogicError):
# these errors can be thrown immediately when the tx is tested for gas
log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}')
job.state = TransactionJobState.Error
db.session.add(job)
await handler.complete_transaction(job)
return
except Exception as x:
log.warning(f'unable to send transaction for job {job.id}', exc_info=x)
return
w3 = current_w3.get()
account = Account.get_named(handler.tag)
if account is None:
account = Account.get()
if account is None:
log.error(f'No account available for transaction request type "{handler.tag}"')
continue
await ctx.sign(account)
job.state = TransactionJobState.Signed
job.tx_id = ctx.id_bytes
job.tx_data = ctx.data
db.session.add(job)
log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}')
try:
sent = await w3.eth.send_raw_transaction(job.tx_data)
except:
log.exception(f'Failure sending transaction for job {job.id}')
# todo pager
# todo send state unknown!
else:
assert sent == job.tx_id
job.state = TransactionJobState.Sent
db.session.add(job)
log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}')
try:
sent = await w3.eth.send_raw_transaction(job.tx_data)
except:
log.exception(f'Failure sending transaction for job {job.id}')
# todo pager
# todo send state unknown!
else:
assert sent == job.tx_id
job.state = TransactionJobState.Sent
db.session.add(job)
async def handle_transaction_receipts():
log.debug('handle_transaction_receipts')
w3 = current_w3.get()
for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(),
@@ -116,6 +166,7 @@ async def handle_transaction_receipts():
def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]):
# noinspection PyTypeChecker
open_jobs = db.session.scalars(select(TransactionJob).where(
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Sent

View File

@@ -128,5 +128,5 @@ class BlockWalker (BlockProgressor):
fork = Fork([branch])
current_fork.set(fork)
batches = await self.get_backfill_batches(from_height, to_height, w3=w3)
await self.invoke_callbacks(batches, chain)
await self.invoke_callback_batches(batches, chain)
log.info(f'completed through block {to_height}')