diff --git a/dexorder-alpha.toml b/dexorder-alpha.toml index 32bd9f8..40d52c0 100644 --- a/dexorder-alpha.toml +++ b/dexorder-alpha.toml @@ -3,6 +3,7 @@ rpc_url='http://alpharpc:8545' ws_url='ws://alpharpc:8545' db_url='postgresql://dexorder:redroxed@postgres/dexorder' redis_url='redis://redis:6379' +polling=0.5 [deployments] 53261='alpha' diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 5a9b565..741e6c0 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -16,6 +16,9 @@ class Config: dump_sql: bool = False redis_url: str = 'redis://localhost:6379' + parallel_logevent_queries: bool = True + polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead + tokens: list['TokenConfig'] = field(default_factory=list) account: Optional[str] = None # may be a private key or an account alias diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 4b0e742..1e62952 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -8,10 +8,10 @@ from web3.exceptions import LogTopicError, MismatchedABI # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError -from dexorder import Blockchain, db, current_pub, async_yield, current_w3 +from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config from dexorder.base.chain import current_chain from dexorder.base.fork import current_fork, Fork -from dexorder.blockchain.connection import create_w3_ws +from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block @@ -62,6 +62,9 @@ class BlockStateRunner: async def run(self): + return await (self.run_polling() if config.polling > 0 else self.run_ws()) + + async def run_ws(self): """ 1. load root stateBlockchain a. if no root, init from head @@ -97,6 +100,7 @@ class BlockStateRunner: while self.running: try: async with w3ws as w3ws: + log.debug('connecting to ws provider') await w3ws.provider.connect() subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it. log.debug(f'subscribed to newHeads {subscription}') @@ -104,7 +108,7 @@ class BlockStateRunner: async for message in w3ws.ws.listen_to_websocket(): head = message['result'] log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}') - await self.queue.put(head) + await self.queue.put(head["hash"]) if not self.running: break except (ConnectionClosedError, TimeoutError): @@ -117,10 +121,53 @@ class BlockStateRunner: except Exception: pass await async_yield() + log.debug('runner run_ws() exiting') + + + async def run_polling(self): + """ + Hardhat websocket stops sending messages after about 5 minutes. + https://github.com/NomicFoundation/hardhat/issues/2053 + So we must implement polling to work around their incompetence. + """ + + self.running = True + + w3 = create_w3() + chain_id = await w3.eth.chain_id + chain = Blockchain.for_id(chain_id) + current_chain.set(chain) + + setup_logevent_triggers(self) + _worker_task = asyncio.create_task(self.worker()) + prev_blockhash = None + + while self.running: + try: + new_blocks = await w3.eth.filter("latest") + for head in await new_blocks.get_new_entries(): + if head != prev_blockhash: + prev_blockhash = head + await self.queue.put(head) + log.debug(f'detected new block {hexstr(head)}') + if not self.running: + break + await asyncio.sleep(config.polling) + except ConnectionClosedError: + pass + finally: + # noinspection PyBroadException + try: + # noinspection PyUnresolvedReferences + await w3.provider.disconnect() + except Exception: + pass + await async_yield() + log.debug('runner run_polling() exiting') async def worker(self): - log.debug(f'runner worker started {self.running}') + log.debug(f'runner worker started') w3 = current_w3.get() chain = current_chain.get() assert chain.chain_id == await w3.eth.chain_id @@ -128,7 +175,7 @@ class BlockStateRunner: try: async with asyncio.timeout(1): # check running flag every second head = await self.queue.get() - log.debug(f'got head {hexstr(head["hash"])}') + log.debug(f'got head {hexstr(head)}') except TimeoutError: pass else: @@ -136,15 +183,15 @@ class BlockStateRunner: await self.handle_head(chain, head, w3) except Exception as x: log.exception(x) + log.debug('runner worker exiting') - async def handle_head(self, chain, head, w3): - log.debug(f'processing block {head["number"]} {hexstr(head["hash"])}') + async def handle_head(self, chain, blockhash, w3): + log.debug(f'processing block {hexstr(blockhash)}') chain_id = chain.chain_id session = None - blockhash = None try: - blockhash = hexstr(head["hash"]) + blockhash = hexstr(blockhash) if self.state is not None and blockhash in self.state.by_hash: return # block_data = await w3.eth.get_block(head['hash'], True) @@ -182,7 +229,10 @@ class BlockStateRunner: lf['fromBlock'] = from_height lf['toBlock'] = to_height log.debug(f'batch backfill {from_height} - {to_height}') - batches.append((w3.eth.get_logs(lf), callback, event, lf)) + get_logs = w3.eth.get_logs(lf) + if not config.parallel_logevent_queries: + get_logs = await get_logs + batches.append((get_logs, callback, event, lf)) from_height += chain.batch_size else: # event callbacks are triggered in the order in which they're registered. the events passed to @@ -194,7 +244,10 @@ class BlockStateRunner: # todo use head['logsBloom'] to skip unnecessary log queries lf = dict(log_filter) lf['blockHash'] = hexstr(block.hash) - batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) + get_logs = w3.eth.get_logs(lf) + if not config.parallel_logevent_queries: + get_logs = await get_logs + batches.append((get_logs, callback, event, log_filter)) # set up for callbacks current_block.set(block) @@ -211,7 +264,7 @@ class BlockStateRunner: if future is None: await maywait(callback()) # non-log callback else: - log_events = await future + log_events = await future if config.parallel_logevent_queries else future for log_event in log_events: try: parsed = event.process_log(log_event) if event is not None else log_event