info log for websocket connection drops

This commit is contained in:
tim
2025-04-03 18:15:16 -04:00
parent f3faaa3dd6
commit a27300b5e4

View File

@@ -5,7 +5,6 @@ from datetime import timedelta
from typing import Any, Iterable, Callable, Optional
from eth_bloom import BloomFilter
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now, timestamp, metric
@@ -81,8 +80,7 @@ class BlockStateRunner(BlockProgressor):
async with w3ws as w3ws:
log.debug('connecting to ws provider')
await w3ws.provider.connect()
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
# log.debug(f'subscribed to newHeads {subscription}')
await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc.
while self.running:
async for message in w3ws.ws.process_subscriptions():
block = Block(chain_id, message['result'])
@@ -94,11 +92,15 @@ class BlockStateRunner(BlockProgressor):
if not self.running:
break
await async_yield()
except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
except (TimeoutError, asyncio.TimeoutError) as e:
log.debug(f'runner timeout {e}')
except ConnectionClosedError as e:
log.info(f'websocket connection closed {e}')
except ConnectionRefusedError:
log.warning(f'Could not connect to websocket {config.ws_url}')
await asyncio.sleep(1)
except StopAsyncIteration:
log.info(f'websocket stream ended')
except Exception:
log.exception(f'Unhandled exception during run_ws()')
finally:
@@ -397,21 +399,21 @@ class BlockStateRunner(BlockProgressor):
# propragate to the DB or Redis.
# TIME TICKS ARE DISABLED FOR THIS REASON
return
current_fork.set(fork)
session = db.session
session.begin()
try:
for callback, on_timer in self.callbacks:
if on_timer:
# noinspection PyCallingNonCallable
await maywait(callback())
except BaseException:
session.rollback()
raise
else:
session.commit()
finally:
db.close_session()
# current_fork.set(fork)
# session = db.session
# session.begin()
# try:
# for callback, on_timer in self.callbacks:
# if on_timer:
# # noinspection PyCallingNonCallable
# await maywait(callback())
# except BaseException:
# session.rollback()
# raise
# else:
# session.commit()
# finally:
# db.close_session()
async def do_state_init_cbs(self):