diff --git a/requirements.txt b/requirements.txt index 125ad9d..b0ebf9b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ hexbytes websockets cachetools async-lru +eth-bloom diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index e5fc920..163ba78 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -44,7 +44,7 @@ async def load_pool(address: str) -> PoolDict: decimals = token0['decimals'] - token1['decimals'] found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, base=t0, quote=t1, fee=fee, decimals=decimals) - log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%}' + log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%} ' f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') except ContractLogicError: pass diff --git a/src/dexorder/progressor.py b/src/dexorder/progressor.py index 7b0d356..08637de 100644 --- a/src/dexorder/progressor.py +++ b/src/dexorder/progressor.py @@ -80,23 +80,24 @@ class BlockProgressor(metaclass=ABCMeta): # we remove entries as we process them, so the exception handler doesn't re-await the callbacks batch = batches.pop(0) future, callback, event, filter_args = batch - if future is None: + if filter_args is None: await maywait(callback()) # non-log callback else: - try: - log_events = await future if config.parallel_logevent_queries else future - except ValueError as e: - if e.args[0].get('code') == -32602: - # too many logs were returned in the batch, so decrease the batch size. - fatal(f'Decrease batch size for {chain}') - raise parsed_events = [] - for log_event in log_events: + if future is not None: try: - parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI) as e: - # log.debug(f'logevent parse error {e}\n{log_event}') # this happens for Swap events from non-Uniswap pools - parsed = NARG # need a placeholder - parsed_events.append(parsed) + log_events = await future if config.parallel_logevent_queries else future + except ValueError as e: + if e.args[0].get('code') == -32602: + # too many logs were returned in the batch, so decrease the batch size. + fatal(f'Decrease batch size for {chain}') + raise + for log_event in log_events: + try: + parsed = event.process_log(log_event) if event is not None else log_event + except (LogTopicError, MismatchedABI) as e: + # log.debug(f'logevent parse error {e}\n{log_event}') # this happens for Swap events from non-Uniswap pools + parsed = NARG # need a placeholder + parsed_events.append(parsed) # todo try/except for known retryable errors await maywait(callback(parsed_events)) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 656f233..5228633 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -3,6 +3,7 @@ import logging from asyncio import Queue from typing import Any, Iterable, Callable +from eth_bloom import BloomFilter from web3.exceptions import LogTopicError, MismatchedABI # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError @@ -16,7 +17,7 @@ from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block from dexorder.progressor import BlockProgressor -from dexorder.util import hexstr +from dexorder.util import hexstr, hexint, hexbytes from dexorder.util.async_util import maywait, Maywaitable from dexorder.util.shutdown import fatal @@ -282,16 +283,22 @@ class BlockStateRunner(BlockProgressor): else: # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order + bloom = BloomFilter(int.from_bytes(block.data['logsBloom'])) for callback, event, log_filter in self.events: if log_filter is None: batches.append((None, callback, event, None)) else: - # todo use head['logsBloom'] to skip unnecessary log queries lf = dict(log_filter) lf['blockHash'] = hexstr(block.hash) - get_logs = w3.eth.get_logs(lf) - if not config.parallel_logevent_queries: - get_logs = await get_logs + has_logs = any(bytes(hexbytes(topic)) in bloom for topic in lf['topics']) + # log.debug(f'has {event.__class__.__name__}? {has_logs}') + if not has_logs: + get_logs = None + else: + # log.debug(f'has {event.__class__.__name__}') + get_logs = w3.eth.get_logs(lf) + if not config.parallel_logevent_queries: + get_logs = await get_logs batches.append((get_logs, callback, event, log_filter)) for callback in self.postprocess_cbs: batches.append((None, callback, None, None))