diff --git a/src/dexorder/base/fork.py b/src/dexorder/base/fork.py index fb430db..f5dc767 100644 --- a/src/dexorder/base/fork.py +++ b/src/dexorder/base/fork.py @@ -11,7 +11,7 @@ class Fork: """ A Fork is an ancestor path, stored as block hashes in reverse-chronological order from the "current" block at the start to ancestors at the end. The getitem [] operator indexes by block height for positive values, while negative value are relative to the latest block, so [-1] is the latest - block and [-2] is its parent, etc. + block and [-2] is its parent, etc. Any blocks older than the tail of the fork are considered finalized and may be referenced by height. """ def __init__(self, ancestry: Iterable[bytes], *, height: int): @@ -20,13 +20,16 @@ class Fork: self.disjoint = False def __contains__(self, item): - index = self.height - item.height - if index < 0: - return False - try: - return self.ancestry[index] == item.hash - except IndexError: + """ + item can be a Block or another Fork. returns True iff the given item appears on this fork. if item is ahead of this fork + or a cousin chain, returns False + """ + index = self.height - item.height # index is reverse chronological in order to index our ancentry list + if index < 0: # item is ahead of us in height return False + if index >= len(self.ancestry): # item is older than this fork + return True # consider old blocks settled and on this fork + return self.ancestry[index] == item.hash @property def hash(self): @@ -49,7 +52,7 @@ class Fork: class DisjointFork: """ - duck type of Fork for blocks that connect directly to root with a parent gap in-between + duck type of Fork for blocks that connect directly to root with a parent gap in-between. these forks are associated with backfill. """ def __init__(self, block: Block, root: Block): self.height = block.height @@ -58,6 +61,10 @@ class DisjointFork: self.disjoint = True def __contains__(self, item): + if item.height > self.height: + return False # item is in the future + if item.height < self.parent.height: + return True # item is ancient return item.hash in (self.hash, self.parent) def __str__(self): diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index 8ca6757..8a0e4be 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -1,10 +1,12 @@ import logging +from collections import defaultdict from enum import Enum -from typing import TypeVar, Generic, Iterable, Union +from typing import TypeVar, Generic, Iterable, Union, Any from dexorder import NARG, DELETE from dexorder.base.fork import current_fork from .state import current_blockstate +from dexorder.util import key2str as util_key2str, str2key as util_str2key log = logging.getLogger(__name__) T = TypeVar('T') @@ -18,14 +20,26 @@ class DataType(Enum): class BlockData: - registry: dict[str,'BlockData'] = {} # series name and instance + registry: dict[Any,'BlockData'] = {} # series name and instance + adapters: dict[list['BlockDataAdapter']] = defaultdict(list) - def __init__(self, series:str, data_type: DataType, **opts): + def __init__(self, data_type: DataType, series: Any, *, + series2str=None, series2key=None, # defaults to key2str and str2key + key2str=util_key2str, str2key=util_str2key, + value2basic=lambda x:x, basic2value=lambda x:x, # serialize/deserialize value to something JSON-able + **opts): assert series not in BlockData.registry BlockData.registry[series] = self self.series = series self.type = data_type self.opts = opts + self.key2str = key2str + self.str2key = str2key + self.series2str = series2str or self.key2str + self.series2key = series2key or self.str2key + self.value2basic = value2basic + self.basic2value = basic2value + self.lazy_getitem = None def setitem(self, item, value, overwrite=True): state = current_blockstate.get() @@ -35,7 +49,14 @@ class BlockData: def getitem(self, item, default=NARG): state = current_blockstate.get() fork = current_fork.get() - return state.get(fork, self.series, item, default) + result = state.get(fork, self.series, item, default) + if result is NARG and self.lazy_getitem: + lazy = self.lazy_getitem(self, item) + if lazy is not None: + lookup_fork, lookup_value = lazy + if lookup_fork in fork: + result = lookup_value + return result def delitem(self, item, overwrite=True): self.setitem(item, DELETE, overwrite) @@ -57,10 +78,15 @@ class BlockData: def by_opt(key): yield from (s for s in BlockData.registry.values() if key in s.opts) + def delete(self): + state = current_blockstate.get() + fork = current_fork.get() + state.delete_series(fork, self.series) + class BlockSet(Generic[T], Iterable[T], BlockData): - def __init__(self, series: str, **tags): - super().__init__(series, DataType.SET, **tags) + def __init__(self, series: Any, **tags): + super().__init__(DataType.SET, series, **tags) self.series = series def add(self, item): @@ -79,8 +105,8 @@ class BlockSet(Generic[T], Iterable[T], BlockData): class BlockDict(Generic[T], BlockData): - def __init__(self, series: str, **tags): - super().__init__(series, DataType.DICT, **tags) + def __init__(self, series: Any, **tags): + super().__init__(DataType.DICT, series, **tags) def __setitem__(self, item, value): self.setitem(item, value) @@ -102,8 +128,8 @@ class BlockDict(Generic[T], BlockData): class SeriesCollection: - def __init__(self, series_or_datavars: Iterable[Union[str,BlockData]]): - self.datas: dict[str,BlockData] = { - (d := BlockData.registry[x] if type(x) is str else x).series: d + def __init__(self, series_or_datavars: Iterable[Union[Any,BlockData]]): + self.datas: dict[Any,BlockData] = { + (d := BlockData.registry.get(x,x)).series: d for x in series_or_datavars } diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index 1ed7b72..49fab7e 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -1,30 +1,53 @@ import logging -from typing import Iterable, Optional, Union +from typing import Iterable, Optional, Union, Any from . import DiffItem, BlockSet, BlockDict, DELETE, BlockState, current_blockstate, DataType from .blockdata import BlockData, SeriesCollection from .diff import DiffEntryItem from .. import db from ..base.chain import current_chain -from ..base.fork import current_fork +from ..base.fork import current_fork, Fork from ..database.model import SeriesSet, SeriesDict, Block from ..database.model.block import current_block, latest_block, completed_block -from ..util import keystr, strkey, hexbytes +from ..util import hexbytes log = logging.getLogger(__name__) class DbState(SeriesCollection): + + def __init__(self, series_or_datavars: Iterable[Union[Any, BlockData]]): + super().__init__(series_or_datavars) + for d in self.datas.values(): + if d.opts.get('db') == 'lazy': + d.lazy_getitem = DbState.lazy_getitem + + @staticmethod + def lazy_getitem(var: BlockData, item): + chain_id = current_chain.get().chain_id + t = var.type + Entity = SeriesSet if t == DataType.SET else SeriesDict if t == DataType.DICT else None + series = var.series2str(var.series) + key = var.key2str(item) + try: + height, blockhash = db.kv[f'root_block.{chain_id}'] + except: + return None + fork = Fork([hexbytes(blockhash)], height=height) + value = db.session.get(Entity, (chain_id, series, key)) + return fork, value + def save(self, root_block: Block, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ): chain_id = current_chain.get().chain_id for diff in diffs: try: - t = self.datas[diff.series].type + d = self.datas[diff.series] + t = d.type except KeyError: continue - diffseries = keystr(diff.series) - diffkey = keystr(diff.key) + diffseries = d.series2str(diff.series) + diffkey = d.key2str(diff.key) key = dict(chain=chain_id, series=diffseries, key=diffkey) if diff.value is DELETE: Entity = SeriesSet if t == DataType.SET else SeriesDict if t == DataType.DICT else None @@ -38,7 +61,7 @@ class DbState(SeriesCollection): elif t == DataType.DICT: found = db.session.get(SeriesDict, key) if found is None: - db.session.add(SeriesDict(**key, value=diff.value)) + db.session.add(SeriesDict(**key, value=d.value2basic(diff.value))) else: found.value = diff.value else: @@ -61,16 +84,17 @@ class DbState(SeriesCollection): current_blockstate.set(state) current_fork.set(None) # root fork for series, data in self.datas.items(): - t = data.type - if t == DataType.SET: - # noinspection PyTypeChecker - var: BlockSet = BlockData.registry[series] - for row in db.session.query(SeriesSet).where(SeriesSet.series==keystr(series)): - var.add(strkey(row.key)) - elif t == DataType.DICT: - # noinspection PyTypeChecker - var: BlockDict = BlockData.registry[series] - for row in db.session.query(SeriesDict).where(SeriesDict.series==keystr(series)): - var[strkey(row.key)] = row.value + if data.opts.get('db') != 'lazy': + t = data.type + if t == DataType.SET: + # noinspection PyTypeChecker + var: BlockSet = BlockData.registry[series] + for row in db.session.query(SeriesSet).where(SeriesSet.series == data.series2str(series)): + var.add(data.str2key(row.key)) + elif t == DataType.DICT: + # noinspection PyTypeChecker + var: BlockDict = BlockData.registry[series] + for row in db.session.query(SeriesDict).where(SeriesDict.series == data.series2str(series)): + var[data.str2key(row.key)] = data.basic2value(row.value) completed_block.set(root_block) return state diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index d044b0b..5cc7ac0 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -161,6 +161,7 @@ class BlockState: # walk the by_height list to delete any aged-out block data # in order to prune diffs_by_series, updated_keys remembers all the keys that were touched by any aged-out block + series_deletions = [] updated_keys = set() while self.by_height and self.by_height[0].height <= block.height: dead = self.by_height.pop(0) @@ -171,7 +172,11 @@ class BlockState: pass block_diffs = self.diffs_by_hash.get(dead.hash) if block_diffs is not None: - updated_keys.update((d.series, d.key) for d in block_diffs) + for d in block_diffs: + if d.key == BlockState._DELETE_SERIES_KEY and dead.hash in new_root_fork: + series_deletions.append(d.series) + else: + updated_keys.add((d.series, d.key)) del self.diffs_by_hash[dead.hash] del self.ancestors[dead.hash] @@ -195,11 +200,20 @@ class BlockState: # if only one diff remains, and it's old, and it's a delete, then we can actually delete the diff list if not difflist or len(difflist) == 1 and difflist[0].value == DELETE and difflist[0].height <= new_root_fork.height: del self.diffs_by_series[s][k] - + for s in series_deletions: + del self.diffs_by_series[s] self.root_block = block log.debug(f'promoted root {self.root_block}') return diffs + _DELETE_SERIES_KEY = '!^DEXORDER_DELETE_SERIES^!' + def delete_series(self, fork: Optional[Fork], series: str): + """ + deletes the series entirely. the deletion is part of the blockstate, so the series could remain active on some branches until the + series deletion matures into finality. + """ + self.set(fork, series, BlockState._DELETE_SERIES_KEY, None) # setting any value on this special key will trigger a delete when this block finalizes + def collect_diffs(self, block: Block, series_key=NARG) -> list[DiffEntryItem]: """ returns a list of the latest DiffItem for each key change along the ancestor path from block to root diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py index c65edff..902307c 100644 --- a/src/dexorder/data/__init__.py +++ b/src/dexorder/data/__init__.py @@ -4,7 +4,7 @@ from dexorder.blockstate import BlockSet, BlockDict # if pub is True, then event is the current series name, room is the key, and value is passed through # values of DELETE are serialized as nulls -vault_addresses = BlockSet('v', db=True, redis=True) +vault_owners = BlockDict('v', db=True, redis=True) vault_tokens = BlockDict('vt', db=True, redis=True, pub=True) pool_prices = BlockDict('p', db=True, redis=True, pub=True) underfunded_vaults = BlockSet('uv', db=True) diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py index 08ab7e7..e8793b0 100644 --- a/src/dexorder/database/model/block.py +++ b/src/dexorder/database/model/block.py @@ -1,5 +1,4 @@ from contextvars import ContextVar -from typing import Optional from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 633ae3c..5128847 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -6,7 +6,7 @@ from dexorder import dec, current_pub, current_w3 from dexorder.base.chain import current_chain from dexorder.blockchain.util import vault_address, get_contract_event, get_factory, get_contract_data from dexorder.contract import VaultContract -from dexorder.data import pool_prices, vault_addresses, vault_tokens, underfunded_vaults +from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults from dexorder.database.model.block import current_block from dexorder.orderlib.orderlib import SwapOrderStatus @@ -36,7 +36,7 @@ async def handle_order_placed(event: EventData): start_index = int(event['args']['startOrderIndex']) num_orders = int(event['args']['numOrders']) log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') - if addr not in vault_addresses: + if addr not in vault_owners: log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs # return todo discard rogues vault = VaultContract(addr) @@ -67,7 +67,7 @@ def handle_order_error(event: EventData): def handle_transfer(transfer: EventData): to_address = transfer['args']['to'] log.debug(f'transfer {to_address}') - if to_address in vault_addresses: + if to_address in vault_owners: token_address = transfer['address'] vault_tokens.add(token_address) if to_address in underfunded_vaults: @@ -96,12 +96,12 @@ def handle_vault_created(created: EventData): return vault = vault_address(owner,num) log.debug(f'VaultCreated {owner} #{num} => {vault}') - vault_addresses.add(vault) + vault_owners[vault] = owner vaults = [] for num in range(256): addr = vault_address(owner, num) # log.debug(f'v{num}? {addr}') - if addr in vault_addresses: + if addr in vault_owners: vaults.append(addr) else: break diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index b8b8329..debaa40 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -13,7 +13,6 @@ from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.state import compress_diffs from dexorder.database.model import Block from dexorder.memcache import current_redis, memcache -from dexorder.util import keystr from dexorder.util.json import json_encoder log = logging.getLogger(__name__) @@ -57,9 +56,9 @@ class RedisState (SeriesCollection): except KeyError: continue t = d.type - series = f'{chain_id}|{keystr(diff.series)}' - key = keystr(diff.key) - value = keystr(diff.value) + series = f'{chain_id}|{d.series2str(diff.series)}' + key = d.key2str(diff.key) + value = diff.value # pub/sub socketio/redis pub_kv = d.opts.get('pub') if pub_kv is True: diff --git a/src/dexorder/order/orderlib.py b/src/dexorder/order/orderlib.py index 39b6b95..64334d3 100644 --- a/src/dexorder/order/orderlib.py +++ b/src/dexorder/order/orderlib.py @@ -55,15 +55,24 @@ class SwapOrder: self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches]) @dataclass -class SwapOrderStatus: +class SwapStatus: + state: SwapOrderState # todo refactor into canceled flag + start: int + ocoGroup: Optional[int] + filledIn: Optional[int] # if None then look in the order_filled blockstate + filledOut: Optional[int] # if None then look in the order_filled blockstate + trancheFilledIn: Optional[list[int]] # if None then look in the tranche_filled blockstate + trancheFilledOut: Optional[list[int]] # if None then look in the tranche_filled blockstate + + +@dataclass +class SwapOrderStatus (SwapStatus): order: SwapOrder - state: SwapOrderState - start: int - ocoGroup: Optional[int] - filledIn: int - filledOut: int - trancheFilledIn: list[int] - trancheFilledOut: list[int] + + def __init__(self, order, *swapstatus_args): + """ init with order object first follewed by the swap status args""" + super().__init__(*swapstatus_args) + self.order = order @staticmethod def load(obj): diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index db01a5a..a8bc1d1 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -1,30 +1,65 @@ import logging +from dataclasses import dataclass -from dexorder.blockstate import BlockSet, BlockDict -from dexorder.order.orderlib import SwapOrderStatus -from dexorder.util import keystr +from dexorder.blockstate import BlockDict +from dexorder.order.orderlib import SwapStatus log = logging.getLogger(__name__) -def order_key( vault:str, order_index:int ): - return keystr(vault, str(order_index)) +@dataclass +class OrderKey: + vault: str + order_index: int -def tranche_key( vault:str, order_index:int, tranche_index:int ): - return keystr(vault,str(order_index),str(tranche_index)) + @staticmethod + def str2key(keystring: str): + vault, order_index = keystring.split('|') + return OrderKey(vault, int(order_index)) -active_orders = BlockSet('ao') # unfilled, not-canceled orders whose triggers have been loaded/set -order_remaining = BlockDict('or') # by order key -tranche_remaining = BlockDict('tr') # by tranche key + def __str__(self): + return f'{self.vault}|{self.order_index}' -# todo forcibly dispose of entire series +@dataclass +class TrancheKey (OrderKey): + tranche_index: int -class OrderState: - def __init__(self, vault:str, order_index:int, status: SwapOrderStatus): - self.vault = vault - self.order_index = order_index - self.key = f'{vault}|{order_index}' - self.tranche_keys = [f'{self.key}|{i}' for i in range(len(status.trancheFilledIn))] + @staticmethod + def str2key(keystring: str): + vault, order_index, tranche_index = keystring.split('|') + return TrancheKey(vault, int(order_index), int(tranche_index)) -###### TODO TODO TODO vault state needs to be a dict pointing to owner addr + def __str__(self): + return f'{self.vault}|{self.order_index}|{self.tranche_index}' +@dataclass +class Remaining: + isInput: bool # True iff the remaining amount is in terms of the input token + remaining: int + + @staticmethod + def basic2remaining(basic): + return Remaining(*basic) + + def remaining2basic(self): + return self.isInput, self.remaining + + +# ORDER STATE +# various blockstate fields hold different aspects of an order's state. + +# all order and status data: writes to db but lazy-loads +orders = BlockDict[OrderKey,SwapStatus]('o', str2key=OrderKey.str2key, db='lazy') # todo lazy what's that about? + +# the set of unfilled, not-canceled orders +active_orders = BlockDict[OrderKey]('ao', str2key=OrderKey.str2key, db=True, redis=True) + +# total remaining amount per order, for all unfilled, not-canceled orders +order_remaining = BlockDict[OrderKey,Remaining]( + 'or', str2key=OrderKey.str2key, value2basic=Remaining.remaining2basic, basic2value=Remaining.basic2remaining, db=True, redis=True) + +# total remaining amount per tranche +tranche_remaining = BlockDict[TrancheKey,Remaining]( + 'tr', str2key=TrancheKey.str2key, value2basic=Remaining.remaining2basic, basic2value=Remaining.basic2remaining, db=True, redis=True) + +# todo oco groups diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index d7250ba..b36ca2a 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -34,19 +34,17 @@ def hexbytes(value: str): """ converts an optionally 0x-prefixed hex string into bytes """ return bytes.fromhex(value[2:] if value.startswith('0x') else value) -def keystr(*keys): - return '|'.join( - value if type(value) is str else - value.hex() if type(value) is HexBytes else - '0x' + value.hex() if type(value) is bytes else - str(value) - for value in keys - ) -def strkey(s): - if s.startswith('0x'): - return hexbytes(s) - return s +def _keystr1(value): + t = type(value) + return value if t is str else value.hex() if t is HexBytes else '0x' + value.hex() if t is bytes else str(value) + + +def key2str(key): + return _keystr1(key) if type(key) not in (list, tuple) else '|'.join(_keystr1(v) for v in key) + +def str2key(s,types=None): + return tuple(s.split('|')) if types is None else tuple(t(v) for t,v in zip(types,s.split('|'))) def topic(event_abi): event_name = f'{event_abi["name"]}(' + ','.join(i['type'] for i in event_abi['inputs']) + ')'