From cb7cfe00e82ead8580dac28cacd5c035d592b0e2 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 19 Feb 2024 21:23:55 -0400 Subject: [PATCH] walker exception catch --- src/dexorder/bin/finaldata.py | 11 ++++------- src/dexorder/walker.py | 3 +++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 9707454..7975b47 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -1,18 +1,16 @@ import asyncio import logging import sys -from datetime import datetime, timedelta +from datetime import timedelta from web3.types import EventData -from dexorder import dec, from_timestamp, blockchain, config +from dexorder import from_timestamp, blockchain, config from dexorder.bin.executable import execute -from dexorder.blockstate import current_blockstate, BlockState from dexorder.blocktime import get_block_timestamp from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database.model.block import current_block -from dexorder.database.model.pool import PoolDict from dexorder.ohlc import LightOHLCRepository from dexorder.pools import get_uniswap_data from dexorder.util import hexstr @@ -28,13 +26,12 @@ ohlcs = LightOHLCRepository() async def handle_backfill_uniswap_swaps(swaps: list[EventData]): # asynchronously prefetch the block timestamps we'll need hashes = set(swap['blockHash'] for swap in swaps) - await asyncio.gather(*[get_block_timestamp(h) for h in hashes]) + asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache # now execute the swaps synchronously for swap in swaps: pool, time, price = await get_uniswap_data(swap) ohlcs.light_update_all(pool['address'], time, price) - def flush_callback(): # start = now() # log.info("finalizing OHLC's") @@ -51,7 +48,7 @@ async def main(): if config.ohlc_dir is None: fatal('an ohlc_dir must be configured') await blockchain.connect() - walker = BlockWalker(flush_callback, timedelta(seconds=5)) # todo flush every minute + walker = BlockWalker(flush_callback, timedelta(minutes=5)) walker.add_event_trigger(handle_backfill_uniswap_swaps, get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) await walker.run() diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index d2ffb23..088f054 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -58,6 +58,7 @@ class BlockWalker (BlockProgressor): session = db.session session.begin() while self.running: + # noinspection PyBroadException try: latest_rawblock = await w3.eth.get_block('latest') latest_height = latest_rawblock['number'] @@ -92,6 +93,8 @@ class BlockWalker (BlockProgressor): if not self.running: break await asyncio.sleep(config.polling) + except Exception: + log.exception('Exception in walker loop') finally: # anything that wasnt committed yet by the flush timer is discarded # noinspection PyBroadException