db persistence fixes
This commit is contained in:
@@ -2,6 +2,8 @@ import logging
|
|||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from web3.exceptions import TransactionNotFound
|
||||||
|
|
||||||
from dexorder import db, current_w3
|
from dexorder import db, current_w3
|
||||||
from dexorder.base.chain import current_chain
|
from dexorder.base.chain import current_chain
|
||||||
from dexorder.base.order import TransactionRequest
|
from dexorder.base.order import TransactionRequest
|
||||||
@@ -63,8 +65,11 @@ async def check_receipt(job: TransactionJob):
|
|||||||
if not job.tx:
|
if not job.tx:
|
||||||
return
|
return
|
||||||
w3 = current_w3.get()
|
w3 = current_w3.get()
|
||||||
receipt = await w3.eth.get_transaction_receipt(job.tx.id)
|
try:
|
||||||
if receipt is not None:
|
receipt = await w3.eth.get_transaction_receipt(job.tx.id)
|
||||||
|
except TransactionNotFound:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
job.tx.receipt = receipt
|
job.tx.receipt = receipt
|
||||||
job.completed = True
|
job.completed = True
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -157,36 +157,6 @@ def handle_vault_created(created: EventData):
|
|||||||
current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)
|
current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)
|
||||||
|
|
||||||
|
|
||||||
def handle_dexorderexecutions(event: EventData):
|
|
||||||
log.debug(f'executions {event}')
|
|
||||||
exe_id = UUID(bytes=event['args']['id'])
|
|
||||||
errors = event['args']['errors']
|
|
||||||
job = db.session.get(TransactionJob, exe_id)
|
|
||||||
req: TrancheExecutionRequest = job.request
|
|
||||||
tk = TrancheKey( req.vault, req.order_index, req.tranche_index )
|
|
||||||
order = Order.of(req.vault, req.order_index)
|
|
||||||
if job is None:
|
|
||||||
log.warning(f'Job {exe_id} not found!')
|
|
||||||
return
|
|
||||||
if len(errors) == 0:
|
|
||||||
log.warning(f'No errors found in DexorderExecutions event: {event}')
|
|
||||||
return
|
|
||||||
if len(errors) > 1:
|
|
||||||
log.warning(f'Multiple executions not yet implemented')
|
|
||||||
error = errors[0]
|
|
||||||
log.debug(f'job {exe_id} had error "{error}"')
|
|
||||||
if error == '':
|
|
||||||
pass # execution success
|
|
||||||
elif error == 'IIA':
|
|
||||||
# insufficient input amount: suspend execution until new funds are sent
|
|
||||||
# todo replace with vault balance checks
|
|
||||||
token = order.order.tokenIn
|
|
||||||
# underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token]
|
|
||||||
log.debug(f'insufficient funds {req.vault} {token} ')
|
|
||||||
else:
|
|
||||||
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')
|
|
||||||
|
|
||||||
|
|
||||||
def activate_time_triggers():
|
def activate_time_triggers():
|
||||||
now = current_block.get().timestamp
|
now = current_block.get().timestamp
|
||||||
log.debug(f'activating time triggers')
|
log.debug(f'activating time triggers')
|
||||||
@@ -223,3 +193,38 @@ async def process_execution_requests():
|
|||||||
job = submit_transaction(new_tranche_execution_request(tk, er.proof))
|
job = submit_transaction(new_tranche_execution_request(tk, er.proof))
|
||||||
inflight_execution_requests[tk] = height
|
inflight_execution_requests[tk] = height
|
||||||
log.info(f'executing tranche {tk} with job {job.id}')
|
log.info(f'executing tranche {tk} with job {job.id}')
|
||||||
|
|
||||||
|
|
||||||
|
def handle_dexorderexecutions(event: EventData):
|
||||||
|
log.debug(f'executions {event}')
|
||||||
|
exe_id = UUID(bytes=event['args']['id'])
|
||||||
|
errors = event['args']['errors']
|
||||||
|
if len(errors) == 0:
|
||||||
|
log.warning(f'No errors found in DexorderExecutions event: {event}')
|
||||||
|
return
|
||||||
|
if len(errors) > 1:
|
||||||
|
log.warning(f'Multiple executions not yet implemented')
|
||||||
|
job: TransactionJob = db.session.get(TransactionJob, exe_id)
|
||||||
|
if job is None:
|
||||||
|
log.warning(f'Job {exe_id} not found!')
|
||||||
|
return
|
||||||
|
|
||||||
|
finish_execution_request(job.request, errors[0])
|
||||||
|
|
||||||
|
|
||||||
|
def finish_execution_request(req: TrancheExecutionRequest, error: str):
|
||||||
|
order = Order.of(req.vault, req.order_index)
|
||||||
|
tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
|
||||||
|
del inflight_execution_requests[tk] # no longer in-flight
|
||||||
|
if error != '':
|
||||||
|
log.debug(f'execution request for tranche {tk} had error "{error}"')
|
||||||
|
if error == '':
|
||||||
|
pass # execution success
|
||||||
|
elif error in ('IIA', 'STF'): # todo not STF
|
||||||
|
# Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent
|
||||||
|
# todo replace with vault balance checks
|
||||||
|
token = order.order.tokenIn
|
||||||
|
# underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token]
|
||||||
|
log.debug(f'insufficient funds {req.vault} {token} ')
|
||||||
|
else:
|
||||||
|
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')
|
||||||
|
|||||||
@@ -5,9 +5,9 @@ from enum import Enum
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from dexorder import dec
|
from dexorder import dec
|
||||||
from dexorder.blockchain.uniswap import uniswap_pool_address, uniswap_price
|
from dexorder.blockchain.uniswap import uniswap_price
|
||||||
from dexorder.contract.uniswap_contracts import uniswapV3_pool_address
|
from dexorder.contract.uniswap_contracts import uniswapV3_pool_address
|
||||||
from dexorder.contract import abi_decoder, abi_encoder, uniswapV3
|
from dexorder.contract import abi_decoder, abi_encoder
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -51,10 +51,10 @@ class SwapOrder:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def load(obj):
|
def load(obj):
|
||||||
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), obj[3], obj[4], obj[5], obj[6], [Tranche.load(t) for t in obj[7]])
|
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), int(obj[3]), obj[4], obj[5], obj[6], [Tranche.load(t) for t in obj[7]])
|
||||||
|
|
||||||
def dump(self):
|
def dump(self):
|
||||||
return (self.tokenIn, self.tokenOut, self.route.dump(), self.amount, self.amountIsInput,
|
return (self.tokenIn, self.tokenOut, self.route.dump(), str(self.amount), self.amountIsInput,
|
||||||
self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches])
|
self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|||||||
@@ -164,7 +164,8 @@ class Order:
|
|||||||
|
|
||||||
# this series holds "everything" about an order in the canonical format specified by the contract orderlib, except
|
# this series holds "everything" about an order in the canonical format specified by the contract orderlib, except
|
||||||
# the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series.
|
# the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series.
|
||||||
_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict('o', db='lazy', str2key=OrderKey.str2key)
|
_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict(
|
||||||
|
'o', db='lazy', str2key=OrderKey.str2key, value2str=SwapOrderStatus.dump, str2value=SwapOrderStatus.load)
|
||||||
|
|
||||||
# total remaining amount per order, for all unfilled, not-canceled orders
|
# total remaining amount per order, for all unfilled, not-canceled orders
|
||||||
_order_filled: BlockDict[OrderKey, Filled] = BlockDict(
|
_order_filled: BlockDict[OrderKey, Filled] = BlockDict(
|
||||||
|
|||||||
Reference in New Issue
Block a user