diff --git a/conf/dexorder-finaldata.toml b/conf/finaldata/dexorder-finaldata.toml similarity index 100% rename from conf/dexorder-finaldata.toml rename to conf/finaldata/dexorder-finaldata.toml diff --git a/conf/finaldata/logging-finaldata.toml b/conf/finaldata/logging-finaldata.toml new file mode 100644 index 0000000..376ed2b --- /dev/null +++ b/conf/finaldata/logging-finaldata.toml @@ -0,0 +1,20 @@ +# https://docs.python.org/3/library/logging.config.html#logging-config-dictschema +version=1 + +[loggers.''] +level='INFO' +handlers=['console',] + +[loggers.dexorder] +level='DEBUG' + +[handlers.console] +class='logging.StreamHandler' +formatter='notime' +stream='ext://sys.stdout' + +[formatters.notime] +# https://docs.python.org/3/library/logging.html#logrecord-attributes +format='%(levelname)s %(name)s %(message)s' +# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S' diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index e788ce5..be88e44 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -6,8 +6,9 @@ Use `await get_block()` to retrieve a Block from a given hash using the full cac Use `await fetch_block()` to force an RPC query for the Block, adding that block to the LRU cache. 
""" import logging +from asyncio import Event from contextvars import ContextVar -from typing import Union, Optional, Awaitable +from typing import Union, Optional from cachetools import LRUCache @@ -28,17 +29,30 @@ async def get_block_timestamp(blockid: Union[bytes,int], block_number: int = Non return block.timestamp -async def _fetch(key: tuple[int, Union[int,bytes]]) -> Optional[Block]: +class FetchLock: + def __init__(self): + self.lock = Event() + self.result = None + self.exception = None + + +async def _fetch(fetch: FetchLock, key: tuple[int, Union[int,bytes]]) -> Optional[Block]: # fetch from RPC chain_id, blockid = key - # log.debug(f'block cache miss; fetching {chain_id} {blockid}') - if type(blockid) is int: - return await fetch_block_by_number(blockid, chain_id=chain_id) - else: - return await fetch_block(blockid, chain_id=chain_id) + try: + if type(blockid) is int: + fetch.result = await fetch_block_by_number(blockid, chain_id=chain_id) + else: + fetch.result = await fetch_block(blockid, chain_id=chain_id) + return fetch.result + except Exception as e: + fetch.exception = e + finally: + fetch.lock.set() + _lru = LRUCache[tuple[int, bytes], Block](maxsize=128) -_fetches:dict[tuple[int, bytes], Awaitable[Block]] = {} +_fetch_locks:dict[tuple[int, bytes], FetchLock] = {} def cache_block(block: Block): @@ -49,20 +63,27 @@ async def get_block(blockhash, *, chain_id=None) -> Block: if chain_id is None: chain_id = current_chain.get().id key = chain_id, blockhash + # try LRU cache first try: return _lru[key] except KeyError: pass + # check if another thread is already fetching - fetch = _fetches.get(key) + fetch = _fetch_locks.get(key) if fetch is not None: - return await fetch + await fetch.lock.wait() + if fetch.exception is not None: + raise fetch.exception + return fetch.result + # otherwise initiate our own fetch - fetch = _fetches[key] = _fetch(key) - result = await fetch - del _fetches[key] - return result + fetch = _fetch_locks[key] = FetchLock() + 
try: + return await _fetch(fetch, key) + finally: + del _fetch_locks[key] async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: @@ -76,7 +97,7 @@ async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: return block -async def fetch_block(blockhash, *, chain_id=None): +async def fetch_block(blockhash, *, chain_id=None) -> Optional[Block]: # log.debug(f'fetch_block {blockhash} {chain_id}') if chain_id is None: chain_id = current_chain.get().id diff --git a/src/dexorder/util/async_dict.py b/src/dexorder/util/async_dict.py deleted file mode 100644 index 1c43621..0000000 --- a/src/dexorder/util/async_dict.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -import logging -from abc import abstractmethod -from asyncio import Event -from typing import TypeVar, Generic, Awaitable, Callable, Optional, Any - -from dexorder import NARG - -log = logging.getLogger(__name__) - -K = TypeVar('K') -V = TypeVar('V') - - -### -### NOT TESTED AND NOT USED -### - -class AsyncDict (Generic[K,V]): - """ - Implements per-key locks around accessing dictionary values. - Either supply fetch and store functions in the constructor, or override those methods in a subclass. - fetch(key,default) takes two arguments and when a key is missing, it may either return the default value explicitly - or raise KeyError, in which case the call wrapper will return the default value. 
- """ - def __init__(self, - fetch: Callable[[K,V], Awaitable[V]] = None, - store: Callable[[K,V], Awaitable[Any]] = None, - ): - self._queries: dict[K, tuple[bool,Awaitable]] = {} # bool indicates if it's a write (True) or a read (False) - if fetch is not None: - self.fetch = fetch - if store is not None: - self.store = store - - async def get(self, key: K, default: V = NARG) -> V: - found = self._queries.get(key) - if found is not None: - write, query = found - result = await query - if not write: - return result - # either there was no query or it was a write query that's over - query = self.fetch(key, default) - self._queries[key] = False, query - return await query - - async def set(self, key: K, value: V): - found = self._queries.get(key) - if found is not None: - write, query = found - await query - query = self.store(key, value) - self._queries[key] = True, query - await query - - # noinspection PyMethodMayBeStatic,PyUnusedLocal - @abstractmethod - async def fetch(self, key: K, default: V = NARG) -> V: - raise NotImplementedError - - # noinspection PyMethodMayBeStatic,PyUnusedLocal - @abstractmethod - async def store(self, key: K, value: V) -> V: - """ - Must return the value that was just set. - """ - raise NotImplementedError