From 7acc51a6523a6b9cb191b01014412e9f061422bf Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 3 Apr 2024 00:03:01 -0400 Subject: [PATCH] blockstate rework, not debugged --- ...hema.py => 516b55c83144_initial_schema.py} | 117 +++-- src/dexorder/base/block.py | 66 +++ src/dexorder/base/chain.py | 6 +- src/dexorder/base/fork.py | 76 ---- src/dexorder/base/orderlib.py | 4 +- src/dexorder/bin/backfill_ohlc.py | 13 +- src/dexorder/bin/block_for_time.py | 8 +- src/dexorder/bin/finaldata.py | 19 +- src/dexorder/bin/main.py | 16 +- src/dexorder/bin/mirror.py | 2 +- src/dexorder/bin/tokenlist_metadata.py | 29 -- src/dexorder/blockchain/by_blockchain.py | 16 +- src/dexorder/blocks.py | 72 ++- src/dexorder/blockstate/blockdata.py | 17 +- src/dexorder/blockstate/branch.py | 66 +++ src/dexorder/blockstate/db_state.py | 45 +- src/dexorder/blockstate/diff.py | 4 +- src/dexorder/blockstate/fork.py | 37 ++ src/dexorder/blockstate/state.py | 410 ++++++++++-------- src/dexorder/configuration/standard_tokens.py | 8 - src/dexorder/contract/__init__.py | 2 +- src/dexorder/contract/contract_proxy.py | 8 +- src/dexorder/contract/decimals.py | 3 +- src/dexorder/contract/dexorder.py | 2 +- src/dexorder/database/column_types.py | 14 +- src/dexorder/database/model/__init__.py | 4 +- src/dexorder/database/model/block.py | 36 -- src/dexorder/database/model/pool.py | 2 +- src/dexorder/event_handler.py | 18 +- src/dexorder/memcache/memcache_state.py | 22 +- src/dexorder/metadata.py | 4 +- src/dexorder/ohlc.py | 40 +- src/dexorder/order/orderstate.py | 5 +- src/dexorder/pools.py | 4 +- src/dexorder/progressor.py | 4 +- src/dexorder/runner.py | 400 ++++++++--------- src/dexorder/tokens.py | 3 +- src/dexorder/transaction.py | 4 +- src/dexorder/uniswap.py | 2 +- src/dexorder/util/__init__.py | 17 +- src/dexorder/util/async_dict.py | 80 ++++ src/dexorder/util/async_util.py | 3 +- src/dexorder/util/json.py | 1 - src/dexorder/util/lru.py | 59 +++ src/dexorder/util/tick_math.py | 24 - src/dexorder/vault_blockdata.py | 2 +- src/dexorder/walker.py | 27 +- test/test_blockstate.py | 175 +++++--- 48 files changed, 1126 insertions(+), 870 deletions(-) rename alembic/versions/{db62e7db828d_initial_schema.py => 516b55c83144_initial_schema.py} (76%) create mode 100644 src/dexorder/base/block.py delete mode 100644 src/dexorder/base/fork.py delete mode 100644 src/dexorder/bin/tokenlist_metadata.py create mode 100644 src/dexorder/blockstate/branch.py create mode 100644 src/dexorder/blockstate/fork.py delete mode 100644 src/dexorder/configuration/standard_tokens.py delete mode 100644 src/dexorder/database/model/block.py create mode 100644 src/dexorder/util/async_dict.py create mode 100644 src/dexorder/util/lru.py delete mode 100644 src/dexorder/util/tick_math.py diff --git a/alembic/versions/db62e7db828d_initial_schema.py b/alembic/versions/516b55c83144_initial_schema.py similarity index 76% rename from alembic/versions/db62e7db828d_initial_schema.py rename to alembic/versions/516b55c83144_initial_schema.py index d6f32d7..1b6f6d0 100644 --- a/alembic/versions/db62e7db828d_initial_schema.py +++ b/alembic/versions/516b55c83144_initial_schema.py @@ -1,9 +1,9 @@ -""" -initial schema +"""initial_schema -Revision ID: db62e7db828d +Revision ID: 516b55c83144 Revises: -Create Date: 2023-09-28 23:04:41.020644 +Create Date: 2024-04-02 22:52:44.614707 + """ from typing import Sequence, Union @@ -14,75 +14,26 @@ import dexorder.database.column_types from sqlalchemy.dialects import postgresql # revision identifiers, used by Alembic. 
-revision: str = 'db62e7db828d' +revision: str = '516b55c83144' down_revision: Union[str, None] = None branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: - op.create_table('block', - sa.Column('chain', sa.Integer(), nullable=False), - sa.Column('height', sa.Integer(), nullable=False), - sa.Column('hash', sa.LargeBinary(), nullable=False), - sa.Column('parent', sa.LargeBinary(), nullable=False), - sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), - sa.PrimaryKeyConstraint('chain', 'height', 'hash') - ) + # ### commands auto generated by Alembic - please adjust! ### op.create_table('keyvalue', sa.Column('key', sa.String(), nullable=False), sa.Column('value', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.PrimaryKeyConstraint('key') ) - op.create_table('seriesdict', - sa.Column('value', sa.String(), nullable=False), - sa.Column('chain', sa.Integer(), nullable=False), - sa.Column('series', sa.String(), nullable=False), - sa.Column('key', sa.String(), nullable=False), - sa.PrimaryKeyConstraint('chain', 'series', 'key') - ) - op.create_table('seriesset', - sa.Column('chain', sa.Integer(), nullable=False), - sa.Column('series', sa.String(), nullable=False), - sa.Column('key', sa.String(), nullable=False), - sa.PrimaryKeyConstraint('chain', 'series', 'key') - ) - op.create_table('transactionjob', - sa.Column('id', sa.UUID(), nullable=False), - sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), - sa.Column('height', sa.Integer(), nullable=False), - # sa.Column('state', sa.Enum(name='transactionjobstate'), nullable=False), - sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), - sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False), - sa.PrimaryKeyConstraint('id') - ) - op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) - op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) - op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) - op.create_table('tx', - sa.Column('id', postgresql.BYTEA(), nullable=False), - sa.Column('data', postgresql.BYTEA(), nullable=False), - sa.Column('job_id', sa.UUID(), nullable=False), - sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), - sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ), - sa.PrimaryKeyConstraint('id') - ) op.create_table('orderindex', sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('vault', sa.String(), nullable=False), sa.Column('order_index', sa.Integer(), nullable=False), - sa.Column('state', sa.Enum('Open', 'Canceled', 'Filled', 'Expired', 'Underfunded', name='swaporderstate'), nullable=False), + sa.Column('state', sa.Enum('Unknown', 'Signing', 'Underfunded', 'Open', 'Canceled', 'Expired', 'Filled', name='swaporderstate'), nullable=False), sa.PrimaryKeyConstraint('chain', 'vault', 'order_index') ) - op.create_table('token', - sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), - sa.Column('address', dexorder.database.column_types.Address(), nullable=False), - sa.Column('name', sa.String(), nullable=False), - sa.Column('symbol', sa.String(), nullable=False), - sa.Column('decimals', sa.SMALLINT(), nullable=False), - sa.Column('approved', sa.Boolean(), nullable=False), - 
sa.PrimaryKeyConstraint('chain', 'address') - ) op.create_table('pool', sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('address', dexorder.database.column_types.Address(), nullable=False), @@ -95,20 +46,64 @@ def upgrade() -> None: ) op.create_index(op.f('ix_pool_base'), 'pool', ['base'], unique=False) op.create_index(op.f('ix_pool_quote'), 'pool', ['quote'], unique=False) + op.create_table('seriesdict', + sa.Column('value', sa.String(), nullable=False), + sa.Column('chain', sa.Integer(), nullable=False), + sa.Column('series', sa.String(), nullable=False), + sa.Column('key', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'series', 'key') + ) + op.create_table('seriesset', + sa.Column('chain', sa.Integer(), nullable=False), + sa.Column('series', sa.String(), nullable=False), + sa.Column('key', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'series', 'key') + ) + op.create_table('token', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('address', dexorder.database.column_types.Address(), nullable=False), + sa.Column('name', sa.String(), nullable=False), + sa.Column('symbol', sa.String(), nullable=False), + sa.Column('decimals', sa.SMALLINT(), nullable=False), + sa.Column('approved', sa.Boolean(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'address') + ) + op.create_table('transactionjob', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), + sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) + op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) + op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) + op.create_table('tx', + sa.Column('id', postgresql.BYTEA(), nullable=False), + sa.Column('data', postgresql.BYTEA(), nullable=False), + sa.Column('job_id', sa.UUID(), nullable=False), + sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ), + sa.PrimaryKeyConstraint('id') + ) + # ### end Alembic commands ### def downgrade() -> None: + op.drop_table('tx') + op.drop_index(op.f('ix_transactionjob_state'), table_name='transactionjob') + op.drop_index(op.f('ix_transactionjob_height'), table_name='transactionjob') + op.drop_index(op.f('ix_transactionjob_chain'), table_name='transactionjob') + op.drop_table('transactionjob') op.drop_table('token') + op.drop_table('seriesset') + op.drop_table('seriesdict') op.drop_index(op.f('ix_pool_quote'), table_name='pool') op.drop_index(op.f('ix_pool_base'), table_name='pool') op.drop_table('pool') op.drop_table('orderindex') - op.drop_table('seriesset') - op.drop_table('seriesdict') op.drop_table('keyvalue') - op.drop_table('block') - op.drop_table('tx') - op.drop_table('transactionjob') op.execute('drop type swaporderstate') # enum type op.execute('drop type transactionjobstate') # enum type op.execute('drop type exchange') # enum type diff --git a/src/dexorder/base/block.py b/src/dexorder/base/block.py new file mode 100644 index 
0000000..6a18364 --- /dev/null +++ b/src/dexorder/base/block.py @@ -0,0 +1,66 @@ +import logging +from typing import TypedDict, Literal + +from dexorder.util import hexbytes, hexint, hexstr + +log = logging.getLogger(__name__) + + +class BigNumber (TypedDict): + type: Literal['BigNumber'] + hex: str + +class BlockInfo (TypedDict): + number: int + hash: bytes + parentHash: bytes + nonce: bytes + sha3Uncles: bytes + logsBloom: bytes + transactionsRoot: bytes + stateRoot: bytes + receiptsRoot: bytes + miner: str + difficulty: int + totalDifficulty: int + extraData: bytes + size: int + gasLimit: BigNumber + gasUsed: BigNumber + timestamp: int + transactions: list[bytes] + uncles: list[bytes] + mixHash: bytes + baseFeePerGas: int + + +def block_db_key(chain_id: int, blockhash: bytes): + return f'Block|{chain_id}|{hexstr(blockhash)}' + + +class Block: + + def __init__(self, chain_id: int, blockdata: BlockInfo): + self.chain_id = chain_id + self.hash = hexbytes(blockdata['hash']) + # noinspection PyTypeChecker + self.height = hexint(blockdata['number']) + self.timestamp = hexint(blockdata['timestamp']) + self.parent = hexbytes(blockdata['parentHash']) + self.data = blockdata + + @property + def db_key(self): + return block_db_key(self.chain_id, self.hash) + + def __str__(self): + return f'{self.height}_{hexstr(self.hash)[2:7]}' + + def __hash__(self): + return hash(self.hash) # blockhashes should be unique even across chains + + def __eq__(self, other): + return self.hash == other.hash and self.chain_id == other.chain_id + + +latest_block: dict[int,Block] = {} # most recent discovered block but maybe not the currently processing one, indexed by chain_id diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index a9a62b2..df8580f 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -26,7 +26,7 @@ class Blockchain: confirms is the number of blocks until a block can be considered finalized and unforkable batch_size is the number of blocks to fetch per logs query """ - self.chain_id = chain_id + self.id = chain_id self.name = name self.confirms = confirms self.batch_size = batch_size @@ -48,10 +48,10 @@ Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Mumbai = Blockchain(80001, 'Mumbai') BSC = Blockchain(56, 'BSC') Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=2000) -Mock = Blockchain(31337, 'Mock', 3, batch_size=10000) +Mock = Blockchain(31337, 'Mock', 3, batch_size=2000) Alpha = Blockchain(1337, 'Dexorder Alpha', 3, batch_size=1000) -current_chain = ContextVar[Blockchain]('current_chain') +current_chain = ContextVar[Blockchain]('current_chain', default=Mock) class BlockClock: diff --git a/src/dexorder/base/fork.py b/src/dexorder/base/fork.py deleted file mode 100644 index a66c770..0000000 --- a/src/dexorder/base/fork.py +++ /dev/null @@ -1,76 +0,0 @@ -import logging -from contextvars import ContextVar -from typing import Iterable, Optional - -from dexorder.database.model import Block - -log = logging.getLogger(__name__) - - -class Fork: - """ - A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The - getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest - block and [-2] is its parent, etc. Any blocks older than the tail of the fork are considered finalized and may be referenced by height. 
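Editor's note: a minimal usage sketch (not part of the patch) for the new Block class above. The raw dict stands in for an eth_getBlockByHash result; the field values are invented, and only the fields Block.__init__ actually reads are shown.

    from dexorder.base.block import Block, latest_block

    raw = {'number': '0x10', 'hash': '0x' + 'ab' * 32,
           'parentHash': '0x' + 'cd' * 32, 'timestamp': '0x660c8a11'}
    block = Block(42161, raw)              # hexint/hexbytes normalize the 0x-hex fields
    assert block.height == 0x10
    assert block.db_key.startswith('Block|42161|')
    latest_block[block.chain_id] = block   # newest discovered block, tracked per chain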
- """ - - def __init__(self, ancestry: Iterable[bytes], *, height: int): - self.ancestry = list(ancestry) - self.height = height - self.disjoint = False - - def __contains__(self, item): - """ - item can be a Block or another Fork. returns True iff the given item appears on this fork. if item is ahead of this fork - or a cousin chain, returns False - """ - index = self.height - item.height # index is reverse chronological in order to index our ancentry list - if index < 0: # item is ahead of us in height - return False - if index >= len(self.ancestry): # item is older than this fork - return True # consider old blocks settled and on this fork - return self.ancestry[index] == item.hash - - @property - def hash(self): - return self.ancestry[0] - - @property - def parent(self): - return self.ancestry[1] if len(self.ancestry) > 1 else None - - def for_height(self, height): - """ returns a new Fork object for an older block along this fork. used for root promotion. """ - if height > self.height : - raise ValueError - if height <= self.height - len(self.ancestry): - return None - return Fork(self.ancestry[self.height-height:], height=height) - - def __str__(self): - return f'{self.height}_[{"->".join(h.hex() for h in self.ancestry)}]' - - -class DisjointFork: - """ - duck type of Fork for blocks that connect directly to root with a parent gap in-between. these forks are associated with backfill. - """ - def __init__(self, block: Block, root: Block): - self.height = block.height - self.hash = block.hash - self.parent = root.hash - self.disjoint = True - self.root = root - - def __contains__(self, item): - if item.height > self.height: - return False # item is in the future - if item.height < self.root.height: - return True # item is ancient - return item.hash in (self.hash, self.parent) - - def __str__(self): - return f'{self.height}_[{self.hash.hex()}->{self.parent.hash.hex()}]' - - -current_fork = ContextVar[Optional[Fork]]('current_fork', default=None) diff --git a/src/dexorder/base/orderlib.py b/src/dexorder/base/orderlib.py index f80433f..fb1fc59 100644 --- a/src/dexorder/base/orderlib.py +++ b/src/dexorder/base/orderlib.py @@ -47,12 +47,10 @@ class SwapOrder: chainOrder: int tranches: list['Tranche'] - state: SwapOrderState # this is not in the blockchain orderstatus: it's a computed and cached field. 
- @staticmethod def load(obj): return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), int(obj[4]), obj[5], obj[6], obj[7], - [Tranche.load(t) for t in obj[8]], SwapOrderState.Unknown) + [Tranche.load(t) for t in obj[8]]) def dump(self): return (self.tokenIn, self.tokenOut, self.route.dump(), str(self.amount), str(self.minFillAmount), self.amountIsInput, diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index 62f54ca..065b73e 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -3,29 +3,30 @@ import sys from asyncio import CancelledError from typing import Union, Reversible -from dexorder import blockchain, config, from_timestamp, now +from dexorder import blockchain, config, from_timestamp from dexorder.bin.executable import execute +from dexorder.blocks import get_block_timestamp from dexorder.blockstate import DiffItem from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState from dexorder.blockstate.diff import DiffEntryItem +from dexorder.blockstate.fork import Fork from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database import db -from dexorder.database.model import Block from dexorder.event_handler import check_ohlc_rollover, handle_uniswap_swaps from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.ohlc import recent_ohlcs, ohlc_save, ohlcs from dexorder.runner import BlockStateRunner -from dexorder.util import hexstr log = logging.getLogger('dexorder.backfill') -def finalize_callback(block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): - ohlc_save(block, diffs) - log.info(f'backfill completed through block {block.height} {from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') +async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + ohlc_save(diffs) + ts = await get_block_timestamp(fork.head) + log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}') # noinspection DuplicatedCode diff --git a/src/dexorder/bin/block_for_time.py b/src/dexorder/bin/block_for_time.py index bae3845..9720484 100644 --- a/src/dexorder/bin/block_for_time.py +++ b/src/dexorder/bin/block_for_time.py @@ -7,7 +7,7 @@ from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dateutil.parser import parse as parse_date -from dexorder.database.model import Block +from dexorder.base.block import Block log = logging.getLogger(__name__) @@ -15,9 +15,9 @@ log = logging.getLogger(__name__) async def main(): log.debug(f'Finding block nearest to {time}') w3 = await blockchain.connect() - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id blockdata = await w3.eth.get_block('latest') - latest = cur = Block.from_data(chain_id, blockdata) + latest = cur = Block(chain_id, blockdata) while True: cur_time = from_timestamp(cur.timestamp) delta = (time - cur_time).total_seconds() @@ -28,7 +28,7 @@ async def main(): elif estimated == cur.height: print(f'Closest block to {time}: {cur.height} {cur_time}') exit(0) - cur = Block.from_data(chain_id, await w3.eth.get_block(estimated)) + cur = Block(chain_id, await w3.eth.get_block(estimated)) if __name__ == '__main__': if len(sys.argv) < 3: diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 60bd5cc..d81a9d1 100644 --- 
a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -7,12 +7,13 @@ from web3.types import EventData from dexorder import from_timestamp, blockchain, config from dexorder.addrmeta import address_metadata +from dexorder.base.block import latest_block from dexorder.base.chain import current_chain from dexorder.bin.executable import execute -from dexorder.blocks import get_block_timestamp +from dexorder.blocks import get_block_timestamp, get_block_by_number +from dexorder.blockstate.fork import current_fork from dexorder.configuration import parse_args from dexorder.contract import get_contract_event -from dexorder.database.model.block import current_block, latest_block from dexorder.ohlc import FinalOHLCRepository from dexorder.pools import get_uniswap_data from dexorder.util import hexstr @@ -37,21 +38,23 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]): # log.debug(f'OHLC {pool["address"]} {time} {price}') ohlcs.light_update_all(pool['address'], time, price) -def flush_callback(): +async def flush_callback(): # start = now() # log.info("finalizing OHLC's") # log.info(f'\ttook {(now() - start).total_seconds():.1f} seconds') - block = current_block.get() confirms = (config.confirms if config.confirms is not None else current_chain.get().confirms) - 1 - if latest_block.get().height - block.height <= 2*confirms: + chain_id = current_chain.get().id + fork = current_fork.get() + block = await get_block_by_number(fork.height, chain_id=chain_id) + time = from_timestamp(block.timestamp) + if latest_block[chain_id].height - fork.height <= 2*confirms: log.info(f'forward filling to present time') for addr, data in address_metadata.items(): if data['type'] == 'Pool' and data['exchange'] >= 0: - ohlcs.light_update_all(addr, from_timestamp(block.timestamp), None) + ohlcs.light_update_all(addr, time, None) log.info("flushing OHLC's") ohlcs.flush() - log.info(f'backfill completed through block {block.height} ' - f'{from_timestamp(block.timestamp):%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') + log.info(f'backfill completed through block {block.height} {time:%Y-%m-%d %H:%M:%S} {hexstr(block.hash)}') async def main(): logging.basicConfig(level=logging.INFO, stream=sys.stdout) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 84a56c0..dad57a5 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -1,7 +1,7 @@ import logging from asyncio import CancelledError -from dexorder import db, blockchain, config +from dexorder import db, blockchain from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate.blockdata import BlockData @@ -14,9 +14,8 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v process_active_tranches, process_execution_requests, check_ohlc_rollover, handle_uniswap_swaps from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all -from dexorder.ohlc import ohlc_save from dexorder.runner import BlockStateRunner -from dexorder.transaction import handle_transaction_receipts, create_transactions, send_transactions +from dexorder.transaction import handle_transaction_receipts log = logging.getLogger('dexorder') LOG_ALL_EVENTS = False # for debug todo config @@ -80,18 +79,19 @@ async def main(): db.connect() db_state = DbState(BlockData.by_opt('db')) with db.session: - state = db_state.load() + state = await db_state.load() if state is None: log.info('no state in database') else: if redis_state: - 
await redis_state.init(state) - log.info(f'loaded state from db for root block {state.root_block}') + await redis_state.init(state, state.root_fork) + log.info(f'loaded state from db for root block {state.root_branch.height}') runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) setup_logevent_triggers(runner) - if config.ohlc_dir: - runner.on_promotion.append(ohlc_save) + # OHLC printing hard-disabled for main. Use the finaldata process. + # if config.ohlc_dir: + # runner.on_promotion.append(ohlc_save) if db: runner.on_state_init.append(init_order_triggers) # noinspection PyUnboundLocalVariable diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py index 99f3cf3..b85e33c 100644 --- a/src/dexorder/bin/mirror.py +++ b/src/dexorder/bin/mirror.py @@ -110,7 +110,7 @@ async def write_metadata( pools, mirror_pools ): last_prices = {} -async def complete_update(mirrorenv, pool, price, tx): +async def complete_update(_mirrorenv, pool, price, tx): await tx.wait() last_prices[pool] = price log.debug(f'Mirrored {pool} {price}') diff --git a/src/dexorder/bin/tokenlist_metadata.py b/src/dexorder/bin/tokenlist_metadata.py deleted file mode 100644 index 26d1d86..0000000 --- a/src/dexorder/bin/tokenlist_metadata.py +++ /dev/null @@ -1,29 +0,0 @@ -# Prints a JSON string to stdout containing metadata information for all the known tokens and pools -# -# see metadata.py - -import logging -import sys - -from sqlalchemy import select - -from dexorder import db -from dexorder.configuration import parse_args -from dexorder.database.model import Pool, Token -from dexorder.metadata import generate_metadata - -log = logging.getLogger(__name__) - - -def main(): - logging.basicConfig(level=logging.INFO, stream=sys.stderr) - log.setLevel(logging.DEBUG) - parse_args() - db.connect(migrate=False) - tokens = db.session.scalars(select(Token)) - pools = db.session.scalars(select(Pool)) - generate_metadata(tokens, pools) - - -if __name__ == '__main__': - main() diff --git a/src/dexorder/blockchain/by_blockchain.py b/src/dexorder/blockchain/by_blockchain.py index c7b6006..2e9bdbd 100644 --- a/src/dexorder/blockchain/by_blockchain.py +++ b/src/dexorder/blockchain/by_blockchain.py @@ -19,20 +19,20 @@ class ByBlockchainCollection (Generic[_T]): self.by_blockchain = by_blockchain if by_blockchain is not None else {} def __getitem__(self, item) -> _T: - return self.by_blockchain[current_chain.get().chain_id][item] + return self.by_blockchain[current_chain.get().id][item] class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]): def __getattr__(self, name: str) -> _T: - return self.by_blockchain[current_chain.get().chain_id][name] + return self.by_blockchain[current_chain.get().id][name] def get(self, item, default=None, *, chain_id=None) -> _T: # will raise if default is NARG if chain_id is None: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id if chain_id is None: - raise KeyError('no ctx.chain_id set') + raise KeyError('no current_chain set') found = self.by_blockchain.get(chain_id, {}).get(item, default) if found is NARG: raise KeyError @@ -41,16 +41,16 @@ class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]): class ByBlockchainList (ByBlockchainCollection[_T], Generic[_T]): def __iter__(self) -> Iterator[_T]: - return iter(self.by_blockchain[current_chain.get().chain_id]) + return iter(self.by_blockchain[current_chain.get().id]) def iter(self, *, chain_id=None) -> Iterator[_T]: if chain_id is None: - chain_id = 
current_chain.get().chain_id
+            chain_id = current_chain.get().id
         return iter(self.by_blockchain[chain_id])

     def get(self, index, *, chain_id=None) -> _T:
         if chain_id is None:
-            chain_id = current_chain.get().chain_id
+            chain_id = current_chain.get().id
         if chain_id is None:
-            raise KeyError('no ctx.chain_id set')
+            raise KeyError('no current_chain set')
         return self.by_blockchain[chain_id][index]
diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py
index 01172a4..07fa87a 100644
--- a/src/dexorder/blocks.py
+++ b/src/dexorder/blocks.py
@@ -1,12 +1,19 @@
+"""
+Blocks are stored locally in an LRU cache and queried via RPC lazily.
+
+Use `await get_block()` to retrieve a Block from a given hash using the full caching mechanism.
+
+Use `await fetch_block()` to force an RPC query for the Block, adding that block to the LRU cache.
+"""
 import logging
+from typing import Union

-from async_lru import alru_cache
+from cachetools import LRUCache

-from dexorder import current_w3
+from dexorder import current_w3, NARG, config
+from dexorder.base.block import Block, BlockInfo
 from dexorder.base.chain import current_chain
-from dexorder.blockstate import current_blockstate
-from dexorder.database.model import Block
-from dexorder.util import hexint
+from dexorder.util import hexstr
+from dexorder.util.async_dict import AsyncDict

 log = logging.getLogger(__name__)

@@ -16,13 +23,54 @@ async def get_block_timestamp(blockhash) -> int:
     return block.timestamp


-@alru_cache(maxsize=128)
-async def get_block(blockhash) -> Block:
-    # first look in the state
+async def _cache_fetch(key: tuple[int, bytes], default: Union[Block, NARG]) -> Block:
+    assert default is NARG
+    # try LRU cache first
     try:
-        return current_blockstate.get().by_hash[blockhash]
-    except (LookupError, KeyError):
+        return _lru[key]
+    except KeyError:
         pass
-    # otherwise query
+    # fetch from RPC
+    chain_id, blockhash = key
+    result = await fetch_block(blockhash, chain_id=chain_id)
+    _lru[key] = result
+    return result
+
+_lru = LRUCache[tuple[int, bytes], Block](maxsize=128)
+_cache = AsyncDict[tuple[int, bytes], Block](fetch=_cache_fetch)
+
+
+def cache_block(block: Block):
+    _lru[block.chain_id, block.hash] = block
+
+
+async def get_block(blockhash, *, chain_id=None) -> Block:
+    if chain_id is None:
+        chain_id = current_chain.get().id
+    return await _cache.get((chain_id, blockhash))
+
+
+async def get_block_by_number(height: int, *, chain_id=None) -> Block:
+    if chain_id is None:
+        chain_id = current_chain.get().id
+    # JSON-RPC block numbers must be hex-encoded quantities, not JSON ints
+    response = await current_w3.get().provider.make_request('eth_getBlockByNumber', [hex(height), False])
+    block = Block(chain_id, response['result'])
+    cache_block(block)
+    return block
+
+
+async def fetch_block(blockhash, *, chain_id=None):
+    if chain_id is None:
+        chain_id = current_chain.get().id
+    if isinstance(blockhash, bytes):
+        blockhash = hexstr(blockhash)  # JSON-RPC needs a 0x-prefixed hex string, not raw bytes
+    response = await current_w3.get().provider.make_request('eth_getBlockByHash', [blockhash, False])
+    blockdict: BlockInfo = response['result']
+    block = Block(chain_id, blockdict)
+    # if db:
+    #     db.kv[block.db_key] = blockdict
+    cache_block(block)
+    return block
+
+
+def promotion_height(chain, latest_height):
+    confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1
+    return latest_height - confirm_offset
diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py
index 6b7ad7a..c5cd7b3 100644
--- a/src/dexorder/blockstate/blockdata.py
+++ b/src/dexorder/blockstate/blockdata.py
@@ -1,11 +1,10 @@
-import copy
 import json
 import logging
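Editor's note: a sketch (not part of the patch) of the intended read path through the new cache in blocks.py. get_block() goes through the AsyncDict, whose _cache_fetch callback checks the LRU before issuing a single RPC; this assumes AsyncDict coalesces concurrent lookups of the same key, which is what the fetch-callback wiring implies.

    from dexorder.blocks import get_block, promotion_height

    async def example(chain, blockhash: bytes):
        block = await get_block(blockhash, chain_id=chain.id)    # cached, or one RPC
        parent = await get_block(block.parent, chain_id=chain.id)
        assert parent.height == block.height - 1
        # with config.confirms unset, a chain with confirms=3 can promote up to latest-2
        assert promotion_height(chain, 1000) == 1000 - (chain.confirms - 1)
        return block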
from enum import Enum -from typing import TypeVar, Generic, Iterable, Union, Any, Iterator, Callable +from typing import TypeVar, Generic, Iterable, Union, Any, Iterator, Callable, Optional from dexorder import NARG, DELETE -from dexorder.base.fork import current_fork +from dexorder.blockstate.fork import current_fork from .state import current_blockstate from dexorder.util import key2str as util_key2str, str2key as util_str2key @@ -43,7 +42,8 @@ class BlockData (Generic[T]): self.value2str = value2str self.str2value = str2value self.savecb = savecb - self.lazy_getitem = None + # set this to a method which fetches final data (e.g. database) + self.lazy_getitem: Optional[Callable[['BlockData',Any],Union[NARG,T]]] = None @property def seriesstr(self): @@ -63,10 +63,9 @@ class BlockData (Generic[T]): result = default if self.lazy_getitem: lazy = self.lazy_getitem(self, item) - if lazy is not None: - lookup_fork, lookup_value = lazy - if lookup_fork in fork: - result = lookup_value + if lazy is not NARG: + state.set(state.root_fork, self.series, item, lazy) + result = lazy if result is NARG: raise KeyError return result @@ -126,7 +125,7 @@ class BlockSet(Generic[T], Iterable[T], BlockData[T]): def add(self, item: T): """ set-like semantics. the item key is added with a value of None. """ - self.setitem(item, None, overwrite=False) + self.setitem(item, None, overwrite=False) # setting overwrite to False means don't create a new DiffEntry if the key exists def remove(self, item: T): self.delitem(item) diff --git a/src/dexorder/blockstate/branch.py b/src/dexorder/blockstate/branch.py new file mode 100644 index 0000000..825cb47 --- /dev/null +++ b/src/dexorder/blockstate/branch.py @@ -0,0 +1,66 @@ +from contextvars import ContextVar +from uuid import uuid4, UUID + +from dexorder.base.block import Block +from dexorder.base.chain import current_chain +from dexorder.util import hexstr + +BranchId = UUID + +class Branch: + + def __init__(self, height, start, parent=bytes(), path: list[bytes] = None, *, chain=None): + assert (0 if path is None else len(path)) <= height - start + 1 + if chain is None: + chain = current_chain.get() + self.id: BranchId = uuid4() + self.chain = chain + self.height = height # highest block number in the path + self.start = start # lowest block number in the path + # parent is the blockhash of the block from which this branch started. Empty bytes indicates unknown. + self.parent = parent + # path is a list of blockhashes included in the branch, from highest block to lowest. path[0], if present, must + # be the hash of the head block in the branch. + # Branches without a complete path are called "disjoint" since their interior is unspecified. Branches that do have + # explicit paths are called "contiguous." 
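+        # For example (editor's illustration, invented hashes): blocks 10 <- 11 <- 12 can be
+        # represented either by a contiguous Branch(height=12, start=10, parent=h9,
+        # path=[h12, h11, h10]) or, when the interior hashes are unknown (e.g. backfill),
+        # by a disjoint Branch(height=12, start=10, parent=h9) with an empty path.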
+        self.path = path if path is not None else []
+
+    @property
+    def head(self):
+        """ the blockhash of the head of this Branch, if known """
+        return None if not self.path else self.path[0]
+
+    @property
+    def disjoint(self):
+        """ branches that are disjoint do not have a complete list of blockhashes for their interior path """
+        return not self.contiguous
+
+    @property
+    def contiguous(self):
+        """ contiguous branches have a complete list of blockhashes in their path attribute """
+        return len(self.path) == self.height - self.start + 1
+
+    @staticmethod
+    def from_block(block: Block) -> 'Branch':
+        """ create a new Branch from a single Block """
+        return Branch(chain=block.chain_id, height=block.height, start=block.height,
+                      parent=block.parent, path=[block.hash])
+
+    @staticmethod
+    def from_blocks(blocks: list[Block]):
+        """ create a new Branch from a chronological (oldest-first) list of Block objects """
+        # check continuity of block parents
+        assert all(b.parent == a.hash for a, b in zip(blocks, blocks[1:]))
+        # path is stored head-first (highest block to lowest), so reverse the chronological input
+        return Branch(chain=blocks[0].chain_id, height=blocks[-1].height, start=blocks[0].height,
+                      parent=blocks[0].parent, path=[b.hash for b in reversed(blocks)])
+
+    def __len__(self):
+        return self.height - self.start + 1
+
+    def __str__(self):
+        # noinspection PyTypeChecker
+        return (f"Branch#{str(self.id)[:5]}["
+                + (','.join(hexstr(b)[2:7] for b in self.path) if self.path else f'{self.start},{self.height}') + ']')
+
+
+current_branch = ContextVar[Branch]('current_branch')
diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py
index 46af53b..aa2406c 100644
--- a/src/dexorder/blockstate/db_state.py
+++ b/src/dexorder/blockstate/db_state.py
@@ -1,15 +1,14 @@
 import logging
 from typing import Iterable, Optional, Union, Any

-from . import BlockSet, BlockDict, BlockState, current_blockstate, DataType
+from dexorder.blockstate.fork import Fork
+from . import BlockSet, BlockDict, BlockState, DataType
 from .blockdata import BlockData, SeriesCollection
 from .diff import DiffItem, DiffEntryItem
 from ..
import db, DELETE
 from ..base.chain import current_chain
-from ..base.fork import current_fork, Fork
-from ..database.model import SeriesSet, SeriesDict, Block
-from ..database.model.block import current_block, latest_block, completed_block
-from ..util import hexbytes
+from ..blocks import get_block
+from ..database.model import SeriesSet, SeriesDict
+from .. import NARG

 log = logging.getLogger(__name__)

@@ -25,21 +24,16 @@ class DbState(SeriesCollection):

     @staticmethod
     def lazy_getitem(var: BlockData, item):
-        chain_id = current_chain.get().chain_id
+        chain_id = current_chain.get().id
         t = var.type
         Entity = SeriesSet if t == DataType.SET else SeriesDict if t == DataType.DICT else None
         series = var.series2str(var.series)
         key = var.key2str(item)
-        try:
-            height, blockhash = db.kv[f'root_block|{chain_id}']
-        except Exception:
-            return None
-        fork = Fork([hexbytes(blockhash)], height=height)
         value = db.session.get(Entity, (chain_id, series, key))
-        return fork, var.str2value(value.value)
+        if value is None:
+            return NARG  # no finalized value stored for this key
+        return var.str2value(value.value)

-    def save(self, root_block: Block, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ):
-        chain_id = current_chain.get().chain_id
+    def save(self, fork: Fork, diffs: Iterable[Union[DiffItem,DiffEntryItem]]):
+        chain_id = current_chain.get().id
         for diff in diffs:
             try:
                 d = self.datas[diff.series]
@@ -70,23 +64,19 @@
             if d.savecb:
                 d.savecb(diff.key, diff.value)
         # save root block info
-        db.kv[f'root_block|{root_block.chain}'] = [root_block.height, root_block.hash]
+        db.kv[f'root_block|{chain_id}'] = [fork.height, fork.head]

     # noinspection PyShadowingBuiltins
-    def load(self) -> Optional[BlockState]:
-        chain_id = current_chain.get().chain_id
+    async def load(self) -> Optional[BlockState]:
+        chain_id = current_chain.get().id
         try:
             height, hash = db.kv[f'root_block|{chain_id}']
         except (KeyError, ValueError):
             return None
-        root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash)))
-        if root_block is None:
-            return None
-        current_block.set(root_block)
-        latest_block.set(root_block)
-        state = BlockState(root_block)
-        current_blockstate.set(state)
-        current_fork.set(None)  # root fork
+        root_block = await get_block(hash)
+        assert root_block.height == height
+        state = BlockState()
+        root_fork = state.init_root_block(root_block)
         for series, data in self.datas.items():
             if data.opts.get('db') != 'lazy':
                 log.debug(f'loading series {series}')
@@ -97,7 +87,7 @@
                 for row in db.session.query(SeriesSet).where(SeriesSet.chain == chain_id, SeriesSet.series == data.series2str(series)):
                     key = data.str2key(row.key)
                     log.debug(f'load {series} {key}')
-                    var.add(key)
+                    state.set(root_fork, var.series, key, None, overwrite=False)
             elif t == DataType.DICT:
                 # noinspection PyTypeChecker
                 var: BlockDict = BlockData.registry[series]
@@ -105,7 +95,6 @@
                     key = data.str2key(row.key)
                     value = data.str2value(row.value)
                     # log.debug(f'load {series} {key} {value}')
-                    var[key] = value
-        completed_block.set(root_block)
+                    state.set(root_fork, var.series, key, value, overwrite=True)
         log.debug(f'loaded db state from block {root_block}')
         return state
height: int
-    hash: bytes
+    branch_id: Any  # a BranchId (UUID), not an int or a block hash


@dataclass
@@ -34,5 +34,5 @@ class DiffEntryItem:
         return self.entry.value

     def __str__(self):
-        return (f'{self.entry.hash.hex()} {self.series}.{self.key}='
+        return (f'B{self.entry.branch_id} {self.series}.{self.key}='
                 f'{"[DEL]" if self.entry.value is DELETE else self.entry.value}')
diff --git a/src/dexorder/blockstate/fork.py b/src/dexorder/blockstate/fork.py
new file mode 100644
index 0000000..2b96986
--- /dev/null
+++ b/src/dexorder/blockstate/fork.py
@@ -0,0 +1,37 @@
+import logging
+from contextvars import ContextVar
+from typing import Optional, Sequence
+
+from dexorder import NARG
+from dexorder.blockstate.branch import Branch
+
+log = logging.getLogger(__name__)
+
+
+class Fork:
+    """
+    A Fork is a collection of Branches describing a path through the blockchain. Branches are stored in reverse
+    chronological order from newest (branches[0]) to oldest (branches[-1]).
+    """
+
+    def __init__(self, branches: Sequence[Branch]):
+        assert len(branches) > 0
+        self.branches = branches
+
+        # "properties"
+        self.branch = self.branches[0]
+        self.branch_id = self.branch.id
+        self.branch_ids = [b.id for b in branches]
+
+        self.height = self.branch.height
+        self.start = branches[-1].height
+
+        self.head = self.branch.head
+        self.parent = branches[-1].parent
+
+
+    def __str__(self):
+        return f'Fork[{"<-".join(str(b) for b in self.branches)}]'
+
+
+current_fork = ContextVar[Optional[Fork]]('current_fork', default=NARG)
diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py
index 6e7cce4..0a4fc92 100644
--- a/src/dexorder/blockstate/state.py
+++ b/src/dexorder/blockstate/state.py
@@ -1,18 +1,20 @@
-import itertools
 import logging
 from collections import defaultdict
+# noinspection PyPackageRequirements
 from contextvars import ContextVar
-from typing import Any, Optional, Union, Reversible
+from typing import Any, Optional, Reversible, Sequence

 from sortedcontainers import SortedList

 from dexorder import NARG
-from dexorder.base.fork import Fork, DisjointFork
-from dexorder.database.model import Block
-from dexorder.util import hexstr
+from dexorder.blockstate.fork import Fork
+from .branch import Branch, BranchId
 from .diff import DiffEntry, DELETE, DiffEntryItem
+from ..base.block import Block
+from ..util import hexstr

 log = logging.getLogger(__name__)
+state_log = logging.getLogger('dexorder.state')


 def compress_diffs(difflist: Reversible):
@@ -31,86 +33,143 @@ def compress_diffs(difflist: Reversible):


 class BlockState:
-    by_chain: dict[int, 'BlockState'] = {}
-
     """
-    Since recent blocks can be part of temporary forks, we need to be able to undo certain operations if they were part of a reorg. Instead of implementing
-    undo, we recover state via snapshot plus replay of recent diffs. When old blocks become low enough in the blockheight they may be considered canonical
-    at which point the deltas may be reliably incorporated into a rolling permanent collection. BlockState manages separate memory areas
-    for every block, per-block state that defaults to its parent's state, up the ancestry tree to the root. State clients may read the state for their block,
-    by applying any diffs along the block's fork path to the root data.
+    Since recent blocks can be part of temporary forks, we need to be able to undo certain operations if they were part
+    of a reorg. Instead of implementing undo, we recover state via snapshot plus replay of recent diffs.
When old
+    blocks become low enough in the blockheight, they may be considered finalized, at which point the deltas may be
+    reliably incorporated into a rolling permanent collection (the database). BlockState manages separate memory areas
+    for every Branch, which is a segment of the blockchain that represents either a generic range (if the Branch
+    has no path) or an explicit set of block hashes (if the Branch does specify a path). The Runner processes events
+    on a per-Branch basis, and eventually chooses to promote a branch of data after it has aged into finalization.
+
+    The primary data structure is diffs_by_series, which keys by [series][item] into a list of DiffEntries
+    sorted by block height, newest first. Access of a series item scans the list of diffs in reverse blockheight
+    order, returning the first valid diff it finds. Diffs are valid if the Branch that generated them is part of the
+    current Fork. A fork is a list of Branches which describes a path from the current block backwards to, and
+    including, the root branch. The diff lists are garbage collected during set(). If there is more than one diff
+    whose height is older than the current root branch, then we need only keep the latest value and may discard the
+    elder. Furthermore, when branches become older than the root branch but are not promoted, they are discarded,
+    ensuring that data with a diff height of the root branch or older is always part of the finalized blockchain.
     """

-    def __init__(self, root_block: Block):
-        self.root_block: Block = root_block
-        self.by_height: SortedList[Block] = SortedList(key=lambda x: x.height)
-        self.by_hash: dict[bytes, Block] = {root_block.hash: root_block}
-        # diffs_by_series is the main data structure. leaf nodes are list of diffs sorted by blockheight
-        self.diffs_by_series: dict[Any, dict[Any, SortedList[DiffEntry]]] = defaultdict(lambda: defaultdict(lambda: SortedList(key=lambda x: x.height)))
-        # diffs_by_hash holds the diff items generated by each block
-        self.diffs_by_hash: dict[bytes, list[DiffEntryItem]] = defaultdict(list)
-        self.ancestors: dict[bytes, Block] = {}
-        self.unloads: dict[bytes, list] = defaultdict(list)
-        BlockState.by_chain[root_block.chain] = self
+    def __init__(self):
+        self._root_branch: Optional[Branch] = None
+        self._root_fork: Optional[Fork] = None

-    def add_block(self, block: Block) -> Optional[Fork]:
+        # Branches indexed by height
+        self.branches_by_height: dict[int, list[Branch]] = defaultdict(list)
+
+        # Branches indexed by id
+        self.branches_by_id: dict[BranchId, Branch] = {}
+
+        # diffs_by_series is the main data structure. leaf nodes are lists of DiffEntries ordered highest height first
+        self.diffs_by_series: dict[Any, dict[Any, SortedList[DiffEntry]]] = defaultdict(dict)
+
+        # diffs_by_branch holds the diff items generated by each branch. this is needed for cleanup of the
+        # diffs_by_series data structure when branches expire into history. diffs are stored in insertion order.
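+        # (Editor's illustration: two rival branches at height 12 may each record a DiffEntry for
+        # ('price', pool) in diffs_by_series; a reader passing its Fork into get() sees only the
+        # entry whose branch_id lies on that fork, and promote_root later uses diffs_by_branch to
+        # discard the losing branch's entries.)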
+        self.diffs_by_branch: dict[BranchId, list[DiffEntryItem]] = defaultdict(list)
+
+        self.unloads: dict[BranchId, list] = defaultdict(list)  # unload requests for lazy series, keyed by branch id
+
+
+    @property
+    def root_branch(self):
+        return self._root_branch
+
+    @root_branch.setter
+    def root_branch(self, value: Branch):
+        self._root_branch = value
+        self._root_fork = Fork([value])
+
+    @property
+    def root_fork(self):
+        return self._root_fork
+
+    @property
+    def root_hash(self):
+        return self._root_branch.head
+
+    def init_root_block(self, root_block: Block) -> Fork:
+        assert self.root_branch is None
+        return self.add_branch(Branch.from_block(root_block))
+
+
+    @property
+    def heads(self):
+        result = set(b.head for b in self.branches_by_id.values() if b.head is not None)
+        result.add(self.root_branch.head)
+        return result
+
+
+    def add_branch(self, branch: Branch, *, strict=True) -> Fork:
         """
-        If block is the same age as root_height or older, it is ignored and None is returned. Otherwise, returns a Fork leading to root.
-        The ancestor block is set in the ancestors dictionary and any state updates to block are considered to have occured between the registered ancestor
-        block and the given block. This could be an interval of many blocks, and the ancestor does not need to be the block's immediate parent.
+        If there is no root_branch set yet, this branch becomes the root branch. Otherwise, returns a Fork with the
+        set of branches leading to the root.
+        Raises ValueError if no path from this branch to the root branch can be found.
+
+        If strict is True, then a ValueError is raised if the branch does not have a parent hash set. strict
+        should only be set to False when it is assured that the branch may be joined by height alone, because
+        the branch join is known to be at a live-blockchain-finalized height.
         """
-        # check height
-        height_diff = block.height - self.root_block.height
-        if height_diff <= 0:
-            log.debug(f'IGNORING old block {block}')
-            return None
-        if block.hash not in self.by_hash:
-            self.by_hash[block.hash] = block
-            parent = self.by_hash.get(block.parent)
-            self.ancestors[block.hash] = parent or self.root_block
-            self.by_height.add(block)
-            log.debug(f'new block state {block}')
-        return self.fork(block)
+        assert branch.id not in self.branches_by_id
+
+        if self.root_branch is None:
+            self.root_branch = branch
+            state_log.info(f'Initialized BlockState with {branch}')
+            return Fork([self.root_branch])
+
+        self.branches_by_height[branch.height].append(branch)
+        self.branches_by_id[branch.id] = branch
+
+        # search for a path to the root branch
+        def build_fork(cur: Branch) -> list[Branch]:
+            if cur == self.root_branch:
+                return [cur]
+            if strict and not cur.parent:
+                raise ValueError(f'No parent for branch {branch}')
+            parent_branches = [
+                p for p in self.branches_by_height.get(cur.start-1, [])
+                if not strict or cur.parent == p.head
+            ]
+            if cur.parent == self.root_branch.head or not strict and cur.start == self.root_branch.height + 1:
+                parent_branches.append(self.root_branch)
+            if not parent_branches:
+                raise ValueError(f'no path from {branch} to the root branch')
+            def branch_score(b: Branch):
+                if not b.path:
+                    return len(b)  # disjoint branch: score is just its block span. bigger = better
+                return 1_000_000_000 + len(b.path)  # contiguous branches always score highest. again, bigger = better
+            parent_branches.sort(key=branch_score)
+            parent = parent_branches[-1]  # highest score
+            return [cur, *build_fork(parent)]
+
+        fork = Fork(build_fork(branch))
+        state_log.info(f'added branch {branch}: {fork}')
+        return fork

-    def delete_block(self, block: Union[Block, Fork, bytes]):
-        """ if there was an error during block processing, we need to remove the incomplete block data """
+    def remove_branch(self, branch: Branch, *, remove_series_diffs=True):
+        del self.branches_by_id[branch.id]
+        by_height = self.branches_by_height.get(branch.height)
+        if by_height is not None:
+            by_height.remove(branch)
+            if len(by_height) == 0:
+                # garbage collect empty arrays
+                del self.branches_by_height[branch.height]
         try:
-            block = block.hash
-        except AttributeError:
-            pass
-        try:
-            del self.by_hash[block]
-        except KeyError:
-            pass
-        try:
-            del self.diffs_by_hash[block]
-        except KeyError:
-            pass
-        try:
-            del self.ancestors[block]
+            del self.unloads[branch.id]
         except KeyError:
             pass
+        diffs = self.diffs_by_branch.pop(branch.id, [])
+        if remove_series_diffs:  # this will be False for promoted branches
+            for diff in diffs:
+                difflist = self.diffs_by_series.get(diff.series,{}).get(diff.key)
+                if difflist is not None:
+                    difflist.remove(diff.entry)
+        state_log.info(f'removed branch {branch}' + ('' if remove_series_diffs else ' (promoting)'))

-    def fork(self, block: Block):
-        if block.hash == self.root_block.hash:
-            return Fork([block.hash], height=block.height)
-
-        if block.height - self.ancestors[block.hash].height > 1:
-            return DisjointFork(block, self.root_block)
-
-        def ancestors():
-            bh = block.hash
-            while True:
-                yield bh
-                if bh == self.root_block.hash:
-                    return
-                bh = self.ancestors[bh].hash
-        return Fork(ancestors(), height=block.height)
-
-
-    def get(self, fork: Optional[Fork], series, key, default=NARG):
+    def get(self, fork: Fork, series, key, default=NARG):
         series_diffs = self.diffs_by_series.get(series)
         if series_diffs is None:
             if default is NARG:
@@ -126,140 +185,147 @@ class BlockState:
             raise KeyError((series, key))
         return default

-    def _get_from_diffs(self, fork, diffs):
-        for diff in reversed(diffs):
-            if diff.height <= self.root_block.height or fork is not None and diff in fork:
+
+    def _get_from_diffs(self, fork, diffs: Sequence[DiffEntry]):
+        for diff in diffs:
+            # diffs with old heights are kept around if and only if their branches were promoted, so we can trust them.
+ if self._fork_has_diff(fork, diff): if diff.value is DELETE: break else: - if fork and self.root_block not in fork: # todo move this assertion elsewhere so it runs once per task - raise ValueError(f'Cannot get value for a non-root fork {hexstr(fork.hash)}') return diff.value return DELETE - def set(self, fork: Optional[Fork], series, key, value, overwrite=True): - diffs = self.diffs_by_series[series][key] - if overwrite or self._get_from_diffs(fork, diffs) is DELETE and value is not DELETE: - diff = DiffEntry(value, - fork.height if fork is not None else self.root_block.height, - fork.hash if fork is not None else self.root_block.hash) - if fork is not None: - self.diffs_by_hash[fork.hash].append(DiffEntryItem(series, key, diff)) - diffs.add(diff) + def set(self, fork: Fork, series, key, value, overwrite=True): + # first look for an existing value + branch = fork.branch + diffs = self.diffs_by_series.get(series,{}).get(key) + old_value = DELETE + if diffs: + for diff in diffs: + if diff.branch_id == branch.id: + # if there's an existing value for this branch, we replace it + diff.value = value + return + elif self._fork_has_diff(fork, diff): + # if there's an existing value on this fork, remember it + old_value = diff.value + break + if not overwrite: + overwrite = value != old_value + if overwrite: + if diffs is None: + diffs = self.diffs_by_series[series][key] = SortedList(key=lambda x: -x.height) + diff = DiffEntry(value, branch.height, branch.id) + diffs.add(diff) + self.diffs_by_branch[branch.id].append(DiffEntryItem(series, key, diff)) + def unload(self, fork: Optional[Fork], series, key): - self.unloads[fork.hash].append((series, key)) + self.unloads[fork.branch_id].append((series, key)) def iteritems(self, fork: Optional[Fork], series): for k, difflist in self.diffs_by_series.get(series, {}).items(): - for diff in reversed(difflist): - if diff.height <= self.root_block.height or fork is not None and diff in fork: + for diff in difflist: + if self._fork_has_diff(fork, diff): if diff.value is not DELETE: yield k, diff.value break def iterkeys(self, fork: Optional[Fork], series): for k, difflist in self.diffs_by_series.get(series, {}).items(): - for diff in reversed(difflist): - if diff.height <= self.root_block.height or fork is not None and diff in fork: + for diff in difflist: + if self._fork_has_diff(fork, diff): if diff.value is not DELETE: yield k break def itervalues(self, fork: Optional[Fork], series): for k, difflist in self.diffs_by_series.get(series, {}).items(): - for diff in reversed(difflist): - if diff.height <= self.root_block.height or fork is not None and diff in fork: + for diff in difflist: + if self._fork_has_diff(fork, diff): if diff.value is not DELETE: yield diff.value break - def promote_root(self, new_root_fork: Fork): - block = self.by_hash[new_root_fork.hash] - diffs = self.collect_diffs(block) - # no application of diffs to the internal state is required, just clean up + def _fork_has_diff(self, fork: Optional[Fork], diff: DiffEntry): + # promotion removes diffs from any abandoned branches, so if a diff has a height at least as old as + # the current root branch, then it is known to be a finalized true value for all current forks + return diff.height <= self.root_branch.height or fork is not None and diff.branch_id in fork.branch_ids - # walk the by_height list to delete any aged-out block data + + def promote_root(self, fork: Fork): + """ + Fork must be based off the root branch. + + The root branch is advanced to be the latest branch in the fork. 
+
+        Old branches whose height is less than or equal to the new root's height are garbage collected and discarded.
+
+        Old branches that are not part of the promotion fork have their diffs discarded as well, such that any
+        diffs remaining in the diffs_by_series structure that are at least as old as the root branch are known to
+        have been finalized on chain and are valid data. Thus, the series diffs structure will always have at
+        least one diff, possibly ancient, representing the latest set value for each key. If promote_root detects
+        that newly promoted diffs make an older diff no longer relevant, that old diff is finally garbage collected.
+
+        Returns the set of diffs for the promoted fork.
+        """
+        found_root = False
+        promotion_branches = []
+        for branch in reversed(fork.branches):
+            if branch == self.root_branch:
+                found_root = True
+            elif found_root:
+                promotion_branches.append(branch)
+        assert found_root
+        if not promotion_branches:
+            state_log.debug('null promotion ignored')
+            return None
+
+        # diffs are ordered from oldest branch to newest, using insertion order within each branch
+        diffs = [d for b in promotion_branches for d in self.diffs_by_branch.get(b.id, [])]
+
+        # walk the branches_by_height list to delete any aged-out branch data
-        # in order to prune diffs_by_series, updated_keys remembers all the keys that were touched by any aged-out block
-        series_deletions = []
-        updated_keys = set()
-        while self.by_height and self.by_height[0].height <= block.height:
-            dead = self.by_height.pop(0)
-            if dead is not block:
-                try:
-                    del self.by_hash[dead.hash]
-                except KeyError:
-                    pass
-                block_diffs = self.diffs_by_hash.get(dead.hash)
-                if block_diffs is not None:
-                    for d in block_diffs:
-                        if d.key == BlockState._DELETE_SERIES_KEY and dead.hash in new_root_fork:
-                            series_deletions.append(d.series)
-                        else:
-                            updated_keys.add((d.series, d.key))
-                    del self.diffs_by_hash[dead.hash]
-            try:
-                del self.ancestors[dead.hash]
-            except KeyError:
-                pass  # todo is this bad?
+        for height in range(self.root_branch.height + 1, fork.height + 1):
+            for old in self.branches_by_height.pop(height, []):
+                # remove diffs if the branch is not in the promotion fork
+                self.remove_branch(old, remove_series_diffs=old not in fork.branches)

-        # prune diffs_by_series by removing old series diffs that have been superceded by new diffs
-        for s, k in updated_keys:
+        # old diffs from non-promotion branches have been removed.
+        # fork to see if there are still extra-old diffs we can garbage collect out of diffs_by_series
+        for s, k in set((d.series, d.key) for d in diffs):
            difflist = self.diffs_by_series[s][k]
-            # remove old diffs on abandoned forks but keep old diffs on the root fork
-            removals = None
-            for d in difflist:
-                if d.height <= new_root_fork.height and d not in new_root_fork:
-                    if removals is None:
-                        removals = [d]
-                    else:
-                        removals.append(d)
-            if removals is not None:
-                for r in removals:
-                    difflist.remove(r)
-            # while the second-oldest diff is still root-age, pop off the oldest diff
-            while len(difflist) >= 2 and difflist[1].height <= new_root_fork.height:
-                difflist.pop(0)
+            # if the second-oldest diff is at least root-age, we don't need the oldest diff
+            while len(difflist) >= 2 and difflist[-2].height <= fork.height:
+                difflist.pop()
            # if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list
-            if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height:
+            if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= fork.height:
                del self.diffs_by_series[s][k]
-        if block.hash in self.unloads:
-            key_unloads = self.unloads.pop(block.hash)
+
+        # series unloads
+        for branch_id in fork.branch_ids:
+            key_unloads = self.unloads.pop(branch_id, [])
            for s,k in key_unloads:
                try:
+                    log.debug(f'unloading {s} {k}')
                    del self.diffs_by_series[s][k]
                except KeyError:
                    pass
-        for s in series_deletions:
-            del self.diffs_by_series[s]
-        self.root_block = block
-        log.debug(f'promoted root {self.root_block}')
+
+        self.root_branch = fork.branch
+        state_log.info(f'promoted {self.root_branch.height} ' + (hexstr(self.root_branch.path[0])[:7] + ' ' if self.root_branch.path else '') + ' '.join(str(b) for b in reversed(promotion_branches)))
        return diffs

-    _DELETE_SERIES_KEY = '!^DEXORDER_DELETE_SERIES^!'
-    def delete_series(self, fork: Optional[Fork], series: str):
-        """
-        deletes the series entirely. the deletion is part of the blockstate, so the series could remain active on some branches until the
-        series deletion matures into finality.
-        """
-        self.set(fork, series, BlockState._DELETE_SERIES_KEY, None)  # setting any value on this special key will trigger a delete when this block finalizes
-
-    def collect_diffs(self, block: Block, series_key=NARG) -> list[DiffEntryItem]:
-        """
-        returns a list of the latest DiffItem for each key change along the ancestor path from block to root
-        """
-        # first collect the exhaustive list of diffs along the ancestry path
-        diff_lists: list[list[DiffEntryItem]] = []
-        while block.height > self.root_block.height:
-            diffs = self.diffs_by_hash.get(block.hash)
-            if diffs:
-                if series_key is not NARG:
-                    diffs = [d for d in diffs if d.series == series_key]
-                diff_lists.append(diffs)
-            block = self.ancestors[block.hash]
-        difflist = list(itertools.chain(*reversed(diff_lists)))
-        return compress_diffs(difflist)
+    # old code that would remove a series entirely upon promotion of the branch that deleted it
+    # _DELETE_SERIES_KEY = '!^DEXORDER_DELETE_SERIES^!'
+    # def delete_series(self, fork: Optional[Fork], series: str):
+    #     """
+    #     deletes the series entirely. the deletion is part of the blockstate, so the series could remain active on some branches until the
+    #     series deletion matures into finality.
+ # """ + # self.set(fork, series, BlockState._DELETE_SERIES_KEY, None) # setting any value on this special key will trigger a delete when this block finalizes # noinspection PyMethodMayBeStatic @@ -273,20 +339,6 @@ class FinalizedBlockState: self.data = {} self.by_hash = {} - def add_block(self, block: Block) -> Optional[Fork]: - self.by_hash[block.hash] = block - return self.fork(block) - - def delete_block(self, block: Union[Block, Fork, bytes]): - blockhash = block if isinstance(block, bytes) else block.hash - try: - del self.by_hash[blockhash] - except KeyError: - pass - - def fork(self, block: Block): - return Fork([block.hash], height=block.height) - def get(self, _fork: Optional[Fork], series, key, default=NARG): result = self.data.get(series,{}).get(key, default) if result is NARG: diff --git a/src/dexorder/configuration/standard_tokens.py b/src/dexorder/configuration/standard_tokens.py deleted file mode 100644 index 1d02e55..0000000 --- a/src/dexorder/configuration/standard_tokens.py +++ /dev/null @@ -1,8 +0,0 @@ -from .schema import TokenConfig - -default_token_config = [ - # TokenConfig('Wrapped Matic', 'WMATIC', 18, 'Polygon', '0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270', abi='WMATIC'), - # TokenConfig('Wrapped Ethereum','WETH', 18, 'Polygon', '0x7ceB23fD6bC0adD59E62ac25578270cFf1b9f619'), - # TokenConfig('Wrapped Bitcoin', 'WBTC', 8, 'Polygon', '0x1BFD67037B42Cf73acF2047067bd4F2C47D9BfD6'), - # TokenConfig('USD Coin', 'USDC', 6, 'Polygon', '0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174'), -] diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index b2e7f42..14dc0a0 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -19,7 +19,7 @@ def get_contract_data(name): def get_deployment_address(deployment_name, contract_name, *, chain_id=None): if chain_id is None: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id with open(f'../contract/broadcast/{deployment_name}.sol/{chain_id}/run-latest.json', 'rt') as file: data = json.load(file) for tx in data.get('transactions',[]): diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index f8117ba..fcf2363 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -3,12 +3,12 @@ import logging from typing import Optional import eth_account -from web3.exceptions import BadFunctionCallOutput, Web3Exception +from web3.exceptions import Web3Exception from web3.types import TxReceipt from dexorder import current_w3 from dexorder.base.account import current_account -from dexorder.database.model.block import current_block +from dexorder.blockstate.fork import current_fork from dexorder.util import hexstr log = logging.getLogger(__name__) @@ -49,8 +49,8 @@ class DeployTransaction (ContractTransaction): def call_wrapper(addr, name, func): async def f(*args, **kwargs): try: - blockhash = hexstr(current_block.get().hash) - except LookupError: + blockhash = hexstr(current_fork.get().head) + except (LookupError, AttributeError): blockhash = 'latest' try: return await func(*args).call(block_identifier=blockhash, **kwargs) diff --git a/src/dexorder/contract/decimals.py b/src/dexorder/contract/decimals.py index dde230d..9e4cd81 100644 --- a/src/dexorder/contract/decimals.py +++ b/src/dexorder/contract/decimals.py @@ -3,7 +3,7 @@ import logging from eth_abi.exceptions import InsufficientDataBytes from web3.exceptions import ContractLogicError, BadFunctionCallOutput -from dexorder 
import db, dec +from dexorder import db from dexorder.contract import ERC20 log = logging.getLogger(__name__) @@ -14,6 +14,7 @@ async def token_decimals(addr): try: return db.kv[key] except KeyError: + # noinspection PyBroadException try: decimals = await ERC20(addr).decimals() except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): diff --git a/src/dexorder/contract/dexorder.py b/src/dexorder/contract/dexorder.py index 7833c81..d068a6e 100644 --- a/src/dexorder/contract/dexorder.py +++ b/src/dexorder/contract/dexorder.py @@ -34,7 +34,7 @@ def _load_chain(chain_id: int): def get_by_chain(d): - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id try: return d[chain_id] except KeyError: diff --git a/src/dexorder/database/column_types.py b/src/dexorder/database/column_types.py index 522f375..4367361 100644 --- a/src/dexorder/database/column_types.py +++ b/src/dexorder/database/column_types.py @@ -6,6 +6,7 @@ from sqlalchemy.dialects.postgresql import BYTEA, JSONB from web3 import Web3 from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain +from dexorder.util import hexstr, hexbytes class Address(TypeDecorator): @@ -24,7 +25,7 @@ class Blockchain(TypeDecorator): cache_ok = True def process_bind_param(self, value: NativeBlockchain, dialect): - return value.chain_id + return value.id def process_result_value(self, value: int, dialect): return NativeBlockchain.for_id(value) @@ -85,3 +86,14 @@ def DataclassDict(constructor): result = DataclassDictBase() result.Constructor = constructor return result + + +class BytesList(TypeDecorator): + impl = JSONB + + def process_bind_param(self, value, dialect): + return [hexstr(b) for b in value] + + def process_result_value(self, result, dialect): + return [hexbytes(s) for s in result] + diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index eefb9d0..de6a48a 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -1,8 +1,10 @@ from .base import Base from .kv import KeyValue -from .block import Block from .series import SeriesSet, SeriesDict from .transaction import Transaction, TransactionJob from .orderindex import OrderIndex from .pool import Pool from .token import Token + + +class Block: pass diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py deleted file mode 100644 index a0d21cc..0000000 --- a/src/dexorder/database/model/block.py +++ /dev/null @@ -1,36 +0,0 @@ -from contextvars import ContextVar - -from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import Mapped, mapped_column - -from dexorder.database.model import Base -from dexorder.util import hexint, Field, hexstr - - -class Block(Base): - - @staticmethod - def from_data(chain_id:int, data:dict): - """ Builds a Block using the response data from an RPC server """ - return Block(chain=chain_id, height=data['number'] if type(data['number']) is int else int(data['number'],0), - hash=hexstr(data['hash']), parent=hexstr(data['parentHash']), data=data) - - chain: Mapped[int] = mapped_column(primary_key=True) - height: Mapped[int] = mapped_column(primary_key=True) - hash: Mapped[bytes] = mapped_column(primary_key=True) - parent: Mapped[bytes] - data: Mapped[dict] = mapped_column(JSONB) - - @property - def timestamp(self) -> int: - raw = self.data['timestamp'] - # noinspection PyTypeChecker - return raw if type(raw) is int else hexint(raw) - - def __str__(self): - return 
f'{self.height}_{self.hash.hex()[2:7]}' - - -current_block = ContextVar[Block]('Block.cur') # block for the current thread -latest_block = Field[Block]() # most recent discovered block but maybe not the currently processing one -completed_block = ContextVar[Block]('Block.completed') # most recent fully-processed block diff --git a/src/dexorder/database/model/pool.py b/src/dexorder/database/model/pool.py index fea0e75..7efab0d 100644 --- a/src/dexorder/database/model/pool.py +++ b/src/dexorder/database/model/pool.py @@ -1,5 +1,5 @@ import logging -from typing import TypedDict, Optional +from typing import TypedDict from sqlalchemy.orm import Mapped, mapped_column diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 2c770fc..5636b2f 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -8,13 +8,13 @@ from dexorder import current_pub, db, from_timestamp, minutely from dexorder.base.chain import current_chain, current_clock from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.blocks import get_block_timestamp +from dexorder.blockstate.fork import current_fork from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.transaction import submit_transaction_request from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.contract import ERC20 from dexorder.vault_blockdata import vault_owners, vault_balances, adjust_balance -from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob from dexorder.base.orderlib import SwapOrderState from dexorder.order.orderstate import Order @@ -50,7 +50,7 @@ async def handle_order_placed(event: EventData): num_orders = int(event['args']['numOrders']) log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') if addr not in vault_owners: - log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs + log.warning(f'block {current_fork.get().head} order from unknown vault {addr}') # todo insert (short) block hash into all logs # return todo always discard rogues # noinspection PyBroadException try: @@ -155,7 +155,7 @@ async def handle_uniswap_swap(swap: EventData): pool, time, price = data addr = pool['address'] pool_prices[addr] = price - ohlcs.update_all(addr, time, price) + await ohlcs.update_all(addr, time, price) log.debug(f'pool {addr} {minutely(time)} {price}') @@ -178,7 +178,7 @@ def handle_vault_created(created: EventData): else: break # log.debug(f'updated vaults: {vaults}') - current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults) + current_pub.get()(f'{current_chain.get().id}|{owner}', 'vaults', vaults) async def activate_time_triggers(): @@ -208,7 +208,7 @@ async def activate_price_triggers(): async def process_active_tranches(): for tk, proof in active_tranches.items(): old_req = execution_requests.get(tk) - height = current_block.get().height + height = current_fork.get().height if old_req is None or old_req.height <= height: # '<=' is used so proof is updated with more recent values if await has_funds(tk): log.info(f'execution request for {tk}') @@ -234,7 +234,7 @@ async def has_funds(tk: TrancheKey): async def process_execution_requests(): - height = current_block.get().height + height = current_fork.get().height execs = {} # which requests to act on for tk, er 
in execution_requests.items(): tk: TrancheKey @@ -311,13 +311,13 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str): last_ohlc_rollover = 0 -def check_ohlc_rollover(): +async def check_ohlc_rollover(): global last_ohlc_rollover - time = current_block.get().timestamp + time = await get_block_timestamp(current_fork.get().head) dt = from_timestamp(time) diff = time - last_ohlc_rollover if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute: for (symbol, period) in recent_ohlcs.keys(): - ohlcs.update(symbol, period, dt) + await ohlcs.update(symbol, period, dt) last_ohlc_rollover = time diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 5f4b66d..7ed0b96 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -7,12 +7,11 @@ from socket_io_emitter import Emitter from dexorder import DELETE from dexorder.base.chain import current_chain -from dexorder.base.fork import current_fork from dexorder.blockstate import DiffItem, DataType, BlockState from dexorder.blockstate.blockdata import SeriesCollection, BlockData from dexorder.blockstate.diff import DiffEntryItem +from dexorder.blockstate.fork import Fork from dexorder.blockstate.state import compress_diffs -from dexorder.database.model import Block from dexorder.memcache import current_redis, memcache from dexorder.util.async_util import maywait from dexorder.util.json import json_encoder @@ -26,29 +25,28 @@ class RedisState (SeriesCollection): super().__init__(series_or_datavars) self.exists:set[str] = set() + # noinspection PyMethodMayBeStatic async def clear(self): log.debug('clearing memcache') r = current_redis.get() - await r.delete(*[f'{current_chain.get().chain_id}|{k}' for k in ['latest_block', *self.datas.keys()]]) + await r.delete(*[f'{current_chain.get().id}|{k}' for k in ['latest_block', *self.datas.keys()]]) - async def init(self, state: BlockState): - fork = current_fork.get() + async def init(self, state: BlockState, fork: Fork): await self.clear() diffs = [] for series in self.datas.keys(): for k, v in state.iteritems(fork, series): diffs.append(DiffItem(series, k, v)) - await self.save(state.root_block, diffs) + await self.save(fork, diffs) # noinspection PyAsyncCall - async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): + async def save(self, fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): # the diffs must be already compressed such that there is only one action per key chain = current_chain.get() - assert block.chain == chain.chain_id - chain_id = chain.chain_id + chain_id = chain.id sadds: dict[str,set[str]] = defaultdict(set) sdels: dict[str,set[str]] = defaultdict(set) hsets: dict[str,dict[str,str]] = defaultdict(dict) @@ -101,9 +99,9 @@ class RedisState (SeriesCollection): r.hset(series, mapping=kvs) for series, keys in hdels.items(): r.hdel(series, *keys) - block_series = f'{chain_id}|block.latest' - r.json(json_encoder).set(block_series,'$',block.data) - pubs.append((str(chain_id), 'block.latest', [block.data])) + block_series = f'{chain_id}|head' + r.json(json_encoder).set(block_series,'$',[fork.height, fork.head]) + pubs.append((str(chain_id), 'head', [fork.height, fork.head])) # separate batch for pubs if pubs: await publish_all(pubs) diff --git a/src/dexorder/metadata.py b/src/dexorder/metadata.py index e448234..04e0467 100644 --- a/src/dexorder/metadata.py +++ b/src/dexorder/metadata.py @@ -123,7 +123,7 @@ def is_generating_metadata(): 
# noinspection PyShadowingNames def generate_metadata(tokens: Iterable[Union[Token, TokenDict]], pools: Iterable[Union[Pool, PoolDict]], file=sys.stdout): - dump(file, '{"'+str(current_chain.get().chain_id)+'":{"t":[') + dump(file, '{"' + str(current_chain.get().id) + '":{"t":[') dump_tokens(file, tokens) dump(file, '],"p":[') dump_pools(file, pools) @@ -135,7 +135,7 @@ metadata_by_chainaddr = {} def get_metadata(addr=None, *, chain_id=None): if chain_id is None: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id global metadata if metadata is NARG: if config.metadata is None or generating_metadata: diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 0c16429..7ce533a 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -8,9 +8,10 @@ from cachetools import LFUCache from dexorder import dec, config, from_timestamp, timestamp, now, minutely from dexorder.base.chain import current_chain +from dexorder.blocks import get_block_timestamp from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate.diff import DiffEntryItem -from dexorder.database.model import Block +from dexorder.blockstate.fork import Fork, current_fork from dexorder.util import json from dexorder.util.shutdown import fatal @@ -184,18 +185,18 @@ class OHLCKey (NamedTuple): def quotes_path(chain_id: int = None): if chain_id is None: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id return f'{chain_id}/quotes.json' def series_path(chain_id: int = None): if chain_id is None: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id return f'{chain_id}/series.json' def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: if chain_id is None: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id start = ohlc_start_time(time, period) name = period_name(period) return f'{chain_id}/{symbol}/{name}/' + ( @@ -215,7 +216,7 @@ class Chunk: self.repo_dir = repo_dir self.symbol = symbol self.period = period - self.chain_id = chain_id if chain_id is not None else current_chain.get().chain_id + self.chain_id = chain_id if chain_id is not None else current_chain.get().id self.path = chunk_path(symbol, period, time, chain_id=chain_id) self.fullpath = os.path.join(repo_dir, self.path) if bars is not None: @@ -299,7 +300,7 @@ class OHLCRepository: @property def chain_id(self): - return self._chain_id if self._chain_id is not None else current_chain.get().chain_id + return self._chain_id if self._chain_id is not None else current_chain.get().id @property def dir(self): @@ -332,12 +333,12 @@ class OHLCRepository: if (symbol, period) not in recent_ohlcs: recent_ohlcs[(symbol, period)] = [] - def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True): + async def update_all(self, symbol: str, time: datetime, price: dec, *, create: bool = True): """ the update_all() and update() methods generate bars for the recent_ohlcs BlockDict """ for period in OHLC_PERIODS: - self.update(symbol, period, time, price, create=create) + await self.update(symbol, period, time, price, create=create) - def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \ + async def update(self, symbol: str, period: timedelta, time: datetime, price: Optional[dec] = None, *, create: bool = True) \ -> Optional[list[NativeOHLC]]: """ if price is None, then bars are advanced based on the time but no new 
price is added to the series. @@ -363,11 +364,14 @@ class OHLCRepository: updated = update_ohlc(historical[-1], period, time, price) # drop any historical bars that are older than we need # oldest_needed = cover the root block time plus one period prior - oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period - # noinspection PyTypeChecker - trim = (oldest_needed - historical[0].start) // period - if trim > 0: - historical = historical[trim:] + root_hash = current_blockstate.get().root_branch.head + if root_hash is not None: + root_timestamp = await get_block_timestamp(root_hash) + oldest_needed = from_timestamp(root_timestamp) - period + # noinspection PyTypeChecker + trim = (oldest_needed - historical[0].start) // period + if trim > 0: + historical = historical[trim:] # now overlap the updated data on top of the historical data if not historical or not updated: @@ -403,7 +407,7 @@ class OHLCRepository: self.dirty_chunks.add(chunk) def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> Chunk: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id key = chunk_path(symbol, period, start_time, chain_id=chain_id) found = self.cache.get(key) if found is None: @@ -476,7 +480,7 @@ class FinalOHLCRepository (OHLCRepository): bar = self.current[key] = NativeOHLC(start, price, price, price, close) chunk.update(bar, backfill=backfill) self.dirty_chunks.add(chunk) - chain_id_str = str(current_chain.get().chain_id) + chain_id_str = str(current_chain.get().id) if chain_id_str not in self.series: self.series[chain_id_str] = {} self.series[chain_id_str][f'{key[0]}|{period_name(key[1])}'] = {'start': timestamp(start)} @@ -511,7 +515,7 @@ def save_json(obj, filename): def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]): pool_addr, period = key - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id period = period_name(period) key = f'{pool_addr}|{period}' return f'{chain_id}|{key}', 'ohlc', (chain_id, key, [b.ohlc for b in bars]) @@ -523,7 +527,7 @@ def ohlc_str_to_key(s): pool, period = s.split('|') return pool, period_from_name(period) -def ohlc_save(_block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): +def ohlc_save(_fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): """ used as a finalization callback from BlockState data. 
""" diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 146c87e..3d962d2 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -196,7 +196,7 @@ class Order: log.debug(f'pub order status {_s} {k} {v}') # publish status updates (on placing and completion) to web clients try: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel 'o', # order message type (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_status) @@ -213,7 +213,7 @@ class Order: if v is DELETE: return None try: - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel 'of', # order message type (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_fills) @@ -234,6 +234,7 @@ class Order: if oi: oi.state = status.state else: + order_log.debug(f'saving OrderIndex {key} {status.state}') oi = OrderIndex(chain=current_chain.get(), vault=key.vault, order_index=key.order_index, state=status.state) sess.add(oi) diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 98135d1..e070ee5 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -31,7 +31,7 @@ async def get_pool(address: str) -> PoolDict: async def load_pool(address: str) -> PoolDict: found = None - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id # todo other exchanges try: v3 = UniswapV3Pool(address) @@ -72,7 +72,7 @@ class PoolPrices (BlockDict[str, dec]): def pub_pool_price(_s,k,v): - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id return f'{chain_id}|{k}', 'p', (chain_id, k, str(v)) diff --git a/src/dexorder/progressor.py b/src/dexorder/progressor.py index b6c4617..0c4f74b 100644 --- a/src/dexorder/progressor.py +++ b/src/dexorder/progressor.py @@ -101,8 +101,8 @@ class BlockProgressor(metaclass=ABCMeta): for log_event in log_events: try: parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI) as e: - # log.debug(f'logevent parse error {e}\n{log_event}') # this happens for Swap events from non-Uniswap pools + except (LogTopicError, MismatchedABI): + # this happens for Swap events from non-Uniswap pools parsed = NARG # need a placeholder parsed_events.append(parsed) # todo try/except for known retryable errors diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 30f87fd..5aac6d9 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,69 +1,42 @@ import asyncio import logging -from asyncio import Queue -from typing import Any, Iterable, Callable +from asyncio import Event +from datetime import timedelta +from typing import Any, Iterable, Callable, Optional from eth_bloom import BloomFilter -from web3.exceptions import LogTopicError, MismatchedABI # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError -from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, NARG +from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config, now +from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock -from dexorder.base.fork import current_fork, Fork, DisjointFork from dexorder.blockchain.connection import create_w3_ws, create_w3 -from dexorder.blocks import get_block 
+from dexorder.blocks import cache_block, get_block, promotion_height from dexorder.blockstate import BlockState, current_blockstate +from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem -from dexorder.database.model import Block -from dexorder.database.model.block import current_block, latest_block +from dexorder.blockstate.fork import current_fork, Fork from dexorder.progressor import BlockProgressor from dexorder.transaction import create_and_send_transactions -from dexorder.util import hexstr, hexint, hexbytes +from dexorder.util import hexstr, hexbytes, hexint from dexorder.util.async_util import maywait, Maywaitable from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) -class Retry (Exception): ... - # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas class BlockStateRunner(BlockProgressor): - """ - - NOTE: This doc is old and not strictly true but still has the basic idea - - 1. load root stateBlockchain - a. if no root, init from head - b. if root is old, batch forward by height - 2. discover new heads - 2b. find in-state parent block else use root - 3. set the current fork = ancestor->head diff state - 4. query blockchain eventlogs - 5. process new vaults - 6. process new orders and cancels - a. new pools - 7. process Swap events and generate pool prices - 8. process price horizons - 9. process token movement - 10. process swap triggers (zero constraint tranches) - 11. process price tranche triggers - 12. process horizon tranche triggers - 13. filter by time tranche triggers - 14. bundle execution requests and send tx. tx has require(block= 0 \ - else current_block.get().height + config.backfill # if backfill is negative then it's relative to the current block + self.timer_period = timer_period # todo deprecated? self.running = False async def run(self): # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling - if self.state: - self.max_height_seen = max(self.max_height_seen, self.state.root_block.height) self.running = True + # this run() process discovers new heads and puts them on a queue for the worker to process + _worker_task = asyncio.create_task(self.worker()) return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws()) async def run_ws(self): @@ -101,26 +73,28 @@ class BlockStateRunner(BlockProgressor): chain = Blockchain.for_id(chain_id) current_chain.set(chain) - # this run() process discovers new heads and puts them on a queue for the worker to process - _worker_task = asyncio.create_task(self.worker()) - while self.running: + # noinspection PyBroadException try: async with w3ws as w3ws: log.debug('connecting to ws provider') await w3ws.provider.connect() - subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it. + subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. 
                    log.debug(f'subscribed to newHeads {subscription}')
                    while self.running:
                        async for message in w3ws.ws.process_subscriptions():
-                            head = message['result']
-                            log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}')
-                            await self.add_head(head)
+                            block = Block(chain_id, message['result'])
+                            cache_block(block)
+                            latest_block[chain_id] = block
+                            self.new_head_event.set()
+                            log.debug(f'detected new head {block}')
                            if not self.running:
                                break
                        await async_yield()
            except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
                log.debug(f'runner timeout {e}')
+            except:
+                log.exception('Unhandled exception during run_ws()')
            finally:
                # noinspection PyBroadException
                try:
@@ -128,7 +102,6 @@
                    await w3ws.provider.disconnect()
                except Exception:
                    pass
-        log.debug('yield')
        log.debug('runner run_ws() exiting')

@@ -138,86 +111,96 @@
        https://github.com/NomicFoundation/hardhat/issues/2053
        So we implement polling as a workaround.
        """
+        assert config.polling > 0
        w3 = await create_w3()
        chain_id = await w3.eth.chain_id
        chain = Blockchain.for_id(chain_id)
        current_chain.set(chain)
-        _worker_task = asyncio.create_task(self.worker())
-        prev_blockhash = None
-        while self.running:
-            try:
-                # polling mode is used primarily because Hardhat fails to deliver newHeads events after about an hour
-                # unfortunately, hardhat also stops responding to eth_getBlockByHash. so instead, we use the standard (stupid)
-                # 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only
-                # rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the
-                # work queue and either use the block directly or query for the block if the queue object is a hashcode.
-                block = await w3.eth.get_block('latest')
-                head = block['hash']
-                if head != prev_blockhash:
-                    prev_blockhash = head
-                    log.debug(f'polled new block {hexstr(head)}')
-                    await self.add_head(block)
-                if not self.running:
-                    break
-                await asyncio.sleep(config.polling)
-            except (ConnectionClosedError, TimeoutError, asyncio.TimeoutError) as e:
-                log.debug(f'runner timeout {e}')
-            finally:
+        prev_blockhash = None  # hash of the last polled head, so unchanged heads are ignored
+        next_poll = now()
+        try:
+            while self.running:
+                sleep = (next_poll - now()).total_seconds()
+                if sleep > 0:
+                    await asyncio.sleep(sleep)
+                next_poll = now() + timedelta(seconds=config.polling)
                # noinspection PyBroadException
                try:
-                    # noinspection PyUnresolvedReferences
-                    await w3.provider.disconnect()
-                except Exception:
-                    pass
-                await async_yield()
-        log.debug('runner run_polling() exiting')
+                    prev_blockhash = await asyncio.wait_for(
+                        self.poll_head(chain, w3, prev_blockhash), timeout=config.polling)
+                except TimeoutError as e:
+                    log.debug(f'runner timeout {e}')
+        except (ConnectionClosedError, TimeoutError) as e:
+            log.debug(f'runner timeout {e}')
+        except:
+            log.exception('Unhandled exception during run_polling()')
+        finally:
+            # noinspection PyBroadException
+            try:
+                # noinspection PyUnresolvedReferences
+                await w3.provider.disconnect()
            except Exception:
+                pass
+            log.debug('runner run_polling() exiting')

+    async def poll_head(self, chain, w3, prev_blockhash):
+        # polling mode is used primarily because Hardhat fails to deliver newHeads events after about an hour
+        # unfortunately, hardhat also stops responding to eth_getBlockByHash. 
so instead, we use the standard (stupid) + # 'latest' polling for blocks, and we push the entire block to the queue since apparently this is the only + # rpc call Hardhat seems to consistently support. The worker must then detect the type of object pushed to the + # work queue and either use the block directly or query for the block if the queue object is a hashcode. + blockdata = await w3.eth.get_block('latest') + head = blockdata['hash'] + if head == prev_blockhash: + return prev_blockhash + log.debug(f'polled new head {hexstr(head)} {hexint(blockdata["number"])}') + block = Block(chain.id, blockdata) + latest_block[chain.id] = block + # prefetch the head's ancestors + if self.state is not None and self.state.root_branch is not None: + if self.state.root_branch.height >= block.height - chain.confirms * 2: + # prefetch parent blocks back to the root height + cur = block + while self.state.root_branch is not None and cur.height > self.state.root_branch.height: + cur = await get_block(cur.parent, chain_id=chain.id) + self.new_head_event.set() + return head - async def add_head(self, head): - """ - head can either be a full block-data struct or simply a block hash. this method converts it to a Block - and pushes that Block onto the worker queue - """ - chain = current_chain.get() - w3 = current_w3.get() - try: - block_data = head - blockhash = block_data['hash'] - parent = block_data['parentHash'] - height = block_data['number'] - head = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data) - except TypeError: - head = await get_block(head) - latest_block.set(head) + async def create_branch(self, chain: Blockchain) -> Optional[Fork]: + if chain.id not in latest_block: + return None + block = latest_block[chain.id] - if self.state or config.backfill: - # backfill batches - start_height = self.max_height_seen - batch_size = config.batch_size if config.batch_size is not None else chain.batch_size - batch_height = start_height + batch_size - 1 - while batch_height < head.height: - # the backfill is larger than a single batch, so we push intermediate head blocks onto the queue - response = await w3.provider.make_request('eth_getBlockByNumber', [hex(batch_height), False]) - block_data: dict = response['result'] - blockhash = bytes.fromhex(block_data['hash'][2:]) - parent = bytes.fromhex(block_data['parentHash'][2:]) - height = int(block_data['number'], 0) - assert height == batch_height - block = Block(chain=chain.chain_id, height=height, hash=blockhash, parent=parent, data=block_data) - log.debug(f'enqueueing batch backfill from {start_height} through {batch_height}') - await self.queue.put(block) # add an intermediate block - self.max_height_seen = height - start_height += chain.batch_size - batch_height += chain.batch_size - if self.queue.qsize() > 2: - await asyncio.sleep(1) - else: - await async_yield() - await self.queue.put(head) # add the head block - self.max_height_seen = head.height + if self.state is None: + self.state = BlockState() + + if self.state.root_branch is None: + # no root branch, so create one from a single block branch + return self.state.add_branch(Branch.from_block(block)) + + if block.height - self.state.root_branch.height >= chain.confirms * 2: + # create a disjoint backfilling branch + start = self.state.root_branch.height + 1 + # do not query more than the chain's batch size + # do not query into the reorgable area. only query finalized data. 
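+            # illustrative arithmetic (editor's sketch, values assumed): with root height 100,
+            # batch_size 50, confirms 5 and a latest block at 300, start = 101 and
+            # height = min(101 + 50, 300 - 5) = 151, so the disjoint branch stays within finalized blocks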
+            height = min(start + chain.batch_size, block.height - chain.confirms)
+            branch = Branch(height, start)  # no parent or path
+            return self.state.add_branch(branch, strict=False)
+
+        # otherwise construct an explicit list of linked blocks from the most recent head to the latest block
+        heads = self.state.heads
+        path = [block.hash]
+        cur = block
+        while True:
+            if cur.parent in heads:
+                # the branch's parent is the block just below the oldest block in the path
+                branch = Branch(block.height, block.height - len(path) + 1, cur.parent, path, chain=chain)
+                return self.state.add_branch(branch)
+            if cur.height <= self.state.root_branch.height:
+                fatal(f'Latest head {block.hash} does not have the root block {self.state.root_branch.head} as a parent')
+            cur = await get_block(cur.parent)
+            path.append(cur.hash)

    async def worker(self):
@@ -225,68 +208,54 @@
        log.debug(f'runner worker started')
        w3 = current_w3.get()
        chain = current_chain.get()
-        assert chain.chain_id == await w3.eth.chain_id
+        assert chain.id == await w3.eth.chain_id
        current_clock.set(BlockClock())
-        prev_head = None
        while self.running:
            try:
-                if self.timer_period:
-                    async with asyncio.timeout(self.timer_period):
-                        head = await self.queue.get()
-                else:
-                    head = await self.queue.get()
-            except TimeoutError:
-                # 1 second has passed without a new head. Run the postprocess callbacks to check for activated time-based triggers
-                if prev_head is not None:
-                    await self.handle_time_tick(prev_head)
-            else:
-                try:
-                    await self.handle_head(chain, head, w3)
-                    prev_head = head
-                except Retry:
-                    pass
-                except Exception as x:
-                    log.exception(x)
+                await self.new_head_event.wait()
+            except asyncio.CancelledError:
+                break
+            self.new_head_event.clear()
+            try:
+                fork = await self.create_branch(chain)
+            except ValueError:
+                log.warning(f'Could not build a branch back to root! 
{hexstr(latest_block[chain.id].hash)} <-?- {hexstr(self.state.root_branch.head)}') + continue + # noinspection PyBroadException + try: + if fork is not None: + await self.process(fork) + except: + log.exception('Reverting branch due to exception') + self.state.remove_branch(fork.branch) except Exception: - log.exception('exception in runner worker') + log.exception('Unhandled exception in runner worker') raise finally: log.debug('runner worker exiting') - async def handle_head(self, chain, block, w3): - # todo refactor this to generate a fork from the latest block back to whatever ancestor it can find - log.debug(f'handle_head {block.height} {hexstr(block.hash)}') - if self.state and block.height <= self.state.root_block.height: - log.debug(f'ignoring old head') - return + async def process(self, fork: Fork): + log.debug(f'processing {fork}') + chain = current_chain.get() + w3 = current_w3.get() + current_blockstate.set(self.state) + current_fork.set(fork) session = None batches = [] pubs = [] try: - if self.state is not None and block.hash in self.state.by_hash: - log.debug(f'block {block.hash} was already processed') - return - if self.state is None: - # initialize - self.state = BlockState(block) - current_blockstate.set(self.state) - fork: Fork = Fork([block.hash], height=block.height) - log.info('Created new empty root state') + branch = fork.branch + if branch.disjoint: + # query the entire range (this assumes branch.height is a finalized height) + batches = await self.get_backfill_batches(branch.start, branch.height) else: - fork = self.state.add_block(block) - if fork is None: - log.debug(f'discarded late-arriving head {block}') - else: - batches: list - from_height = self.state.by_hash[fork.parent].height + 1 if fork.parent is not None else fork.height - to_height = fork.height - if fork.disjoint: - batches = await self.get_backfill_batches(from_height, to_height, w3) - else: + # query every block explicitly + for blockhash in reversed(branch.path): # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order - bloom = BloomFilter(int.from_bytes(block.data['logsBloom'])) + block = await get_block(blockhash) + bloom = BloomFilter(int.from_bytes(hexbytes(block.data['logsBloom']))) for callback, event, log_filter in self.events: if log_filter is None: batches.append((None, callback, event, None)) @@ -303,63 +272,44 @@ class BlockStateRunner(BlockProgressor): if not config.parallel_logevent_queries: get_logs = await get_logs batches.append((get_logs, callback, event, log_filter)) - for callback in self.postprocess_cbs: - batches.append((None, callback, None, None)) + for callback in self.postprocess_cbs: + batches.append((None, callback, None, None)) - # set up for callbacks - current_block.set(block) - current_fork.set(fork) - session = db.make_session(autocommit=False) - session.begin() - session.add(block) - current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created - if not self.state_initialized: - await self.do_state_init_cbs() - await self.invoke_callbacks(batches) + # set up for callbacks + session = db.make_session(autocommit=False) + session.begin() + current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) # used by handle_vault_created + if not self.state_initialized: + await self.do_state_init_cbs() + log.debug(f'invoking callbacks with fork {current_fork.get()}') + await self.invoke_callbacks(batches) - # todo - # IMPORTANT! 
check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either - # branch. Then we query all the values for those keys and apply that kv list to redis. This will make sure that any orphaned data that - # isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch. - diff_items = self.state.diffs_by_hash[block.hash] - for callback in self.on_head_update: + # todo + # IMPORTANT! check for a reorg and generate a reorg diff list. the diff list we need is the union of the set of keys touched by either + # branch. Then we query all the values for those keys and apply that kv list to redis. This will make sure that any orphaned data that + # isn't updated by the new fork is still queried from the root state to overwrite any stale data from the abandoned branch. + diff_items = self.state.diffs_by_branch[fork.branch_id] + for callback in self.on_head_update: + # noinspection PyCallingNonCallable + await maywait(callback(fork, diff_items)) + + # check for root promotion + promo_height = promotion_height(chain, fork.height) + promotable_branches = [b for b in fork.branches + if self.state.root_branch.height < b.height <= promo_height + or b == self.state.root_branch] + + if len(promotable_branches) > 1: + promotion_fork = Fork(promotable_branches) + diff_items = self.state.promote_root(promotion_fork) + for callback in self.on_promotion: + # todo try/except for known retryable errors # noinspection PyCallingNonCallable - await maywait(callback(block, diff_items)) - - # check for root promotion - confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 - promotion_height = latest_block.get().height - confirm_offset # todo latest_block should not be a ContextVar but a global dict by chain_id - new_root_fork = None - if fork.disjoint: - fork: DisjointFork - # individually check the fork's head and ancestor - if fork.height <= promotion_height: - new_root_fork = fork - else: - state = current_blockstate.get() - parent_block = fork.root - if parent_block.height <= promotion_height: - new_root_fork = state.fork(parent_block) - else: - fork: Fork - # non-disjoint, contiguous fork - if fork.height <= promotion_height: - new_root_fork = fork - else: - new_root_fork = fork.for_height(promotion_height) - if new_root_fork: - log.debug(f'promoting root {new_root_fork.height} {hexstr(new_root_fork.hash)}') - diff_items = self.state.promote_root(new_root_fork) - for callback in self.on_promotion: - # todo try/except for known retryable errors - # noinspection PyCallingNonCallable - await maywait(callback(self.state.root_block, diff_items)) + await maywait(callback(promotion_fork, diff_items)) except: # legitimately catch EVERYTHING because we re-raise log.debug('rolling back session') if session is not None: session.rollback() - if block.hash is not None and self.state is not None: - self.state.delete_block(block.hash) if config.parallel_logevent_queries: for get_logs, *_ in batches: if get_logs is not None: @@ -390,16 +340,15 @@ class BlockStateRunner(BlockProgressor): # noinspection PyCallingNonCallable await maywait(self.publish_all(pubs)) - log.info(f'completed block {block}') + log.info(f'completed {fork.branch}') finally: db.close_session() - async def handle_time_tick(self, block): + async def handle_time_tick(self, fork: Fork): + # todo re-enable time ticks if current_blockstate.get() is None: return - fork = self.state.fork(block) - 
current_block.set(block) current_fork.set(fork) session = db.session session.begin() @@ -415,6 +364,7 @@ class BlockStateRunner(BlockProgressor): finally: session.close() + async def do_state_init_cbs(self): if self.state_initialized: return diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index a9c81ca..6f61036 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -1,4 +1,3 @@ -import asyncio import logging from typing import Optional @@ -53,7 +52,7 @@ async def load_token(address: str) -> Optional[TokenDict]: log.warning(f'token {address} has no decimals()') decimals = 0 approved = config.metadata is None - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id symbol = await symbol_prom name = await name_prom td = TokenDict(type='Token', chain=chain_id, address=address, diff --git a/src/dexorder/transaction.py b/src/dexorder/transaction.py index 6579af5..b990619 100644 --- a/src/dexorder/transaction.py +++ b/src/dexorder/transaction.py @@ -7,8 +7,8 @@ from web3.exceptions import TransactionNotFound from dexorder import db, current_w3 from dexorder.base.chain import current_chain from dexorder.base.order import TransactionRequest +from dexorder.blockstate.fork import current_fork from dexorder.contract.contract_proxy import ContractTransaction -from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction, TransactionJobState log = logging.getLogger(__name__) @@ -32,7 +32,7 @@ class TransactionHandler: def submit_transaction_request(tr: TransactionRequest): - job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, state=TransactionJobState.Requested, request=tr) + job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, state=TransactionJobState.Requested, request=tr) db.session.add(job) return job diff --git a/src/dexorder/uniswap.py b/src/dexorder/uniswap.py index a4c1294..fef94ae 100644 --- a/src/dexorder/uniswap.py +++ b/src/dexorder/uniswap.py @@ -54,7 +54,7 @@ class _UniswapContracts (ByBlockchainDict[ContractProxy]): 'quoter': ContractProxy('0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6', 'IQuoter'), 'swap_router': ContractProxy('0xE592427A0AEce92De3Edee1F18E0157C05861564', 'ISwapRouter'), } - super().__init__({chain.chain_id:std for chain in (Ethereum, Polygon, Goerli, Mumbai, Arbitrum, Mock, Alpha)}) + super().__init__({chain.id:std for chain in (Ethereum, Polygon, Goerli, Mumbai, Arbitrum, Mock, Alpha)}) uniswapV3 = _UniswapContracts() diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index 4be444d..5b2c070 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -1,11 +1,10 @@ import re -from typing import Callable, TypeVar, Generic, Union, Any +from typing import Callable, TypeVar, Generic, Union from eth_utils import keccak from hexbytes import HexBytes from .async_util import async_yield -from .tick_math import nearest_available_ticks, round_tick, spans_tick, spans_range def align_decimal(value, left_columns) -> str: @@ -26,16 +25,18 @@ def hexstr(value: Union[HexBytes, bytes, str]): elif type(value) is str: return value if value.startswith('0x') else '0x' + value else: - raise ValueError + raise ValueError(f'Could not convert hexstr {value}') -def hexbytes(value: Union[str|bytes]): +def hexbytes(value: Union[str,bytes]): """ converts an optionally 0x-prefixed hex string into bytes """ - return value if type(value) is 
bytes else bytes.fromhex(value[2:] if value.startswith('0x') else value)
+    return value if type(value) is bytes else \
+        bytes(value) if type(value) is HexBytes else \
+        bytes.fromhex(value[2:] if value.startswith('0x') else value)


-def hexint(value: str):
-    return int(value[2:] if value.startswith('0x') else value, 16)
+def hexint(value: Union[str,int]):
+    return value if type(value) is int else int(value,0)


 def _keystr1(value):
@@ -73,7 +74,7 @@ class defaultdictk (Generic[K,V], dict[K,V]):

 T = TypeVar('T')

-class Field (Generic[T]):
+class GlobalVar (Generic[T]):

     def __init__(self, value: T = None):
         self._value = value
diff --git a/src/dexorder/util/async_dict.py b/src/dexorder/util/async_dict.py
new file mode 100644
index 0000000..720c4ec
--- /dev/null
+++ b/src/dexorder/util/async_dict.py
@@ -0,0 +1,80 @@
+import asyncio
+import logging
+from abc import abstractmethod
+from asyncio import Event
+from typing import TypeVar, Generic, Awaitable, Callable, Optional
+
+from dexorder import NARG
+
+log = logging.getLogger(__name__)
+
+K = TypeVar('K')
+V = TypeVar('V')
+
+
+class _Query (Generic[V]):
+    def __init__ (self):
+        self.event = Event()
+        self.result: V = NARG
+        self.exception: Optional[Exception] = None
+
+    def __bool__(self):
+        return self.result is not NARG
+
+
+class AsyncDict (Generic[K,V]):
+    """
+    Implements per-key locks around accessing dictionary values.
+    Either supply fetch and store functions in the constructor, or override those methods in a subclass.
+    """
+    def __init__(self,
+                 fetch: Callable[[K,V], Awaitable[V]] = None,
+                 store: Callable[[K,V], Awaitable[V]] = None,
+                 ):
+        self._queries: dict[K,_Query[V]] = {}
+        if fetch is not None:
+            self.fetch = fetch
+        if store is not None:
+            self.store = store
+
+    async def get(self, key: K, default: V = NARG) -> V:
+        query = self._queries.get(key)
+        if query is None:
+            return await self._query(key, self.fetch(key, default))
+        else:
+            await query.event.wait()
+            if query.exception is not None:
+                raise query.exception
+            return query.result
+
+    async def set(self, key: K, value: V):
+        # wait for any in-flight query on this key to settle before storing
+        query = self._queries.get(key)
+        if query is not None:
+            await query.event.wait()
+        await self._query(key, self.store(key, value))
+
+    # noinspection PyMethodMayBeStatic,PyUnusedLocal
+    @abstractmethod
+    async def fetch(self, key: K, default: V = NARG) -> V:
+        raise NotImplementedError
+
+    # noinspection PyMethodMayBeStatic,PyUnusedLocal
+    @abstractmethod
+    async def store(self, key: K, value: V) -> V:
+        """
+        Must return the value that was just set.
+ """ + raise NotImplementedError + + async def _query(self, key: K, coro: Awaitable[V]) -> V: + assert key not in self._queries + query = _Query() + self._queries[key] = query + try: + query.result = await coro + except Exception as e: + query.exception = e + finally: + del self._queries[key] + query.event.set() + return query.result diff --git a/src/dexorder/util/async_util.py b/src/dexorder/util/async_util.py index 13bb1d0..3ec4824 100644 --- a/src/dexorder/util/async_util.py +++ b/src/dexorder/util/async_util.py @@ -1,7 +1,6 @@ import asyncio import inspect -from abc import ABC -from typing import Union, Callable, Awaitable, TypeVar, Generic +from typing import Union, Awaitable, TypeVar async def async_yield(): diff --git a/src/dexorder/util/json.py b/src/dexorder/util/json.py index 3f65a71..2876dc4 100644 --- a/src/dexorder/util/json.py +++ b/src/dexorder/util/json.py @@ -1,4 +1,3 @@ -from collections import defaultdict from decimal import Decimal from json import JSONEncoder from typing import Any diff --git a/src/dexorder/util/lru.py b/src/dexorder/util/lru.py new file mode 100644 index 0000000..a6449f8 --- /dev/null +++ b/src/dexorder/util/lru.py @@ -0,0 +1,59 @@ +from collections import OrderedDict +from typing import Optional, Callable, TypeVar, Generic, MutableMapping, Iterator + +K = TypeVar('K') +V = TypeVar('V') + +class LRUCache (MutableMapping[K,V], Generic[K,V]): + + def __init__(self, capacity: int, factory: Optional[Callable[[K],V]]=None): + assert capacity >= 0 + super().__init__() + self._d = OrderedDict() + self.capacity = capacity + self.factory = factory + + def __setitem__(self, key, value): + self._d.__setitem__(key, value) + self._d.move_to_end(key) + if len(self._d) > self.capacity: + self._d.popitem(last=False) + + def __delitem__(self, key: K): + del self._d[key] + + def __getitem__(self, key: K) -> V: + try: + result = self._d[key] + self._d.move_to_end(key) # mark as recently used + except KeyError: + if self.factory is None: + raise + result = self.factory(key) + self._d[key] = result + return result + + def get(self, key, default: Optional[V] = None) -> V: + try: + result = self._d[key] + self._d.move_to_end(key) # mark as recently used + except KeyError: + if self.factory is None: + raise + result = self.factory(key) + self._d[key] = result + return result + + def __len__(self) -> int: + return len(self._d) + + def __iter__(self) -> Iterator[K]: + return iter(self._d) + + @property + def is_empty(self): + return len(self._d) == 0 + + @property + def is_full(self): + return len(self._d) == self.capacity diff --git a/src/dexorder/util/tick_math.py b/src/dexorder/util/tick_math.py deleted file mode 100644 index b0516d2..0000000 --- a/src/dexorder/util/tick_math.py +++ /dev/null @@ -1,24 +0,0 @@ - -def round_tick(tick, tick_spacing): - """ - returns the nearest available tick - """ - return round(tick/tick_spacing) * tick_spacing - - -def nearest_available_ticks(tick, tick_spacing): - """ - returns the two available ticks just below and above the given tick - """ - lower = tick // tick_spacing * tick_spacing - upper = lower + tick_spacing - return lower, upper - - -def spans_tick(tick, lower, upper): - return spans_range( *nearest_available_ticks(tick), lower, upper) - - -def spans_range(below, above, lower, upper): - return lower < above and upper > below - diff --git a/src/dexorder/vault_blockdata.py b/src/dexorder/vault_blockdata.py index b8af71b..4815385 100644 --- a/src/dexorder/vault_blockdata.py +++ b/src/dexorder/vault_blockdata.py @@ -13,7 +13,7 @@ 
log = logging.getLogger(__name__) # values of DELETE are serialized as nulls def pub_vault_balances(_s, k, v): - chain_id = current_chain.get().chain_id + chain_id = current_chain.get().id try: return f'{chain_id}|{vault_owners[k]}', 'vb', (chain_id, k, json.dumps({k2: str(v2) for k2, v2 in v.items()})) except KeyError: diff --git a/src/dexorder/walker.py b/src/dexorder/walker.py index 42a3aa6..3c6a6a4 100644 --- a/src/dexorder/walker.py +++ b/src/dexorder/walker.py @@ -7,9 +7,10 @@ from typing import Union, Callable from dexorder import config, db, now, current_w3 from dexorder.base.chain import current_chain from dexorder.blockstate import current_blockstate +from dexorder.blockstate.branch import Branch +from dexorder.blockstate.fork import Fork, current_fork from dexorder.blockstate.state import FinalizedBlockState -from dexorder.database.model import Block -from dexorder.database.model.block import current_block, latest_block +from dexorder.base.block import Block, BlockInfo from dexorder.progressor import BlockProgressor from dexorder.util.async_util import Maywaitable @@ -43,7 +44,7 @@ class BlockWalker (BlockProgressor): db.connect() w3 = current_w3.get() chain = current_chain.get() - chain_id = chain.chain_id + chain_id = chain.id confirm_offset = (config.confirms if config.confirms is not None else chain.confirms) - 1 batch_size = config.batch_size if config.batch_size is not None else chain.batch_size current_blockstate.set(FinalizedBlockState()) @@ -66,22 +67,17 @@ class BlockWalker (BlockProgressor): while self.running: # noinspection PyBroadException try: - latest_rawblock = await w3.eth.get_block('latest') - latest_height = latest_rawblock['number'] - latest_block.set(Block.from_data(chain_id, latest_rawblock)) - if prev_height is None or latest_height > prev_height: - prev_height = latest_height - log.debug(f'polled new block {latest_height}') - promotion_height = latest_height - confirm_offset + latest_blockdata: BlockInfo = await w3.eth.get_block('latest') + latest = Block(chain_id, latest_blockdata) + if prev_height is None or latest.height > prev_height: + prev_height = latest.height + log.debug(f'polled new block {latest.height}') + promotion_height = latest.height - confirm_offset while (processed_height < promotion_height and (config.walker_stop is None or processed_height < config.walker_stop)): cur_height = min(promotion_height, processed_height+batch_size-1) if config.walker_stop is not None: cur_height = min(cur_height, config.walker_stop) - block_data = await w3.eth.get_block(cur_height) - block = Block.from_data(chain_id, block_data) - assert block.height == cur_height - current_block.set(block) await self.handle(processed_height+1, cur_height, chain=chain, w3=w3) if self.flush_delay is None or \ self.flush_type=='blocks' and last_flush + self.flush_delay <= processed_height or \ @@ -129,6 +125,9 @@ class BlockWalker (BlockProgressor): chain = current_chain.get() if w3 is None: w3 = current_w3.get() + branch = Branch(to_height, from_height) + fork = Fork([branch]) + current_fork.set(fork) batches = await self.get_backfill_batches(from_height, to_height, w3=w3) await self.invoke_callbacks(batches, chain) log.info(f'completed through block {to_height}') diff --git a/test/test_blockstate.py b/test/test_blockstate.py index 1e4fe9e..617ee6f 100644 --- a/test/test_blockstate.py +++ b/test/test_blockstate.py @@ -1,62 +1,133 @@ -from dexorder.blockstate import BlockState, BlockDict -from dexorder.database.model.block import Block +import logging +import sys -block_10 = 
 
-block_10 = Block(chain=1, height=10, hash=bytes.fromhex('10'), parent=bytes.fromhex('09'), data=None)
-block_11a = Block(chain=1, height=11, hash=bytes.fromhex('1a'), parent=block_10.hash, data=None)
-block_11b = Block(chain=1, height=11, hash=bytes.fromhex('1b'), parent=block_10.hash, data=None)
-block_12a = Block(chain=1, height=12, hash=bytes.fromhex('12'), parent=block_11a.hash, data=None)
-state = BlockState(block_10, {'series':{'foo':'bar'}})
-BlockState.set_cur(state)
-d = BlockDict('series')
+from dexorder import DELETE, NARG
+from dexorder.base.chain import current_chain, Mock
+from dexorder.blockstate import BlockState, BlockDict, current_blockstate
+from dexorder.blockstate.branch import Branch
+from dexorder.blockstate.fork import current_fork, Fork
 
-def start_block(b):
-    Block.set_cur(b)
-    state.add_block(b)
+logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+logging.getLogger('dexorder').setLevel(logging.DEBUG)
 
-start_block(block_11a)
-del d['foo']
-d['foue'] = 'barre'
+current_chain.set(Mock)
 
-start_block(block_12a)
-d['foo'] = 'bar2'
+b0 = bytes([0])  # genesis block hash
+root_branch = Branch(0, 0, bytes(), [b0])
 
-start_block(block_11b)
-d['fu'] = 'ku'
+def new_state():
+    state = BlockState()
+    state.add_branch(root_branch)
+    current_blockstate.set(state)
+    return state
 
-def print_dict(x:dict=d):
-    for k, v in x.items():
-        print(f'{k:>10} : {v}')
+s = new_state()
 
-for block in [block_10,block_11a,block_12a,block_11b]:
-    Block.set_cur(block)
-    print()
-    print(Block.cur().hash)
-    print_dict()
+series_name = 'test'
+series = BlockDict(series_name)
 
-def test11b():
-    Block.set_cur(block_11b)
-    assert 'fu' in d
-    assert d['fu'] == 'ku'
-    assert 'foo' in d
-    assert d['foo'] == 'bar'
+def get(fork: Fork, default=NARG):
+    value = s.get(fork, series_name, 'foo', default)
+    # print(f'{fork} => {value}')
+    return value
 
-def test12a():
-    Block.set_cur(block_12a)
-    assert 'fu' not in d
-    assert 'foo' in d
-    assert d['foo'] == 'bar2'
-    assert 'foue' in d
-    assert d['foue'] == 'barre'
 
-test11b()
-test12a()
-state.promote_root(block_11a)
-print()
-print('promoted root')
-print_dict(state.root_state)
-test12a()
-state.promote_root(block_12a)
-print()
-print('promoted root')
-print_dict(state.root_state)
-test12a()
+block_data = {}
+
+def make_block(num: int, data: dict = None):
+    key = bytes([num])
+    block_data[key] = data if data is not None else dict(foo=hex(num)[2:])
+    return key
+
+
+# block ids encode a height plus an a/b/c chain letter, e.g. 0x3a is height 3 on chain a.
+# by default, each block sets foo to its own hex id, e.g. block 0x3a sets foo='3a'.
+b1a = make_block(0x1a)
+b2a = make_block(0x2a)
+b3a = make_block(0x3a)
+b4a = make_block(0x4a)
+b5a = make_block(0x5a)
+
+
+def make_branch(state: BlockState, height: int, start: int, parent: bytes, path: list[bytes]):
+    branch = Branch(height, start, parent, path)
+    fork = state.add_branch(branch)
+    current_fork.set(fork)
+    for block_id in reversed(branch.path):
+        for k, v in block_data[block_id].items():
+            series[k] = v
+    return fork
+
+fork_a = make_branch(s, 5, 1, b0, [b5a, b4a, b3a, b2a, b1a])
+fork_a1 = make_branch(s, 1, 1, b0, [b1a])
+fork_a2 = make_branch(s, 2, 2, b1a, [b2a])
+fork_a3 = make_branch(s, 3, 3, b2a, [b3a])
+fork_aa = make_branch(s, 3, 1, b0, [b3a, b2a, b1a])
+
+fork_ab = make_branch(s, 5, 4, b3a, [b5a, b4a])
+# this fork can be assembled from multiple branch combinations; the algorithm should prefer the one using the fewest branches.
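+# (the b5a/b4a branch on parent b3a can resolve either through fork_aa's
+# three-block branch, three branches total including the root, or through the
+# single-block branches fork_a1..fork_a3, five in total; asserted below)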
+assert fork_ab.branches[1] == fork_aa.branch
+
+assert get(fork_a) == '5a'
+assert get(fork_aa) == '3a'
+assert get(fork_ab) == '5a'
+
+# now change the current value at the tip of fork_a
+current_fork.set(fork_a)
+diff_count = len(s.diffs_by_branch[fork_a.branch_id])
+series['foo'] = 'not'
+assert get(fork_a) == 'not'
+series['foo'] = 'bar'
+assert get(fork_a) == 'bar'
+# make sure no extra diffs were created; the value should be replaced inside the existing DiffEntry instead
+assert diff_count == len(s.diffs_by_branch[fork_a.branch_id])
+
+# chain B makes no writes except deleting foo at height 3 and setting it back at height 5.
+# block 1 is shared with the a-chain.
+b2b = make_block(0x2b, {})
+b3b = make_block(0x3b, dict(foo=DELETE))
+b4b = make_block(0x4b, {})
+b5b = make_block(0x5b)
+
+fork_from_a = make_branch(s, 2, 2, b1a, [b2b])
+# this fork should have joined the branch from fork_a1, which connects to genesis, for a total of three branches
+assert len(fork_from_a.branches) == 3
+assert fork_from_a.branches[1] == fork_a1.branch
+# the value should have carried over from the other branch
+assert get(fork_from_a) == '1a'
+
+fork_delete = make_branch(s, 4, 3, b2b, [b4b, b3b])
+missing = 'missing'
+assert get(fork_delete, missing) is missing
+# a direct lookup must raise KeyError since the key is deleted on this fork
+try:
+    series['foo']
+    assert False, 'expected KeyError for deleted key'
+except KeyError:
+    pass
+# restore the 'foo' key with a value of '5b'
+fork_restore = make_branch(s, 5, 5, b4b, [b5b])
+assert get(fork_restore) == '5b'
+
+
+s.promote_root(fork_aa)
+
+# test garbage collection
+diffs = s.diffs_by_series[series_name].get('foo')
+assert diffs
+assert diffs[-1].height == 3  # only the latest value at or below the promoted root should be retained
+
+# fork B conflicts with the already-promoted root, so promote_root must refuse.
+# a flag is used because promote_root signals refusal with AssertionError, which
+# would also swallow a bare `assert False` placed inside the try block
+promoted = False
+try:
+    s.promote_root(fork_from_a)
+    promoted = True
+except AssertionError:
+    pass
+assert not promoted, 'fork B should not be able to be promoted'
+
+# chain C: blocks defined but not yet exercised
+b1c = make_block(0x1c)
+b2c = make_block(0x2c)
+b3c = make_block(0x3c)
+b4c = make_block(0x4c)
+b5c = make_block(0x5c)
+
+logging.getLogger('dexorder').error('Insufficient number of test cases')
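+
+# a possible follow-on case (sketch, untested): it assumes branches can still be
+# added on top of the promoted root b3a; b4d is a hypothetical no-write block and
+# the expected values follow make_block's default of foo=<hex id>
+fork_c = make_branch(s, 5, 4, b3a, [b5c, b4c])
+assert get(fork_c) == '5c'
+b4d = make_block(0x4d, {})
+fork_d = make_branch(s, 4, 4, b3a, [b4d])
+assert get(fork_d) == '3a'  # no writes on this branch; the value carries over from the root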