runner refactor to support walker

Tim
2024-02-17 20:25:14 -04:00
parent ea56487002
commit 39fe81a49d
9 changed files with 304 additions and 114 deletions

View File

@@ -28,8 +28,8 @@ class Blockchain:
""" """
self.chain_id = chain_id self.chain_id = chain_id
self.name = name self.name = name
self.confirms = confirms # todo configure self.confirms = confirms
self.batch_size = batch_size # todo configure self.batch_size = batch_size
Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_id[chain_id] = self
Blockchain._instances_by_name[name] = self Blockchain._instances_by_name[name] = self
@@ -47,7 +47,7 @@ Goerli = Blockchain(5, 'Goerli')
Polygon = Blockchain(137, 'Polygon') # POS not zkEVM
Mumbai = Blockchain(80001, 'Mumbai')
BSC = Blockchain(56, 'BSC')
-Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) # todo configure batch size
+Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000)
Mock = Blockchain(31337, 'Mock', 3, batch_size=10000)
Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000)
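Note: the confirms and batch_size defaults set on each Blockchain instance above are only fallbacks; the new Config fields introduced later in this commit can override them. A minimal sketch of the lookup, using only names from this commit (the Arbitrum values are the ones declared above):

    # hypothetical illustration, not part of the commit
    chain = Blockchain.for_id(42161)   # registry populated in Blockchain.__init__
    chain.confirms                     # -> 3, finalization depth default for Arbitrum
    chain.batch_size                   # -> 2000, max blocks per backfill log query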

View File

@@ -0,0 +1,46 @@
import asyncio
import logging
from datetime import datetime

from web3.types import EventData

from dexorder import dec
from dexorder.bin.executable import execute
from dexorder.blocktime import get_block_timestamp
from dexorder.contract import get_contract_event
from dexorder.database.model.pool import PoolDict
from dexorder.pools import get_uniswap_data
from dexorder.walker import BlockWalker

log = logging.getLogger(__name__)


async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
    # asynchronously prefetch the block timestamps we'll need
    hashes = set(swap['blockHash'] for swap in swaps)
    await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
    # now execute the swaps synchronously
    for swap in swaps:
        pool, time, price = await get_uniswap_data(swap)
        await handle_uniswap_swap(pool, time, price)


async def handle_uniswap_swap(pool: PoolDict, time: datetime, price: dec):
    pass


def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
    # start = now()
    log.info("finalizing OHLC's")
    ohlc_save(block, diffs)
    # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
    log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')


async def main():
    walker = BlockWalker()
    walker.add_event_trigger(handle_backfill_uniswap_swaps,
                             get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
    await walker.run()


if __name__ == '__main__':
    execute(main())

src/dexorder/blocktime.py (Normal file, 20 additions)
View File

@@ -0,0 +1,20 @@
import logging

from async_lru import alru_cache

from dexorder import current_w3
from dexorder.blockstate import current_blockstate
from dexorder.util import hexint

log = logging.getLogger(__name__)


@alru_cache(maxsize=1024)
async def get_block_timestamp(blockhash) -> int:
    block = current_blockstate.get().by_hash.get(blockhash)
    if block is not None:
        return block.timestamp
    response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
    raw = hexint(response['result']['timestamp'])
    # noinspection PyTypeChecker
    return raw if type(raw) is int else hexint(raw)
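A short usage sketch (not part of the commit) showing how handlers consume the cached lookup; it assumes a connected provider in current_w3 and uses from_timestamp as elsewhere in this codebase:

    # hypothetical illustration only
    from dexorder import from_timestamp
    from dexorder.blocktime import get_block_timestamp

    async def block_time_str(block_hash) -> str:
        ts = await get_block_timestamp(block_hash)   # alru_cache returns the cached int on repeat lookups
        return f'{from_timestamp(ts):%Y-%m-%d %H:%M:%S}'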

View File

@@ -9,13 +9,15 @@ from typing import Optional
@dataclass
class Config:
+    confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used
+    batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request
    rpc_url: str = 'http://localhost:8545'
    ws_url: Optional[str] = 'ws://localhost:8545'
    rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
    db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
+    ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
    dump_sql: bool = False
    redis_url: Optional[str] = 'redis://localhost:6379'
-    ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
    parallel_logevent_queries: bool = True
    polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
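The runner and walker resolve the two new options the same way: a value set in Config wins, otherwise the chain's default applies. The exact expressions, taken from the runner and walker changes later in this commit:

    batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
    confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1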

View File

@@ -3,23 +3,21 @@ import functools
import logging
from uuid import UUID

-from async_lru import alru_cache
from web3.types import EventData

-from dexorder import current_pub, db, dec, from_timestamp, minutely, current_w3
+from dexorder import current_pub, db, from_timestamp, minutely
from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
-from dexorder.blockstate import current_blockstate
+from dexorder.blocktime import get_block_timestamp
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request
-from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool
+from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.contract import ERC20
-from dexorder.util import hexint
from dexorder.vault_blockdata import vault_owners, vault_balances
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
-from dexorder.base.orderlib import SwapOrderState, Exchange
+from dexorder.base.orderlib import SwapOrderState
from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, price_triggers, time_triggers, \
    unconstrained_price_triggers, execution_requests, inflight_execution_requests, TrancheStatus, active_tranches, new_price_triggers, activate_order
@@ -165,33 +163,14 @@ async def handle_uniswap_swaps(swaps: list[EventData]):
async def handle_uniswap_swap(swap: EventData):
-    try:
-        sqrt_price = swap['args']['sqrtPriceX96']
-    except KeyError:
+    data = await get_uniswap_data(swap)
+    if data is None:
        return
-    addr = swap['address']
-    pool = await get_pool(addr)
-    if pool['exchange'] != Exchange.UniswapV3.value:
-        log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
-        return
-    price: dec = await uniswap_price(pool, sqrt_price)
-    timestamp = await get_block_timestamp(swap['blockHash'])
-    dt = from_timestamp(timestamp)
+    pool, time, price = data
+    addr = pool['address']
    pool_prices[addr] = price
-    ohlcs.update_all(addr, dt, price)
-    log.debug(f'pool {addr} {minutely(dt)} {price}')
+    ohlcs.update_all(addr, time, price)
+    log.debug(f'pool {addr} {minutely(time)} {price}')


-# todo is there a better spot for this function?
-@alru_cache(maxsize=1024)
-async def get_block_timestamp(blockhash) -> int:
-    block = current_blockstate.get().by_hash.get(blockhash)
-    if block is not None:
-        return block.timestamp
-    response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
-    raw = hexint(response['result']['timestamp'])
-    # noinspection PyTypeChecker
-    return raw if type(raw) is int else hexint(raw)


def handle_vault_created(created: EventData):

View File

@@ -2,16 +2,18 @@ import asyncio
import logging

from web3.exceptions import ContractLogicError
+from web3.types import EventData

-from dexorder import dec, ADDRESS_0
+from dexorder import dec, ADDRESS_0, from_timestamp
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.base.orderlib import Exchange
from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V
+from dexorder.blocktime import get_block_timestamp
from dexorder.database.model.pool import PoolDict
from dexorder.tokens import get_token
-from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
+from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address, log

log = logging.getLogger(__name__)
@@ -86,3 +88,19 @@ async def ensure_pool_price(pool: PoolDict):
    else:
        raise ValueError(f'Unsupported exchange type {pool["exchange"]}')
    # todo other exchanges
+
+
+async def get_uniswap_data(swap: EventData):
+    try:
+        sqrt_price = swap['args']['sqrtPriceX96']
+    except KeyError:
+        return None
+    addr = swap['address']
+    pool = await get_pool(addr)
+    if pool['exchange'] != Exchange.UniswapV3.value:
+        log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
+        return None
+    price: dec = await uniswap_price(pool, sqrt_price)
+    timestamp = await get_block_timestamp(swap['blockHash'])
+    dt = from_timestamp(timestamp)
+    return pool, dt, price

View File

@@ -0,0 +1,98 @@
import logging
from abc import ABCMeta, abstractmethod
from typing import Callable, Union

from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
from web3.types import EventData

from dexorder import config, current_w3, NARG
from dexorder.base.chain import current_chain
from dexorder.util import topic
from dexorder.util.async_util import Maywaitable, maywait
from dexorder.util.shutdown import fatal

log = logging.getLogger(__name__)


class BlockProgressor(metaclass=ABCMeta):
    """
    Base class for BlockStateRunner and BlockWalker holds the common standard callback machinery and also the
    height of the latest processed block
    """

    def __init__(self):
        # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
        self.events: list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = []
        # these callbacks are invoked after every block and also every second if there wasnt a block
        self.postprocess_cbs: list[Callable[[],Maywaitable[None]]] = []

    def add_event_trigger(self,
                          # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
                          callback: Union[Callable[[EventData], Maywaitable[None]],
                                          Callable[[list[EventData]], Maywaitable[None]]],
                          event: ContractEvents = None,
                          log_filter: Union[dict, str] = None,
                          *, multi=False):
        """
        if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
        """
        if log_filter is None and event is not None:
            log_filter = {'topics': [topic(event.abi)]}
        cb = callback if multi else lambda events: map(cb, events)
        self.events.append((cb, event, log_filter))

    @abstractmethod
    def run(self):
        pass

    async def get_backfill_batches(self, from_height, to_height, w3=None):
        if w3 is None:
            w3 = current_w3.get()
        batches = []
        for callback, event, log_filter in self.events:
            if log_filter is None:
                batches.append((None, callback, event, None))
            else:
                lf = dict(log_filter)
                lf['fromBlock'] = from_height
                lf['toBlock'] = to_height
                log.debug(f'querying backfill {from_height} through {to_height}')
                get_logs = w3.eth.get_logs(lf)
                if not config.parallel_logevent_queries:
                    get_logs = await get_logs
                batches.append((get_logs, callback, event, lf))
        for callback in self.postprocess_cbs:
            batches.append((None, callback, None, None))
        return batches

    @staticmethod
    async def invoke_callbacks(batches, chain=None):
        if chain is None:
            chain = current_chain.get()
        # logevent callbacks
        while batches:
            # we remove entries as we process them, so the exception handler doesn't re-await the callbacks
            batch = batches.pop(0)
            future, callback, event, filter_args = batch
            if future is None:
                await maywait(callback()) # non-log callback
            else:
                try:
                    log_events = await future if config.parallel_logevent_queries else future
                except ValueError as e:
                    if e.args[0].get('code') == -32602:
                        # too many logs were returned in the batch, so decrease the batch size.
                        fatal(f'Decrease batch size for {chain}')
                    raise
                parsed_events = []
                for log_event in log_events:
                    try:
                        parsed = event.process_log(log_event) if event is not None else log_event
                    except (LogTopicError, MismatchedABI) as e:
                        log.warning(f'logevent parse error {e}\n{log_event}')
                        parsed = NARG # need a placeholder
                    parsed_events.append(parsed)
                # todo try/except for known retryable errors
                await maywait(callback(parsed_events))
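A minimal usage sketch of the shared machinery (hypothetical, but it mirrors the backfill script at the top of this commit and BlockWalker.handle below): register a batched event trigger on a concrete subclass, build the log-query batches for a height range, then let invoke_callbacks parse the logs and dispatch them in registration order.

    # hypothetical illustration only; heights are made up
    walker = BlockWalker()
    walker.add_event_trigger(handle_backfill_uniswap_swaps,
                             get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
    batches = await walker.get_backfill_batches(from_height=1_000_000, to_height=1_002_000)
    await walker.invoke_callbacks(batches)   # multi=True callbacks receive the whole list per range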

View File

@@ -1,11 +1,9 @@
import asyncio
import logging
from asyncio import Queue
-from typing import Union, Any, Iterable, Callable
+from typing import Any, Iterable, Callable

-from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
-from web3.types import EventData
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
@@ -17,15 +15,20 @@ from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
-from dexorder.util import hexstr, topic
+from dexorder.progressor import BlockProgressor
+from dexorder.util import hexstr
from dexorder.util.async_util import maywait, Maywaitable
from dexorder.util.shutdown import fatal

log = logging.getLogger(__name__)


class Retry (Exception): ...


# todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
-class BlockStateRunner:
+class BlockStateRunner(BlockProgressor):
    """
@@ -59,14 +62,9 @@ class BlockStateRunner:
""" """
If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling. If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling.
""" """
super().__init__()
self.state = state self.state = state
# items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = []
# these callbacks are invoked after every block and also every second if there wasnt a block
self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = []
# onStateInit callbacks are invoked after the initial state is loaded or created # onStateInit callbacks are invoked after the initial state is loaded or created
self.on_state_init: list[Callable[[],Maywaitable[None]]] = [] self.on_state_init: list[Callable[[],Maywaitable[None]]] = []
self.state_initialized = False self.state_initialized = False
@@ -86,22 +84,6 @@ class BlockStateRunner:
        self.running = False

-    def add_event_trigger(self,
-                          # callback takes either a single event if multi=False, or if multi=True then a list of all events in the processing range
-                          callback: Union[Callable[[EventData], Maywaitable[None]],
-                                          Callable[[list[EventData]], Maywaitable[None]]],
-                          event: ContractEvents = None,
-                          log_filter: Union[dict, str] = None,
-                          *, multi=False):
-        """
-        if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
-        """
-        if log_filter is None and event is not None:
-            log_filter = {'topics': [topic(event.abi)]}
-        cb = callback if multi else lambda events: map(cb, events)
-        self.events.append((cb, event, log_filter))

    async def run(self):
        # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling
        if self.state:
@@ -214,7 +196,8 @@ class BlockStateRunner:
        if self.state or config.backfill:
            # backfill batches
            start_height = self.max_height_seen
-            batch_height = start_height + chain.batch_size - 1
+            batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
+            batch_height = start_height + batch_size - 1
            while batch_height < head.height:
                # the backfill is larger than a single batch, so we push intermediate head blocks onto the queue
                response = await w3.provider.make_request('eth_getBlockByNumber', [hex(batch_height), False])
@@ -291,25 +274,11 @@ class BlockStateRunner:
            if fork is None:
                log.debug(f'discarded late-arriving head {block}')
            else:
-                batches = []
+                batches: list
+                from_height = fork.parent.height
+                to_height = fork.height
                if fork.disjoint:
-                    # backfill batches
-                    for callback, event, log_filter in self.events:
-                        if log_filter is None:
-                            batches.append((None, callback, event, None))
-                        else:
-                            from_height = fork.parent.height
-                            to_height = fork.height
-                            lf = dict(log_filter)
-                            lf['fromBlock'] = from_height
-                            lf['toBlock'] = to_height
-                            log.debug(f'querying backfill {from_height} through {to_height}')
-                            get_logs = w3.eth.get_logs(lf)
-                            if not config.parallel_logevent_queries:
-                                get_logs = await get_logs
-                            batches.append((get_logs, callback, event, lf))
-                    for callback in self.postprocess_cbs:
-                        batches.append((None, callback, None, None))
+                    batches = await self.get_backfill_batches(from_height, to_height, w3)
                else:
                    # event callbacks are triggered in the order in which they're registered. the events passed to
                    # each callback are in block transaction order
@@ -337,36 +306,7 @@ class BlockStateRunner:
            current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created
            if not self.state_initialized:
                await self.do_state_init_cbs()
-            # logevent callbacks
-            while True:
-                try:
-                    # we remove entries as we process them, so the exception handler doesn't re-await the callbacks
-                    batch = batches.pop(0)
-                except IndexError:
-                    break
-                future, callback, event, filter_args = batch
-                if future is None:
-                    await maywait(callback()) # non-log callback
-                else:
-                    try:
-                        log_events = await future if config.parallel_logevent_queries else future
-                    except ValueError as e:
-                        if e.args[0].get('code') == -32602:
-                            # too many logs were returned in the batch, so decrease the batch size.
-                            # fatal(f'Decrease batch size for {chain}')
-                            log.warning(f'Decrease batch size for {chain}')
-                            return
-                        raise
-                    parsed_events = []
-                    for log_event in log_events:
-                        try:
-                            parsed = event.process_log(log_event) if event is not None else log_event
-                        except (LogTopicError, MismatchedABI) as e:
-                            log.warning(f'logevent parse error {e}\n{log_event}')
-                            parsed = NARG
-                        parsed_events.append(parsed)
-                    # todo try/except for known retryable errors
-                    await maywait(callback(parsed_events))
+            await self.invoke_callbacks(batches)

            # todo
            # IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either
@@ -378,7 +318,8 @@ class BlockStateRunner:
                await maywait(callback(block, diff_items))

            # check for root promotion
-            promotion_height = latest_block.get().height - chain.confirms
+            confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
+            promotion_height = latest_block.get().height - confirm_offset
            new_root_fork = None
            if fork.disjoint:
                # individually check the fork's head and ancestor
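Worked example of the changed promotion rule (illustrative numbers, assuming confirms == 3 and no Config override): confirm_offset = 3 - 1 = 2, so with the latest block at height 100 the promotion height is 98, which reads as counting the head block itself as one of the three confirmations; the old formula (latest - confirms) would have promoted only up to height 97.

    # hypothetical arithmetic only
    confirms = 3
    confirm_offset = confirms - 1             # 2
    promotion_height = 100 - confirm_offset   # 98: heights 98, 99, 100 are the 3 confirming blocks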

src/dexorder/walker.py (Normal file, 86 additions)
View File

@@ -0,0 +1,86 @@
import asyncio
import logging
from asyncio import Queue

from websockets import ConnectionClosedError

from dexorder import Blockchain, config, db
from dexorder.base.chain import current_chain
from dexorder.blockchain.connection import create_w3
from dexorder.progressor import BlockProgressor

log = logging.getLogger(__name__)


class BlockWalker (BlockProgressor):
    """
    Walker is similar to the Runner in that it progresses through the blockchain, but unlike the Runner, the Walker only
    processes blocks after they have reached finalization age. It does not create or maintain any BlockState.
    """

    def __init__(self):
        super().__init__()
        self.queue: Queue = Queue()
        self.running = False

    async def run(self):
        self.running = True
        db.connect()
        w3 = create_w3()
        chain_id = await w3.eth.chain_id
        chain = Blockchain.for_id(chain_id)
        current_chain.set(chain)
        confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
        batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
        kv_key = f'walker_height|{chain_id}'
        processed_height = db.kv.get(kv_key, config.backfill)
        prev_height = None
        while self.running:
            try:
                block = await w3.eth.get_block('latest')
                latest_height = block['number']
                if latest_height > prev_height:
                    prev_height = latest_height
                    log.debug(f'polled new block {latest_height}')
                    promotion_height = latest_height - confirm_offset
                    while processed_height <= promotion_height:
                        stop = min(promotion_height, processed_height+batch_size-1)
                        await self.handle(processed_height, stop, chain)
                        processed_height = db.kv[kv_key] = stop
                        if not self.running:
                            break
                await asyncio.sleep(config.polling)
            except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
                log.debug(f'walker timeout {e}')
            finally:
                # noinspection PyBroadException
                try:
                    # noinspection PyUnresolvedReferences
                    await w3.provider.disconnect()
                except Exception:
                    pass

    async def handle(self, from_height, to_height, chain=None):
        if chain is None:
            chain = current_chain.get()
        session = None
        try:
            batches = await self.get_backfill_batches(from_height, to_height)
            session = db.session
            session.begin()
            await self.invoke_callbacks(batches, chain)
        except: # legitimately catch EVERYTHING because we re-raise
            log.debug('rolling back session')
            if session is not None:
                session.rollback()
            raise
        else:
            if session is not None:
                session.commit()
            log.info(f'completed through block {to_height}')
        finally:
            if session is not None:
                session.close()
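Design note on handle(): each finalized height range is wrapped in a single database transaction, so a failure in any callback rolls the whole range back and, because run() only records the height after handle() returns, the walker resumes from the last committed walker_height key. A simplified sketch of the pattern (assuming db.session exposes the usual begin/commit/rollback/close interface used above):

    # hypothetical simplification of the handle() flow above
    session = db.session
    session.begin()
    try:
        ...                        # run log-event callbacks for the range
    except:
        session.rollback()         # nothing from this range is persisted
        raise
    else:
        session.commit()           # range is durable; run() then records stop under walker_height
    finally:
        session.close()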