From 6b15634ddc4b8b0e48a2bbefed01cc5b26b9ad4d Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Sat, 14 Oct 2023 22:06:07 -0400 Subject: [PATCH] order placement doesnt crash --- src/dexorder/blockchain/by_blockchain.py | 13 ++-- src/dexorder/blockchain/connection.py | 2 +- src/dexorder/blockchain/uniswap.py | 12 +-- src/dexorder/blockstate/blockdata.py | 6 +- src/dexorder/blockstate/db_state.py | 4 +- src/dexorder/contract/__init__.py | 11 +-- src/dexorder/contract/pool_contract.py | 4 + src/dexorder/contract/uniswap_contracts.py | 4 + src/dexorder/data/__init__.py | 13 ++-- src/dexorder/database/model/block.py | 6 ++ src/dexorder/event_handler.py | 73 ++++++++++++++----- src/dexorder/memcache/memcache_state.py | 18 ++--- src/dexorder/order/orderlib.py | 18 +++-- src/dexorder/order/orderstate.py | 56 +++++++------- src/dexorder/order/triggers.py | 58 ++++++++++++--- src/dexorder/runner.py | 70 ++++++++++-------- src/dexorder/util/__init__.py | 4 + .../util.py => util/uniswap_util.py} | 3 + 18 files changed, 248 insertions(+), 127 deletions(-) rename src/dexorder/{blockchain/util.py => util/uniswap_util.py} (93%) diff --git a/src/dexorder/blockchain/by_blockchain.py b/src/dexorder/blockchain/by_blockchain.py index 657c402..c7b6006 100644 --- a/src/dexorder/blockchain/by_blockchain.py +++ b/src/dexorder/blockchain/by_blockchain.py @@ -1,6 +1,7 @@ from typing import Generic, TypeVar, Any, Iterator from dexorder import NARG +from dexorder.base.chain import current_chain _T = TypeVar('_T') @@ -18,18 +19,18 @@ class ByBlockchainCollection (Generic[_T]): self.by_blockchain = by_blockchain if by_blockchain is not None else {} def __getitem__(self, item) -> _T: - return self.by_blockchain[ctx.chain_id][item] + return self.by_blockchain[current_chain.get().chain_id][item] class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]): def __getattr__(self, name: str) -> _T: - return self.by_blockchain[ctx.chain_id][name] + return self.by_blockchain[current_chain.get().chain_id][name] def get(self, item, default=None, *, chain_id=None) -> _T: # will raise if default is NARG if chain_id is None: - chain_id = ctx.chain_id + chain_id = current_chain.get().chain_id if chain_id is None: raise KeyError('no ctx.chain_id set') found = self.by_blockchain.get(chain_id, {}).get(item, default) @@ -40,16 +41,16 @@ class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]): class ByBlockchainList (ByBlockchainCollection[_T], Generic[_T]): def __iter__(self) -> Iterator[_T]: - return iter(self.by_blockchain[ctx.chain_id]) + return iter(self.by_blockchain[current_chain.get().chain_id]) def iter(self, *, chain_id=None) -> Iterator[_T]: if chain_id is None: - chain_id = ctx.chain_id + chain_id = current_chain.get().chain_id return iter(self.by_blockchain[chain_id]) def get(self, index, *, chain_id=None) -> _T: if chain_id is None: - chain_id = ctx.chain_id + chain_id = current_chain.get().chain_id if chain_id is None: raise KeyError('no ctx.chain_id set') return self.by_blockchain[chain_id][index] diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 0c58511..2f84f58 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -1,7 +1,7 @@ from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider -from dexorder.blockchain.util import get_contract_data +from dexorder.util.uniswap_util import get_contract_data from .. import current_w3 from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url diff --git a/src/dexorder/blockchain/uniswap.py b/src/dexorder/blockchain/uniswap.py index 5996522..55f266e 100644 --- a/src/dexorder/blockchain/uniswap.py +++ b/src/dexorder/blockchain/uniswap.py @@ -1,12 +1,14 @@ +from charset_normalizer.md import getLogger from eth_abi.packed import encode_packed from eth_utils import keccak, to_bytes, to_checksum_address from dexorder import dec -from dexorder.contract import uniswapV3 +from dexorder.contract import abi_encoder from dexorder.util import hexbytes UNISWAPV3_POOL_INIT_CODE_HASH = hexbytes('0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54') +log = getLogger(__name__) class Fee: LOWEST = 100 @@ -18,19 +20,19 @@ class Fee: def ordered_addresses(addr_a:str, addr_b:str): return (addr_a, addr_b) if addr_a.lower() <= addr_b.lower() else (addr_b, addr_a) -def uniswapV3_pool_address( addr_a: str, addr_b: str, fee: int): - return uniswap_pool_address(uniswapV3['factory'], addr_a, addr_b, fee) def uniswap_pool_address(factory_addr: str, addr_a: str, addr_b: str, fee: int) -> str: token0, token1 = ordered_addresses(addr_a, addr_b) - salt = keccak(encode_packed(['address','address','uint24'],[token0, token1, fee])) + salt = keccak(abi_encoder.encode(['address','address','uint24'],[token0, token1, fee])) contract_address = keccak( b"\xff" + to_bytes(hexstr=factory_addr) + salt + UNISWAPV3_POOL_INIT_CODE_HASH ).hex()[-40:] - return to_checksum_address(contract_address) + result = to_checksum_address(contract_address) + log.debug(f'uniswap pool address {factory_addr} {addr_a} {addr_b} {fee} => {result}') + return result def uniswap_price(sqrt_price): d = dec(sqrt_price) diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index 85dcceb..dd20741 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -28,7 +28,7 @@ class BlockData: def __init__(self, data_type: DataType, series: Any, *, series2str=None, series2key=None, # defaults to key2str and str2key key2str=util_key2str, str2key=util_str2key, - value2basic=lambda x:x, basic2value=lambda x:x, # serialize/deserialize value to something JSON-able + value2str=lambda x:x, str2value=lambda x:x, # serialize/deserialize value to something JSON-able **opts): assert series not in BlockData.registry BlockData.registry[series] = self @@ -39,8 +39,8 @@ class BlockData: self.str2key = str2key self.series2str = series2str or self.key2str self.series2key = series2key or self.str2key - self.value2basic = value2basic - self.basic2value = basic2value + self.value2str = value2str + self.str2value = str2value self.lazy_getitem = None def setitem(self, item, value, overwrite=True): diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index dab2bb9..fbc77b5 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -63,7 +63,7 @@ class DbState(SeriesCollection): elif t == DataType.DICT: found = db.session.get(SeriesDict, key) if found is None: - db.session.add(SeriesDict(**key, value=d.value2basic(diff.value))) + db.session.add(SeriesDict(**key, value=d.value2str(diff.value))) else: found.value = diff.value else: @@ -97,6 +97,6 @@ class DbState(SeriesCollection): # noinspection PyTypeChecker var: BlockDict = BlockData.registry[series] for row in db.session.query(SeriesDict).where(SeriesDict.series == data.series2str(series)): - var[data.str2key(row.key)] = data.basic2value(row.value) + var[data.str2key(row.key)] = data.str2value(row.value) completed_block.set(root_block) return state diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index c45ecca..ccaacc2 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -1,13 +1,14 @@ -from .abi import abis -from .contract_proxy import ContractProxy, Transaction -from .pool_contract import UniswapV3Pool -from .uniswap_contracts import uniswapV3 - from eth_abi.codec import ABIDecoder, ABIEncoder from eth_abi.registry import registry as default_registry abi_decoder = ABIDecoder(default_registry) abi_encoder = ABIEncoder(default_registry) +from .abi import abis +from .contract_proxy import ContractProxy, Transaction +from .pool_contract import UniswapV3Pool +from .uniswap_contracts import uniswapV3 + + def VaultContract(addr): return ContractProxy(addr, 'Vault') diff --git a/src/dexorder/contract/pool_contract.py b/src/dexorder/contract/pool_contract.py index 32c1394..eff7f0a 100644 --- a/src/dexorder/contract/pool_contract.py +++ b/src/dexorder/contract/pool_contract.py @@ -1,6 +1,10 @@ from .contract_proxy import ContractProxy +from ..blockchain.uniswap import uniswap_price class UniswapV3Pool (ContractProxy): def __init__(self, address: str = None): super().__init__(address, 'IUniswapV3Pool') + + async def price(self): + return uniswap_price((await self.slot0())[0]) diff --git a/src/dexorder/contract/uniswap_contracts.py b/src/dexorder/contract/uniswap_contracts.py index c5a0705..3d9a163 100644 --- a/src/dexorder/contract/uniswap_contracts.py +++ b/src/dexorder/contract/uniswap_contracts.py @@ -1,4 +1,5 @@ from dexorder.base.chain import Ethereum, Goerli, Polygon, Mumbai, Arbitrum, Mock +from dexorder.blockchain.uniswap import uniswap_pool_address from dexorder.contract.contract_proxy import ContractProxy from dexorder.blockchain import ByBlockchainDict @@ -17,3 +18,6 @@ class _UniswapContracts (ByBlockchainDict[ContractProxy]): uniswapV3 = _UniswapContracts() + +def uniswapV3_pool_address( addr_a: str, addr_b: str, fee: int): + return uniswap_pool_address(uniswapV3['factory'].address, addr_a, addr_b, fee) diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py index 902307c..6569b88 100644 --- a/src/dexorder/data/__init__.py +++ b/src/dexorder/data/__init__.py @@ -1,11 +1,10 @@ +from dexorder import dec from dexorder.blockstate import BlockSet, BlockDict -# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,value) -# if pub is True, then event is the current series name, room is the key, and value is passed through +# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,args) +# if pub is True, then event is the current series name, room is the key, and args is [value] # values of DELETE are serialized as nulls -vault_owners = BlockDict('v', db=True, redis=True) -vault_tokens = BlockDict('vt', db=True, redis=True, pub=True) -pool_prices = BlockDict('p', db=True, redis=True, pub=True) -underfunded_vaults = BlockSet('uv', db=True) -active_orders = BlockSet('a', db=True) +vault_owners: BlockDict[str,str] = BlockDict('v', db=True, redis=True) +vault_tokens: BlockDict[str,str] = BlockDict('vt', db=True, redis=True, pub=True) +pool_prices: BlockDict[str,dec] = BlockDict('p', db=True, redis=True, pub=True, value2str=lambda d:f'{d:f}', str2value=dec) diff --git a/src/dexorder/database/model/block.py b/src/dexorder/database/model/block.py index e8793b0..caecca3 100644 --- a/src/dexorder/database/model/block.py +++ b/src/dexorder/database/model/block.py @@ -4,6 +4,7 @@ from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column from dexorder.database.model import Base +from dexorder.util import hexint class Block(Base): @@ -13,6 +14,11 @@ class Block(Base): parent: Mapped[bytes] data: Mapped[dict] = mapped_column(JSONB) + @property + def timestamp(self) -> int: + # noinspection PyTypeChecker + return hexint(self.data['timestamp']) + def __str__(self): return f'{self.height}_{self.hash.hex()[:5]}' diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index cc282b2..aae6527 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -5,15 +5,26 @@ from web3.types import EventData from dexorder import current_pub, current_w3 from dexorder.base.chain import current_chain from dexorder.blockchain.uniswap import uniswap_price -from dexorder.blockchain.util import vault_address, get_contract_event, get_factory, get_contract_data -from dexorder.contract import VaultContract -from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults +from dexorder.util.uniswap_util import vault_address, get_contract_event, get_factory, get_contract_data +from dexorder.contract import VaultContract, UniswapV3Pool +from dexorder.data import pool_prices, vault_owners, vault_tokens from dexorder.database.model.block import current_block -from dexorder.orderlib.orderlib import SwapOrderStatus + +from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus +from dexorder.order.orderstate import Order +from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers log = logging.getLogger(__name__) +async def ensure_pool_price(pool_addr): + if pool_addr not in pool_prices: + log.debug(f'querying price for pool {pool_addr}') + pool_prices[pool_addr] = await UniswapV3Pool(pool_addr).price() + +def dump_log(eventlog): + log.debug(f'eventlog {eventlog}') + def setup_logevent_triggers(runner): runner.events.clear() @@ -21,17 +32,28 @@ def setup_logevent_triggers(runner): # code ordering here is also the trigger order: e.g. we process all vault creation events # before any order creations - vault_created = current_w3.get().eth.contract(get_factory().address, abi=get_contract_data('Factory')['abi']).events.VaultCreated() + # DEBUG + runner.add_event_trigger(dump_log, None, {}) + + factory = get_factory() + if factory is None: + log.warning(f'No Factory for {current_chain.get()}') + vault_created = get_contract_event('Factory', 'VaultCreated') + else: + vault_created = current_w3.get().eth.contract(factory.address, abi=get_contract_data('Factory')['abi']).events.VaultCreated() runner.add_event_trigger(handle_vault_created, vault_created) - runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) - runner.add_event_trigger(handle_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) + runner.add_event_trigger(activate_time_triggers) + runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) + runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted')) runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError')) + runner.add_event_trigger(activate_price_triggers) async def handle_order_placed(event: EventData): + log.debug(f'handle order placed {event}') # event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders); addr = event['address'] start_index = int(event['args']['startOrderIndex']) @@ -46,10 +68,13 @@ async def handle_order_placed(event: EventData): obj = await vault.swapOrderStatus(index) log.debug(f'raw order status {obj}') order_status = SwapOrderStatus.load(obj) - log.debug(f'order status {order_status}') - assert order_status == SwapOrderStatus.load(order_status.dump()) - log.debug('assert ok') - # todo record order + order = Order.create(vault.address, index, order_status) + await ensure_pool_price(order.pool_address) + triggers = OrderTriggers(order) + log.debug(f'created order {order_status}') + if triggers.closed: + log.warning(f'order {order.key} was immediately closed') + close_order_and_disable_triggers(order.key, SwapOrderState.Filled if not order.remaining else SwapOrderState.Expired) def handle_swap_filled(event: EventData): # event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut); @@ -63,8 +88,6 @@ def handle_order_error(event: EventData): # event DexorderError (uint64 orderIndex, string reason); log.debug(f'DexorderError {event}') - - def handle_transfer(transfer: EventData): to_address = transfer['args']['to'] log.debug(f'transfer {to_address}') @@ -76,7 +99,9 @@ def handle_transfer(transfer: EventData): pass -def handle_swap(swap: EventData): +new_pool_prices: dict[str, int] = {} + +def handle_uniswap_swap(swap: EventData): try: sqrt_price = swap['args']['sqrtPriceX96'] except KeyError: @@ -84,10 +109,11 @@ def handle_swap(swap: EventData): addr = swap['address'] price = uniswap_price(sqrt_price) log.debug(f'pool {addr} {price}') - pool_prices[addr] = price + new_pool_prices[addr] = price def handle_vault_created(created: EventData): + log.debug(f'VaultCreated {created}') try: owner = created['args']['owner'] num = created['args']['num'] @@ -108,6 +134,19 @@ def handle_vault_created(created: EventData): # log.debug(f'updated vaults: {vaults}') current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults) -def handle_order_created(event:EventData): - print('order', event) + +def activate_time_triggers(): + now = current_block.get().timestamp + # time triggers + for tt in time_triggers: + tt(now) + +def activate_price_triggers(): + for pool, price in new_pool_prices.items(): + for pt in price_triggers[pool]: + pt(price) + +def execute_requests(): + log.info('execute requests: todo') + pass # todo diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index debaa40..0c3bcd2 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -58,17 +58,17 @@ class RedisState (SeriesCollection): t = d.type series = f'{chain_id}|{d.series2str(diff.series)}' key = d.key2str(diff.key) - value = diff.value + value = d.value2str(diff.value) # pub/sub socketio/redis - pub_kv = d.opts.get('pub') - if pub_kv is True: - pub_kv = key, value - elif callable(pub_kv): - pub_kv = pub_kv((key,value)) - if pub_kv is not None: - k, v = pub_kv + pub_era = d.opts.get('pub') # event, room, args + if pub_era is True: + pub_era = series, key, [value] + elif callable(pub_era): + pub_era = pub_era(diff.key, diff.value) + if pub_era is not None: + e, r, a = pub_era # noinspection PyTypeChecker - pubs.append((series,k,[v])) + pubs.append((e,r,a)) if diff.value is DELETE: if t == DataType.SET: sdels[series].add(key) diff --git a/src/dexorder/order/orderlib.py b/src/dexorder/order/orderlib.py index d30886f..a3054cb 100644 --- a/src/dexorder/order/orderlib.py +++ b/src/dexorder/order/orderlib.py @@ -5,7 +5,8 @@ from enum import Enum from typing import Optional from dexorder import dec -from dexorder.blockchain.uniswap import uniswap_pool_address, uniswap_price, uniswapV3_pool_address +from dexorder.blockchain.uniswap import uniswap_pool_address, uniswap_price +from dexorder.contract.uniswap_contracts import uniswapV3_pool_address from dexorder.contract import abi_decoder, abi_encoder, uniswapV3 log = logging.getLogger(__name__) @@ -131,7 +132,7 @@ class Constraint (ABC): class PriceConstraint (Constraint, ABC): @abstractmethod - def passes(self, old_price: dec, new_price: dec) -> bool:... + def passes(self, price: dec) -> bool:... @dataclass @@ -153,8 +154,8 @@ class LimitConstraint (PriceConstraint): def dump(self): return self._dump(LimitConstraint.TYPES, (self.isAbove, self.isRatio, self.valueSqrtX96)) - def passes(self, old_price: dec, new_price: dec) -> bool: - return self.isAbove and new_price >= self.limit or not self.isAbove and new_price <= self.limit + def passes(self, price: dec) -> bool: + return self.isAbove and price >= self.limit or not self.isAbove and price <= self.limit @dataclass @@ -176,6 +177,9 @@ class Time: mode: TimeMode time: int + def timestamp(self, order_start: int): + return self.time if self.mode is TimeMode.Timestamp else order_start + self.time + DISTANT_PAST = 0 DISTANT_FUTURE = 4294967295 # max uint32 @@ -194,8 +198,7 @@ class TimeConstraint (Constraint): return TimeConstraint(ConstraintMode.Time, Time(TimeMode(earliest_mode),earliest_time), Time(TimeMode(latest_mode),latest_time)) def dump(self): - return Constraint._dump(ConstraintMode.Time, TimeConstraint.TYPES, - (self.earliest.mode.value, self.earliest.time, self.latest.mode.value, self.latest.time)) + return self._dump(TimeConstraint.TYPES, (self.earliest.mode.value, self.earliest.time, self.latest.mode.value, self.latest.time)) @dataclass @@ -203,6 +206,9 @@ class Tranche: fraction: int # 18-decimal fraction of the order amount which is available to this tranche. must be <= 1 constraints: list[Constraint] + def fraction_of(self, amount): + return amount * self.fraction // 65535 + @staticmethod def load(obj): return Tranche(obj[0], [Constraint.load(c) for c in obj[1]]) diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 950b78a..a68ebf2 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -7,7 +7,7 @@ from dexorder.order.orderlib import SwapOrderStatus, SwapOrderState log = logging.getLogger(__name__) -@dataclass +@dataclass(frozen=True, eq=True) class OrderKey: vault: str order_index: int @@ -20,7 +20,7 @@ class OrderKey: def __str__(self): return f'{self.vault}|{self.order_index}' -@dataclass +@dataclass(frozen=True, eq=True) class TrancheKey (OrderKey): tranche_index: int @@ -39,11 +39,11 @@ class Filled: filled_out: int @staticmethod - def basic2remaining(basic): - return Filled(*basic) + def str2remaining(basic): + return Filled(*map(int,basic.split(','))) if basic else Filled(0,0) - def remaining2basic(self): - return self.filled_in, self.filled_out + def remaining2str(self): + return f'{self.filled_in},{self.filled_out}' # todo oco groups @@ -96,19 +96,16 @@ class Order: self.status: SwapOrderStatus = Order._statuses[key].copy() self.pool_address: str = self.status.order.pool_address self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheFilledIn))] + # various flattenings + self.order = self.status.order + self.amount = self.status.order.amount + self.amount_is_input = self.status.order.amountIsInput + self.tranche_amounts = [t.fraction_of(self.amount) for t in self.order.tranches] @property def state(self): return self.status.state - @property - def order(self): - return self.status.order - - @property - def amount(self): - return self.order.amount - @property def remaining(self): return self.amount - self.filled @@ -134,10 +131,6 @@ class Order: def filled(self): return self.filled_in if self.amount_is_input else self.filled_out - @property - def amount_is_input(self): - return self.order.amountIsInput - @property def is_open(self): @@ -156,6 +149,7 @@ class Order: fin = old.filled_in + filled_in fout = old.filled_out + filled_out Order._tranche_filled[tk] = Filled(fin, fout) + # todo check for completion def complete(self, final_state: SwapOrderState): """ updates the static order record with its final values, then deletes all its dynamic blockstate and removes the Order from the actives list """ @@ -163,16 +157,23 @@ class Order: status = self.status status.state = final_state if self.is_open: - del Order._open_keys[self.key] + Order._open_keys.remove(self.key) + # set final fill values in the status filled = Order._order_filled[self.key] - del Order._order_filled[self.key] + try: + del Order._order_filled[self.key] + except KeyError: + pass status.filledIn = filled.filled_in status.filledOut = filled.filled_out for i, tk in enumerate(self.tranche_keys): - filled = Order._tranche_filled[tk] - del Order._tranche_filled[tk] - status.trancheFilledIn[i] = filled.filled_in - status.trancheFilledOut[i] = filled.filled_out + try: + filled = Order._tranche_filled[tk] + del Order._tranche_filled[tk] + status.trancheFilledIn[i] = filled.filled_in + status.trancheFilledOut[i] = filled.filled_out + except KeyError: + pass final_status = status.copy() Order._statuses[self.key] = final_status # set the status in order to save it Order._statuses.unload(self.key) # but then unload from memory after root promotion @@ -187,13 +188,16 @@ class Order: # open orders = the set of unfilled, not-canceled orders _open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key) + # underfunded vaults + _underfunded: BlockSet[str] = BlockSet('uv', db=True, redis=True) + # total remaining amount per order, for all unfilled, not-canceled orders _order_filled: BlockDict[OrderKey, Filled] = BlockDict( - 'of', db=True, redis=True, str2key=OrderKey.str2key, value2basic=Filled.remaining2basic, basic2value=Filled.basic2remaining) + 'of', db=True, redis=True, str2key=OrderKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining) # total remaining amount per tranche _tranche_filled: BlockDict[TrancheKey, Filled] = BlockDict( - 'tf', db=True, redis=True, str2key=TrancheKey.str2key, value2basic=Filled.remaining2basic, basic2value=Filled.basic2remaining) + 'tf', db=True, redis=True, str2key=TrancheKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining) active_orders: dict[OrderKey,Order] = {} diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 85fa2bd..29830f0 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -3,18 +3,18 @@ from enum import Enum from typing import Callable from dexorder.blockstate import BlockSet, BlockDict -from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode +from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState from dexorder.util import defaultdictk -from .orderstate import TrancheKey, Order +from .orderstate import TrancheKey, Order, OrderKey from ..database.model.block import current_block log = logging.getLogger(__name__) -# todo time and price triggers should be BlockSortedSets that support range queries -TimeTrigger = Callable[[int, int], None] # func(previous_timestamp, current_timestamp) +# todo time and price triggers should be BlockSortedSets that support range queries, for efficient lookup of triggers +TimeTrigger = Callable[[int], None] # func(timestamp) time_triggers:BlockSet[TimeTrigger] = BlockSet('tt') -PriceTrigger = Callable[[int, int], None] # pool previous price, pool new price +PriceTrigger = Callable[[int], None] # func(pool_price) price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address execution_requests:BlockDict[TrancheKey,int] = BlockDict('te') # value is block height of the request @@ -37,9 +37,10 @@ class TrancheTrigger: self.order = order self.tk = tranche_key self.status = TrancheStatus.Early + start = self.order.status.start tranche = order.order.tranches[self.tk.tranche_index] - tranche_amount = order.amount * tranche.fraction // 10**18 + tranche_amount = tranche.fraction_of(order.amount) tranche_filled = order.tranche_filled(self.tk) tranche_remaining = tranche_amount - tranche_filled @@ -47,12 +48,14 @@ class TrancheTrigger: self.status = TrancheStatus.Filled return - self.time_constraint = time_constraint = None + self.time_constraint = time_constraint = None # stored as a tuple of two ints for earliest and latest absolute timestamps self.price_constraints = [] for c in tranche.constraints: if c.mode == ConstraintMode.Time: c: TimeConstraint - time_constraint = (c.earliest, c.latest) if time_constraint is None else intersect_ranges(*time_constraint, c.earliest, c.latest) + earliest = c.earliest.timestamp(start) + latest = c.latest.timestamp(start) + time_constraint = (earliest, latest) if time_constraint is None else intersect_ranges(*time_constraint, earliest, latest) elif c.mode == ConstraintMode.Limit: c: LimitConstraint raise NotImplementedError @@ -70,13 +73,15 @@ class TrancheTrigger: def enable_time_trigger(self): if self.time_constraint: + log.debug(f'enable_time_trigger') time_triggers.add(self.time_trigger) def disable_time_trigger(self): if self.time_constraint: time_triggers.remove(self.time_trigger) - def time_trigger(self, _prev, now): + def time_trigger(self, now): + log.debug(f'time_trigger {now}') if now >= self.time_constraint[1]: self.disable() self.status = TrancheStatus.Expired @@ -90,8 +95,8 @@ class TrancheTrigger: def disable_price_trigger(self): price_triggers[self.order.pool_address].remove(self.price_trigger) - def price_trigger(self, prev, cur): - if all(pc.passes(prev,cur) for pc in self.price_constraints): + def price_trigger(self, cur): + if all(pc.passes(cur) for pc in self.price_constraints): self.execute() def execute(self): @@ -102,13 +107,44 @@ class TrancheTrigger: self.disable_time_trigger() self.disable_price_trigger() + @property + def closed(self): + return self.status in (TrancheStatus.Filled, TrancheStatus.Expired) + + @property + def open(self): + return not self.closed + class OrderTriggers: + instances: dict[OrderKey, 'OrderTriggers'] = {} + def __init__(self, order: Order): + assert order.key not in OrderTriggers.instances self.order = order self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys] + OrderTriggers.instances[order.key] = self def disable(self): for t in self.triggers: t.disable() + del OrderTriggers.instances[self.order.key] + + @property + def closed(self): + return all(t.closed for t in self.triggers) + + @property + def open(self): + return not self.closed + + +def close_order_and_disable_triggers(order_key: OrderKey, final_state: SwapOrderState): + Order.instances[order_key].complete(final_state) + try: + triggers = OrderTriggers.instances[order_key] + except KeyError: + pass + else: + triggers.disable() diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index ca08f7b..5fce22f 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,5 +1,5 @@ import logging -from typing import Callable, Union, Any, Iterable +from typing import Callable, Union, Any, Iterable, Optional from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI @@ -13,6 +13,7 @@ from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block from dexorder.event_handler import setup_logevent_triggers +from dexorder.order.triggers import time_triggers from dexorder.util import hexstr, topic from dexorder.util.async_util import maywait @@ -103,23 +104,30 @@ class BlockStateRunner: if fork.disjoint: # backfill batches for callback, event, log_filter in self.events: - from_height = state.root_block.height + 1 - end_height = block.height - while from_height <= end_height: - to_height = min(end_height, from_height + chain.batch_size - 1) - lf = dict(log_filter) - lf['fromBlock'] = from_height - lf['toBlock'] = to_height - log.debug(f'batch backfill {from_height} - {to_height}') - batches.append((w3.eth.get_logs(lf), callback, event, lf)) - from_height += chain.batch_size + if event is None: + batches.append(None) + else: + from_height = state.root_block.height + 1 + end_height = block.height + while from_height <= end_height: + to_height = min(end_height, from_height + chain.batch_size - 1) + lf = dict(log_filter) + lf['fromBlock'] = from_height + lf['toBlock'] = to_height + log.debug(f'batch backfill {from_height} - {to_height}') + batches.append((w3.eth.get_logs(lf), callback, event, lf)) + from_height += chain.batch_size else: # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order for callback, event, log_filter in self.events: - lf = dict(log_filter) - lf['blockHash'] = hexstr(block.hash) - batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) + if log_filter is None: + batches.append((None, callback, event, None)) + else: + lf = dict(log_filter) + lf['blockHash'] = hexstr(block.hash) + print(lf) + batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) # set up for callbacks current_block.set(block) @@ -129,17 +137,20 @@ class BlockStateRunner: session.add(block) pubs = [] current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) - # callbacks + # logevent callbacks for future,callback,event,filter_args in batches: - log_events = await future - for log_event in log_events: - try: - parsed = event.process_log(log_event) - except (LogTopicError, MismatchedABI): - pass - else: - # todo try/except for known retryable errors - await maywait(callback(parsed)) + if future is None: + await maywait(callback()) # non-log callback + else: + log_events = await future + for log_event in log_events: + try: + parsed = event.process_log(log_event) if event is not None else log_event + except (LogTopicError, MismatchedABI): + pass + else: + # todo try/except for known retryable errors + await maywait(callback(parsed)) # todo check for reorg and generate a reorg diff list diff_items = state.diffs_by_hash[block.hash] @@ -167,9 +178,10 @@ class BlockStateRunner: session.commit() log.info(f'completed block {block}') - - - def add_event_trigger(self, callback:Callable[[dict],None], event: ContractEvents, log_filter: Union[dict,str]=None): - if log_filter is None: - log_filter = {'topics':[topic(event.abi)]} + def add_event_trigger(self, callback: Callable[[dict], None], event: ContractEvents = None, log_filter: Union[dict, str] = None): + """ + if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs + """ + if log_filter is None and event is not None: + log_filter = {'topics': [topic(event.abi)]} self.events.append((callback, event, log_filter)) diff --git a/src/dexorder/util/__init__.py b/src/dexorder/util/__init__.py index b36ca2a..098023b 100644 --- a/src/dexorder/util/__init__.py +++ b/src/dexorder/util/__init__.py @@ -35,6 +35,10 @@ def hexbytes(value: str): return bytes.fromhex(value[2:] if value.startswith('0x') else value) +def hexint(value: str): + return int(value[2:] if value.startswith('0x') else value, 16) + + def _keystr1(value): t = type(value) return value if t is str else value.hex() if t is HexBytes else '0x' + value.hex() if t is bytes else str(value) diff --git a/src/dexorder/blockchain/util.py b/src/dexorder/util/uniswap_util.py similarity index 93% rename from src/dexorder/blockchain/util.py rename to src/dexorder/util/uniswap_util.py index bb39f1f..86aa5d6 100644 --- a/src/dexorder/blockchain/util.py +++ b/src/dexorder/util/uniswap_util.py @@ -7,6 +7,7 @@ from eth_utils import keccak, to_bytes, to_checksum_address from dexorder import config, current_w3 from dexorder.base.chain import current_chain from dexorder.contract import ContractProxy +from dexorder.util import hexstr log = logging.getLogger(__name__) @@ -24,6 +25,7 @@ def get_factory() -> ContractProxy: if tx['contractName'] == 'Factory': addr = tx['contractAddress'] found = factory[chain_id] = ContractProxy(addr, 'Factory') + log.info(f'Factory {addr}') break except FileNotFoundError: log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"') @@ -46,6 +48,7 @@ def vault_address(owner, num): with open('../contract/out/Vault.sol/Vault.json', 'rt') as _file: vault_info = json.load(_file) VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=vault_info['bytecode']['object'])) + log.info(f'VAULT_INIT_CODE_HASH {hexstr(VAULT_INIT_CODE_HASH)}') salt = keccak(encode_packed(['address','uint8'],[owner,num])) contract_address = keccak( b"\xff"