block cache uses db; alembic upgrade

This commit is contained in:
tim
2024-07-19 18:58:29 -04:00
parent 95d94e8d60
commit 8389fd2078
6 changed files with 126 additions and 31 deletions

View File

@@ -0,0 +1,44 @@
"""BlockIndex
Revision ID: ee22683693a5
Revises: 516b55c83144
Create Date: 2024-07-19 18:52:04.933167
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import dexorder.database
import dexorder.database.column_types
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'ee22683693a5'
down_revision: Union[str, None] = '516b55c83144'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the `dbblock` table: a database-backed cache of blockchain blocks.

    The primary key is (chain, hash), so multiple rows may share a height
    (the height index is deliberately non-unique). Secondary indexes on
    height and timestamp support lookup by number and by time range.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('dbblock',
        sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
        sa.Column('hash', postgresql.BYTEA(), nullable=False),
        sa.Column('height', sa.Integer(), nullable=False),
        # 32-bit integer timestamp; presumably unix epoch seconds — TODO confirm
        sa.Column('timestamp', sa.INTEGER(), nullable=False),
        sa.Column('confirmed', sa.Boolean(), nullable=False),
        # full block payload stored as JSONB
        sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
        sa.PrimaryKeyConstraint('chain', 'hash')
    )
    op.create_index(op.f('ix_dbblock_height'), 'dbblock', ['height'], unique=False)
    op.create_index(op.f('ix_dbblock_timestamp'), 'dbblock', ['timestamp'], unique=False)
    # ### end Alembic commands ###
def downgrade() -> None:
    """Reverse of upgrade(): drop the `dbblock` indexes, then the table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_dbblock_timestamp'), table_name='dbblock')
    op.drop_index(op.f('ix_dbblock_height'), table_name='dbblock')
    op.drop_table('dbblock')
    # ### end Alembic commands ###

View File

@@ -11,7 +11,7 @@ from dexorder.addrmeta import address_metadata
from dexorder.base.block import latest_block from dexorder.base.block import latest_block
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
-from dexorder.blocks import get_block_timestamp, fetch_block_by_number
+from dexorder.blocks import get_block_timestamp, get_block
from dexorder.blockstate.fork import current_fork from dexorder.blockstate.fork import current_fork
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.contract import get_contract_event from dexorder.contract import get_contract_event
@@ -48,7 +48,7 @@ async def flush_callback():
confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1 confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1
chain_id = current_chain.get().id chain_id = current_chain.get().id
fork = current_fork.get() fork = current_fork.get()
-    block = await fetch_block_by_number(fork.height, chain_id=chain_id)
+    block = await get_block(fork.height, chain_id=chain_id)
time = from_timestamp(block.timestamp) time = from_timestamp(block.timestamp)
if latest_block[chain_id].height - fork.height <= 2*confirms: if latest_block[chain_id].height - fork.height <= 2*confirms:
log.info(f'forward filling to present time') log.info(f'forward filling to present time')

View File

@@ -11,21 +11,21 @@ from contextvars import ContextVar
from typing import Union, Optional from typing import Union, Optional
from cachetools import LRUCache from cachetools import LRUCache
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from dexorder import current_w3, config from dexorder import current_w3, config, db, Blockchain
from dexorder.base.block import Block, BlockInfo from dexorder.base.block import Block, BlockInfo, latest_block
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.database.model import DbBlock
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
-async def get_block_timestamp(blockid: Union[bytes,int], block_number: int = None) -> int:
-    block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid))
-    if block is None:
-        if block_number is not None:
-            block = await fetch_block_by_number(block_number)
-        if block is None:
-            raise ValueError(f'Block {blockid} {block_number} not found')
-    return block.timestamp
+async def get_block_timestamp(block_id: Union[bytes,int]) -> int:
+    block = await get_block(block_id)
+    if block is None:
+        raise ValueError(f'Block {block_id} not found')
+    return block.timestamp
@@ -36,14 +36,31 @@ class FetchLock:
self.exception = None self.exception = None
async def _fetch(fetch: FetchLock, key: tuple[int, Union[int,bytes]]) -> Optional[Block]: async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> Optional[Block]:
# fetch from RPC # try database first
chain_id, blockid = key if db:
try: chain = Blockchain.get(chain_id)
if type(blockid) is int: if type(block_id) is int:
fetch.result = await fetch_block_by_number(blockid, chain_id=chain_id) # by-number could return multiple results
# noinspection PyTypeChecker
blocks: list[DbBlock] = db.session.execute(select(DbBlock).where(
DbBlock.chain == chain,
DbBlock.height == block_id
)).all()
if len(blocks) > 1:
blocks = [b for b in blocks if b.confirmed]
found = blocks[0] if len(blocks) == 1 else None
else: else:
fetch.result = await fetch_block(blockid, chain_id=chain_id) found = db.session.get(DbBlock, (chain, block_id)) # by-hash is the primary key
if found:
return Block(chain_id, found.data)
# fetch from RPC
try:
if type(block_id) is int:
fetch.result = await fetch_block_by_number(block_id, chain_id=chain_id)
else:
fetch.result = await fetch_block(block_id, chain_id=chain_id)
return fetch.result return fetch.result
except Exception as e: except Exception as e:
fetch.exception = e fetch.exception = e
@@ -51,20 +68,25 @@ async def _fetch(fetch: FetchLock, key: tuple[int, Union[int,bytes]]) -> Optiona
fetch.lock.set() fetch.lock.set()
-_lru = LRUCache[tuple[int, bytes], Block](maxsize=128)
+_lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256)
_fetch_locks:dict[tuple[int, bytes], FetchLock] = {} _fetch_locks:dict[tuple[int, bytes], FetchLock] = {}
def cache_block(block: Block): def cache_block(block: Block, confirmed=False):
_lru[block.chain_id, block.hash] = block _lru[block.chain_id, block.hash] = block
_lru[block.chain_id, block.height] = block
if db:
db.session.execute(insert(DbBlock).values(
chain=block.chain_id, hash=block.hash, height=block.height, timestamp=block.timestamp,
confirmed=confirmed, data=block.data).on_conflict_do_nothing())
async def get_block(blockhash, *, chain_id=None) -> Block: async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block:
if chain_id is None: if chain_id is None:
chain_id = current_chain.get().id chain_id = current_chain.get().id
key = chain_id, blockhash
# try LRU cache first key = chain_id, block_id
# try LRU cache synchronously first
try: try:
return _lru[key] return _lru[key]
except KeyError: except KeyError:
@@ -81,19 +103,24 @@ async def get_block(blockhash, *, chain_id=None) -> Block:
# otherwise initiate our own fetch # otherwise initiate our own fetch
fetch = _fetch_locks[key] = FetchLock() fetch = _fetch_locks[key] = FetchLock()
try: try:
return await _fetch(fetch, key) return await _fetch(fetch, chain_id, block_id)
finally: finally:
del _fetch_locks[key] del _fetch_locks[key]
async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: async def fetch_block_by_number(height: int, *, chain_id=None) -> Block:
# todo roll into get_block()
# log.debug(f'fetch_block_by_number {height} {chain_id}') # log.debug(f'fetch_block_by_number {height} {chain_id}')
if chain_id is None: if chain_id is None:
chain_id = current_chain.get().id chain = current_chain.get()
chain_id = chain.id
else:
chain = Blockchain.get(chain_id)
response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False]) response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False])
# log.debug(f'fetch_block_by_number response {height} {chain_id} {response}') # log.debug(f'fetch_block_by_number response {height} {chain_id} {response}')
block = Block(chain_id, response['result']) block = Block(chain_id, response['result'])
cache_block(block) confirmed = height <= promotion_height(chain)
cache_block(block, confirmed)
return block return block
@@ -108,13 +135,17 @@ async def fetch_block(blockhash, *, chain_id=None) -> Optional[Block]:
log.debug(f'block {blockhash} not found') log.debug(f'block {blockhash} not found')
return None return None
block = Block(chain_id, blockdict) block = Block(chain_id, blockdict)
# if db:
# db.kv[block.db_key] = blockdict
cache_block(block) cache_block(block)
return block return block
def promotion_height(chain, latest_height): def promotion_height(chain: Blockchain=None, latest_height: int=None):
if chain is None:
chain = current_chain.get()
if latest_height is None:
latest_height = latest_block.get(chain.id)
if latest_height is None:
return 0
confirm_offset = config.confirms if config.confirms is not None else chain.confirms confirm_offset = config.confirms if config.confirms is not None else chain.confirms
return latest_height - confirm_offset return latest_height - confirm_offset

View File

@@ -2,7 +2,7 @@ from .base import Base
from .kv import KeyValue from .kv import KeyValue
from .series import SeriesSet, SeriesDict from .series import SeriesSet, SeriesDict
from .transaction import TransactionJob from .transaction import TransactionJob
from .dbblock import DbBlock
from .orderindex import OrderIndex from .orderindex import OrderIndex
from .pool import Pool from .pool import Pool
from .token import Token from .token import Token

View File

@@ -0,0 +1,20 @@
import logging
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.base.block import BlockInfo
from dexorder.database.column import Blockchain, Bytes, Uint32
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class DbBlock (Base):
    """Database row caching one blockchain block, keyed by (chain, hash)."""
    # NOTE(review): SQLAlchemy declarative uses `__tablename__`, not `table_name`.
    # The migration in this commit creates the table as 'dbblock' (the default
    # lowercased class name), not 'block' — confirm whether the project Base
    # honors `table_name` or this attribute is a no-op.
    table_name = 'block'
    # which blockchain this block belongs to (part of the primary key)
    chain: Mapped[Blockchain] = mapped_column(primary_key=True)
    # block hash; with `chain` forms the primary key
    hash: Mapped[Bytes] = mapped_column(primary_key=True)
    height: Mapped[int] = mapped_column(index=True)  # todo replace "height" with "number" projectwide
    # 32-bit block timestamp, indexed for time-range queries; presumably unix seconds — TODO confirm
    timestamp: Mapped[Uint32] = mapped_column(index=True)
    confirmed: Mapped[bool]  # true iff the block has been finalized as part of the main chain history
    data: Mapped[BlockInfo] = mapped_column(JSONB)  # full block payload as JSONB

View File

@@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi
from dexorder.base.block import Block, latest_block from dexorder.base.block import Block, latest_block
from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.blockchain.connection import create_w3_ws, create_w3 from dexorder.blockchain.connection import create_w3_ws, create_w3
-from dexorder.blocks import cache_block, get_block, promotion_height, fetch_block_by_number, current_block
+from dexorder.blocks import cache_block, get_block, promotion_height, current_block
from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.branch import Branch from dexorder.blockstate.branch import Branch
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.diff import DiffEntryItem
@@ -189,7 +189,7 @@ class BlockStateRunner(BlockProgressor):
# do not query more than the chain's batch size # do not query more than the chain's batch size
# do not query into the reorgable area. only query finalized data. # do not query into the reorgable area. only query finalized data.
height = min( start + chain.batch_size, block.height - chain.confirms) height = min( start + chain.batch_size, block.height - chain.confirms)
-        end_block = await fetch_block_by_number(height)
+        end_block = await get_block(height)
branch = Branch(height, start, path=[end_block.hash]) # no parent branch = Branch(height, start, path=[end_block.hash]) # no parent
return self.state.add_branch(branch) return self.state.add_branch(branch)