bugfixes; blockstate rework seems decent

This commit is contained in:
Tim
2024-04-03 01:47:44 -04:00
parent 7acc51a652
commit 8d66e97266
7 changed files with 47 additions and 30 deletions

View File

@@ -18,8 +18,8 @@ from dexorder.util.async_dict import AsyncDict
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def get_block_timestamp(blockhash) -> int: async def get_block_timestamp(blockid: Union[bytes,int]) -> int:
block = await get_block(blockhash) block = await (get_block_by_number(blockid) if type(blockid) is int else get_block(blockid))
return block.timestamp return block.timestamp
@@ -72,5 +72,5 @@ async def fetch_block(blockhash, *, chain_id=None):
def promotion_height(chain, latest_height): def promotion_height(chain, latest_height):
confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 confirm_offset = config.confirms if config.confirms is not None else chain.confirms
return latest_height - confirm_offset return latest_height - confirm_offset

View File

@@ -59,8 +59,11 @@ class Branch:
def __str__(self): def __str__(self):
# noinspection PyTypeChecker # noinspection PyTypeChecker
return (f"Branch#{str(self.id)[2:7]}[" + return (f"Branch#{str(self.id)[2:7]}[{self.height}" +
(','.join(hexstr(b)[2:7] for b in self.path) if self.path else f'{self.start},{self.height}') + ']') (':' if self.contiguous else '_') +
('<-'.join(hexstr(b)[2:7] for b in self.path) if self.path else f'{self.start}') +
(f':{self.start}' if self.disjoint else '') +
f'<-({hexstr(self.parent)[2:7]})]')
current_branch = ContextVar[Branch]('current_branch') current_branch = ContextVar[Branch]('current_branch')

View File

@@ -1,6 +1,6 @@
import logging import logging
from contextvars import ContextVar from contextvars import ContextVar
from typing import Optional, Sequence from typing import Optional, Sequence, Union
from dexorder import NARG from dexorder import NARG
from dexorder.blockstate.branch import Branch from dexorder.blockstate.branch import Branch
@@ -29,6 +29,10 @@ class Fork:
self.head = self.branch.head self.head = self.branch.head
self.parent = branches[-1].parent self.parent = branches[-1].parent
@property
def head_identifier(self) -> Union[int, str]:
return self.head if self.head is not None else self.height
def __str__(self): def __str__(self):
return f'Fork[{"<-".join(str(b) for b in self.branches)}]' return f'Fork[{"<-".join(str(b) for b in self.branches)}]'

View File

@@ -11,6 +11,8 @@ from dexorder.blockstate.fork import Fork
from .branch import Branch, BranchId from .branch import Branch, BranchId
from .diff import DiffEntry, DELETE, DiffEntryItem from .diff import DiffEntry, DELETE, DiffEntryItem
from ..base.block import Block from ..base.block import Block
from ..base.chain import current_chain
from ..blocks import promotion_height
from ..util import hexstr from ..util import hexstr
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -55,6 +57,7 @@ class BlockState:
def __init__(self): def __init__(self):
self._root_branch: Optional[Branch] = None self._root_branch: Optional[Branch] = None
self._root_fork: Optional[Fork] = None self._root_fork: Optional[Fork] = None
self.height: int = 0 # highest branch seen
# Branches indexed by height # Branches indexed by height
self.branches_by_height: dict[int, list[Branch]] = defaultdict(list) self.branches_by_height: dict[int, list[Branch]] = defaultdict(list)
@@ -101,7 +104,7 @@ class BlockState:
return result return result
def add_branch(self, branch: Branch, *, strict=True) -> Fork: def add_branch(self, branch: Branch) -> Fork:
""" """
If there is no root_branch set yet, this branch becomes the root branch. Otherwise, returns a Fork with the If there is no root_branch set yet, this branch becomes the root branch. Otherwise, returns a Fork with the
set of branches leading to the root. set of branches leading to the root.
@@ -115,23 +118,24 @@ class BlockState:
if self.root_branch is None: if self.root_branch is None:
self.root_branch = branch self.root_branch = branch
self.height = branch.height
state_log.info(f'Initialized BlockState with {branch}') state_log.info(f'Initialized BlockState with {branch}')
return Fork([self.root_branch]) return Fork([self.root_branch])
self.branches_by_height[branch.height].append(branch)
self.branches_by_id[branch.id] = branch
# search for a path to the root branch # search for a path to the root branch
def build_fork(cur: Branch) -> list[Branch]: def build_fork(cur: Branch) -> list[Branch]:
if cur == self.root_branch: if cur == self.root_branch:
return [cur] return [cur]
if strict and not cur.parent: parent_height = cur.start - 1
height_finalized = parent_height <= promotion_height(current_chain.get(), max(self.height, branch.height))
if not cur.parent and not height_finalized:
raise ValueError(f'No parent for branch {branch}') raise ValueError(f'No parent for branch {branch}')
parent_branches = [ parent_branches = [
p for p in self.branches_by_height.get(cur.start-1, []) p for p in self.branches_by_height.get(parent_height, [])
if not strict or cur.parent == p.head if height_finalized or cur.parent == p.head
] ]
if cur.parent == self.root_branch.head or not strict and cur.start == self.root_branch.height + 1: if parent_height == self.root_branch.height:
assert len(parent_branches) == 0
parent_branches.append(self.root_branch) parent_branches.append(self.root_branch)
if not parent_branches: if not parent_branches:
raise ValueError raise ValueError
@@ -144,11 +148,17 @@ class BlockState:
return [cur, *build_fork(parent)] return [cur, *build_fork(parent)]
fork = Fork(build_fork(branch)) fork = Fork(build_fork(branch))
self.branches_by_height[branch.height].append(branch)
self.branches_by_id[branch.id] = branch
self.height = max(self.height, branch.height)
state_log.info(f'added branch {fork}') state_log.info(f'added branch {fork}')
return fork return fork
def remove_branch(self, branch: Branch, *, remove_series_diffs=True): def remove_branch(self, branch: Branch, *, remove_series_diffs=True):
if branch.height == self.height and len(self.branches_by_height[branch.height]) == 1:
# this is the only branch at this height: compute the new lower height
self.height = max(b.height for b in self.branches_by_id.values()) if self.branches_by_id else 0
del self.branches_by_id[branch.id] del self.branches_by_id[branch.id]
by_height = self.branches_by_height.get(branch.height) by_height = self.branches_by_height.get(branch.height)
if by_height is not None: if by_height is not None:
@@ -166,7 +176,7 @@ class BlockState:
difflist = self.diffs_by_series.get(diff.series,{}).get(diff.key) difflist = self.diffs_by_series.get(diff.series,{}).get(diff.key)
if difflist is not None: if difflist is not None:
difflist.remove(diff.entry) difflist.remove(diff.entry)
state_log.info(f'removed branch {branch}'+ ('' if remove_series_diffs else ' (promoting)')) state_log.info(('promoting' if remove_series_diffs else 'removed')+f' branch {branch}')
def get(self, fork: Fork, series, key, default=NARG): def get(self, fork: Fork, series, key, default=NARG):

View File

@@ -45,15 +45,14 @@ class DeployTransaction (ContractTransaction):
return receipt return receipt
def call_wrapper(addr, name, func): def call_wrapper(addr, name, func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
try: try:
blockhash = hexstr(current_fork.get().head) blockid = current_fork.get().head_identifier
except (LookupError, AttributeError): except (LookupError, AttributeError):
blockhash = 'latest' blockid = 'latest'
try: try:
return await func(*args).call(block_identifier=blockhash, **kwargs) return await func(*args).call(block_identifier=blockid, **kwargs)
except Web3Exception as e: except Web3Exception as e:
e.args += addr, name e.args += addr, name
raise e raise e

View File

@@ -313,7 +313,7 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
last_ohlc_rollover = 0 last_ohlc_rollover = 0
async def check_ohlc_rollover(): async def check_ohlc_rollover():
global last_ohlc_rollover global last_ohlc_rollover
time = await get_block_timestamp(current_fork.get().head) time = await get_block_timestamp(current_fork.get().head_identifier)
dt = from_timestamp(time) dt = from_timestamp(time)
diff = time - last_ohlc_rollover diff = time - last_ohlc_rollover
if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute: if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute:

View File

@@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi
from dexorder.base.block import Block, latest_block from dexorder.base.block import Block, latest_block
from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blocks import cache_block, get_block, promotion_height from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number
from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.branch import Branch from dexorder.blockstate.branch import Branch
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.diff import DiffEntryItem
@@ -152,11 +152,11 @@ class BlockStateRunner(BlockProgressor):
# rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the # rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the
# work queue and either use the block directly or query for the block if the queue object is a hashcode. # work queue and either use the block directly or query for the block if the queue object is a hashcode.
blockdata = await w3.eth.get_block('latest') blockdata = await w3.eth.get_block('latest')
head = blockdata['hash']
if head == prev_blockhash:
return prev_blockhash
log.debug(f'polled new head {hexstr(head)} {hexint(blockdata["number"])}')
block = Block(chain.id, blockdata) block = Block(chain.id, blockdata)
if block.hash == prev_blockhash and (
self.state is None or self.state.root_branch is None or self.state.height == block.height):
return prev_blockhash
log.debug(f'polled new head {block}')
latest_block[chain.id] = block latest_block[chain.id] = block
# prefetch the head's ancestors # prefetch the head's ancestors
if self.state is not None and self.state.root_branch is not None: if self.state is not None and self.state.root_branch is not None:
@@ -166,7 +166,7 @@ class BlockStateRunner(BlockProgressor):
while self.state.root_branch is not None and cur.height > self.state.root_branch.height: while self.state.root_branch is not None and cur.height > self.state.root_branch.height:
cur = await get_block(cur.parent, chain_id=chain.id) cur = await get_block(cur.parent, chain_id=chain.id)
self.new_head_event.set() self.new_head_event.set()
return head return block.hash
async def create_branch(self, chain: Blockchain) -> Optional[Fork]: async def create_branch(self, chain: Blockchain) -> Optional[Fork]:
if chain.id not in latest_block: if chain.id not in latest_block:
@@ -180,14 +180,15 @@ class BlockStateRunner(BlockProgressor):
# no root branch, so create one from a single block branch # no root branch, so create one from a single block branch
return self.state.add_branch(Branch.from_block(block)) return self.state.add_branch(Branch.from_block(block))
if block.height - self.state.root_branch.height >= chain.confirms * 2: if block.height - self.state.height >= chain.confirms * 2:
# create a disjoint backfilling branch # create a disjoint backfilling branch
start = self.state.root_branch.height + 1 start = self.state.root_branch.height + 1
# do not query more than the chain's batch size # do not query more than the chain's batch size
# do not query into the reorgable area. only query finalized data. # do not query into the reorgable area. only query finalized data.
height = min( start + chain.batch_size, block.height - chain.confirms) height = min( start + chain.batch_size, block.height - chain.confirms)
branch = Branch(height, start) # no parent or path end_block = await get_block_by_number(height)
return self.state.add_branch(branch, strict=False) branch = Branch(height, start, path=[end_block.hash]) # no parent
return self.state.add_branch(branch)
# otherwise construct an explicit list of linked blocks from the most recent head to the latest block # otherwise construct an explicit list of linked blocks from the most recent head to the latest block
heads = self.state.heads heads = self.state.heads
@@ -195,7 +196,7 @@ class BlockStateRunner(BlockProgressor):
cur = block cur = block
while True: while True:
if cur.parent in heads: if cur.parent in heads:
branch = Branch( block.height, block.height - len(path) + 1, block.parent, path, chain=chain ) branch = Branch( block.height, cur.height, cur.parent, path, chain=chain )
return self.state.add_branch(branch) return self.state.add_branch(branch)
if cur.height <= self.state.root_branch.height: if cur.height <= self.state.root_branch.height:
fatal(f'Latest head {block.hash} does not have the root block {self.state.root_branch.head} as a parent') fatal(f'Latest head {block.hash} does not have the root block {self.state.root_branch.head} as a parent')