From 96d54360b61429dd108892be22cacea72c61fdd7 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 25 Jan 2024 00:59:59 -0400 Subject: [PATCH 1/7] backfill fixes --- src/dexorder/base/fork.py | 3 +- src/dexorder/bin/backfill_ohlc.py | 10 ++++- src/dexorder/bin/main.py | 11 ++++-- src/dexorder/blockstate/db_state.py | 2 +- src/dexorder/blockstate/diff.py | 3 ++ src/dexorder/configuration/schema.py | 8 ++-- src/dexorder/database/model/block.py | 4 +- src/dexorder/ohlc.py | 58 +++++++++++++++++++++------- src/dexorder/runner.py | 31 +++++++++++++-- src/dexorder/util/__init__.py | 14 ++++++- 10 files changed, 111 insertions(+), 33 deletions(-) diff --git a/src/dexorder/base/fork.py b/src/dexorder/base/fork.py index 9514677..a9e5a0f 100644 --- a/src/dexorder/base/fork.py +++ b/src/dexorder/base/fork.py @@ -41,7 +41,8 @@ class Fork: def for_height(self, height): """ returns a new Fork object for an older block along this fork. used for root promotion. """ - assert( height <= self.height ) + if height > self.height : + raise ValueError if height <= self.height - len(self.ancestry): return None return Fork(self.ancestry[self.height-height:], height=height) diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index ded13f2..d3f9e42 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -3,7 +3,6 @@ import sys from asyncio import CancelledError from dexorder import blockchain, config -from dexorder.base.ohlc import recent_ohlcs from dexorder.bin.executable import execute from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState @@ -13,16 +12,22 @@ from dexorder.database import db from dexorder.event_handler import handle_uniswap_swap from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache +from dexorder.ohlc import recent_ohlcs, ohlc_finalize, ohlcs from dexorder.runner import BlockStateRunner log = logging.getLogger('dexorder') +# noinspection DuplicatedCode async def main(): # noinspection DuplicatedCode logging.basicConfig(level=logging.INFO, stream=sys.stdout) log.setLevel(logging.DEBUG) parse_args() + if not config.ohlc_dir: + config.ohlc_dir = './ohlc' + log.warning('Defaulting ohlc_dir to ./ohlc') + ohlcs.dir = config.ohlc_dir await blockchain.connect() redis_state = None state = None @@ -31,7 +36,6 @@ async def main(): redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else. if db: db.connect(url=config.datadb_url) # our main database is the data db - # noinspection DuplicatedCode db_state = DbState(BlockData.by_opt('db')) with db.session: state = db_state.load() @@ -46,6 +50,8 @@ async def main(): # noinspection PyTypeChecker runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) + # noinspection PyTypeChecker + runner.on_promotion.append(ohlc_finalize) if db: # noinspection PyUnboundLocalVariable,PyTypeChecker runner.on_promotion.append(db_state.save) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 81da665..4d23055 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -2,7 +2,7 @@ import logging import sys from asyncio import CancelledError -from dexorder import db, blockchain +from dexorder import db, blockchain, config from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate.blockdata import BlockData @@ -15,11 +15,12 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v process_active_tranches, process_execution_requests, check_ohlc_rollover from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache +from dexorder.ohlc import ohlc_finalize from dexorder.runner import BlockStateRunner from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions log = logging.getLogger('dexorder') -LOG_ALL_EVENTS = True # for debug todo config +LOG_ALL_EVENTS = False # for debug todo config # @@ -70,6 +71,7 @@ def setup_logevent_triggers(runner): runner.postprocess_cbs.append(send_transactions) +# noinspection DuplicatedCode async def main(): logging.basicConfig(level=logging.INFO, stream=sys.stdout) log.setLevel(logging.DEBUG) @@ -94,10 +96,13 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) + if config.ohlc_dir: + # noinspection PyTypeChecker + runner.on_promotion.append(ohlc_finalize) if db: # noinspection PyTypeChecker runner.on_state_init.append(init_order_triggers) - # noinspection PyUnboundLocalVariable + # noinspection PyUnboundLocalVariable,PyTypeChecker runner.on_promotion.append(db_state.save) if redis_state: # noinspection PyTypeChecker diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index 9c1f5a6..46af53b 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -104,7 +104,7 @@ class DbState(SeriesCollection): for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)): key = data.str2key(row.key) value = data.str2value(row.value) - log.debug(f'load {series} {key} {value}') + # log.debug(f'load {series} {key} {value}') var[key] = value completed_block.set(root_block) log.debug(f'loaded db state from block {root_block}') diff --git a/src/dexorder/blockstate/diff.py b/src/dexorder/blockstate/diff.py index 9029691..fc301a4 100644 --- a/src/dexorder/blockstate/diff.py +++ b/src/dexorder/blockstate/diff.py @@ -6,6 +6,7 @@ from dexorder import DELETE @dataclass class DiffEntry: + """ DiffEntry is the "value" part of a key-value pair, but DiffEntry also has metadata about the block in which the value was set """ value: Union[Any, DELETE] height: int hash: bytes @@ -13,6 +14,7 @@ class DiffEntry: @dataclass class DiffItem: + """ DiffItem is a simple series-key-value triple """ series: Any key: Any value: Any @@ -22,6 +24,7 @@ class DiffItem: @dataclass class DiffEntryItem: + """ DiffEntryItem is a DiffItem that has a DiffEntry as its extended value, instead of storing just the primary value directly """ series: Any key: Any entry: DiffEntry diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 368c9af..e28cbbe 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -12,11 +12,11 @@ class Config: rpc_url: str = 'http://localhost:8545' ws_url: str = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) - db_url: str = 'postgresql://dexorder:redroxed@localhost/dexorder' - datadb_url: str = 'postgresql://dexorder:redroxed@localhost/dexorderdata' - ohlc_dir: str = './ohlc' + db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' + datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata' + ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk dump_sql: bool = False - redis_url: str = 'redis://localhost:6379' + redis_url: Optional[str] = 'redis://localhost:6379' parallel_logevent_queries: bool = True polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py index c280053..6368a2a 100644 --- a/src/dexorder/database/model/block.py +++ b/src/dexorder/database/model/block.py @@ -4,7 +4,7 @@ from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column from dexorder.database.model import Base -from dexorder.util import hexint +from dexorder.util import hexint, Field class Block(Base): @@ -25,5 +25,5 @@ class Block(Base): current_block = ContextVar[Block]('Block.cur') # block for the current thread -latest_block = ContextVar[Block]('Block.latest') # most recent discovered but may not be processed yet +latest_block = Field[Block]() # most recent discovered block but maybe not the currently processing one completed_block = ContextVar[Block]('Block.completed') # most recent fully-processed block diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index fc32d89..776eeaa 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -3,20 +3,24 @@ import logging import os from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Optional, NamedTuple +from typing import Optional, NamedTuple, Reversible, Union from cachetools import LFUCache -from dexorder import dec, config, from_isotime, minutely +from dexorder import dec, config, from_isotime, minutely, from_timestamp from dexorder.base.chain import current_chain -from dexorder.blockstate import BlockDict +from dexorder.blockstate import BlockDict, DiffItem +from dexorder.blockstate.diff import DiffEntryItem +from dexorder.database.model import Block +from dexorder.database.model.block import current_block log = logging.getLogger(__name__) OHLC_PERIODS = [ - timedelta(minutes=1), timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), - timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), - timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) + timedelta(minutes=1), + # timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), + # timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), + # timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) ] OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc) # Sunday before Bitcoin Genesis @@ -136,13 +140,15 @@ class OHLCRepository: for period in OHLC_PERIODS: self.update(symbol, period, time, price, create=create) - def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]: + @staticmethod + def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]: """ if price is None, then bars are advanced based on the time but no new price is added to the series. """ logname = f'{symbol} {ohlc_name(period)}' log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') key = (symbol, period) + # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time bars: Optional[list[OHLC]] = recent_ohlcs.get(key) if not bars: if create is False or price is None: @@ -152,18 +158,27 @@ class OHLCRepository: log.debug(f'\tcreated new bars {updated}') else: updated = update_ohlc(bars[-1], period, time, price) - if len(updated) == 1: - updated = [bars[-1], updated[0]] # return the previous finalized bar along with the updated current bar + # we need to retain enough recent history to at least cover the root block time, plus one previous finalized block + # first we construct the longest possible sequence + if not bars or not updated: + updated = (bars or []) + (updated or []) + else: + last_bar = from_isotime(bars[-1][0]) + first_updated = from_isotime(updated[0][0]) + overlap = (first_updated - last_bar) // period + updated = bars[:-overlap] + updated if overlap > 0 else bars + updated + # now we drop history that is older than we need + oldest_needed = from_timestamp(current_block.get().timestamp) - period # cover the root block time plus one period prior + trim = (oldest_needed - from_isotime(updated[0][0])) // period + if trim > 0: + updated = updated[trim:] log.debug(f'\tnew recents: {updated}') recent_ohlcs.setitem(key, updated) - if len(updated) > 1: - log.debug(f'\tsaving finalized bars: {updated[:-1]}') - self.save_all(symbol, period, updated[:-1]) # save any finalized bars to storage return updated def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None: for ohlc in ohlc_list: - self.save(symbol, period, ohlc) + self.save(symbol, period, ohlc) # we need to act sequentially so we don't have conflicting access to chunks def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None: time = dt(ohlc[0]) @@ -173,6 +188,7 @@ class OHLCRepository: else: start = from_isotime(chunk[0][0]) index = (time - start) // period + log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') assert index <= len(chunk) if index == len(chunk): assert from_isotime(chunk[-1][0]) + period == time @@ -214,10 +230,12 @@ class OHLCRepository: json.dump(chunk, file) - def chunk_path(self, symbol: str, period: timedelta, time: datetime) -> str: + def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: + if chain_id is None: + chain_id = current_chain.get().chain_id start = ohlc_start_time(time, period) name = ohlc_name(period) - return f'{self.dir}/{symbol}/{name}/' + ( + return f'{self.dir}/{chain_id}/{symbol}/{name}/' + ( f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month f'{symbol}-{name}.json' # long periods are a single file for all of history @@ -240,6 +258,16 @@ def ohlc_str_to_key(s): pool, period_name = s.split('|') return pool, period_from_name(period_name) +def ohlc_finalize(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + """ + used as a finalization callback from BlockState data. + """ + for diff in diffs: + if diff.series == 'ohlc': + symbol, period = diff.key + ohlcs.save_all(symbol, period, diff.value) + + # The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with # the latest finalized bar as well as the current open bar. recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict('ohlc', db=True, redis=True, pub=pub_ohlc, diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 2a0a01b..6683e0b 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -200,6 +200,7 @@ class BlockStateRunner: parent = bytes.fromhex(block_data['parentHash'][2:]) height = int(block_data['number'], 0) head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data) + latest_block.set(head) if self.state or config.backfill: # backfill batches @@ -264,7 +265,6 @@ class BlockStateRunner: if self.state is not None and block.hash in self.state.by_hash: log.debug(f'block {block.hash} was already processed') return - latest_block.set(block) if self.state is None: # initialize self.state = BlockState(block) @@ -343,18 +343,38 @@ class BlockStateRunner: # isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch. diff_items = self.state.diffs_by_hash[block.hash] for callback in self.on_head_update: + # noinspection PyCallingNonCallable await maywait(callback(block, diff_items)) # check for root promotion - promotion_height = fork.height - chain.confirms - if not fork.disjoint and promotion_height > self.state.root_block.height and ( - new_root_fork := fork.for_height(promotion_height)): + promotion_height = latest_block.get().height - chain.confirms + new_root_fork = None + if fork.disjoint: + # individually check the fork's head and ancestor + if fork.height <= promotion_height: + new_root_fork = fork + else: + state = current_blockstate.get() + parent_block = state.by_hash[fork.parent] + if parent_block.height <= promotion_height: + new_root_fork = state.fork(parent_block) + else: + # non-disjoint, contiguous fork + if fork.height <= promotion_height: + new_root_fork = fork + else: + new_root_fork = fork.for_height(promotion_height) + if new_root_fork: + log.debug(f'promoting root {new_root_fork.height} {new_root_fork.hash}') diff_items = self.state.promote_root(new_root_fork) for callback in self.on_promotion: # todo try/except for known retryable errors + # noinspection PyCallingNonCallable await maywait(callback(self.state.root_block, diff_items)) + # publish messages if pubs and self.publish_all: + # noinspection PyCallingNonCallable await maywait(self.publish_all(pubs)) except: # legitimately catch EVERYTHING because we re-raise log.debug('rolling back session') @@ -364,6 +384,7 @@ class BlockStateRunner: self.state.delete_block(block.hash) if config.parallel_logevent_queries: for get_logs, *_ in batches: + # noinspection PyBroadException try: await get_logs except Exception: @@ -388,6 +409,7 @@ class BlockStateRunner: session.begin() try: for callback in self.postprocess_cbs: + # noinspection PyCallingNonCallable await maywait(callback()) except: session.rollback() @@ -402,5 +424,6 @@ class BlockStateRunner: if self.state_initialized: return for cb in self.on_state_init: + # noinspection PyCallingNonCallable await maywait(cb()) self.state_initialized = True diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index f36976b..b1bd8ea 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -1,5 +1,5 @@ import re -from typing import Callable, TypeVar, Generic, Union +from typing import Callable, TypeVar, Generic, Union, Any from eth_utils import keccak from hexbytes import HexBytes @@ -71,3 +71,15 @@ class defaultdictk (Generic[K,V], dict[K,V]): except KeyError: default = self[item] = self.default_factory(item) return default + + +T = TypeVar('T') +class Field (Generic[T]): + def __init__(self, value: T = None): + self._value = value + + def get(self) -> T: + return self._value + + def set(self, value: T): + self._value = value From 5931dd564763bc03f9099a7d99347731287ebfdc Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 25 Jan 2024 18:20:23 -0400 Subject: [PATCH 2/7] typing cleanup --- src/dexorder/bin/backfill_ohlc.py | 14 ++++++++++---- src/dexorder/bin/main.py | 5 +---- src/dexorder/blockstate/state.py | 1 - src/dexorder/configuration/load.py | 1 - src/dexorder/contract/contract_proxy.py | 20 +++++++++----------- src/dexorder/database/__init__.py | 1 - src/dexorder/event_handler.py | 4 ---- src/dexorder/memcache/memcache_state.py | 2 +- src/dexorder/runner.py | 16 ++++++++-------- src/dexorder/util/__init__.py | 3 +-- src/dexorder/util/async_util.py | 4 +--- src/dexorder/util/shutdown.py | 3 ++- 12 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index d3f9e42..bfdc99b 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -1,14 +1,18 @@ import logging import sys from asyncio import CancelledError +from typing import Iterable, Union from dexorder import blockchain, config from dexorder.bin.executable import execute +from dexorder.blockstate import DiffItem from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState +from dexorder.blockstate.diff import DiffEntryItem from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database import db +from dexorder.database.model import Block from dexorder.event_handler import handle_uniswap_swap from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache @@ -18,6 +22,10 @@ from dexorder.runner import BlockStateRunner log = logging.getLogger('dexorder') +def finalize_callback(block: Block, _diffs: Iterable[Union[DiffItem, DiffEntryItem]]): + log.info(f'backfill completed through block {block.height} {block.timestamp:%Y-%m-%d %H:%M:%S} {block.hash}') + + # noinspection DuplicatedCode async def main(): # noinspection DuplicatedCode @@ -47,16 +55,14 @@ async def main(): log.info(f'loaded state from db for root block {state.root_block}') runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) - # noinspection PyTypeChecker runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) - # noinspection PyTypeChecker runner.on_promotion.append(ohlc_finalize) if db: - # noinspection PyUnboundLocalVariable,PyTypeChecker + # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.save) + runner.on_promotion.append(finalize_callback) if redis_state: - # noinspection PyTypeChecker runner.on_head_update.append(redis_state.save) try: diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 4d23055..b3a141b 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -97,15 +97,12 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) if config.ohlc_dir: - # noinspection PyTypeChecker runner.on_promotion.append(ohlc_finalize) if db: - # noinspection PyTypeChecker runner.on_state_init.append(init_order_triggers) - # noinspection PyUnboundLocalVariable,PyTypeChecker + # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.save) if redis_state: - # noinspection PyTypeChecker runner.on_head_update.append(redis_state.save) try: diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 12e28ae..1e0a2d9 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -97,7 +97,6 @@ class BlockState: return Fork([block.hash], height=block.height) if block.height - self.ancestors[block.hash].height > 1: - # noinspection PyTypeChecker return DisjointFork(block, self.root_block) def ancestors(): diff --git a/src/dexorder/configuration/load.py b/src/dexorder/configuration/load.py index 461c25b..a440d03 100644 --- a/src/dexorder/configuration/load.py +++ b/src/dexorder/configuration/load.py @@ -80,7 +80,6 @@ def from_toml(filename): def parse_args(args=None): """ should be called from binaries to parse args as command-line config settings """ - # noinspection PyTypeChecker try: config.merge_with(OmegaConf.from_cli(args)) # updates config in-place. THANK YOU OmegaConf! except OmegaConfBaseException as x: diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 423e1e9..a51f7d1 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -69,24 +69,22 @@ class ContractProxy: def events(self): return self.contract.events - def deploy(self, *args): - """ - Calls the contract constructor transaction and waits to receive a transaction receipt. - """ - tx: ContractTransaction = self.transact.constructor(*args) - receipt = tx.wait() - self.address = receipt.contractAddress - self._contracts.clear() - return receipt + # def deploy(self, *args): + # """ + # Calls the contract constructor transaction and waits to receive a transaction receipt. + # """ + # tx: ContractTransaction = self.transact.constructor(*args) + # receipt = tx.wait() + # self.address = receipt.contractAddress + # self._contracts.clear() + # return receipt @property def transact(self): - # noinspection PyTypeChecker return ContractProxy(self.address, self._interface_name, _contracts=self._contracts, _wrapper=transact_wrapper, abi=self._abi) @property def build(self): - # noinspection PyTypeChecker return ContractProxy(self.address, self._interface_name, _contracts=self._contracts, _wrapper=build_wrapper, abi=self._abi) def __getattr__(self, item): diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index 71f2ef5..8908956 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -75,7 +75,6 @@ class Db: if engine is None: raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first') s = Session(engine, expire_on_commit=False) - # noinspection PyTypeChecker _session.set(s) return s diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 54d36f4..42014c5 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -210,7 +210,6 @@ async def activate_time_triggers(): # log.debug(f'activating time triggers at {now}') # time triggers for tt in tuple(time_triggers): - # noinspection PyTypeChecker await maywait(tt(now)) @@ -220,16 +219,13 @@ async def activate_price_triggers(): for pool, price in new_pool_prices.items(): pools_triggered.add(pool) for pt in tuple(price_triggers[pool]): - # noinspection PyTypeChecker await maywait(pt(price)) for pool, triggers in new_price_triggers.items(): if pool not in pools_triggered: price = pool_prices[pool] for pt in triggers: - # noinspection PyTypeChecker await maywait(pt(price)) for t in tuple(unconstrained_price_triggers): - # noinspection PyTypeChecker await maywait(t(None)) diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 20d644b..2fd5d1c 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -54,7 +54,7 @@ class RedisState (SeriesCollection): sdels: dict[str,set[str]] = defaultdict(set) hsets: dict[str,dict[str,str]] = defaultdict(dict) hdels: dict[str,set[str]] = defaultdict(set) - pubs: list[tuple[str,str,list[Any]]] = [] # series, key, value => room, event, value + pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value for diff in compress_diffs(diffs): try: d = self.datas[diff.series] diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 6683e0b..02428de 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,7 +1,7 @@ import asyncio import logging from asyncio import Queue -from typing import Union, Any, Iterable +from typing import Union, Any, Iterable, Callable from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI @@ -60,22 +60,22 @@ class BlockStateRunner: self.state = state # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event - self.events:list[tuple[Maywaitable[[EventData],None],ContractEvents,dict]] = [] + self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = [] # these callbacks are invoked after every block and also every second if there wasnt a block - self.postprocess_cbs:list[Maywaitable[[],None]] = [] + self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = [] # onStateInit callbacks are invoked after the initial state is loaded or created - self.on_state_init: list[Maywaitable[[],None]] = [] + self.on_state_init: list[Callable[[],Maywaitable[None]]] = [] self.state_initialized = False # onHeadUpdate callbacks are invoked with a list of DiffItems used to update the head state from either the previous head or the root - self.on_head_update: list[Maywaitable[[Block,list[DiffEntryItem]],None]] = [] + self.on_head_update: list[Callable[[Block,list[DiffEntryItem]],Maywaitable[None]]] = [] # onPromotion callbacks are invoked with a list of DiffItems used to advance the root state - self.on_promotion: list[Maywaitable[[Block,list[DiffEntryItem]],None]] = [] + self.on_promotion: list[Callable[[Block,list[DiffEntryItem]],Maywaitable[None]]] = [] - self.publish_all: Maywaitable[[Iterable[tuple[str,str,Any]]],None] = publish_all + self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],Maywaitable[None]] = publish_all self.timer_period = timer_period @@ -85,7 +85,7 @@ class BlockStateRunner: self.running = False - def add_event_trigger(self, callback: Maywaitable[[EventData], None], event: ContractEvents = None, log_filter: Union[dict, str] = None): + def add_event_trigger(self, callback: Callable[[EventData], Maywaitable[None]], event: ContractEvents = None, log_filter: Union[dict, str] = None): """ if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs """ diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index b1bd8ea..4be444d 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -17,14 +17,13 @@ def align_decimal(value, left_columns) -> str: return ' ' * pad + s -def hexstr(value: bytes): +def hexstr(value: Union[HexBytes, bytes, str]): """ returns an 0x-prefixed hex string """ if type(value) is HexBytes: return value.hex() elif type(value) is bytes: return '0x' + value.hex() elif type(value) is str: - # noinspection PyTypeChecker return value if value.startswith('0x') else '0x' + value else: raise ValueError diff --git a/src/dexorder/util/async_util.py b/src/dexorder/util/async_util.py index b12588b..13bb1d0 100644 --- a/src/dexorder/util/async_util.py +++ b/src/dexorder/util/async_util.py @@ -12,9 +12,7 @@ async def async_yield(): Args = TypeVar('Args') Return = TypeVar('Return') -class Maywaitable (Generic[Args, Return], Callable[[Args],Return], Awaitable[Return], ABC): - pass - +Maywaitable = Union[Return, Awaitable[Return]] async def maywait(obj: Maywaitable): if inspect.isawaitable(obj): diff --git a/src/dexorder/util/shutdown.py b/src/dexorder/util/shutdown.py index c5de24e..fbb5cb6 100644 --- a/src/dexorder/util/shutdown.py +++ b/src/dexorder/util/shutdown.py @@ -1,8 +1,9 @@ import logging +from typing import Never log = logging.getLogger('dexorder') -def fatal(message, exception=None): +def fatal(message, exception=None) -> Never: if exception is None and isinstance(message, (BaseException,RuntimeError)): exception = message log.exception(message, exc_info=exception) From 64384c3d3a9dafc00dee8048696b7075dc7cd24f Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 25 Jan 2024 19:39:58 -0400 Subject: [PATCH 3/7] ohlc bugfixes --- src/dexorder/__init__.py | 2 +- src/dexorder/ohlc.py | 33 +++++++++++++++++---------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index 41730fa..98945c2 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -11,7 +11,7 @@ def now(): return datetime.now(timezone.utc) def timestamp(): - return datetime.now().timestamp() + return int(datetime.now().timestamp()) def from_timestamp(ts): return datetime.fromtimestamp(ts, timezone.utc) diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 776eeaa..456befe 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -97,7 +97,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d break result.append(cur.ohlc) cur = NativeOHLC(end, None, None, None, cur.close) - log.debug(f'\tresult after finalization: {result}') + log.debug(f'\ttime advancements: {result}') # if we are setting a price, update the current bar if price is not None: if cur.open is None: @@ -149,29 +149,30 @@ class OHLCRepository: log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') key = (symbol, period) # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time - bars: Optional[list[OHLC]] = recent_ohlcs.get(key) - if not bars: + historical: Optional[list[OHLC]] = recent_ohlcs.get(key) + if not historical: if create is False or price is None: return # do not track symbols which have not been explicity set up p = str(price) + historical = [] updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))] log.debug(f'\tcreated new bars {updated}') else: - updated = update_ohlc(bars[-1], period, time, price) - # we need to retain enough recent history to at least cover the root block time, plus one previous finalized block - # first we construct the longest possible sequence - if not bars or not updated: - updated = (bars or []) + (updated or []) + updated = update_ohlc(historical[-1], period, time, price) + # drop any historical bars that are older than we need + oldest_needed = from_timestamp(current_block.get().timestamp) - period # cover the root block time plus one period prior + trim = (oldest_needed - from_isotime(historical[0][0])) // period + if trim > 0: + historical = historical[trim:] + + # now overlap the updated data on top of the historical data + if not historical or not updated: + updated = historical + updated else: - last_bar = from_isotime(bars[-1][0]) + last_bar = from_isotime(historical[-1][0]) first_updated = from_isotime(updated[0][0]) - overlap = (first_updated - last_bar) // period - updated = bars[:-overlap] + updated if overlap > 0 else bars + updated - # now we drop history that is older than we need - oldest_needed = from_timestamp(current_block.get().timestamp) - period # cover the root block time plus one period prior - trim = (oldest_needed - from_isotime(updated[0][0])) // period - if trim > 0: - updated = updated[trim:] + overlap = (first_updated - last_bar) // period + 1 + updated = historical[:-overlap] + updated if overlap > 0 else historical + updated log.debug(f'\tnew recents: {updated}') recent_ohlcs.setitem(key, updated) return updated From 455df0f03899a0bd455a27469581a17ead54231f Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 26 Jan 2024 00:29:56 -0400 Subject: [PATCH 4/7] backfill debugged --- src/dexorder/bin/backfill_ohlc.py | 25 ++++++----- src/dexorder/bin/main.py | 4 +- src/dexorder/configuration/schema.py | 2 +- src/dexorder/event_handler.py | 1 - src/dexorder/ohlc.py | 67 +++++++++++++++------------- src/dexorder/order/triggers.py | 3 +- src/dexorder/runner.py | 14 +++--- 7 files changed, 61 insertions(+), 55 deletions(-) diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index bfdc99b..8d517db 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -1,9 +1,9 @@ import logging import sys from asyncio import CancelledError -from typing import Iterable, Union +from typing import Union, Reversible -from dexorder import blockchain, config +from dexorder import blockchain, config, from_timestamp, now from dexorder.bin.executable import execute from dexorder.blockstate import DiffItem from dexorder.blockstate.blockdata import BlockData @@ -13,17 +13,22 @@ from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database import db from dexorder.database.model import Block -from dexorder.event_handler import handle_uniswap_swap +from dexorder.event_handler import handle_uniswap_swap, check_ohlc_rollover from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache -from dexorder.ohlc import recent_ohlcs, ohlc_finalize, ohlcs +from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs from dexorder.runner import BlockStateRunner +from dexorder.util import hexstr log = logging.getLogger('dexorder') -def finalize_callback(block: Block, _diffs: Iterable[Union[DiffItem, DiffEntryItem]]): - log.info(f'backfill completed through block {block.height} {block.timestamp:%Y-%m-%d %H:%M:%S} {block.hash}') +def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + start = now() + log.info("finalizing OHLC's...") + ohlc_save(block, diffs) + log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') + log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') # noinspection DuplicatedCode @@ -34,8 +39,7 @@ async def main(): parse_args() if not config.ohlc_dir: config.ohlc_dir = './ohlc' - log.warning('Defaulting ohlc_dir to ./ohlc') - ohlcs.dir = config.ohlc_dir + ohlcs.dir = config.ohlc_dir await blockchain.connect() redis_state = None state = None @@ -56,12 +60,11 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) - - runner.on_promotion.append(ohlc_finalize) + runner.postprocess_cbs.append(check_ohlc_rollover) + runner.on_promotion.append(finalize_callback) if db: # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.save) - runner.on_promotion.append(finalize_callback) if redis_state: runner.on_head_update.append(redis_state.save) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index b3a141b..526406a 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -15,7 +15,7 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v process_active_tranches, process_execution_requests, check_ohlc_rollover from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache -from dexorder.ohlc import ohlc_finalize +from dexorder.ohlc import ohlc_save from dexorder.runner import BlockStateRunner from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions @@ -97,7 +97,7 @@ async def main(): runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) if config.ohlc_dir: - runner.on_promotion.append(ohlc_finalize) + runner.on_promotion.append(ohlc_save) if db: runner.on_state_init.append(init_order_triggers) # noinspection PyUnboundLocalVariable diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index e28cbbe..3f43849 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -10,7 +10,7 @@ from typing import Optional @dataclass class Config: rpc_url: str = 'http://localhost:8545' - ws_url: str = 'ws://localhost:8545' + ws_url: Optional[str] = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata' diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 42014c5..c0bcf6c 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,7 +1,6 @@ import asyncio import functools import logging -from datetime import datetime from uuid import UUID from web3.types import EventData diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 456befe..5640441 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -9,18 +9,17 @@ from cachetools import LFUCache from dexorder import dec, config, from_isotime, minutely, from_timestamp from dexorder.base.chain import current_chain -from dexorder.blockstate import BlockDict, DiffItem +from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block -from dexorder.database.model.block import current_block log = logging.getLogger(__name__) OHLC_PERIODS = [ timedelta(minutes=1), - # timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), - # timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), - # timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) + timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30), + timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12), + timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7) ] OHLC_DATE_ROOT = datetime(2009, 1, 4, tzinfo=timezone.utc) # Sunday before Bitcoin Genesis @@ -51,7 +50,7 @@ class NativeOHLC: @property def ohlc(self) -> OHLC: return [ - self.start.isoformat(timespec='minutes'), + minutely(self.start), None if self.open is None else str(self.open), None if self.high is None else str(self.high), None if self.low is None else str(self.low), @@ -59,8 +58,7 @@ class NativeOHLC: ] - -def ohlc_name(period: timedelta) -> str: +def period_name(period: timedelta) -> str: return f'{period // timedelta(minutes=1)}m' if period < timedelta(hours=1) \ else f'{period // timedelta(hours=1)}H' if period < timedelta(days=1) \ else f'{period // timedelta(days=7)}W' if period == timedelta(days=7) \ @@ -86,7 +84,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d returns an ordered list of OHLC's that have been created/modified by the new time/price if price is None, then bars are advanced based on the time but no new price is added to the series. """ - log.debug(f'\tupdating {prev} with {minutely(time)} {price}') + # log.debug(f'\tupdating {prev} with {minutely(time)} {price}') cur = NativeOHLC.from_ohlc(prev) assert time >= cur.start result = [] @@ -97,7 +95,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d break result.append(cur.ohlc) cur = NativeOHLC(end, None, None, None, cur.close) - log.debug(f'\ttime advancements: {result}') + # log.debug(f'\ttime advancements: {result}') # if we are setting a price, update the current bar if price is not None: if cur.open is None: @@ -109,8 +107,8 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d cur.low = min(cur.low, price) cur.close = price result.append(cur.ohlc) - log.debug(f'\tappended current bar: {cur.ohlc}') - log.debug(f'\tupdate result: {result}') + # log.debug(f'\tappended current bar: {cur.ohlc}') + # log.debug(f'\tupdate result: {result}') return result class OHLCKey (NamedTuple): @@ -145,22 +143,23 @@ class OHLCRepository: """ if price is None, then bars are advanced based on the time but no new price is added to the series. """ - logname = f'{symbol} {ohlc_name(period)}' - log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') + logname = f'{symbol} {period_name(period)}' + # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') key = (symbol, period) # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time historical: Optional[list[OHLC]] = recent_ohlcs.get(key) + # log.debug(f'got recent {historical}') if not historical: if create is False or price is None: return # do not track symbols which have not been explicity set up p = str(price) historical = [] updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))] - log.debug(f'\tcreated new bars {updated}') + # log.debug(f'\tcreated new bars {updated}') else: updated = update_ohlc(historical[-1], period, time, price) # drop any historical bars that are older than we need - oldest_needed = from_timestamp(current_block.get().timestamp) - period # cover the root block time plus one period prior + oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period # cover the root block time plus one period prior trim = (oldest_needed - from_isotime(historical[0][0])) // period if trim > 0: historical = historical[trim:] @@ -173,15 +172,16 @@ class OHLCRepository: first_updated = from_isotime(updated[0][0]) overlap = (first_updated - last_bar) // period + 1 updated = historical[:-overlap] + updated if overlap > 0 else historical + updated - log.debug(f'\tnew recents: {updated}') + # log.debug(f'\tnew recents: {updated}') recent_ohlcs.setitem(key, updated) return updated def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None: for ohlc in ohlc_list: - self.save(symbol, period, ohlc) # we need to act sequentially so we don't have conflicting access to chunks + self.save(symbol, period, ohlc) # we need to act synchronously so we don't have conflicting access to chunks def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None: + # log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}') time = dt(ohlc[0]) chunk = self.get_chunk(symbol, period, time) if not chunk: @@ -189,8 +189,10 @@ class OHLCRepository: else: start = from_isotime(chunk[0][0]) index = (time - start) // period - log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') - assert index <= len(chunk) + # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') + if index > len(chunk): + log.error(f'chunk gap: {index} > {len(chunk)} {symbol} {period_name(period)} {ohlc}'+''.join(f'\n\t{c}' for c in chunk)) + exit(1) if index == len(chunk): assert from_isotime(chunk[-1][0]) + period == time chunk.append(ohlc) @@ -221,21 +223,22 @@ class OHLCRepository: if not chunk: return path = self.chunk_path(symbol, period, from_isotime(chunk[0][0])) - try: - with open(path, 'w') as file: - json.dump(chunk, file) - return - except FileNotFoundError: - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, 'w') as file: - json.dump(chunk, file) + for _ in range(2): + try: + with open(path, 'w') as file: + json.dump(chunk, file) + self.cache[path] = chunk + return + except FileNotFoundError: + os.makedirs(os.path.dirname(path), exist_ok=True) + raise IOError(f'Could not write chunk {path}') def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: if chain_id is None: chain_id = current_chain.get().chain_id start = ohlc_start_time(time, period) - name = ohlc_name(period) + name = period_name(period) return f'{self.dir}/{chain_id}/{symbol}/{name}/' + ( f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month @@ -247,19 +250,19 @@ def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]): pool_addr, period = key chain_id = current_chain.get().chain_id return ( - f'{chain_id}|{pool_addr}|{ohlc_name(period)}', # channel name is like 0x...|1m + f'{chain_id}|{pool_addr}|{period_name(period)}', # channel name is like 0x...|1m 'ohlcs', (chain_id, pool_addr, bars) ) def ohlc_key_to_str(k): - return f'{k[0]}|{ohlc_name(k[1])}' + return f'{k[0]}|{period_name(k[1])}' def ohlc_str_to_key(s): pool, period_name = s.split('|') return pool, period_from_name(period_name) -def ohlc_finalize(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): +def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): """ used as a finalization callback from BlockState data. """ diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 5a7c285..782f65e 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -21,9 +21,10 @@ TimeTrigger = Callable[[int], None] # func(timestamp) time_triggers:BlockSet[TimeTrigger] = BlockSet('tt') PriceTrigger = Callable[[dec], Union[Awaitable[None],None]] # [async] func(pool_price) +UnconstrainedPriceTrigger = Callable[[Optional[dec]], Union[Awaitable[None],None]] # [async] func(pool_price) price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price triggers are first set, they must be tested against the current price even if it didnt change this block -unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled +unconstrained_price_triggers: BlockSet[UnconstrainedPriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches # todo should this really be blockdata? diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 02428de..6166c51 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -98,7 +98,7 @@ class BlockStateRunner: if self.state: self.max_height_seen = max(self.max_height_seen, self.state.root_block.height) self.running = True - return await (self.run_polling() if config.polling > 0 else self.run_ws()) + return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws()) async def run_ws(self): w3ws = await create_w3_ws() @@ -124,8 +124,8 @@ class BlockStateRunner: if not self.running: break await async_yield() - except (ConnectionClosedError, TimeoutError): - pass + except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + log.debug(f'runner timeout {e}') finally: # noinspection PyBroadException try: @@ -168,8 +168,8 @@ class BlockStateRunner: if not self.running: break await asyncio.sleep(config.polling) - except ConnectionClosedError: - pass + except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + log.debug(f'runner timeout {e}') finally: # noinspection PyBroadException try: @@ -258,7 +258,7 @@ class BlockStateRunner: async def handle_head(self, chain, block, w3): print(f'logger {log} {log.name} level {log.level} {logging.DEBUG} {logging.FATAL}') - log.debug(f'handle_head {block.height} {block.hash}') + log.debug(f'handle_head {block.height} {hexstr(block.hash)}') session = None batches = [] try: @@ -365,7 +365,7 @@ class BlockStateRunner: else: new_root_fork = fork.for_height(promotion_height) if new_root_fork: - log.debug(f'promoting root {new_root_fork.height} {new_root_fork.hash}') + log.debug(f'promoting root {new_root_fork.height} {hexstr(new_root_fork.hash)}') diff_items = self.state.promote_root(new_root_fork) for callback in self.on_promotion: # todo try/except for known retryable errors From c9d7a6f4ddb1eb9e76238bab283f5f783af96351 Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 26 Jan 2024 21:59:31 -0400 Subject: [PATCH 5/7] bugfixes for pool detection and runner error reporting --- src/dexorder/pools.py | 6 ++++++ src/dexorder/runner.py | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 81ec780..3847b03 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -41,6 +41,12 @@ class Pools: except ContractLogicError: log.debug(f'new Unknown pool at {address}') found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0) + except ValueError as v: + if v.args[0].get('code') == -32000: + log.debug(f'new Unknown pool at {address}') + found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0) + else: + raise db.session.add(found) Pools.instances[key] = found return None if found.exchange == Exchange.Unknown else found diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 6166c51..6176185 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -323,7 +323,13 @@ class BlockStateRunner: if not self.state_initialized: await self.do_state_init_cbs() # logevent callbacks - for future, callback, event, filter_args in batches: + while True: + try: + # we remove entries as we process them, so the exception handler doesn't re-await the callbacks + batch = batches.pop(0) + except IndexError: + break + future, callback, event, filter_args = batch if future is None: await maywait(callback()) # non-log callback else: From 1b5d1e334717897d8f468a0345588ff62cae01eb Mon Sep 17 00:00:00 2001 From: Tim Date: Sat, 27 Jan 2024 18:39:11 -0400 Subject: [PATCH 6/7] backfill dynamic batch size --- src/dexorder/runner.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 6176185..20c22a4 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -22,6 +22,8 @@ from dexorder.util.async_util import maywait, Maywaitable log = logging.getLogger(__name__) +class Retry (Exception): ... + # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas class BlockStateRunner: @@ -247,6 +249,8 @@ class BlockStateRunner: try: await self.handle_head(chain, head, w3) prev_head = head + except Retry: + pass except Exception as x: log.exception(x) except Exception: @@ -333,12 +337,20 @@ class BlockStateRunner: if future is None: await maywait(callback()) # non-log callback else: - log_events = await future if config.parallel_logevent_queries else future + try: + log_events = await future if config.parallel_logevent_queries else future + except ValueError as e: + if e.args[0].get('code') == -32602: + # too many logs were returned in the batch, so decrease the batch size. + batch_size = int(chain.batch_size * 0.9) + chain.batch_size = batch_size + log.info(f'Decreasing batch size for {chain} to {batch_size}') + raise Retry for log_event in log_events: try: parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI) as x: - log.warning(f'logevent parse error {x}\n{log_event}') + except (LogTopicError, MismatchedABI) as e: + log.warning(f'logevent parse error {e}\n{log_event}') else: # todo try/except for known retryable errors await maywait(callback(parsed)) From 11d9a2f6f4893d6d318787a23efe2c2af35733e4 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 29 Jan 2024 01:26:28 -0400 Subject: [PATCH 7/7] backfill touchups --- src/dexorder/base/chain.py | 4 ++-- src/dexorder/runner.py | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 6c86e47..41bbb10 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -51,9 +51,9 @@ Goerli = Blockchain(5, 'Goerli') Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Mumbai = Blockchain(80001, 'Mumbai') BSC = Blockchain(56, 'BSC') -Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=1000) # todo configure batch size... does it depend on log count? :( +Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) # todo configure batch size Mock = Blockchain(31337, 'Mock', 3, batch_size=10000) -Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=100) +Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000) current_chain = ContextVar[Blockchain]('current_chain') diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 20c22a4..f5946a6 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -19,6 +19,7 @@ from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block from dexorder.util import hexstr, topic from dexorder.util.async_util import maywait, Maywaitable +from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) @@ -342,10 +343,8 @@ class BlockStateRunner: except ValueError as e: if e.args[0].get('code') == -32602: # too many logs were returned in the batch, so decrease the batch size. - batch_size = int(chain.batch_size * 0.9) - chain.batch_size = batch_size - log.info(f'Decreasing batch size for {chain} to {batch_size}') - raise Retry + fatal(f'Decrease batch size for {chain}') + raise for log_event in log_events: try: parsed = event.process_log(log_event) if event is not None else log_event