diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index e770b4e..b3997b6 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -1,4 +1,6 @@ +import math from contextvars import ContextVar +from datetime import datetime class Blockchain: @@ -48,3 +50,19 @@ Mock = Blockchain(31337, 'Mock', 3, batch_size=10000) Alpha = Blockchain(53261, 'Dexorder Alpha', 3, batch_size=10000) current_chain = ContextVar[Blockchain]('current_chain') + + +class BlockClock: + def __init__(self): + self.timestamp = 0 + self.adjustment = 0 + + def set(self, timestamp): + self.timestamp = timestamp + self.adjustment = timestamp - datetime.now().timestamp() + + def now(self): + return math.ceil(datetime.now().timestamp() + self.adjustment) + +current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks + diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 71473a9..5fd8024 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -6,7 +6,7 @@ from uuid import UUID from web3.types import EventData from dexorder import current_pub, db, dec -from dexorder.base.chain import current_chain +from dexorder.base.chain import current_chain, current_clock from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.transaction import create_transactions, submit_transaction_request, handle_transaction_receipts, send_transactions from dexorder.pools import uniswap_price @@ -66,12 +66,14 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) runner.add_event_trigger(handle_transaction_receipts) runner.add_event_trigger(handle_dexorderexecutions, executions) - runner.add_event_trigger(activate_time_triggers) - runner.add_event_trigger(activate_price_triggers) - runner.add_event_trigger(process_active_tranches) - runner.add_event_trigger(process_execution_requests) - runner.add_event_trigger(create_transactions) - runner.add_event_trigger(send_transactions) + + # these callbacks run after the ones above on each block, plus these also run every second + runner.add_postprocess_trigger(activate_time_triggers) + runner.add_postprocess_trigger(activate_price_triggers) + runner.add_postprocess_trigger(process_active_tranches) + runner.add_postprocess_trigger(process_execution_requests) + runner.add_postprocess_trigger(create_transactions) + runner.add_postprocess_trigger(send_transactions) def dump_log(eventlog): @@ -236,8 +238,8 @@ def handle_vault_created(created: EventData): async def activate_time_triggers(): - now = current_block.get().timestamp - log.debug(f'activating time triggers') + now = current_clock.get().now() + log.debug(f'activating time triggers at {now}') # time triggers for tt in tuple(time_triggers): await maywait(tt(now)) diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index d47c0d9..42024a9 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -9,9 +9,9 @@ from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, D from dexorder.util import defaultdictk from .orderstate import Order from .. import dec +from ..base.chain import current_clock from ..base.order import OrderKey, TrancheKey, ExecutionRequest -from ..pools import ensure_pool_price, Pools, pool_decimals -from ..database.model.block import current_block +from ..pools import ensure_pool_price, Pools, pool_decimals, pool_prices from ..routing import pool_address log = logging.getLogger(__name__) @@ -38,10 +38,7 @@ async def activate_order(order: Order): address = pool_address(order.status.order) pool = await Pools.get(address) await ensure_pool_price(pool) - inverted = pool.base != order.order.tokenIn - if inverted: - assert pool.base == order.order.tokenOut - triggers = OrderTriggers(order, inverted) + triggers = OrderTriggers(order) if triggers.closed: log.debug(f'order {order.key} was immediately closed') close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired) @@ -58,10 +55,11 @@ async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool: b, m = lc if b == 0 and m == 0: return True - limit = m * current_block.get().timestamp + b + limit = m * current_clock.get().now() + b + log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}') # todo ratios # prices AT the limit get zero volume, so we only trigger on >, not >= - return is_min and price > limit or not is_min and price < limit + return is_min and limit < price or not is_min and limit > price class TrancheStatus (Enum): @@ -71,7 +69,7 @@ class TrancheStatus (Enum): Expired = auto() # time deadline has past and this tranche cannot be filled class TrancheTrigger: - def __init__(self, order: Order, tranche_key: TrancheKey, inverted: bool): + def __init__(self, order: Order, tranche_key: TrancheKey): assert order.key.vault == tranche_key.vault and order.key.order_index == tranche_key.order_index self.order = order self.tk = tranche_key @@ -93,8 +91,7 @@ class TrancheTrigger: self.min_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.minIntercept, tranche.minSlope) self.max_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.maxIntercept, tranche.maxSlope) self.has_line_constraint = any( a or b for a,b in (self.min_line_constraint, self.max_line_constraint)) - if not tranche.marketOrder and inverted: - self.min_line_constraint, self.max_line_constraint = self.max_line_constraint, self.min_line_constraint + self.has_sloped_line_constraint = any(m!=0 for b,m in (self.min_line_constraint, self.max_line_constraint)) self.slippage = tranche.minIntercept if tranche.marketOrder else 0 self.pool_price_multiplier = None @@ -102,8 +99,12 @@ class TrancheTrigger: if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount: # min_fill_amount could be 0 (disabled) so we also check for the 0 case separately self._status = TrancheStatus.Filled return - timestamp = current_block.get().timestamp - self._status = TrancheStatus.Early if timestamp < self.time_constraint[0] else TrancheStatus.Expired if timestamp > self.time_constraint[1] else TrancheStatus.Pricing + timestamp = current_clock.get().now() + self._status = \ + TrancheStatus.Pricing if self.time_constraint is None else \ + TrancheStatus.Early if timestamp < self.time_constraint[0] else \ + TrancheStatus.Expired if timestamp > self.time_constraint[1] else \ + TrancheStatus.Pricing self.enable_time_trigger() if self._status == TrancheStatus.Pricing: self.enable_price_trigger() @@ -142,14 +143,14 @@ class TrancheTrigger: self.enable_price_trigger() def enable_price_trigger(self): - if self.has_line_constraint: + if self.has_line_constraint and not self.has_sloped_line_constraint: # sloped constraints must be triggered every tick, not just on pool price changes price_triggers[self.order.pool_address].add(self.price_trigger) new_price_triggers[self.order.pool_address].add(self.price_trigger) else: unconstrained_price_triggers.add(self.price_trigger) def disable_price_trigger(self): - if self.has_line_constraint: + if self.has_line_constraint and not self.has_sloped_line_constraint: price_triggers[self.order.pool_address].remove(self.price_trigger) else: unconstrained_price_triggers.remove(self.price_trigger) @@ -159,11 +160,16 @@ class TrancheTrigger: if self.closed: log.debug(f'price trigger ignored because trigger status is {self.status}') return + log.debug(f'price trigger {cur}') + if cur is None and self.has_line_constraint: + await ensure_pool_price(self.order.pool_address) + cur = pool_prices[self.order.pool_address] if cur is not None: if self.pool_price_multiplier is None: pool = await Pools.get(pool_address(self.order.order)) pool_dec = await pool_decimals(pool) self.pool_price_multiplier = dec(10) ** dec(-pool_dec) + log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}') cur *= self.pool_price_multiplier if cur is None or not self.has_line_constraint or all(await asyncio.gather( line_passes(self.min_line_constraint, True, cur), @@ -199,10 +205,10 @@ class TrancheTrigger: class OrderTriggers: instances: dict[OrderKey, 'OrderTriggers'] = {} - def __init__(self, order: Order, inverted: bool): + def __init__(self, order: Order): assert order.key not in OrderTriggers.instances self.order = order - self.triggers = [TrancheTrigger(order, tk, inverted) for tk in self.order.tranche_keys] + self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys] OrderTriggers.instances[order.key] = self log.debug(f'created OrderTriggers for {order.key}') diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 1e62952..df5dd3b 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,15 +1,17 @@ import asyncio import logging from asyncio import Queue +from datetime import datetime from typing import Callable, Union, Any, Iterable +from sqlalchemy.sql.functions import current_timestamp from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config -from dexorder.base.chain import current_chain +from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.fork import current_fork, Fork from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockstate import BlockState, current_blockstate @@ -35,6 +37,9 @@ class BlockStateRunner: # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event self.events:list[tuple[Callable[[dict],None],ContractEvents,dict]] = [] + # these callbacks are invoked after every block and also every second if there wasnt a block + self.postprocess_cbs:list[Callable[[],None]] = [] + # onStateInit callbacks are invoked after the initial state is loaded or created self.on_state_init: list[Callable[[],None]] = [] self.state_initialized = False @@ -60,6 +65,8 @@ class BlockStateRunner: log_filter = {'topics': [topic(event.abi)]} self.events.append((callback, event, log_filter)) + def add_postprocess_trigger(self, callback: Callable[[dict], None]): + self.postprocess_cbs.append(callback) async def run(self): return await (self.run_polling() if config.polling > 0 else self.run_ws()) @@ -171,16 +178,21 @@ class BlockStateRunner: w3 = current_w3.get() chain = current_chain.get() assert chain.chain_id == await w3.eth.chain_id + current_clock.set(BlockClock()) + prev_head = None while self.running: try: async with asyncio.timeout(1): # check running flag every second + start = datetime.now() head = await self.queue.get() - log.debug(f'got head {hexstr(head)}') except TimeoutError: - pass + # 1 second has passed without a new head. Run the postprocess callbacks to check for activated time-based triggers + if prev_head is not None: + await self.handle_time_tick(head) else: try: await self.handle_head(chain, head, w3) + prev_head = head except Exception as x: log.exception(x) log.debug('runner worker exiting') @@ -203,6 +215,7 @@ class BlockStateRunner: block = Block(chain=chain_id, height=int(block_data['number'], 0), hash=bytes.fromhex(block_data['hash'][2:]), parent=bytes.fromhex(block_data['parentHash'][2:]), data=block_data) latest_block.set(block) + current_clock.get().set(block.timestamp) if self.state is None: # initialize self.state = BlockState(block) @@ -234,6 +247,8 @@ class BlockStateRunner: get_logs = await get_logs batches.append((get_logs, callback, event, lf)) from_height += chain.batch_size + for callback in self.postprocess_cbs: + batches.append((None, callback, None, None)) else: # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order @@ -248,10 +263,12 @@ class BlockStateRunner: if not config.parallel_logevent_queries: get_logs = await get_logs batches.append((get_logs, callback, event, log_filter)) + for callback in self.postprocess_cbs: + batches.append((None, callback, None, None)) # set up for callbacks current_block.set(block) - current_fork.set(fork) # this is set earlier + current_fork.set(fork) session = db.session session.begin() session.add(block) @@ -305,6 +322,27 @@ class BlockStateRunner: if session is not None: session.close() + + async def handle_time_tick(self, blockhash): + # similar to handle_head, but we only call the postprocess events, since there was only a time tick and no new block data + block = self.state.by_hash[blockhash] + fork = self.state.fork(block) + current_block.set(block) + current_fork.set(fork) + session = db.session + session.begin() + try: + for callback in self.postprocess_cbs: + await maywait(callback()) + except: + session.rollback() + raise + else: + session.commit() + finally: + if session is not None: + session.close() + async def do_state_init_cbs(self): if self.state_initialized: return