runner yield() bugfix

This commit is contained in:
Tim Olson
2023-12-28 19:53:49 -04:00
parent 41ff02b9ba
commit 8a9813baed

View File

@@ -4,7 +4,6 @@ from asyncio import Queue
from datetime import datetime from datetime import datetime
from typing import Callable, Union, Any, Iterable from typing import Callable, Union, Any, Iterable
from sqlalchemy.sql.functions import current_timestamp
from web3.contract.contract import ContractEvents from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI from web3.exceptions import LogTopicError, MismatchedABI
# noinspection PyPackageRequirements # noinspection PyPackageRequirements
@@ -118,6 +117,7 @@ class BlockStateRunner:
await self.queue.put(head["hash"]) await self.queue.put(head["hash"])
if not self.running: if not self.running:
break break
await async_yield()
except (ConnectionClosedError, TimeoutError): except (ConnectionClosedError, TimeoutError):
pass pass
finally: finally:
@@ -127,7 +127,7 @@ class BlockStateRunner:
await w3ws.provider.disconnect() await w3ws.provider.disconnect()
except Exception: except Exception:
pass pass
await async_yield() log.debug('yield')
log.debug('runner run_ws() exiting') log.debug('runner run_ws() exiting')
@@ -183,15 +183,17 @@ class BlockStateRunner:
while self.running: while self.running:
try: try:
async with asyncio.timeout(1): # check running flag every second async with asyncio.timeout(1): # check running flag every second
start = datetime.now()
head = await self.queue.get() head = await self.queue.get()
except TimeoutError: except TimeoutError:
log.debug('timeout in runner')
# 1 second has passed without a new head. Run the postprocess callbacks to check for activated time-based triggers # 1 second has passed without a new head. Run the postprocess callbacks to check for activated time-based triggers
if prev_head is not None: if prev_head is not None:
await self.handle_time_tick(head) await self.handle_time_tick(head)
else: else:
try: try:
log.debug('handle head...')
await self.handle_head(chain, head, w3) await self.handle_head(chain, head, w3)
log.debug('handled')
prev_head = head prev_head = head
except Exception as x: except Exception as x:
log.exception(x) log.exception(x)