From a14a26d38d6b36938c7e433e0d2d0e823ae9555c Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 14 Feb 2024 04:29:56 -0400 Subject: [PATCH] prefetch block timestamps --- src/dexorder/base/chain.py | 10 ------- src/dexorder/bin/backfill_ohlc.py | 6 ++-- src/dexorder/bin/main.py | 11 +++---- src/dexorder/blockchain/connection.py | 42 +++++++++++++++++++++++++-- src/dexorder/event_handler.py | 34 +++++++++++++++------- src/dexorder/ohlc.py | 3 ++ src/dexorder/pools.py | 2 +- src/dexorder/runner.py | 25 ++++++++++++---- 8 files changed, 95 insertions(+), 38 deletions(-) diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 41bbb10..01e3240 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -2,11 +2,7 @@ import math # noinspection PyPackageRequirements from contextvars import ContextVar -from async_lru import alru_cache - import dexorder -from dexorder import current_w3 -from dexorder.util import hexint class Blockchain: @@ -71,9 +67,3 @@ class BlockClock: current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. 
will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks -@alru_cache -async def get_block_timestamp(blockhash) -> int: - response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False]) - raw = hexint(response['result']['timestamp']) - # noinspection PyTypeChecker - return raw if type(raw) is int else hexint(raw) diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index b6dda70..2d4eccd 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -13,9 +13,9 @@ from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database import db from dexorder.database.model import Block -from dexorder.event_handler import handle_uniswap_swap, check_ohlc_rollover -from dexorder.memcache.memcache_state import RedisState, publish_all +from dexorder.event_handler import check_ohlc_rollover, handle_uniswap_swaps from dexorder.memcache import memcache +from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs from dexorder.runner import BlockStateRunner from dexorder.util import hexstr @@ -59,7 +59,7 @@ async def main(): log.info(f'loaded state from db for root block {state.root_block}') runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) - runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) + runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) runner.postprocess_cbs.append(check_ohlc_rollover) runner.on_promotion.append(finalize_callback) if db: diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 526406a..b7d0ae9 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -10,11 +10,12 @@ from 
dexorder.blockstate.db_state import DbState from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract -from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_uniswap_swap, \ - handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, activate_time_triggers, activate_price_triggers, \ - process_active_tranches, process_execution_requests, check_ohlc_rollover -from dexorder.memcache.memcache_state import RedisState, publish_all +from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, \ + handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, \ + activate_time_triggers, activate_price_triggers, \ + process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps from dexorder.memcache import memcache +from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.ohlc import ohlc_save from dexorder.runner import BlockStateRunner from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions @@ -54,7 +55,7 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_vault_created, vault_created) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) - runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) + runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) 
runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled')) runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 571f3ff..ed368e4 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -1,11 +1,20 @@ +import asyncio +import logging +from random import random +from typing import Any, Optional, Union + +# noinspection PyPackageRequirements +from aiohttp import ClientResponseError +from eth_typing import URI from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider +from web3.types import RPCEndpoint, RPCResponse -from ..base.chain import current_chain -from ..contract import get_contract_data from .. import current_w3, Blockchain +from ..base.chain import current_chain from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url +from ..contract import get_contract_data async def connect(rpc_url=None): @@ -28,7 +37,7 @@ def create_w3(rpc_url=None): # self.w3iter = itertools.cycle(self.w3s) url = resolve_rpc_url(rpc_url) - w3 = AsyncWeb3(AsyncHTTPProvider(url)) + w3 = AsyncWeb3(RetryHTTPProvider(url)) # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? 
# w3.middleware_onion.add(simple_cache_middleware)
     w3.middleware_onion.remove('attrdict')
@@ -97,3 +106,38 @@ def _make_contract(w3_eth):
         return w3_eth.contract(address,abi=abi,bytecode=bytecode)
     return f
 
+
+log = logging.getLogger(__name__)
+
+MAX_CONCURRENCY = 6  # todo configure
+
+class RetryHTTPProvider (AsyncHTTPProvider):
+    """AsyncHTTPProvider that limits the number of in-flight requests and retries on HTTP 429."""
+
+    def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
+        super().__init__(endpoint_uri, request_kwargs)
+        # at most MAX_CONCURRENCY requests are allowed through make_request() at the same time
+        self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY)
+
+    async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
+        """
+        Perform the request, sleeping and retrying for as long as the server responds 429.
+        Any other ClientResponseError is re-raised unchanged.
+        """
+        wait = 0
+        while True:
+            try:
+                async with self.in_flight:
+                    return await super().make_request(method, params)
+            except ClientResponseError as e:
+                if e.status != 429:
+                    raise
+                retry_after = e.headers.get('retry-after') if e.headers else None
+                try:
+                    # honor a Retry-After given as a number of seconds
+                    wait = max(0.0, float(retry_after))
+                except (TypeError, ValueError):
+                    # header missing (TypeError) or not a plain number, e.g. an HTTP-date (ValueError)
+                    wait += 1 + .125 * random()
+                log.debug('rate limiting')
+                await asyncio.sleep(wait)
diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py
index bcc4a09..150bdd8 100644
--- a/src/dexorder/event_handler.py
+++ b/src/dexorder/event_handler.py
@@ -3,16 +3,19 @@ import functools
 import logging
 from uuid import UUID
 
+from async_lru import alru_cache
 from web3.types import EventData
 
-from dexorder import current_pub, db, dec, from_timestamp, minutely
-from dexorder.base.chain import current_chain, current_clock, get_block_timestamp
+from dexorder import current_pub, db, dec, from_timestamp, minutely, current_w3
+from dexorder.base.chain import current_chain, current_clock
 from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
+from dexorder.blockstate import current_blockstate
 from dexorder.ohlc import ohlcs, recent_ohlcs
 from dexorder.transaction import submit_transaction_request
 from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool
 from dexorder.contract.dexorder import vault_address, VaultContract
 from dexorder.contract import ERC20
+from dexorder.util import hexint
 from dexorder.vault_blockdata import vault_owners, vault_balances
 from dexorder.database.model.block import current_block
 from dexorder.database.model.transaction import TransactionJob
@@ -152,14 +155,13 @@ async def handle_transfer(transfer: EventData):
     # log.debug(f'vaults: {list(vaults)}')
 
 
-async def handle_uniswap_swap_old(swap: EventData):
-    try:
-        sqrt_price = swap['args']['sqrtPriceX96']
-    except KeyError:
-        return
-    addr = swap['address']
-    price: dec = await uniswap_price(await get_pool(addr), sqrt_price)
-    pool_prices[addr] = price
+async def handle_uniswap_swaps(swaps: list[EventData]):
+    # asynchronously prefetch the block timestamps we'll need
+    hashes = set(swap['blockHash'] for swap in swaps)
+    await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
+    # now execute the swaps synchronously
+    for swap in swaps:
+        await handle_uniswap_swap(swap)
 
 
 async def handle_uniswap_swap(swap: EventData):
@@ -180,6 +182,18 @@ async def handle_uniswap_swap(swap: EventData):
     log.debug(f'pool {addr} {minutely(dt)} {price}')
 
 
+# todo is there a better spot for this function?
+@alru_cache(maxsize=1024)
+async def get_block_timestamp(blockhash) -> int:
+    block = current_blockstate.get().by_hash.get(blockhash)
+    if block is not None:
+        return block.timestamp
+    response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
+    raw = hexint(response['result']['timestamp'])
+    # noinspection PyTypeChecker
+    return raw if type(raw) is int else hexint(raw)
+
+
 def handle_vault_created(created: EventData):
     try:
         owner = created['args']['owner']
diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py
index 3b8553c..f7f088a 100644
--- a/src/dexorder/ohlc.py
+++ b/src/dexorder/ohlc.py
@@ -218,6 +218,9 @@ class OHLCRepository:
                 return json.load(file)
         except FileNotFoundError:
             return []
+        except Exception:
+            log.exception(f'exception loading chunk {path}')
+            raise
 
     def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]):
         if not chunk:
diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py
index e5b8f7a..767f5fd 100644
--- a/src/dexorder/pools.py
+++ b/src/dexorder/pools.py
@@ -32,11 +32,11 @@ async def load_pool(address: str) -> PoolDict:
         v3 = UniswapV3Pool(address)
         t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
         if uniswapV3_pool_address(t0, t1, fee) == address:  # VALIDATE don't just trust that it's a Uniswap pool
-            log.debug(f'new UniswapV3 pool at {address}')
             token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
             decimals = token0['decimals'] - token1['decimals']
             found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
                              base=t0, quote=t1, fee=fee, decimals=decimals)
+            log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} .{decimals} {address}')
         else:  # NOT a genuine Uniswap V3 pool if the address test doesn't pass
             log.debug(f'new Unknown pool at {address}')
     except ContractLogicError:
diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py
index fb4d34c..acd1a34 100644
--- a/src/dexorder/runner.py
+++ b/src/dexorder/runner.py
@@ -9,7 +9,7 @@ from web3.types import EventData
 # noinspection PyPackageRequirements
 from websockets.exceptions import ConnectionClosedError
 
-from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config
+from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, NARG
 from dexorder.base.chain import current_chain, current_clock, BlockClock
 from dexorder.base.fork import current_fork, Fork
 from dexorder.blockchain.connection import create_w3_ws, create_w3
@@ -87,13 +87,31 @@ class BlockStateRunner:
         self.running = False
 
 
-    def add_event_trigger(self, callback: Callable[[EventData], Maywaitable[None]], event: ContractEvents = None, log_filter: Union[dict, str] = None):
+    def add_event_trigger(self,
+                          # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
+                          callback: Union[Callable[[EventData], Maywaitable[None]],
+                                          Callable[[list[EventData]], Maywaitable[None]]],
+                          event: ContractEvents = None,
+                          log_filter: Union[dict, str] = None,
+                          *, multi=False):
         """
         if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
         """
         if log_filter is None and event is not None:
             log_filter = {'topics': [topic(event.abi)]}
-        self.events.append((callback, event, log_filter))
+        if multi:
+            cb = callback
+        else:
+            # adapt a single-event callback to the list interface used by the dispatcher: events are handled
+            # one at a time, in order, and each (possibly async) result is awaited before the next one starts
+            async def cb(*args):
+                if not args:
+                    # presumably how triggers registered without an event are invoked (see docstring) - verify against the dispatcher
+                    await maywait(callback())
+                    return
+                for e in args[0]:
+                    await maywait(callback(e))
+        self.events.append((cb, event, log_filter))
 
     async def run(self):
         # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling
@@ -222,6 +240,10 @@ class BlockStateRunner:
                     self.max_height_seen = height
                     start_height += chain.batch_size
                     batch_height += chain.batch_size
+                    if self.queue.qsize() > 2:
+                        await asyncio.sleep(1)
+                    else:
+                        await async_yield()
                 await self.queue.put(head)  # add the head block
                 self.max_height_seen = head.height
 
@@ -346,14 +368,17 @@ class BlockStateRunner:
                             log.warning(f'Decrease batch size for {chain}')
                             return
                         raise
+                parsed_events = []
                 for log_event in log_events:
                     try:
                         parsed = event.process_log(log_event) if event is not None else log_event
                     except (LogTopicError, MismatchedABI) as e:
                         log.warning(f'logevent parse error {e}\n{log_event}')
-                    else:
-                        # todo try/except for known retryable errors
-                        await maywait(callback(parsed))
+                        # skip logs that cannot be parsed so callbacks only ever receive real events
+                        continue
+                    parsed_events.append(parsed)
+                # todo try/except for known retryable errors
+                await maywait(callback(parsed_events))
 
         # todo
         # IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either