walker exception catch

This commit is contained in:
Tim
2024-02-19 21:23:55 -04:00
parent ee2ef4c7ac
commit 50bfb09da7
2 changed files with 7 additions and 7 deletions

View File

@@ -1,18 +1,16 @@
import asyncio import asyncio
import logging import logging
import sys import sys
from datetime import datetime, timedelta from datetime import timedelta
from web3.types import EventData from web3.types import EventData
from dexorder import dec, from_timestamp, blockchain, config from dexorder import from_timestamp, blockchain, config
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blockstate import current_blockstate, BlockState
from dexorder.blocktime import get_block_timestamp from dexorder.blocktime import get_block_timestamp
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event from dexorder.contract import get_contract_event
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block
from dexorder.database.model.pool import PoolDict
from dexorder.ohlc import LightOHLCRepository from dexorder.ohlc import LightOHLCRepository
from dexorder.pools import get_uniswap_data from dexorder.pools import get_uniswap_data
from dexorder.util import hexstr from dexorder.util import hexstr
@@ -28,13 +26,12 @@ ohlcs = LightOHLCRepository()
async def handle_backfill_uniswap_swaps(swaps: list[EventData]): async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
# asynchronously prefetch the block timestamps we'll need # asynchronously prefetch the block timestamps we'll need
hashes = set(swap['blockHash'] for swap in swaps) hashes = set(swap['blockHash'] for swap in swaps)
await asyncio.gather(*[get_block_timestamp(h) for h in hashes]) asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache
# now execute the swaps synchronously # now execute the swaps synchronously
for swap in swaps: for swap in swaps:
pool, time, price = await get_uniswap_data(swap) pool, time, price = await get_uniswap_data(swap)
ohlcs.light_update_all(pool['address'], time, price) ohlcs.light_update_all(pool['address'], time, price)
def flush_callback(): def flush_callback():
# start = now() # start = now()
# log.info("finalizing OHLC's") # log.info("finalizing OHLC's")
@@ -51,7 +48,7 @@ async def main():
if config.ohlc_dir is None: if config.ohlc_dir is None:
fatal('an ohlc_dir must be configured') fatal('an ohlc_dir must be configured')
await blockchain.connect() await blockchain.connect()
walker = BlockWalker(flush_callback, timedelta(seconds=5)) # todo flush every minute walker = BlockWalker(flush_callback, timedelta(minutes=5))
walker.add_event_trigger(handle_backfill_uniswap_swaps, walker.add_event_trigger(handle_backfill_uniswap_swaps,
get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True) get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
await walker.run() await walker.run()

View File

@@ -58,6 +58,7 @@ class BlockWalker (BlockProgressor):
session = db.session session = db.session
session.begin() session.begin()
while self.running: while self.running:
# noinspection PyBroadException
try: try:
latest_rawblock = await w3.eth.get_block('latest') latest_rawblock = await w3.eth.get_block('latest')
latest_height = latest_rawblock['number'] latest_height = latest_rawblock['number']
@@ -92,6 +93,8 @@ class BlockWalker (BlockProgressor):
if not self.running: if not self.running:
break break
await asyncio.sleep(config.polling) await asyncio.sleep(config.polling)
except Exception:
log.exception('Exception in walker loop')
finally: finally:
# anything that wasnt committed yet by the flush timer is discarded # anything that wasnt committed yet by the flush timer is discarded
# noinspection PyBroadException # noinspection PyBroadException