prefetch block timestamps

This commit is contained in:
Tim
2024-02-14 04:29:56 -04:00
parent 993b38dce4
commit a14a26d38d
8 changed files with 95 additions and 38 deletions

View File

@@ -2,11 +2,7 @@ import math
# noinspection PyPackageRequirements # noinspection PyPackageRequirements
from contextvars import ContextVar from contextvars import ContextVar
from async_lru import alru_cache
import dexorder import dexorder
from dexorder import current_w3
from dexorder.util import hexint
class Blockchain: class Blockchain:
@@ -71,9 +67,3 @@ class BlockClock:
current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks or for historical playbacks
@alru_cache
async def get_block_timestamp(blockhash) -> int:
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
raw = hexint(response['result']['timestamp'])
# noinspection PyTypeChecker
return raw if type(raw) is int else hexint(raw)

View File

@@ -13,9 +13,9 @@ from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event from dexorder.contract import get_contract_event
from dexorder.database import db from dexorder.database import db
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.event_handler import handle_uniswap_swap, check_ohlc_rollover from dexorder.event_handler import check_ohlc_rollover, handle_uniswap_swaps
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs
from dexorder.runner import BlockStateRunner from dexorder.runner import BlockStateRunner
from dexorder.util import hexstr from dexorder.util import hexstr
@@ -59,7 +59,7 @@ async def main():
log.info(f'loaded state from db for root block {state.root_block}') log.info(f'loaded state from db for root block {state.root_block}')
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0) runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0)
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
runner.postprocess_cbs.append(check_ohlc_rollover) runner.postprocess_cbs.append(check_ohlc_rollover)
runner.on_promotion.append(finalize_callback) runner.on_promotion.append(finalize_callback)
if db: if db:

View File

@@ -10,11 +10,12 @@ from dexorder.blockstate.db_state import DbState
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event from dexorder.contract import get_contract_event
from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract from dexorder.contract.dexorder import get_factory_contract, get_dexorder_contract
from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_uniswap_swap, \ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_vault_created, handle_order_placed, \
handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, activate_time_triggers, activate_price_triggers, \ handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_dexorderexecutions, \
process_active_tranches, process_execution_requests, check_ohlc_rollover activate_time_triggers, activate_price_triggers, \
from dexorder.memcache.memcache_state import RedisState, publish_all process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps
from dexorder.memcache import memcache from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.ohlc import ohlc_save from dexorder.ohlc import ohlc_save
from dexorder.runner import BlockStateRunner from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions
@@ -54,7 +55,7 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_vault_created, vault_created) runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) runner.add_event_trigger(handle_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled')) runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled'))
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll'))

View File

@@ -1,11 +1,20 @@
import asyncio
import logging
from random import random
from typing import Any, Optional, Union
# noinspection PyPackageRequirements
from aiohttp import ClientResponseError
from eth_typing import URI
from hexbytes import HexBytes from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.types import RPCEndpoint, RPCResponse
from ..base.chain import current_chain
from ..contract import get_contract_data
from .. import current_w3, Blockchain from .. import current_w3, Blockchain
from ..base.chain import current_chain
from ..configuration import resolve_rpc_url from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url from ..configuration.resolve import resolve_ws_url
from ..contract import get_contract_data
async def connect(rpc_url=None): async def connect(rpc_url=None):
@@ -28,7 +37,7 @@ def create_w3(rpc_url=None):
# self.w3iter = itertools.cycle(self.w3s) # self.w3iter = itertools.cycle(self.w3s)
url = resolve_rpc_url(rpc_url) url = resolve_rpc_url(rpc_url)
w3 = AsyncWeb3(AsyncHTTPProvider(url)) w3 = AsyncWeb3(RetryHTTPProvider(url))
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed?
# w3.middleware_onion.add(simple_cache_middleware) # w3.middleware_onion.add(simple_cache_middleware)
w3.middleware_onion.remove('attrdict') w3.middleware_onion.remove('attrdict')
@@ -97,3 +106,30 @@ def _make_contract(w3_eth):
return w3_eth.contract(address,abi=abi,bytecode=bytecode) return w3_eth.contract(address,abi=abi,bytecode=bytecode)
return f return f
log = logging.getLogger(__name__)
MAX_CONCURRENCY = 6 # todo configure
class RetryHTTPProvider (AsyncHTTPProvider):
def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
super().__init__(endpoint_uri, request_kwargs)
self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY)
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
wait = 0
while True:
try:
async with self.in_flight:
return await super().make_request(method, params)
except ClientResponseError as e:
if e.status != 429:
raise
retry_after = e.headers.get('retry-after', None)
if retry_after is not None:
wait = float(retry_after)
else:
wait += 1 + .125 * random()
log.debug('rate limiting')
await asyncio.sleep(wait)

View File

@@ -3,16 +3,19 @@ import functools
import logging import logging
from uuid import UUID from uuid import UUID
from async_lru import alru_cache
from web3.types import EventData from web3.types import EventData
from dexorder import current_pub, db, dec, from_timestamp, minutely from dexorder import current_pub, db, dec, from_timestamp, minutely, current_w3
from dexorder.base.chain import current_chain, current_clock, get_block_timestamp from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.blockstate import current_blockstate
from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request from dexorder.transaction import submit_transaction_request
from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool
from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.contract import ERC20 from dexorder.contract import ERC20
from dexorder.util import hexint
from dexorder.vault_blockdata import vault_owners, vault_balances from dexorder.vault_blockdata import vault_owners, vault_balances
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob from dexorder.database.model.transaction import TransactionJob
@@ -152,14 +155,13 @@ async def handle_transfer(transfer: EventData):
# log.debug(f'vaults: {list(vaults)}') # log.debug(f'vaults: {list(vaults)}')
async def handle_uniswap_swap_old(swap: EventData): async def handle_uniswap_swaps(swaps: list[EventData]):
try: # asynchronously prefetch the block timestamps we'll need
sqrt_price = swap['args']['sqrtPriceX96'] hashes = set(swap['blockHash'] for swap in swaps)
except KeyError: await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
return # now execute the swaps synchronously
addr = swap['address'] for swap in swaps:
price: dec = await uniswap_price(await get_pool(addr), sqrt_price) await handle_uniswap_swap(swap)
pool_prices[addr] = price
async def handle_uniswap_swap(swap: EventData): async def handle_uniswap_swap(swap: EventData):
@@ -180,6 +182,18 @@ async def handle_uniswap_swap(swap: EventData):
log.debug(f'pool {addr} {minutely(dt)} {price}') log.debug(f'pool {addr} {minutely(dt)} {price}')
# todo is there a better spot for this function?
@alru_cache(maxsize=1024)
async def get_block_timestamp(blockhash) -> int:
block = current_blockstate.get().by_hash.get(blockhash)
if block is not None:
return block.timestamp
response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
raw = hexint(response['result']['timestamp'])
# noinspection PyTypeChecker
return raw if type(raw) is int else hexint(raw)
def handle_vault_created(created: EventData): def handle_vault_created(created: EventData):
try: try:
owner = created['args']['owner'] owner = created['args']['owner']

View File

@@ -218,6 +218,9 @@ class OHLCRepository:
return json.load(file) return json.load(file)
except FileNotFoundError: except FileNotFoundError:
return [] return []
except:
log.error(f'exception loading chunk {path}')
raise
def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]): def save_chunk(self, symbol: str, period: timedelta, chunk: list[OHLC]):
if not chunk: if not chunk:

View File

@@ -32,11 +32,11 @@ async def load_pool(address: str) -> PoolDict:
v3 = UniswapV3Pool(address) v3 = UniswapV3Pool(address)
t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool
log.debug(f'new UniswapV3 pool at {address}')
token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
decimals = token0['decimals'] - token1['decimals'] decimals = token0['decimals'] - token1['decimals']
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
base=t0, quote=t1, fee=fee, decimals=decimals) base=t0, quote=t1, fee=fee, decimals=decimals)
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} .{decimals} {address}')
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
log.debug(f'new Unknown pool at {address}') log.debug(f'new Unknown pool at {address}')
except ContractLogicError: except ContractLogicError:

View File

@@ -9,7 +9,7 @@ from web3.types import EventData
# noinspection PyPackageRequirements # noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, NARG
from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.base.fork import current_fork, Fork from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockchain.connection import create_w3_ws, create_w3
@@ -87,13 +87,20 @@ class BlockStateRunner:
self.running = False self.running = False
def add_event_trigger(self, callback: Callable[[EventData], Maywaitable[None]], event: ContractEvents = None, log_filter: Union[dict, str] = None): def add_event_trigger(self,
# callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
callback: Union[Callable[[EventData], Maywaitable[None]],
Callable[[list[EventData]], Maywaitable[None]]],
event: ContractEvents = None,
log_filter: Union[dict, str] = None,
*, multi=False):
""" """
if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
""" """
if log_filter is None and event is not None: if log_filter is None and event is not None:
log_filter = {'topics': [topic(event.abi)]} log_filter = {'topics': [topic(event.abi)]}
self.events.append((callback, event, log_filter)) cb = callback if multi else lambda events: map(cb, events)
self.events.append((cb, event, log_filter))
async def run(self): async def run(self):
# this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling
@@ -222,6 +229,10 @@ class BlockStateRunner:
self.max_height_seen = height self.max_height_seen = height
start_height += chain.batch_size start_height += chain.batch_size
batch_height += chain.batch_size batch_height += chain.batch_size
if self.queue.qsize() > 2:
await asyncio.sleep(1)
else:
await async_yield()
await self.queue.put(head) # add the head block await self.queue.put(head) # add the head block
self.max_height_seen = head.height self.max_height_seen = head.height
@@ -346,14 +357,16 @@ class BlockStateRunner:
log.warning(f'Decrease batch size for {chain}') log.warning(f'Decrease batch size for {chain}')
return return
raise raise
parsed_events = []
for log_event in log_events: for log_event in log_events:
try: try:
parsed = event.process_log(log_event) if event is not None else log_event parsed = event.process_log(log_event) if event is not None else log_event
except (LogTopicError, MismatchedABI) as e: except (LogTopicError, MismatchedABI) as e:
log.warning(f'logevent parse error {e}\n{log_event}') log.warning(f'logevent parse error {e}\n{log_event}')
else: parsed = NARG
# todo try/except for known retryable errors parsed_events.append(parsed)
await maywait(callback(parsed)) # todo try/except for known retryable errors
await maywait(callback(parsed_events))
# todo # todo
# IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either # IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either