This commit is contained in:
Tim Olson
2023-09-28 23:20:55 -04:00
parent f22841e93e
commit 6ab486ac18
24 changed files with 556 additions and 340 deletions

View File

@@ -1,44 +0,0 @@
"""empty message
Revision ID: 1da309899f95
Revises:
Create Date: 2023-09-18 19:18:11.706312
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '1da309899f95'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('block',
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('hash', sa.LargeBinary(), nullable=False),
sa.Column('parent', sa.LargeBinary(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('chain', 'height', 'hash')
)
op.drop_table('migrations')
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('migrations',
sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
sa.Column('name', sa.VARCHAR(length=255), autoincrement=False, nullable=False),
sa.Column('run_on', postgresql.TIMESTAMP(), autoincrement=False, nullable=False),
sa.PrimaryKeyConstraint('id', name='migrations_pkey')
)
op.drop_table('block')
# ### end Alembic commands ###

View File

@@ -0,0 +1,55 @@
"""empty message
Revision ID: db62e7db828d
Revises:
Create Date: 2023-09-28 23:04:41.020644
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
import dexorder.database
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = 'db62e7db828d'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table('block',
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('hash', sa.LargeBinary(), nullable=False),
sa.Column('parent', sa.LargeBinary(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('chain', 'height', 'hash')
)
op.create_table('keyvalue',
sa.Column('key', sa.String(), nullable=False),
sa.Column('value', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.PrimaryKeyConstraint('key')
)
op.create_table('seriesdict',
sa.Column('value', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('series', sa.String(), nullable=False),
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
op.create_table('seriesset',
sa.Column('chain', sa.Integer(), nullable=False),
sa.Column('series', sa.String(), nullable=False),
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
def downgrade() -> None:
op.drop_table('seriesset')
op.drop_table('seriesdict')
op.drop_table('keyvalue')
op.drop_table('block')

View File

@@ -2,14 +2,6 @@ from contextvars import ContextVar
class Blockchain: class Blockchain:
@staticmethod
def cur() -> 'Blockchain':
return _cur.get()
@staticmethod
def set_cur(value: 'Blockchain'):
_cur.set(value)
@staticmethod @staticmethod
def for_id(chain_id): def for_id(chain_id):
result = Blockchain._instances_by_id.get(chain_id) result = Blockchain._instances_by_id.get(chain_id)
@@ -25,9 +17,13 @@ class Blockchain:
def get(name_or_id): def get(name_or_id):
return Blockchain.for_name(name_or_id) if type(name_or_id) is str else Blockchain.for_id(name_or_id) return Blockchain.for_name(name_or_id) if type(name_or_id) is str else Blockchain.for_id(name_or_id)
def __init__(self, chain_id, name): def __init__(self, chain_id, name, confirms=10):
"""
confirms is the number of blocks until a block can be considered finalized and unforkable
"""
self.chain_id = chain_id self.chain_id = chain_id
self.name = name self.name = name
self.confirms = confirms
Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_id[chain_id] = self
Blockchain._instances_by_name[name] = self Blockchain._instances_by_name[name] = self
@@ -45,6 +41,6 @@ Goerli = Blockchain(5, 'Goerli')
Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Polygon = Blockchain(137, 'Polygon') # POS not zkEVM
Mumbai = Blockchain(80001, 'Mumbai') Mumbai = Blockchain(80001, 'Mumbai')
BSC = Blockchain(56, 'BSC') BSC = Blockchain(56, 'BSC')
Arbitrum = ArbitrumOne = Blockchain(42161, 'ArbitrumOne') Arbitrum = Blockchain(42161, 'Arbitrum', 10)
_cur = ContextVar[Blockchain]('Blockchain.cur') current_chain = ContextVar[Blockchain]('current_chain')

65
src/dexorder/base/fork.py Normal file
View File

@@ -0,0 +1,65 @@
import logging
from contextvars import ContextVar
from typing import Iterable, Optional
from dexorder.database.model import Block
log = logging.getLogger(__name__)
class Fork:
"""
A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The
getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest
block and [-2] is its parent, etc.
"""
def __init__(self, ancestry: Iterable[bytes], *, height: int):
self.ancestry = list(ancestry)
self.height = height
self.disjoint = False
def __contains__(self, item):
index = self.height - item.height
if index < 0:
return False
try:
return self.ancestry[index] == item.hash
except IndexError:
return False
@property
def hash(self):
return self.ancestry[0]
@property
def parent(self):
return self.ancestry[1]
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """
assert( self.height - len(self.ancestry) < height <= self.height)
return Fork(self.ancestry[self.height-height:], height=height)
def __str__(self):
return f'{self.height}_[{"->".join(h.hex() for h in self.ancestry)}]'
class DisjointFork:
"""
duck type of Fork for blocks that connect directly to root with a parent gap in-between
"""
def __init__(self, block: Block, root: Block):
self.height = block.height
self.hash = block.hash
self.parent = root.hash
self.disjoint = True
def __contains__(self, item):
return item.hash in (self.hash, self.parent)
def __str__(self):
return f'{self.height}_[{self.hash.hex()}->{self.parent.hex()}]'
current_fork = ContextVar[Optional[Fork]]('current_fork', default=None)

View File

@@ -6,7 +6,7 @@ from web3 import Web3
from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0 from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0
from dexorder.blockchain import ByBlockchainDict from dexorder.blockchain import ByBlockchainDict
from dexorder.base.chain import Polygon, ArbitrumOne, Ethereum from dexorder.base.chain import Polygon, Arbitrum, Ethereum
from dexorder.contract import ContractProxy, abis from dexorder.contract import ContractProxy, abis
import dexorder.database.column as col import dexorder.database.column as col

View File

@@ -1,16 +1,32 @@
import logging import logging
from dexorder import db, config, Blockchain
from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.trigger_runner import TriggerRunner from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState
from dexorder.configuration import parse_args
from dexorder.runner import BlockStateRunner
from dexorder.data import pool_prices, vault_tokens, vault_addresses, underfunded_vaults, active_orders
log = logging.getLogger('dexorder') log = logging.getLogger('dexorder')
ROOT_AGE = 10 # todo set per chain
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
log = logging.getLogger('dexorder') log = logging.getLogger('dexorder')
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
execute(TriggerRunner().run()) parse_args()
current_chain.set(Blockchain.get(config.chain))
state = None
if db:
db.connect()
db_state = DbState(BlockData.by_tag['db'])
with db.session:
state = db_state.load()
runner = BlockStateRunner(state)
if db:
# noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save)
execute(runner.run()) # single task
log.info('exiting') log.info('exiting')

View File

@@ -1,3 +1,3 @@
from .by_blockchain import ByBlockchainDict, ByBlockchainList, ByBlockchainCollection from .by_blockchain import ByBlockchainDict, ByBlockchainList, ByBlockchainCollection
from .connection import connect from .connection import connect
from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, ArbitrumOne, BSC from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, Arbitrum, BSC

View File

@@ -8,17 +8,7 @@ from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url from ..configuration.resolve import resolve_ws_url
_w3 = ContextVar('w3') current_w3 = ContextVar('current_w3')
class W3:
@staticmethod
def cur() -> AsyncWeb3:
return _w3.get()
@staticmethod
def set_cur(value:AsyncWeb3):
_w3.set(value)
def connect(rpc_url=None): def connect(rpc_url=None):
@@ -28,7 +18,7 @@ def connect(rpc_url=None):
use create_w3() and set w3.eth.default_account separately use create_w3() and set w3.eth.default_account separately
""" """
w3 = create_w3(rpc_url) w3 = create_w3(rpc_url)
W3.set_cur(w3) current_w3.set(w3)
return w3 return w3

View File

@@ -0,0 +1,70 @@
from .diff import DiffEntry, DiffItem, DELETE
from .state import BlockState, current_blockstate
from .blockdata import BlockDict, BlockSet
def _test():
def B(height, hash:str, parent):
return Block(chain=1337, height=height, hash=hash.encode('utf8'), parent=None if parent is None else parent.hash, data=None)
root_block = B(10, '#root', None )
state = BlockState(root_block)
current_blockstate.set(state)
b11 = B(11, '#b11', parent=root_block)
f11: Fork = state.add_block(b11)
print('f11',f11)
b11b = B(11, '#b11b', parent=root_block)
f11b: Fork = state.add_block(b11b)
print('f11b',f11b)
b12 = B(12, '#b12', parent=b11)
f12: Fork = state.add_block(b12)
print('f12',f12)
d = BlockDict('ser')
def dump():
print()
print(current_fork.get().hash if current_fork.get() is not None else 'root')
for k,v in d.items():
print(f'{k} = {v}')
current_fork.set(None) # Use None to set values on root
d['foo'] = 'bar'
d['test'] = 'failed'
current_fork.set(f11)
d['foo2'] = 'bar2'
del d['test']
current_fork.set(f11b)
del d['foo2']
d['foob'] = 'barb'
current_fork.set(f12)
d['test'] = 'ok'
for f in (None, f11, f11b, f12):
current_fork.set(f)
dump()
print()
print('all b12 diffs')
for i in state.collect_diffs(b12):
print(i)
print()
print('promoting b11')
state.promote_root(f11)
current_fork.set(f12)
dump()
print()
print('promoting b12')
state.promote_root(f12)
current_fork.set(f12)
dump()
if __name__ == '__main__':
_test()

View File

@@ -0,0 +1,101 @@
import logging
from collections import defaultdict
from enum import Enum
from typing import TypeVar, Generic, Iterable
from dexorder import NARG
from dexorder.base.fork import current_fork
from .diff import DELETE
from .state import current_blockstate
log = logging.getLogger(__name__)
T = TypeVar('T')
class BlockData:
class Type (Enum):
SCALAR:int = 0
SET:int = 1
LIST:int = 2
DICT:int = 3
registry: dict[str,'BlockData'] = {} # series name and instance
by_tag: dict[str, list['BlockData']] = defaultdict(list)
def __init__(self, series:str, data_type: Type, **tags):
assert series not in BlockData.registry
BlockData.registry[series] = self
self.series = series
self.type = data_type
for tag, value in tags.items():
if value:
BlockData.by_tag[tag].append(self)
def setitem(self, item, value, overwrite=True):
state = current_blockstate.get()
fork = current_fork.get()
state.set(fork, self.series, item, value, overwrite)
def getitem(self, item, default=NARG):
state = current_blockstate.get()
fork = current_fork.get()
return state.get(fork, self.series, item, default)
def delitem(self, item, overwrite=True):
self.setitem(item, DELETE, overwrite)
def contains(self, item):
try:
self.getitem(item)
return True
except KeyError: # getitem with no default will raise on a missing item
return False
@staticmethod
def iter_items(series_key):
state = current_blockstate.get()
fork = current_fork.get()
return state.iteritems(fork, series_key)
class BlockSet(Generic[T], Iterable[T], BlockData):
def __init__(self, series: str, **tags):
super().__init__(series, BlockData.Type.SET, **tags)
self.series = series
def add(self, item):
""" set-like semantics. the item key is added with a value of None. """
self.setitem(item, None, overwrite=False)
def __delitem__(self, item):
self.delitem(item, overwrite=False)
def __contains__(self, item):
return self.contains(item)
def __iter__(self):
yield from (k for k,v in self.iter_items(self.series))
class BlockDict(Generic[T], BlockData):
def __init__(self, series: str, **tags):
super().__init__(series, BlockData.Type.DICT, **tags)
def __setitem__(self, item, value):
self.setitem(item, value)
def __getitem__(self, item):
return self.getitem(item)
def __delitem__(self, item):
self.delitem(item)
def __contains__(self, item):
return self.contains(item)
def items(self):
return self.iter_items(self.series)
def get(self, item, default=None):
return self.getitem(item, default)

View File

@@ -0,0 +1,79 @@
import logging
from typing import Iterable, Optional, Union
from . import DiffItem, BlockSet, BlockDict, DELETE, BlockState, current_blockstate
from .blockdata import BlockData
from .. import db
from ..base.chain import current_chain
from ..base.fork import current_fork
from ..database.model import SeriesSet, SeriesDict, Block
from ..database.model.block import current_block, latest_block, completed_block
from ..util import keystr, strkey, hexbytes
log = logging.getLogger(__name__)
class DbState:
def __init__(self, series_or_datavars: Iterable[Union[str,BlockData]]):
self.types = {
(d:=BlockData.registry[x] if type(x) is str else x).series:d.type
for x in series_or_datavars
}
def save(self, root_block: Block, diffs: Iterable[DiffItem] ):
chain_id = current_chain.get().chain_id
for diff in diffs:
try:
t = self.types[diff.series]
except KeyError:
continue
diffseries = keystr(diff.series)
diffkey = keystr(diff.key)
key = dict(chain=chain_id, series=diffseries, key=diffkey)
if diff.entry.value is DELETE:
Entity = SeriesSet if t == BlockData.Type.SET else SeriesDict if t == BlockData.Type.DICT else None
db.session.query(Entity).filter(Entity.chain==chain_id, Entity.series==diffseries, Entity.key==diffkey).delete()
else:
# upsert
if t == BlockData.Type.SET:
found = db.session.get(SeriesSet, key)
if found is None:
db.session.add(SeriesSet(**key))
elif t == BlockData.Type.DICT:
found = db.session.get(SeriesDict, key)
if found is None:
db.session.add(SeriesDict(**key, value=diff.entry.value))
else:
found.value = diff.entry.value
else:
raise NotImplementedError
db.kv[f'root_block.{root_block.chain}'] = [root_block.height, root_block.hash]
# noinspection PyShadowingBuiltins
def load(self) -> Optional[BlockState]:
chain_id = current_chain.get().chain_id
try:
height, hash = db.kv[f'root_block.{chain_id}']
except (KeyError, ValueError):
return None
root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash)))
if root_block is None:
return None
current_block.set(root_block)
latest_block.set(root_block)
state = BlockState(root_block)
current_blockstate.set(state)
current_fork.set(None) # root fork
for series, t in self.types.items():
if t == BlockData.Type.SET:
# noinspection PyTypeChecker
var: BlockSet = BlockData.registry[series]
for row in db.session.query(SeriesSet).where(SeriesSet.series==keystr(series)):
var.add(strkey(row.key))
elif t == BlockData.Type.DICT:
# noinspection PyTypeChecker
var: BlockDict = BlockData.registry[series]
for row in db.session.query(SeriesDict).where(SeriesSet.series==keystr(series)):
var[strkey(row.key)] = row.value
completed_block.set(root_block)
return state

View File

@@ -0,0 +1,22 @@
from dataclasses import dataclass
from typing import Union, Any
DELETE = object() # used as a value token to indicate removal of the key
@dataclass
class DiffEntry:
value: Union[Any, DELETE]
height: int
hash: bytes
@dataclass
class DiffItem:
series: Any
key: Any
entry: DiffEntry
def __str__(self):
return f'{self.entry.hash.hex()} {self.series}.{self.key}={"[DEL]" if self.entry.value is DELETE else self.entry.value}'

View File

@@ -2,96 +2,20 @@ import itertools
import logging import logging
from collections import defaultdict from collections import defaultdict
from contextvars import ContextVar from contextvars import ContextVar
from dataclasses import dataclass from typing import Any, Optional, Union
from enum import Enum
from typing import Union, TypeVar, Generic, Any, Optional, Iterable
from sortedcontainers import SortedList from sortedcontainers import SortedList
from dexorder import NARG from dexorder import NARG
from dexorder.database.model.block import Block from dexorder.base.fork import Fork, DisjointFork
from dexorder.database.model import Block
from dexorder.util import hexstr from dexorder.util import hexstr
from .diff import DiffEntry, DiffItem, DELETE
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@dataclass
class DiffEntry:
value: Union[Any, 'BlockState.DELETE']
height: int
hash: bytes
@dataclass
class DiffItem:
series: Any
key: Any
entry: DiffEntry
def __str__(self):
return f'{self.entry.hash} {self.series}.{self.key}={"[DEL]" if self.entry.value is BlockState.DELETE else self.entry.value}'
class Fork:
"""
A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The
getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest
block and [-2] is its parent, etc.
"""
def __init__(self, ancestry: Iterable[bytes], *, height: int):
self.ancestry = list(ancestry)
self.height = height
self.disjoint = False
def __contains__(self, item):
index = self.height - item.height
if index < 0:
return False
try:
return self.ancestry[index] == item.hash
except IndexError:
return False
@property
def hash(self):
return self.ancestry[0]
@property
def parent(self):
return self.ancestry[1]
def for_height(self, height):
""" returns a new Fork object for an older block along this fork. used for root promotion. """
assert( self.height - len(self.ancestry) < height <= self.height)
return Fork(self.ancestry[self.height-height:], height=height)
def __str__(self):
return f'{self.height}_[{"->".join(h.hex() for h in self.ancestry)}]'
current_fork = ContextVar[Optional[Fork]]('current_fork', default=None)
class DisjointFork:
"""
duck type of Fork for blocks that connect directly to root with a parent gap in-between
"""
def __init__(self, block: Block, root: Block):
self.height = block.height
self.hash = block.hash
self.parent = root.hash
self.disjoint = True
def __contains__(self, item):
return item.hash in (self.hash, self.parent)
def __str__(self):
return f'{self.height}_[{self.hash.hex()}->{self.parent.hex()}]'
class BlockState: class BlockState:
DELETE = object()
by_chain: dict[int, 'BlockState'] = {} by_chain: dict[int, 'BlockState'] = {}
@@ -134,7 +58,7 @@ class BlockState:
return self.fork(block) return self.fork(block)
def delete_block(self, block: Union[Block,Fork,bytes]): def delete_block(self, block: Union[Block, Fork,bytes]):
""" if there was an error during block processing, we need to remove the incomplete block data """ """ if there was an error during block processing, we need to remove the incomplete block data """
try: try:
block = block.hash block = block.hash
@@ -178,7 +102,7 @@ class BlockState:
return default return default
diffs: list[DiffEntry] = series_diffs.get(key, []) diffs: list[DiffEntry] = series_diffs.get(key, [])
value = self._get_from_diffs(fork, diffs) value = self._get_from_diffs(fork, diffs)
if value is not BlockState.DELETE: if value is not DELETE:
return value return value
# value not found or was DELETE # value not found or was DELETE
if default is NARG: if default is NARG:
@@ -188,13 +112,13 @@ class BlockState:
def _get_from_diffs(self, fork, diffs): def _get_from_diffs(self, fork, diffs):
for diff in reversed(diffs): for diff in reversed(diffs):
if diff.height <= self.root_block.height or fork is not None and diff in fork: if diff.height <= self.root_block.height or fork is not None and diff in fork:
if diff.value is BlockState.DELETE: if diff.value is DELETE:
break break
else: else:
if self.root_block not in fork: # todo move this assertion elsewhere so it runs once per task if self.root_block not in fork: # todo move this assertion elsewhere so it runs once per task
raise ValueError(f'Cannot get value for a non-root fork {hexstr(fork.hash)}') raise ValueError(f'Cannot get value for a non-root fork {hexstr(fork.hash)}')
return diff.value return diff.value
return BlockState.DELETE return DELETE
def set(self, fork: Optional[Fork], series, key, value, overwrite=True): def set(self, fork: Optional[Fork], series, key, value, overwrite=True):
@@ -211,7 +135,7 @@ class BlockState:
for k, difflist in self.diffs_by_series.get(series, {}).items(): for k, difflist in self.diffs_by_series.get(series, {}).items():
for diff in reversed(difflist): for diff in reversed(difflist):
if diff.height <= self.root_block.height or fork is not None and diff in fork: if diff.height <= self.root_block.height or fork is not None and diff in fork:
if diff.value is not BlockState.DELETE: if diff.value is not DELETE:
yield k, diff.value yield k, diff.value
break break
@@ -255,7 +179,7 @@ class BlockState:
while len(difflist) >= 2 and difflist[1].height <= new_root_fork.height: while len(difflist) >= 2 and difflist[1].height <= new_root_fork.height:
difflist.pop(0) difflist.pop(0)
# if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list # if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list
if not difflist or len(difflist) == 1 and difflist[0].value == BlockState.DELETE and difflist[0].height <= new_root_fork.height: if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height:
del self.diffs_by_series[s][k] del self.diffs_by_series[s][k]
del self.by_hash[self.root_block.hash] # old root block del self.by_hash[self.root_block.hash] # old root block
@@ -291,157 +215,3 @@ class BlockState:
current_blockstate = ContextVar[BlockState]('current_blockstate') current_blockstate = ContextVar[BlockState]('current_blockstate')
T = TypeVar('T')
class BlockData:
class Type (Enum):
SCALAR:int = 0
SET:int = 1
LIST:int = 2
DICT:int = 3
registry: dict[str,'BlockData'] = {} # series name and instance
def __init__(self, series:str, data_type: Type):
assert series not in BlockData.registry
BlockData.registry[series] = self
self.series = series
self.type = data_type
def setitem(self, item, value, overwrite=True):
state = current_blockstate.get()
fork = current_fork.get()
state.set(fork, self.series, item, value, overwrite)
def getitem(self, item, default=NARG):
state = current_blockstate.get()
fork = current_fork.get()
return state.get(fork, self.series, item, default)
def delitem(self, item, overwrite=True):
self.setitem(item, BlockState.DELETE, overwrite)
def contains(self, item):
try:
self.getitem(item)
return True
except KeyError: # getitem with no default will raise on a missing item
return False
@staticmethod
def iter_items(series_key):
state = current_blockstate.get()
fork = current_fork.get()
return state.iteritems(fork, series_key)
class BlockSet(Generic[T], Iterable[T], BlockData):
def __init__(self, series: str):
super().__init__(series, BlockData.Type.SET)
self.series = series
def add(self, item):
""" set-like semantics. the item key is added with a value of None. """
self.setitem(item, None, overwrite=False)
def __delitem__(self, item):
self.delitem(item, overwrite=False)
def __contains__(self, item):
return self.contains(item)
def __iter__(self):
yield from (k for k,v in self.iter_items(self.series))
class BlockDict(Generic[T], BlockData):
def __init__(self, series: str):
super().__init__(series, BlockData.Type.DICT)
def __setitem__(self, item, value):
self.setitem(item, value)
def __getitem__(self, item):
return self.getitem(item)
def __delitem__(self, item):
self.delitem(item)
def __contains__(self, item):
return self.contains(item)
def items(self):
return self.iter_items(self.series)
def get(self, item, default=None):
return self.getitem(item, default)
def _test():
def B(height, hash:str, parent):
return Block(chain=1337, height=height, hash=hash.encode('utf8'), parent=None if parent is None else parent.hash, data=None)
root_block = B(10, '#root', None )
state = BlockState(root_block)
current_blockstate.set(state)
b11 = B(11, '#b11', parent=root_block)
f11: Fork = state.add_block(b11)
print('f11',f11)
b11b = B(11, '#b11b', parent=root_block)
f11b: Fork = state.add_block(b11b)
print('f11b',f11b)
b12 = B(12, '#b12', parent=b11)
f12: Fork = state.add_block(b12)
print('f12',f12)
d = BlockDict('ser')
def dump():
print()
print(current_fork.get().hash if current_fork.get() is not None else 'root')
for k,v in d.items():
print(f'{k} = {v}')
current_fork.set(None) # Use None to set values on root
d['foo'] = 'bar'
d['test'] = 'failed'
current_fork.set(f11)
d['foo2'] = 'bar2'
del d['test']
current_fork.set(f11b)
del d['foo2']
d['foob'] = 'barb'
current_fork.set(f12)
d['test'] = 'ok'
for f in (None, f11, f11b, f12):
current_fork.set(f)
dump()
print()
print('all b12 diffs')
for i in state.collect_diffs(b12):
print(i)
print()
print('promoting b11')
state.promote_root(f11)
current_fork.set(f12)
dump()
print()
print('promoting b12')
state.promote_root(f12)
current_fork.set(f12)
dump()
if __name__ == '__main__':
_test()

View File

@@ -1,8 +0,0 @@
from dexorder.base.blockstate import BlockSet, BlockDict
vault_addresses = BlockSet('v')
vault_tokens = BlockDict('vt')
underfunded_vaults = BlockSet('uv')
active_orders = BlockSet('a')
pool_prices = BlockDict('p')

View File

@@ -0,0 +1,7 @@
from dexorder.blockstate import BlockSet, BlockDict
vault_addresses = BlockSet('v', db=True, redis=True)
vault_tokens = BlockDict('vt', db=True, redis=True)
pool_prices = BlockDict('p', db=True, redis=True)
underfunded_vaults = BlockSet('uv', db=True)
active_orders = BlockSet('a', db=True)

View File

@@ -1,3 +1,4 @@
from dexorder.util import json
import logging import logging
from contextvars import ContextVar from contextvars import ContextVar
@@ -6,6 +7,7 @@ from sqlalchemy import Engine
from sqlalchemy.orm import Session, SessionTransaction from sqlalchemy.orm import Session, SessionTransaction
from .migrate import migrate_database from .migrate import migrate_database
from .model.kv import KeyValue
from .. import config from .. import config
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -14,7 +16,38 @@ _engine = ContextVar[Engine]('engine', default=None)
_session = ContextVar[Session]('session', default=None) _session = ContextVar[Session]('session', default=None)
# Key-value store in DB for general metadata use
class Kv:
def __getitem__(self, key: str):
found = db.session.get(KeyValue, key)
if found is None:
raise KeyError
return found.value
def __setitem__(self, key: str, value):
found = db.session.get(KeyValue, key)
if found is None:
db.session.add(KeyValue(key=key, value=value))
else:
found.value = value
def __delitem__(self, key: str):
db.session.query(KeyValue).filter(KeyValue.key == key).delete()
def get(self, key: str, default=None):
try:
return self[key]
except KeyError:
return default
class Db: class Db:
def __init__(self):
self.kv = Kv()
def __bool__(self):
return bool(config.db_url)
def transaction(self) -> SessionTransaction: def transaction(self) -> SessionTransaction:
""" """
this type of block should be at the top-level of any group of db operations. it will automatically commit this type of block should be at the top-level of any group of db operations. it will automatically commit
@@ -54,7 +87,7 @@ class Db:
url = config.db_url url = config.db_url
if dump_sql is None: if dump_sql is None:
dump_sql = config.dump_sql dump_sql = config.dump_sql
engine = sqlalchemy.create_engine(url, echo=dump_sql) engine = sqlalchemy.create_engine(url, echo=dump_sql, json_serializer=json.dumps, json_deserializer=json.loads)
if migrate: if migrate:
migrate_database() migrate_database()
with engine.connect() as connection: with engine.connect() as connection:

View File

@@ -1,6 +1,8 @@
from typing import Union
from hexbytes import HexBytes from hexbytes import HexBytes
from sqlalchemy import SMALLINT, INTEGER, BIGINT from sqlalchemy import SMALLINT, INTEGER, BIGINT
from sqlalchemy.dialects.postgresql import BYTEA from sqlalchemy.dialects.postgresql import BYTEA, JSONB
from sqlalchemy.orm import mapped_column from sqlalchemy.orm import mapped_column
from typing_extensions import Annotated from typing_extensions import Annotated
@@ -83,6 +85,8 @@ BlockCol = Annotated[int, mapped_column(BIGINT)]
Blockchain = Annotated[NativeBlockchain, mapped_column(t.Blockchain)] Blockchain = Annotated[NativeBlockchain, mapped_column(t.Blockchain)]
Json = Annotated[Union[str,int,float,list,dict,None], mapped_column(JSONB)]
# Uniswap aliases # Uniswap aliases
Tick = Int24 Tick = Int24
SqrtPriceX96 = Uint160 SqrtPriceX96 = Uint160

View File

@@ -1,2 +1,3 @@
from .base import Base from .base import Base
from .block import Block from .block import Block
from .series import SeriesSet, SeriesDict

View File

@@ -9,13 +9,13 @@ from dexorder.database.model import Base
class Block(Base): class Block(Base):
chain: Mapped[int] = mapped_column(primary_key=True) chain: Mapped[int] = mapped_column(primary_key=True)
height: Mapped[int] = mapped_column(primary_key=True) # timescaledb index height: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[bytes] = mapped_column(primary_key=True) hash: Mapped[bytes] = mapped_column(primary_key=True)
parent: Mapped[bytes] parent: Mapped[bytes]
data: Mapped[dict] = mapped_column('data',JSONB) data: Mapped[dict] = mapped_column(JSONB)
def __str__(self): def __str__(self):
return f'{self.height}_{self.hash.hex()}' return f'{self.height}_{self.hash.hex()[:5]}'
current_block = ContextVar[Block]('Block.cur') # block for the current thread current_block = ContextVar[Block]('Block.cur') # block for the current thread

View File

@@ -0,0 +1,13 @@
import logging
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.column import Json
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class KeyValue (Base):
key: Mapped[str] = mapped_column(primary_key=True)
value: Mapped[Json]

View File

@@ -0,0 +1,21 @@
import logging
from typing import Union
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import mapped_column, Mapped
from dexorder.database.column import Json
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class SeriesBase:
chain: Mapped[int] = mapped_column(primary_key=True)
series: Mapped[str] = mapped_column(primary_key=True)
key: Mapped[str] = mapped_column(primary_key=True)
class SeriesSet (SeriesBase, Base):
pass
class SeriesDict (SeriesBase, Base):
value: Mapped[Json]

View File

@@ -7,9 +7,11 @@ from web3.exceptions import LogTopicError
from web3.types import EventData from web3.types import EventData
from dexorder import Blockchain, db, blockchain, NARG, dec from dexorder import Blockchain, db, blockchain, NARG, dec
from dexorder.base.blockstate import BlockState, BlockDict, Fork, DiffItem, BlockSet, current_blockstate, current_fork from dexorder.base.chain import current_chain
from dexorder.base.fork import Fork, current_fork
from dexorder.blockchain.connection import create_w3_ws from dexorder.blockchain.connection import create_w3_ws
from dexorder.blockchain.util import get_contract_data from dexorder.blockchain.util import get_contract_data
from dexorder.blockstate import DiffItem, BlockState, current_blockstate
from dexorder.data import pool_prices, vault_tokens, underfunded_vaults, vault_addresses from dexorder.data import pool_prices, vault_tokens, underfunded_vaults, vault_addresses
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block from dexorder.database.model.block import current_block, latest_block
@@ -19,10 +21,15 @@ log = logging.getLogger(__name__)
# todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
class TriggerRunner: class BlockStateRunner:
def __init__(self): def __init__(self, state: BlockState = None):
self.root_age = 10 # todo set per chain """
If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling.
"""
self.state = state
# items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
self.events:list[tuple[Callable[[dict],None],ContractEvents,dict]] = [] self.events:list[tuple[Callable[[dict],None],ContractEvents,dict]] = []
# onHeadUpdate callbacks are invoked with a list of DiffItems used to update the head state from either the previous head or the root # onHeadUpdate callbacks are invoked with a list of DiffItems used to update the head state from either the previous head or the root
@@ -31,13 +38,14 @@ class TriggerRunner:
# onPromotion callbacks are invoked with a list of DiffItems used to advance the root state # onPromotion callbacks are invoked with a list of DiffItems used to advance the root state
self.on_promotion: list[Callable[[Block,list[DiffItem]],None]] = [] self.on_promotion: list[Callable[[Block,list[DiffItem]],None]] = []
async def run(self): async def run(self):
""" """
1. load root stateBlockchain 1. load root stateBlockchain
a. if no root, init from head a. if no root, init from head
b. if root is old, batch forward by height b. if root is old, batch forward by height
2. discover new heads 2. discover new heads
2b. find in-memory ancestor else use root 2b. find in-state parent block else use root
3. context = ancestor->head diff 3. context = ancestor->head diff
4. query global log filter 4. query global log filter
5. process new vaults 5. process new vaults
@@ -54,15 +62,14 @@ class TriggerRunner:
15. on tx confirmation, the block height of all executed trigger requests is set to the tx block 15. on tx confirmation, the block height of all executed trigger requests is set to the tx block
""" """
db.connect()
w3 = blockchain.connect() w3 = blockchain.connect()
w3ws = create_w3_ws() w3ws = create_w3_ws()
chain_id = await w3ws.eth.chain_id chain_id = await w3ws.eth.chain_id
Blockchain.set_cur(Blockchain.for_id(chain_id)) chain = Blockchain.for_id(chain_id)
current_chain.set(chain)
# todo load root state = self.state
state = None
async with w3ws as w3ws: async with w3ws as w3ws:
await w3ws.eth.subscribe('newHeads') await w3ws.eth.subscribe('newHeads')
while True: while True:
@@ -124,7 +131,7 @@ class TriggerRunner:
callback(block, diff_items) callback(block, diff_items)
# check for root promotion # check for root promotion
promotion_height = fork.height - self.root_age promotion_height = fork.height - chain.confirms
if not fork.disjoint and promotion_height > state.root_block.height: if not fork.disjoint and promotion_height > state.root_block.height:
diff_items = state.promote_root(fork.for_height(promotion_height)) diff_items = state.promote_root(fork.for_height(promotion_height))
for callback in self.on_promotion: for callback in self.on_promotion:

View File

@@ -32,6 +32,19 @@ def hexbytes(value: str):
""" converts an optionally 0x-prefixed hex string into bytes """ """ converts an optionally 0x-prefixed hex string into bytes """
return bytes.fromhex(value[2:] if value.startswith('0x') else value) return bytes.fromhex(value[2:] if value.startswith('0x') else value)
def keystr(value):
if type(value) is str:
return value
if type(value) is HexBytes:
return value.hex()
if type(value) is bytes:
return '0x' + value.hex()
return str(value)
def strkey(s):
if s.startswith('0x'):
return hexbytes(s)
return s
def topic(event_abi): def topic(event_abi):
event_name = f'{event_abi["name"]}(' + ','.join(i['type'] for i in event_abi['inputs']) + ')' event_name = f'{event_abi["name"]}(' + ','.join(i['type'] for i in event_abi['inputs']) + ')'

View File

@@ -1,19 +1,24 @@
from decimal import Decimal
from hexbytes import HexBytes from hexbytes import HexBytes
from orjson import orjson from orjson import orjson
from web3.datastructures import AttributeDict from web3.datastructures import AttributeDict
def _serialize(v): def _serialize(v):
# todo wrap json.dumps()
if type(v) is HexBytes: if type(v) is HexBytes:
return v.hex() return v.hex()
if type(v) is AttributeDict: elif type(v) is bytes:
return '0x' + v.hex()
elif type(v) is AttributeDict:
return v.__dict__ return v.__dict__
raise ValueError(v) elif type(v) is Decimal:
return f'{v:f}'
raise TypeError
def loads(s): def loads(s):
return orjson.loads(s) return orjson.loads(s)
def dumps(obj): def dumps(obj):
return orjson.dumps(obj, default=_serialize) return orjson.dumps(obj, default=_serialize, option=orjson.OPT_PASSTHROUGH_SUBCLASS).decode('utf8')