diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index 0deb470..aceb79c 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -31,7 +31,7 @@ async def get_block_timestamp(block_id: Union[bytes,int]) -> int: class FetchLock: def __init__(self): - self.lock = Event() + self.ready = Event() self.result = None self.exception = None @@ -64,8 +64,10 @@ async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> return fetch.result except Exception as e: fetch.exception = e + fetch.result = None + raise finally: - fetch.lock.set() + fetch.ready.set() _lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256) @@ -82,12 +84,14 @@ def cache_block(block: Block, confirmed=False): async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: + # log.debug(f'get_block {block_id}') if chain_id is None: chain_id = current_chain.get().id key = chain_id, block_id # try LRU cache synchronously first try: + # log.debug(f'\thit LRU') return _lru[key] except KeyError: pass @@ -95,29 +99,39 @@ async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: # check if another thread is already fetching fetch = _fetch_locks.get(key) if fetch is not None: - await fetch.lock.wait() + # log.debug(f'\tfound existing fetch') + await fetch.ready.wait() if fetch.exception is not None: raise fetch.exception return fetch.result + # log.debug(f'\tfetching') # otherwise initiate our own fetch fetch = _fetch_locks[key] = FetchLock() try: - return await _fetch(fetch, chain_id, block_id) + fetch.result = await _fetch(fetch, chain_id, block_id) + # log.debug(f'got fetch result {fetch.result}') + except Exception as e: + # log.exception('get_block exception') + fetch.exception = e + raise finally: + # log.debug(f'fetch.result {fetch.result}') del _fetch_locks[key] + # log.debug(f'\t{fetch.result}') + return fetch.result async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: # todo roll into get_block() - # log.debug(f'fetch_block_by_number {height} {chain_id}') + # log.debug(f'fetch_block_by_number {chain_id} {height}') if chain_id is None: chain = current_chain.get() chain_id = chain.id else: chain = Blockchain.get(chain_id) response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False]) - # log.debug(f'fetch_block_by_number response {height} {chain_id} {response}') + # log.debug(f'fetch_block_by_number response {chain_id} {height} {response}') block = Block(chain_id, response['result']) confirmed = height <= promotion_height(chain) cache_block(block, confirmed) @@ -143,7 +157,7 @@ def promotion_height(chain: Blockchain=None, latest_height: int=None): if chain is None: chain = current_chain.get() if latest_height is None: - latest_height = latest_block.get(chain.id) + latest_height = latest_block.get(chain.id).height if latest_height is None: return 0 confirm_offset = config.confirms if config.confirms is not None else chain.confirms diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index a15215a..8462c23 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -116,10 +116,10 @@ class Db: connection.execute(sqlalchemy.text("SET TIME ZONE 'UTC'")) result = connection.execute(sqlalchemy.text("select version_num from alembic_version")) for row in result: - log.info(f'{url} database revision {row[0]}') + log.info(f'database revision {row[0]}') _engine.set(engine) self.connected = True return self - raise Exception(f'{url} database version not found') + raise Exception(f'database version not found') db = Db() diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index 03577e0..967d0f0 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -5,13 +5,11 @@ from datetime import timedelta from typing import Union, Callable from dexorder import config, db, now, current_w3 +from dexorder.base.block import Block, BlockInfo, latest_block from dexorder.base.chain import current_chain from dexorder.blocks import promotion_height -from dexorder.blockstate import current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.fork import Fork, current_fork -from dexorder.blockstate.state import FinalizedBlockState -from dexorder.base.block import Block, BlockInfo, latest_block from dexorder.progressor import BlockProgressor from dexorder.util.async_util import Maywaitable, maywait @@ -47,8 +45,6 @@ class BlockWalker (BlockProgressor): chain = current_chain.get() chain_id = chain.id batch_size = config.batch_size if config.batch_size is not None else chain.batch_size - state = FinalizedBlockState() - current_blockstate.set(state) kv_key = f'walker_height|{chain_id}|{self.name}' with db.session: