order state; lazy db read; untested

This commit is contained in:
Tim Olson
2023-10-11 00:27:37 -04:00
parent 5078db53c1
commit 393d4d4019
11 changed files with 199 additions and 88 deletions

View File

@@ -11,7 +11,7 @@ class Fork:
"""
A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The
getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest
block and [-2] is its parent, etc.
block and [-2] is its parent, etc. Any blocks older than the tail of the fork are considered finalized and may be referenced by height.
"""
def __init__(self, ancestry: Iterable[bytes], *, height: int):
@@ -20,13 +20,16 @@ class Fork:
self.disjoint = False
def __contains__(self, item):
index = self.height - item.height
if index < 0:
"""
item can be a Block or another Fork. returns True iff the given item appears on this fork. if item is ahead of this fork
or a cousin chain, returns False
"""
index = self.height - item.height # index is reverse chronological in order to index our ancentry list
if index < 0: # item is ahead of us in height
return False
try:
if index >= len(self.ancestry): # item is older than this fork
return True # consider old blocks settled and on this fork
return self.ancestry[index] == item.hash
except IndexError:
return False
@property
def hash(self):
@@ -49,7 +52,7 @@ class Fork:
class DisjointFork:
"""
duck type of Fork for blocks that connect directly to root with a parent gap in-between
duck type of Fork for blocks that connect directly to root with a parent gap in-between. these forks are associated with backfill.
"""
def __init__(self, block: Block, root: Block):
self.height = block.height
@@ -58,6 +61,10 @@ class DisjointFork:
self.disjoint = True
def __contains__(self, item):
if item.height > self.height:
return False # item is in the future
if item.height < self.parent.height:
return True # item is ancient
return item.hash in (self.hash, self.parent)
def __str__(self):

View File

@@ -1,10 +1,12 @@
import logging
from collections import defaultdict
from enum import Enum
from typing import TypeVar, Generic, Iterable, Union
from typing import TypeVar, Generic, Iterable, Union, Any
from dexorder import NARG, DELETE
from dexorder.base.fork import current_fork
from .state import current_blockstate
from dexorder.util import key2str as util_key2str, str2key as util_str2key
log = logging.getLogger(__name__)
T = TypeVar('T')
@@ -18,14 +20,26 @@ class DataType(Enum):
class BlockData:
registry: dict[str,'BlockData'] = {} # series name and instance
registry: dict[Any,'BlockData'] = {} # series name and instance
adapters: dict[list['BlockDataAdapter']] = defaultdict(list)
def __init__(self, series:str, data_type: DataType, **opts):
def __init__(self, data_type: DataType, series: Any, *,
series2str=None, series2key=None, # defaults to key2str and str2key
key2str=util_key2str, str2key=util_str2key,
value2basic=lambda x:x, basic2value=lambda x:x, # serialize/deserialize value to something JSON-able
**opts):
assert series not in BlockData.registry
BlockData.registry[series] = self
self.series = series
self.type = data_type
self.opts = opts
self.key2str = key2str
self.str2key = str2key
self.series2str = series2str or self.key2str
self.series2key = series2key or self.str2key
self.value2basic = value2basic
self.basic2value = basic2value
self.lazy_getitem = None
def setitem(self, item, value, overwrite=True):
state = current_blockstate.get()
@@ -35,7 +49,14 @@ class BlockData:
def getitem(self, item, default=NARG):
state = current_blockstate.get()
fork = current_fork.get()
return state.get(fork, self.series, item, default)
result = state.get(fork, self.series, item, default)
if result is NARG and self.lazy_getitem:
lazy = self.lazy_getitem(self, item)
if lazy is not None:
lookup_fork, lookup_value = lazy
if lookup_fork in fork:
result = lookup_value
return result
def delitem(self, item, overwrite=True):
self.setitem(item, DELETE, overwrite)
@@ -57,10 +78,15 @@ class BlockData:
def by_opt(key):
yield from (s for s in BlockData.registry.values() if key in s.opts)
def delete(self):
state = current_blockstate.get()
fork = current_fork.get()
state.delete_series(fork, self.series)
class BlockSet(Generic[T], Iterable[T], BlockData):
def __init__(self, series: str, **tags):
super().__init__(series, DataType.SET, **tags)
def __init__(self, series: Any, **tags):
super().__init__(DataType.SET, series, **tags)
self.series = series
def add(self, item):
@@ -79,8 +105,8 @@ class BlockSet(Generic[T], Iterable[T], BlockData):
class BlockDict(Generic[T], BlockData):
def __init__(self, series: str, **tags):
super().__init__(series, DataType.DICT, **tags)
def __init__(self, series: Any, **tags):
super().__init__(DataType.DICT, series, **tags)
def __setitem__(self, item, value):
self.setitem(item, value)
@@ -102,8 +128,8 @@ class BlockDict(Generic[T], BlockData):
class SeriesCollection:
def __init__(self, series_or_datavars: Iterable[Union[str,BlockData]]):
self.datas: dict[str,BlockData] = {
(d := BlockData.registry[x] if type(x) is str else x).series: d
def __init__(self, series_or_datavars: Iterable[Union[Any,BlockData]]):
self.datas: dict[Any,BlockData] = {
(d := BlockData.registry.get(x,x)).series: d
for x in series_or_datavars
}

View File

@@ -1,30 +1,53 @@
import logging
from typing import Iterable, Optional, Union
from typing import Iterable, Optional, Union, Any
from . import DiffItem, BlockSet, BlockDict, DELETE, BlockState, current_blockstate, DataType
from .blockdata import BlockData, SeriesCollection
from .diff import DiffEntryItem
from .. import db
from ..base.chain import current_chain
from ..base.fork import current_fork
from ..base.fork import current_fork, Fork
from ..database.model import SeriesSet, SeriesDict, Block
from ..database.model.block import current_block, latest_block, completed_block
from ..util import keystr, strkey, hexbytes
from ..util import hexbytes
log = logging.getLogger(__name__)
class DbState(SeriesCollection):
def __init__(self, series_or_datavars: Iterable[Union[Any, BlockData]]):
super().__init__(series_or_datavars)
for d in self.datas.values():
if d.opts.get('db') == 'lazy':
d.lazy_getitem = DbState.lazy_getitem
@staticmethod
def lazy_getitem(var: BlockData, item):
chain_id = current_chain.get().chain_id
t = var.type
Entity = SeriesSet if t == DataType.SET else SeriesDict if t == DataType.DICT else None
series = var.series2str(var.series)
key = var.key2str(item)
try:
height, blockhash = db.kv[f'root_block.{chain_id}']
except:
return None
fork = Fork([hexbytes(blockhash)], height=height)
value = db.session.get(Entity, (chain_id, series, key))
return fork, value
def save(self, root_block: Block, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ):
chain_id = current_chain.get().chain_id
for diff in diffs:
try:
t = self.datas[diff.series].type
d = self.datas[diff.series]
t = d.type
except KeyError:
continue
diffseries = keystr(diff.series)
diffkey = keystr(diff.key)
diffseries = d.series2str(diff.series)
diffkey = d.key2str(diff.key)
key = dict(chain=chain_id, series=diffseries, key=diffkey)
if diff.value is DELETE:
Entity = SeriesSet if t == DataType.SET else SeriesDict if t == DataType.DICT else None
@@ -38,7 +61,7 @@ class DbState(SeriesCollection):
elif t == DataType.DICT:
found = db.session.get(SeriesDict, key)
if found is None:
db.session.add(SeriesDict(**key, value=diff.value))
db.session.add(SeriesDict(**key, value=d.value2basic(diff.value)))
else:
found.value = diff.value
else:
@@ -61,16 +84,17 @@ class DbState(SeriesCollection):
current_blockstate.set(state)
current_fork.set(None) # root fork
for series, data in self.datas.items():
if data.opts.get('db') != 'lazy':
t = data.type
if t == DataType.SET:
# noinspection PyTypeChecker
var: BlockSet = BlockData.registry[series]
for row in db.session.query(SeriesSet).where(SeriesSet.series==keystr(series)):
var.add(strkey(row.key))
for row in db.session.query(SeriesSet).where(SeriesSet.series == data.series2str(series)):
var.add(data.str2key(row.key))
elif t == DataType.DICT:
# noinspection PyTypeChecker
var: BlockDict = BlockData.registry[series]
for row in db.session.query(SeriesDict).where(SeriesDict.series==keystr(series)):
var[strkey(row.key)] = row.value
for row in db.session.query(SeriesDict).where(SeriesDict.series == data.series2str(series)):
var[data.str2key(row.key)] = data.basic2value(row.value)
completed_block.set(root_block)
return state

View File

@@ -161,6 +161,7 @@ class BlockState:
# walk the by_height list to delete any aged-out block data
# in order to prune diffs_by_series, updated_keys remembers all the keys that were touched by any aged-out block
series_deletions = []
updated_keys = set()
while self.by_height and self.by_height[0].height <= block.height:
dead = self.by_height.pop(0)
@@ -171,7 +172,11 @@ class BlockState:
pass
block_diffs = self.diffs_by_hash.get(dead.hash)
if block_diffs is not None:
updated_keys.update((d.series, d.key) for d in block_diffs)
for d in block_diffs:
if d.key == BlockState._DELETE_SERIES_KEY and dead.hash in new_root_fork:
series_deletions.append(d.series)
else:
updated_keys.add((d.series, d.key))
del self.diffs_by_hash[dead.hash]
del self.ancestors[dead.hash]
@@ -195,11 +200,20 @@ class BlockState:
# if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list
if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height:
del self.diffs_by_series[s][k]
for s in series_deletions:
del self.diffs_by_series[s]
self.root_block = block
log.debug(f'promoted root {self.root_block}')
return diffs
_DELETE_SERIES_KEY = '!^DEXORDER_DELETE_SERIES^!'
def delete_series(self, fork: Optional[Fork], series: str):
"""
deletes the series entirely. the deletion is part of the blockstate, so the series could remain active on some branches until the
series deletion matures into finality.
"""
self.set(fork, series, BlockState._DELETE_SERIES_KEY, None) # setting any value on this special key will trigger a delete when this block finalizes
def collect_diffs(self, block: Block, series_key=NARG) -> list[DiffEntryItem]:
"""
returns a list of the latest DiffItem for each key change along the ancestor path from block to root

View File

@@ -4,7 +4,7 @@ from dexorder.blockstate import BlockSet, BlockDict
# if pub is True, then event is the current series name, room is the key, and value is passed through
# values of DELETE are serialized as nulls
vault_addresses = BlockSet('v', db=True, redis=True)
vault_owners = BlockDict('v', db=True, redis=True)
vault_tokens = BlockDict('vt', db=True, redis=True, pub=True)
pool_prices = BlockDict('p', db=True, redis=True, pub=True)
underfunded_vaults = BlockSet('uv', db=True)

View File

@@ -1,5 +1,4 @@
from contextvars import ContextVar
from typing import Optional
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

View File

@@ -6,7 +6,7 @@ from dexorder import dec, current_pub, current_w3
from dexorder.base.chain import current_chain
from dexorder.blockchain.util import vault_address, get_contract_event, get_factory, get_contract_data
from dexorder.contract import VaultContract
from dexorder.data import pool_prices, vault_addresses, vault_tokens, underfunded_vaults
from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults
from dexorder.database.model.block import current_block
from dexorder.orderlib.orderlib import SwapOrderStatus
@@ -36,7 +36,7 @@ async def handle_order_placed(event: EventData):
start_index = int(event['args']['startOrderIndex'])
num_orders = int(event['args']['numOrders'])
log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
if addr not in vault_addresses:
if addr not in vault_owners:
log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs
# return todo discard rogues
vault = VaultContract(addr)
@@ -67,7 +67,7 @@ def handle_order_error(event: EventData):
def handle_transfer(transfer: EventData):
to_address = transfer['args']['to']
log.debug(f'transfer {to_address}')
if to_address in vault_addresses:
if to_address in vault_owners:
token_address = transfer['address']
vault_tokens.add(token_address)
if to_address in underfunded_vaults:
@@ -96,12 +96,12 @@ def handle_vault_created(created: EventData):
return
vault = vault_address(owner,num)
log.debug(f'VaultCreated {owner} #{num} => {vault}')
vault_addresses.add(vault)
vault_owners[vault] = owner
vaults = []
for num in range(256):
addr = vault_address(owner, num)
# log.debug(f'v{num}? {addr}')
if addr in vault_addresses:
if addr in vault_owners:
vaults.append(addr)
else:
break

View File

@@ -13,7 +13,6 @@ from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.state import compress_diffs
from dexorder.database.model import Block
from dexorder.memcache import current_redis, memcache
from dexorder.util import keystr
from dexorder.util.json import json_encoder
log = logging.getLogger(__name__)
@@ -57,9 +56,9 @@ class RedisState (SeriesCollection):
except KeyError:
continue
t = d.type
series = f'{chain_id}|{keystr(diff.series)}'
key = keystr(diff.key)
value = keystr(diff.value)
series = f'{chain_id}|{d.series2str(diff.series)}'
key = d.key2str(diff.key)
value = diff.value
# pub/sub socketio/redis
pub_kv = d.opts.get('pub')
if pub_kv is True:

View File

@@ -55,15 +55,24 @@ class SwapOrder:
self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches])
@dataclass
class SwapOrderStatus:
order: SwapOrder
state: SwapOrderState
class SwapStatus:
state: SwapOrderState # todo refactor into canceled flag
start: int
ocoGroup: Optional[int]
filledIn: int
filledOut: int
trancheFilledIn: list[int]
trancheFilledOut: list[int]
filledIn: Optional[int] # if None then look in the order_filled blockstate
filledOut: Optional[int] # if None then look in the order_filled blockstate
trancheFilledIn: Optional[list[int]] # if None then look in the tranche_filled blockstate
trancheFilledOut: Optional[list[int]] # if None then look in the tranche_filled blockstate
@dataclass
class SwapOrderStatus (SwapStatus):
order: SwapOrder
def __init__(self, order, *swapstatus_args):
""" init with order object first follewed by the swap status args"""
super().__init__(*swapstatus_args)
self.order = order
@staticmethod
def load(obj):

View File

@@ -1,30 +1,65 @@
import logging
from dataclasses import dataclass
from dexorder.blockstate import BlockSet, BlockDict
from dexorder.order.orderlib import SwapOrderStatus
from dexorder.util import keystr
from dexorder.blockstate import BlockDict
from dexorder.order.orderlib import SwapStatus
log = logging.getLogger(__name__)
def order_key( vault:str, order_index:int ):
return keystr(vault, str(order_index))
@dataclass
class OrderKey:
vault: str
order_index: int
def tranche_key( vault:str, order_index:int, tranche_index:int ):
return keystr(vault,str(order_index),str(tranche_index))
@staticmethod
def str2key(keystring: str):
vault, order_index = keystring.split('|')
return OrderKey(vault, int(order_index))
active_orders = BlockSet('ao') # unfilled, not-canceled orders whose triggers have been loaded/set
order_remaining = BlockDict('or') # by order key
tranche_remaining = BlockDict('tr') # by tranche key
def __str__(self):
return f'{self.vault}|{self.order_index}'
# todo forcibly dispose of entire series
@dataclass
class TrancheKey (OrderKey):
tranche_index: int
class OrderState:
def __init__(self, vault:str, order_index:int, status: SwapOrderStatus):
self.vault = vault
self.order_index = order_index
self.key = f'{vault}|{order_index}'
self.tranche_keys = [f'{self.key}|{i}' for i in range(len(status.trancheFilledIn))]
@staticmethod
def str2key(keystring: str):
vault, order_index, tranche_index = keystring.split('|')
return TrancheKey(vault, int(order_index), int(tranche_index))
###### TODO TODO TODO vault state needs to be a dict pointing to owner addr
def __str__(self):
return f'{self.vault}|{self.order_index}|{self.tranche_index}'
@dataclass
class Remaining:
isInput: bool # True iff the remaining amount is in terms of the input token
remaining: int
@staticmethod
def basic2remaining(basic):
return Remaining(*basic)
def remaining2basic(self):
return self.isInput, self.remaining
# ORDER STATE
# various blockstate fields hold different aspects of an order's state.
# all order and status data: writes to db but lazy-loads
orders = BlockDict[OrderKey,SwapStatus]('o', str2key=OrderKey.str2key, db='lazy') # todo lazy what's that about?
# the set of unfilled, not-canceled orders
active_orders = BlockDict[OrderKey]('ao', str2key=OrderKey.str2key, db=True, redis=True)
# total remaining amount per order, for all unfilled, not-canceled orders
order_remaining = BlockDict[OrderKey,Remaining](
'or', str2key=OrderKey.str2key, value2basic=Remaining.remaining2basic, basic2value=Remaining.basic2remaining, db=True, redis=True)
# total remaining amount per tranche
tranche_remaining = BlockDict[TrancheKey,Remaining](
'tr', str2key=TrancheKey.str2key, value2basic=Remaining.remaining2basic, basic2value=Remaining.basic2remaining, db=True, redis=True)
# todo oco groups

View File

@@ -34,19 +34,17 @@ def hexbytes(value: str):
""" converts an optionally 0x-prefixed hex string into bytes """
return bytes.fromhex(value[2:] if value.startswith('0x') else value)
def keystr(*keys):
return '|'.join(
value if type(value) is str else
value.hex() if type(value) is HexBytes else
'0x' + value.hex() if type(value) is bytes else
str(value)
for value in keys
)
def strkey(s):
if s.startswith('0x'):
return hexbytes(s)
return s
def _keystr1(value):
t = type(value)
return value if t is str else value.hex() if t is HexBytes else '0x' + value.hex() if t is bytes else str(value)
def key2str(key):
return _keystr1(key) if type(key) not in (list, tuple) else '|'.join(_keystr1(v) for v in key)
def str2key(s,types=None):
return tuple(s.split('|')) if types is None else tuple(t(v) for t,v in zip(types,s.split('|')))
def topic(event_abi):
event_name = f'{event_abi["name"]}(' + ','.join(i['type'] for i in event_abi['inputs']) + ')'