This commit is contained in:
Tim
2024-02-26 23:39:52 -04:00
parent a68c6a3c86
commit 36274556a2
3 changed files with 12 additions and 9 deletions

View File

@@ -37,7 +37,7 @@ class Fork:
@property
def parent(self):
return self.ancestry[1]
return self.ancestry[1] if len(self.ancestry) > 1 else None
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """

View File

@@ -1,3 +1,4 @@
import functools
import logging
from abc import ABCMeta, abstractmethod
from typing import Callable, Union
@@ -29,8 +30,11 @@ class BlockProgressor(metaclass=ABCMeta):
def add_event_trigger(self,
# callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
callback: Union[Callable[[EventData], Maywaitable[None]],
Callable[[list[EventData]], Maywaitable[None]]],
callback: Union[
Callable[[EventData], Maywaitable[None]],
Callable[[list[EventData]], Maywaitable[None]],
Callable[[], Maywaitable[None]],
],
event: ContractEvents = None,
log_filter: Union[dict, str] = None,
*, multi=False):
@@ -39,7 +43,7 @@ class BlockProgressor(metaclass=ABCMeta):
"""
if log_filter is None and event is not None:
log_filter = {'topics': [topic(event.abi)]}
cb = callback if multi else lambda events: map(cb, events)
cb = callback if event is None or multi else functools.partial(map, callback)
self.events.append((cb, event, log_filter))
@abstractmethod

View File

@@ -108,7 +108,7 @@ class BlockStateRunner(BlockProgressor):
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it.
log.debug(f'subscribed to newHeads {subscription}')
while self.running:
async for message in w3ws.ws.listen_to_websocket():
async for message in w3ws.ws.process_subscriptions():
head = message['result']
log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}')
await self.add_head(head["hash"])
@@ -255,7 +255,6 @@ class BlockStateRunner(BlockProgressor):
async def handle_head(self, chain, block, w3):
print(f'logger {log} {log.name} level {log.level} {logging.DEBUG} {logging.FATAL}')
log.debug(f'handle_head {block.height} {hexstr(block.hash)}')
session = None
batches = []
@@ -275,7 +274,7 @@ class BlockStateRunner(BlockProgressor):
log.debug(f'discarded late-arriving head {block}')
else:
batches: list
from_height = fork.parent.height
from_height = self.state.by_hash[fork.parent].height if fork.parent is not None else fork.height
to_height = fork.height
if fork.disjoint:
batches = await self.get_backfill_batches(from_height, to_height, w3)
@@ -293,8 +292,8 @@ class BlockStateRunner(BlockProgressor):
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, log_filter))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
# set up for callbacks
current_block.set(block)