From 6ab486ac1840b53302748c13fc4b9dd55aa2c6e1 Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Thu, 28 Sep 2023 23:20:55 -0400 Subject: [PATCH] db state --- alembic/versions/1da309899f95_.py | 44 --- alembic/versions/db62e7db828d_.py | 55 ++++ src/dexorder/base/chain.py | 18 +- src/dexorder/base/fork.py | 65 +++++ src/dexorder/base/token.py | 2 +- src/dexorder/bin/main.py | 24 +- src/dexorder/blockchain/__init__.py | 2 +- src/dexorder/blockchain/connection.py | 14 +- src/dexorder/blockstate/__init__.py | 70 +++++ src/dexorder/blockstate/blockdata.py | 101 +++++++ src/dexorder/blockstate/db_state.py | 79 ++++++ src/dexorder/blockstate/diff.py | 22 ++ .../blockstate.py => blockstate/state.py} | 250 +----------------- src/dexorder/data.py | 8 - src/dexorder/data/__init__.py | 7 + src/dexorder/database/__init__.py | 35 ++- src/dexorder/database/column.py | 6 +- src/dexorder/database/model/__init__.py | 1 + src/dexorder/database/model/block.py | 6 +- src/dexorder/database/model/kv.py | 13 + src/dexorder/database/model/series.py | 21 ++ src/dexorder/{trigger_runner.py => runner.py} | 27 +- src/dexorder/util/__init__.py | 13 + src/dexorder/util/json.py | 13 +- 24 files changed, 556 insertions(+), 340 deletions(-) delete mode 100644 alembic/versions/1da309899f95_.py create mode 100644 alembic/versions/db62e7db828d_.py create mode 100644 src/dexorder/base/fork.py create mode 100644 src/dexorder/blockstate/__init__.py create mode 100644 src/dexorder/blockstate/blockdata.py create mode 100644 src/dexorder/blockstate/db_state.py create mode 100644 src/dexorder/blockstate/diff.py rename src/dexorder/{base/blockstate.py => blockstate/state.py} (58%) delete mode 100644 src/dexorder/data.py create mode 100644 src/dexorder/data/__init__.py create mode 100644 src/dexorder/database/model/kv.py create mode 100644 src/dexorder/database/model/series.py rename src/dexorder/{trigger_runner.py => runner.py} (90%) diff --git a/alembic/versions/1da309899f95_.py b/alembic/versions/1da309899f95_.py deleted file mode 100644 index 82999c4..0000000 --- a/alembic/versions/1da309899f95_.py +++ /dev/null @@ -1,44 +0,0 @@ -"""empty message - -Revision ID: 1da309899f95 -Revises: -Create Date: 2023-09-18 19:18:11.706312 - -""" -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import postgresql - -# revision identifiers, used by Alembic. -revision: str = '1da309899f95' -down_revision: Union[str, None] = None -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('block', - sa.Column('chain', sa.Integer(), nullable=False), - sa.Column('height', sa.Integer(), nullable=False), - sa.Column('hash', sa.LargeBinary(), nullable=False), - sa.Column('parent', sa.LargeBinary(), nullable=False), - sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), - sa.PrimaryKeyConstraint('chain', 'height', 'hash') - ) - op.drop_table('migrations') - # ### end Alembic commands ### - - -def downgrade() -> None: - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('migrations', - sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False), - sa.Column('name', sa.VARCHAR(length=255), autoincrement=False, nullable=False), - sa.Column('run_on', postgresql.TIMESTAMP(), autoincrement=False, nullable=False), - sa.PrimaryKeyConstraint('id', name='migrations_pkey') - ) - op.drop_table('block') - # ### end Alembic commands ### diff --git a/alembic/versions/db62e7db828d_.py b/alembic/versions/db62e7db828d_.py new file mode 100644 index 0000000..7c9e871 --- /dev/null +++ b/alembic/versions/db62e7db828d_.py @@ -0,0 +1,55 @@ +"""empty message + +Revision ID: db62e7db828d +Revises: +Create Date: 2023-09-28 23:04:41.020644 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import dexorder.database +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = 'db62e7db828d' +down_revision: Union[str, None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table('block', + sa.Column('chain', sa.Integer(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + sa.Column('hash', sa.LargeBinary(), nullable=False), + sa.Column('parent', sa.LargeBinary(), nullable=False), + sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint('chain', 'height', 'hash') + ) + op.create_table('keyvalue', + sa.Column('key', sa.String(), nullable=False), + sa.Column('value', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.PrimaryKeyConstraint('key') + ) + op.create_table('seriesdict', + sa.Column('value', postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.Column('chain', sa.Integer(), nullable=False), + sa.Column('series', sa.String(), nullable=False), + sa.Column('key', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'series', 'key') + ) + op.create_table('seriesset', + sa.Column('chain', sa.Integer(), nullable=False), + sa.Column('series', sa.String(), nullable=False), + sa.Column('key', sa.String(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'series', 'key') + ) + + +def downgrade() -> None: + op.drop_table('seriesset') + op.drop_table('seriesdict') + op.drop_table('keyvalue') + op.drop_table('block') diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index f9d192e..aab1379 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -2,14 +2,6 @@ from contextvars import ContextVar class Blockchain: - @staticmethod - def cur() -> 'Blockchain': - return _cur.get() - - @staticmethod - def set_cur(value: 'Blockchain'): - _cur.set(value) - @staticmethod def for_id(chain_id): result = Blockchain._instances_by_id.get(chain_id) @@ -25,9 +17,13 @@ class Blockchain: def get(name_or_id): return Blockchain.for_name(name_or_id) if type(name_or_id) is str else Blockchain.for_id(name_or_id) - def __init__(self, chain_id, name): + def __init__(self, chain_id, name, confirms=10): + """ + confirms is the number of blocks until a block can be considered finalized and unforkable + """ self.chain_id = chain_id self.name = name + self.confirms = confirms Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_name[name] = self @@ -45,6 +41,6 @@ Goerli = Blockchain(5, 'Goerli') Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Mumbai = Blockchain(80001, 'Mumbai') BSC = Blockchain(56, 'BSC') -Arbitrum = ArbitrumOne = Blockchain(42161, 'ArbitrumOne') +Arbitrum = Blockchain(42161, 'Arbitrum', 10) -_cur = ContextVar[Blockchain]('Blockchain.cur') +current_chain = ContextVar[Blockchain]('current_chain') diff --git a/src/dexorder/base/fork.py b/src/dexorder/base/fork.py new file mode 100644 index 0000000..d92bfb6 --- /dev/null +++ b/src/dexorder/base/fork.py @@ -0,0 +1,65 @@ +import logging +from contextvars import ContextVar +from typing import Iterable, Optional + +from dexorder.database.model import Block + +log = logging.getLogger(__name__) + + +class Fork: + """ + A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The + getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest + block and [-2] is its parent, etc. + """ + + def __init__(self, ancestry: Iterable[bytes], *, height: int): + self.ancestry = list(ancestry) + self.height = height + self.disjoint = False + + def __contains__(self, item): + index = self.height - item.height + if index < 0: + return False + try: + return self.ancestry[index] == item.hash + except IndexError: + return False + + @property + def hash(self): + return self.ancestry[0] + + @property + def parent(self): + return self.ancestry[1] + + def for_height(self, height): + """ returns a new Fork object for an older block along this fork. used for root promotion. """ + assert( self.height - len(self.ancestry) < height <= self.height) + return Fork(self.ancestry[self.height-height:], height=height) + + def __str__(self): + return f'{self.height}_[{"->".join(h.hex() for h in self.ancestry)}]' + + +class DisjointFork: + """ + duck type of Fork for blocks that connect directly to root with a parent gap in-between + """ + def __init__(self, block: Block, root: Block): + self.height = block.height + self.hash = block.hash + self.parent = root.hash + self.disjoint = True + + def __contains__(self, item): + return item.hash in (self.hash, self.parent) + + def __str__(self): + return f'{self.height}_[{self.hash.hex()}->{self.parent.hex()}]' + + +current_fork = ContextVar[Optional[Fork]]('current_fork', default=None) diff --git a/src/dexorder/base/token.py b/src/dexorder/base/token.py index db69d6e..f7999eb 100644 --- a/src/dexorder/base/token.py +++ b/src/dexorder/base/token.py @@ -6,7 +6,7 @@ from web3 import Web3 from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0 from dexorder.blockchain import ByBlockchainDict -from dexorder.base.chain import Polygon, ArbitrumOne, Ethereum +from dexorder.base.chain import Polygon, Arbitrum, Ethereum from dexorder.contract import ContractProxy, abis import dexorder.database.column as col diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index b576672..56e4630 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -1,16 +1,32 @@ import logging +from dexorder import db, config, Blockchain +from dexorder.base.chain import current_chain from dexorder.bin.executable import execute -from dexorder.trigger_runner import TriggerRunner +from dexorder.blockstate.blockdata import BlockData +from dexorder.blockstate.db_state import DbState +from dexorder.configuration import parse_args +from dexorder.runner import BlockStateRunner +from dexorder.data import pool_prices, vault_tokens, vault_addresses, underfunded_vaults, active_orders log = logging.getLogger('dexorder') -ROOT_AGE = 10 # todo set per chain - if __name__ == '__main__': logging.basicConfig(level=logging.INFO) log = logging.getLogger('dexorder') log.setLevel(logging.DEBUG) - execute(TriggerRunner().run()) + parse_args() + current_chain.set(Blockchain.get(config.chain)) + state = None + if db: + db.connect() + db_state = DbState(BlockData.by_tag['db']) + with db.session: + state = db_state.load() + runner = BlockStateRunner(state) + if db: + # noinspection PyUnboundLocalVariable + runner.on_promotion.append(db_state.save) + execute(runner.run()) # single task log.info('exiting') diff --git a/src/dexorder/blockchain/__init__.py b/src/dexorder/blockchain/__init__.py index 0458fd4..ec80d01 100644 --- a/src/dexorder/blockchain/__init__.py +++ b/src/dexorder/blockchain/__init__.py @@ -1,3 +1,3 @@ from .by_blockchain import ByBlockchainDict, ByBlockchainList, ByBlockchainCollection from .connection import connect -from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, ArbitrumOne, BSC +from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, Arbitrum, BSC diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 6f81ca3..c21651f 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -8,17 +8,7 @@ from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url -_w3 = ContextVar('w3') - -class W3: - @staticmethod - def cur() -> AsyncWeb3: - return _w3.get() - - @staticmethod - def set_cur(value:AsyncWeb3): - _w3.set(value) - +current_w3 = ContextVar('current_w3') def connect(rpc_url=None): @@ -28,7 +18,7 @@ def connect(rpc_url=None): use create_w3() and set w3.eth.default_account separately """ w3 = create_w3(rpc_url) - W3.set_cur(w3) + current_w3.set(w3) return w3 diff --git a/src/dexorder/blockstate/__init__.py b/src/dexorder/blockstate/__init__.py new file mode 100644 index 0000000..1dafa73 --- /dev/null +++ b/src/dexorder/blockstate/__init__.py @@ -0,0 +1,70 @@ +from .diff import DiffEntry, DiffItem, DELETE +from .state import BlockState, current_blockstate +from .blockdata import BlockDict, BlockSet + + +def _test(): + + def B(height, hash:str, parent): + return Block(chain=1337, height=height, hash=hash.encode('utf8'), parent=None if parent is None else parent.hash, data=None) + + root_block = B(10, '#root', None ) + state = BlockState(root_block) + current_blockstate.set(state) + b11 = B(11, '#b11', parent=root_block) + f11: Fork = state.add_block(b11) + print('f11',f11) + b11b = B(11, '#b11b', parent=root_block) + f11b: Fork = state.add_block(b11b) + print('f11b',f11b) + b12 = B(12, '#b12', parent=b11) + f12: Fork = state.add_block(b12) + print('f12',f12) + + d = BlockDict('ser') + + def dump(): + print() + print(current_fork.get().hash if current_fork.get() is not None else 'root') + for k,v in d.items(): + print(f'{k} = {v}') + + current_fork.set(None) # Use None to set values on root + d['foo'] = 'bar' + d['test'] = 'failed' + + current_fork.set(f11) + d['foo2'] = 'bar2' + del d['test'] + + current_fork.set(f11b) + del d['foo2'] + d['foob'] = 'barb' + + current_fork.set(f12) + d['test'] = 'ok' + + for f in (None, f11, f11b, f12): + current_fork.set(f) + dump() + + print() + print('all b12 diffs') + for i in state.collect_diffs(b12): + print(i) + + print() + print('promoting b11') + state.promote_root(f11) + current_fork.set(f12) + dump() + + print() + print('promoting b12') + state.promote_root(f12) + current_fork.set(f12) + dump() + + +if __name__ == '__main__': + _test() diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py new file mode 100644 index 0000000..24cace1 --- /dev/null +++ b/src/dexorder/blockstate/blockdata.py @@ -0,0 +1,101 @@ +import logging +from collections import defaultdict +from enum import Enum +from typing import TypeVar, Generic, Iterable + +from dexorder import NARG +from dexorder.base.fork import current_fork +from .diff import DELETE +from .state import current_blockstate + +log = logging.getLogger(__name__) +T = TypeVar('T') + + +class BlockData: + class Type (Enum): + SCALAR:int = 0 + SET:int = 1 + LIST:int = 2 + DICT:int = 3 + + registry: dict[str,'BlockData'] = {} # series name and instance + by_tag: dict[str, list['BlockData']] = defaultdict(list) + + def __init__(self, series:str, data_type: Type, **tags): + assert series not in BlockData.registry + BlockData.registry[series] = self + self.series = series + self.type = data_type + for tag, value in tags.items(): + if value: + BlockData.by_tag[tag].append(self) + + def setitem(self, item, value, overwrite=True): + state = current_blockstate.get() + fork = current_fork.get() + state.set(fork, self.series, item, value, overwrite) + + def getitem(self, item, default=NARG): + state = current_blockstate.get() + fork = current_fork.get() + return state.get(fork, self.series, item, default) + + def delitem(self, item, overwrite=True): + self.setitem(item, DELETE, overwrite) + + def contains(self, item): + try: + self.getitem(item) + return True + except KeyError: # getitem with no default will raise on a missing item + return False + + @staticmethod + def iter_items(series_key): + state = current_blockstate.get() + fork = current_fork.get() + return state.iteritems(fork, series_key) + + +class BlockSet(Generic[T], Iterable[T], BlockData): + def __init__(self, series: str, **tags): + super().__init__(series, BlockData.Type.SET, **tags) + self.series = series + + def add(self, item): + """ set-like semantics. the item key is added with a value of None. """ + self.setitem(item, None, overwrite=False) + + def __delitem__(self, item): + self.delitem(item, overwrite=False) + + def __contains__(self, item): + return self.contains(item) + + def __iter__(self): + yield from (k for k,v in self.iter_items(self.series)) + + +class BlockDict(Generic[T], BlockData): + + def __init__(self, series: str, **tags): + super().__init__(series, BlockData.Type.DICT, **tags) + + def __setitem__(self, item, value): + self.setitem(item, value) + + def __getitem__(self, item): + return self.getitem(item) + + def __delitem__(self, item): + self.delitem(item) + + def __contains__(self, item): + return self.contains(item) + + def items(self): + return self.iter_items(self.series) + + def get(self, item, default=None): + return self.getitem(item, default) diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py new file mode 100644 index 0000000..b0435f1 --- /dev/null +++ b/src/dexorder/blockstate/db_state.py @@ -0,0 +1,79 @@ +import logging +from typing import Iterable, Optional, Union + +from . import DiffItem, BlockSet, BlockDict, DELETE, BlockState, current_blockstate +from .blockdata import BlockData +from .. import db +from ..base.chain import current_chain +from ..base.fork import current_fork +from ..database.model import SeriesSet, SeriesDict, Block +from ..database.model.block import current_block, latest_block, completed_block +from ..util import keystr, strkey, hexbytes + +log = logging.getLogger(__name__) + + +class DbState: + def __init__(self, series_or_datavars: Iterable[Union[str,BlockData]]): + self.types = { + (d:=BlockData.registry[x] if type(x) is str else x).series:d.type + for x in series_or_datavars + } + + def save(self, root_block: Block, diffs: Iterable[DiffItem] ): + chain_id = current_chain.get().chain_id + for diff in diffs: + try: + t = self.types[diff.series] + except KeyError: + continue + diffseries = keystr(diff.series) + diffkey = keystr(diff.key) + key = dict(chain=chain_id, series=diffseries, key=diffkey) + if diff.entry.value is DELETE: + Entity = SeriesSet if t == BlockData.Type.SET else SeriesDict if t == BlockData.Type.DICT else None + db.session.query(Entity).filter(Entity.chain==chain_id, Entity.series==diffseries, Entity.key==diffkey).delete() + else: + # upsert + if t == BlockData.Type.SET: + found = db.session.get(SeriesSet, key) + if found is None: + db.session.add(SeriesSet(**key)) + elif t == BlockData.Type.DICT: + found = db.session.get(SeriesDict, key) + if found is None: + db.session.add(SeriesDict(**key, value=diff.entry.value)) + else: + found.value = diff.entry.value + else: + raise NotImplementedError + db.kv[f'root_block.{root_block.chain}'] = [root_block.height, root_block.hash] + + # noinspection PyShadowingBuiltins + def load(self) -> Optional[BlockState]: + chain_id = current_chain.get().chain_id + try: + height, hash = db.kv[f'root_block.{chain_id}'] + except (KeyError, ValueError): + return None + root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash))) + if root_block is None: + return None + current_block.set(root_block) + latest_block.set(root_block) + state = BlockState(root_block) + current_blockstate.set(state) + current_fork.set(None) # root fork + for series, t in self.types.items(): + if t == BlockData.Type.SET: + # noinspection PyTypeChecker + var: BlockSet = BlockData.registry[series] + for row in db.session.query(SeriesSet).where(SeriesSet.series==keystr(series)): + var.add(strkey(row.key)) + elif t == BlockData.Type.DICT: + # noinspection PyTypeChecker + var: BlockDict = BlockData.registry[series] + for row in db.session.query(SeriesDict).where(SeriesSet.series==keystr(series)): + var[strkey(row.key)] = row.value + completed_block.set(root_block) + return state diff --git a/src/dexorder/blockstate/diff.py b/src/dexorder/blockstate/diff.py new file mode 100644 index 0000000..280902b --- /dev/null +++ b/src/dexorder/blockstate/diff.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass +from typing import Union, Any + + +DELETE = object() # used as a value token to indicate removal of the key + + +@dataclass +class DiffEntry: + value: Union[Any, DELETE] + height: int + hash: bytes + + +@dataclass +class DiffItem: + series: Any + key: Any + entry: DiffEntry + + def __str__(self): + return f'{self.entry.hash.hex()} {self.series}.{self.key}={"[DEL]" if self.entry.value is DELETE else self.entry.value}' diff --git a/src/dexorder/base/blockstate.py b/src/dexorder/blockstate/state.py similarity index 58% rename from src/dexorder/base/blockstate.py rename to src/dexorder/blockstate/state.py index 54e4d9d..583df89 100644 --- a/src/dexorder/base/blockstate.py +++ b/src/dexorder/blockstate/state.py @@ -2,96 +2,20 @@ import itertools import logging from collections import defaultdict from contextvars import ContextVar -from dataclasses import dataclass -from enum import Enum -from typing import Union, TypeVar, Generic, Any, Optional, Iterable +from typing import Any, Optional, Union from sortedcontainers import SortedList from dexorder import NARG -from dexorder.database.model.block import Block +from dexorder.base.fork import Fork, DisjointFork +from dexorder.database.model import Block from dexorder.util import hexstr +from .diff import DiffEntry, DiffItem, DELETE log = logging.getLogger(__name__) -@dataclass -class DiffEntry: - value: Union[Any, 'BlockState.DELETE'] - height: int - hash: bytes - - -@dataclass -class DiffItem: - series: Any - key: Any - entry: DiffEntry - - def __str__(self): - return f'{self.entry.hash} {self.series}.{self.key}={"[DEL]" if self.entry.value is BlockState.DELETE else self.entry.value}' - - -class Fork: - """ - A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The - getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest - block and [-2] is its parent, etc. - """ - - def __init__(self, ancestry: Iterable[bytes], *, height: int): - self.ancestry = list(ancestry) - self.height = height - self.disjoint = False - - def __contains__(self, item): - index = self.height - item.height - if index < 0: - return False - try: - return self.ancestry[index] == item.hash - except IndexError: - return False - - @property - def hash(self): - return self.ancestry[0] - - @property - def parent(self): - return self.ancestry[1] - - def for_height(self, height): - """ returns a new Fork object for an older block along this fork. used for root promotion. """ - assert( self.height - len(self.ancestry) < height <= self.height) - return Fork(self.ancestry[self.height-height:], height=height) - - def __str__(self): - return f'{self.height}_[{"->".join(h.hex() for h in self.ancestry)}]' - - -current_fork = ContextVar[Optional[Fork]]('current_fork', default=None) - - -class DisjointFork: - """ - duck type of Fork for blocks that connect directly to root with a parent gap in-between - """ - def __init__(self, block: Block, root: Block): - self.height = block.height - self.hash = block.hash - self.parent = root.hash - self.disjoint = True - - def __contains__(self, item): - return item.hash in (self.hash, self.parent) - - def __str__(self): - return f'{self.height}_[{self.hash.hex()}->{self.parent.hex()}]' - - class BlockState: - DELETE = object() by_chain: dict[int, 'BlockState'] = {} @@ -134,7 +58,7 @@ class BlockState: return self.fork(block) - def delete_block(self, block: Union[Block,Fork,bytes]): + def delete_block(self, block: Union[Block, Fork,bytes]): """ if there was an error during block processing, we need to remove the incomplete block data """ try: block = block.hash @@ -178,7 +102,7 @@ class BlockState: return default diffs: list[DiffEntry] = series_diffs.get(key, []) value = self._get_from_diffs(fork, diffs) - if value is not BlockState.DELETE: + if value is not DELETE: return value # value not found or was DELETE if default is NARG: @@ -188,13 +112,13 @@ class BlockState: def _get_from_diffs(self, fork, diffs): for diff in reversed(diffs): if diff.height <= self.root_block.height or fork is not None and diff in fork: - if diff.value is BlockState.DELETE: + if diff.value is DELETE: break else: if self.root_block not in fork: # todo move this assertion elsewhere so it runs once per task raise ValueError(f'Cannot get value for a non-root fork {hexstr(fork.hash)}') return diff.value - return BlockState.DELETE + return DELETE def set(self, fork: Optional[Fork], series, key, value, overwrite=True): @@ -211,7 +135,7 @@ class BlockState: for k, difflist in self.diffs_by_series.get(series, {}).items(): for diff in reversed(difflist): if diff.height <= self.root_block.height or fork is not None and diff in fork: - if diff.value is not BlockState.DELETE: + if diff.value is not DELETE: yield k, diff.value break @@ -255,7 +179,7 @@ class BlockState: while len(difflist) >= 2 and difflist[1].height <= new_root_fork.height: difflist.pop(0) # if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list - if not difflist or len(difflist) == 1 and difflist[0].value == BlockState.DELETE and difflist[0].height <= new_root_fork.height: + if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height: del self.diffs_by_series[s][k] del self.by_hash[self.root_block.hash] # old root block @@ -291,157 +215,3 @@ class BlockState: current_blockstate = ContextVar[BlockState]('current_blockstate') - -T = TypeVar('T') - - -class BlockData: - class Type (Enum): - SCALAR:int = 0 - SET:int = 1 - LIST:int = 2 - DICT:int = 3 - - registry: dict[str,'BlockData'] = {} # series name and instance - - def __init__(self, series:str, data_type: Type): - assert series not in BlockData.registry - BlockData.registry[series] = self - self.series = series - self.type = data_type - - def setitem(self, item, value, overwrite=True): - state = current_blockstate.get() - fork = current_fork.get() - state.set(fork, self.series, item, value, overwrite) - - def getitem(self, item, default=NARG): - state = current_blockstate.get() - fork = current_fork.get() - return state.get(fork, self.series, item, default) - - def delitem(self, item, overwrite=True): - self.setitem(item, BlockState.DELETE, overwrite) - - def contains(self, item): - try: - self.getitem(item) - return True - except KeyError: # getitem with no default will raise on a missing item - return False - - @staticmethod - def iter_items(series_key): - state = current_blockstate.get() - fork = current_fork.get() - return state.iteritems(fork, series_key) - - -class BlockSet(Generic[T], Iterable[T], BlockData): - def __init__(self, series: str): - super().__init__(series, BlockData.Type.SET) - self.series = series - - def add(self, item): - """ set-like semantics. the item key is added with a value of None. """ - self.setitem(item, None, overwrite=False) - - def __delitem__(self, item): - self.delitem(item, overwrite=False) - - def __contains__(self, item): - return self.contains(item) - - def __iter__(self): - yield from (k for k,v in self.iter_items(self.series)) - - -class BlockDict(Generic[T], BlockData): - - def __init__(self, series: str): - super().__init__(series, BlockData.Type.DICT) - - def __setitem__(self, item, value): - self.setitem(item, value) - - def __getitem__(self, item): - return self.getitem(item) - - def __delitem__(self, item): - self.delitem(item) - - def __contains__(self, item): - return self.contains(item) - - def items(self): - return self.iter_items(self.series) - - def get(self, item, default=None): - return self.getitem(item, default) - - -def _test(): - - def B(height, hash:str, parent): - return Block(chain=1337, height=height, hash=hash.encode('utf8'), parent=None if parent is None else parent.hash, data=None) - - root_block = B(10, '#root', None ) - state = BlockState(root_block) - current_blockstate.set(state) - b11 = B(11, '#b11', parent=root_block) - f11: Fork = state.add_block(b11) - print('f11',f11) - b11b = B(11, '#b11b', parent=root_block) - f11b: Fork = state.add_block(b11b) - print('f11b',f11b) - b12 = B(12, '#b12', parent=b11) - f12: Fork = state.add_block(b12) - print('f12',f12) - - d = BlockDict('ser') - - def dump(): - print() - print(current_fork.get().hash if current_fork.get() is not None else 'root') - for k,v in d.items(): - print(f'{k} = {v}') - - current_fork.set(None) # Use None to set values on root - d['foo'] = 'bar' - d['test'] = 'failed' - - current_fork.set(f11) - d['foo2'] = 'bar2' - del d['test'] - - current_fork.set(f11b) - del d['foo2'] - d['foob'] = 'barb' - - current_fork.set(f12) - d['test'] = 'ok' - - for f in (None, f11, f11b, f12): - current_fork.set(f) - dump() - - print() - print('all b12 diffs') - for i in state.collect_diffs(b12): - print(i) - - print() - print('promoting b11') - state.promote_root(f11) - current_fork.set(f12) - dump() - - print() - print('promoting b12') - state.promote_root(f12) - current_fork.set(f12) - dump() - - -if __name__ == '__main__': - _test() diff --git a/src/dexorder/data.py b/src/dexorder/data.py deleted file mode 100644 index 998e09c..0000000 --- a/src/dexorder/data.py +++ /dev/null @@ -1,8 +0,0 @@ -from dexorder.base.blockstate import BlockSet, BlockDict - -vault_addresses = BlockSet('v') -vault_tokens = BlockDict('vt') -underfunded_vaults = BlockSet('uv') -active_orders = BlockSet('a') -pool_prices = BlockDict('p') - diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py new file mode 100644 index 0000000..e6398bc --- /dev/null +++ b/src/dexorder/data/__init__.py @@ -0,0 +1,7 @@ +from dexorder.blockstate import BlockSet, BlockDict + +vault_addresses = BlockSet('v', db=True, redis=True) +vault_tokens = BlockDict('vt', db=True, redis=True) +pool_prices = BlockDict('p', db=True, redis=True) +underfunded_vaults = BlockSet('uv', db=True) +active_orders = BlockSet('a', db=True) diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index 709721a..766bda3 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -1,3 +1,4 @@ +from dexorder.util import json import logging from contextvars import ContextVar @@ -6,6 +7,7 @@ from sqlalchemy import Engine from sqlalchemy.orm import Session, SessionTransaction from .migrate import migrate_database +from .model.kv import KeyValue from .. import config log = logging.getLogger(__name__) @@ -14,7 +16,38 @@ _engine = ContextVar[Engine]('engine', default=None) _session = ContextVar[Session]('session', default=None) +# Key-value store in DB for general metadata use +class Kv: + def __getitem__(self, key: str): + found = db.session.get(KeyValue, key) + if found is None: + raise KeyError + return found.value + + def __setitem__(self, key: str, value): + found = db.session.get(KeyValue, key) + if found is None: + db.session.add(KeyValue(key=key, value=value)) + else: + found.value = value + + def __delitem__(self, key: str): + db.session.query(KeyValue).filter(KeyValue.key == key).delete() + + def get(self, key: str, default=None): + try: + return self[key] + except KeyError: + return default + + class Db: + def __init__(self): + self.kv = Kv() + + def __bool__(self): + return bool(config.db_url) + def transaction(self) -> SessionTransaction: """ this type of block should be at the top-level of any group of db operations. it will automatically commit @@ -54,7 +87,7 @@ class Db: url = config.db_url if dump_sql is None: dump_sql = config.dump_sql - engine = sqlalchemy.create_engine(url, echo=dump_sql) + engine = sqlalchemy.create_engine(url, echo=dump_sql, json_serializer=json.dumps, json_deserializer=json.loads) if migrate: migrate_database() with engine.connect() as connection: diff --git a/src/dexorder/database/column.py b/src/dexorder/database/column.py index 7f60ce9..c803b83 100644 --- a/src/dexorder/database/column.py +++ b/src/dexorder/database/column.py @@ -1,6 +1,8 @@ +from typing import Union + from hexbytes import HexBytes from sqlalchemy import SMALLINT, INTEGER, BIGINT -from sqlalchemy.dialects.postgresql import BYTEA +from sqlalchemy.dialects.postgresql import BYTEA, JSONB from sqlalchemy.orm import mapped_column from typing_extensions import Annotated @@ -83,6 +85,8 @@ BlockCol = Annotated[int, mapped_column(BIGINT)] Blockchain = Annotated[NativeBlockchain, mapped_column(t.Blockchain)] +Json = Annotated[Union[str,int,float,list,dict,None], mapped_column(JSONB)] + # Uniswap aliases Tick = Int24 SqrtPriceX96 = Uint160 diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index ffeb25f..d8a6b48 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -1,2 +1,3 @@ from .base import Base from .block import Block +from .series import SeriesSet, SeriesDict diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py index 4a70a8d..08ab7e7 100644 --- a/src/dexorder/database/model/block.py +++ b/src/dexorder/database/model/block.py @@ -9,13 +9,13 @@ from dexorder.database.model import Base class Block(Base): chain: Mapped[int] = mapped_column(primary_key=True) - height: Mapped[int] = mapped_column(primary_key=True) # timescaledb index + height: Mapped[int] = mapped_column(primary_key=True) hash: Mapped[bytes] = mapped_column(primary_key=True) parent: Mapped[bytes] - data: Mapped[dict] = mapped_column('data',JSONB) + data: Mapped[dict] = mapped_column(JSONB) def __str__(self): - return f'{self.height}_{self.hash.hex()}' + return f'{self.height}_{self.hash.hex()[:5]}' current_block = ContextVar[Block]('Block.cur') # block for the current thread diff --git a/src/dexorder/database/model/kv.py b/src/dexorder/database/model/kv.py new file mode 100644 index 0000000..ab80472 --- /dev/null +++ b/src/dexorder/database/model/kv.py @@ -0,0 +1,13 @@ +import logging + +from sqlalchemy.orm import Mapped, mapped_column + +from dexorder.database.column import Json +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + + +class KeyValue (Base): + key: Mapped[str] = mapped_column(primary_key=True) + value: Mapped[Json] diff --git a/src/dexorder/database/model/series.py b/src/dexorder/database/model/series.py new file mode 100644 index 0000000..f6ac91d --- /dev/null +++ b/src/dexorder/database/model/series.py @@ -0,0 +1,21 @@ +import logging +from typing import Union + +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import mapped_column, Mapped + +from dexorder.database.column import Json +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + +class SeriesBase: + chain: Mapped[int] = mapped_column(primary_key=True) + series: Mapped[str] = mapped_column(primary_key=True) + key: Mapped[str] = mapped_column(primary_key=True) + +class SeriesSet (SeriesBase, Base): + pass + +class SeriesDict (SeriesBase, Base): + value: Mapped[Json] diff --git a/src/dexorder/trigger_runner.py b/src/dexorder/runner.py similarity index 90% rename from src/dexorder/trigger_runner.py rename to src/dexorder/runner.py index e2e3c18..f267229 100644 --- a/src/dexorder/trigger_runner.py +++ b/src/dexorder/runner.py @@ -7,9 +7,11 @@ from web3.exceptions import LogTopicError from web3.types import EventData from dexorder import Blockchain, db, blockchain, NARG, dec -from dexorder.base.blockstate import BlockState, BlockDict, Fork, DiffItem, BlockSet, current_blockstate, current_fork +from dexorder.base.chain import current_chain +from dexorder.base.fork import Fork, current_fork from dexorder.blockchain.connection import create_w3_ws from dexorder.blockchain.util import get_contract_data +from dexorder.blockstate import DiffItem, BlockState, current_blockstate from dexorder.data import pool_prices, vault_tokens, underfunded_vaults, vault_addresses from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block @@ -19,10 +21,15 @@ log = logging.getLogger(__name__) # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas -class TriggerRunner: +class BlockStateRunner: - def __init__(self): - self.root_age = 10 # todo set per chain + def __init__(self, state: BlockState = None): + """ + If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling. + """ + self.state = state + + # items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event self.events:list[tuple[Callable[[dict],None],ContractEvents,dict]] = [] # onHeadUpdate callbacks are invoked with a list of DiffItems used to update the head state from either the previous head or the root @@ -31,13 +38,14 @@ class TriggerRunner: # onPromotion callbacks are invoked with a list of DiffItems used to advance the root state self.on_promotion: list[Callable[[Block,list[DiffItem]],None]] = [] + async def run(self): """ 1. load root stateBlockchain a. if no root, init from head b. if root is old, batch forward by height 2. discover new heads - 2b. find in-memory ancestor else use root + 2b. find in-state parent block else use root 3. context = ancestor->head diff 4. query global log filter 5. process new vaults @@ -54,15 +62,14 @@ class TriggerRunner: 15. on tx confirmation, the block height of all executed trigger requests is set to the tx block """ - db.connect() w3 = blockchain.connect() w3ws = create_w3_ws() chain_id = await w3ws.eth.chain_id - Blockchain.set_cur(Blockchain.for_id(chain_id)) + chain = Blockchain.for_id(chain_id) + current_chain.set(chain) - # todo load root - state = None + state = self.state async with w3ws as w3ws: await w3ws.eth.subscribe('newHeads') while True: @@ -124,7 +131,7 @@ class TriggerRunner: callback(block, diff_items) # check for root promotion - promotion_height = fork.height - self.root_age + promotion_height = fork.height - chain.confirms if not fork.disjoint and promotion_height > state.root_block.height: diff_items = state.promote_root(fork.for_height(promotion_height)) for callback in self.on_promotion: diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index 6e9235b..d01b67e 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -32,6 +32,19 @@ def hexbytes(value: str): """ converts an optionally 0x-prefixed hex string into bytes """ return bytes.fromhex(value[2:] if value.startswith('0x') else value) +def keystr(value): + if type(value) is str: + return value + if type(value) is HexBytes: + return value.hex() + if type(value) is bytes: + return '0x' + value.hex() + return str(value) + +def strkey(s): + if s.startswith('0x'): + return hexbytes(s) + return s def topic(event_abi): event_name = f'{event_abi["name"]}(' + ','.join(i['type'] for i in event_abi['inputs']) + ')' diff --git a/src/dexorder/util/json.py b/src/dexorder/util/json.py index b452a20..2b46334 100644 --- a/src/dexorder/util/json.py +++ b/src/dexorder/util/json.py @@ -1,19 +1,24 @@ +from decimal import Decimal + from hexbytes import HexBytes from orjson import orjson from web3.datastructures import AttributeDict def _serialize(v): - # todo wrap json.dumps() if type(v) is HexBytes: return v.hex() - if type(v) is AttributeDict: + elif type(v) is bytes: + return '0x' + v.hex() + elif type(v) is AttributeDict: return v.__dict__ - raise ValueError(v) + elif type(v) is Decimal: + return f'{v:f}' + raise TypeError def loads(s): return orjson.loads(s) def dumps(obj): - return orjson.dumps(obj, default=_serialize) + return orjson.dumps(obj, default=_serialize, option=orjson.OPT_PASSTHROUGH_SUBCLASS).decode('utf8')