block cache bugfixes

This commit is contained in:
tim
2024-07-19 17:13:47 -04:00
parent e7ec80fdd8
commit 95d94e8d60
4 changed files with 56 additions and 83 deletions

View File

@@ -0,0 +1,20 @@
# https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
version=1
[loggers.'']
level='INFO'
handlers=['console',]
[loggers.dexorder]
level='DEBUG'
[handlers.console]
class='logging.StreamHandler'
formatter='notime'
stream='ext://sys.stdout'
[formatters.notime]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(levelname)s %(name)s %(message)s'
# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S'

View File

@@ -6,8 +6,9 @@ Use `await get_block()` to retreive a Block from a given hash using the full cac
Use `await fetch_block()` to force an RPC query for the Block, adding that block to the LRU cache.
"""
import logging
from asyncio import Event
from contextvars import ContextVar
from typing import Union, Optional, Awaitable
from typing import Union, Optional
from cachetools import LRUCache
@@ -28,17 +29,30 @@ async def get_block_timestamp(blockid: Union[bytes,int], block_number: int = Non
return block.timestamp
async def _fetch(key: tuple[int, Union[int,bytes]]) -> Optional[Block]:
class FetchLock:
def __init__(self):
self.lock = Event()
self.result = None
self.exception = None
async def _fetch(fetch: FetchLock, key: tuple[int, Union[int,bytes]]) -> Optional[Block]:
# fetch from RPC
chain_id, blockid = key
# log.debug(f'block cache miss; fetching {chain_id} {blockid}')
if type(blockid) is int:
return await fetch_block_by_number(blockid, chain_id=chain_id)
else:
return await fetch_block(blockid, chain_id=chain_id)
try:
if type(blockid) is int:
fetch.result = await fetch_block_by_number(blockid, chain_id=chain_id)
else:
fetch.result = await fetch_block(blockid, chain_id=chain_id)
return fetch.result
except Exception as e:
fetch.exception = e
finally:
fetch.lock.set()
_lru = LRUCache[tuple[int, bytes], Block](maxsize=128)
_fetches:dict[tuple[int, bytes], Awaitable[Block]] = {}
_fetch_locks:dict[tuple[int, bytes], FetchLock] = {}
def cache_block(block: Block):
@@ -49,20 +63,27 @@ async def get_block(blockhash, *, chain_id=None) -> Block:
if chain_id is None:
chain_id = current_chain.get().id
key = chain_id, blockhash
# try LRU cache first
try:
return _lru[key]
except KeyError:
pass
# check if another thread is already fetching
fetch = _fetches.get(key)
fetch = _fetch_locks.get(key)
if fetch is not None:
return await fetch
await fetch.lock.wait()
if fetch.exception is not None:
raise fetch.exception
return fetch.result
# otherwise initiate our own fetch
fetch = _fetches[key] = _fetch(key)
result = await fetch
del _fetches[key]
return result
fetch = _fetch_locks[key] = FetchLock()
try:
return await _fetch(fetch, key)
finally:
del _fetch_locks[key]
async def fetch_block_by_number(height: int, *, chain_id=None) -> Block:
@@ -76,7 +97,7 @@ async def fetch_block_by_number(height: int, *, chain_id=None) -> Block:
return block
async def fetch_block(blockhash, *, chain_id=None):
async def fetch_block(blockhash, *, chain_id=None) -> Optional[Block]:
# log.debug(f'fetch_block {blockhash} {chain_id}')
if chain_id is None:
chain_id = current_chain.get().id

View File

@@ -1,68 +0,0 @@
import asyncio
import logging
from abc import abstractmethod
from asyncio import Event
from typing import TypeVar, Generic, Awaitable, Callable, Optional, Any
from dexorder import NARG
log = logging.getLogger(__name__)
K = TypeVar('K')
V = TypeVar('V')
###
### NOT TESTED AND NOT USED
###
class AsyncDict (Generic[K,V]):
"""
Implements per-key locks around accessing dictionary values.
Either supply fetch and store functions in the constructor, or override those methods in a subclass.
fetch(key,default) takes two arguments and when a key is missing, it may either return the default value explicitly
or raise KeyError, in which case the call wrapper will return the default value.
"""
def __init__(self,
fetch: Callable[[K,V], Awaitable[V]] = None,
store: Callable[[K,V], Awaitable[Any]] = None,
):
self._queries: dict[K, tuple[bool,Awaitable]] = {} # bool indicates if it's a write (True) or a read (False)
if fetch is not None:
self.fetch = fetch
if store is not None:
self.store = store
async def get(self, key: K, default: V = NARG) -> V:
found = self._queries.get(key)
if found is not None:
write, query = found
result = await query
if not write:
return result
# either there was no query or it was a write query that's over
query = self.fetch(key, default)
self._queries[key] = False, query
return await query
async def set(self, key: K, value: V):
found = self._queries.get(key)
if found is not None:
write, query = found
await query
query = self.store(key, value)
self._queries[key] = True, query
await query
# noinspection PyMethodMayBeStatic,PyUnusedLocal
@abstractmethod
async def fetch(self, key: K, default: V = NARG) -> V:
raise NotImplementedError
# noinspection PyMethodMayBeStatic,PyUnusedLocal
@abstractmethod
async def store(self, key: K, value: V) -> V:
"""
Must return the value that was just set.
"""
raise NotImplementedError