local bloom filters

Tim
2024-03-24 15:00:04 -04:00
parent b9bd452508
commit f01f67a005
4 changed files with 29 additions and 20 deletions

View File

@@ -12,3 +12,4 @@ hexbytes
 websockets
 cachetools
 async-lru
+eth-bloom
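
The new dependency, eth-bloom, is the Ethereum Foundation's Python implementation of the 2048-bit log bloom carried in every block header. A minimal sketch of its API (illustrative bytes, not values from this repo):

    from eth_bloom import BloomFilter

    bloom = BloomFilter()              # empty filter, no bits set
    bloom.add(b'a 32-byte log topic')  # insert raw bytes
    assert b'a 32-byte log topic' in bloom
    # Absent items are almost always rejected: bloom filters can give false
    # positives but never false negatives, which is what makes skipping a
    # log query on a miss safe.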

View File

@@ -44,7 +44,7 @@ async def load_pool(address: str) -> PoolDict:
         decimals = token0['decimals'] - token1['decimals']
         found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
                          base=t0, quote=t1, fee=fee, decimals=decimals)
-        log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%}'
+        log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%} '
                   f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
     except ContractLogicError:
         pass
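
The change above is a single trailing space: Python joins adjacent string literals with no separator, so the fee and the decimals fragment previously ran together in the log line. A standalone illustration with made-up values:

    fee = 3000
    before = (f'fee {fee/1_000_000:.2%}'
              f'decimals 18')
    after = (f'fee {fee/1_000_000:.2%} '
             f'decimals 18')
    assert before == 'fee 0.30%decimals 18'   # fragments run together
    assert after == 'fee 0.30% decimals 18'   # trailing space separates them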

View File

@@ -80,23 +80,24 @@ class BlockProgressor(metaclass=ABCMeta):
             # we remove entries as we process them, so the exception handler doesn't re-await the callbacks
             batch = batches.pop(0)
             future, callback, event, filter_args = batch
-            if future is None:
+            if filter_args is None:
                 await maywait(callback())  # non-log callback
             else:
-                try:
-                    log_events = await future if config.parallel_logevent_queries else future
-                except ValueError as e:
-                    if e.args[0].get('code') == -32602:
-                        # too many logs were returned in the batch, so decrease the batch size.
-                        fatal(f'Decrease batch size for {chain}')
-                    raise
                 parsed_events = []
-                for log_event in log_events:
+                if future is not None:
                     try:
-                        parsed = event.process_log(log_event) if event is not None else log_event
-                    except (LogTopicError, MismatchedABI) as e:
-                        # log.debug(f'logevent parse error {e}\n{log_event}') # this happens for Swap events from non-Uniswap pools
-                        parsed = NARG  # need a placeholder
-                    parsed_events.append(parsed)
+                        log_events = await future if config.parallel_logevent_queries else future
+                    except ValueError as e:
+                        if e.args[0].get('code') == -32602:
+                            # too many logs were returned in the batch, so decrease the batch size.
+                            fatal(f'Decrease batch size for {chain}')
+                        raise
+                    for log_event in log_events:
+                        try:
+                            parsed = event.process_log(log_event) if event is not None else log_event
+                        except (LogTopicError, MismatchedABI) as e:
+                            # log.debug(f'logevent parse error {e}\n{log_event}') # this happens for Swap events from non-Uniswap pools
+                            parsed = NARG  # need a placeholder
+                        parsed_events.append(parsed)
                 # todo try/except for known retryable errors
                 await maywait(callback(parsed_events))
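
The guard flips from future to filter_args because a log callback's future can now legitimately be None: the bloom pre-check in the next file skips the log query when the block cannot contain matching logs, and the callback must still fire with an empty list. A condensed sketch of the new contract (error handling elided; callback is assumed to be a plain coroutine here, whereas the real code routes it through maywait):

    async def dispatch(batch):
        future, callback, event, filter_args = batch
        if filter_args is None:
            await callback()                # non-log callback
            return
        parsed_events = []
        if future is not None:              # None means the bloom filter ruled the block out
            for log_event in await future:
                parsed_events.append(event.process_log(log_event) if event else log_event)
        await callback(parsed_events)       # [] when the query was skipped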

View File

@@ -3,6 +3,7 @@ import logging
 from asyncio import Queue
 from typing import Any, Iterable, Callable
+from eth_bloom import BloomFilter
 from web3.exceptions import LogTopicError, MismatchedABI
 # noinspection PyPackageRequirements
 from websockets.exceptions import ConnectionClosedError
@@ -16,7 +17,7 @@ from dexorder.blockstate.diff import DiffEntryItem
 from dexorder.database.model import Block
 from dexorder.database.model.block import current_block, latest_block
 from dexorder.progressor import BlockProgressor
-from dexorder.util import hexstr
+from dexorder.util import hexstr, hexint, hexbytes
 from dexorder.util.async_util import maywait, Maywaitable
 from dexorder.util.shutdown import fatal
@@ -282,16 +283,22 @@ class BlockStateRunner(BlockProgressor):
             else:
                 # event callbacks are triggered in the order in which they're registered. the events passed to
                 # each callback are in block transaction order
+                bloom = BloomFilter(int.from_bytes(block.data['logsBloom']))
                 for callback, event, log_filter in self.events:
                     if log_filter is None:
                         batches.append((None, callback, event, None))
                     else:
-                        # todo use head['logsBloom'] to skip unnecessary log queries
                         lf = dict(log_filter)
                         lf['blockHash'] = hexstr(block.hash)
-                        get_logs = w3.eth.get_logs(lf)
-                        if not config.parallel_logevent_queries:
-                            get_logs = await get_logs
+                        has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics'])
+                        # log.debug(f'has {event.__class__.__name__}? {has_logs}')
+                        if not has_logs:
+                            get_logs = None
+                        else:
+                            # log.debug(f'has {event.__class__.__name__}')
+                            get_logs = w3.eth.get_logs(lf)
+                            if not config.parallel_logevent_queries:
+                                get_logs = await get_logs
                         batches.append((get_logs, callback, event, log_filter))
             for callback in self.postprocess_cbs:
                 batches.append((None, callback, None, None))
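
This hunk is the core of the commit: logsBloom in the block header is a 2048-bit accumulator over every log address and topic in the block, so a local membership test can prove the absence of matching logs and avoid the eth_getLogs round-trip entirely (a hit only means the block may match). A self-contained sketch of the same check, with illustrative names (block_may_have_logs and header_bloom are not from this repo) and assuming a flat topic list, since nested OR-lists in a filter would need flattening first:

    from eth_bloom import BloomFilter

    def block_may_have_logs(logs_bloom: bytes, topics: list[bytes]) -> bool:
        # logs_bloom is the 256-byte header field; a bloom miss is definitive
        bloom = BloomFilter(int.from_bytes(logs_bloom, 'big'))
        return any(topic in bloom for topic in topics)

    # ERC-20 Transfer topic as an example probe
    transfer = bytes.fromhex('ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef')
    header_bloom = bytes(256)   # a real header's logsBloom goes here
    if not block_may_have_logs(header_bloom, [transfer]):
        pass  # guaranteed no match: skip the eth_getLogs call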