transactionjob updates and fixes

This commit is contained in:
Tim Olson
2023-10-20 01:08:59 -04:00
parent 558b94bfb2
commit 2fd378a936
7 changed files with 32 additions and 28 deletions

View File

@@ -58,11 +58,10 @@ def upgrade() -> None:
op.create_index(op.f('ix_transactionjob_completed'), 'transactionjob', ['completed'], unique=False)
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_table('tx',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('tx', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column('hash', postgresql.BYTEA(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.ForeignKeyConstraint(['id'], ['transactionjob.id'], ),
sa.Column('id', postgresql.BYTEA(), nullable=False),
sa.Column('job_id', sa.UUID(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ),
sa.PrimaryKeyConstraint('id')
)

View File

@@ -5,6 +5,7 @@ from uuid import uuid4
from dexorder import db, current_w3
from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest
from dexorder.contract import Transaction
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction
@@ -49,11 +50,12 @@ async def create_transaction(job: TransactionJob):
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
log.warning(f'ignoring transaction request with bad type "{job.request.type}": {",".join(TransactionHandler.instances.keys())}')
else:
tx = await handler.send_transaction(job.id, job.request)
job.tx = dtx = DbTransaction(tx=tx, hash=tx['hash'], receipt=None)
db.session.add(dtx)
# noinspection PyTypeChecker
tx: Transaction = await handler.send_transaction(job.id, job.request)
dbtx = DbTransaction(id=tx.id_bytes, job=job, receipt=None)
db.session.add(dbtx)
log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {tx}')
@@ -61,11 +63,10 @@ async def check_receipt(job: TransactionJob):
if not job.tx:
return
w3 = current_w3.get()
receipt = await w3.eth.get_transaction_receipt(job.tx.hash)
receipt = await w3.eth.get_transaction_receipt(job.tx.id)
if receipt is not None:
job.tx.receipt = receipt
job.completed = True
db.session.add(job.tx)
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:

View File

@@ -1,4 +1,5 @@
import logging
from typing import Optional
from sqlalchemy import ForeignKey, UniqueConstraint
from sqlalchemy.orm import mapped_column, Mapped, relationship
@@ -17,13 +18,12 @@ class TransactionJob (Base):
height: Mapped[int] = mapped_column(index=True) # to be used for data rolloff and/or by Timescale
completed: Mapped[bool] = mapped_column(default=False, index=True)
request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request))
tx: Mapped["Transaction"] = relationship(back_populates="job", uselist=False)
tx: Mapped[list["Transaction"]] = relationship(back_populates='job', uselist=False)
class Transaction (Base):
__tablename__ = 'tx' # avoid the keyword "transaction"
id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id"), primary_key=True)
job: Mapped[TransactionJob] = relationship(back_populates="tx", single_parent=True)
tx: Mapped[Dict]
hash: Mapped[Bytes]
receipt: Mapped[Dict]
id: Mapped[Bytes] = mapped_column(primary_key=True)
job_id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id"))
job: Mapped[TransactionJob] = relationship(back_populates='tx', single_parent=True)
receipt: Mapped[Optional[Dict]]

View File

@@ -5,7 +5,7 @@ from web3.types import EventData
from dexorder import current_pub, db
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.blockchain.transaction import handle_transactions, submit_transaction
from dexorder.blockchain.uniswap import uniswap_price
from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract
@@ -14,14 +14,14 @@ from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_v
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus
from dexorder.order.orderstate import Order, active_orders
from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers, \
unconstrained_price_triggers, execution_requests, inflight_execution_requests
from dexorder.util import hexbytes
log = logging.getLogger(__name__)
LOG_ALL_EVENTS = False # for debug
LOG_ALL_EVENTS = True # for debug
async def ensure_pool_price(pool_addr):
@@ -30,7 +30,7 @@ async def ensure_pool_price(pool_addr):
pool_prices[pool_addr] = await UniswapV3Pool(pool_addr).price()
def dump_log(eventlog):
log.debug(f'eventlog {eventlog}')
log.debug(f'\t{eventlog}')
def setup_logevent_triggers(runner):
runner.events.clear()
@@ -40,6 +40,7 @@ def setup_logevent_triggers(runner):
# before any order creations
if LOG_ALL_EVENTS:
log.debug('all events:')
runner.add_event_trigger(dump_log, None, {})
factory = get_factory_contract()
@@ -82,7 +83,6 @@ async def handle_order_placed(event: EventData):
log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs
# return todo discard rogues
vault = VaultContract(addr)
log.debug(await vault.orderList())
for index in range(start_index, start_index+num_orders):
obj = await vault.swapOrderStatus(index)
log.debug(f'raw order status {obj}')
@@ -117,7 +117,7 @@ def handle_transfer(transfer: EventData):
if to_address in underfunded_vaults:
# todo possibly funded now
pass
if from_address in active_orders:
if from_address in Order.open_keys:
# todo possibly underfunded now
pass
@@ -159,12 +159,12 @@ def handle_vault_created(created: EventData):
def handle_dexorderexecutions(event: EventData):
log.debug(f'executions {event}')
exe_id = UUID(hexbytes(event['args']['id']))
exe_id = UUID(bytes=event['args']['id'])
errors = event['args']['errors']
job = db.session.get(TransactionJob, exe_id)
req: TrancheExecutionRequest = job.request
tk = TrancheKey( req.vault, req.order_index, req.tranche_index )
order = active_orders[tk]
order = Order.of(req.vault, req.order_index)
if job is None:
log.warning(f'Job {exe_id} not found!')
return
@@ -179,8 +179,9 @@ def handle_dexorderexecutions(event: EventData):
pass # execution success
elif error == 'IIA':
# insufficient input amount: suspend execution until new funds are sent
# todo replace with vault balance checks
token = order.order.tokenIn
underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token]
# underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token]
log.debug(f'insufficient funds {req.vault} {token} ')
else:
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')

View File

@@ -1,3 +1,4 @@
from .executionhandler import TrancheExecutionHandler # do not remove. ensures the handler is registered.
def order_key(vault:str, ):
return f'{vault}'

View File

@@ -1,4 +1,5 @@
import logging
from uuid import UUID
from dexorder.base.order import TrancheExecutionRequest
from dexorder.blockchain.transaction import TransactionHandler
@@ -11,8 +12,8 @@ class TrancheExecutionHandler (TransactionHandler):
def __init__(self):
super().__init__('te')
async def send_transaction(self, job_id: int, ter: TrancheExecutionRequest) -> dict:
return await get_dexorder_contract().transact.execute(job_id, (ter.vault, ter.order_index, ter.tranche_index, ter.price_proof))
async def send_transaction(self, job_id: UUID, ter: TrancheExecutionRequest) -> dict:
return await get_dexorder_contract().transact.execute(job_id.bytes, (ter.vault, ter.order_index, ter.tranche_index, ter.price_proof))
async def complete_transaction(self, job: TransactionJob) -> None:
# anything to do?

View File

@@ -77,6 +77,7 @@ class Order:
self.amount = self.status.order.amount
self.amount_is_input = self.status.order.amountIsInput
self.tranche_amounts = [t.fraction_of(self.amount) for t in self.order.tranches]
Order.instances[self.key] = self
@property
def state(self):