Merge branch 'master' into fees

7400
2024-01-30 21:02:57 -08:00
22 changed files with 213 additions and 113 deletions

View File

@@ -11,7 +11,7 @@ def now():
return datetime.now(timezone.utc)
def timestamp():
- return datetime.now().timestamp()
+ return int(datetime.now().timestamp())
def from_timestamp(ts):
return datetime.fromtimestamp(ts, timezone.utc)
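A quick illustration of what the timestamp() change above does (interpreter sketch, hypothetical epoch values):

t = datetime.now().timestamp()  # e.g. 1706677377.934512 — float with sub-second precision
int(t)                          # 1706677377 — int() truncates to whole seconds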

View File

@@ -51,9 +51,9 @@ Goerli = Blockchain(5, 'Goerli')
Polygon = Blockchain(137, 'Polygon') # POS not zkEVM
Mumbai = Blockchain(80001, 'Mumbai')
BSC = Blockchain(56, 'BSC')
- Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=1000) # todo configure batch size... does it depend on log count? :(
+ Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) # todo configure batch size
Mock = Blockchain(31337, 'Mock', 3, batch_size=10000)
- Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=100)
+ Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000)
current_chain = ContextVar[Blockchain]('current_chain')
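batch_size here appears to bound the block range per log query; the runner change further down aborts with 'Decrease batch size' when a provider rejects an oversized getLogs result. A minimal sketch of that kind of windowed query, assuming a web3.py instance w3 and hypothetical start/end heights:

def fetch_logs_batched(w3, start, end, batch_size, filter_args=None):
    # query in fixed-size block windows so no single eth_getLogs response exceeds provider limits
    logs = []
    for lo in range(start, end + 1, batch_size):
        params = dict(filter_args or {})
        params['fromBlock'] = lo
        params['toBlock'] = min(lo + batch_size - 1, end)
        logs.extend(w3.eth.get_logs(params))
    return logs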

View File

@@ -41,7 +41,8 @@ class Fork:
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """
- assert( height <= self.height )
+ if height > self.height :
+ raise ValueError
if height <= self.height - len(self.ancestry):
return None
return Fork(self.ancestry[self.height-height:], height=height)
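A worked example of the slicing above, assuming ancestry[0] is the hash at self.height and each entry steps back one block (hypothetical hashes):

# fork: height=100, ancestry=[h100, h99, h98, h97]
fork.for_height(98)   # -> Fork([h98, h97], height=98), i.e. ancestry[100-98:]
fork.for_height(96)   # -> None: 96 <= 100 - len(ancestry), older than our history
fork.for_height(101)  # -> raises ValueError (previously a bare assert)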

View File

@@ -1,28 +1,45 @@
import logging
import sys
from asyncio import CancelledError
from typing import Union, Reversible
- from dexorder import blockchain, config
- from dexorder.base.ohlc import recent_ohlcs
+ from dexorder import blockchain, config, from_timestamp, now
from dexorder.bin.executable import execute
from dexorder.blockstate import DiffItem
from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event
from dexorder.database import db
- from dexorder.event_handler import handle_uniswap_swap
+ from dexorder.database.model import Block
+ from dexorder.event_handler import handle_uniswap_swap, check_ohlc_rollover
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache
+ from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs
from dexorder.runner import BlockStateRunner
from dexorder.util import hexstr
log = logging.getLogger('dexorder')
+ def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+ start = now()
+ log.info("finalizing OHLC's...")
+ ohlc_save(block, diffs)
+ log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds')
+ log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}')
# noinspection DuplicatedCode
async def main():
# noinspection DuplicatedCode
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
parse_args()
+ if not config.ohlc_dir:
+ config.ohlc_dir = './ohlc'
+ ohlcs.dir = config.ohlc_dir
await blockchain.connect()
redis_state = None
state = None
@@ -31,7 +48,6 @@ async def main():
redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else.
if db:
db.connect(url=config.datadb_url) # our main database is the data db
- # noinspection DuplicatedCode
db_state = DbState(BlockData.by_opt('db'))
with db.session:
state = db_state.load()
@@ -43,14 +59,13 @@ async def main():
log.info(f'loaded state from db for root block {state.root_block}')
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None, timer_period=0)
# noinspection PyTypeChecker
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
+ runner.postprocess_cbs.append(check_ohlc_rollover)
+ runner.on_promotion.append(finalize_callback)
if db:
- # noinspection PyUnboundLocalVariable,PyTypeChecker
+ # noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save)
if redis_state:
# noinspection PyTypeChecker
runner.on_head_update.append(redis_state.save)
try:

View File

@@ -2,7 +2,7 @@ import logging
import sys
from asyncio import CancelledError
- from dexorder import db, blockchain
+ from dexorder import db, blockchain, config
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute
from dexorder.blockstate.blockdata import BlockData
@@ -15,11 +15,12 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v
process_active_tranches, process_execution_requests, check_ohlc_rollover
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache
+ from dexorder.ohlc import ohlc_save
from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions
log = logging.getLogger('dexorder')
- LOG_ALL_EVENTS = True # for debug todo config
+ LOG_ALL_EVENTS = False # for debug todo config
#
@@ -70,6 +71,7 @@ def setup_logevent_triggers(runner):
runner.postprocess_cbs.append(send_transactions)
+ # noinspection DuplicatedCode
async def main():
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG)
@@ -94,13 +96,13 @@ async def main():
runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
setup_logevent_triggers(runner)
+ if config.ohlc_dir:
+ runner.on_promotion.append(ohlc_save)
if db:
# noinspection PyTypeChecker
runner.on_state_init.append(init_order_triggers)
# noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save)
if redis_state:
# noinspection PyTypeChecker
runner.on_head_update.append(redis_state.save)
try:

View File

@@ -104,7 +104,7 @@ class DbState(SeriesCollection):
for row in db.session.query(SeriesDict).where(SeriesDict.chain == chain_id, SeriesDict.series == data.series2str(series)):
key = data.str2key(row.key)
value = data.str2value(row.value)
- log.debug(f'load {series} {key} {value}')
+ # log.debug(f'load {series} {key} {value}')
var[key] = value
completed_block.set(root_block)
log.debug(f'loaded db state from block {root_block}')

View File

@@ -6,6 +6,7 @@ from dexorder import DELETE
@dataclass
class DiffEntry:
""" DiffEntry is the "value" part of a key-value pair, but DiffEntry also has metadata about the block in which the value was set """
value: Union[Any, DELETE]
height: int
hash: bytes
@@ -13,6 +14,7 @@ class DiffEntry:
@dataclass
class DiffItem:
""" DiffItem is a simple series-key-value triple """
series: Any
key: Any
value: Any
@@ -22,6 +24,7 @@ class DiffItem:
@dataclass
class DiffEntryItem:
""" DiffEntryItem is a DiffItem that has a DiffEntry as its extended value, instead of storing just the primary value directly """
series: Any
key: Any
entry: DiffEntry
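A small sketch of how the three shapes relate (hypothetical series and key; DELETE is the tombstone sentinel imported above):

entry = DiffEntry(value=42, height=1000, hash=b'\xab' * 32)         # value plus block metadata
item = DiffItem(series='ohlc', key=('0xPool', '5m'), value=42)      # plain series-key-value triple
eitem = DiffEntryItem(series='ohlc', key=('0xPool', '5m'), entry=entry)
assert eitem.entry.value == item.value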

View File

@@ -97,7 +97,6 @@ class BlockState:
return Fork([block.hash], height=block.height)
if block.height - self.ancestors[block.hash].height > 1:
- # noinspection PyTypeChecker
return DisjointFork(block, self.root_block)
def ancestors():

View File

@@ -80,7 +80,6 @@ def from_toml(filename):
def parse_args(args=None):
""" should be called from binaries to parse args as command-line config settings """
- # noinspection PyTypeChecker
try:
config.merge_with(OmegaConf.from_cli(args)) # updates config in-place. THANK YOU OmegaConf!
except OmegaConfBaseException as x:

View File

@@ -10,13 +10,13 @@ from typing import Optional
@dataclass
class Config:
rpc_url: str = 'http://localhost:8545'
- ws_url: str = 'ws://localhost:8545'
+ ws_url: Optional[str] = 'ws://localhost:8545'
rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
- db_url: str = 'postgresql://dexorder:redroxed@localhost/dexorder'
- datadb_url: str = 'postgresql://dexorder:redroxed@localhost/dexorderdata'
- ohlc_dir: str = './ohlc'
+ db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
+ datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata'
+ ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
dump_sql: bool = False
- redis_url: str = 'redis://localhost:6379'
+ redis_url: Optional[str] = 'redis://localhost:6379'
parallel_logevent_queries: bool = True
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
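Since parse_args() merges OmegaConf.from_cli() into this config (see the parse_args hunk above), any of these fields can be overridden as dotlist arguments. A hypothetical invocation (binary module name assumed):

# python -m dexorder.bin.backfill polling=2 ws_url=null ohlc_dir=./ohlc
#   polling=2   -> poll for new blocks every 2 seconds instead of subscribing
#   ws_url=null -> with no websocket URL, the runner now falls back to polling (see the runner hunk below)
#   ohlc_dir=./ohlc -> enable on-disk OHLC chunks (the default is now None)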

View File

@@ -69,24 +69,22 @@ class ContractProxy:
def events(self):
return self.contract.events
- def deploy(self, *args):
- """
- Calls the contract constructor transaction and waits to receive a transaction receipt.
- """
- tx: ContractTransaction = self.transact.constructor(*args)
- receipt = tx.wait()
- self.address = receipt.contractAddress
- self._contracts.clear()
- return receipt
+ # def deploy(self, *args):
+ # """
+ # Calls the contract constructor transaction and waits to receive a transaction receipt.
+ # """
+ # tx: ContractTransaction = self.transact.constructor(*args)
+ # receipt = tx.wait()
+ # self.address = receipt.contractAddress
+ # self._contracts.clear()
+ # return receipt
@property
def transact(self):
# noinspection PyTypeChecker
return ContractProxy(self.address, self._interface_name, _contracts=self._contracts, _wrapper=transact_wrapper, abi=self._abi)
@property
def build(self):
# noinspection PyTypeChecker
return ContractProxy(self.address, self._interface_name, _contracts=self._contracts, _wrapper=build_wrapper, abi=self._abi)
def __getattr__(self, item):

View File

@@ -75,7 +75,6 @@ class Db:
if engine is None:
raise RuntimeError('Cannot create session: no database engine set. Use dexorder.db.connect() first')
s = Session(engine, expire_on_commit=False)
- # noinspection PyTypeChecker
_session.set(s)
return s

View File

@@ -4,7 +4,7 @@ from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
- from dexorder.util import hexint
+ from dexorder.util import hexint, Field
class Block(Base):
@@ -25,5 +25,5 @@ class Block(Base):
current_block = ContextVar[Block]('Block.cur') # block for the current thread
- latest_block = ContextVar[Block]('Block.latest') # most recent discovered but may not be processed yet
+ latest_block = Field[Block]() # most recent discovered block but maybe not the currently processing one
completed_block = ContextVar[Block]('Block.completed') # most recent fully-processed block

View File

@@ -1,7 +1,6 @@
import asyncio
import functools
import logging
- from datetime import datetime
from uuid import UUID
from web3.types import EventData
@@ -210,7 +209,6 @@ async def activate_time_triggers():
# log.debug(f'activating time triggers at {now}')
# time triggers
for tt in tuple(time_triggers):
- # noinspection PyTypeChecker
await maywait(tt(now))
@@ -220,16 +218,13 @@ async def activate_price_triggers():
for pool, price in new_pool_prices.items():
pools_triggered.add(pool)
for pt in tuple(price_triggers[pool]):
- # noinspection PyTypeChecker
await maywait(pt(price))
for pool, triggers in new_price_triggers.items():
if pool not in pools_triggered:
price = pool_prices[pool]
for pt in triggers:
- # noinspection PyTypeChecker
await maywait(pt(price))
for t in tuple(unconstrained_price_triggers):
- # noinspection PyTypeChecker
await maywait(t(None))

View File

@@ -54,7 +54,7 @@ class RedisState (SeriesCollection):
sdels: dict[str,set[str]] = defaultdict(set)
hsets: dict[str,dict[str,str]] = defaultdict(dict)
hdels: dict[str,set[str]] = defaultdict(set)
- pubs: list[tuple[str,str,list[Any]]] = [] # series, key, value => room, event, value
+ pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value
for diff in compress_diffs(diffs):
try:
d = self.datas[diff.series]

View File

@@ -3,18 +3,21 @@ import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
- from typing import Optional, NamedTuple
+ from typing import Optional, NamedTuple, Reversible, Union
from cachetools import LFUCache
- from dexorder import dec, config, from_isotime, minutely
+ from dexorder import dec, config, from_isotime, minutely, from_timestamp
from dexorder.base.chain import current_chain
- from dexorder.blockstate import BlockDict
+ from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
+ from dexorder.blockstate.diff import DiffEntryItem
+ from dexorder.database.model import Block
log = logging.getLogger(__name__)
OHLC_PERIODS = [
- timedelta(minutes=1), timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
+ timedelta(minutes=1),
+ timedelta(minutes=3), timedelta(minutes=5), timedelta(minutes=10), timedelta(minutes=15), timedelta(minutes=30),
timedelta(hours=1), timedelta(hours=2), timedelta(hours=4), timedelta(hours=8), timedelta(hours=12),
timedelta(days=1), timedelta(days=2), timedelta(days=3), timedelta(days=7)
]
@@ -47,7 +50,7 @@ class NativeOHLC:
@property
def ohlc(self) -> OHLC:
return [
- self.start.isoformat(timespec='minutes'),
+ minutely(self.start),
None if self.open is None else str(self.open),
None if self.high is None else str(self.high),
None if self.low is None else str(self.low),
@@ -55,8 +58,7 @@ class NativeOHLC:
]
- def ohlc_name(period: timedelta) -> str:
+ def period_name(period: timedelta) -> str:
return f'{period // timedelta(minutes=1)}m' if period < timedelta(hours=1) \
else f'{period // timedelta(hours=1)}H' if period < timedelta(days=1) \
else f'{period // timedelta(days=7)}W' if period == timedelta(days=7) \
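For reference, the visible branches of the renamed period_name map as follows (the sub-week day case is cut off above):

period_name(timedelta(minutes=5))   # '5m'
period_name(timedelta(minutes=30))  # '30m'
period_name(timedelta(hours=12))    # '12H'
period_name(timedelta(days=7))      # '1W'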
@@ -82,7 +84,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
returns an ordered list of OHLC's that have been created/modified by the new time/price
if price is None, then bars are advanced based on the time but no new price is added to the series.
"""
- log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
+ # log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
cur = NativeOHLC.from_ohlc(prev)
assert time >= cur.start
result = []
@@ -93,7 +95,7 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
break
result.append(cur.ohlc)
cur = NativeOHLC(end, None, None, None, cur.close)
- log.debug(f'\tresult after finalization: {result}')
+ # log.debug(f'\ttime advancements: {result}')
# if we are setting a price, update the current bar
if price is not None:
if cur.open is None:
@@ -105,8 +107,8 @@ def update_ohlc(prev: OHLC, period: timedelta, time: datetime, price: Optional[d
cur.low = min(cur.low, price)
cur.close = price
result.append(cur.ohlc)
- log.debug(f'\tappended current bar: {cur.ohlc}')
- log.debug(f'\tupdate result: {result}')
+ # log.debug(f'\tappended current bar: {cur.ohlc}')
+ # log.debug(f'\tupdate result: {result}')
return result
class OHLCKey (NamedTuple):
@@ -136,36 +138,50 @@ class OHLCRepository:
for period in OHLC_PERIODS:
self.update(symbol, period, time, price, create=create)
- def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]:
+ @staticmethod
+ def update(symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) -> Optional[list[OHLC]]:
"""
if price is None, then bars are advanced based on the time but no new price is added to the series.
"""
- logname = f'{symbol} {ohlc_name(period)}'
- log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
+ logname = f'{symbol} {period_name(period)}'
+ # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
key = (symbol, period)
- bars: Optional[list[OHLC]] = recent_ohlcs.get(key)
- if not bars:
+ # bars is a list of "recent" OHLC's stored as blockdata. we try to keep the recent array long enough to extend before the root block time
+ historical: Optional[list[OHLC]] = recent_ohlcs.get(key)
+ # log.debug(f'got recent {historical}')
+ if not historical:
if create is False or price is None:
return # do not track symbols which have not been explicitly set up
p = str(price)
+ historical = []
updated = [OHLC((minutely(ohlc_start_time(time, period)), p, p, p, p))]
- log.debug(f'\tcreated new bars {updated}')
+ # log.debug(f'\tcreated new bars {updated}')
else:
- updated = update_ohlc(bars[-1], period, time, price)
- if len(updated) == 1:
- updated = [bars[-1], updated[0]] # return the previous finalized bar along with the updated current bar
- log.debug(f'\tnew recents: {updated}')
+ updated = update_ohlc(historical[-1], period, time, price)
+ # drop any historical bars that are older than we need
+ oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period # cover the root block time plus one period prior
+ trim = (oldest_needed - from_isotime(historical[0][0])) // period
+ if trim > 0:
+ historical = historical[trim:]
+ # now overlap the updated data on top of the historical data
+ if not historical or not updated:
+ updated = historical + updated
+ else:
+ last_bar = from_isotime(historical[-1][0])
+ first_updated = from_isotime(updated[0][0])
+ overlap = (first_updated - last_bar) // period + 1
+ updated = historical[:-overlap] + updated if overlap > 0 else historical + updated
+ # log.debug(f'\tnew recents: {updated}')
recent_ohlcs.setitem(key, updated)
- if len(updated) > 1:
- log.debug(f'\tsaving finalized bars: {updated[:-1]}')
- self.save_all(symbol, period, updated[:-1]) # save any finalized bars to storage
return updated
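The trim/overlap arithmetic above is easiest to follow with numbers. A hypothetical 1-minute series where the root block time is 12:05:

# historical bars start at [12:00, 12:01, 12:02, 12:03, 12:04]
# oldest_needed = 12:05 - 1m = 12:04; trim = (12:04 - 12:00) // 1m = 4 -> keep only the 12:04 bar
# update_ohlc returned bars starting at [12:04, 12:05]
# overlap = (12:04 - 12:04) // 1m + 1 = 1 -> historical[:-1] drops the stale 12:04 copy
# result: [12:04 (fresh), 12:05], the finalized bar plus the current open bar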
def save_all(self, symbol: str, period: timedelta, ohlc_list: list[OHLC]) -> None:
for ohlc in ohlc_list:
- self.save(symbol, period, ohlc)
+ self.save(symbol, period, ohlc) # we need to act synchronously so we don't have conflicting access to chunks
def save(self, symbol: str, period: timedelta, ohlc: OHLC) -> None:
+ # log.debug(f'saving ohlc {symbol} {period_name(period)} {ohlc}')
time = dt(ohlc[0])
chunk = self.get_chunk(symbol, period, time)
if not chunk:
@@ -173,7 +189,10 @@ class OHLCRepository:
else:
start = from_isotime(chunk[0][0])
index = (time - start) // period
- assert index <= len(chunk)
+ # log.debug(f'save {symbol} {ohlc_name(period)} chunk {start} index {index} <= {len(chunk)}')
+ if index > len(chunk):
+ log.error(f'chunk gap: {index} > {len(chunk)} {symbol} {period_name(period)} {ohlc}'+''.join(f'\n\t{c}' for c in chunk))
+ exit(1)
if index == len(chunk):
assert from_isotime(chunk[-1][0]) + period == time
chunk.append(ohlc)
@@ -204,20 +223,23 @@ class OHLCRepository:
if not chunk:
return
path = self.chunk_path(symbol, period, from_isotime(chunk[0][0]))
- try:
- with open(path, 'w') as file:
- json.dump(chunk, file)
- return
- except FileNotFoundError:
- os.makedirs(os.path.dirname(path), exist_ok=True)
- with open(path, 'w') as file:
- json.dump(chunk, file)
+ for _ in range(2):
+ try:
+ with open(path, 'w') as file:
+ json.dump(chunk, file)
+ self.cache[path] = chunk
+ return
+ except FileNotFoundError:
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ raise IOError(f'Could not write chunk {path}')
- def chunk_path(self, symbol: str, period: timedelta, time: datetime) -> str:
+ def chunk_path(self, symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
+ if chain_id is None:
+ chain_id = current_chain.get().chain_id
start = ohlc_start_time(time, period)
- name = ohlc_name(period)
- return f'{self.dir}/{symbol}/{name}/' + (
+ name = period_name(period)
+ return f'{self.dir}/{chain_id}/{symbol}/{name}/' + (
f'{start.year}/{symbol}-{name}-{start:%Y%m%d}.json' if period < timedelta(hours=1) else # <1H data has a file per day
f'{start.year}/{symbol}-{name}-{start:%Y%m}.json' if period < timedelta(days=7) else # <1W data has a file per month
f'{symbol}-{name}.json' # long periods are a single file for all of history
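With the chain id segment added, chunk files land in a layout like this (hypothetical dir and pool symbol):

# dir='./ohlc', chain_id=42161, symbol='0xPool', time=2024-01-30:
#   5m -> ./ohlc/42161/0xPool/5m/2024/0xPool-5m-20240130.json   (sub-hour: one file per day)
#   4H -> ./ohlc/42161/0xPool/4H/2024/0xPool-4H-202401.json     (sub-week: one file per month)
#   1W -> ./ohlc/42161/0xPool/1W/0xPool-1W.json                 (all of history in one file)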
@@ -228,18 +250,28 @@ def pub_ohlc(_series:str, key: OHLCKey, bars: list[OHLC]):
pool_addr, period = key
chain_id = current_chain.get().chain_id
return (
- f'{chain_id}|{pool_addr}|{ohlc_name(period)}', # channel name is like 0x...|1m
+ f'{chain_id}|{pool_addr}|{period_name(period)}', # channel name is like 0x...|1m
'ohlcs',
(chain_id, pool_addr, bars)
)
def ohlc_key_to_str(k):
- return f'{k[0]}|{ohlc_name(k[1])}'
+ return f'{k[0]}|{period_name(k[1])}'
def ohlc_str_to_key(s):
pool, period_name = s.split('|')
return pool, period_from_name(period_name)
+ def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
+ """
+ used as a finalization callback from BlockState data.
+ """
+ for diff in diffs:
+ if diff.series == 'ohlc':
+ symbol, period = diff.key
+ ohlcs.save_all(symbol, period, diff.value)
+ # The most recent OHLC's are stored as block data. We store a list of at least the two latest bars, which provides clients with
+ # the latest finalized bar as well as the current open bar.
recent_ohlcs: BlockDict[OHLCKey, list[OHLC]] = BlockDict('ohlc', db=True, redis=True, pub=pub_ohlc,

View File

@@ -21,9 +21,10 @@ TimeTrigger = Callable[[int], None] # func(timestamp)
time_triggers:BlockSet[TimeTrigger] = BlockSet('tt')
PriceTrigger = Callable[[dec], Union[Awaitable[None],None]] # [async] func(pool_price)
+ UnconstrainedPriceTrigger = Callable[[Optional[dec]], Union[Awaitable[None],None]] # [async] func(pool_price)
price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address
new_price_triggers:dict[str, set[PriceTrigger]] = defaultdict(set) # when price triggers are first set, they must be tested against the current price even if it didn't change this block
- unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
+ unconstrained_price_triggers: BlockSet[UnconstrainedPriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
active_tranches: BlockDict[TrancheKey, Optional[PriceProof]] = BlockDict('at') # tranches which have passed all constraints and should be executed
execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # generated by the active tranches
# todo should this really be blockdata?

View File

@@ -41,6 +41,12 @@ class Pools:
except ContractLogicError:
log.debug(f'new Unknown pool at {address}')
found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0)
+ except ValueError as v:
+ if v.args[0].get('code') == -32000:
+ log.debug(f'new Unknown pool at {address}')
+ found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0)
+ else:
+ raise
db.session.add(found)
Pools.instances[key] = found
return None if found.exchange == Exchange.Unknown else found

View File

@@ -1,7 +1,7 @@
import asyncio
import logging
from asyncio import Queue
- from typing import Union, Any, Iterable
+ from typing import Union, Any, Iterable, Callable
from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
@@ -19,9 +19,12 @@ from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
from dexorder.util import hexstr, topic
from dexorder.util.async_util import maywait, Maywaitable
+ from dexorder.util.shutdown import fatal
log = logging.getLogger(__name__)
+ class Retry (Exception): ...
+ # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
class BlockStateRunner:
@@ -60,22 +63,22 @@ class BlockStateRunner:
self.state = state
# items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
- self.events:list[tuple[Maywaitable[[EventData],None],ContractEvents,dict]] = []
+ self.events:list[tuple[Callable[[EventData],Maywaitable[None]],ContractEvents,dict]] = []
# these callbacks are invoked after every block and also every second if there wasn't a block
- self.postprocess_cbs:list[Maywaitable[[],None]] = []
+ self.postprocess_cbs:list[Callable[[],Maywaitable[None]]] = []
# onStateInit callbacks are invoked after the initial state is loaded or created
- self.on_state_init: list[Maywaitable[[],None]] = []
+ self.on_state_init: list[Callable[[],Maywaitable[None]]] = []
self.state_initialized = False
# onHeadUpdate callbacks are invoked with a list of DiffItems used to update the head state from either the previous head or the root
- self.on_head_update: list[Maywaitable[[Block,list[DiffEntryItem]],None]] = []
+ self.on_head_update: list[Callable[[Block,list[DiffEntryItem]],Maywaitable[None]]] = []
# onPromotion callbacks are invoked with a list of DiffItems used to advance the root state
- self.on_promotion: list[Maywaitable[[Block,list[DiffEntryItem]],None]] = []
+ self.on_promotion: list[Callable[[Block,list[DiffEntryItem]],Maywaitable[None]]] = []
- self.publish_all: Maywaitable[[Iterable[tuple[str,str,Any]]],None] = publish_all
+ self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],Maywaitable[None]] = publish_all
self.timer_period = timer_period
@@ -85,7 +88,7 @@ class BlockStateRunner:
self.running = False
- def add_event_trigger(self, callback: Maywaitable[[EventData], None], event: ContractEvents = None, log_filter: Union[dict, str] = None):
+ def add_event_trigger(self, callback: Callable[[EventData], Maywaitable[None]], event: ContractEvents = None, log_filter: Union[dict, str] = None):
"""
if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
"""
@@ -98,7 +101,7 @@ class BlockStateRunner:
if self.state:
self.max_height_seen = max(self.max_height_seen, self.state.root_block.height)
self.running = True
- return await (self.run_polling() if config.polling > 0 else self.run_ws())
+ return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws())
async def run_ws(self):
w3ws = await create_w3_ws()
@@ -124,8 +127,8 @@ class BlockStateRunner:
if not self.running:
break
await async_yield()
- except (ConnectionClosedError, TimeoutError):
- pass
+ except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
+ log.debug(f'runner timeout {e}')
finally:
# noinspection PyBroadException
try:
@@ -168,8 +171,8 @@ class BlockStateRunner:
if not self.running:
break
await asyncio.sleep(config.polling)
- except ConnectionClosedError:
- pass
+ except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
+ log.debug(f'runner timeout {e}')
finally:
# noinspection PyBroadException
try:
@@ -200,6 +203,7 @@ class BlockStateRunner:
parent = bytes.fromhex(block_data['parentHash'][2:])
height = int(block_data['number'], 0)
head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data)
+ latest_block.set(head)
if self.state or config.backfill:
# backfill batches
@@ -246,6 +250,8 @@ class BlockStateRunner:
try:
await self.handle_head(chain, head, w3)
prev_head = head
+ except Retry:
+ pass
except Exception as x:
log.exception(x)
except Exception:
@@ -257,14 +263,13 @@ class BlockStateRunner:
async def handle_head(self, chain, block, w3):
print(f'logger {log} {log.name} level {log.level} {logging.DEBUG} {logging.FATAL}')
- log.debug(f'handle_head {block.height} {block.hash}')
+ log.debug(f'handle_head {block.height} {hexstr(block.hash)}')
session = None
batches = []
try:
if self.state is not None and block.hash in self.state.by_hash:
log.debug(f'block {block.hash} was already processed')
return
- latest_block.set(block)
if self.state is None:
# initialize
self.state = BlockState(block)
@@ -323,16 +328,28 @@ class BlockStateRunner:
if not self.state_initialized:
await self.do_state_init_cbs()
# logevent callbacks
- for future, callback, event, filter_args in batches:
+ while True:
+ try:
+ # we remove entries as we process them, so the exception handler doesn't re-await the callbacks
+ batch = batches.pop(0)
+ except IndexError:
+ break
+ future, callback, event, filter_args = batch
if future is None:
await maywait(callback()) # non-log callback
else:
- log_events = await future if config.parallel_logevent_queries else future
+ try:
+ log_events = await future if config.parallel_logevent_queries else future
+ except ValueError as e:
+ if e.args[0].get('code') == -32602:
+ # too many logs were returned in the batch, so decrease the batch size.
+ fatal(f'Decrease batch size for {chain}')
+ raise
for log_event in log_events:
try:
parsed = event.process_log(log_event) if event is not None else log_event
- except (LogTopicError, MismatchedABI) as x:
- log.warning(f'logevent parse error {x}\n{log_event}')
+ except (LogTopicError, MismatchedABI) as e:
+ log.warning(f'logevent parse error {e}\n{log_event}')
else:
# todo try/except for known retryable errors
await maywait(callback(parsed))
@@ -343,18 +360,38 @@ class BlockStateRunner:
# isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch.
diff_items = self.state.diffs_by_hash[block.hash]
for callback in self.on_head_update:
+ # noinspection PyCallingNonCallable
await maywait(callback(block, diff_items))
# check for root promotion
- promotion_height = fork.height - chain.confirms
- if not fork.disjoint and promotion_height > self.state.root_block.height and (
- new_root_fork := fork.for_height(promotion_height)):
+ promotion_height = latest_block.get().height - chain.confirms
+ new_root_fork = None
+ if fork.disjoint:
+ # individually check the fork's head and ancestor
+ if fork.height <= promotion_height:
+ new_root_fork = fork
+ else:
+ state = current_blockstate.get()
+ parent_block = state.by_hash[fork.parent]
+ if parent_block.height <= promotion_height:
+ new_root_fork = state.fork(parent_block)
+ else:
+ # non-disjoint, contiguous fork
+ if fork.height <= promotion_height:
+ new_root_fork = fork
+ else:
+ new_root_fork = fork.for_height(promotion_height)
+ if new_root_fork:
+ log.debug(f'promoting root {new_root_fork.height} {hexstr(new_root_fork.hash)}')
diff_items = self.state.promote_root(new_root_fork)
for callback in self.on_promotion:
+ # todo try/except for known retryable errors
+ # noinspection PyCallingNonCallable
await maywait(callback(self.state.root_block, diff_items))
# publish messages
if pubs and self.publish_all:
+ # noinspection PyCallingNonCallable
await maywait(self.publish_all(pubs))
except: # legitimately catch EVERYTHING because we re-raise
log.debug('rolling back session')
@@ -364,6 +401,7 @@ class BlockStateRunner:
self.state.delete_block(block.hash)
if config.parallel_logevent_queries:
for get_logs, *_ in batches:
+ # noinspection PyBroadException
try:
await get_logs
except Exception:
@@ -388,6 +426,7 @@ class BlockStateRunner:
session.begin()
try:
for callback in self.postprocess_cbs:
+ # noinspection PyCallingNonCallable
await maywait(callback())
except:
session.rollback()
@@ -402,5 +441,6 @@ class BlockStateRunner:
if self.state_initialized:
return
for cb in self.on_state_init:
+ # noinspection PyCallingNonCallable
await maywait(cb())
self.state_initialized = True
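A worked example of the new promotion rule in handle_head, with hypothetical numbers and confirms=3:

# latest_block height = 105 -> promotion_height = 105 - 3 = 102
# contiguous fork at height 101: fork.height <= 102, so the whole fork is promoted
# contiguous fork at height 104: promote its ancestor via fork.for_height(102)
# disjoint fork at height 104 with parent at height 101: the head is too new,
#   but the parent (101 <= 102) can be promoted via state.fork(parent_block)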

View File

@@ -1,5 +1,5 @@
import re
- from typing import Callable, TypeVar, Generic, Union
+ from typing import Callable, TypeVar, Generic, Union, Any
from eth_utils import keccak
from hexbytes import HexBytes
@@ -17,14 +17,13 @@ def align_decimal(value, left_columns) -> str:
return ' ' * pad + s
- def hexstr(value: bytes):
+ def hexstr(value: Union[HexBytes, bytes, str]):
""" returns an 0x-prefixed hex string """
if type(value) is HexBytes:
return value.hex()
elif type(value) is bytes:
return '0x' + value.hex()
elif type(value) is str:
- # noinspection PyTypeChecker
return value if value.startswith('0x') else '0x' + value
else:
raise ValueError
@@ -71,3 +70,15 @@ class defaultdictk (Generic[K,V], dict[K,V]):
except KeyError:
default = self[item] = self.default_factory(item)
return default
+ T = TypeVar('T')
+ class Field (Generic[T]):
+ def __init__(self, value: T = None):
+ self._value = value
+ def get(self) -> T:
+ return self._value
+ def set(self, value: T):
+ self._value = value
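Field mirrors the get/set surface of ContextVar but holds one process-global value, which is why latest_block (see the block model hunk above) now behaves like a plain shared variable. A minimal sketch:

latest = Field[int]()       # one value shared by all tasks and threads
latest.set(100)
assert latest.get() == 100  # unlike a ContextVar, there is no per-context copy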

View File

@@ -12,9 +12,7 @@ async def async_yield():
Args = TypeVar('Args')
Return = TypeVar('Return')
- class Maywaitable (Generic[Args, Return], Callable[[Args],Return], Awaitable[Return], ABC):
- pass
+ Maywaitable = Union[Return, Awaitable[Return]]
async def maywait(obj: Maywaitable):
if inspect.isawaitable(obj):
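With Maywaitable reduced to a plain Union alias, maywait still accepts both styles of callback. A small usage sketch (assuming maywait simply awaits awaitables and passes plain values through):

def sync_cb(): return 1
async def async_cb(): return 2

async def demo():
    await maywait(sync_cb())   # plain value: passes straight through
    await maywait(async_cb())  # coroutine: gets awaited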

View File

@@ -1,8 +1,9 @@
import logging
+ from typing import Never
log = logging.getLogger('dexorder')
- def fatal(message, exception=None):
+ def fatal(message, exception=None) -> Never:
if exception is None and isinstance(message, (BaseException,RuntimeError)):
exception = message
log.exception(message, exc_info=exception)