From 39fe81a49dec25a8e63265dc52e431f7029c5855 Mon Sep 17 00:00:00 2001 From: Tim Date: Sat, 17 Feb 2024 20:25:14 -0400 Subject: [PATCH 1/5] runner refactor to support walker --- src/dexorder/base/chain.py | 6 +- src/dexorder/bin/finaldata.py | 46 +++++++++++++ src/dexorder/blocktime.py | 20 ++++++ src/dexorder/configuration/schema.py | 4 +- src/dexorder/event_handler.py | 41 +++--------- src/dexorder/pools.py | 22 ++++++- src/dexorder/progressor.py | 98 ++++++++++++++++++++++++++++ src/dexorder/runner.py | 95 +++++---------------------- src/dexorder/walker.py | 86 ++++++++++++++++++++++++ 9 files changed, 304 insertions(+), 114 deletions(-) create mode 100644 src/dexorder/bin/finaldata.py create mode 100644 src/dexorder/blocktime.py create mode 100644 src/dexorder/progressor.py create mode 100644 src/dexorder/walker.py diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 01e3240..a9a62b2 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -28,8 +28,8 @@ class Blockchain: """ self.chain_id = chain_id self.name = name - self.confirms = confirms # todo configure - self.batch_size = batch_size # todo configure + self.confirms = confirms + self.batch_size = batch_size Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_name[name] = self @@ -47,7 +47,7 @@ Goerli = Blockchain(5, 'Goerli') Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Mumbai = Blockchain(80001, 'Mumbai') BSC = Blockchain(56, 'BSC') -Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) # todo configure batch size +Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) Mock = Blockchain(31337, 'Mock', 3, batch_size=10000) Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000) diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py new file mode 100644 index 0000000..ea3caf7 --- /dev/null +++ b/src/dexorder/bin/finaldata.py @@ -0,0 +1,46 @@ +import asyncio +import logging +from datetime import datetime + +from web3.types import EventData + +from dexorder import dec +from dexorder.bin.executable import execute +from dexorder.blocktime import get_block_timestamp +from dexorder.contract import get_contract_event +from dexorder.database.model.pool import PoolDict +from dexorder.pools import get_uniswap_data +from dexorder.walker import BlockWalker + +log = logging.getLogger(__name__) + + +async def handle_backfill_uniswap_swaps(swaps: list[EventData]): + # asynchronously prefetch the block timestamps we'll need + hashes = set(swap['blockHash'] for swap in swaps) + await asyncio.gather(*[get_block_timestamp(h) for h in hashes]) + # now execute the swaps synchronously + for swap in swaps: + pool, time, price = await get_uniswap_data(swap) + await handle_uniswap_swap(pool, time, price) + + +async def handle_uniswap_swap(pool: PoolDict, time: datetime, price: dec): + pass + +def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + # start = now() + log.info("finalizing OHLC's") + ohlc_save(block, diffs) + # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') + log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') + +async def main(): + walker = BlockWalker() + walker.add_event_trigger(handle_backfill_uniswap_swaps, + get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) + await walker.run() + + +if __name__ == '__main__': + execute(main()) diff --git a/src/dexorder/blocktime.py 
b/src/dexorder/blocktime.py new file mode 100644 index 0000000..3955f56 --- /dev/null +++ b/src/dexorder/blocktime.py @@ -0,0 +1,20 @@ +import logging + +from async_lru import alru_cache + +from dexorder import current_w3 +from dexorder.blockstate import current_blockstate +from dexorder.util import hexint + +log = logging.getLogger(__name__) + + +@alru_cache(maxsize=1024) +async def get_block_timestamp(blockhash) -> int: + block = current_blockstate.get().by_hash.get(blockhash) + if block is not None: + return block.timestamp + response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) + raw = hexint(response['result']['timestamp']) + # noinspection PyTypeChecker + return raw if type(raw) is int else hexint(raw) diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index f845c61..c708a1a 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -9,13 +9,15 @@ from typing import Optional @dataclass class Config: + confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used + batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request rpc_url: str = 'http://localhost:8545' ws_url: Optional[str] = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' - ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk dump_sql: bool = False redis_url: Optional[str] = 'redis://localhost:6379' + ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk parallel_logevent_queries: bool = True polling: float = 0 # seconds between queries for a new block. 
0 disables polling and uses a websocket subscription on ws_url instead diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 150bdd8..b1125d1 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -3,23 +3,21 @@ import functools import logging from uuid import UUID -from async_lru import alru_cache from web3.types import EventData -from dexorder import current_pub, db, dec, from_timestamp, minutely, current_w3 +from dexorder import current_pub, db, from_timestamp, minutely from dexorder.base.chain import current_chain, current_clock from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey -from dexorder.blockstate import current_blockstate +from dexorder.blocktime import get_block_timestamp from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.transaction import submit_transaction_request -from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool +from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.contract import ERC20 -from dexorder.util import hexint from dexorder.vault_blockdata import vault_owners, vault_balances from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob -from dexorder.base.orderlib import SwapOrderState, Exchange +from dexorder.base.orderlib import SwapOrderState from dexorder.order.orderstate import Order from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \ unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, new_price_triggers, activate_order @@ -165,33 +163,14 @@ async def handle_uniswap_swaps(swaps: list[EventData]): async def handle_uniswap_swap(swap: EventData): - try: - sqrt_price = swap['args']['sqrtPriceX96'] - except KeyError: + data = await get_uniswap_data(swap) + if data is None: return - addr = swap['address'] - pool = await get_pool(addr) - if pool['exchange'] != Exchange.UniswapV3.value: - log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') - return - price: dec = await uniswap_price(pool, sqrt_price) - timestamp = await get_block_timestamp(swap['blockHash']) - dt = from_timestamp(timestamp) + pool, time, price = data + addr = pool['address'] pool_prices[addr] = price - ohlcs.update_all(addr, dt, price) - log.debug(f'pool {addr} {minutely(dt)} {price}') - - -# todo is there a better spot for this function? 
-@alru_cache(maxsize=1024)
-async def get_block_timestamp(blockhash) -> int:
-    block = current_blockstate.get().by_hash.get(blockhash)
-    if block is not None:
-        return block.timestamp
-    response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
-    raw = hexint(response['result']['timestamp'])
-    # noinspection PyTypeChecker
-    return raw if type(raw) is int else hexint(raw)
+    ohlcs.update_all(addr, time, price)
+    log.debug(f'pool {addr} {minutely(time)} {price}')


 def handle_vault_created(created: EventData):
diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py
index a7c7a3d..e676685 100644
--- a/src/dexorder/pools.py
+++ b/src/dexorder/pools.py
@@ -2,16 +2,18 @@ import asyncio
 import logging

 from web3.exceptions import ContractLogicError
+from web3.types import EventData

-from dexorder import dec, ADDRESS_0
+from dexorder import dec, ADDRESS_0, from_timestamp
 from dexorder.addrmeta import address_metadata
 from dexorder.base.chain import current_chain
 from dexorder.base.orderlib import Exchange
 from dexorder.blockstate import BlockDict
 from dexorder.blockstate.blockdata import K, V
+from dexorder.blocktime import get_block_timestamp
 from dexorder.database.model.pool import PoolDict
 from dexorder.tokens import get_token
-from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
+from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address, log

 log = logging.getLogger(__name__)
@@ -86,3 +88,19 @@ async def ensure_pool_price(pool: PoolDict):
     else:
         raise ValueError(f'Unsupported exchange type {pool["exchange"]}')
         # todo other exchanges
+
+
+async def get_uniswap_data(swap: EventData):
+    try:
+        sqrt_price = swap['args']['sqrtPriceX96']
+    except KeyError:
+        return None
+    addr = swap['address']
+    pool = await get_pool(addr)
+    if pool['exchange'] != Exchange.UniswapV3.value:
+        log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
+        return None
+    price: dec = await uniswap_price(pool, sqrt_price)
+    timestamp = await get_block_timestamp(swap['blockHash'])
+    dt = from_timestamp(timestamp)
+    return pool, dt, price
diff --git a/src/dexorder/progressor.py b/src/dexorder/progressor.py
new file mode 100644
index 0000000..ebb45a4
--- /dev/null
+++ b/src/dexorder/progressor.py
@@ -0,0 +1,98 @@
+import logging
+from abc import ABCMeta, abstractmethod
+from typing import Callable, Union
+
+from web3.contract.contract import ContractEvents
+from web3.exceptions import LogTopicError, MismatchedABI
+from web3.types import EventData
+
+from dexorder import config, current_w3, NARG
+from dexorder.base.chain import current_chain
+from dexorder.util import topic
+from dexorder.util.async_util import Maywaitable, maywait
+from dexorder.util.shutdown import fatal
+
+log = logging.getLogger(__name__)
+
+
+class BlockProgressor(metaclass=ABCMeta):
+    """
+    Base class for BlockStateRunner and BlockWalker. Holds the common callback machinery and tracks the
+    height of the latest processed block.
+    """
+
+    def __init__(self):
+        # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
+        self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = []
+        # these callbacks are invoked after every block and also every second if there wasn't a block
+        self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = []
+
+    def add_event_trigger(self,
+                          # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
+                          callback: Union[Callable[[EventData], Maywaitable[None]],
+                                          Callable[[list[EventData]], Maywaitable[None]]],
+                          event: ContractEvents = None,
+                          log_filter: Union[dict, str] = None,
+                          *, multi=False):
+        """
+        if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
+        """
+        if log_filter is None and event is not None:
+            log_filter = {'topics': [topic(event.abi)]}
+        if multi:
+            cb = callback
+        else:
+            async def cb(events):
+                # fan the batch out to the single-event callback, awaiting each result in turn
+                for ev in events:
+                    await maywait(callback(ev))
+        self.events.append((cb, event, log_filter))
+
+    @abstractmethod
+    def run(self):
+        pass
+
+    async def get_backfill_batches(self, from_height, to_height, w3=None):
+        if w3 is None:
+            w3 = current_w3.get()
+        batches = []
+        for callback, event, log_filter in self.events:
+            if log_filter is None:
+                batches.append((None, callback, event, None))
+            else:
+                lf = dict(log_filter)
+                lf['fromBlock'] = from_height
+                lf['toBlock'] = to_height
+                log.debug(f'querying backfill {from_height} through {to_height}')
+                get_logs = w3.eth.get_logs(lf)
+                if not config.parallel_logevent_queries:
+                    get_logs = await get_logs
+                batches.append((get_logs, callback, event, lf))
+        for callback in self.postprocess_cbs:
+            batches.append((None, callback, None, None))
+        return batches
+
+
+    @staticmethod
+    async def invoke_callbacks(batches, chain=None):
+        if chain is None:
+            chain = current_chain.get()
+        # logevent callbacks
+        while batches:
+            # we remove entries as we process them, so the exception handler doesn't re-await the callbacks
+            batch = batches.pop(0)
+            future, callback, event, filter_args = batch
+            if future is None:
+                await maywait(callback())  # non-log callback
+            else:
+                try:
+                    # when parallel queries are disabled, the logs were already awaited in get_backfill_batches
+                    log_events = await future if config.parallel_logevent_queries else future
+                except ValueError as e:
+                    if e.args[0].get('code') == -32602:
+                        # too many logs were returned in the batch, so decrease the batch size.
+                        fatal(f'Decrease batch size for {chain}')
+                    raise
+                parsed_events = []
+                for log_event in log_events:
+                    try:
+                        parsed = event.process_log(log_event) if event is not None else log_event
+                    except (LogTopicError, MismatchedABI) as e:
+                        log.warning(f'logevent parse error {e}\n{log_event}')
+                        parsed = NARG  # need a placeholder
+                    parsed_events.append(parsed)
+                # todo try/except for known retryable errors
+                await maywait(callback(parsed_events))
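A rough usage sketch of the BlockProgressor machinery added above. The `walker` instance, `transfer_event`, and both handler names are invented for illustration, and the current_chain/current_w3 contexts are assumed to be set; only add_event_trigger(), get_backfill_batches(), and invoke_callbacks() come from this patch:

    async def handle_transfer(ev):        # multi=False: invoked once per EventData
        log.info(ev['args'])

    async def handle_transfers(evs):      # multi=True: invoked once per processing range
        log.info(f'{len(evs)} transfers in range')

    walker.add_event_trigger(handle_transfer, transfer_event)
    walker.add_event_trigger(handle_transfers, transfer_event, multi=True)

    # backfill is two-phase: fire all eth_getLogs queries up front, then dispatch in
    # registration order; postprocess callbacks run at the end of every range
    batches = await walker.get_backfill_batches(100, 199)
    await walker.invoke_callbacks(batches)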
diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py
index acd1a34..9cb10ae 100644
--- a/src/dexorder/runner.py
+++ b/src/dexorder/runner.py
@@ -1,11 +1,9 @@
 import asyncio
 import logging
 from asyncio import Queue
-from typing import Union, Any, Iterable, Callable
+from typing import Any, Iterable, Callable

-from web3.contract.contract import ContractEvents
 from web3.exceptions import LogTopicError, MismatchedABI
-from web3.types import EventData
 # noinspection PyPackageRequirements
 from websockets.exceptions import ConnectionClosedError
@@ -17,15 +15,20 @@
 from dexorder.blockstate import BlockState, current_blockstate
 from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.database.model import Block
 from dexorder.database.model.block import current_block, latest_block
-from dexorder.util import hexstr, topic
+from dexorder.progressor import BlockProgressor
+from dexorder.util import hexstr
 from dexorder.util.async_util import maywait, Maywaitable
+from dexorder.util.shutdown import fatal

 log = logging.getLogger(__name__)


 class Retry (Exception): ...
+
 # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
-class BlockStateRunner:
+
+
+class BlockStateRunner(BlockProgressor):
     """
@@ -59,14 +62,9 @@
         """ If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling. """
+        super().__init__()
         self.state = state

-        # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
-        self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = []
-
-        # these callbacks are invoked after every block and also every second if there wasnt a block
-        self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = []
-
         # onStateInit callbacks are invoked after the initial state is loaded or created
         self.on_state_init: list[Callable[[],Maywaitable[None]]] = []
         self.state_initialized = False
@@ -86,22 +84,6 @@
         self.running = False

-
-    def add_event_trigger(self,
-                          # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
-                          callback: Union[Callable[[EventData], Maywaitable[None]],
-                                          Callable[[list[EventData]], Maywaitable[None]]],
-                          event: ContractEvents = None,
-                          log_filter: Union[dict, str] = None,
-                          *, multi=False):
-        """
-        if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
-        """
-        if log_filter is None and event is not None:
-            log_filter = {'topics': [topic(event.abi)]}
-        cb = callback if multi else lambda events: map(cb, events)
-        self.events.append((cb, event, log_filter))
-
     async def run(self):
         # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is either websockets or polling
         if self.state:
@@ -214,7 +196,8 @@
             if self.state or config.backfill:
                 # backfill batches
                 start_height = self.max_height_seen
-                batch_height = start_height + chain.batch_size - 1
+                batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
+                batch_height = start_height + batch_size - 1
                 while batch_height < head.height:
                     # the backfill is larger than a single batch, so we push intermediate head blocks onto the queue
                     response = await w3.provider.make_request('eth_getBlockByNumber', [hex(batch_height), False])
@@ -291,25 +274,11 @@
             if fork is None:
                 log.debug(f'discarded late-arriving head {block}')
             else:
-                batches = []
+                batches: list
+                from_height = fork.parent.height
+                to_height = fork.height
                 if fork.disjoint:
-                    # backfill batches
-                    for callback, event, log_filter in self.events:
-                        if log_filter is None:
-                            batches.append((None, callback, event, None))
-                        else:
-                            from_height = fork.parent.height
-                            to_height = fork.height
-                            lf = dict(log_filter)
-                            lf['fromBlock'] = from_height
-                            lf['toBlock'] = to_height
-                            log.debug(f'querying backfill {from_height} through {to_height}')
-                            get_logs = w3.eth.get_logs(lf)
-                            if not config.parallel_logevent_queries:
-                                get_logs = await get_logs
-                            batches.append((get_logs, callback, event, lf))
-                    for callback in self.postprocess_cbs:
-                        batches.append((None, callback, None, None))
+                    batches = await self.get_backfill_batches(from_height, to_height, w3)
                 else:
                     # event callbacks are triggered in the order in which they're registered. the events passed to
                     # each callback are in block transaction order
@@ -337,36 +306,7 @@
                 current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args)))  # used by handle_vault_created
                 if not self.state_initialized:
                     await self.do_state_init_cbs()
-                # logevent callbacks
-                while True:
-                    try:
-                        # we remove entries as we process them, so the exception handler doesn't re-await the callbacks
-                        batch = batches.pop(0)
-                    except IndexError:
-                        break
-                    future, callback, event, filter_args = batch
-                    if future is None:
-                        await maywait(callback())  # non-log callback
-                    else:
-                        try:
-                            log_events = await future if config.parallel_logevent_queries else future
-                        except ValueError as e:
-                            if e.args[0].get('code') == -32602:
-                                # too many logs were returned in the batch, so decrease the batch size.
-                                # fatal(f'Decrease batch size for {chain}')
-                                log.warning(f'Decrease batch size for {chain}')
-                                return
-                            raise
-                        parsed_events = []
-                        for log_event in log_events:
-                            try:
-                                parsed = event.process_log(log_event) if event is not None else log_event
-                            except (LogTopicError, MismatchedABI) as e:
-                                log.warning(f'logevent parse error {e}\n{log_event}')
-                                parsed = NARG
-                            parsed_events.append(parsed)
-                        # todo try/except for known retryable errors
-                        await maywait(callback(parsed_events))
+                await self.invoke_callbacks(batches)

                 # todo
                 # IMPORTANT! check for a reorg and generate a reorg diff list.
the diff list we need is the union of the set of keys touched by either @@ -378,7 +318,8 @@ class BlockStateRunner: await maywait(callback(block, diff_items)) # check for root promotion - promotion_height = latest_block.get().height - chain.confirms + confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 + promotion_height = latest_block.get().height - confirm_offset new_root_fork = None if fork.disjoint: # individually check the fork's head and ancestor diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py new file mode 100644 index 0000000..c20fcb9 --- /dev/null +++ b/src/dexorder/walker.py @@ -0,0 +1,86 @@ +import asyncio +import logging +from asyncio import Queue + +from websockets import ConnectionClosedError + +from dexorder import Blockchain, config, db +from dexorder.base.chain import current_chain +from dexorder.blockchain.connection import create_w3 +from dexorder.progressor import BlockProgressor + +log = logging.getLogger(__name__) + + +class BlockWalker (BlockProgressor): + """ + Walker is similar to the Runner in that it progresses through the blockchain, but unlike the Runner, the Walker only + processes blocks after they have reached finalization age. It does not create or maintain any BlockState. + """ + + def __init__(self): + super().__init__() + self.queue: Queue = Queue() + self.running = False + + async def run(self): + self.running = True + db.connect() + w3 = create_w3() + chain_id = await w3.eth.chain_id + chain = Blockchain.for_id(chain_id) + current_chain.set(chain) + confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 + batch_size = config.batch_size if config.batch_size is not None else chain.batch_size + + kv_key = f'walker_height|{chain_id}' + processed_height = db.kv.get(kv_key, config.backfill) + prev_height = None + while self.running: + try: + block = await w3.eth.get_block('latest') + latest_height = block['number'] + if latest_height > prev_height: + prev_height = latest_height + log.debug(f'polled new block {latest_height}') + promotion_height = latest_height - confirm_offset + while processed_height <= promotion_height: + stop = min(promotion_height, processed_height+batch_size-1) + await self.handle(processed_height, stop, chain) + processed_height = db.kv[kv_key] = stop + if not self.running: + break + await asyncio.sleep(config.polling) + except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + log.debug(f'walker timeout {e}') + finally: + # noinspection PyBroadException + try: + # noinspection PyUnresolvedReferences + await w3.provider.disconnect() + except Exception: + pass + + + async def handle(self, from_height, to_height, chain=None): + if chain is None: + chain = current_chain.get() + session = None + try: + batches = await self.get_backfill_batches(from_height, to_height) + session = db.session + session.begin() + await self.invoke_callbacks(batches, chain) + except: # legitimately catch EVERYTHING because we re-raise + log.debug('rolling back session') + if session is not None: + session.rollback() + raise + else: + if session is not None: + session.commit() + log.info(f'completed through block {to_height}') + finally: + if session is not None: + session.close() + From c25c7f5a1b700348e97c917f3ba9969a67fe2646 Mon Sep 17 00:00:00 2001 From: Tim Date: Sun, 18 Feb 2024 23:12:52 -0400 Subject: [PATCH 2/5] finaldata debugged --- src/dexorder/bin/finaldata.py | 39 +++++++---- src/dexorder/blockstate/state.py | 48 ++++++++++++++ 
src/dexorder/blocktime.py | 7 +- src/dexorder/contract/__init__.py | 7 +- src/dexorder/ohlc.py | 83 ++++++++++++++++++++--- src/dexorder/pools.py | 5 +- src/dexorder/walker.py | 107 ++++++++++++++++++++---------- 7 files changed, 232 insertions(+), 64 deletions(-) diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index ea3caf7..9707454 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -1,18 +1,28 @@ import asyncio import logging -from datetime import datetime +import sys +from datetime import datetime, timedelta from web3.types import EventData -from dexorder import dec +from dexorder import dec, from_timestamp, blockchain, config from dexorder.bin.executable import execute +from dexorder.blockstate import current_blockstate, BlockState from dexorder.blocktime import get_block_timestamp +from dexorder.configuration import parse_args from dexorder.contract import get_contract_event +from dexorder.database.model.block import current_block from dexorder.database.model.pool import PoolDict +from dexorder.ohlc import LightOHLCRepository from dexorder.pools import get_uniswap_data +from dexorder.util import hexstr +from dexorder.util.shutdown import fatal from dexorder.walker import BlockWalker -log = logging.getLogger(__name__) +log = logging.getLogger('dexorder') + + +ohlcs = LightOHLCRepository() async def handle_backfill_uniswap_swaps(swaps: list[EventData]): @@ -22,21 +32,26 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]): # now execute the swaps synchronously for swap in swaps: pool, time, price = await get_uniswap_data(swap) - await handle_uniswap_swap(pool, time, price) + ohlcs.light_update_all(pool['address'], time, price) -async def handle_uniswap_swap(pool: PoolDict, time: datetime, price: dec): - pass - -def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): +def flush_callback(): # start = now() - log.info("finalizing OHLC's") - ohlc_save(block, diffs) + # log.info("finalizing OHLC's") # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') - log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') + block = current_block.get() + log.info(f'backfill completed through block {block.height} ' + f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') + ohlcs.flush() async def main(): - walker = BlockWalker() + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + log.setLevel(logging.DEBUG) + parse_args() + if config.ohlc_dir is None: + fatal('an ohlc_dir must be configured') + await blockchain.connect() + walker = BlockWalker(flush_callback, timedelta(seconds=5)) # todo flush every minute walker.add_event_trigger(handle_backfill_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) await walker.run() diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 2dff20b..9e09d32 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -259,4 +259,52 @@ class BlockState: return compress_diffs(difflist) +# noinspection PyMethodMayBeStatic +class FinalizedBlockState: + """ + Does not act like a BlockState at all but like a regular underlying datastructure. 
Used by backfill when working
+    with finalized data that does not need BlockData, since it can never be reorganized.
+    """
+
+    def __init__(self):
+        self.data = {}
+        self.by_hash = {}
+
+    def add_block(self, block: Block) -> Optional[Fork]:
+        self.by_hash[block.hash] = block
+        return self.fork(block)
+
+    def delete_block(self, block: Union[Block, Fork, bytes]):
+        blockhash = block if isinstance(block, bytes) else block.hash
+        try:
+            del self.by_hash[blockhash]
+        except KeyError:
+            pass
+
+    def fork(self, block: Block):
+        return Fork([block.hash], height=block.height)
+
+    def get(self, _fork: Optional[Fork], series, key, default=NARG):
+        result = self.data.get(series,{}).get(key, default)
+        if result is NARG:
+            raise KeyError(key)
+        return result
+
+    def set(self, _fork: Optional[Fork], series, key, value, overwrite=True):
+        assert overwrite
+        self.data.setdefault(series, {})[key] = value
+
+    def iteritems(self, _fork: Optional[Fork], series):
+        return self.data.get(series,{}).items()
+
+    def iterkeys(self, _fork: Optional[Fork], series):
+        return self.data.get(series,{}).keys()
+
+    def itervalues(self, _fork: Optional[Fork], series):
+        return self.data.get(series,{}).values()
+
+    def delete_series(self, _fork: Optional[Fork], series: str):
+        del self.data[series]
+
+
 current_blockstate = ContextVar[BlockState]('current_blockstate')
diff --git a/src/dexorder/blocktime.py b/src/dexorder/blocktime.py
index 3955f56..263667e 100644
--- a/src/dexorder/blocktime.py
+++ b/src/dexorder/blocktime.py
@@ -11,9 +11,10 @@ log = logging.getLogger(__name__)

 @alru_cache(maxsize=1024)
 async def get_block_timestamp(blockhash) -> int:
-    block = current_blockstate.get().by_hash.get(blockhash)
-    if block is not None:
-        return block.timestamp
+    try:
+        return current_blockstate.get().by_hash[blockhash].timestamp
+    except (LookupError, KeyError):
+        pass
     response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
     raw = hexint(response['result']['timestamp'])
     # noinspection PyTypeChecker
     return raw if type(raw) is int else hexint(raw)
diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py
index 0da128c..aa303a7 100644
--- a/src/dexorder/contract/__init__.py
+++ b/src/dexorder/contract/__init__.py
@@ -1,8 +1,9 @@
-import json, os, logging
+import json
+import os

-from .. import current_w3 as _current_w3
 from .abi import abis
 from .contract_proxy import ContractProxy
+from ..
import current_w3 def get_contract_data(name): @@ -14,7 +15,7 @@ def get_contract_data(name): def get_contract_event(contract_name:str, event_name:str): - return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() + return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() class ERC20 (ContractProxy): diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 9c937f6..7743b59 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -174,10 +174,22 @@ class Chunk: bars = json.load(file) self.bars = [NativeOHLC.from_ohlc(ohlc) for ohlc in bars] except FileNotFoundError: - self.bars = [] + self.bars: list[NativeOHLC] = [] - def update(self, native: NativeOHLC): + def bar_at(self, time: datetime) -> Optional[NativeOHLC]: + if not self.bars: + return None + start = self.bars[0].start + index = (time - start) // self.period + if index >= len(self.bars): + return None + bar = self.bars[index] + assert bar.start == time + return bar + + + def update(self, native: NativeOHLC, *, backfill=False): if not self.bars: self.bars = [native] return @@ -185,9 +197,18 @@ class Chunk: index = (native.start - start) // self.period # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}') if index > len(self.bars): - log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join( - f'\n\t{c}' for c in self.bars)) - exit(1) + if not backfill: + log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join( + f'\n\t{c}' for c in self.bars)) + exit(1) + else: + last_bar = self.bars[-1] + last_price = last_bar.close + time = last_bar.start + for _ in range(index-len(self.bars)): + time += self.period + self.bars.append(NativeOHLC(time, None, None, None, last_price)) + assert index == len(self.bars) if index == len(self.bars): assert self.bars[-1].start + self.period == native.start self.bars.append(native) @@ -218,12 +239,19 @@ class Chunk: class OHLCRepository: def __init__(self, base_dir: str = None): """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """ - if base_dir is None: - base_dir = config.ohlc_dir - self.dir = base_dir + self._dir = base_dir self.cache = LFUCache(len(OHLC_PERIODS) * 1024) self.dirty_chunks = set() + @property + def dir(self): + if self._dir is None: + self._dir = config.ohlc_dir + if self._dir is None: + raise ValueError('OHLCRepository needs an ohlc_dir configured') + return self._dir + + @staticmethod def add_symbol(symbol: str, period: timedelta = None): if period is not None: @@ -235,6 +263,7 @@ class OHLCRepository: recent_ohlcs[(symbol, period)] = [] def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True): + """ the update_all() and update() methods generate bars for the recent_ohlcs BlockDict """ for period in OHLC_PERIODS: self.update(symbol, period, time, price, create=create) @@ -279,7 +308,10 @@ class OHLCRepository: return updated def save_all(self, symbol: str, period: timedelta, ohlc_list: list[NativeOHLC]) -> None: - """ saves all OHLC's in the list """ + """ + The save_all() and save() methods interact with Chunks (disk files) not the BlockState. 
saves all OHLC's in the list
+        """
         for ohlc in ohlc_list:
             self.save(symbol, period, ohlc)
@@ -311,6 +343,39 @@
         self.dirty_chunks.clear()


+class LightOHLCRepository (OHLCRepository):
+    """
+    Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles.
+    """
+    def __init__(self, base_dir: str = None):
+        super().__init__(base_dir)
+        self.current:dict[tuple[str,timedelta],NativeOHLC] = {}  # current bars keyed by (symbol, period)
+        self.dirty_bars = set()
+
+    def light_update_all(self, symbol: str, time: datetime, price: dec):
+        for period in OHLC_PERIODS:
+            self.light_update(symbol, period, time, price)
+
+    def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None,
+                     *, backfill=True):
+        start = ohlc_start_time(time, period)
+        chunk = self.get_chunk(symbol, period, start)
+        key = symbol, period
+        prev = self.current.get(key)
+        if prev is None:
+            prev = self.current[key] = chunk.bar_at(start)
+        if prev is None:
+            bar = NativeOHLC(start, price, price, price, price)
+            chunk.update(bar, backfill=backfill)
+            self.dirty_chunks.add(chunk)
+        else:
+            updated = update_ohlc(prev, period, time, price)
+            for bar in updated:
+                chunk = self.get_chunk(symbol, period, bar.start)
+                chunk.update(bar, backfill=backfill)
+                self.dirty_chunks.add(chunk)
+
+
 def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
     pool_addr, period = key
     chain_id = current_chain.get().chain_id
diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py
index e676685..54e02f7 100644
--- a/src/dexorder/pools.py
+++ b/src/dexorder/pools.py
@@ -13,7 +13,7 @@
 from dexorder.blockstate.blockdata import K, V
 from dexorder.blocktime import get_block_timestamp
 from dexorder.database.model.pool import PoolDict
 from dexorder.tokens import get_token
-from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address, log
+from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address

 log = logging.getLogger(__name__)
@@ -38,7 +38,8 @@ async def load_pool(address: str) -> PoolDict:
             decimals = token0['decimals'] - token1['decimals']
             found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
                              base=t0, quote=t1, fee=fee, decimals=decimals)
-            log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} .{decimals} {address}')
+            log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
+                      f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
         else:  # NOT a genuine Uniswap V3 pool if the address test doesn't pass
             log.debug(f'new Unknown pool at {address}')
     except ContractLogicError:
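To make the LightOHLCRepository flow concrete before it is wired into the walker, here is a minimal usage sketch. The pool address, times, and prices are invented, and `dec` is assumed to be a Decimal alias; light_update_all() and flush() are the repository methods from this patch:

    from datetime import datetime, timedelta
    from decimal import Decimal

    repo = LightOHLCRepository('/tmp/ohlc')   # hypothetical ohlc_dir
    t0 = datetime(2024, 2, 18, 12, 0)

    # each swap advances the current bar for every period in OHLC_PERIODS;
    # a gap between updates is backfilled with bars carrying the last close forward
    repo.light_update_all('0xPoolAddr', t0, Decimal('1.0001'))
    repo.light_update_all('0xPoolAddr', t0 + timedelta(minutes=3), Decimal('1.0007'))

    repo.flush()   # writes every dirty chunk file to disk in one pass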
diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py
index c20fcb9..d2ffb23 100644
--- a/src/dexorder/walker.py
+++ b/src/dexorder/walker.py
@@ -1,13 +1,20 @@
 import asyncio
 import logging
 from asyncio import Queue
+from datetime import timedelta
+from typing import Union, Callable

 from websockets import ConnectionClosedError

-from dexorder import Blockchain, config, db
+from dexorder import Blockchain, config, db, now, current_w3
 from dexorder.base.chain import current_chain
 from dexorder.blockchain.connection import create_w3
+from dexorder.blockstate import current_blockstate
+from dexorder.blockstate.state import FinalizedBlockState
+from dexorder.database.model import Block
+from dexorder.database.model.block import current_block
 from dexorder.progressor import BlockProgressor
+from dexorder.util.async_util import Maywaitable, maywait

 log = logging.getLogger(__name__)
@@ -16,44 +23,87 @@
 class BlockWalker (BlockProgressor):
     """
     Walker is similar to the Runner in that it progresses through the blockchain, but unlike the Runner, the Walker only
     processes blocks after they have reached finalization age. It does not create or maintain any BlockState.
+    Walker additionally has a delayed finalization mechanism to allow infrequent flushing of accumulated data.
     """

-    def __init__(self):
+    def __init__(self,
+                 flush_callback:Callable[[],Maywaitable[None]] = None,
+                 flush_delay: Union[None, int, timedelta] = None):
+        """
+        :param flush_delay: either a number of blocks or a time interval. data is only committed (and
+                            flush_callback invoked) once at least flush_delay has passed since the last
+                            flush; None commits after every batch
+        """
        super().__init__()
         self.queue: Queue = Queue()
         self.running = False
+        self.flush_callback = flush_callback
+        self.flush_delay = flush_delay
+        self.flush_type = None if flush_delay is None else 'time' if isinstance(flush_delay, timedelta) else 'blocks'

     async def run(self):
         self.running = True
         db.connect()
-        w3 = create_w3()
-        chain_id = await w3.eth.chain_id
-        chain = Blockchain.for_id(chain_id)
-        current_chain.set(chain)
+        w3 = current_w3.get()
+        chain = current_chain.get()
+        chain_id = chain.chain_id
         confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
         batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
+        current_blockstate.set(FinalizedBlockState())

         kv_key = f'walker_height|{chain_id}'
-        processed_height = db.kv.get(kv_key, config.backfill)
+        with db.session:
+            processed_height = db.kv.get(kv_key, config.backfill)
+        log.info(f'walker starting at block {processed_height}')
+        last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None
         prev_height = None
+        session = db.session
+        session.begin()
         while self.running:
             try:
-                block = await w3.eth.get_block('latest')
-                latest_height = block['number']
-                if latest_height > prev_height:
+                latest_rawblock = await w3.eth.get_block('latest')
+                latest_height = latest_rawblock['number']
+                if prev_height is None or latest_height > prev_height:
                     prev_height = latest_height
                     log.debug(f'polled new block {latest_height}')
                     promotion_height = latest_height - confirm_offset
                     while processed_height <= promotion_height:
-                        stop = min(promotion_height, processed_height+batch_size-1)
-                        await self.handle(processed_height, stop, chain)
-                        processed_height = db.kv[kv_key] = stop
+                        cur_height = min(promotion_height, processed_height+batch_size-1)
+                        block_data = await w3.eth.get_block(cur_height)
+                        height = block_data['number']
+                        assert height == cur_height
+                        block = Block(chain=chain.chain_id, height=cur_height, hash=(block_data['hash']),
+                                      parent=(block_data['parentHash']), data=block_data)
+                        current_block.set(block)
+                        await self.handle(processed_height, cur_height, chain=chain, w3=w3)
+                        if self.flush_delay is None or \
+                                self.flush_type=='blocks' and last_flush + self.flush_delay <= processed_height or \
+                                self.flush_type=='time' and last_flush + self.flush_delay <= now():
+                            if self.flush_callback is not None:
+                                await maywait(self.flush_callback())
+                            # flush height to db
+                            db.kv[kv_key] = cur_height
+                            if self.flush_type=='blocks':
+                                last_flush = cur_height
+                            elif self.flush_type=='time':
+                                last_flush = now()
+                            # this is the only way to commit any data to the db. everything else is a rollback.
+                            db.session.commit()
+                            db.session.begin()
+                        processed_height = cur_height
                 if not self.running:
                     break
                 await asyncio.sleep(config.polling)
-            except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
-                log.debug(f'walker timeout {e}')
             finally:
+                # anything that wasn't committed yet by the flush timer is discarded
+                # noinspection PyBroadException
+                try:
+                    db.session.rollback()
+                except Exception:
+                    pass
+                # noinspection PyBroadException
+                try:
+                    db.session.close()
+                except Exception:
+                    pass
                 # noinspection PyBroadException
                 try:
                     # noinspection PyUnresolvedReferences
@@ -62,25 +112,12 @@
                     pass

-    async def handle(self, from_height, to_height, chain=None):
+    async def handle(self, from_height, to_height, *, chain=None, w3=None):
+        log.info(f'processing blocks {from_height} - {to_height}')
         if chain is None:
             chain = current_chain.get()
-        session = None
-        try:
-            batches = await self.get_backfill_batches(from_height, to_height)
-            session = db.session
-            session.begin()
-            await self.invoke_callbacks(batches, chain)
-        except:  # legitimately catch EVERYTHING because we re-raise
-            log.debug('rolling back session')
-            if session is not None:
-                session.rollback()
-            raise
-        else:
-            if session is not None:
-                session.commit()
-            log.info(f'completed through block {to_height}')
-        finally:
-            if session is not None:
-                session.close()
-
+        if w3 is None:
+            w3 = current_w3.get()
+        batches = await self.get_backfill_batches(from_height, to_height, w3=w3)
+        await self.invoke_callbacks(batches, chain)
+        log.info(f'completed through block {to_height}')
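Patch 2's flush machinery is worth pinning down before the fixes that follow. A sketch of the three flush_delay modes the new BlockWalker constructor accepts — the 5-minute wiring mirrors finaldata.py, while the 1000-block variant is hypothetical:

    from datetime import timedelta

    # time-based: commit + flush_callback at most once per 5 minutes of wall time
    walker = BlockWalker(flush_callback, timedelta(minutes=5))

    # block-based (hypothetical): commit once per 1000 processed blocks
    walker = BlockWalker(flush_callback, 1000)

    # None: commit after every batch -- safest, but the most DB churn
    walker = BlockWalker(flush_callback)

Because the processed-height key and all handler writes share one open transaction, a crash between flushes rolls everything back to the last checkpoint, and the walker resumes from the committed walker_height|<chain_id> value.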
From cb7cfe00e82ead8580dac28cacd5c035d592b0e2 Mon Sep 17 00:00:00 2001
From: Tim
Date: Mon, 19 Feb 2024 21:23:55 -0400
Subject: [PATCH 3/5] walker exception catch

---
 src/dexorder/bin/finaldata.py | 11 ++++-------
 src/dexorder/walker.py        |  3 +++
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py
index 9707454..7975b47 100644
--- a/src/dexorder/bin/finaldata.py
+++ b/src/dexorder/bin/finaldata.py
@@ -1,18 +1,16 @@
 import asyncio
 import logging
 import sys
-from datetime import datetime, timedelta
+from datetime import timedelta

 from web3.types import EventData

-from dexorder import dec, from_timestamp, blockchain, config
+from dexorder import from_timestamp, blockchain, config
 from dexorder.bin.executable import execute
-from dexorder.blockstate import current_blockstate, BlockState
 from dexorder.blocktime import get_block_timestamp
 from dexorder.configuration import parse_args
 from dexorder.contract import get_contract_event
 from dexorder.database.model.block import current_block
-from dexorder.database.model.pool import PoolDict
 from dexorder.ohlc import LightOHLCRepository
 from dexorder.pools import get_uniswap_data
 from dexorder.util import hexstr
@@ -28,13 +26,12 @@ ohlcs = LightOHLCRepository()
 async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
     # asynchronously prefetch the block timestamps we'll need
     hashes = set(swap['blockHash'] for swap in swaps)
-    await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
+    asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None)  # fire and forget, just to build cache
     # now execute the swaps synchronously
     for swap in swaps:
         pool, time, price = await get_uniswap_data(swap)
         ohlcs.light_update_all(pool['address'], time, price)

-
 def flush_callback():
     # start = now()
     # log.info("finalizing OHLC's")
@@ -51,7 +48,7 @@ async def main():
     if config.ohlc_dir is None:
         fatal('an ohlc_dir must be configured')
     await blockchain.connect()
-    walker = BlockWalker(flush_callback, timedelta(seconds=5))  # todo flush every minute
+    walker = BlockWalker(flush_callback, timedelta(minutes=5))
     walker.add_event_trigger(handle_backfill_uniswap_swaps,
                              get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
     await walker.run()
diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py
index d2ffb23..088f054 100644
--- a/src/dexorder/walker.py
+++ b/src/dexorder/walker.py
@@ -58,6 +58,7 @@
         session = db.session
         session.begin()
         while self.running:
+            # noinspection PyBroadException
             try:
@@ -92,6 +93,8 @@
                 if not self.running:
                     break
                 await asyncio.sleep(config.polling)
+            except Exception:
+                log.exception('Exception in walker loop')
             finally:

From bb6b2d0be451199ed27f76c3e0c706886a7cba3d Mon Sep 17 00:00:00 2001
From: Tim
Date: Mon, 19 Feb 2024 22:42:51 -0400
Subject: [PATCH 4/5] dexorder-finaldata.toml

---
 dexorder-finaldata.toml | 11 +++++++++++
 1 file changed, 11 insertions(+)
 create mode 100644 dexorder-finaldata.toml

diff --git a/dexorder-finaldata.toml b/dexorder-finaldata.toml
new file mode 100644
index 0000000..10bf7e6
--- /dev/null
+++ b/dexorder-finaldata.toml
@@ -0,0 +1,11 @@
+ohlc_dir='/ohlc'
+rpc_url='https://arb-mainnet.g.alchemy.com/v2/fneXR05VTXzNS6ApcPd-QuyX-gv7AWzL'
+account='test1'  # Dev Account #1
+ws_url=''
+db_url='postgresql://dexorder:redroxed@postgres/dexorder'  # it's ok to use the main db because finaldata.py only writes its own special key to the KV table
+redis_url=''
+
+[deployments]
+1337='alpha'
+42161='alpha'
+31337='alpha'

From 6da5773ff9a17a4ab8562fe98dfc2d7fc13a2391 Mon Sep 17 00:00:00 2001
From: Tim
Date: Wed, 21 Feb 2024 14:48:52 -0400
Subject: [PATCH 5/5] finaldata bugfixes

---
 src/dexorder/bin/finaldata.py         |  6 ++++--
 src/dexorder/blockchain/connection.py | 25 ++++++++++++++++---------
 src/dexorder/configuration/schema.py  |  1 +
 src/dexorder/ohlc.py                  |  5 ++++-
 src/dexorder/pools.py                 | 15 +++++++++------
 src/dexorder/tokens.py                | 11 ++++++++---
 6 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py
index 7975b47..be0271c 100644
--- a/src/dexorder/bin/finaldata.py
+++ b/src/dexorder/bin/finaldata.py
@@ -29,8 +29,10 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
     asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None)  # fire and forget, just to build cache
     # now execute the swaps synchronously
     for swap in swaps:
-        pool, time, price = await get_uniswap_data(swap)
-        ohlcs.light_update_all(pool['address'], time, price)
+        data = await get_uniswap_data(swap)
+        if data is not None:
+            pool, time, price = data
+            ohlcs.light_update_all(pool['address'], time, price)

 def flush_callback():
     # start = now()
diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py
index ed368e4..4f56f46 100644
--- a/src/dexorder/blockchain/connection.py
+++ b/src/dexorder/blockchain/connection.py
@@ -10,7 +10,7 @@ from hexbytes import HexBytes
 from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
 from web3.types import RPCEndpoint, RPCResponse

-from .. import current_w3, Blockchain
+from ..
import current_w3, Blockchain, config from ..base.chain import current_chain from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url @@ -109,27 +109,34 @@ def _make_contract(w3_eth): log = logging.getLogger(__name__) -MAX_CONCURRENCY = 6 # todo configure +MAX_CONCURRENCY = config.concurrent_rpc_connections class RetryHTTPProvider (AsyncHTTPProvider): def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None: super().__init__(endpoint_uri, request_kwargs) self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY) + self.rate_allowed = asyncio.Event() + self.rate_allowed.set() async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: wait = 0 while True: try: async with self.in_flight: + await self.rate_allowed.wait() return await super().make_request(method, params) except ClientResponseError as e: if e.status != 429: raise - retry_after = e.headers.get('retry-after', None) - if retry_after is not None: - wait = float(retry_after) - else: - wait += 1 + .125 * random() - log.debug('rate limiting') - await asyncio.sleep(wait) + self.rate_allowed.clear() + try: + retry_after = e.headers.get('retry-after', None) + if retry_after is not None: + wait = float(retry_after) + else: + wait += 1 + .125 * random() + log.debug('rate limiting') + await asyncio.sleep(wait) + finally: + self.rate_allowed.set() diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index c708a1a..603dd3a 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -19,6 +19,7 @@ class Config: redis_url: Optional[str] = 'redis://localhost:6379' ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk + concurrent_rpc_connections: int = 4 parallel_logevent_queries: bool = True polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 7743b59..cb4124a 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -11,6 +11,7 @@ from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block +from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) @@ -121,7 +122,9 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti """ # log.debug(f'\tupdating {prev} with {minutely(time)} {price}') cur = prev - assert time >= cur.start + if time < cur.start: + # data corruption. 
just shut down + fatal(f'update_ohlc({prev}, {period}, {time}, {price}) failed because time is before the start of the candle') result = [] # advance time and finalize any past OHLC's into the result array while True: diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 54e02f7..9571d1a 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -35,11 +35,14 @@ async def load_pool(address: str) -> PoolDict: t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) - decimals = token0['decimals'] - token1['decimals'] - found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, - base=t0, quote=t1, fee=fee, decimals=decimals) - log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' - f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') + if token0 is None or token1 is None: + found = None + else: + decimals = token0['decimals'] - token1['decimals'] + found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, + base=t0, quote=t1, fee=fee, decimals=decimals) + log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' + f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass log.debug(f'new Unknown pool at {address}') except ContractLogicError: @@ -99,7 +102,7 @@ async def get_uniswap_data(swap: EventData): addr = swap['address'] pool = await get_pool(addr) if pool['exchange'] != Exchange.UniswapV3.value: - log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') + # log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') return None price: dec = await uniswap_price(pool, sqrt_price) timestamp = await get_block_timestamp(swap['blockHash']) diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index a743b29..9b2e4b0 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -1,5 +1,6 @@ import asyncio import logging +from typing import Optional from eth_abi.exceptions import InsufficientDataBytes from web3.exceptions import ContractLogicError, BadFunctionCallOutput @@ -12,7 +13,7 @@ from dexorder.database.model.token import TokenDict log = logging.getLogger(__name__) -async def get_token(address) -> TokenDict: +async def get_token(address) -> Optional[TokenDict]: try: return address_metadata[address] except KeyError: @@ -20,7 +21,7 @@ async def get_token(address) -> TokenDict: return result -async def load_token(address: str) -> TokenDict: +async def load_token(address: str) -> Optional[TokenDict]: contract = ERC20(address) prom = asyncio.gather(contract.name(), contract.symbol()) try: @@ -28,7 +29,11 @@ async def load_token(address: str) -> TokenDict: except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): log.warning(f'token {address} has no decimals()') decimals = 0 - name, symbol = await prom + try: + name, symbol = await prom + except OverflowError: + # this happens when the token returns bytes32 instead of a string + return None log.debug(f'new token {name} {symbol} {address}') return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address, name=name, symbol=symbol, decimals=decimals)
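To close, a condensed model of the rate-limit handling patch 5 adds to RetryHTTPProvider. The class below is a self-contained toy — RateLimited stands in for the 429 ClientResponseError that the real provider catches — but the semaphore/Event interplay is the pattern from the diff:

    import asyncio

    class RateLimited(Exception):
        # stand-in for an HTTP 429 response
        def __init__(self, retry_after: float = None):
            self.retry_after = retry_after

    class RateGate:
        def __init__(self, max_concurrency: int = 4):
            self.in_flight = asyncio.Semaphore(max_concurrency)  # caps parallel requests
            self.rate_allowed = asyncio.Event()                  # cleared while backing off
            self.rate_allowed.set()

        async def send(self, do_request):
            wait = 0.0
            while True:
                try:
                    async with self.in_flight:
                        await self.rate_allowed.wait()  # all senders pause during a backoff
                        return await do_request()
                except RateLimited as e:
                    # the first 429 closes the gate for everyone, honoring retry-after when present
                    self.rate_allowed.clear()
                    try:
                        wait = e.retry_after if e.retry_after is not None else wait + 1.0
                        await asyncio.sleep(wait)
                    finally:
                        self.rate_allowed.set()  # reopen the gate; the loop retries the request

The design choice mirrors the patch: rather than letting each coroutine back off independently, one Event gates the whole connection pool, so a single 429 quiets all in-flight senders until the cooldown elapses.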