From b8b1a39dfff803cdf21281de3fbe3794784cb2df Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Mon, 6 Nov 2023 23:48:47 -0400 Subject: [PATCH] order fill state refactored; order status publishing --- src/dexorder/blockstate/__init__.py | 3 +- src/dexorder/blockstate/blockdata.py | 29 ++-- src/dexorder/blockstate/db_state.py | 6 +- src/dexorder/blockstate/state.py | 4 +- src/dexorder/configuration/schema.py | 3 +- src/dexorder/data/__init__.py | 23 +++- src/dexorder/event_handler.py | 10 +- src/dexorder/memcache/__init__.py | 1 - src/dexorder/memcache/memcache_state.py | 8 +- src/dexorder/order/orderlib.py | 3 +- src/dexorder/order/orderstate.py | 168 +++++++++++++++--------- src/dexorder/order/triggers.py | 7 +- src/dexorder/runner.py | 3 + 13 files changed, 172 insertions(+), 96 deletions(-) diff --git a/src/dexorder/blockstate/__init__.py b/src/dexorder/blockstate/__init__.py index 26cc2bf..797a4dc 100644 --- a/src/dexorder/blockstate/__init__.py +++ b/src/dexorder/blockstate/__init__.py @@ -1,7 +1,6 @@ -from .diff import DiffEntry, DiffItem, DELETE, UNLOAD from .state import BlockState, current_blockstate from .blockdata import DataType, BlockDict, BlockSet - +from .diff import DiffItem # def _test(): # diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index f0fb324..20ed475 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -50,13 +50,18 @@ class BlockData: def getitem(self, item, default=NARG): state = current_blockstate.get() fork = current_fork.get() - result = state.get(fork, self.series, item, default) - if result is NARG and self.lazy_getitem: - lazy = self.lazy_getitem(self, item) - if lazy is not None: - lookup_fork, lookup_value = lazy - if lookup_fork in fork: - result = lookup_value + try: + result = state.get(fork, self.series, item) + except KeyError: + result = default + if self.lazy_getitem: + lazy = self.lazy_getitem(self, item) + if lazy is not None: + lookup_fork, lookup_value = lazy + if lookup_fork in fork: + result = lookup_value + if result is NARG: + raise KeyError return result def delitem(self, item, overwrite=True): @@ -161,8 +166,14 @@ class BlockDict(Generic[K,V], BlockData): self.setitem(item, result) return result - def add(self, item: K, value: Union[V], default: V = 0) -> Union[V,DELETE]: - return self.modify(item, lambda v: v+value, default=default) + def add(self, key: K, value: V, default: V = 0) -> V: + return self.modify(key, lambda v: v + value, default=default) + + def listappend(self, key: K, value) -> V: + return self.modify(key, lambda v: v + [value], default=[]) + + def listremove(self, key: K, value) -> V: + return self.modify(key, lambda v: [x for x in v if x != value], default=[]) class SeriesCollection: diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index 9cf8af0..23e8410 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -1,10 +1,10 @@ import logging from typing import Iterable, Optional, Union, Any -from . import DiffItem, BlockSet, BlockDict, DELETE, BlockState, current_blockstate, DataType +from . import BlockSet, BlockDict, BlockState, current_blockstate, DataType from .blockdata import BlockData, SeriesCollection -from .diff import DiffEntryItem -from .. import db, UNLOAD +from .diff import DiffItem, DiffEntryItem +from .. import db, UNLOAD, DELETE from ..base.chain import current_chain from ..base.fork import current_fork, Fork from ..database.model import SeriesSet, SeriesDict, Block diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index 65453df..c3eee64 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -2,7 +2,7 @@ import itertools import logging from collections import defaultdict from contextvars import ContextVar -from typing import Any, Optional, Union, Sequence, Reversible +from typing import Any, Optional, Union, Reversible from sortedcontainers import SortedList @@ -10,7 +10,7 @@ from dexorder import NARG, UNLOAD from dexorder.base.fork import Fork, DisjointFork from dexorder.database.model import Block from dexorder.util import hexstr -from .diff import DiffEntry, DiffItem, DELETE, DiffEntryItem +from .diff import DiffEntry, DELETE, DiffEntryItem log = logging.getLogger(__name__) diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 0afdc6c..5a9b565 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -1,6 +1,5 @@ -from collections import defaultdict from dataclasses import dataclass, field -from typing import Optional, Union +from typing import Optional # SCHEMA NOTES: diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py index b541339..dbad513 100644 --- a/src/dexorder/data/__init__.py +++ b/src/dexorder/data/__init__.py @@ -1,19 +1,30 @@ +import logging + from dexorder import dec from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V -from dexorder.util import json, defaultdictk +from dexorder.util import json # pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,args) # if pub is True, then event is the current series name, room is the key, and args is [value] # values of DELETE are serialized as nulls +def pub_vault_balances(k, v): + chain_id = current_chain.get().chain_id + try: + return f'{chain_id}|{vault_owners[k]}', 'vb', (chain_id, k, json.dumps({k2: str(v2) for k2, v2 in v.items()})) + except KeyError: + logging.warning(f'No vault owner record for {k}') + return None # no vault on record + + vault_owners: BlockDict[str, str] = BlockDict('v', db=True, redis=True) vault_balances: BlockDict[str, dict[str, int]] = BlockDict( f'vb', db=True, redis=True, value2str=lambda d: json.dumps({k: str(v) for k, v in d.items()}), # ints can be large so we need to stringify them in JSON str2value=lambda s: {k: int(v) for k, v in json.loads(s).items()}, - pub=lambda k, v: (f'{current_chain.get().chain_id}|{vault_owners[k]}', 'vb', (k,json.dumps({k2: str(v2) for k2, v2 in v.items()}))) + pub=pub_vault_balances ) @@ -23,7 +34,9 @@ class PoolPrices (BlockDict[str, dec]): new_pool_prices[item] = value +def pub_pool_price(k,v): + chain_id = current_chain.get().chain_id + return f'{chain_id}|{k}', 'p', (chain_id, k, str(v)) + new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the current block. cleared every block. -pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, - value2str=lambda d: f'{d:f}', str2value=dec, - pub=lambda k, v: (f'{current_chain.get().chain_id}|{k}', 'p', (k, str(v)))) +pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec) diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index aac0155..54bba0a 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -93,7 +93,13 @@ async def handle_order_placed(event: EventData): log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') if addr not in vault_owners: log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs - # return todo discard rogues + # return todo always discard rogues + # noinspection PyBroadException + try: + vault_owners[addr] = await VaultContract(addr).owner() + except Exception: + log.warning(f'vault owner for {addr} could not be found.') + return vault = VaultContract(addr) for index in range(start_index, start_index+num_orders): obj = await vault.swapOrderStatus(index) @@ -138,7 +144,7 @@ async def handle_order_completed(event: EventData): log.warning(f'DexorderCompleted IGNORED due to missing order {vault} {order_index}') status = await VaultContract(vault).swapOrderStatus(order_index) log.debug(f'SwapOrderStatus #todo {status}') - # todo + # todo anything? def handle_order_error(event: EventData): # event DexorderError (uint64 orderIndex, string reason); diff --git a/src/dexorder/memcache/__init__.py b/src/dexorder/memcache/__init__.py index c0c8fb7..ad40c5e 100644 --- a/src/dexorder/memcache/__init__.py +++ b/src/dexorder/memcache/__init__.py @@ -5,7 +5,6 @@ from contextvars import ContextVar import redis.asyncio as redis from redis.asyncio import Redis from redis.asyncio.client import Pipeline -from socket_io_emitter import Emitter from dexorder import config diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index a4cb003..7e8c52c 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -5,14 +5,16 @@ from typing import Iterable, Union, Reversible, Any from redis.asyncio.client import Pipeline from socket_io_emitter import Emitter +from dexorder import DELETE from dexorder.base.chain import current_chain from dexorder.base.fork import current_fork -from dexorder.blockstate import DiffItem, DataType, DELETE, BlockState +from dexorder.blockstate import DiffItem, DataType, BlockState from dexorder.blockstate.blockdata import SeriesCollection, BlockData from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.state import compress_diffs from dexorder.database.model import Block from dexorder.memcache import current_redis, memcache +from dexorder.util.async_util import maywait from dexorder.util.json import json_encoder log = logging.getLogger(__name__) @@ -59,12 +61,14 @@ class RedisState (SeriesCollection): series = f'{chain_id}|{d.series2str(diff.series)}' key = d.key2str(diff.key) value = d.value2str(diff.value) + if type(value) is not str: + raise RuntimeError # pub/sub socketio/redis pub_era = d.opts.get('pub') # event, room, args if pub_era is True: pub_era = series, key, [value] elif callable(pub_era): - pub_era = pub_era(diff.key, diff.value) + pub_era = await maywait(pub_era(diff.key, diff.value)) if pub_era is not None: e, r, a = pub_era # noinspection PyTypeChecker diff --git a/src/dexorder/order/orderlib.py b/src/dexorder/order/orderlib.py index 2d2ecc7..2e1c01e 100644 --- a/src/dexorder/order/orderlib.py +++ b/src/dexorder/order/orderlib.py @@ -1,3 +1,4 @@ +import copy import logging from abc import ABC, abstractmethod from dataclasses import dataclass @@ -99,7 +100,7 @@ class SwapOrderStatus (SwapStatus): return self.order.dump(), self.state.value, self.start, self.ocoGroup, self.filledIn, self.filledOut, self.trancheFilledIn, self.trancheFilledOut def copy(self): - return SwapOrderStatus.load(self.dump()) + return copy.deepcopy(self) NO_OCO = 18446744073709551615 # max uint64 diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 6fd08f1..4b76196 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -1,10 +1,14 @@ +import copy import logging from dataclasses import dataclass from typing import overload +from dexorder.base.chain import current_chain from dexorder.base.order import OrderKey, TrancheKey from dexorder.blockstate import BlockDict, BlockSet +from dexorder.data import vault_owners from dexorder.order.orderlib import SwapOrderStatus, SwapOrderState +from dexorder.util import json log = logging.getLogger(__name__) @@ -15,11 +19,25 @@ class Filled: filled_out: int @staticmethod - def str2remaining(basic): - return Filled(*map(int,basic.split(','))) if basic else Filled(0,0) + def load(string): + return Filled(*map(int,string[1:-1].split(','))) - def remaining2str(self): - return f'{self.filled_in},{self.filled_out}' + def dump(self): + return str(self.filled_in), str(self.filled_out) + + +@dataclass +class OrderFilled: + filled: Filled + tranche_filled: list[Filled] + + @staticmethod + def load(string): + f, tfs = json.loads(string) + return OrderFilled(Filled(*f), [Filled(*tf) for tf in tfs]) + + def dump(self): + return [self.filled.dump(), [tf.dump() for tf in self.tranche_filled]] # todo oco groups @@ -53,13 +71,13 @@ class Order: def create(vault: str, order_index: int, status: SwapOrderStatus): """ use when a brand new order is detected by the system """ key = OrderKey(vault, order_index) - Order._statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable + Order.order_statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable order = Order(key) if order.is_open: - Order.open_keys.add(key) - Order._order_filled[key] = Filled(status.filledIn, status.filledOut) - for i, tk in enumerate(order.tranche_keys): - Order._tranche_filled[tk] = Filled(status.trancheFilledIn[i], status.trancheFilledOut[i]) + Order.open_orders.add(key) + Order.vault_open_orders.listappend(key.vault, key.order_index) + tranche_filled = [Filled(*f) for f in zip(status.trancheFilledIn, status.trancheFilledOut)] + Order.order_filled[key] = OrderFilled(Filled(status.filledIn, status.filledOut), tranche_filled) return order @overload @@ -73,7 +91,7 @@ class Order: key = a if b is None else OrderKey(a, b) assert key not in Order.instances self.key = key - self.status: SwapOrderStatus = Order._statuses[key].copy() + self.status: SwapOrderStatus = Order.order_statuses[key].copy() self.pool_address: str = self.status.order.pool_address self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheFilledIn))] # various flattenings @@ -93,23 +111,26 @@ class Order: @property def filled_in(self): - return Order._order_filled[self.key].filled_in if self.is_open else self.status.filledIn + return Order.order_filled[self.key].filled.filled_in if self.is_open else self.status.filledIn @property def filled_out(self): - return Order._order_filled[self.key].filled_out if self.is_open else self.status.filledOut + return Order.order_filled[self.key].filled.filled_out if self.is_open else self.status.filledOut - def tranche_filled_in(self, tk: TrancheKey): - return Order._tranche_filled[tk].filled_in if self.is_open else self.status.trancheFilledIn[tk.tranche_index] + def tranche_filled_in(self, tranche_index: int): + return Order.order_filled[self.key].tranche_filled[tranche_index].filled_in if self.is_open \ + else self.status.trancheFilledIn[tranche_index] - def tranche_filled_out(self, tk: TrancheKey): - return Order._tranche_filled[tk].filled_out if self.is_open else self.status.trancheFilledIn[tk.tranche_index] + def tranche_filled_out(self, tranche_index: int): + return Order.order_filled[self.key].tranche_filled[tranche_index].filled_out if self.is_open \ + else self.status.trancheFilledIn[tranche_index] - def tranche_filled(self, tk: TrancheKey): - return self.tranche_filled_in(tk) if self.amount_is_input else self.tranche_filled_out(tk) + def tranche_filled(self, tranche_index: int): + return self.tranche_filled_in(tranche_index) if self.amount_is_input \ + else self.tranche_filled_out(tranche_index) - def tranche_remaining(self, tk: TrancheKey): - return self.tranche_amounts[tk.tranche_index] - self.tranche_filled(tk) + def tranche_remaining(self, tranche_index: int): + return self.tranche_amounts[tranche_index] - self.tranche_filled(tranche_index) @property def filled(self): @@ -122,65 +143,86 @@ class Order: def add_fill(self, tranche_index: int, filled_in: int, filled_out: int): - # order fill - old = Order._order_filled[self.key] - fin = old.filled_in + filled_in - fout = old.filled_out + filled_out - Order._order_filled[self.key] = Filled(fin, fout) - # tranche fill - tk = self.tranche_keys[tranche_index] - old = Order._tranche_filled[tk] - fin = old.filled_in + filled_in - fout = old.filled_out + filled_out - Order._tranche_filled[tk] = Filled(fin, fout) + old = Order.order_filled[self.key] + new = copy.deepcopy(old) + new.filled.filled_in += filled_in + new.filled.filled_out += filled_out + new.tranche_filled[tranche_index].filled_in += filled_in + new.tranche_filled[tranche_index].filled_out += filled_out + Order.order_filled[self.key] = new + def complete(self, final_state: SwapOrderState): """ updates the static order record with its final values, then deletes all its dynamic blockstate and removes the Order from the actives list """ assert final_state is not SwapOrderState.Open - status = self.status + status = self.status.copy() status.state = final_state if self.is_open: - Order.open_keys.remove(self.key) + Order.open_orders.remove(self.key) + Order.vault_open_orders.listremove(self.key.vault, self.key.order_index) # set final fill values in the status - filled = Order._order_filled[self.key] - try: - del Order._order_filled[self.key] - except KeyError: - pass - status.filledIn = filled.filled_in - status.filledOut = filled.filled_out - for i, tk in enumerate(self.tranche_keys): - try: - filled = Order._tranche_filled[tk] - del Order._tranche_filled[tk] - status.trancheFilledIn[i] = filled.filled_in - status.trancheFilledOut[i] = filled.filled_out - except KeyError: - pass - final_status = status.copy() - Order._statuses[self.key] = final_status # set the status in order to save it - Order._statuses.unload(self.key) # but then unload from memory after root promotion + of = Order.order_filled[self.key] + del Order.order_filled[self.key] + status.filledIn = of.filled.filled_in + status.filledOut = of.filled.filled_out + for i, tf in enumerate(of.tranche_filled): + status.trancheFilledIn[i] += of.tranche_filled[i].filled_in + status.trancheFilledOut[i] += of.tranche_filled[i].filled_out + Order.order_statuses[self.key] = status # set the status in order to save it + Order.order_statuses.unload(self.key) # but then unload from memory after root promotion + + @staticmethod + async def pub_order_status(k, v): + # publish status updates (on placing and completion) to web clients + try: + chain_id = current_chain.get().chain_id + return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel + 'o', # order message type + (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_status) + except KeyError: + log.warning(f'No vault owner for {k}') + return None + + @staticmethod + async def pub_order_fills(k, v): + # publish status updates (on placing and completion) to web clients + try: + chain_id = current_chain.get().chain_id + return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel + 'of', # order message type + (chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_fills) + except KeyError: + log.warning(f'No vault owner for {k}') + return None # ORDER STATE # various blockstate fields hold different aspects of an order's state. - # open orders = the set of unfilled, not-canceled orders - open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key) - - # this series holds "everything" about an order in the canonical format specified by the contract orderlib, except + # order statuses + # this is the main order table. + # it holds "everything" about an order in the canonical format specified by the contract orderlib, except that # the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series. - _statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict( - 'o', db='lazy', str2key=OrderKey.str2key, value2str=SwapOrderStatus.dump, str2value=SwapOrderStatus.load) + order_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict( + 'o', db='lazy', redis=True, pub=pub_order_status, + str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=SwapOrderStatus.load, + ) - # total remaining amount per order, for all unfilled, not-canceled orders - _order_filled: BlockDict[OrderKey, Filled] = BlockDict( - 'of', db=True, redis=True, str2key=OrderKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining) + # open orders = the set of unfilled, not-canceled orders + open_orders: BlockSet[OrderKey] = BlockSet('oo', db=True, str2key=OrderKey.str2key) - # total remaining amount per tranche - _tranche_filled: BlockDict[TrancheKey, Filled] = BlockDict( - 'tf', db=True, redis=True, str2key=TrancheKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining) + # open orders organized by vault + vault_open_orders: BlockDict[str, list[int]] = BlockDict('voo', db=True, redis=True) + + # fill amounts for open orders are stored here so any updates and publishes do not have to work with the + # entire order structure, much of which is static. so any open orders must load the order_status entry first + # and then overide the fill values with the data from the order_filled table. once the order completes and + # is removed from open_orders, the order_status directly contains the final fill values. + order_filled: BlockDict[OrderKey, OrderFilled] = BlockDict( + 'of', db=True, redis=True, pub=pub_order_fills, + str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=OrderFilled.load) +# "active" means the order wants to be executed now. this is not BlockData because it's cleared every block active_orders: dict[OrderKey,Order] = {} diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 00ce252..f66a77c 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -2,7 +2,7 @@ import asyncio import logging from collections import defaultdict from enum import Enum, auto -from typing import Callable, Optional, Union, Coroutine, Awaitable +from typing import Callable, Optional, Union, Awaitable from dexorder.blockstate import BlockSet, BlockDict from .orderlib import TimeConstraint, LineConstraint, ConstraintMode, SwapOrderState, PriceProof @@ -10,7 +10,6 @@ from dexorder.util import defaultdictk from .orderstate import Order from .. import dec from ..base.order import OrderKey, TrancheKey, ExecutionRequest -from ..data import pool_prices from ..database.model.block import current_block log = logging.getLogger(__name__) @@ -51,7 +50,7 @@ class TrancheTrigger: tranche = order.order.tranches[self.tk.tranche_index] tranche_amount = tranche.fraction_of(order.amount) - tranche_filled = order.tranche_filled(self.tk) + tranche_filled = order.tranche_filled(self.tk.tranche_index) tranche_remaining = tranche_amount - tranche_filled if tranche_remaining <= 0: @@ -129,7 +128,7 @@ class TrancheTrigger: active_tranches[self.tk] = None # or PriceProof(...) def fill(self, _amount_in, _amount_out ): - remaining = self.order.tranche_remaining(self.tk) + remaining = self.order.tranche_remaining(self.tk.tranche_index) filled = remaining <= 0 if filled: log.debug(f'tranche filled {self.tk}') diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 78ded23..6a7c1f9 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -5,6 +5,7 @@ from typing import Callable, Union, Any, Iterable from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI +# noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError from dexorder import Blockchain, db, blockchain, current_pub, async_yield, current_w3 @@ -105,7 +106,9 @@ class BlockStateRunner: except (ConnectionClosedError, TimeoutError): pass finally: + # noinspection PyBroadException try: + # noinspection PyUnresolvedReferences await w3ws.provider.disconnect() except Exception: pass