This commit is contained in:
tim
2024-07-26 01:27:05 -04:00
parent cafea2155b
commit e61eec73f6
3 changed files with 24 additions and 14 deletions

View File

@@ -31,7 +31,7 @@ async def get_block_timestamp(block_id: Union[bytes,int]) -> int:
class FetchLock: class FetchLock:
def __init__(self): def __init__(self):
self.lock = Event() self.ready = Event()
self.result = None self.result = None
self.exception = None self.exception = None
@@ -64,8 +64,10 @@ async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) ->
return fetch.result return fetch.result
except Exception as e: except Exception as e:
fetch.exception = e fetch.exception = e
fetch.result = None
raise
finally: finally:
fetch.lock.set() fetch.ready.set()
_lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256) _lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256)
@@ -82,12 +84,14 @@ def cache_block(block: Block, confirmed=False):
async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block:
# log.debug(f'get_block {block_id}')
if chain_id is None: if chain_id is None:
chain_id = current_chain.get().id chain_id = current_chain.get().id
key = chain_id, block_id key = chain_id, block_id
# try LRU cache synchronously first # try LRU cache synchronously first
try: try:
# log.debug(f'\thit LRU')
return _lru[key] return _lru[key]
except KeyError: except KeyError:
pass pass
@@ -95,29 +99,39 @@ async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block:
# check if another thread is already fetching # check if another thread is already fetching
fetch = _fetch_locks.get(key) fetch = _fetch_locks.get(key)
if fetch is not None: if fetch is not None:
await fetch.lock.wait() # log.debug(f'\tfound existing fetch')
await fetch.ready.wait()
if fetch.exception is not None: if fetch.exception is not None:
raise fetch.exception raise fetch.exception
return fetch.result return fetch.result
# log.debug(f'\tfetching')
# otherwise initiate our own fetch # otherwise initiate our own fetch
fetch = _fetch_locks[key] = FetchLock() fetch = _fetch_locks[key] = FetchLock()
try: try:
return await _fetch(fetch, chain_id, block_id) fetch.result = await _fetch(fetch, chain_id, block_id)
# log.debug(f'got fetch result {fetch.result}')
except Exception as e:
# log.exception('get_block exception')
fetch.exception = e
raise
finally: finally:
# log.debug(f'fetch.result {fetch.result}')
del _fetch_locks[key] del _fetch_locks[key]
# log.debug(f'\t{fetch.result}')
return fetch.result
async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: async def fetch_block_by_number(height: int, *, chain_id=None) -> Block:
# todo roll into get_block() # todo roll into get_block()
# log.debug(f'fetch_block_by_number {height} {chain_id}') # log.debug(f'fetch_block_by_number {chain_id} {height}')
if chain_id is None: if chain_id is None:
chain = current_chain.get() chain = current_chain.get()
chain_id = chain.id chain_id = chain.id
else: else:
chain = Blockchain.get(chain_id) chain = Blockchain.get(chain_id)
response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False]) response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False])
# log.debug(f'fetch_block_by_number response {height} {chain_id} {response}') # log.debug(f'fetch_block_by_number response {chain_id} {height} {response}')
block = Block(chain_id, response['result']) block = Block(chain_id, response['result'])
confirmed = height <= promotion_height(chain) confirmed = height <= promotion_height(chain)
cache_block(block, confirmed) cache_block(block, confirmed)
@@ -143,7 +157,7 @@ def promotion_height(chain: Blockchain=None, latest_height: int=None):
if chain is None: if chain is None:
chain = current_chain.get() chain = current_chain.get()
if latest_height is None: if latest_height is None:
latest_height = latest_block.get(chain.id) latest_height = latest_block.get(chain.id).height
if latest_height is None: if latest_height is None:
return 0 return 0
confirm_offset = config.confirms if config.confirms is not None else chain.confirms confirm_offset = config.confirms if config.confirms is not None else chain.confirms

View File

@@ -116,10 +116,10 @@ class Db:
connection.execute(sqlalchemy.text("SET TIME ZONE 'UTC'")) connection.execute(sqlalchemy.text("SET TIME ZONE 'UTC'"))
result = connection.execute(sqlalchemy.text("select version_num from alembic_version")) result = connection.execute(sqlalchemy.text("select version_num from alembic_version"))
for row in result: for row in result:
log.info(f'{url} database revision {row[0]}') log.info(f'database revision {row[0]}')
_engine.set(engine) _engine.set(engine)
self.connected = True self.connected = True
return self return self
raise Exception(f'{url} database version not found') raise Exception(f'database version not found')
db = Db() db = Db()

View File

@@ -5,13 +5,11 @@ from datetime import timedelta
from typing import Union, Callable from typing import Union, Callable
from dexorder import config, db, now, current_w3 from dexorder import config, db, now, current_w3
from dexorder.base.block import Block, BlockInfo, latest_block
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.blocks import promotion_height from dexorder.blocks import promotion_height
from dexorder.blockstate import current_blockstate
from dexorder.blockstate.branch import Branch from dexorder.blockstate.branch import Branch
from dexorder.blockstate.fork import Fork, current_fork from dexorder.blockstate.fork import Fork, current_fork
from dexorder.blockstate.state import FinalizedBlockState
from dexorder.base.block import Block, BlockInfo, latest_block
from dexorder.progressor import BlockProgressor from dexorder.progressor import BlockProgressor
from dexorder.util.async_util import Maywaitable, maywait from dexorder.util.async_util import Maywaitable, maywait
@@ -47,8 +45,6 @@ class BlockWalker (BlockProgressor):
chain = current_chain.get() chain = current_chain.get()
chain_id = chain.id chain_id = chain.id
batch_size = config.batch_size if config.batch_size is not None else chain.batch_size batch_size = config.batch_size if config.batch_size is not None else chain.batch_size
state = FinalizedBlockState()
current_blockstate.set(state)
kv_key = f'walker_height|{chain_id}|{self.name}' kv_key = f'walker_height|{chain_id}|{self.name}'
with db.session: with db.session: