runner block polling

This commit is contained in:
Tim Olson
2023-11-21 17:59:31 -04:00
parent 90d7c89a9d
commit ecc9e2eb98
3 changed files with 69 additions and 12 deletions

View File

@@ -3,6 +3,7 @@ rpc_url='http://alpharpc:8545'
ws_url='ws://alpharpc:8545'
db_url='postgresql://dexorder:redroxed@postgres/dexorder'
redis_url='redis://redis:6379'
polling=0.5
[deployments]
53261='alpha'

View File

@@ -16,6 +16,9 @@ class Config:
dump_sql: bool = False
redis_url: str = 'redis://localhost:6379'
parallel_logevent_queries: bool = True
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
tokens: list['TokenConfig'] = field(default_factory=list)
account: Optional[str] = None # may be a private key or an account alias

View File

@@ -8,10 +8,10 @@ from web3.exceptions import LogTopicError, MismatchedABI
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config
from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws
from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
@@ -62,6 +62,9 @@ class BlockStateRunner:
async def run(self):
return await (self.run_polling() if config.polling > 0 else self.run_ws())
async def run_ws(self):
"""
1. load root state
a. if no root, init from head
@@ -97,6 +100,7 @@ class BlockStateRunner:
while self.running:
try:
async with w3ws as w3ws:
log.debug('connecting to ws provider')
await w3ws.provider.connect()
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it.
log.debug(f'subscribed to newHeads {subscription}')
@@ -104,7 +108,7 @@ class BlockStateRunner:
async for message in w3ws.ws.listen_to_websocket():
head = message['result']
log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}')
await self.queue.put(head)
await self.queue.put(head["hash"])
if not self.running:
break
except (ConnectionClosedError, TimeoutError):
@@ -117,10 +121,53 @@ class BlockStateRunner:
except Exception:
pass
await async_yield()
log.debug('runner run_ws() exiting')
async def run_polling(self):
"""
Hardhat websocket stops sending messages after about 5 minutes.
https://github.com/NomicFoundation/hardhat/issues/2053
So we must implement polling as a workaround.
"""
self.running = True
w3 = create_w3()
chain_id = await w3.eth.chain_id
chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
setup_logevent_triggers(self)
_worker_task = asyncio.create_task(self.worker())
prev_blockhash = None
while self.running:
try:
new_blocks = await w3.eth.filter("latest")
for head in await new_blocks.get_new_entries():
if head != prev_blockhash:
prev_blockhash = head
await self.queue.put(head)
log.debug(f'detected new block {hexstr(head)}')
if not self.running:
break
await asyncio.sleep(config.polling)
except ConnectionClosedError:
pass
finally:
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences
await w3.provider.disconnect()
except Exception:
pass
await async_yield()
log.debug('runner run_polling() exiting')
async def worker(self):
log.debug(f'runner worker started {self.running}')
log.debug(f'runner worker started')
w3 = current_w3.get()
chain = current_chain.get()
assert chain.chain_id == await w3.eth.chain_id
@@ -128,7 +175,7 @@ class BlockStateRunner:
try:
async with asyncio.timeout(1): # check running flag every second
head = await self.queue.get()
log.debug(f'got head {hexstr(head["hash"])}')
log.debug(f'got head {hexstr(head)}')
except TimeoutError:
pass
else:
@@ -136,15 +183,15 @@ class BlockStateRunner:
await self.handle_head(chain, head, w3)
except Exception as x:
log.exception(x)
log.debug('runner worker exiting')
async def handle_head(self, chain, head, w3):
log.debug(f'processing block {head["number"]} {hexstr(head["hash"])}')
async def handle_head(self, chain, blockhash, w3):
log.debug(f'processing block {hexstr(blockhash)}')
chain_id = chain.chain_id
session = None
blockhash = None
try:
blockhash = hexstr(head["hash"])
blockhash = hexstr(blockhash)
if self.state is not None and blockhash in self.state.by_hash:
return
# block_data = await w3.eth.get_block(head['hash'], True)
@@ -182,7 +229,10 @@ class BlockStateRunner:
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(lf), callback, event, lf))
get_logs = w3.eth.get_logs(lf)
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
@@ -194,7 +244,10 @@ class BlockStateRunner:
# todo use head['logsBloom'] to skip unnecessary log queries
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
get_logs = w3.eth.get_logs(lf)
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, log_filter))
# set up for callbacks
current_block.set(block)
@@ -211,7 +264,7 @@ class BlockStateRunner:
if future is None:
await maywait(callback()) # non-log callback
else:
log_events = await future
log_events = await future if config.parallel_logevent_queries else future
for log_event in log_events:
try:
parsed = event.process_log(log_event) if event is not None else log_event