TRANCHE EXECUTION WORKS

This commit is contained in:
Tim Olson
2023-10-29 16:53:45 -04:00
parent 062085a79f
commit 1da4cf6a93
8 changed files with 47 additions and 24 deletions

View File

@@ -60,7 +60,7 @@ def upgrade() -> None:
op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False)
op.create_table('tx', op.create_table('tx',
sa.Column('id', postgresql.BYTEA(), nullable=False), sa.Column('id', postgresql.BYTEA(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), sa.Column('data', postgresql.BYTEA(), nullable=False),
sa.Column('job_id', sa.UUID(), nullable=False), sa.Column('job_id', sa.UUID(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ), sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ),

View File

@@ -49,6 +49,14 @@ class Account (LocalAccount):
self.name = name self.name = name
self.key_str = key_str self.key_str = key_str
self.signing_middleware = construct_sign_and_send_raw_middleware(self) self.signing_middleware = construct_sign_and_send_raw_middleware(self)
self._nonce: Optional[int] = None
async def next_nonce(self):
if self._nonce is None:
self._nonce = await current_w3.get().eth.get_transaction_count(self.address, 'pending')
else:
self._nonce += 1
return self._nonce
def attach(self, w3): def attach(self, w3):
w3.eth.default_account = self.address w3.eth.default_account = self.address

View File

@@ -1,6 +1,7 @@
import json import json
from typing import Optional from typing import Optional
import eth_account
from eth_utils import keccak from eth_utils import keccak
from web3.types import TxReceipt from web3.types import TxReceipt
@@ -19,15 +20,15 @@ def call_wrapper(func):
def transact_wrapper(func): def transact_wrapper(func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
w3 = current_w3.get()
try: try:
account = current_account.get() account = current_account.get()
except LookupError: except LookupError:
raise RuntimeError('Cannot invoke a transaction without setting an Account.') raise RuntimeError('Cannot invoke a transaction without setting an Account.')
tx = await func(*args, **kwargs).build_transaction() tx = await func(*args, **kwargs).build_transaction()
tx['from'] = account.address tx['from'] = account.address
signed = w3.eth.account.sign_transaction(tx, private_key=account.key) tx['nonce'] = await account.next_nonce()
return ContractTransaction(signed) signed = eth_account.Account.sign_transaction(tx, private_key=account.key)
return ContractTransaction(signed['hash'], signed['rawTransaction'])
return f return f
@@ -88,10 +89,10 @@ class ContractProxy:
class ContractTransaction: class ContractTransaction:
def __init__(self, rawtx: bytes): def __init__(self, id_bytes: bytes, rawtx: bytes):
self.data = rawtx self.id_bytes = id_bytes
self.id_bytes = keccak(rawtx)
self.id = hexstr(self.id_bytes) self.id = hexstr(self.id_bytes)
self.data = rawtx
self.receipt: Optional[TxReceipt] = None self.receipt: Optional[TxReceipt] = None
def __repr__(self): def __repr__(self):

View File

@@ -7,6 +7,7 @@ from dexorder.util import defaultdictk, hexstr
# values of DELETE are serialized as nulls # values of DELETE are serialized as nulls
vault_owners: BlockDict[str,str] = BlockDict('v', db=True, redis=True) vault_owners: BlockDict[str,str] = BlockDict('v', db=True, redis=True)
vault_tokens: dict[str, BlockSet[str]] = defaultdictk(lambda vault: BlockSet(f'vt|{vault}', db=True, redis=True, pub=lambda k,v: ('vt', vault_owners[vault], [k]))) vault_balances: dict[str, BlockDict[str,int]] = defaultdictk(lambda vault: BlockDict(f'vb|{vault}', db=True, redis=True,
pool_prices: BlockDict[str,dec] = BlockDict('p', db=True, redis=True, pub=True, value2str=lambda d:f'{d:f}', str2value=dec) pub=lambda k,v: (vault_owners[vault], 'vb', (vault,k,v))))
underfunded_vaults: BlockDict[str, list[str]] = BlockDict('uv', db=True, redis=True, value2str=lambda v:','.join(v), str2value=lambda s: s.split(',')) pool_prices: BlockDict[str,dec] = BlockDict('p', db=True, redis=True, value2str=lambda d:f'{d:f}', str2value=dec,
pub=lambda k,v: (f'p|{k}', 'p', (k,str(v))))

View File

@@ -3,14 +3,14 @@ from uuid import UUID
from web3.types import EventData from web3.types import EventData
from dexorder import current_pub, db from dexorder import current_pub, db, dec
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request
from dexorder.transaction import handle_create_transactions, submit_transaction_request, handle_transaction_receipts from dexorder.transaction import handle_create_transactions, submit_transaction_request, handle_transaction_receipts, handle_send_transactions
from dexorder.blockchain.uniswap import uniswap_price from dexorder.blockchain.uniswap import uniswap_price
from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract
from dexorder.contract import UniswapV3Pool, get_contract_event from dexorder.contract import UniswapV3Pool, get_contract_event
from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults from dexorder.data import pool_prices, vault_owners, vault_balances
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob from dexorder.database.model.transaction import TransactionJob
from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus
@@ -73,6 +73,7 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(activate_price_triggers) runner.add_event_trigger(activate_price_triggers)
runner.add_event_trigger(process_execution_requests) runner.add_event_trigger(process_execution_requests)
runner.add_event_trigger(handle_create_transactions) runner.add_event_trigger(handle_create_transactions)
runner.add_event_trigger(handle_send_transactions)
async def handle_order_placed(event: EventData): async def handle_order_placed(event: EventData):
@@ -111,18 +112,17 @@ def handle_order_error(event: EventData):
log.debug(f'DexorderError {event}') log.debug(f'DexorderError {event}')
def handle_transfer(transfer: EventData): def handle_transfer(transfer: EventData):
# todo handle native transfers incl gas for token transfers
from_address = transfer['args']['from'] from_address = transfer['args']['from']
to_address = transfer['args']['to'] to_address = transfer['args']['to']
amount = int(transfer['args']['value'])
log.debug(f'transfer {to_address}') log.debug(f'transfer {to_address}')
if to_address in vault_owners: if to_address in vault_owners and to_address != from_address:
token_address = transfer['address'] token_address = transfer['address']
vault_tokens[to_address].add(token_address) vault_balances[to_address].add(token_address, amount, 0)
if to_address in underfunded_vaults: if from_address in vault_owners and to_address != from_address:
# todo possibly funded now token_address = transfer['address']
pass vault_balances[to_address].add(token_address, -amount, 0)
if from_address in Order.open_keys:
# todo possibly underfunded now
pass
new_pool_prices: dict[str, int] = {} new_pool_prices: dict[str, int] = {}
@@ -224,9 +224,8 @@ def finish_execution_request(req: TrancheExecutionRequest, error: str):
log.debug(f'execution request for tranche {tk} was successful!') log.debug(f'execution request for tranche {tk} was successful!')
elif error in ('IIA', 'STF'): # todo not STF elif error in ('IIA', 'STF'): # todo not STF
# Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent # Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent
# todo replace with vault balance checks # todo vault balance checks
token = order.order.tokenIn token = order.order.tokenIn
# underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token]
log.debug(f'insufficient funds {req.vault} {token} ') log.debug(f'insufficient funds {req.vault} {token} ')
else: else:
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')

View File

@@ -1,6 +1,9 @@
import logging import logging
from uuid import UUID from uuid import UUID
from web3.exceptions import ContractPanicError
from dexorder import db
from dexorder.base.order import TrancheExecutionRequest, TrancheKey from dexorder.base.order import TrancheExecutionRequest, TrancheKey
from dexorder.transaction import TransactionHandler from dexorder.transaction import TransactionHandler
from dexorder.contract.dexorder import get_dexorder_contract from dexorder.contract.dexorder import get_dexorder_contract
@@ -17,6 +20,9 @@ class TrancheExecutionHandler (TransactionHandler):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
return await get_dexorder_contract().transact.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof)) return await get_dexorder_contract().transact.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof))
except ContractPanicError as px:
log.error(f'While executing job {job_id}: {px}')
await self.complete_transaction(db.session.get(TransactionJob, job_id))
except Exception: except Exception:
log.exception(f'Could not send execution request {req}') log.exception(f'Could not send execution request {req}')

View File

@@ -42,7 +42,11 @@ class Order:
@staticmethod @staticmethod
def of(a, b=None): def of(a, b=None):
return Order.instances[a if b is None else OrderKey(a, b)] key = a if b is None else OrderKey(a, b)
try:
return Order.instances[key]
except KeyError:
log.error(f'Could not find {key} among:\n{", ".join(str(k) for k in Order.instances.keys())}')
@staticmethod @staticmethod

View File

@@ -229,6 +229,7 @@ class BlockStateRunner:
if pubs and self.publish_all: if pubs and self.publish_all:
await maywait(self.publish_all(pubs)) await maywait(self.publish_all(pubs))
except: # legitimately catch EVERYTHING because we re-raise except: # legitimately catch EVERYTHING because we re-raise
log.debug('rolling back session')
if session is not None: if session is not None:
session.rollback() session.rollback()
if blockhash is not None and self.state is not None: if blockhash is not None and self.state is not None:
@@ -238,3 +239,6 @@ class BlockStateRunner:
if session is not None: if session is not None:
session.commit() session.commit()
log.info(f'completed block {block}') log.info(f'completed block {block}')
finally:
if session is not None:
session.close()