order fill state refactored; order status publishing

This commit is contained in:
Tim Olson
2023-11-06 23:48:47 -04:00
parent 0e49ebdcc5
commit b8b1a39dff
13 changed files with 172 additions and 96 deletions

View File

@@ -1,7 +1,6 @@
from .diff import DiffEntry, DiffItem, DELETE, UNLOAD
from .state import BlockState, current_blockstate
from .blockdata import DataType, BlockDict, BlockSet
from .diff import DiffItem
# def _test():
#

View File

@@ -50,13 +50,18 @@ class BlockData:
def getitem(self, item, default=NARG):
state = current_blockstate.get()
fork = current_fork.get()
result = state.get(fork, self.series, item, default)
if result is NARG and self.lazy_getitem:
lazy = self.lazy_getitem(self, item)
if lazy is not None:
lookup_fork, lookup_value = lazy
if lookup_fork in fork:
result = lookup_value
try:
result = state.get(fork, self.series, item)
except KeyError:
result = default
if self.lazy_getitem:
lazy = self.lazy_getitem(self, item)
if lazy is not None:
lookup_fork, lookup_value = lazy
if lookup_fork in fork:
result = lookup_value
if result is NARG:
raise KeyError
return result
def delitem(self, item, overwrite=True):
@@ -161,8 +166,14 @@ class BlockDict(Generic[K,V], BlockData):
self.setitem(item, result)
return result
def add(self, item: K, value: Union[V], default: V = 0) -> Union[V,DELETE]:
return self.modify(item, lambda v: v+value, default=default)
def add(self, key: K, value: V, default: V = 0) -> V:
return self.modify(key, lambda v: v + value, default=default)
def listappend(self, key: K, value) -> V:
return self.modify(key, lambda v: v + [value], default=[])
def listremove(self, key: K, value) -> V:
return self.modify(key, lambda v: [x for x in v if x != value], default=[])
class SeriesCollection:

View File

@@ -1,10 +1,10 @@
import logging
from typing import Iterable, Optional, Union, Any
from . import DiffItem, BlockSet, BlockDict, DELETE, BlockState, current_blockstate, DataType
from . import BlockSet, BlockDict, BlockState, current_blockstate, DataType
from .blockdata import BlockData, SeriesCollection
from .diff import DiffEntryItem
from .. import db, UNLOAD
from .diff import DiffItem, DiffEntryItem
from .. import db, UNLOAD, DELETE
from ..base.chain import current_chain
from ..base.fork import current_fork, Fork
from ..database.model import SeriesSet, SeriesDict, Block

View File

@@ -2,7 +2,7 @@ import itertools
import logging
from collections import defaultdict
from contextvars import ContextVar
from typing import Any, Optional, Union, Sequence, Reversible
from typing import Any, Optional, Union, Reversible
from sortedcontainers import SortedList
@@ -10,7 +10,7 @@ from dexorder import NARG, UNLOAD
from dexorder.base.fork import Fork, DisjointFork
from dexorder.database.model import Block
from dexorder.util import hexstr
from .diff import DiffEntry, DiffItem, DELETE, DiffEntryItem
from .diff import DiffEntry, DELETE, DiffEntryItem
log = logging.getLogger(__name__)

View File

@@ -1,6 +1,5 @@
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional, Union
from typing import Optional
# SCHEMA NOTES:

View File

@@ -1,19 +1,30 @@
import logging
from dexorder import dec
from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V
from dexorder.util import json, defaultdictk
from dexorder.util import json
# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,args)
# if pub is True, then event is the current series name, room is the key, and args is [value]
# values of DELETE are serialized as nulls
def pub_vault_balances(k, v):
chain_id = current_chain.get().chain_id
try:
return f'{chain_id}|{vault_owners[k]}', 'vb', (chain_id, k, json.dumps({k2: str(v2) for k2, v2 in v.items()}))
except KeyError:
logging.warning(f'No vault owner record for {k}')
return None # no vault on record
vault_owners: BlockDict[str, str] = BlockDict('v', db=True, redis=True)
vault_balances: BlockDict[str, dict[str, int]] = BlockDict(
f'vb', db=True, redis=True,
value2str=lambda d: json.dumps({k: str(v) for k, v in d.items()}), # ints can be large so we need to stringify them in JSON
str2value=lambda s: {k: int(v) for k, v in json.loads(s).items()},
pub=lambda k, v: (f'{current_chain.get().chain_id}|{vault_owners[k]}', 'vb', (k,json.dumps({k2: str(v2) for k2, v2 in v.items()})))
pub=pub_vault_balances
)
@@ -23,7 +34,9 @@ class PoolPrices (BlockDict[str, dec]):
new_pool_prices[item] = value
def pub_pool_price(k,v):
chain_id = current_chain.get().chain_id
return f'{chain_id}|{k}', 'p', (chain_id, k, str(v))
new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the current block. cleared every block.
pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True,
value2str=lambda d: f'{d:f}', str2value=dec,
pub=lambda k, v: (f'{current_chain.get().chain_id}|{k}', 'p', (k, str(v))))
pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec)

View File

@@ -93,7 +93,13 @@ async def handle_order_placed(event: EventData):
log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
if addr not in vault_owners:
log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs
# return todo discard rogues
# return todo always discard rogues
# noinspection PyBroadException
try:
vault_owners[addr] = await VaultContract(addr).owner()
except Exception:
log.warning(f'vault owner for {addr} could not be found.')
return
vault = VaultContract(addr)
for index in range(start_index, start_index+num_orders):
obj = await vault.swapOrderStatus(index)
@@ -138,7 +144,7 @@ async def handle_order_completed(event: EventData):
log.warning(f'DexorderCompleted IGNORED due to missing order {vault} {order_index}')
status = await VaultContract(vault).swapOrderStatus(order_index)
log.debug(f'SwapOrderStatus #todo {status}')
# todo
# todo anything?
def handle_order_error(event: EventData):
# event DexorderError (uint64 orderIndex, string reason);

View File

@@ -5,7 +5,6 @@ from contextvars import ContextVar
import redis.asyncio as redis
from redis.asyncio import Redis
from redis.asyncio.client import Pipeline
from socket_io_emitter import Emitter
from dexorder import config

View File

@@ -5,14 +5,16 @@ from typing import Iterable, Union, Reversible, Any
from redis.asyncio.client import Pipeline
from socket_io_emitter import Emitter
from dexorder import DELETE
from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork
from dexorder.blockstate import DiffItem, DataType, DELETE, BlockState
from dexorder.blockstate import DiffItem, DataType, BlockState
from dexorder.blockstate.blockdata import SeriesCollection, BlockData
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.state import compress_diffs
from dexorder.database.model import Block
from dexorder.memcache import current_redis, memcache
from dexorder.util.async_util import maywait
from dexorder.util.json import json_encoder
log = logging.getLogger(__name__)
@@ -59,12 +61,14 @@ class RedisState (SeriesCollection):
series = f'{chain_id}|{d.series2str(diff.series)}'
key = d.key2str(diff.key)
value = d.value2str(diff.value)
if type(value) is not str:
raise RuntimeError
# pub/sub socketio/redis
pub_era = d.opts.get('pub') # event, room, args
if pub_era is True:
pub_era = series, key, [value]
elif callable(pub_era):
pub_era = pub_era(diff.key, diff.value)
pub_era = await maywait(pub_era(diff.key, diff.value))
if pub_era is not None:
e, r, a = pub_era
# noinspection PyTypeChecker

View File

@@ -1,3 +1,4 @@
import copy
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -99,7 +100,7 @@ class SwapOrderStatus (SwapStatus):
return self.order.dump(), self.state.value, self.start, self.ocoGroup, self.filledIn, self.filledOut, self.trancheFilledIn, self.trancheFilledOut
def copy(self):
return SwapOrderStatus.load(self.dump())
return copy.deepcopy(self)
NO_OCO = 18446744073709551615 # max uint64

View File

@@ -1,10 +1,14 @@
import copy
import logging
from dataclasses import dataclass
from typing import overload
from dexorder.base.chain import current_chain
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.data import vault_owners
from dexorder.order.orderlib import SwapOrderStatus, SwapOrderState
from dexorder.util import json
log = logging.getLogger(__name__)
@@ -15,11 +19,25 @@ class Filled:
filled_out: int
@staticmethod
def str2remaining(basic):
return Filled(*map(int,basic.split(','))) if basic else Filled(0,0)
def load(string):
return Filled(*map(int,string[1:-1].split(',')))
def remaining2str(self):
return f'{self.filled_in},{self.filled_out}'
def dump(self):
return str(self.filled_in), str(self.filled_out)
@dataclass
class OrderFilled:
filled: Filled
tranche_filled: list[Filled]
@staticmethod
def load(string):
f, tfs = json.loads(string)
return OrderFilled(Filled(*f), [Filled(*tf) for tf in tfs])
def dump(self):
return [self.filled.dump(), [tf.dump() for tf in self.tranche_filled]]
# todo oco groups
@@ -53,13 +71,13 @@ class Order:
def create(vault: str, order_index: int, status: SwapOrderStatus):
""" use when a brand new order is detected by the system """
key = OrderKey(vault, order_index)
Order._statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable
Order.order_statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable
order = Order(key)
if order.is_open:
Order.open_keys.add(key)
Order._order_filled[key] = Filled(status.filledIn, status.filledOut)
for i, tk in enumerate(order.tranche_keys):
Order._tranche_filled[tk] = Filled(status.trancheFilledIn[i], status.trancheFilledOut[i])
Order.open_orders.add(key)
Order.vault_open_orders.listappend(key.vault, key.order_index)
tranche_filled = [Filled(*f) for f in zip(status.trancheFilledIn, status.trancheFilledOut)]
Order.order_filled[key] = OrderFilled(Filled(status.filledIn, status.filledOut), tranche_filled)
return order
@overload
@@ -73,7 +91,7 @@ class Order:
key = a if b is None else OrderKey(a, b)
assert key not in Order.instances
self.key = key
self.status: SwapOrderStatus = Order._statuses[key].copy()
self.status: SwapOrderStatus = Order.order_statuses[key].copy()
self.pool_address: str = self.status.order.pool_address
self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheFilledIn))]
# various flattenings
@@ -93,23 +111,26 @@ class Order:
@property
def filled_in(self):
return Order._order_filled[self.key].filled_in if self.is_open else self.status.filledIn
return Order.order_filled[self.key].filled.filled_in if self.is_open else self.status.filledIn
@property
def filled_out(self):
return Order._order_filled[self.key].filled_out if self.is_open else self.status.filledOut
return Order.order_filled[self.key].filled.filled_out if self.is_open else self.status.filledOut
def tranche_filled_in(self, tk: TrancheKey):
return Order._tranche_filled[tk].filled_in if self.is_open else self.status.trancheFilledIn[tk.tranche_index]
def tranche_filled_in(self, tranche_index: int):
return Order.order_filled[self.key].tranche_filled[tranche_index].filled_in if self.is_open \
else self.status.trancheFilledIn[tranche_index]
def tranche_filled_out(self, tk: TrancheKey):
return Order._tranche_filled[tk].filled_out if self.is_open else self.status.trancheFilledIn[tk.tranche_index]
def tranche_filled_out(self, tranche_index: int):
return Order.order_filled[self.key].tranche_filled[tranche_index].filled_out if self.is_open \
else self.status.trancheFilledIn[tranche_index]
def tranche_filled(self, tk: TrancheKey):
return self.tranche_filled_in(tk) if self.amount_is_input else self.tranche_filled_out(tk)
def tranche_filled(self, tranche_index: int):
return self.tranche_filled_in(tranche_index) if self.amount_is_input \
else self.tranche_filled_out(tranche_index)
def tranche_remaining(self, tk: TrancheKey):
return self.tranche_amounts[tk.tranche_index] - self.tranche_filled(tk)
def tranche_remaining(self, tranche_index: int):
return self.tranche_amounts[tranche_index] - self.tranche_filled(tranche_index)
@property
def filled(self):
@@ -122,65 +143,86 @@ class Order:
def add_fill(self, tranche_index: int, filled_in: int, filled_out: int):
# order fill
old = Order._order_filled[self.key]
fin = old.filled_in + filled_in
fout = old.filled_out + filled_out
Order._order_filled[self.key] = Filled(fin, fout)
# tranche fill
tk = self.tranche_keys[tranche_index]
old = Order._tranche_filled[tk]
fin = old.filled_in + filled_in
fout = old.filled_out + filled_out
Order._tranche_filled[tk] = Filled(fin, fout)
old = Order.order_filled[self.key]
new = copy.deepcopy(old)
new.filled.filled_in += filled_in
new.filled.filled_out += filled_out
new.tranche_filled[tranche_index].filled_in += filled_in
new.tranche_filled[tranche_index].filled_out += filled_out
Order.order_filled[self.key] = new
def complete(self, final_state: SwapOrderState):
""" updates the static order record with its final values, then deletes all its dynamic blockstate and removes the Order from the actives list """
assert final_state is not SwapOrderState.Open
status = self.status
status = self.status.copy()
status.state = final_state
if self.is_open:
Order.open_keys.remove(self.key)
Order.open_orders.remove(self.key)
Order.vault_open_orders.listremove(self.key.vault, self.key.order_index)
# set final fill values in the status
filled = Order._order_filled[self.key]
try:
del Order._order_filled[self.key]
except KeyError:
pass
status.filledIn = filled.filled_in
status.filledOut = filled.filled_out
for i, tk in enumerate(self.tranche_keys):
try:
filled = Order._tranche_filled[tk]
del Order._tranche_filled[tk]
status.trancheFilledIn[i] = filled.filled_in
status.trancheFilledOut[i] = filled.filled_out
except KeyError:
pass
final_status = status.copy()
Order._statuses[self.key] = final_status # set the status in order to save it
Order._statuses.unload(self.key) # but then unload from memory after root promotion
of = Order.order_filled[self.key]
del Order.order_filled[self.key]
status.filledIn = of.filled.filled_in
status.filledOut = of.filled.filled_out
for i, tf in enumerate(of.tranche_filled):
status.trancheFilledIn[i] += of.tranche_filled[i].filled_in
status.trancheFilledOut[i] += of.tranche_filled[i].filled_out
Order.order_statuses[self.key] = status # set the status in order to save it
Order.order_statuses.unload(self.key) # but then unload from memory after root promotion
@staticmethod
async def pub_order_status(k, v):
# publish status updates (on placing and completion) to web clients
try:
chain_id = current_chain.get().chain_id
return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel
'o', # order message type
(chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_status)
except KeyError:
log.warning(f'No vault owner for {k}')
return None
@staticmethod
async def pub_order_fills(k, v):
# publish status updates (on placing and completion) to web clients
try:
chain_id = current_chain.get().chain_id
return (f'{chain_id}|{vault_owners[k.vault]}', # publish on the vault owner's channel
'of', # order message type
(chain_id, k.vault, k.order_index, v.dump())) # (order_index, order_fills)
except KeyError:
log.warning(f'No vault owner for {k}')
return None
# ORDER STATE
# various blockstate fields hold different aspects of an order's state.
# open orders = the set of unfilled, not-canceled orders
open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key)
# this series holds "everything" about an order in the canonical format specified by the contract orderlib, except
# order statuses
# this is the main order table.
# it holds "everything" about an order in the canonical format specified by the contract orderlib, except that
# the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series.
_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict(
'o', db='lazy', str2key=OrderKey.str2key, value2str=SwapOrderStatus.dump, str2value=SwapOrderStatus.load)
order_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict(
'o', db='lazy', redis=True, pub=pub_order_status,
str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=SwapOrderStatus.load,
)
# total remaining amount per order, for all unfilled, not-canceled orders
_order_filled: BlockDict[OrderKey, Filled] = BlockDict(
'of', db=True, redis=True, str2key=OrderKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining)
# open orders = the set of unfilled, not-canceled orders
open_orders: BlockSet[OrderKey] = BlockSet('oo', db=True, str2key=OrderKey.str2key)
# total remaining amount per tranche
_tranche_filled: BlockDict[TrancheKey, Filled] = BlockDict(
'tf', db=True, redis=True, str2key=TrancheKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining)
# open orders organized by vault
vault_open_orders: BlockDict[str, list[int]] = BlockDict('voo', db=True, redis=True)
# fill amounts for open orders are stored here so any updates and publishes do not have to work with the
# entire order structure, much of which is static. so any open orders must load the order_status entry first
# and then overide the fill values with the data from the order_filled table. once the order completes and
# is removed from open_orders, the order_status directly contains the final fill values.
order_filled: BlockDict[OrderKey, OrderFilled] = BlockDict(
'of', db=True, redis=True, pub=pub_order_fills,
str2key=OrderKey.str2key, value2str=lambda v: json.dumps(v.dump()), str2value=OrderFilled.load)
# "active" means the order wants to be executed now. this is not BlockData because it's cleared every block
active_orders: dict[OrderKey,Order] = {}

View File

@@ -2,7 +2,7 @@ import asyncio
import logging
from collections import defaultdict
from enum import Enum, auto
from typing import Callable, Optional, Union, Coroutine, Awaitable
from typing import Callable, Optional, Union, Awaitable
from dexorder.blockstate import BlockSet, BlockDict
from .orderlib import TimeConstraint, LineConstraint, ConstraintMode, SwapOrderState, PriceProof
@@ -10,7 +10,6 @@ from dexorder.util import defaultdictk
from .orderstate import Order
from .. import dec
from ..base.order import OrderKey, TrancheKey, ExecutionRequest
from ..data import pool_prices
from ..database.model.block import current_block
log = logging.getLogger(__name__)
@@ -51,7 +50,7 @@ class TrancheTrigger:
tranche = order.order.tranches[self.tk.tranche_index]
tranche_amount = tranche.fraction_of(order.amount)
tranche_filled = order.tranche_filled(self.tk)
tranche_filled = order.tranche_filled(self.tk.tranche_index)
tranche_remaining = tranche_amount - tranche_filled
if tranche_remaining <= 0:
@@ -129,7 +128,7 @@ class TrancheTrigger:
active_tranches[self.tk] = None # or PriceProof(...)
def fill(self, _amount_in, _amount_out ):
remaining = self.order.tranche_remaining(self.tk)
remaining = self.order.tranche_remaining(self.tk.tranche_index)
filled = remaining <= 0
if filled:
log.debug(f'tranche filled {self.tk}')

View File

@@ -5,6 +5,7 @@ from typing import Callable, Union, Any, Iterable
from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, blockchain, current_pub, async_yield, current_w3
@@ -105,7 +106,9 @@ class BlockStateRunner:
except (ConnectionClosedError, TimeoutError):
pass
finally:
# noinspection PyBroadException
try:
# noinspection PyUnresolvedReferences
await w3ws.provider.disconnect()
except Exception:
pass