diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index f20def7..6ad23ed 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -25,7 +25,7 @@ log = logging.getLogger('dexorder.backfill') async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): ohlc_save(fork, diffs) - ts = await get_block_timestamp(fork.head) + ts = await get_block_timestamp(fork.head, fork.height) log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}') diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index d350927..9fb9f44 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -1,4 +1,5 @@ import asyncio +import itertools import logging import sys from datetime import timedelta @@ -28,14 +29,16 @@ ohlcs = FinalOHLCRepository() async def handle_backfill_uniswap_swaps(swaps: list[EventData]): # asynchronously prefetch the block timestamps we'll need - hashes = set(swap['blockHash'] for swap in swaps) - asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache + block_ids = set((swap['blockHash'], swap['blockNumber']) for swap in swaps) + for batch in itertools.batched(block_ids, 4): + await asyncio.gather(*[get_block_timestamp(h,n) for h, n in batch]) + # now execute the swaps synchronously for swap in swaps: data = await get_uniswap_data(swap) if data is not None: pool, time, price = data -# log.debug(f'OHLC {pool["address"]} {time} {price}') + # log.debug(f'OHLC {pool["address"]} {time} {price}') ohlcs.light_update_all(pool['address'], time, price) async def flush_callback(): diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index 829d9e3..e1d4d20 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -19,8 +19,13 @@ from dexorder.util.async_dict import AsyncDict log = logging.getLogger(__name__) -async def get_block_timestamp(blockid: Union[bytes,int]) -> int: +async def get_block_timestamp(blockid: Union[bytes,int], block_number: int = None) -> int: block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid)) + if block is None: + if block_number is not None: + block = await fetch_block_by_number(block_number) + if block is None: + raise ValueError(f'Block {blockid} {block_number} not found') return block.timestamp diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index e61b801..7fe9641 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,4 +1,5 @@ import asyncio +import itertools import logging from uuid import UUID @@ -131,8 +132,10 @@ async def handle_transfer(transfer: EventData): async def handle_uniswap_swaps(swaps: list[EventData]): # asynchronously prefetch the block timestamps we'll need - hashes = set(swap['blockHash'] for swap in swaps) - await asyncio.gather(*[get_block_timestamp(h) for h in hashes]) + block_ids = set((swap['blockHash'], swap['blockNumber']) for swap in swaps) + for batch in itertools.batched(block_ids, 4): + await asyncio.gather(*[get_block_timestamp(h,n) for h, n in batch]) + # now execute the swaps synchronously for swap in swaps: await handle_uniswap_swap(swap) diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 842c400..3cff509 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -374,9 +374,10 @@ class OHLCRepository: updated = update_ohlc(historical[-1], period, time, price) # drop any historical bars that are older than we need # oldest_needed = cover the root block time plus one period prior - root_hash = current_blockstate.get().root_branch.head + root_branch = current_blockstate.get().root_branch + root_hash = root_branch.head if root_hash is not None: - root_timestamp = await get_block_timestamp(root_hash) + root_timestamp = await get_block_timestamp(root_hash, root_branch.height) oldest_needed = from_timestamp(root_timestamp) - period # noinspection PyTypeChecker trim = (oldest_needed - historical[0].start) // period diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 7339c96..5e1df21 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -118,6 +118,6 @@ async def get_uniswap_data(swap: EventData) -> Optional[tuple[OldPoolDict, datet if pool['exchange'] != Exchange.UniswapV3.value: return None price: dec = await uniswap_price(pool, sqrt_price) - timestamp = await get_block_timestamp(swap['blockHash']) + timestamp = await get_block_timestamp(swap['blockHash'], swap['blockNumber']) dt = from_timestamp(timestamp) return pool, dt, price