From 6b3e464574a5941de92a50a595e423518101a4a5 Mon Sep 17 00:00:00 2001 From: Tim Date: Sat, 17 Feb 2024 20:25:14 -0400 Subject: [PATCH] runner refactor to support walker --- src/dexorder/base/chain.py | 6 +- src/dexorder/bin/finaldata.py | 46 +++++++++++++ src/dexorder/blocktime.py | 20 ++++++ src/dexorder/configuration/schema.py | 4 +- src/dexorder/event_handler.py | 41 +++--------- src/dexorder/pools.py | 22 ++++++- src/dexorder/progressor.py | 98 ++++++++++++++++++++++++++++ src/dexorder/runner.py | 95 +++++---------------------- src/dexorder/walker.py | 86 ++++++++++++++++++++++++ 9 files changed, 304 insertions(+), 114 deletions(-) create mode 100644 src/dexorder/bin/finaldata.py create mode 100644 src/dexorder/blocktime.py create mode 100644 src/dexorder/progressor.py create mode 100644 src/dexorder/walker.py diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 01e3240..a9a62b2 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -28,8 +28,8 @@ class Blockchain: """ self.chain_id = chain_id self.name = name - self.confirms = confirms # todo configure - self.batch_size = batch_size # todo configure + self.confirms = confirms + self.batch_size = batch_size Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_name[name] = self @@ -47,7 +47,7 @@ Goerli = Blockchain(5, 'Goerli') Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Mumbai = Blockchain(80001, 'Mumbai') BSC = Blockchain(56, 'BSC') -Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) # todo configure batch size +Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) Mock = Blockchain(31337, 'Mock', 3, batch_size=10000) Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000) diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py new file mode 100644 index 0000000..ea3caf7 --- /dev/null +++ b/src/dexorder/bin/finaldata.py @@ -0,0 +1,46 @@ +import asyncio +import logging +from datetime import datetime + +from web3.types import EventData + +from dexorder import dec +from dexorder.bin.executable import execute +from dexorder.blocktime import get_block_timestamp +from dexorder.contract import get_contract_event +from dexorder.database.model.pool import PoolDict +from dexorder.pools import get_uniswap_data +from dexorder.walker import BlockWalker + +log = logging.getLogger(__name__) + + +async def handle_backfill_uniswap_swaps(swaps: list[EventData]): + # asynchronously prefetch the block timestamps we'll need + hashes = set(swap['blockHash'] for swap in swaps) + await asyncio.gather(*[get_block_timestamp(h) for h in hashes]) + # now execute the swaps synchronously + for swap in swaps: + pool, time, price = await get_uniswap_data(swap) + await handle_uniswap_swap(pool, time, price) + + +async def handle_uniswap_swap(pool: PoolDict, time: datetime, price: dec): + pass + +def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + # start = now() + log.info("finalizing OHLC's") + ohlc_save(block, diffs) + # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') + log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') + +async def main(): + walker = BlockWalker() + walker.add_event_trigger(handle_backfill_uniswap_swaps, + get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) + await walker.run() + + +if __name__ == '__main__': + execute(main()) diff --git a/src/dexorder/blocktime.py b/src/dexorder/blocktime.py new file mode 100644 index 0000000..3955f56 --- /dev/null +++ b/src/dexorder/blocktime.py @@ -0,0 +1,20 @@ +import logging + +from async_lru import alru_cache + +from dexorder import current_w3 +from dexorder.blockstate import current_blockstate +from dexorder.util import hexint + +log = logging.getLogger(__name__) + + +@alru_cache(maxsize=1024) +async def get_block_timestamp(blockhash) -> int: + block = current_blockstate.get().by_hash.get(blockhash) + if block is not None: + return block.timestamp + response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) + raw = hexint(response['result']['timestamp']) + # noinspection PyTypeChecker + return raw if type(raw) is int else hexint(raw) diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index f845c61..c708a1a 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -9,13 +9,15 @@ from typing import Optional @dataclass class Config: + confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used + batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request rpc_url: str = 'http://localhost:8545' ws_url: Optional[str] = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' - ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk dump_sql: bool = False redis_url: Optional[str] = 'redis://localhost:6379' + ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk parallel_logevent_queries: bool = True polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 150bdd8..b1125d1 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -3,23 +3,21 @@ import functools import logging from uuid import UUID -from async_lru import alru_cache from web3.types import EventData -from dexorder import current_pub, db, dec, from_timestamp, minutely, current_w3 +from dexorder import current_pub, db, from_timestamp, minutely from dexorder.base.chain import current_chain, current_clock from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey -from dexorder.blockstate import current_blockstate +from dexorder.blocktime import get_block_timestamp from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.transaction import submit_transaction_request -from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool +from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.contract import ERC20 -from dexorder.util import hexint from dexorder.vault_blockdata import vault_owners, vault_balances from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob -from dexorder.base.orderlib import SwapOrderState, Exchange +from dexorder.base.orderlib import SwapOrderState from dexorder.order.orderstate import Order from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \ unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, new_price_triggers, activate_order @@ -165,33 +163,14 @@ async def handle_uniswap_swaps(swaps: list[EventData]): async def handle_uniswap_swap(swap: EventData): - try: - sqrt_price = swap['args']['sqrtPriceX96'] - except KeyError: + data = await get_uniswap_data(swap) + if data is None: return - addr = swap['address'] - pool = await get_pool(addr) - if pool['exchange'] != Exchange.UniswapV3.value: - log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') - return - price: dec = await uniswap_price(pool, sqrt_price) - timestamp = await get_block_timestamp(swap['blockHash']) - dt = from_timestamp(timestamp) + pool, time, price = data + addr = pool['address'] pool_prices[addr] = price - ohlcs.update_all(addr, dt, price) - log.debug(f'pool {addr} {minutely(dt)} {price}') - - -# todo is there a better spot for this function? -@alru_cache(maxsize=1024) -async def get_block_timestamp(blockhash) -> int: - block = current_blockstate.get().by_hash.get(blockhash) - if block is not None: - return block.timestamp - response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) - raw = hexint(response['result']['timestamp']) - # noinspection PyTypeChecker - return raw if type(raw) is int else hexint(raw) + ohlcs.update_all(addr, time, price) + log.debug(f'pool {addr} {minutely(time)} {price}') def handle_vault_created(created: EventData): diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index a7c7a3d..e676685 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -2,16 +2,18 @@ import asyncio import logging from web3.exceptions import ContractLogicError +from web3.types import EventData -from dexorder import dec, ADDRESS_0 +from dexorder import dec, ADDRESS_0, from_timestamp from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.base.orderlib import Exchange from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V +from dexorder.blocktime import get_block_timestamp from dexorder.database.model.pool import PoolDict from dexorder.tokens import get_token -from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address +from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address, log log = logging.getLogger(__name__) @@ -86,3 +88,19 @@ async def ensure_pool_price(pool: PoolDict): else: raise ValueError(f'Unsupported exchange type {pool["exchange"]}') # todo other exchanges + + +async def get_uniswap_data(swap: EventData): + try: + sqrt_price = swap['args']['sqrtPriceX96'] + except KeyError: + return None + addr = swap['address'] + pool = await get_pool(addr) + if pool['exchange'] != Exchange.UniswapV3.value: + log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') + return None + price: dec = await uniswap_price(pool, sqrt_price) + timestamp = await get_block_timestamp(swap['blockHash']) + dt = from_timestamp(timestamp) + return pool, dt, price diff --git a/src/dexorder/progressor.py b/src/dexorder/progressor.py new file mode 100644 index 0000000..ebb45a4 --- /dev/null +++ b/src/dexorder/progressor.py @@ -0,0 +1,98 @@ +import logging +from abc import ABCMeta, abstractmethod +from typing import Callable, Union + +from web3.contract.contract import ContractEvents +from web3.exceptions import LogTopicError, MismatchedABI +from web3.types import EventData + +from dexorder import config, current_w3, NARG +from dexorder.base.chain import current_chain +from dexorder.util import topic +from dexorder.util.async_util import Maywaitable, maywait +from dexorder.util.shutdown import fatal + +log = logging.getLogger(__name__) + + +class BlockProgressor(metaclass=ABCMeta): + """ + Base class for BlockStateRunner and BlockWalker holds the common standard callback machinery and also the + height of the latest processed block + """ + + def __init__(self): + # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event + self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = [] + # these callbacks are invoked after every block and also every second if there wasnt a block + self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = [] + + def add_event_trigger(self, + # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range + callback: Union[Callable[[EventData], Maywaitable[None]], + Callable[[list[EventData]], Maywaitable[None]]], + event: ContractEvents = None, + log_filter: Union[dict, str] = None, + *, multi=False): + """ + if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs + """ + if log_filter is None and event is not None: + log_filter = {'topics': [topic(event.abi)]} + cb = callback if multi else lambda events: map(cb, events) + self.events.append((cb, event, log_filter)) + + @abstractmethod + def run(self): + pass + + async def get_backfill_batches(self, from_height, to_height, w3=None): + if w3 is None: + w3 = current_w3.get() + batches = [] + for callback, event, log_filter in self.events: + if log_filter is None: + batches.append((None, callback, event, None)) + else: + lf = dict(log_filter) + lf['fromBlock'] = from_height + lf['toBlock'] = to_height + log.debug(f'querying backfill {from_height} through {to_height}') + get_logs = w3.eth.get_logs(lf) + if not config.parallel_logevent_queries: + get_logs = await get_logs + batches.append((get_logs, callback, event, lf)) + for callback in self.postprocess_cbs: + batches.append((None, callback, None, None)) + return batches + + + @staticmethod + async def invoke_callbacks(batches, chain=None): + if chain is None: + chain = current_chain.get() + # logevent callbacks + while batches: + # we remove entries as we process them, so the exception handler doesn't re-await the callbacks + batch = batches.pop(0) + future, callback, event, filter_args = batch + if future is None: + await maywait(callback()) # non-log callback + else: + try: + log_events = await future if config.parallel_logevent_queries else future + except ValueError as e: + if e.args[0].get('code') == -32602: + # too many logs were returned in the batch, so decrease the batch size. + fatal(f'Decrease batch size for {chain}') + raise + parsed_events = [] + for log_event in log_events: + try: + parsed = event.process_log(log_event) if event is not None else log_event + except (LogTopicError, MismatchedABI) as e: + log.warning(f'logevent parse error {e}\n{log_event}') + parsed = NARG # need a placeholder + parsed_events.append(parsed) + # todo try/except for known retryable errors + await maywait(callback(parsed_events)) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index acd1a34..9cb10ae 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,11 +1,9 @@ import asyncio import logging from asyncio import Queue -from typing import Union, Any, Iterable, Callable +from typing import Any, Iterable, Callable -from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI -from web3.types import EventData # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError @@ -17,15 +15,20 @@ from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block -from dexorder.util import hexstr, topic +from dexorder.progressor import BlockProgressor +from dexorder.util import hexstr from dexorder.util.async_util import maywait, Maywaitable +from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) class Retry (Exception): ... + # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas -class BlockStateRunner: + + +class BlockStateRunner(BlockProgressor): """ @@ -59,14 +62,9 @@ class BlockStateRunner: """ If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling. """ + super().__init__() self.state = state - # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event - self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = [] - - # these callbacks are invoked after every block and also every second if there wasnt a block - self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = [] - # onStateInit callbacks are invoked after the initial state is loaded or created self.on_state_init: list[Callable[[],Maywaitable[None]]] = [] self.state_initialized = False @@ -86,22 +84,6 @@ class BlockStateRunner: self.running = False - - def add_event_trigger(self, - # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range - callback: Union[Callable[[EventData], Maywaitable[None]], - Callable[[list[EventData]], Maywaitable[None]]], - event: ContractEvents = None, - log_filter: Union[dict, str] = None, - *, multi=False): - """ - if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs - """ - if log_filter is None and event is not None: - log_filter = {'topics': [topic(event.abi)]} - cb = callback if multi else lambda events: map(cb, events) - self.events.append((cb, event, log_filter)) - async def run(self): # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling if self.state: @@ -214,7 +196,8 @@ class BlockStateRunner: if self.state or config.backfill: # backfill batches start_height = self.max_height_seen - batch_height = start_height + chain.batch_size - 1 + batch_size = config.batch_size if config.batch_size is not None else chain.batch_size + batch_height = start_height + batch_size - 1 while batch_height < head.height: # the backfill is larger than a single batch, so we push intermediate head blocks onto the queue response = await w3.provider.make_request('eth_getBlockByNumber', [hex(batch_height), False]) @@ -291,25 +274,11 @@ class BlockStateRunner: if fork is None: log.debug(f'discarded late-arriving head {block}') else: - batches = [] + batches: list + from_height = fork.parent.height + to_height = fork.height if fork.disjoint: - # backfill batches - for callback, event, log_filter in self.events: - if log_filter is None: - batches.append((None, callback, event, None)) - else: - from_height = fork.parent.height - to_height = fork.height - lf = dict(log_filter) - lf['fromBlock'] = from_height - lf['toBlock'] = to_height - log.debug(f'querying backfill {from_height} through {to_height}') - get_logs = w3.eth.get_logs(lf) - if not config.parallel_logevent_queries: - get_logs = await get_logs - batches.append((get_logs, callback, event, lf)) - for callback in self.postprocess_cbs: - batches.append((None, callback, None, None)) + batches = await self.get_backfill_batches(from_height, to_height, w3) else: # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order @@ -337,36 +306,7 @@ class BlockStateRunner: current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created if not self.state_initialized: await self.do_state_init_cbs() - # logevent callbacks - while True: - try: - # we remove entries as we process them, so the exception handler doesn't re-await the callbacks - batch = batches.pop(0) - except IndexError: - break - future, callback, event, filter_args = batch - if future is None: - await maywait(callback()) # non-log callback - else: - try: - log_events = await future if config.parallel_logevent_queries else future - except ValueError as e: - if e.args[0].get('code') == -32602: - # too many logs were returned in the batch, so decrease the batch size. - # fatal(f'Decrease batch size for {chain}') - log.warning(f'Decrease batch size for {chain}') - return - raise - parsed_events = [] - for log_event in log_events: - try: - parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI) as e: - log.warning(f'logevent parse error {e}\n{log_event}') - parsed = NARG - parsed_events.append(parsed) - # todo try/except for known retryable errors - await maywait(callback(parsed_events)) + await self.invoke_callbacks(batches) # todo # IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either @@ -378,7 +318,8 @@ class BlockStateRunner: await maywait(callback(block, diff_items)) # check for root promotion - promotion_height = latest_block.get().height - chain.confirms + confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 + promotion_height = latest_block.get().height - confirm_offset new_root_fork = None if fork.disjoint: # individually check the fork's head and ancestor diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py new file mode 100644 index 0000000..c20fcb9 --- /dev/null +++ b/src/dexorder/walker.py @@ -0,0 +1,86 @@ +import asyncio +import logging +from asyncio import Queue + +from websockets import ConnectionClosedError + +from dexorder import Blockchain, config, db +from dexorder.base.chain import current_chain +from dexorder.blockchain.connection import create_w3 +from dexorder.progressor import BlockProgressor + +log = logging.getLogger(__name__) + + +class BlockWalker (BlockProgressor): + """ + Walker is similar to the Runner in that it progresses through the blockchain, but unlike the Runner, the Walker only + processes blocks after they have reached finalization age. It does not create or maintain any BlockState. + """ + + def __init__(self): + super().__init__() + self.queue: Queue = Queue() + self.running = False + + async def run(self): + self.running = True + db.connect() + w3 = create_w3() + chain_id = await w3.eth.chain_id + chain = Blockchain.for_id(chain_id) + current_chain.set(chain) + confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 + batch_size = config.batch_size if config.batch_size is not None else chain.batch_size + + kv_key = f'walker_height|{chain_id}' + processed_height = db.kv.get(kv_key, config.backfill) + prev_height = None + while self.running: + try: + block = await w3.eth.get_block('latest') + latest_height = block['number'] + if latest_height > prev_height: + prev_height = latest_height + log.debug(f'polled new block {latest_height}') + promotion_height = latest_height - confirm_offset + while processed_height <= promotion_height: + stop = min(promotion_height, processed_height+batch_size-1) + await self.handle(processed_height, stop, chain) + processed_height = db.kv[kv_key] = stop + if not self.running: + break + await asyncio.sleep(config.polling) + except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + log.debug(f'walker timeout {e}') + finally: + # noinspection PyBroadException + try: + # noinspection PyUnresolvedReferences + await w3.provider.disconnect() + except Exception: + pass + + + async def handle(self, from_height, to_height, chain=None): + if chain is None: + chain = current_chain.get() + session = None + try: + batches = await self.get_backfill_batches(from_height, to_height) + session = db.session + session.begin() + await self.invoke_callbacks(batches, chain) + except: # legitimately catch EVERYTHING because we re-raise + log.debug('rolling back session') + if session is not None: + session.rollback() + raise + else: + if session is not None: + session.commit() + log.info(f'completed through block {to_height}') + finally: + if session is not None: + session.close() +