This commit is contained in:
Tim Olson
2023-09-29 00:21:24 -04:00
parent 6ab486ac18
commit fd5613f575
6 changed files with 38 additions and 21 deletions

View File

@@ -17,13 +17,15 @@ class Blockchain:
def get(name_or_id):
return Blockchain.for_name(name_or_id) if type(name_or_id) is str else Blockchain.for_id(name_or_id)
def __init__(self, chain_id, name, confirms=10):
def __init__(self, chain_id, name, confirms=10, batch_size=100):
"""
confirms is the number of blocks until a block can be considered finalized and unforkable
batch_size is the number of blocks to fetch per logs query
"""
self.chain_id = chain_id
self.name = name
self.confirms = confirms
self.batch_size = batch_size
Blockchain._instances_by_id[chain_id] = self
Blockchain._instances_by_name[name] = self

View File

@@ -38,7 +38,9 @@ class Fork:
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """
assert( self.height - len(self.ancestry) < height <= self.height)
assert( height <= self.height )
if height <= self.height - len(self.ancestry):
return None
return Fork(self.ancestry[self.height-height:], height=height)
def __str__(self):

View File

@@ -7,7 +7,6 @@ from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.configuration import parse_args
from dexorder.runner import BlockStateRunner
from dexorder.data import pool_prices, vault_tokens, vault_addresses, underfunded_vaults, active_orders
log = logging.getLogger('dexorder')
@@ -24,6 +23,7 @@ if __name__ == '__main__':
db_state = DbState(BlockData.by_tag['db'])
with db.session:
state = db_state.load()
log.info(f'loaded state from db for root block {state.root_block}')
runner = BlockStateRunner(state)
if db:
# noinspection PyUnboundLocalVariable

View File

@@ -73,7 +73,7 @@ class DbState:
elif t == BlockData.Type.DICT:
# noinspection PyTypeChecker
var: BlockDict = BlockData.registry[series]
for row in db.session.query(SeriesDict).where(SeriesSet.series==keystr(series)):
for row in db.session.query(SeriesDict).where(SeriesDict.series==keystr(series)):
var[strkey(row.key)] = row.value
completed_block.set(root_block)
return state

View File

@@ -182,7 +182,6 @@ class BlockState:
if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height:
del self.diffs_by_series[s][k]
del self.by_hash[self.root_block.hash] # old root block
self.root_block = block
log.debug(f'promoted root {self.root_block}')
return diffs

View File

@@ -3,7 +3,7 @@ from typing import Callable, Union
from web3 import AsyncWeb3
from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError
from web3.exceptions import LogTopicError, MismatchedABI
from web3.types import EventData
from dexorder import Blockchain, db, blockchain, NARG, dec
@@ -64,12 +64,14 @@ class BlockStateRunner:
w3 = blockchain.connect()
w3ws = create_w3_ws()
self.setup_triggers(w3)
chain_id = await w3ws.eth.chain_id
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
state = self.state
async with w3ws as w3ws:
await w3ws.eth.subscribe('newHeads')
while True:
@@ -88,25 +90,33 @@ class BlockStateRunner:
# initialize
state = BlockState(block)
current_blockstate.set(state)
self.setup_triggers(w3)
log.info('Created new empty root state')
else:
fork = state.add_block(block)
if fork is None:
log.debug(f'discarded late-arriving head {block}')
else:
futures = []
batches = []
if fork.disjoint:
# todo backfill batches
from_height = state.root_block.height + 1
log.error(f'backfill unimplemented for range {from_height} to {block}')
exit(1)
# backfill batches
for callback, event, log_filter in self.events:
from_height = state.root_block.height + 1
end_height = block.height
while from_height <= end_height:
to_height = min(end_height, from_height + chain.batch_size - 1)
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(log_filter), callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
for callback, event, log_filter in self.events:
log_filter['blockhash'] = w3.to_hex(block.hash)
futures.append(w3.eth.get_logs(log_filter))
lf = dict(log_filter)
lf['blockhash'] = w3.to_hex(block.hash)
batches.append((w3.eth.get_logs(log_filter), callback, event, log_filter))
# set up for callbacks
current_block.set(block)
@@ -115,11 +125,12 @@ class BlockStateRunner:
session.begin()
session.add(block)
# callbacks
for future, (callback,event,filter_args) in zip(futures,self.events):
for log_event in await future:
for future,callback,event,filter_args in batches:
log_events = await future
for log_event in log_events:
try:
parsed = event.process_log(log_event)
except LogTopicError:
except (LogTopicError, MismatchedABI):
pass
else:
# todo try/except for known retryable errors
@@ -132,8 +143,8 @@ class BlockStateRunner:
# check for root promotion
promotion_height = fork.height - chain.confirms
if not fork.disjoint and promotion_height > state.root_block.height:
diff_items = state.promote_root(fork.for_height(promotion_height))
if not fork.disjoint and promotion_height > state.root_block.height and (new_root_fork := fork.for_height(promotion_height)):
diff_items = state.promote_root(new_root_fork)
for callback in self.on_promotion:
# todo try/except for known retryable errors
callback(state.root_block, diff_items)
@@ -146,12 +157,13 @@ class BlockStateRunner:
else:
if session is not None:
session.commit()
log.info(f'completed block {block}')
@staticmethod
def handle_transfer(transfer: EventData):
to_address = transfer['args']['to']
print('transfer', to_address)
log.debug(f'transfer {to_address}')
if to_address in vault_addresses:
token_address = transfer['address']
vault_tokens.add(token_address)
@@ -169,7 +181,7 @@ class BlockStateRunner:
addr = swap['address']
d = dec(sqrt_price)
price = d*d / dec(2**(96*2))
print(f'pool {addr} {price}')
log.debug(f'pool {addr} {price}')
pool_prices[addr] = price
@@ -179,6 +191,8 @@ class BlockStateRunner:
self.events.append((callback, event, log_filter))
def setup_triggers(self, w3: AsyncWeb3):
self.events.clear()
transfer = w3.eth.contract(abi=get_contract_data('ERC20')['abi']).events.Transfer()
self.add_event_trigger(self.handle_transfer, transfer)