missing block handling
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
delete from block;
|
-- deleting from keyvalue will also reset the finaldata server so be careful
|
||||||
delete from keyvalue;
|
delete from keyvalue;
|
||||||
|
delete from block;
|
||||||
delete from orderindex;
|
delete from orderindex;
|
||||||
delete from seriesdict;
|
delete from seriesdict;
|
||||||
delete from seriesset;
|
delete from seriesset;
|
||||||
delete from tx;
|
|
||||||
delete from transactionjob;
|
delete from transactionjob;
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from dexorder.addrmeta import address_metadata
|
|||||||
from dexorder.base.block import latest_block
|
from dexorder.base.block import latest_block
|
||||||
from dexorder.base.chain import current_chain
|
from dexorder.base.chain import current_chain
|
||||||
from dexorder.bin.executable import execute
|
from dexorder.bin.executable import execute
|
||||||
from dexorder.blocks import get_block_timestamp, get_block_by_number
|
from dexorder.blocks import get_block_timestamp, fetch_block_by_number
|
||||||
from dexorder.blockstate.fork import current_fork
|
from dexorder.blockstate.fork import current_fork
|
||||||
from dexorder.configuration import parse_args
|
from dexorder.configuration import parse_args
|
||||||
from dexorder.contract import get_contract_event
|
from dexorder.contract import get_contract_event
|
||||||
@@ -45,7 +45,7 @@ async def flush_callback():
|
|||||||
confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1
|
confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1
|
||||||
chain_id = current_chain.get().id
|
chain_id = current_chain.get().id
|
||||||
fork = current_fork.get()
|
fork = current_fork.get()
|
||||||
block = await get_block_by_number(fork.height, chain_id=chain_id)
|
block = await fetch_block_by_number(fork.height, chain_id=chain_id)
|
||||||
time = from_timestamp(block.timestamp)
|
time = from_timestamp(block.timestamp)
|
||||||
if latest_block[chain_id].height - fork.height <= 2*confirms:
|
if latest_block[chain_id].height - fork.height <= 2*confirms:
|
||||||
log.info(f'forward filling to present time')
|
log.info(f'forward filling to present time')
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ Use `await fetch_block()` to force an RPC query for the Block, adding that block
|
|||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
from contextvars import ContextVar
|
from contextvars import ContextVar
|
||||||
from typing import Union
|
from typing import Union, Optional
|
||||||
|
|
||||||
from cachetools import LRUCache
|
from cachetools import LRUCache
|
||||||
|
|
||||||
@@ -20,11 +20,11 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
async def get_block_timestamp(blockid: Union[bytes,int]) -> int:
|
async def get_block_timestamp(blockid: Union[bytes,int]) -> int:
|
||||||
block = await (get_block_by_number(blockid) if type(blockid) is int else get_block(blockid))
|
block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid))
|
||||||
return block.timestamp
|
return block.timestamp
|
||||||
|
|
||||||
|
|
||||||
async def _cache_fetch(key: tuple[int, bytes], default: Union[Block, NARG]) -> Block:
|
async def _cache_fetch(key: tuple[int, Union[int,bytes]], default: Union[Block, NARG]) -> Optional[Block]:
|
||||||
assert default is NARG
|
assert default is NARG
|
||||||
# try LRU cache first
|
# try LRU cache first
|
||||||
try:
|
try:
|
||||||
@@ -32,8 +32,15 @@ async def _cache_fetch(key: tuple[int, bytes], default: Union[Block, NARG]) -> B
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
# fetch from RPC
|
# fetch from RPC
|
||||||
chain_id, blockhash = key
|
chain_id, blockid = key
|
||||||
result = await fetch_block(blockhash, chain_id=chain_id)
|
# log.debug(f'block cache miss; fetching {chain_id} {blockid}')
|
||||||
|
if type(blockid) is int:
|
||||||
|
result = await fetch_block_by_number(blockid, chain_id=chain_id)
|
||||||
|
else:
|
||||||
|
result = await fetch_block(blockid, chain_id=chain_id)
|
||||||
|
if result is not None:
|
||||||
|
# log.debug(f'Could not lookup block {blockid}')
|
||||||
|
return None # do not cache
|
||||||
_lru[key] = result
|
_lru[key] = result
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@@ -51,20 +58,27 @@ async def get_block(blockhash, *, chain_id=None) -> Block:
|
|||||||
return await _cache.get((chain_id, blockhash))
|
return await _cache.get((chain_id, blockhash))
|
||||||
|
|
||||||
|
|
||||||
async def get_block_by_number(height: int, *, chain_id=None) -> Block:
|
async def fetch_block_by_number(height: int, *, chain_id=None) -> Block:
|
||||||
|
# log.debug(f'fetch_block_by_number {height} {chain_id}')
|
||||||
if chain_id is None:
|
if chain_id is None:
|
||||||
chain_id = current_chain.get().id
|
chain_id = current_chain.get().id
|
||||||
response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False])
|
response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False])
|
||||||
|
# log.debug(f'fetch_block_by_number response {height} {chain_id} {response}')
|
||||||
block = Block(chain_id, response['result'])
|
block = Block(chain_id, response['result'])
|
||||||
cache_block(block)
|
cache_block(block)
|
||||||
return block
|
return block
|
||||||
|
|
||||||
|
|
||||||
async def fetch_block(blockhash, *, chain_id=None):
|
async def fetch_block(blockhash, *, chain_id=None):
|
||||||
|
# log.debug(f'fetch_block {blockhash} {chain_id}')
|
||||||
if chain_id is None:
|
if chain_id is None:
|
||||||
chain_id = current_chain.get().id
|
chain_id = current_chain.get().id
|
||||||
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
|
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
|
||||||
|
# log.debug(f'fetch_block response {blockhash} {chain_id} {response}')
|
||||||
blockdict: BlockInfo = response['result']
|
blockdict: BlockInfo = response['result']
|
||||||
|
if blockdict is None:
|
||||||
|
log.debug(f'block {blockhash} not found')
|
||||||
|
return None
|
||||||
block = Block(chain_id, blockdict)
|
block = Block(chain_id, blockdict)
|
||||||
# if db:
|
# if db:
|
||||||
# db.kv[block.db_key] = blockdict
|
# db.kv[block.db_key] = blockdict
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from .. import db, DELETE
|
|||||||
from ..base.chain import current_chain
|
from ..base.chain import current_chain
|
||||||
from ..blocks import get_block
|
from ..blocks import get_block
|
||||||
from ..database.model import SeriesSet, SeriesDict
|
from ..database.model import SeriesSet, SeriesDict
|
||||||
|
from ..util.shutdown import fatal
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -73,7 +74,14 @@ class DbState(SeriesCollection):
|
|||||||
height, hash = db.kv[f'root_block|{chain_id}']
|
height, hash = db.kv[f'root_block|{chain_id}']
|
||||||
except (KeyError, ValueError):
|
except (KeyError, ValueError):
|
||||||
return None
|
return None
|
||||||
|
# log.debug(f'getting state for hash {hash}')
|
||||||
root_block = await get_block(hash)
|
root_block = await get_block(hash)
|
||||||
|
if root_block is None:
|
||||||
|
log.debug(f'couldn\'t find root block by hash. trying number {height}.')
|
||||||
|
root_block = await get_block(height)
|
||||||
|
if root_block is None:
|
||||||
|
fatal(f'Could not get root block {height} {hash} from RPC')
|
||||||
|
assert root_block.hash == hash
|
||||||
assert root_block.height == height
|
assert root_block.height == height
|
||||||
state = BlockState()
|
state = BlockState()
|
||||||
root_fork = state.init_root_block(root_block)
|
root_fork = state.init_root_block(root_block)
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi
|
|||||||
from dexorder.base.block import Block, latest_block
|
from dexorder.base.block import Block, latest_block
|
||||||
from dexorder.base.chain import current_chain, current_clock, BlockClock
|
from dexorder.base.chain import current_chain, current_clock, BlockClock
|
||||||
from dexorder.blockchain.connection import create_w3_ws, create_w3
|
from dexorder.blockchain.connection import create_w3_ws, create_w3
|
||||||
from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number, current_block
|
from dexorder.blocks import cache_block, get_block, promotion_height, fetch_block_by_number, current_block
|
||||||
from dexorder.blockstate import BlockState, current_blockstate
|
from dexorder.blockstate import BlockState, current_blockstate
|
||||||
from dexorder.blockstate.branch import Branch
|
from dexorder.blockstate.branch import Branch
|
||||||
from dexorder.blockstate.diff import DiffEntryItem
|
from dexorder.blockstate.diff import DiffEntryItem
|
||||||
@@ -189,7 +189,7 @@ class BlockStateRunner(BlockProgressor):
|
|||||||
# do not query more than the chain's batch size
|
# do not query more than the chain's batch size
|
||||||
# do not query into the reorgable area. only query finalized data.
|
# do not query into the reorgable area. only query finalized data.
|
||||||
height = min( start + chain.batch_size, block.height - chain.confirms)
|
height = min( start + chain.batch_size, block.height - chain.confirms)
|
||||||
end_block = await get_block_by_number(height)
|
end_block = await fetch_block_by_number(height)
|
||||||
branch = Branch(height, start, path=[end_block.hash]) # no parent
|
branch = Branch(height, start, path=[end_block.hash]) # no parent
|
||||||
return self.state.add_branch(branch)
|
return self.state.add_branch(branch)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user