activate orders before pushing loaded db state to redis; rogue vault handling

Tim
2024-04-13 00:31:35 -04:00
parent 663e055241
commit fff136315a
14 changed files with 120 additions and 69 deletions
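In short, when a previously persisted state is reloaded, startup now sets the root fork and rebuilds the order triggers before the state is mirrored to Redis (see the bin/main hunk below), and orders arriving from vaults that cannot be verified are discarded as rogue. A condensed sketch of the new startup ordering, with names taken from the diff; the surrounding argument parsing, runner setup, and fresh-state branch are omitted:

    # Sketch only: simplified from the bin/main changes in this commit.
    state = await db_state.load()                      # load persisted block state
    if state is not None:
        current_blockstate.set(state)
        current_fork.set(state.root_fork)              # make the root fork current
        await activate_orders()                        # build order triggers from the loaded state first...
        if redis_state:
            await redis_state.init(state, state.root_fork)  # ...then push the state to redis
        log.info(f'loaded state from db for root block {state.root_branch.height}')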

View File

@@ -1,4 +1,5 @@
import math
from abc import ABC, abstractmethod
# noinspection PyPackageRequirements
from contextvars import ContextVar
@@ -54,7 +55,11 @@ Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000)
 current_chain = ContextVar[Blockchain]('current_chain', default=Mock)
-class BlockClock:
+class Clock:
+    timestamp: int
+class BlockClock (Clock):
     def __init__(self, block_timestamp=0, adjustment=None):
         self.block_timestamp = block_timestamp if block_timestamp != 0 else dexorder.timestamp()
         self.adjustment = 0 if block_timestamp == 0 \
@@ -65,5 +70,12 @@ class BlockClock:
     def timestamp(self):
         return math.ceil(dexorder.timestamp() + self.adjustment)
-current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks
+class SystemClock (Clock):
+    @property
+    def timestamp(self):
+        return dexorder.timestamp()
+current_clock = ContextVar[BlockClock]('clock', default=SystemClock()) # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks

View File

@@ -48,13 +48,13 @@ async def main():
     db.connect()
     db_state = DbState(BlockData.by_opt('db'))
     with db.session:
-        state = db_state.load()
+        state = await db_state.load()
     if state is None:
         log.info('no state in database')
     else:
         if redis_state:
-            await redis_state.init(state)
-        log.info(f'loaded state from db for root block {state.root_block}')
+            await redis_state.init(state, state.root_fork)
+        log.info(f'loaded state from db for root block {state.root_branch.height}')
     runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0)
     runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)

View File

@@ -19,7 +19,7 @@ ignorable_exceptions = [CancelledError]
 log = logging.getLogger(__name__)
-async def _shutdown_coro(_sig, loop, extra_shutdown):
+async def _shutdown_coro(_sig, _loop, extra_shutdown):
     log.info('shutting down')
     if extra_shutdown is not None:
         extra_shutdown()
@@ -28,7 +28,6 @@ async def _shutdown_coro(_sig, loop, extra_shutdown):
     for task in tasks:
         task.cancel()
     exceptions = await asyncio.gather(*tasks, return_exceptions=True)
-    loop.stop()
     for x in exceptions:
         if x is not None and x.__class__ not in ignorable_exceptions:
             print_exception(x)
@@ -57,14 +56,19 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
     for s in signals:
         loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop, shutdown), name=f'{s.name} handler'))
     task = loop.create_task(main, name='main')
-    loop.run_until_complete(task)
-    x = task.exception()
-    if x is not None:
-        if x.__class__ not in ignorable_exceptions:
-            print_exception(x)
-        for t in asyncio.all_tasks():
+    try:
+        loop.run_until_complete(task)
+    except CancelledError:
+        pass
+    except Exception as x:
+        print_exception(x)
+    try:
+        remaining_tasks = asyncio.all_tasks()
+    except RuntimeError:
+        pass
+    else:
+        for t in remaining_tasks:
             t.cancel()
-    # else:
-    #     loop.run_forever()
+        loop.run_until_complete(asyncio.gather(*remaining_tasks))
+    loop.stop()
     loop.close()

View File

@@ -7,14 +7,16 @@ from dexorder.bin.executable import execute
 from dexorder.blockstate import current_blockstate
 from dexorder.blockstate.blockdata import BlockData
 from dexorder.blockstate.db_state import DbState
+from dexorder.blockstate.fork import current_fork
 from dexorder.contract import get_contract_event
 from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract
-from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, \
+from dexorder.event_handler import init, dump_log, handle_vault_created, handle_order_placed, \
     handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, \
     activate_time_triggers, activate_price_triggers, \
     process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps
 from dexorder.memcache import memcache
 from dexorder.memcache.memcache_state import RedisState, publish_all
+from dexorder.order.triggers import activate_orders
 from dexorder.runner import BlockStateRunner
 from dexorder.transaction import handle_transaction_receipts, finalize_transactions
@@ -86,6 +88,8 @@ async def main():
         await redis_state.clear()
     else:
         current_blockstate.set(state)
+        current_fork.set(state.root_fork)
+        await activate_orders() # activate orders first before pushing data to redis
         if redis_state:
             await redis_state.init(state, state.root_fork)
         log.info(f'loaded state from db for root block {state.root_branch.height}')
@@ -95,7 +99,6 @@ async def main():
     # if config.ohlc_dir:
     #     runner.on_promotion.append(ohlc_save)
     if db:
-        runner.on_state_init.append(init_order_triggers)
         # noinspection PyUnboundLocalVariable
         runner.on_promotion.append(db_state.finalize)
     if redis_state:

View File

@@ -58,7 +58,7 @@ class BlockData (Generic[T]):
         state = current_blockstate.get()
         fork = current_fork.get()
         try:
-            result = state.get(fork, self.series, item)
+            result = state.get(fork, self.series, item, default=NARG) # force raise KeyError
         except KeyError:
             result = default
         if self.lazy_getitem:

View File

@@ -94,7 +94,7 @@ class DbState(SeriesCollection):
         for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)):
             key = data.str2key(row.key)
             value = data.str2value(row.value)
-            # log.debug(f'load {series} {key} {value}')
+            log.debug(f'load {series} {key} {value}')
             state.set(root_fork, var.series, key, value, overwrite=True)
         log.debug(f'loaded db state from block {root_block}')
         return state

View File

@@ -1,11 +1,16 @@
 import json
 import os
+from eth_abi.exceptions import InsufficientDataBytes
+from web3.exceptions import BadFunctionCallOutput, ContractLogicError
 from .abi import abis
 from .contract_proxy import ContractProxy
 from .. import current_w3
 from ..base.chain import current_chain
+CONTRACT_ERRORS = (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput)
 def get_contract_data(name):
     if name in abis:

View File

@@ -1,10 +1,7 @@
 import logging
-from eth_abi.exceptions import InsufficientDataBytes
-from web3.exceptions import ContractLogicError, BadFunctionCallOutput
 from dexorder import db
-from dexorder.contract import ERC20
+from dexorder.contract import ERC20, CONTRACT_ERRORS
 log = logging.getLogger(__name__)
@@ -17,7 +14,7 @@ async def token_decimals(addr):
     # noinspection PyBroadException
     try:
         decimals = await ERC20(addr).decimals()
-    except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
+    except CONTRACT_ERRORS:
         log.warning(f'token {addr} has no decimals()')
         decimals = 0
     except Exception:

View File

@@ -6,21 +6,23 @@ from web3.types import EventData
from dexorder import current_pub, db, from_timestamp, minutely
from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, \
OrderKey
from dexorder.base.orderlib import SwapOrderState
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate.fork import current_fork
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.contract import ERC20
from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance
from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.database.model.transaction import TransactionJob
from dexorder.base.orderlib import SwapOrderState
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \
unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, new_price_triggers, activate_order
unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, \
new_price_triggers, activate_order
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.transaction import submit_transaction_request
from dexorder.util.async_util import maywait
from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance, MAX_VAULTS, verify_vault
log = logging.getLogger(__name__)
@@ -29,14 +31,6 @@ def dump_log(eventlog):
     log.debug(f'\t{eventlog}')
-async def init_order_triggers():
-    log.debug('activating orders')
-    # this is a state init callback, called only once after the state has been loaded from the db or created fresh
-    orders = [Order.of(key) for key in Order.open_orders]
-    futures = [activate_order(order) for order in orders]
-    await asyncio.gather(*futures, return_exceptions=True)
-    log.debug(f'activated {len(futures)} orders')
 def init():
     new_pool_prices.clear()
     new_price_triggers.clear()
@@ -49,22 +43,18 @@ async def handle_order_placed(event: EventData):
     start_index = int(event['args']['startOrderIndex'])
     num_orders = int(event['args']['numOrders'])
     log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
-    if addr not in vault_owners:
-        log.warning(f'block {current_fork.get().head} order from unknown vault {addr}') # todo insert (short) block hash into all logs
-        # return todo always discard rogues
-        # noinspection PyBroadException
-        try:
-            vault_owners[addr] = await VaultContract(addr).owner()
-        except Exception:
-            log.warning(f'vault owner for {addr} could not be found.', exc_info=True)
-            return
-    vault = VaultContract(addr)
+    if not await verify_vault(addr):
+        log.warning(f'Discarding order from rogue vault {addr}.')
+        return
+    contract = None
     for index in range(start_index, start_index+num_orders):
-        key = OrderKey(vault.address, index)
+        key = OrderKey(addr, index)
         if key not in Order.instances:
-            obj = await vault.swapOrderStatus(index)
+            if contract is None:
+                contract = VaultContract(addr)
+            obj = await contract.swapOrderStatus(index)
             log.debug(f'raw order status {obj}')
-            order = Order.create(vault.address, index, obj)
+            order = Order.create(addr, index, obj)
             await activate_order(order)
             log.debug(f'new order {order} {order.order}')
@@ -160,25 +150,23 @@ async def handle_uniswap_swap(swap: EventData):
     log.debug(f'pool {addr} {minutely(time)} {price}')
-def handle_vault_created(created: EventData):
+async def handle_vault_created(created: EventData):
     try:
         owner = created['args']['owner']
         num = created['args']['num']
     except KeyError:
         log.debug('couldnt parse event data for VaultCreated', created)
         return
-    vault = vault_address(owner,num)
-    log.debug(f'VaultCreated {owner} #{num} => {vault}')
-    vault_owners[vault] = owner
+    addr = vault_address(owner, num)
+    vault_owners[addr] = owner
+    log.debug(f'VaultCreated {owner} #{num} => {addr}')
     vaults = []
-    for num in range(256):
+    for num in range(MAX_VAULTS):
         addr = vault_address(owner, num)
         # log.debug(f'v{num}? {addr}')
         if addr in vault_owners:
             vaults.append(addr)
         else:
             break
     # log.debug(f'updated vaults: {vaults}')
     current_pub.get()(f'{current_chain.get().id}|{owner}', 'vaults', vaults)
@@ -214,8 +202,8 @@ async def process_active_tranches():
         if await has_funds(tk):
             log.info(f'execution request for {tk}')
             execution_requests[tk] = ExecutionRequest(height, proof)
-        else:
-            log.debug(f'underfunded tranche {tk}')
+        # else:
+        #     log.debug(f'underfunded tranche {tk}')
 async def has_funds(tk: TrancheKey):

View File

@@ -52,7 +52,8 @@ class RedisState (SeriesCollection):
         hsets: dict[str,dict[str,str]] = defaultdict(dict)
         hdels: dict[str,set[str]] = defaultdict(set)
         pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value
-        for diff in compress_diffs(diffs):
+        compressed = compress_diffs(diffs)
+        for diff in compressed:
             try:
                 d = self.datas[diff.series]
             except KeyError:

View File

@@ -201,7 +201,7 @@ class Order:
                 'o', # order message type
                 (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_status)
         except KeyError:
-            log.warning(f'No vault owner for {k}')
+            log.warning(f'No vault owner for {k.vault}')
             return None
         except AttributeError:
             log.error(f'could not dump {v}')
@@ -218,7 +218,7 @@ class Order:
                 'of', # order message type
                 (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_fills)
         except KeyError:
-            log.warning(f'No vault owner for {k}')
+            log.warning(f'No vault owner for {k.vault}')
             return None
     @staticmethod

View File

@@ -31,6 +31,17 @@ execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # g
 inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent
+async def activate_orders():
+    log.debug('activating orders')
+    # this is a state init callback, called only once after the state has been loaded from the db or created fresh
+    keys = list(Order.open_orders)
+    orders = [Order.of(key) for key in keys]
+    for order in orders:
+        # setup triggers
+        await activate_order(order) # too many to really parallelize, and it's startup anyway
+    log.debug(f'activated {len(keys)} orders')
 async def activate_order(order: Order):
     """
     Call this to enable triggers on an order which is already in the state.
@@ -42,6 +53,8 @@ async def activate_order(order: Order):
     if triggers.closed:
         log.debug(f'order {order.key} was immediately closed')
         close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired)
+    else:
+        order.status
 def intersect_ranges( a_low, a_high, b_low, b_high):

View File

@@ -7,7 +7,7 @@ from web3.exceptions import ContractLogicError, BadFunctionCallOutput
 from dexorder import ADDRESS_0, config
 from dexorder.addrmeta import address_metadata
 from dexorder.base.chain import current_chain
-from dexorder.contract import ERC20, ContractProxy
+from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
 from dexorder.database.model.token import OldTokenDict
 from dexorder.metadata import get_metadata
@@ -39,7 +39,7 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
             if end == -1:
                 end = 32
             result = rb[:end].decode('utf8')
-        except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
+        except CONTRACT_ERRORS:
             log.warning(f'token {address} has broken {func_name}()')
             return None
         return result
@@ -49,7 +49,7 @@ async def load_token(address: str) -> Optional[OldTokenDict]:
     name_prom = get_string_or_bytes32('name')
     try:
         decimals = await dec_prom
-    except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
+    except CONTRACT_ERRORS:
         log.warning(f'token {address} has no decimals()')
         decimals = 0
     approved = config.metadata is None

View File

@@ -3,7 +3,8 @@ import logging
 from dexorder.base.chain import current_chain
 from dexorder.blockstate import BlockDict
-from dexorder.contract import ERC20
+from dexorder.contract import ERC20, CONTRACT_ERRORS
+from dexorder.contract.dexorder import VaultContract, vault_address
 from dexorder.util import json
 log = logging.getLogger(__name__)
@@ -12,6 +13,33 @@ log = logging.getLogger(__name__)
 # if pub is True, then event is the current series name, room is the key, and args is [value]
 # values of DELETE are serialized as nulls
+MAX_VAULTS = 1 # todo increase
+async def verify_vault(addr: str, owner: str = None, num: int = None) -> bool:
+    if addr in vault_owners:
+        return True
+    if owner is None:
+        try:
+            owner = await VaultContract(addr).owner()
+        except CONTRACT_ERRORS:
+            log.warning(f'vault owner for {addr} could not be found.')
+            return False
+    if num is not None:
+        if vault_address(owner, num) == addr:
+            vault_owners[addr] = owner
+            return True
+        return False
+    # no num so scan them all
+    for num in range(MAX_VAULTS):
+        test_addr = vault_address(owner, num)
+        if addr == test_addr:
+            vault_owners[addr] = owner
+            return True
+    return False
 def pub_vault_balances(_s, k, v):
     chain_id = current_chain.get().id
     try: