old block lookup fixes
This commit is contained in:
@@ -25,7 +25,7 @@ log = logging.getLogger('dexorder.backfill')
|
|||||||
|
|
||||||
async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
|
async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
|
||||||
ohlc_save(fork, diffs)
|
ohlc_save(fork, diffs)
|
||||||
ts = await get_block_timestamp(fork.head)
|
ts = await get_block_timestamp(fork.head, fork.height)
|
||||||
log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}')
|
log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}')
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
@@ -28,14 +29,16 @@ ohlcs = FinalOHLCRepository()
|
|||||||
|
|
||||||
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
||||||
# asynchronously prefetch the block timestamps we'll need
|
# asynchronously prefetch the block timestamps we'll need
|
||||||
hashes = set(swap['blockHash'] for swap in swaps)
|
block_ids = set((swap['blockHash'], swap['blockNumber']) for swap in swaps)
|
||||||
asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache
|
for batch in itertools.batched(block_ids, 4):
|
||||||
|
await asyncio.gather(*[get_block_timestamp(h,n) for h, n in batch])
|
||||||
|
|
||||||
# now execute the swaps synchronously
|
# now execute the swaps synchronously
|
||||||
for swap in swaps:
|
for swap in swaps:
|
||||||
data = await get_uniswap_data(swap)
|
data = await get_uniswap_data(swap)
|
||||||
if data is not None:
|
if data is not None:
|
||||||
pool, time, price = data
|
pool, time, price = data
|
||||||
# log.debug(f'OHLC {pool["address"]} {time} {price}')
|
# log.debug(f'OHLC {pool["address"]} {time} {price}')
|
||||||
ohlcs.light_update_all(pool['address'], time, price)
|
ohlcs.light_update_all(pool['address'], time, price)
|
||||||
|
|
||||||
async def flush_callback():
|
async def flush_callback():
|
||||||
|
|||||||
@@ -19,8 +19,13 @@ from dexorder.util.async_dict import AsyncDict
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def get_block_timestamp(blockid: Union[bytes,int]) -> int:
|
async def get_block_timestamp(blockid: Union[bytes,int], block_number: int = None) -> int:
|
||||||
block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid))
|
block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid))
|
||||||
|
if block is None:
|
||||||
|
if block_number is not None:
|
||||||
|
block = await fetch_block_by_number(block_number)
|
||||||
|
if block is None:
|
||||||
|
raise ValueError(f'Block {blockid} {block_number} not found')
|
||||||
return block.timestamp
|
return block.timestamp
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
@@ -131,8 +132,10 @@ async def handle_transfer(transfer: EventData):
|
|||||||
|
|
||||||
async def handle_uniswap_swaps(swaps: list[EventData]):
|
async def handle_uniswap_swaps(swaps: list[EventData]):
|
||||||
# asynchronously prefetch the block timestamps we'll need
|
# asynchronously prefetch the block timestamps we'll need
|
||||||
hashes = set(swap['blockHash'] for swap in swaps)
|
block_ids = set((swap['blockHash'], swap['blockNumber']) for swap in swaps)
|
||||||
await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
|
for batch in itertools.batched(block_ids, 4):
|
||||||
|
await asyncio.gather(*[get_block_timestamp(h,n) for h, n in batch])
|
||||||
|
|
||||||
# now execute the swaps synchronously
|
# now execute the swaps synchronously
|
||||||
for swap in swaps:
|
for swap in swaps:
|
||||||
await handle_uniswap_swap(swap)
|
await handle_uniswap_swap(swap)
|
||||||
|
|||||||
@@ -374,9 +374,10 @@ class OHLCRepository:
|
|||||||
updated = update_ohlc(historical[-1], period, time, price)
|
updated = update_ohlc(historical[-1], period, time, price)
|
||||||
# drop any historical bars that are older than we need
|
# drop any historical bars that are older than we need
|
||||||
# oldest_needed = cover the root block time plus one period prior
|
# oldest_needed = cover the root block time plus one period prior
|
||||||
root_hash = current_blockstate.get().root_branch.head
|
root_branch = current_blockstate.get().root_branch
|
||||||
|
root_hash = root_branch.head
|
||||||
if root_hash is not None:
|
if root_hash is not None:
|
||||||
root_timestamp = await get_block_timestamp(root_hash)
|
root_timestamp = await get_block_timestamp(root_hash, root_branch.height)
|
||||||
oldest_needed = from_timestamp(root_timestamp) - period
|
oldest_needed = from_timestamp(root_timestamp) - period
|
||||||
# noinspection PyTypeChecker
|
# noinspection PyTypeChecker
|
||||||
trim = (oldest_needed - historical[0].start) // period
|
trim = (oldest_needed - historical[0].start) // period
|
||||||
|
|||||||
@@ -118,6 +118,6 @@ async def get_uniswap_data(swap: EventData) -> Optional[tuple[OldPoolDict, datet
|
|||||||
if pool['exchange'] != Exchange.UniswapV3.value:
|
if pool['exchange'] != Exchange.UniswapV3.value:
|
||||||
return None
|
return None
|
||||||
price: dec = await uniswap_price(pool, sqrt_price)
|
price: dec = await uniswap_price(pool, sqrt_price)
|
||||||
timestamp = await get_block_timestamp(swap['blockHash'])
|
timestamp = await get_block_timestamp(swap['blockHash'], swap['blockNumber'])
|
||||||
dt = from_timestamp(timestamp)
|
dt = from_timestamp(timestamp)
|
||||||
return pool, dt, price
|
return pool, dt, price
|
||||||
|
|||||||
Reference in New Issue
Block a user