From ee2ef4c7acebab19876facf6fa81887f66ebf8c2 Mon Sep 17 00:00:00 2001
From: Tim
Date: Sun, 18 Feb 2024 23:12:52 -0400
Subject: [PATCH] finaldata debugged

---
Review notes (free text after the '---' marker; not part of the commit):
 * blocktime.py: the blockstate fast path of get_block_timestamp() returned
   the Block stored in by_hash instead of its timestamp.  It now returns
   `.timestamp`, matching the `-> int` annotation and the code it replaces.
 * ohlc.py: Chunk.bar_at() now returns None when `time` is earlier than the
   first bar of the chunk; previously the negative index selected a bar
   counted from the end of the list.
 * walker.py: flush_callback is typed Maywaitable but its result is never
   awaited.  Harmless for the synchronous callback in finaldata.py;
   NOTE(review): confirm no coroutine callback is ever passed.
 * Line breaks and indentation were reconstructed from a flattened copy of
   this patch.  Hunk line counts and the diffstat were re-checked, but the
   exact whitespace of context lines is inferred -- verify against the tree
   (or use `git apply --ignore-whitespace`) before applying.

 src/dexorder/bin/finaldata.py     |  39 +++++++----
 src/dexorder/blockstate/state.py  |  48 ++++++++++++++
 src/dexorder/blocktime.py         |   7 +-
 src/dexorder/contract/__init__.py |   7 +-
 src/dexorder/ohlc.py              |  83 ++++++++++++++++++++---
 src/dexorder/pools.py             |   5 +-
 src/dexorder/walker.py            | 107 ++++++++++++++++++++----------
 7 files changed, 232 insertions(+), 64 deletions(-)

diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py
index ea3caf7..9707454 100644
--- a/src/dexorder/bin/finaldata.py
+++ b/src/dexorder/bin/finaldata.py
@@ -1,18 +1,28 @@
 import asyncio
 import logging
-from datetime import datetime
+import sys
+from datetime import datetime, timedelta
 
 from web3.types import EventData
 
-from dexorder import dec
+from dexorder import dec, from_timestamp, blockchain, config
 from dexorder.bin.executable import execute
+from dexorder.blockstate import current_blockstate, BlockState
 from dexorder.blocktime import get_block_timestamp
+from dexorder.configuration import parse_args
 from dexorder.contract import get_contract_event
+from dexorder.database.model.block import current_block
 from dexorder.database.model.pool import PoolDict
+from dexorder.ohlc import LightOHLCRepository
 from dexorder.pools import get_uniswap_data
+from dexorder.util import hexstr
+from dexorder.util.shutdown import fatal
 from dexorder.walker import BlockWalker
 
-log = logging.getLogger(__name__)
+log = logging.getLogger('dexorder')
+
+
+ohlcs = LightOHLCRepository()
 
 
 async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
@@ -22,21 +32,26 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
     # now execute the swaps synchronously
     for swap in swaps:
         pool, time, price = await get_uniswap_data(swap)
-        await handle_uniswap_swap(pool, time, price)
+        ohlcs.light_update_all(pool['address'], time, price)
 
-async def handle_uniswap_swap(pool: PoolDict, time: datetime, price: dec):
-    pass
-
-def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+def flush_callback():
     # start = now()
-    log.info("finalizing OHLC's")
-    ohlc_save(block, diffs)
+    # log.info("finalizing OHLC's")
     # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
-    log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
+    block = current_block.get()
+    log.info(f'backfill completed through block {block.height} '
+             f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
+    ohlcs.flush()
 
 
 async def main():
-    walker = BlockWalker()
+    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+    log.setLevel(logging.DEBUG)
+    parse_args()
+    if config.ohlc_dir is None:
+        fatal('an ohlc_dir must be configured')
+    await blockchain.connect()
+    walker = BlockWalker(flush_callback, timedelta(seconds=5))  # todo flush every minute
     walker.add_event_trigger(handle_backfill_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
     await walker.run()
 
diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py
index 2dff20b..9e09d32 100644
--- a/src/dexorder/blockstate/state.py
+++ b/src/dexorder/blockstate/state.py
@@ -259,4 +259,52 @@ class BlockState:
         return compress_diffs(difflist)
 
 
+# noinspection PyMethodMayBeStatic
+class FinalizedBlockState:
+    """
+    Does not act like a BlockState at all but like a regular underlying datastructure. Used by backfill when working
+    with finalized data that does not need BlockData since it can never be reorganized
+    """
+
+    def __init__(self):
+        self.data = {}
+        self.by_hash = {}
+
+    def add_block(self, block: Block) -> Optional[Fork]:
+        self.by_hash[block.hash] = block
+        return self.fork(block)
+
+    def delete_block(self, block: Union[Block, Fork, bytes]):
+        blockhash = block if isinstance(block, bytes) else block.hash
+        try:
+            del self.by_hash[blockhash]
+        except KeyError:
+            pass
+
+    def fork(self, block: Block):
+        return Fork([block.hash], height=block.height)
+
+    def get(self, _fork: Optional[Fork], series, key, default=NARG):
+        result = self.data.get(series,{}).get(key, default)
+        if result is NARG:
+            raise KeyError(key)
+        return result
+
+    def set(self, _fork: Optional[Fork], series, key, value, overwrite=True):
+        assert overwrite
+        self.data.setdefault(series, {})[key] = value
+
+    def iteritems(self, _fork: Optional[Fork], series):
+        return self.data.get(series,{}).items()
+
+    def iterkeys(self, _fork: Optional[Fork], series):
+        return self.data.get(series,{}).keys()
+
+    def itervalues(self, _fork: Optional[Fork], series):
+        return self.data.get(series,{}).values()
+
+    def delete_series(self, _fork: Optional[Fork], series: str):
+        del self.data[series]
+
+
 current_blockstate = ContextVar[BlockState]('current_blockstate')
diff --git a/src/dexorder/blocktime.py b/src/dexorder/blocktime.py
index 3955f56..263667e 100644
--- a/src/dexorder/blocktime.py
+++ b/src/dexorder/blocktime.py
@@ -11,9 +11,10 @@ log = logging.getLogger(__name__)
 
 @alru_cache(maxsize=1024)
 async def get_block_timestamp(blockhash) -> int:
-    block = current_blockstate.get().by_hash.get(blockhash)
-    if block is not None:
-        return block.timestamp
+    try:
+        return current_blockstate.get().by_hash[blockhash].timestamp
+    except (LookupError, KeyError):
+        pass
     response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
     raw = hexint(response['result']['timestamp'])
     # noinspection PyTypeChecker
diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py
index 0da128c..aa303a7 100644
--- a/src/dexorder/contract/__init__.py
+++ b/src/dexorder/contract/__init__.py
@@ -1,8 +1,9 @@
-import json, os, logging
+import json
+import os
 
-from .. import current_w3 as _current_w3
 from .abi import abis
 from .contract_proxy import ContractProxy
+from .. import current_w3
 
 
 def get_contract_data(name):
@@ -14,7 +15,7 @@ def get_contract_data(name):
 
 
 def get_contract_event(contract_name:str, event_name:str):
-    return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
+    return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
 
 
 class ERC20 (ContractProxy):
diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py
index c0a0463..717e7f8 100644
--- a/src/dexorder/ohlc.py
+++ b/src/dexorder/ohlc.py
@@ -174,10 +174,22 @@ class Chunk:
                 bars = json.load(file)
                 self.bars = [NativeOHLC.from_ohlc(ohlc) for ohlc in bars]
         except FileNotFoundError:
-            self.bars = []
+            self.bars: list[NativeOHLC] = []
 
 
-    def update(self, native: NativeOHLC):
+    def bar_at(self, time: datetime) -> Optional[NativeOHLC]:
+        if not self.bars:
+            return None
+        start = self.bars[0].start
+        index = (time - start) // self.period
+        if not 0 <= index < len(self.bars):
+            return None
+        bar = self.bars[index]
+        assert bar.start == time
+        return bar
+
+
+    def update(self, native: NativeOHLC, *, backfill=False):
         if not self.bars:
             self.bars = [native]
             return
@@ -185,9 +197,18 @@ class Chunk:
         index = (native.start - start) // self.period
         # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
         if index > len(self.bars):
-            log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join(
-                f'\n\t{c}' for c in self.bars))
-            exit(1)
+            if not backfill:
+                log.error(f'chunk gap: {index} > {len(self.bars)} {self.symbol} {period_name(self.period)} {native}' + ''.join(
+                    f'\n\t{c}' for c in self.bars))
+                exit(1)
+            else:
+                last_bar = self.bars[-1]
+                last_price = last_bar.close
+                time = last_bar.start
+                for _ in range(index-len(self.bars)):
+                    time += self.period
+                    self.bars.append(NativeOHLC(time, None, None, None, last_price))
+                assert index == len(self.bars)
         if index == len(self.bars):
             assert self.bars[-1].start + self.period == native.start
             self.bars.append(native)
@@ -218,12 +239,19 @@ class Chunk:
 class OHLCRepository:
     def __init__(self, base_dir: str = None):
         """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """
-        if base_dir is None:
-            base_dir = config.ohlc_dir
-        self.dir = base_dir
+        self._dir = base_dir
         self.cache = LFUCache(len(OHLC_PERIODS) * 1024)
         self.dirty_chunks = set()
 
+    @property
+    def dir(self):
+        if self._dir is None:
+            self._dir = config.ohlc_dir
+            if self._dir is None:
+                raise ValueError('OHLCRepository needs an ohlc_dir configured')
+        return self._dir
+
+
     @staticmethod
     def add_symbol(symbol: str, period: timedelta = None):
         if period is not None:
@@ -235,6 +263,7 @@ class OHLCRepository:
                 recent_ohlcs[(symbol, period)] = []
 
     def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True):
+        """ the update_all() and update() methods generate bars for the recent_ohlcs BlockDict """
         for period in OHLC_PERIODS:
             self.update(symbol, period, time, price, create=create)
 
@@ -279,7 +308,10 @@ class OHLCRepository:
         return updated
 
     def save_all(self, symbol: str, period: timedelta, ohlc_list: list[NativeOHLC]) -> None:
-        """ saves all OHLC's in the list """
+        """
+        The save_all() and save() methods interact with Chunks (disk files) not the BlockState.
+        saves all OHLC's in the list
+        """
         for ohlc in ohlc_list:
             self.save(symbol, period, ohlc)
 
@@ -311,6 +343,39 @@ class OHLCRepository:
         self.dirty_chunks.clear()
 
 
+class LightOHLCRepository (OHLCRepository):
+    """
+    Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles.
+    """
+    def __init__(self, base_dir: str = None):
+        super().__init__(base_dir)
+        self.current:dict[tuple[str,timedelta],NativeOHLC] = {}  # current bars keyed by symbol
+        self.dirty_bars = set()
+
+    def light_update_all(self, symbol: str, time: datetime, price: dec):
+        for period in OHLC_PERIODS:
+            self.light_update(symbol, period, time, price)
+
+    def light_update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None,
+                     *, backfill=True):
+        start = ohlc_start_time(time, period)
+        chunk = self.get_chunk(symbol, period, start)
+        key = symbol, period
+        prev = self.current.get(key)
+        if prev is None:
+            prev = self.current[key] = chunk.bar_at(start)
+        if prev is None:
+            bar = NativeOHLC(start, price, price, price, price)
+            chunk.update(bar, backfill=backfill)
+            self.dirty_chunks.add(chunk)
+        else:
+            updated = update_ohlc(prev, period, time, price)
+            for bar in updated:
+                chunk = self.get_chunk(symbol, period, bar.start)
+                chunk.update(bar, backfill=backfill)
+                self.dirty_chunks.add(chunk)
+
+
 def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
     pool_addr, period = key
     chain_id = current_chain.get().chain_id
diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py
index e676685..54e02f7 100644
--- a/src/dexorder/pools.py
+++ b/src/dexorder/pools.py
@@ -13,7 +13,7 @@ from dexorder.blockstate.blockdata import K, V
 from dexorder.blocktime import get_block_timestamp
 from dexorder.database.model.pool import PoolDict
 from dexorder.tokens import get_token
-from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address, log
+from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
 
 log = logging.getLogger(__name__)
 
@@ -38,7 +38,8 @@ async def load_pool(address: str) -> PoolDict:
             decimals = token0['decimals'] - token1['decimals']
             found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
                              base=t0, quote=t1, fee=fee, decimals=decimals)
-            log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} .{decimals} {address}')
+            log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
+                      f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
         else:  # NOT a genuine Uniswap V3 pool if the address test doesn't pass
             log.debug(f'new Unknown pool at {address}')
     except ContractLogicError:
diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py
index c20fcb9..d2ffb23 100644
--- a/src/dexorder/walker.py
+++ b/src/dexorder/walker.py
@@ -1,13 +1,20 @@
 import asyncio
 import logging
 from asyncio import Queue
+from datetime import timedelta
+from typing import Union, Callable
 
 from websockets import ConnectionClosedError
 
-from dexorder import Blockchain, config, db
+from dexorder import Blockchain, config, db, now, current_w3
 from dexorder.base.chain import current_chain
 from dexorder.blockchain.connection import create_w3
+from dexorder.blockstate import current_blockstate
+from dexorder.blockstate.state import FinalizedBlockState
+from dexorder.database.model import Block
+from dexorder.database.model.block import current_block
 from dexorder.progressor import BlockProgressor
+from dexorder.util.async_util import Maywaitable
 
 log = logging.getLogger(__name__)
 
@@ -16,44 +23,87 @@ class BlockWalker (BlockProgressor):
     """
     Walker is similar to the Runner in that it progresses through the blockchain, but unlike the Runner, the Walker only
     processes blocks after they have reached finalization age. It does not create or maintain any BlockState.
+    Walker additionally has a delayed finalization mechanism to allow infrequent flushing of accumulated data.
     """
-    def __init__(self):
+    def __init__(self,
+                 flush_callback:Callable[[],Maywaitable[None]] = None,
+                 flush_delay: Union[None, int, timedelta] = None):
+        """
+        :param flush_delay: either a number of blocks or a time interval. after the flush_delay
+        """
         super().__init__()
         self.queue: Queue = Queue()
         self.running = False
+        self.flush_callback = flush_callback
+        self.flush_delay = flush_delay
+        self.flush_type = None if flush_delay is None else 'time' if isinstance(flush_delay, timedelta) else 'blocks'
 
     async def run(self):
         self.running = True
         db.connect()
-        w3 = create_w3()
-        chain_id = await w3.eth.chain_id
-        chain = Blockchain.for_id(chain_id)
-        current_chain.set(chain)
+        w3 = current_w3.get()
+        chain = current_chain.get()
+        chain_id = chain.chain_id
         confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
         batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
+        current_blockstate.set(FinalizedBlockState())
 
         kv_key = f'walker_height|{chain_id}'
-        processed_height = db.kv.get(kv_key, config.backfill)
+        with db.session:
+            processed_height = db.kv.get(kv_key, config.backfill)
+        log.info(f'walker starting at block {processed_height}')
+        last_flush = processed_height if self.flush_type == 'blocks' else now() if self.flush_type == 'time' else None
 
         prev_height = None
+        session = db.session
+        session.begin()
         while self.running:
             try:
-                block = await w3.eth.get_block('latest')
-                latest_height = block['number']
-                if latest_height > prev_height:
+                latest_rawblock = await w3.eth.get_block('latest')
+                latest_height = latest_rawblock['number']
+                if prev_height is None or latest_height > prev_height:
                     prev_height = latest_height
                     log.debug(f'polled new block {latest_height}')
                     promotion_height = latest_height - confirm_offset
                     while processed_height <= promotion_height:
-                        stop = min(promotion_height, processed_height+batch_size-1)
-                        await self.handle(processed_height, stop, chain)
-                        processed_height = db.kv[kv_key] = stop
+                        cur_height = min(promotion_height, processed_height+batch_size-1)
+                        block_data = await w3.eth.get_block(cur_height)
+                        height = block_data['number']
+                        assert height == cur_height
+                        block = Block(chain=chain.chain_id, height=cur_height, hash=(block_data['hash']),
+                                      parent=(block_data['parentHash']), data=block_data)
+                        current_block.set(block)
+                        await self.handle(processed_height, cur_height, chain=chain, w3=w3)
+                        if self.flush_delay is None or \
+                                self.flush_type=='blocks' and last_flush + self.flush_delay <= processed_height or \
+                                self.flush_type=='time' and last_flush + self.flush_delay <= now():
+                            if self.flush_callback is not None:
+                                self.flush_callback()
+                            # flush height to db
+                            db.kv[kv_key] = cur_height
+                            if self.flush_type=='blocks':
+                                last_flush = cur_height
+                            elif self.flush_type=='time':
+                                last_flush = now()
+                            # this is the only way to commit any data to the db. everything else is a rollback.
+                            db.session.commit()
+                            db.session.begin()
+                        processed_height = cur_height
                 if not self.running:
                     break
                 await asyncio.sleep(config.polling)
-            except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
-                log.debug(f'walker timeout {e}')
             finally:
+                # anything that wasnt committed yet by the flush timer is discarded
+                # noinspection PyBroadException
+                try:
+                    db.session.rollback()
+                except Exception:
+                    pass
+                # noinspection PyBroadException
+                try:
+                    db.session.close()
+                except Exception:
+                    pass
                 # noinspection PyBroadException
                 try:
                     # noinspection PyUnresolvedReferences
@@ -62,25 +112,12 @@ class BlockWalker (BlockProgressor):
                     pass
 
 
-    async def handle(self, from_height, to_height, chain=None):
+    async def handle(self, from_height, to_height, *, chain=None, w3=None):
+        log.info(f'processing blocks {from_height} - {to_height}')
         if chain is None:
             chain = current_chain.get()
-        session = None
-        try:
-            batches = await self.get_backfill_batches(from_height, to_height)
-            session = db.session
-            session.begin()
-            await self.invoke_callbacks(batches, chain)
-        except:  # legitimately catch EVERYTHING because we re-raise
-            log.debug('rolling back session')
-            if session is not None:
-                session.rollback()
-            raise
-        else:
-            if session is not None:
-                session.commit()
-                log.info(f'completed through block {to_height}')
-        finally:
-            if session is not None:
-                session.close()
-
+        if w3 is None:
+            w3 = current_w3.get()
+        batches = await self.get_backfill_batches(from_height, to_height, w3=w3)
+        await self.invoke_callbacks(batches, chain)
+        log.info(f'completed through block {to_height}')