transaction job cleanup

This commit is contained in:
tim
2025-02-24 19:05:59 -04:00
parent 7d929db304
commit c9245615cb
3 changed files with 24 additions and 34 deletions

View File

@@ -2,6 +2,7 @@ import logging
from asyncio import CancelledError from asyncio import CancelledError
from dexorder import db, blockchain from dexorder import db, blockchain
from dexorder.accounting import initialize_accounting
from dexorder.alert import warningAlert from dexorder.alert import warningAlert
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
@@ -14,14 +15,12 @@ from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed, from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed,
handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all,
handle_uniswap_swaps, handle_vault_impl_changed, initialize_metrics) handle_uniswap_swaps, handle_vault_impl_changed, initialize_metrics)
from dexorder.gas_fees import adjust_gas, handle_fees_changed, handle_fee_limits_changed
from dexorder.memcache import memcache from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches
from dexorder.order.triggers import activate_orders, end_trigger_updates from dexorder.order.triggers import activate_orders, end_trigger_updates
from dexorder.accounting import initialize_accounting
from dexorder.runner import BlockStateRunner from dexorder.runner import BlockStateRunner
from dexorder.transactions import handle_transaction_receipts, finalize_transactions from dexorder.transactions import handle_transaction_receipts
from dexorder.vaultcreationhandler import handle_vault_creation_requests from dexorder.vaultcreationhandler import handle_vault_creation_requests
log = logging.getLogger('dexorder') log = logging.getLogger('dexorder')
@@ -127,7 +126,6 @@ async def main():
runner.on_promotion.append(db_state.finalize) runner.on_promotion.append(db_state.finalize)
if redis_state: if redis_state:
runner.on_head_update.append(redis_state.save) runner.on_head_update.append(redis_state.save)
runner.on_promotion.append(finalize_transactions)
try: try:
await runner.run() await runner.run()

View File

@@ -4,14 +4,12 @@ from abc import abstractmethod
from typing import Optional from typing import Optional
from uuid import uuid4 from uuid import uuid4
from sqlalchemy import select
from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError
from dexorder import db, current_w3, Account from dexorder import db, current_w3, Account
from dexorder.base import TransactionReceiptDict, TransactionRequest from dexorder.base import TransactionReceiptDict, TransactionRequest
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import current_fork
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.contract.contract_proxy import ContractTransaction from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.database.model.transaction import TransactionJob, TransactionJobState from dexorder.database.model.transaction import TransactionJob, TransactionJobState
from dexorder.util import hexstr from dexorder.util import hexstr
@@ -93,9 +91,8 @@ async def create_and_send_transactions():
# these errors can be thrown immediately when the tx is tested for gas # these errors can be thrown immediately when the tx is tested for gas
log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}') log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}')
job.state = TransactionJobState.Error job.state = TransactionJobState.Error
db.session.add(job)
await handler.transaction_exception(job, x) await handler.transaction_exception(job, x)
in_flight.discard((job.request.type, job.request.key)) end_job(job)
return return
except Exception as x: except Exception as x:
log.warning(f'unable to send transaction for job {job.id}', exc_info=x) log.warning(f'unable to send transaction for job {job.id}', exc_info=x)
@@ -103,8 +100,7 @@ async def create_and_send_transactions():
if ctx is None: if ctx is None:
log.info(f'Transaction request {job.request.__class__.__name__} {job.id} declined to build a tx.') log.info(f'Transaction request {job.request.__class__.__name__} {job.id} declined to build a tx.')
job.state = TransactionJobState.Declined job.state = TransactionJobState.Declined
db.session.add(job) end_job(job)
in_flight.discard((job.request.type, job.request.key))
return return
w3 = current_w3.get() w3 = current_w3.get()
account = await handler.acquire_account() account = await handler.acquire_account()
@@ -136,7 +132,6 @@ async def create_and_send_transactions():
job.tx_id = ctx.id_bytes job.tx_id = ctx.id_bytes
job.tx_data = ctx.data job.tx_data = ctx.data
assert sent == job.tx_id assert sent == job.tx_id
db.session.add(job)
async def handle_transaction_receipts(): async def handle_transaction_receipts():
@@ -151,6 +146,8 @@ async def handle_transaction_receipts():
receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id) receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id)
except TransactionNotFound: except TransactionNotFound:
return return
job.state = TransactionJobState.Mined
job.receipt = receipt
fork = current_fork.get() fork = current_fork.get()
assert fork is not None assert fork is not None
if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \ if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \
@@ -166,20 +163,10 @@ async def handle_transaction_receipts():
await handler.release_account(accounts_in_flight.pop(job.tx_id)) await handler.release_account(accounts_in_flight.pop(job.tx_id))
except KeyError: except KeyError:
pass pass
in_flight.discard((job.request.type, job.request.key)) end_job(job)
def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): def end_job(job):
# noinspection PyTypeChecker in_flight.discard((job.request.type, job.request.key))
open_jobs = db.session.scalars(select(TransactionJob).where( db.session.delete(job)
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Sent
)).all()
open_txs = {job.tx_id:job for job in open_jobs}
for diff in diffs:
if diff.series == 'mined_txs' and diff.key in open_txs:
job = open_txs[diff.key]
job.state = TransactionJobState.Mined
job.receipt = diff.value
db.session.add(job)

View File

@@ -16,7 +16,7 @@ from dexorder.database.model import TransactionJob
from dexorder.database.model import VaultCreationRequest as DbVaultCreationRequest from dexorder.database.model import VaultCreationRequest as DbVaultCreationRequest
from dexorder.database.model.accounting import AccountingSubcategory from dexorder.database.model.accounting import AccountingSubcategory
from dexorder.transactions import TransactionHandler, submit_transaction_request from dexorder.transactions import TransactionHandler, submit_transaction_request
from dexorder.vault_blockdata import publish_vaults from dexorder.vault_blockdata import publish_vaults, vault_owners
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -54,17 +54,21 @@ class VaultCreationHandler (TransactionHandler):
super().__init__(VaultCreationRequest.TYPE) super().__init__(VaultCreationRequest.TYPE)
async def build_transaction(self, job_id: int, tr: VaultCreationRequest) -> Optional[ContractTransaction]: async def build_transaction(self, job_id: int, tr: VaultCreationRequest) -> Optional[ContractTransaction]:
owner_addr = to_checksum_address(tr.owner)
vault_addr = vault_address(owner_addr, tr.num)
if vault_owners.get(vault_addr):
# existing vault detected
publish_vaults(tr.chain_id, owner_addr)
return None
factory = get_factory_contract() factory = get_factory_contract()
owner_address = to_checksum_address(tr.owner)
try: try:
return await factory.build.deployVault(owner_address, tr.num) return await factory.build.deployVault(owner_addr, tr.num)
except ContractLogicError: except ContractLogicError:
in_flight.discard((tr.chain_id, tr.owner, tr.num)) in_flight.discard((tr.chain_id, tr.owner, tr.num))
# maybe the vault already exists? # maybe the vault already exists?
addr = vault_address(tr.owner, tr.num) owner = await ContractProxy(vault_addr, 'Vault').owner()
owner = await ContractProxy(addr, 'Vault').owner() if owner == owner_addr:
if owner == owner_address: log.debug(f'detected existing vault at {vault_addr}')
log.debug(f'detected existing vault at {addr}')
publish_vaults(tr.chain_id, owner) publish_vaults(tr.chain_id, owner)
return None return None
raise raise
@@ -91,7 +95,8 @@ def handle_vault_creation_requests():
for req in db.session.query(DbVaultCreationRequest).where( for req in db.session.query(DbVaultCreationRequest).where(
DbVaultCreationRequest.vault == None, DbVaultCreationRequest.chain==current_chain.get()): DbVaultCreationRequest.vault == None, DbVaultCreationRequest.chain==current_chain.get()):
req: DbVaultCreationRequest req: DbVaultCreationRequest
key = req.chain.id, req.owner, req.num owner = to_checksum_address(req.owner)
key = req.chain.id, owner, req.num
if key not in in_flight: if key not in in_flight:
vcr = VaultCreationRequest(*key) vcr = VaultCreationRequest(*key)
submit_transaction_request(vcr) submit_transaction_request(vcr)