From c9245615cbcc222ec2d9666504492b8b417a5668 Mon Sep 17 00:00:00 2001 From: tim Date: Mon, 24 Feb 2025 19:05:59 -0400 Subject: [PATCH] transaction job cleanup --- src/dexorder/bin/main.py | 6 ++---- src/dexorder/transactions.py | 31 ++++++++-------------------- src/dexorder/vaultcreationhandler.py | 21 ++++++++++++------- 3 files changed, 24 insertions(+), 34 deletions(-) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 70aa62d..26bdd41 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -2,6 +2,7 @@ import logging from asyncio import CancelledError from dexorder import db, blockchain +from dexorder.accounting import initialize_accounting from dexorder.alert import warningAlert from dexorder.base.chain import current_chain from dexorder.bin.executable import execute @@ -14,14 +15,12 @@ from dexorder.contract.dexorder import get_dexorder_contract from dexorder.event_handler import (init, dump_log, handle_vault_created, handle_order_placed, handle_transfer, handle_swap_filled, handle_order_canceled, handle_order_cancel_all, handle_uniswap_swaps, handle_vault_impl_changed, initialize_metrics) -from dexorder.gas_fees import adjust_gas, handle_fees_changed, handle_fee_limits_changed from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.order.executionhandler import handle_dexorderexecutions, execute_tranches from dexorder.order.triggers import activate_orders, end_trigger_updates -from dexorder.accounting import initialize_accounting from dexorder.runner import BlockStateRunner -from dexorder.transactions import handle_transaction_receipts, finalize_transactions +from dexorder.transactions import handle_transaction_receipts from dexorder.vaultcreationhandler import handle_vault_creation_requests log = logging.getLogger('dexorder') @@ -127,7 +126,6 @@ async def main(): runner.on_promotion.append(db_state.finalize) if redis_state: runner.on_head_update.append(redis_state.save) - runner.on_promotion.append(finalize_transactions) try: await runner.run() diff --git a/src/dexorder/transactions.py b/src/dexorder/transactions.py index 0a5c966..ba9ca39 100644 --- a/src/dexorder/transactions.py +++ b/src/dexorder/transactions.py @@ -4,14 +4,12 @@ from abc import abstractmethod from typing import Optional from uuid import uuid4 -from sqlalchemy import select from web3.exceptions import TransactionNotFound, ContractPanicError, ContractLogicError from dexorder import db, current_w3, Account from dexorder.base import TransactionReceiptDict, TransactionRequest from dexorder.base.chain import current_chain -from dexorder.blockstate.diff import DiffEntryItem -from dexorder.blockstate.fork import current_fork, Fork +from dexorder.blockstate.fork import current_fork from dexorder.contract.contract_proxy import ContractTransaction from dexorder.database.model.transaction import TransactionJob, TransactionJobState from dexorder.util import hexstr @@ -93,9 +91,8 @@ async def create_and_send_transactions(): # these errors can be thrown immediately when the tx is tested for gas log.warning(f'failed to build transaction request for {job.request.__class__.__name__} {job.id}') job.state = TransactionJobState.Error - db.session.add(job) await handler.transaction_exception(job, x) - in_flight.discard((job.request.type, job.request.key)) + end_job(job) return except Exception as x: log.warning(f'unable to send transaction for job {job.id}', exc_info=x) @@ -103,8 +100,7 @@ async def create_and_send_transactions(): if ctx is None: log.info(f'Transaction request {job.request.__class__.__name__} {job.id} declined to build a tx.') job.state = TransactionJobState.Declined - db.session.add(job) - in_flight.discard((job.request.type, job.request.key)) + end_job(job) return w3 = current_w3.get() account = await handler.acquire_account() @@ -136,7 +132,6 @@ async def create_and_send_transactions(): job.tx_id = ctx.id_bytes job.tx_data = ctx.data assert sent == job.tx_id - db.session.add(job) async def handle_transaction_receipts(): @@ -151,6 +146,8 @@ async def handle_transaction_receipts(): receipt: TransactionReceiptDict = await w3.eth.get_transaction_receipt(job.tx_id) except TransactionNotFound: return + job.state = TransactionJobState.Mined + job.receipt = receipt fork = current_fork.get() assert fork is not None if fork.branch.contiguous and receipt['blockHash'] in fork.branch.path or \ @@ -166,20 +163,10 @@ async def handle_transaction_receipts(): await handler.release_account(accounts_in_flight.pop(job.tx_id)) except KeyError: pass - in_flight.discard((job.request.type, job.request.key)) + end_job(job) -def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): - # noinspection PyTypeChecker - open_jobs = db.session.scalars(select(TransactionJob).where( - TransactionJob.chain == current_chain.get(), - TransactionJob.state == TransactionJobState.Sent - )).all() - open_txs = {job.tx_id:job for job in open_jobs} - for diff in diffs: - if diff.series == 'mined_txs' and diff.key in open_txs: - job = open_txs[diff.key] - job.state = TransactionJobState.Mined - job.receipt = diff.value - db.session.add(job) +def end_job(job): + in_flight.discard((job.request.type, job.request.key)) + db.session.delete(job) diff --git a/src/dexorder/vaultcreationhandler.py b/src/dexorder/vaultcreationhandler.py index ae8e27f..fe36668 100644 --- a/src/dexorder/vaultcreationhandler.py +++ b/src/dexorder/vaultcreationhandler.py @@ -16,7 +16,7 @@ from dexorder.database.model import TransactionJob from dexorder.database.model import VaultCreationRequest as DbVaultCreationRequest from dexorder.database.model.accounting import AccountingSubcategory from dexorder.transactions import TransactionHandler, submit_transaction_request -from dexorder.vault_blockdata import publish_vaults +from dexorder.vault_blockdata import publish_vaults, vault_owners log = logging.getLogger(__name__) @@ -54,17 +54,21 @@ class VaultCreationHandler (TransactionHandler): super().__init__(VaultCreationRequest.TYPE) async def build_transaction(self, job_id: int, tr: VaultCreationRequest) -> Optional[ContractTransaction]: + owner_addr = to_checksum_address(tr.owner) + vault_addr = vault_address(owner_addr, tr.num) + if vault_owners.get(vault_addr): + # existing vault detected + publish_vaults(tr.chain_id, owner_addr) + return None factory = get_factory_contract() - owner_address = to_checksum_address(tr.owner) try: - return await factory.build.deployVault(owner_address, tr.num) + return await factory.build.deployVault(owner_addr, tr.num) except ContractLogicError: in_flight.discard((tr.chain_id, tr.owner, tr.num)) # maybe the vault already exists? - addr = vault_address(tr.owner, tr.num) - owner = await ContractProxy(addr, 'Vault').owner() - if owner == owner_address: - log.debug(f'detected existing vault at {addr}') + owner = await ContractProxy(vault_addr, 'Vault').owner() + if owner == owner_addr: + log.debug(f'detected existing vault at {vault_addr}') publish_vaults(tr.chain_id, owner) return None raise @@ -91,7 +95,8 @@ def handle_vault_creation_requests(): for req in db.session.query(DbVaultCreationRequest).where( DbVaultCreationRequest.vault == None, DbVaultCreationRequest.chain==current_chain.get()): req: DbVaultCreationRequest - key = req.chain.id, req.owner, req.num + owner = to_checksum_address(req.owner) + key = req.chain.id, owner, req.num if key not in in_flight: vcr = VaultCreationRequest(*key) submit_transaction_request(vcr)