diff --git a/alembic/versions/ee22683693a5_blockindex.py b/alembic/versions/ee22683693a5_blockindex.py new file mode 100644 index 0000000..4e83a03 --- /dev/null +++ b/alembic/versions/ee22683693a5_blockindex.py @@ -0,0 +1,44 @@ +"""BlockIndex + +Revision ID: ee22683693a5 +Revises: 516b55c83144 +Create Date: 2024-07-19 18:52:04.933167 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import dexorder.database +import dexorder.database.column_types +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = 'ee22683693a5' +down_revision: Union[str, None] = '516b55c83144' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('dbblock', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('hash', postgresql.BYTEA(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + sa.Column('timestamp', sa.INTEGER(), nullable=False), + sa.Column('confirmed', sa.Boolean(), nullable=False), + sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint('chain', 'hash') + ) + op.create_index(op.f('ix_dbblock_height'), 'dbblock', ['height'], unique=False) + op.create_index(op.f('ix_dbblock_timestamp'), 'dbblock', ['timestamp'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_dbblock_timestamp'), table_name='dbblock') + op.drop_index(op.f('ix_dbblock_height'), table_name='dbblock') + op.drop_table('dbblock') + # ### end Alembic commands ### diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 9fb9f44..abf809b 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -11,7 +11,7 @@ from dexorder.addrmeta import address_metadata from dexorder.base.block import latest_block from dexorder.base.chain import current_chain from dexorder.bin.executable import execute -from dexorder.blocks import get_block_timestamp, fetch_block_by_number +from dexorder.blocks import get_block_timestamp, get_block from dexorder.blockstate.fork import current_fork from dexorder.configuration import parse_args from dexorder.contract import get_contract_event @@ -48,7 +48,7 @@ async def flush_callback(): confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1 chain_id = current_chain.get().id fork = current_fork.get() - block = await fetch_block_by_number(fork.height, chain_id=chain_id) + block = await get_block(fork.height, chain_id=chain_id) time = from_timestamp(block.timestamp) if latest_block[chain_id].height - fork.height <= 2*confirms: log.info(f'forward filling to present time') diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index be88e44..ed46775 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -11,21 +11,21 @@ from contextvars import ContextVar from typing import Union, Optional from cachetools import LRUCache +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert -from dexorder import current_w3, config -from dexorder.base.block import Block, BlockInfo +from dexorder import current_w3, config, db, Blockchain +from dexorder.base.block import Block, BlockInfo, latest_block from dexorder.base.chain import current_chain +from dexorder.database.model import DbBlock log = logging.getLogger(__name__) -async def get_block_timestamp(blockid: Union[bytes,int], block_number: int = None) -> int: - block = await (fetch_block_by_number(blockid) if type(blockid) is int else get_block(blockid)) +async def get_block_timestamp(block_id: Union[bytes,int]) -> int: + block = await get_block(block_id) if block is None: - if block_number is not None: - block = await fetch_block_by_number(block_number) - if block is None: - raise ValueError(f'Block {blockid} {block_number} not found') + raise ValueError(f'Block {block_id} not found') return block.timestamp @@ -36,14 +36,31 @@ class FetchLock: self.exception = None -async def _fetch(fetch: FetchLock, key: tuple[int, Union[int,bytes]]) -> Optional[Block]: - # fetch from RPC - chain_id, blockid = key - try: - if type(blockid) is int: - fetch.result = await fetch_block_by_number(blockid, chain_id=chain_id) +async def _fetch(fetch: FetchLock, chain_id: int, block_id: Union[int,bytes]) -> Optional[Block]: + # try database first + if db: + chain = Blockchain.get(chain_id) + if type(block_id) is int: + # by-number could return multiple results + # noinspection PyTypeChecker + blocks: list[DbBlock] = db.session.execute(select(DbBlock).where( + DbBlock.chain == chain, + DbBlock.height == block_id + )).all() + if len(blocks) > 1: + blocks = [b for b in blocks if b.confirmed] + found = blocks[0] if len(blocks) == 1 else None else: - fetch.result = await fetch_block(blockid, chain_id=chain_id) + found = db.session.get(DbBlock, (chain, block_id)) # by-hash is the primary key + if found: + return Block(chain_id, found.data) + + # fetch from RPC + try: + if type(block_id) is int: + fetch.result = await fetch_block_by_number(block_id, chain_id=chain_id) + else: + fetch.result = await fetch_block(block_id, chain_id=chain_id) return fetch.result except Exception as e: fetch.exception = e @@ -51,20 +68,25 @@ async def _fetch(fetch: FetchLock, key: tuple[int, Union[int,bytes]]) -> Optiona fetch.lock.set() -_lru = LRUCache[tuple[int, bytes], Block](maxsize=128) +_lru = LRUCache[tuple[int, Union[int,bytes]], Block](maxsize=256) _fetch_locks:dict[tuple[int, bytes], FetchLock] = {} -def cache_block(block: Block): +def cache_block(block: Block, confirmed=False): _lru[block.chain_id, block.hash] = block + _lru[block.chain_id, block.height] = block + if db: + db.session.execute(insert(DbBlock).values( + chain=block.chain_id, hash=block.hash, height=block.height, timestamp=block.timestamp, + confirmed=confirmed, data=block.data).on_conflict_do_nothing()) -async def get_block(blockhash, *, chain_id=None) -> Block: +async def get_block(block_id: Union[bytes,int], *, chain_id=None) -> Block: if chain_id is None: chain_id = current_chain.get().id - key = chain_id, blockhash - # try LRU cache first + key = chain_id, block_id + # try LRU cache synchronously first try: return _lru[key] except KeyError: @@ -81,19 +103,24 @@ async def get_block(blockhash, *, chain_id=None) -> Block: # otherwise initiate our own fetch fetch = _fetch_locks[key] = FetchLock() try: - return await _fetch(fetch, key) + return await _fetch(fetch, chain_id, block_id) finally: del _fetch_locks[key] async def fetch_block_by_number(height: int, *, chain_id=None) -> Block: + # todo roll into get_block() # log.debug(f'fetch_block_by_number {height} {chain_id}') if chain_id is None: - chain_id = current_chain.get().id + chain = current_chain.get() + chain_id = chain.id + else: + chain = Blockchain.get(chain_id) response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False]) # log.debug(f'fetch_block_by_number response {height} {chain_id} {response}') block = Block(chain_id, response['result']) - cache_block(block) + confirmed = height <= promotion_height(chain) + cache_block(block, confirmed) return block @@ -108,13 +135,17 @@ async def fetch_block(blockhash, *, chain_id=None) -> Optional[Block]: log.debug(f'block {blockhash} not found') return None block = Block(chain_id, blockdict) - # if db: - # db.kv[block.db_key] = blockdict cache_block(block) return block -def promotion_height(chain, latest_height): +def promotion_height(chain: Blockchain=None, latest_height: int=None): + if chain is None: + chain = current_chain.get() + if latest_height is None: + latest_height = latest_block.get(chain.id) + if latest_height is None: + return 0 confirm_offset = config.confirms if config.confirms is not None else chain.confirms return latest_height - confirm_offset diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index 7858b49..6b33f78 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -2,7 +2,7 @@ from .base import Base from .kv import KeyValue from .series import SeriesSet, SeriesDict from .transaction import TransactionJob +from .dbblock import DbBlock from .orderindex import OrderIndex from .pool import Pool from .token import Token - diff --git a/src/dexorder/database/model/dbblock.py b/src/dexorder/database/model/dbblock.py new file mode 100644 index 0000000..3ff8e59 --- /dev/null +++ b/src/dexorder/database/model/dbblock.py @@ -0,0 +1,20 @@ +import logging + +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column + +from dexorder.base.block import BlockInfo +from dexorder.database.column import Blockchain, Bytes, Uint32 +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + + +class DbBlock (Base): + table_name = 'block' + chain: Mapped[Blockchain] = mapped_column(primary_key=True) + hash: Mapped[Bytes] = mapped_column(primary_key=True) + height: Mapped[int] = mapped_column(index=True) # todo replace "height" with "number" projectwide + timestamp: Mapped[Uint32] = mapped_column(index=True) + confirmed: Mapped[bool] # true iff the block has been finalized as part of the main chain history + data: Mapped[BlockInfo] = mapped_column(JSONB) diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 13d41e1..702814f 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -12,7 +12,7 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 -from dexorder.blocks import cache_block, get_block, promotion_height, fetch_block_by_number, current_block +from dexorder.blocks import cache_block, get_block, promotion_height, current_block from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem @@ -189,7 +189,7 @@ class BlockStateRunner(BlockProgressor): # do not query more than the chain's batch size # do not query into the reorgable area. only query finalized data. height = min( start + chain.batch_size, block.height - chain.confirms) - end_block = await fetch_block_by_number(height) + end_block = await get_block(height) branch = Branch(height, start, path=[end_block.hash]) # no parent return self.state.add_branch(branch)