From a27300b5e4c88fcb04de5ce9da2ddc3c5ebeaa8f Mon Sep 17 00:00:00 2001 From: tim Date: Thu, 3 Apr 2025 18:15:16 -0400 Subject: [PATCH] info log for websocket connection drops --- src/dexorder/runner.py | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 165c696..1cab05a 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -5,7 +5,6 @@ from datetime import timedelta from typing import Any, Iterable, Callable, Optional from eth_bloom import BloomFilter -# noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric @@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor): async with w3ws as w3ws: log.debug('connecting to ws provider') await w3ws.provider.connect() - subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. - # log.debug(f'subscribed to newHeads {subscription}') + await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. while self.running: async for message in w3ws.ws.process_subscriptions(): block = Block(chain_id, message['result']) @@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor): if not self.running: break await async_yield() - except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e: + except (TimeoutError, asyncio.TimeoutError) as e: log.debug(f'runner timeout {e}') + except ConnectionClosedError as e: + log.info(f'websocket connection closed {e}') except ConnectionRefusedError: log.warning(f'Could not connect to websocket {config.ws_url}') await asyncio.sleep(1) + except StopAsyncIteration: + log.info(f'websocket stream ended') except Exception: log.exception(f'Unhandled exception during run_ws()') finally: @@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor): # propragate to the DB or Redis. # TIME TICKS ARE DISABLED FOR THIS REASON return - current_fork.set(fork) - session = db.session - session.begin() - try: - for callback, on_timer in self.callbacks: - if on_timer: - # noinspection PyCallingNonCallable - await maywait(callback()) - except BaseException: - session.rollback() - raise - else: - session.commit() - finally: - db.close_session() + # current_fork.set(fork) + # session = db.session + # session.begin() + # try: + # for callback, on_timer in self.callbacks: + # if on_timer: + # # noinspection PyCallingNonCallable + # await maywait(callback()) + # except BaseException: + # session.rollback() + # raise + # else: + # session.commit() + # finally: + # db.close_session() async def do_state_init_cbs(self):