From 8d66e972665a0c80abe6802d9fa89014c9ea4cdd Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 3 Apr 2024 01:47:44 -0400 Subject: [PATCH] bugfixes; blockstate rework seems decent --- src/dexorder/blocks.py | 6 +++--- src/dexorder/blockstate/branch.py | 7 +++++-- src/dexorder/blockstate/fork.py | 6 +++++- src/dexorder/blockstate/state.py | 28 +++++++++++++++++-------- src/dexorder/contract/contract_proxy.py | 7 +++---- src/dexorder/event_handler.py | 2 +- src/dexorder/runner.py | 21 ++++++++++--------- 7 files changed, 47 insertions(+), 30 deletions(-) diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index 07fa87a..31a1846 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -18,8 +18,8 @@ from dexorder.util.async_dict import AsyncDict log = logging.getLogger(__name__) -async def get_block_timestamp(blockhash) -> int: - block = await get_block(blockhash) +async def get_block_timestamp(blockid: Union[bytes,int]) -> int: + block = await (get_block_by_number(blockid) if type(blockid) is int else get_block(blockid)) return block.timestamp @@ -72,5 +72,5 @@ async def fetch_block(blockhash, *, chain_id=None): def promotion_height(chain, latest_height): - confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 + confirm_offset = config.confirms if config.confirms is not None else chain.confirms return latest_height - confirm_offset diff --git a/src/dexorder/blockstate/branch.py b/src/dexorder/blockstate/branch.py index 825cb47..415c19f 100644 --- a/src/dexorder/blockstate/branch.py +++ b/src/dexorder/blockstate/branch.py @@ -59,8 +59,11 @@ class Branch: def __str__(self): # noinspection PyTypeChecker - return (f"Branch#{str(self.id)[2:7]}[" + - (','.join(hexstr(b)[2:7] for b in self.path) if self.path else f'{self.start},{self.height}') + ']') + return (f"Branch#{str(self.id)[2:7]}[{self.height}" + + (':' if self.contiguous else '_') + + ('<-'.join(hexstr(b)[2:7] for b in self.path) if self.path else f'{self.start}') + + (f':{self.start}' if self.disjoint else '') + + f'<-({hexstr(self.parent)[2:7]})]') current_branch = ContextVar[Branch]('current_branch') diff --git a/src/dexorder/blockstate/fork.py b/src/dexorder/blockstate/fork.py index 2b96986..3d2b291 100644 --- a/src/dexorder/blockstate/fork.py +++ b/src/dexorder/blockstate/fork.py @@ -1,6 +1,6 @@ import logging from contextvars import ContextVar -from typing import Optional, Sequence +from typing import Optional, Sequence, Union from dexorder import NARG from dexorder.blockstate.branch import Branch @@ -29,6 +29,10 @@ class Fork: self.head = self.branch.head self.parent = branches[-1].parent + @property + def head_identifier(self) -> Union[int, str]: + return self.head if self.head is not None else self.height + def __str__(self): return f'Fork[{"<-".join(str(b) for b in self.branches)}]' diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 0a4fc92..55ca214 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -11,6 +11,8 @@ from dexorder.blockstate.fork import Fork from .branch import Branch, BranchId from .diff import DiffEntry, DELETE, DiffEntryItem from ..base.block import Block +from ..base.chain import current_chain +from ..blocks import promotion_height from ..util import hexstr log = logging.getLogger(__name__) @@ -55,6 +57,7 @@ class BlockState: def __init__(self): self._root_branch: Optional[Branch] = None self._root_fork: Optional[Fork] = None + self.height: int = 0 # highest branch seen # Branches indexed by height self.branches_by_height: dict[int, list[Branch]] = defaultdict(list) @@ -101,7 +104,7 @@ class BlockState: return result - def add_branch(self, branch: Branch, *, strict=True) -> Fork: + def add_branch(self, branch: Branch) -> Fork: """ If there is no root_branch set yet, this branch becomes the root branch. Otherwise, returns a Fork with the set of branches leading to the root. @@ -115,23 +118,24 @@ class BlockState: if self.root_branch is None: self.root_branch = branch + self.height = branch.height state_log.info(f'Initialized BlockState with {branch}') return Fork([self.root_branch]) - self.branches_by_height[branch.height].append(branch) - self.branches_by_id[branch.id] = branch - # search for a path to the root branch def build_fork(cur: Branch) -> list[Branch]: if cur == self.root_branch: return [cur] - if strict and not cur.parent: + parent_height = cur.start - 1 + height_finalized = parent_height <= promotion_height(current_chain.get(), max(self.height, branch.height)) + if not cur.parent and not height_finalized: raise ValueError(f'No parent for branch {branch}') parent_branches = [ - p for p in self.branches_by_height.get(cur.start-1, []) - if not strict or cur.parent == p.head + p for p in self.branches_by_height.get(parent_height, []) + if height_finalized or cur.parent == p.head ] - if cur.parent == self.root_branch.head or not strict and cur.start == self.root_branch.height + 1: + if parent_height == self.root_branch.height: + assert len(parent_branches) == 0 parent_branches.append(self.root_branch) if not parent_branches: raise ValueError @@ -144,11 +148,17 @@ class BlockState: return [cur, *build_fork(parent)] fork = Fork(build_fork(branch)) + self.branches_by_height[branch.height].append(branch) + self.branches_by_id[branch.id] = branch + self.height = max(self.height, branch.height) state_log.info(f'added branch {fork}') return fork def remove_branch(self, branch: Branch, *, remove_series_diffs=True): + if branch.height == self.height and len(self.branches_by_height[branch.height]) == 1: + # this is the only branch at this height: compute the new lower height + self.height = max(b.height for b in self.branches_by_id.values()) if self.branches_by_id else 0 del self.branches_by_id[branch.id] by_height = self.branches_by_height.get(branch.height) if by_height is not None: @@ -166,7 +176,7 @@ class BlockState: difflist = self.diffs_by_series.get(diff.series,{}).get(diff.key) if difflist is not None: difflist.remove(diff.entry) - state_log.info(f'removed branch {branch}'+ ('' if remove_series_diffs else ' (promoting)')) + state_log.info(('promoting' if remove_series_diffs else 'removed')+f' branch {branch}') def get(self, fork: Fork, series, key, default=NARG): diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index fcf2363..9ae6aab 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -45,15 +45,14 @@ class DeployTransaction (ContractTransaction): return receipt - def call_wrapper(addr, name, func): async def f(*args, **kwargs): try: - blockhash = hexstr(current_fork.get().head) + blockid = current_fork.get().head_identifier except (LookupError, AttributeError): - blockhash = 'latest' + blockid = 'latest' try: - return await func(*args).call(block_identifier=blockhash, **kwargs) + return await func(*args).call(block_identifier=blockid, **kwargs) except Web3Exception as e: e.args += addr, name raise e diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 5636b2f..46df375 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -313,7 +313,7 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): last_ohlc_rollover = 0 async def check_ohlc_rollover(): global last_ohlc_rollover - time = await get_block_timestamp(current_fork.get().head) + time = await get_block_timestamp(current_fork.get().head_identifier) dt = from_timestamp(time) diff = time - last_ohlc_rollover if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute: diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 5aac6d9..04329af 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 -from dexorder.blocks import cache_block, get_block, promotion_height +from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem @@ -152,11 +152,11 @@ class BlockStateRunner(BlockProgressor): # rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the # work queue and either use the block directly or query for the block if the queue object is a hashcode. blockdata = await w3.eth.get_block('latest') - head = blockdata['hash'] - if head == prev_blockhash: - return prev_blockhash - log.debug(f'polled new head {hexstr(head)} {hexint(blockdata["number"])}') block = Block(chain.id, blockdata) + if block.hash == prev_blockhash and ( + self.state is None or self.state.root_branch is None or self.state.height == block.height): + return prev_blockhash + log.debug(f'polled new head {block}') latest_block[chain.id] = block # prefetch the head's ancestors if self.state is not None and self.state.root_branch is not None: @@ -166,7 +166,7 @@ class BlockStateRunner(BlockProgressor): while self.state.root_branch is not None and cur.height > self.state.root_branch.height: cur = await get_block(cur.parent, chain_id=chain.id) self.new_head_event.set() - return head + return block.hash async def create_branch(self, chain: Blockchain) -> Optional[Fork]: if chain.id not in latest_block: @@ -180,14 +180,15 @@ class BlockStateRunner(BlockProgressor): # no root branch, so create one from a single block branch return self.state.add_branch(Branch.from_block(block)) - if block.height - self.state.root_branch.height >= chain.confirms * 2: + if block.height - self.state.height >= chain.confirms * 2: # create a disjoint backfilling branch start = self.state.root_branch.height + 1 # do not query more than the chain's batch size # do not query into the reorgable area. only query finalized data. height = min( start + chain.batch_size, block.height - chain.confirms) - branch = Branch(height, start) # no parent or path - return self.state.add_branch(branch, strict=False) + end_block = await get_block_by_number(height) + branch = Branch(height, start, path=[end_block.hash]) # no parent + return self.state.add_branch(branch) # otherwise construct an explicit list of linked blocks from the most recent head to the latest block heads = self.state.heads @@ -195,7 +196,7 @@ class BlockStateRunner(BlockProgressor): cur = block while True: if cur.parent in heads: - branch = Branch( block.height, block.height - len(path) + 1, block.parent, path, chain=chain ) + branch = Branch( block.height, cur.height, cur.parent, path, chain=chain ) return self.state.add_branch(branch) if cur.height <= self.state.root_branch.height: fatal(f'Latest head {block.hash} does not have the root block {self.state.root_branch.head} as a parent')