diff --git a/src/dexorder/base/fork.py b/src/dexorder/base/fork.py index a9e5a0f..875a6c9 100644 --- a/src/dexorder/base/fork.py +++ b/src/dexorder/base/fork.py @@ -37,7 +37,7 @@ class Fork: @property def parent(self): - return self.ancestry[1] + return self.ancestry[1] if len(self.ancestry) > 1 else None def for_height(self, height): """ returns a new Fork object for an older block along this fork. used for root promotion. """ diff --git a/src/dexorder/progressor.py b/src/dexorder/progressor.py index ebb45a4..61cada6 100644 --- a/src/dexorder/progressor.py +++ b/src/dexorder/progressor.py @@ -1,3 +1,4 @@ +import functools import logging from abc import ABCMeta, abstractmethod from typing import Callable, Union @@ -29,8 +30,11 @@ class BlockProgressor(metaclass=ABCMeta): def add_event_trigger(self, # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range - callback: Union[Callable[[EventData], Maywaitable[None]], - Callable[[list[EventData]], Maywaitable[None]]], + callback: Union[ + Callable[[EventData], Maywaitable[None]], + Callable[[list[EventData]], Maywaitable[None]], + Callable[[], Maywaitable[None]], + ], event: ContractEvents = None, log_filter: Union[dict, str] = None, *, multi=False): @@ -39,7 +43,7 @@ class BlockProgressor(metaclass=ABCMeta): """ if log_filter is None and event is not None: log_filter = {'topics': [topic(event.abi)]} - cb = callback if multi else lambda events: map(cb, events) + cb = callback if event is None or multi else functools.partial(map, callback) self.events.append((cb, event, log_filter)) @abstractmethod diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index ad5d030..ca5de55 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -108,7 +108,7 @@ class BlockStateRunner(BlockProgressor): subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it. log.debug(f'subscribed to newHeads {subscription}') while self.running: - async for message in w3ws.ws.listen_to_websocket(): + async for message in w3ws.ws.process_subscriptions(): head = message['result'] log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}') await self.add_head(head["hash"]) @@ -255,7 +255,6 @@ class BlockStateRunner(BlockProgressor): async def handle_head(self, chain, block, w3): - print(f'logger {log} {log.name} level {log.level} {logging.DEBUG} {logging.FATAL}') log.debug(f'handle_head {block.height} {hexstr(block.hash)}') session = None batches = [] @@ -275,7 +274,7 @@ class BlockStateRunner(BlockProgressor): log.debug(f'discarded late-arriving head {block}') else: batches: list - from_height = fork.parent.height + from_height = self.state.by_hash[fork.parent].height if fork.parent is not None else fork.height to_height = fork.height if fork.disjoint: batches = await self.get_backfill_batches(from_height, to_height, w3) @@ -293,8 +292,8 @@ class BlockStateRunner(BlockProgressor): if not config.parallel_logevent_queries: get_logs = await get_logs batches.append((get_logs, callback, event, log_filter)) - for callback in self.postprocess_cbs: - batches.append((None, callback, None, None)) + for callback in self.postprocess_cbs: + batches.append((None, callback, None, None)) # set up for callbacks current_block.set(block)