diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index aab1379..1cd076c 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -17,13 +17,15 @@ class Blockchain: def get(name_or_id): return Blockchain.for_name(name_or_id) if type(name_or_id) is str else Blockchain.for_id(name_or_id) - def __init__(self, chain_id, name, confirms=10): + def __init__(self, chain_id, name, confirms=10, batch_size=100): """ confirms is the number of blocks until a block can be considered finalized and unforkable + batch_size is the number of blocks to fetch per logs query """ self.chain_id = chain_id self.name = name self.confirms = confirms + self.batch_size = batch_size Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_name[name] = self diff --git a/src/dexorder/base/fork.py b/src/dexorder/base/fork.py index d92bfb6..fb430db 100644 --- a/src/dexorder/base/fork.py +++ b/src/dexorder/base/fork.py @@ -38,7 +38,9 @@ class Fork: def for_height(self, height): """ returns a new Fork object for an older block along this fork. used for root promotion. """ - assert( self.height - len(self.ancestry) < height <= self.height) + assert( height <= self.height ) + if height <= self.height - len(self.ancestry): + return None return Fork(self.ancestry[self.height-height:], height=height) def __str__(self): diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 56e4630..53437b7 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -7,7 +7,6 @@ from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState from dexorder.configuration import parse_args from dexorder.runner import BlockStateRunner -from dexorder.data import pool_prices, vault_tokens, vault_addresses, underfunded_vaults, active_orders log = logging.getLogger('dexorder') @@ -24,6 +23,7 @@ if __name__ == '__main__': db_state = DbState(BlockData.by_tag['db']) with db.session: state = db_state.load() + log.info(f'loaded state from db for root block {state.root_block}') runner = BlockStateRunner(state) if db: # noinspection PyUnboundLocalVariable diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index b0435f1..23149b3 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -73,7 +73,7 @@ class DbState: elif t == BlockData.Type.DICT: # noinspection PyTypeChecker var: BlockDict = BlockData.registry[series] - for row in db.session.query(SeriesDict).where(SeriesSet.series==keystr(series)): + for row in db.session.query(SeriesDict).where(SeriesDict.series==keystr(series)): var[strkey(row.key)] = row.value completed_block.set(root_block) return state diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 583df89..381e85b 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -182,7 +182,6 @@ class BlockState: if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height: del self.diffs_by_series[s][k] - del self.by_hash[self.root_block.hash] # old root block self.root_block = block log.debug(f'promoted root {self.root_block}') return diffs diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index f267229..9b7ea4e 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -3,7 +3,7 @@ from typing import Callable, Union from web3 import AsyncWeb3 from web3.contract.contract import ContractEvents -from web3.exceptions import LogTopicError +from web3.exceptions import LogTopicError, MismatchedABI from web3.types import EventData from dexorder import Blockchain, db, blockchain, NARG, dec @@ -64,12 +64,14 @@ class BlockStateRunner: w3 = blockchain.connect() w3ws = create_w3_ws() + self.setup_triggers(w3) chain_id = await w3ws.eth.chain_id chain = Blockchain.for_id(chain_id) current_chain.set(chain) state = self.state + async with w3ws as w3ws: await w3ws.eth.subscribe('newHeads') while True: @@ -88,25 +90,33 @@ class BlockStateRunner: # initialize state = BlockState(block) current_blockstate.set(state) - self.setup_triggers(w3) log.info('Created new empty root state') else: fork = state.add_block(block) if fork is None: log.debug(f'discarded late-arriving head {block}') else: - futures = [] + batches = [] if fork.disjoint: - # todo backfill batches - from_height = state.root_block.height + 1 - log.error(f'backfill unimplemented for range {from_height} to {block}') - exit(1) + # backfill batches + for callback, event, log_filter in self.events: + from_height = state.root_block.height + 1 + end_height = block.height + while from_height <= end_height: + to_height = min(end_height, from_height + chain.batch_size - 1) + lf = dict(log_filter) + lf['fromBlock'] = from_height + lf['toBlock'] = to_height + log.debug(f'batch backfill {from_height} - {to_height}') + batches.append((w3.eth.get_logs(log_filter), callback, event, lf)) + from_height += chain.batch_size else: # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order for callback, event, log_filter in self.events: - log_filter['blockhash'] = w3.to_hex(block.hash) - futures.append(w3.eth.get_logs(log_filter)) + lf = dict(log_filter) + lf['blockhash'] = w3.to_hex(block.hash) + batches.append((w3.eth.get_logs(log_filter), callback, event, log_filter)) # set up for callbacks current_block.set(block) @@ -115,11 +125,12 @@ class BlockStateRunner: session.begin() session.add(block) # callbacks - for future, (callback,event,filter_args) in zip(futures,self.events): - for log_event in await future: + for future,callback,event,filter_args in batches: + log_events = await future + for log_event in log_events: try: parsed = event.process_log(log_event) - except LogTopicError: + except (LogTopicError, MismatchedABI): pass else: # todo try/except for known retryable errors @@ -132,8 +143,8 @@ class BlockStateRunner: # check for root promotion promotion_height = fork.height - chain.confirms - if not fork.disjoint and promotion_height > state.root_block.height: - diff_items = state.promote_root(fork.for_height(promotion_height)) + if not fork.disjoint and promotion_height > state.root_block.height and (new_root_fork := fork.for_height(promotion_height)): + diff_items = state.promote_root(new_root_fork) for callback in self.on_promotion: # todo try/except for known retryable errors callback(state.root_block, diff_items) @@ -146,12 +157,13 @@ class BlockStateRunner: else: if session is not None: session.commit() + log.info(f'completed block {block}') @staticmethod def handle_transfer(transfer: EventData): to_address = transfer['args']['to'] - print('transfer', to_address) + log.debug(f'transfer {to_address}') if to_address in vault_addresses: token_address = transfer['address'] vault_tokens.add(token_address) @@ -169,7 +181,7 @@ class BlockStateRunner: addr = swap['address'] d = dec(sqrt_price) price = d*d / dec(2**(96*2)) - print(f'pool {addr} {price}') + log.debug(f'pool {addr} {price}') pool_prices[addr] = price @@ -179,6 +191,8 @@ class BlockStateRunner: self.events.append((callback, event, log_filter)) def setup_triggers(self, w3: AsyncWeb3): + self.events.clear() + transfer = w3.eth.contract(abi=get_contract_data('ERC20')['abi']).events.Transfer() self.add_event_trigger(self.handle_transfer, transfer)