From 0946d82f5234896c5ec93e243f0333bcef1014a3 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 18 Apr 2024 15:02:06 -0400 Subject: [PATCH] missing block handling --- bin/RESET_DB.sql | 4 ++-- src/dexorder/bin/finaldata.py | 4 ++-- src/dexorder/blocks.py | 26 ++++++++++++++++++++------ src/dexorder/blockstate/db_state.py | 8 ++++++++ src/dexorder/runner.py | 4 ++-- 5 files changed, 34 insertions(+), 12 deletions(-) diff --git a/bin/RESET_DB.sql b/bin/RESET_DB.sql index 15957cf..87b1b5a 100644 --- a/bin/RESET_DB.sql +++ b/bin/RESET_DB.sql @@ -1,7 +1,7 @@ -delete from block; +-- deleting from keyvalue will also reset the finaldata server so be careful delete from keyvalue; +delete from block; delete from orderindex; delete from seriesdict; delete from seriesset; -delete from tx; delete from transactionjob; diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index d81a9d1..d350927 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -10,7 +10,7 @@ from dexorder.addrmeta import address_metadata from dexorder.base.block import latest_block from dexorder.base.chain import current_chain from dexorder.bin.executable import execute -from dexorder.blocks import get_block_timestamp, get_block_by_number +from dexorder.blocks import get_block_timestamp, fetch_block_by_number from dexorder.blockstate.fork import current_fork from dexorder.configuration import parse_args from dexorder.contract import get_contract_event @@ -45,7 +45,7 @@ async def flush_callback(): confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1 chain_id = current_chain.get().id fork = current_fork.get() - block = await get_block_by_number(fork.height, chain_id=chain_id) + block = await fetch_block_by_number(fork.height, chain_id=chain_id) time = from_timestamp(block.timestamp) if latest_block[chain_id].height - fork.height <= 2*confirms: log.info(f'forward filling to present time') diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index 8442a68..829d9e3 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -7,7 +7,7 @@ Use `await fetch_block()` to force an RPC query for the Block, adding that block """ import logging from contextvars import ContextVar -from typing import Union +from typing import Union, Optional from cachetools import LRUCache @@ -20,11 +20,11 @@ log = logging.getLogger(__name__) async def get_block_timestamp(blockid: Union[bytes,int]) -> int: - block = await (get_block_by_number(blockid) if type(blockid) is int else get_block(blockid)) + block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid)) return block.timestamp -async def _cache_fetch(key: tuple[int, bytes], default: Union[Block, NARG]) -> Block: +async def _cache_fetch(key: tuple[int, Union[int,bytes]], default: Union[Block, NARG]) -> Optional[Block]: assert default is NARG # try LRU cache first try: @@ -32,8 +32,15 @@ async def _cache_fetch(key: tuple[int, bytes], default: Union[Block, NARG]) -> B except KeyError: pass # fetch from RPC - chain_id, blockhash = key - result = await fetch_block(blockhash, chain_id=chain_id) + chain_id, blockid = key + # log.debug(f'block cache miss; fetching {chain_id} {blockid}') + if type(blockid) is int: + result = await fetch_block_by_number(blockid, chain_id=chain_id) + else: + result = await fetch_block(blockid, chain_id=chain_id) + if result is not None: + # log.debug(f'Could not lookup block {blockid}') + return None # do not cache _lru[key] = result return result @@ -51,20 +58,27 @@ async def get_block(blockhash, *, chain_id=None) -> Block: return await _cache.get((chain_id, blockhash)) -async def get_block_by_number(height: int, *, chain_id=None) -> Block: +async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: + # log.debug(f'fetch_block_by_number {height} {chain_id}') if chain_id is None: chain_id = current_chain.get().id response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False]) + # log.debug(f'fetch_block_by_number response {height} {chain_id} {response}') block = Block(chain_id, response['result']) cache_block(block) return block async def fetch_block(blockhash, *, chain_id=None): + # log.debug(f'fetch_block {blockhash} {chain_id}') if chain_id is None: chain_id = current_chain.get().id response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) + # log.debug(f'fetch_block response {blockhash} {chain_id} {response}') blockdict: BlockInfo = response['result'] + if blockdict is None: + log.debug(f'block {blockhash} not found') + return None block = Block(chain_id, blockdict) # if db: # db.kv[block.db_key] = blockdict diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index 712d196..e581cf6 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -9,6 +9,7 @@ from .. import db, DELETE from ..base.chain import current_chain from ..blocks import get_block from ..database.model import SeriesSet, SeriesDict +from ..util.shutdown import fatal log = logging.getLogger(__name__) @@ -73,7 +74,14 @@ class DbState(SeriesCollection): height, hash = db.kv[f'root_block|{chain_id}'] except (KeyError, ValueError): return None + # log.debug(f'getting state for hash {hash}') root_block = await get_block(hash) + if root_block is None: + log.debug(f'couldn\'t find root block by hash. trying number {height}.') + root_block = await get_block(height) + if root_block is None: + fatal(f'Could not get root block {height} {hash} from RPC') + assert root_block.hash == hash assert root_block.height == height state = BlockState() root_fork = state.init_root_block(root_block) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index de2d9eb..0caef23 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 -from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number, current_block +from dexorder.blocks import cache_block, get_block, promotion_height, fetch_block_by_number, current_block from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem @@ -189,7 +189,7 @@ class BlockStateRunner(BlockProgressor): # do not query more than the chain's batch size # do not query into the reorgable area. only query finalized data. height = min( start + chain.batch_size, block.height - chain.confirms) - end_block = await get_block_by_number(height) + end_block = await fetch_block_by_number(height) branch = Branch(height, start, path=[end_block.hash]) # no parent return self.state.add_branch(branch)