walker exception catch
This commit is contained in:
@@ -1,18 +1,16 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime, timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from web3.types import EventData
|
from web3.types import EventData
|
||||||
|
|
||||||
from dexorder import dec, from_timestamp, blockchain, config
|
from dexorder import from_timestamp, blockchain, config
|
||||||
from dexorder.bin.executable import execute
|
from dexorder.bin.executable import execute
|
||||||
from dexorder.blockstate import current_blockstate, BlockState
|
|
||||||
from dexorder.blocktime import get_block_timestamp
|
from dexorder.blocktime import get_block_timestamp
|
||||||
from dexorder.configuration import parse_args
|
from dexorder.configuration import parse_args
|
||||||
from dexorder.contract import get_contract_event
|
from dexorder.contract import get_contract_event
|
||||||
from dexorder.database.model.block import current_block
|
from dexorder.database.model.block import current_block
|
||||||
from dexorder.database.model.pool import PoolDict
|
|
||||||
from dexorder.ohlc import LightOHLCRepository
|
from dexorder.ohlc import LightOHLCRepository
|
||||||
from dexorder.pools import get_uniswap_data
|
from dexorder.pools import get_uniswap_data
|
||||||
from dexorder.util import hexstr
|
from dexorder.util import hexstr
|
||||||
@@ -28,13 +26,12 @@ ohlcs = LightOHLCRepository()
|
|||||||
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
||||||
# asynchronously prefetch the block timestamps we'll need
|
# asynchronously prefetch the block timestamps we'll need
|
||||||
hashes = set(swap['blockHash'] for swap in swaps)
|
hashes = set(swap['blockHash'] for swap in swaps)
|
||||||
await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
|
asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache
|
||||||
# now execute the swaps synchronously
|
# now execute the swaps synchronously
|
||||||
for swap in swaps:
|
for swap in swaps:
|
||||||
pool, time, price = await get_uniswap_data(swap)
|
pool, time, price = await get_uniswap_data(swap)
|
||||||
ohlcs.light_update_all(pool['address'], time, price)
|
ohlcs.light_update_all(pool['address'], time, price)
|
||||||
|
|
||||||
|
|
||||||
def flush_callback():
|
def flush_callback():
|
||||||
# start = now()
|
# start = now()
|
||||||
# log.info("finalizing OHLC's")
|
# log.info("finalizing OHLC's")
|
||||||
@@ -51,7 +48,7 @@ async def main():
|
|||||||
if config.ohlc_dir is None:
|
if config.ohlc_dir is None:
|
||||||
fatal('an ohlc_dir must be configured')
|
fatal('an ohlc_dir must be configured')
|
||||||
await blockchain.connect()
|
await blockchain.connect()
|
||||||
walker = BlockWalker(flush_callback, timedelta(seconds=5)) # todo flush every minute
|
walker = BlockWalker(flush_callback, timedelta(minutes=5))
|
||||||
walker.add_event_trigger(handle_backfill_uniswap_swaps,
|
walker.add_event_trigger(handle_backfill_uniswap_swaps,
|
||||||
get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
|
get_contract_event('IUniswapV3PoolEvents', 'Swap'), multi=True)
|
||||||
await walker.run()
|
await walker.run()
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ class BlockWalker (BlockProgressor):
|
|||||||
session = db.session
|
session = db.session
|
||||||
session.begin()
|
session.begin()
|
||||||
while self.running:
|
while self.running:
|
||||||
|
# noinspection PyBroadException
|
||||||
try:
|
try:
|
||||||
latest_rawblock = await w3.eth.get_block('latest')
|
latest_rawblock = await w3.eth.get_block('latest')
|
||||||
latest_height = latest_rawblock['number']
|
latest_height = latest_rawblock['number']
|
||||||
@@ -92,6 +93,8 @@ class BlockWalker (BlockProgressor):
|
|||||||
if not self.running:
|
if not self.running:
|
||||||
break
|
break
|
||||||
await asyncio.sleep(config.polling)
|
await asyncio.sleep(config.polling)
|
||||||
|
except Exception:
|
||||||
|
log.exception('Exception in walker loop')
|
||||||
finally:
|
finally:
|
||||||
# anything that wasnt committed yet by the flush timer is discarded
|
# anything that wasnt committed yet by the flush timer is discarded
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
|
|||||||
Reference in New Issue
Block a user