From ee73a38f46b94d95c0529e3ff1d5206231477f09 Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Fri, 20 Oct 2023 21:57:35 -0400 Subject: [PATCH] db persistence fixes --- src/dexorder/blockchain/transaction.py | 9 +++- src/dexorder/event_handler.py | 65 ++++++++++++++------------ src/dexorder/order/orderlib.py | 8 ++-- src/dexorder/order/orderstate.py | 3 +- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/dexorder/blockchain/transaction.py b/src/dexorder/blockchain/transaction.py index ef67ca8..7a8f57b 100644 --- a/src/dexorder/blockchain/transaction.py +++ b/src/dexorder/blockchain/transaction.py @@ -2,6 +2,8 @@ import logging from abc import abstractmethod from uuid import uuid4 +from web3.exceptions import TransactionNotFound + from dexorder import db, current_w3 from dexorder.base.chain import current_chain from dexorder.base.order import TransactionRequest @@ -63,8 +65,11 @@ async def check_receipt(job: TransactionJob): if not job.tx: return w3 = current_w3.get() - receipt = await w3.eth.get_transaction_receipt(job.tx.id) - if receipt is not None: + try: + receipt = await w3.eth.get_transaction_receipt(job.tx.id) + except TransactionNotFound: + pass + else: job.tx.receipt = receipt job.completed = True try: diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 877c1ba..6004775 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -157,36 +157,6 @@ def handle_vault_created(created: EventData): current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults) -def handle_dexorderexecutions(event: EventData): - log.debug(f'executions {event}') - exe_id = UUID(bytes=event['args']['id']) - errors = event['args']['errors'] - job = db.session.get(TransactionJob, exe_id) - req: TrancheExecutionRequest = job.request - tk = TrancheKey( req.vault, req.order_index, req.tranche_index ) - order = Order.of(req.vault, req.order_index) - if job is None: - log.warning(f'Job {exe_id} not found!') - return - if len(errors) == 0: - log.warning(f'No errors found in DexorderExecutions event: {event}') - return - if len(errors) > 1: - log.warning(f'Multiple executions not yet implemented') - error = errors[0] - log.debug(f'job {exe_id} had error "{error}"') - if error == '': - pass # execution success - elif error == 'IIA': - # insufficient input amount: suspend execution until new funds are sent - # todo replace with vault balance checks - token = order.order.tokenIn - # underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token] - log.debug(f'insufficient funds {req.vault} {token} ') - else: - log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') - - def activate_time_triggers(): now = current_block.get().timestamp log.debug(f'activating time triggers') @@ -223,3 +193,38 @@ async def process_execution_requests(): job = submit_transaction(new_tranche_execution_request(tk, er.proof)) inflight_execution_requests[tk] = height log.info(f'executing tranche {tk} with job {job.id}') + + +def handle_dexorderexecutions(event: EventData): + log.debug(f'executions {event}') + exe_id = UUID(bytes=event['args']['id']) + errors = event['args']['errors'] + if len(errors) == 0: + log.warning(f'No errors found in DexorderExecutions event: {event}') + return + if len(errors) > 1: + log.warning(f'Multiple executions not yet implemented') + job: TransactionJob = db.session.get(TransactionJob, exe_id) + if job is None: + log.warning(f'Job {exe_id} not found!') + return + + finish_execution_request(job.request, errors[0]) + + +def finish_execution_request(req: TrancheExecutionRequest, error: str): + order = Order.of(req.vault, req.order_index) + tk = TrancheKey(req.vault, req.order_index, req.tranche_index) + del inflight_execution_requests[tk] # no longer in-flight + if error != '': + log.debug(f'execution request for tranche {tk} had error "{error}"') + if error == '': + pass # execution success + elif error in ('IIA', 'STF'): # todo not STF + # Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent + # todo replace with vault balance checks + token = order.order.tokenIn + # underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token] + log.debug(f'insufficient funds {req.vault} {token} ') + else: + log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') diff --git a/src/dexorder/order/orderlib.py b/src/dexorder/order/orderlib.py index a3054cb..918afe7 100644 --- a/src/dexorder/order/orderlib.py +++ b/src/dexorder/order/orderlib.py @@ -5,9 +5,9 @@ from enum import Enum from typing import Optional from dexorder import dec -from dexorder.blockchain.uniswap import uniswap_pool_address, uniswap_price +from dexorder.blockchain.uniswap import uniswap_price from dexorder.contract.uniswap_contracts import uniswapV3_pool_address -from dexorder.contract import abi_decoder, abi_encoder, uniswapV3 +from dexorder.contract import abi_decoder, abi_encoder log = logging.getLogger(__name__) @@ -51,10 +51,10 @@ class SwapOrder: @staticmethod def load(obj): - return SwapOrder(obj[0], obj[1], Route.load(obj[2]), obj[3], obj[4], obj[5], obj[6], [Tranche.load(t) for t in obj[7]]) + return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), obj[4], obj[5], obj[6], [Tranche.load(t) for t in obj[7]]) def dump(self): - return (self.tokenIn, self.tokenOut, self.route.dump(), self.amount, self.amountIsInput, + return (self.tokenIn, self.tokenOut, self.route.dump(), str(self.amount), self.amountIsInput, self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches]) @property diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 18181a5..8b6790f 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -164,7 +164,8 @@ class Order: # this series holds "everything" about an order in the canonical format specified by the contract orderlib, except # the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series. - _statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict('o', db='lazy', str2key=OrderKey.str2key) + _statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict( + 'o', db='lazy', str2key=OrderKey.str2key, value2str=SwapOrderStatus.dump, str2value=SwapOrderStatus.load) # total remaining amount per order, for all unfilled, not-canceled orders _order_filled: BlockDict[OrderKey, Filled] = BlockDict(