diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index df8580f..ee2948d 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -1,4 +1,5 @@ import math +from abc import ABC, abstractmethod # noinspection PyPackageRequirements from contextvars import ContextVar @@ -54,7 +55,11 @@ Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000) current_chain = ContextVar[Blockchain]('current_chain', default=Mock) -class BlockClock: +class Clock: + timestamp: int + + +class BlockClock (Clock): def __init__(self, block_timestamp=0, adjustment=None): self.block_timestamp = block_timestamp if block_timestamp != 0 else dexorder.timestamp() self.adjustment = 0 if block_timestamp == 0 \ @@ -65,5 +70,12 @@ class BlockClock: def timestamp(self): return math.ceil(dexorder.timestamp() + self.adjustment) -current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks + +class SystemClock (Clock): + @property + def timestamp(self): + return dexorder.timestamp() + + +current_clock = ContextVar[BlockClock]('clock', default=SystemClock()) # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index d52bb63..c918d8a 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -48,13 +48,13 @@ async def main(): db.connect() db_state = DbState(BlockData.by_opt('db')) with db.session: - state = db_state.load() + state = await db_state.load() if state is None: log.info('no state in database') else: if redis_state: - await redis_state.init(state) - log.info(f'loaded state from db for root block {state.root_block}') + await redis_state.init(state, state.root_fork) + log.info(f'loaded state from db for root block {state.root_branch.height}') runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index a1afe30..14d4763 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -19,7 +19,7 @@ ignorable_exceptions = [CancelledError] log = logging.getLogger(__name__) -async def _shutdown_coro(_sig, loop, extra_shutdown): +async def _shutdown_coro(_sig, _loop, extra_shutdown): log.info('shutting down') if extra_shutdown is not None: extra_shutdown() @@ -28,7 +28,6 @@ async def _shutdown_coro(_sig, loop, extra_shutdown): for task in tasks: task.cancel() exceptions = await asyncio.gather(*tasks, return_exceptions=True) - loop.stop() for x in exceptions: if x is not None and x.__class__ not in ignorable_exceptions: print_exception(x) @@ -57,14 +56,19 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru for s in signals: loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop, shutdown), name=f'{s.name} handler')) task = loop.create_task(main, name='main') - loop.run_until_complete(task) - x = task.exception() - if x is not None: - if x.__class__ not in ignorable_exceptions: - print_exception(x) - for t in asyncio.all_tasks(): + try: + loop.run_until_complete(task) + except CancelledError: + pass + except Exception as x: + print_exception(x) + try: + remaining_tasks = asyncio.all_tasks() + except RuntimeError: + pass + else: + for t in remaining_tasks: t.cancel() - # else: - # loop.run_forever() + loop.run_until_complete(asyncio.gather(*remaining_tasks)) loop.stop() loop.close() diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 5c53dba..294420f 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -7,14 +7,16 @@ from dexorder.bin.executable import execute from dexorder.blockstate import current_blockstate from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState +from dexorder.blockstate.fork import current_fork from dexorder.contract import get_contract_event from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract -from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, \ +from dexorder.event_handler import init, dump_log, handle_vault_created, handle_order_placed, \ handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, \ activate_time_triggers, activate_price_triggers, \ process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all +from dexorder.order.triggers import activate_orders from dexorder.runner import BlockStateRunner from dexorder.transaction import handle_transaction_receipts, finalize_transactions @@ -86,6 +88,8 @@ async def main(): await redis_state.clear() else: current_blockstate.set(state) + current_fork.set(state.root_fork) + await activate_orders() # activate orders first before pushing data to redis if redis_state: await redis_state.init(state, state.root_fork) log.info(f'loaded state from db for root block {state.root_branch.height}') @@ -95,7 +99,6 @@ async def main(): # if config.ohlc_dir: # runner.on_promotion.append(ohlc_save) if db: - runner.on_state_init.append(init_order_triggers) # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.finalize) if redis_state: diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index a0047ce..b5d8c9d 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -58,7 +58,7 @@ class BlockData (Generic[T]): state = current_blockstate.get() fork = current_fork.get() try: - result = state.get(fork, self.series, item) + result = state.get(fork, self.series, item, default=NARG) # force raise KeyError except KeyError: result = default if self.lazy_getitem: diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index 7b0e910..712d196 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -94,7 +94,7 @@ class DbState(SeriesCollection): for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)): key = data.str2key(row.key) value = data.str2value(row.value) - # log.debug(f'load {series} {key} {value}') + log.debug(f'load {series} {key} {value}') state.set(root_fork, var.series, key, value, overwrite=True) log.debug(f'loaded db state from block {root_block}') return state diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index 14dc0a0..6c36422 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -1,11 +1,16 @@ import json import os +from eth_abi.exceptions import InsufficientDataBytes +from web3.exceptions import BadFunctionCallOutput, ContractLogicError + from .abi import abis from .contract_proxy import ContractProxy from .. import current_w3 from ..base.chain import current_chain +CONTRACT_ERRORS = (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput) + def get_contract_data(name): if name in abis: diff --git a/src/dexorder/contract/decimals.py b/src/dexorder/contract/decimals.py index 9e4cd81..774fb5f 100644 --- a/src/dexorder/contract/decimals.py +++ b/src/dexorder/contract/decimals.py @@ -1,10 +1,7 @@ import logging -from eth_abi.exceptions import InsufficientDataBytes -from web3.exceptions import ContractLogicError, BadFunctionCallOutput - from dexorder import db -from dexorder.contract import ERC20 +from dexorder.contract import ERC20, CONTRACT_ERRORS log = logging.getLogger(__name__) @@ -17,7 +14,7 @@ async def token_decimals(addr): # noinspection PyBroadException try: decimals = await ERC20(addr).decimals() - except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): + except CONTRACT_ERRORS: log.warning(f'token {addr} has no decimals()') decimals = 0 except Exception: diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 988f9c9..87d9de6 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -6,21 +6,23 @@ from web3.types import EventData from dexorder import current_pub, db, from_timestamp, minutely from dexorder.base.chain import current_chain, current_clock -from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey +from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, \ + OrderKey +from dexorder.base.orderlib import SwapOrderState from dexorder.blocks import get_block_timestamp from dexorder.blockstate.fork import current_fork -from dexorder.ohlc import ohlcs, recent_ohlcs -from dexorder.transaction import submit_transaction_request -from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data -from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.contract import ERC20 -from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance +from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.database.model.transaction import TransactionJob -from dexorder.base.orderlib import SwapOrderState +from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.order.orderstate import Order from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \ - unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, new_price_triggers, activate_order + unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, \ + new_price_triggers, activate_order +from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data +from dexorder.transaction import submit_transaction_request from dexorder.util.async_util import maywait +from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance, MAX_VAULTS, verify_vault log = logging.getLogger(__name__) @@ -29,14 +31,6 @@ def dump_log(eventlog): log.debug(f'\t{eventlog}') -async def init_order_triggers(): - log.debug('activating orders') - # this is a state init callback, called only once after the state has been loaded from the db or created fresh - orders = [Order.of(key) for key in Order.open_orders] - futures = [activate_order(order) for order in orders] - await asyncio.gather(*futures, return_exceptions=True) - log.debug(f'activated {len(futures)} orders') - def init(): new_pool_prices.clear() new_price_triggers.clear() @@ -49,22 +43,18 @@ async def handle_order_placed(event: EventData): start_index = int(event['args']['startOrderIndex']) num_orders = int(event['args']['numOrders']) log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') - if addr not in vault_owners: - log.warning(f'block {current_fork.get().head} order from unknown vault {addr}') # todo insert (short) block hash into all logs - # return todo always discard rogues - # noinspection PyBroadException - try: - vault_owners[addr] = await VaultContract(addr).owner() - except Exception: - log.warning(f'vault owner for {addr} could not be found.', exc_info=True) - return - vault = VaultContract(addr) + if not await verify_vault(addr): + log.warning(f'Discarding order from rogue vault {addr}.') + return + contract = None for index in range(start_index, start_index+num_orders): - key = OrderKey(vault.address, index) + key = OrderKey(addr, index) if key not in Order.instances: - obj = await vault.swapOrderStatus(index) + if contract is None: + contract = VaultContract(addr) + obj = await contract.swapOrderStatus(index) log.debug(f'raw order status {obj}') - order = Order.create(vault.address, index, obj) + order = Order.create(addr, index, obj) await activate_order(order) log.debug(f'new order {order} {order.order}') @@ -160,25 +150,23 @@ async def handle_uniswap_swap(swap: EventData): log.debug(f'pool {addr} {minutely(time)} {price}') -def handle_vault_created(created: EventData): +async def handle_vault_created(created: EventData): try: owner = created['args']['owner'] num = created['args']['num'] except KeyError: log.debug('couldnt parse event data for VaultCreated', created) return - vault = vault_address(owner,num) - log.debug(f'VaultCreated {owner} #{num} => {vault}') - vault_owners[vault] = owner + addr = vault_address(owner, num) + vault_owners[addr] = owner + log.debug(f'VaultCreated {owner} #{num} => {addr}') vaults = [] - for num in range(256): + for num in range(MAX_VAULTS): addr = vault_address(owner, num) - # log.debug(f'v{num}? {addr}') if addr in vault_owners: vaults.append(addr) else: break - # log.debug(f'updated vaults: {vaults}') current_pub.get()(f'{current_chain.get().id}|{owner}', 'vaults', vaults) @@ -214,8 +202,8 @@ async def process_active_tranches(): if await has_funds(tk): log.info(f'execution request for {tk}') execution_requests[tk] = ExecutionRequest(height, proof) - else: - log.debug(f'underfunded tranche {tk}') + # else: + # log.debug(f'underfunded tranche {tk}') async def has_funds(tk: TrancheKey): diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 7ed0b96..16d37a2 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -52,7 +52,8 @@ class RedisState (SeriesCollection): hsets: dict[str,dict[str,str]] = defaultdict(dict) hdels: dict[str,set[str]] = defaultdict(set) pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value - for diff in compress_diffs(diffs): + compressed = compress_diffs(diffs) + for diff in compressed: try: d = self.datas[diff.series] except KeyError: diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 4b71b61..39bf3bf 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -201,7 +201,7 @@ class Order: 'o', # order message type (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_status) except KeyError: - log.warning(f'No vault owner for {k}') + log.warning(f'No vault owner for {k.vault}') return None except AttributeError: log.error(f'could not dump {v}') @@ -218,7 +218,7 @@ class Order: 'of', # order message type (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_fills) except KeyError: - log.warning(f'No vault owner for {k}') + log.warning(f'No vault owner for {k.vault}') return None @staticmethod diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 350bd01..674e45f 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -31,6 +31,17 @@ execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # g inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent +async def activate_orders(): + log.debug('activating orders') + # this is a state init callback, called only once after the state has been loaded from the db or created fresh + keys = list(Order.open_orders) + orders = [Order.of(key) for key in keys] + for order in orders: + # setup triggers + await activate_order(order) # too many to really parallelize, and it's startup anyway + log.debug(f'activated {len(keys)} orders') + + async def activate_order(order: Order): """ Call this to enable triggers on an order which is already in the state. @@ -42,6 +53,8 @@ async def activate_order(order: Order): if triggers.closed: log.debug(f'order {order.key} was immediately closed') close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired) + else: + order.status def intersect_ranges( a_low, a_high, b_low, b_high): diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index cb9e1e5..7a3635d 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -7,7 +7,7 @@ from web3.exceptions import ContractLogicError, BadFunctionCallOutput from dexorder import ADDRESS_0, config from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain -from dexorder.contract import ERC20, ContractProxy +from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS from dexorder.database.model.token import OldTokenDict from dexorder.metadata import get_metadata @@ -39,7 +39,7 @@ async def load_token(address: str) -> Optional[OldTokenDict]: if end == -1: end = 32 result = rb[:end].decode('utf8') - except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): + except CONTRACT_ERRORS: log.warning(f'token {address} has broken {func_name}()') return None return result @@ -49,7 +49,7 @@ async def load_token(address: str) -> Optional[OldTokenDict]: name_prom = get_string_or_bytes32('name') try: decimals = await dec_prom - except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): + except CONTRACT_ERRORS: log.warning(f'token {address} has no decimals()') decimals = 0 approved = config.metadata is None diff --git a/src/dexorder/vault_blockdata.py b/src/dexorder/vault_blockdata.py index 4815385..2c7eac2 100644 --- a/src/dexorder/vault_blockdata.py +++ b/src/dexorder/vault_blockdata.py @@ -3,7 +3,8 @@ import logging from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict -from dexorder.contract import ERC20 +from dexorder.contract import ERC20, CONTRACT_ERRORS +from dexorder.contract.dexorder import VaultContract, vault_address from dexorder.util import json log = logging.getLogger(__name__) @@ -12,6 +13,33 @@ log = logging.getLogger(__name__) # if pub is True, then event is the current series name, room is the key, and args is [value] # values of DELETE are serialized as nulls + +MAX_VAULTS = 1 # todo increase + + +async def verify_vault(addr: str, owner: str = None, num: int = None) -> bool: + if addr in vault_owners: + return True + if owner is None: + try: + owner = await VaultContract(addr).owner() + except CONTRACT_ERRORS: + log.warning(f'vault owner for {addr} could not be found.') + return False + if num is not None: + if vault_address(owner, num) == addr: + vault_owners[addr] = owner + return True + return False + # no num so scan them all + for num in range(MAX_VAULTS): + test_addr = vault_address(owner, num) + if addr == test_addr: + vault_owners[addr] = owner + return True + return False + + def pub_vault_balances(_s, k, v): chain_id = current_chain.get().id try: