reworked transaction receipt handling

This commit is contained in:
Tim
2024-04-09 23:04:45 -04:00
parent 54f6ffd52a
commit acbbaa0229
14 changed files with 91 additions and 76 deletions

View File

@@ -74,24 +74,20 @@ def upgrade() -> None:
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False),
sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False),
sa.Column('tx_id', postgresql.BYTEA(), nullable=True),
sa.Column('tx_data', postgresql.BYTEA(), nullable=True),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False)
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False)
op.create_table('tx',
sa.Column('id', postgresql.BYTEA(), nullable=False),
sa.Column('data', postgresql.BYTEA(), nullable=False),
sa.Column('job_id', sa.UUID(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_transactionjob_tx_id'), 'transactionjob', ['tx_id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
op.drop_table('tx')
op.drop_index(op.f('ix_transactionjob_tx_id'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_state'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_height'), table_name='transactionjob')
op.drop_index(op.f('ix_transactionjob_chain'), table_name='transactionjob')

View File

@@ -24,7 +24,7 @@ log = logging.getLogger('dexorder.backfill')
async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
ohlc_save(diffs)
ohlc_save(fork, diffs)
ts = await get_block_timestamp(fork.head)
log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}')
@@ -62,7 +62,7 @@ async def main():
runner.on_promotion.append(finalize_callback)
if db:
# noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save)
runner.on_promotion.append(db_state.finalize)
if redis_state:
runner.on_head_update.append(redis_state.save)

View File

@@ -55,8 +55,8 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru
loop = asyncio.get_event_loop()
signals = Signals.SIGQUIT, Signals.SIGTERM, Signals.SIGINT
for s in signals:
loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop, shutdown)))
task = loop.create_task(main)
loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop, shutdown), name=f'{s.name} handler'))
task = loop.create_task(main, name='main')
loop.run_until_complete(task)
x = task.exception()
if x is not None:

View File

@@ -16,7 +16,7 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v
from dexorder.memcache import memcache
from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.runner import BlockStateRunner
from dexorder.transaction import handle_transaction_receipts
from dexorder.transaction import handle_transaction_receipts, finalize_transactions
log = logging.getLogger('dexorder')
LOG_ALL_EVENTS = False # for debug todo config
@@ -57,7 +57,7 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled'))
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll'))
runner.add_event_trigger(handle_transaction_receipts)
runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block
runner.add_event_trigger(handle_dexorderexecutions, executions)
# these callbacks run after the ones above on each block, plus these also run every second
@@ -97,9 +97,10 @@ async def main():
if db:
runner.on_state_init.append(init_order_triggers)
# noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save)
runner.on_promotion.append(db_state.finalize)
if redis_state:
runner.on_head_update.append(redis_state.save)
runner.on_promotion.append(finalize_transactions)
try:
await runner.run()

View File

@@ -6,6 +6,7 @@ Use `await get_block()` to retreive a Block from a given hash using the full cac
Use `await fetch_block()` to force an RPC query for the Block, adding that block to the LRU cache.
"""
import logging
from contextvars import ContextVar
from typing import Union
from cachetools import LRUCache
@@ -74,3 +75,6 @@ async def fetch_block(blockhash, *, chain_id=None):
def promotion_height(chain, latest_height):
confirm_offset = config.confirms if config.confirms is not None else chain.confirms
return latest_height - confirm_offset
current_block = ContextVar[Block]('current_block')

View File

@@ -25,10 +25,10 @@ class BlockData (Generic[T]):
registry: dict[Any,'BlockData'] = {} # series name and instance
def __init__(self, data_type: DataType, series: Any, *,
series2str=None, series2key=None, # defaults to key2str and str2key
series2str=None, series2key=None, # defaults to key2str and str2key
key2str=util_key2str, str2key=util_str2key,
value2str=json.dumps, str2value=json.loads, # serialize/deserialize value to something JSON-able
savecb:Callable[[Any,Any],None]=None, # callback(key, value) where value may be DELETE
value2str=json.dumps, str2value=json.loads, # serialize/deserialize value to something JSON-able
finalize_cb:Callable[[Any, Any],None]=None, # callback(key, value) where value may be DELETE
**opts):
assert series not in BlockData.registry
BlockData.registry[series] = self
@@ -41,7 +41,7 @@ class BlockData (Generic[T]):
self.series2key = series2key or self.str2key
self.value2str = value2str
self.str2value = str2value
self.savecb = savecb
self.finalize_cb = finalize_cb
# set this to a method which fetches final data (e.g. database)
self.lazy_getitem: Optional[Callable[['BlockData',Any],Union[NARG,T]]] = None

View File

@@ -32,7 +32,7 @@ class DbState(SeriesCollection):
value = db.session.get(Entity, (chain_id, series, key))
return var.str2value(value.value)
def save(self, fork: Fork, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ):
def finalize(self, fork: Fork, diffs: Iterable[Union[DiffItem,DiffEntryItem]]):
chain_id = current_chain.get().id
for diff in diffs:
try:
@@ -61,8 +61,8 @@ class DbState(SeriesCollection):
found.value = value
else:
raise NotImplementedError
if d.savecb:
d.savecb(diff.key, diff.value)
if d.finalize_cb:
d.finalize_cb(diff.key, diff.value)
# save root block info
db.kv[f'root_block|{chain_id}'] = [fork.height, fork.head]

View File

@@ -1,10 +1,8 @@
from .base import Base
from .kv import KeyValue
from .series import SeriesSet, SeriesDict
from .transaction import Transaction, TransactionJob
from .transaction import TransactionJob
from .orderindex import OrderIndex
from .pool import Pool
from .token import Token
class Block: pass

View File

@@ -31,14 +31,6 @@ class TransactionJob (Base):
height: Mapped[int] = mapped_column(index=True) # the height at which the job was created, to be used for timeout/ data rolloff and/or by Timescale
state: Mapped[TransactionJobState] = mapped_column(TransactionJobStateColumnType, default=TransactionJobState.Requested, index=True)
request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request))
tx: Mapped[list["Transaction"]] = relationship(back_populates='job', uselist=False)
class Transaction (Base):
__tablename__ = 'tx' # avoid the keyword "transaction"
id: Mapped[Bytes] = mapped_column(primary_key=True)
data: Mapped[Bytes] # the signed tx data
job_id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id"))
job: Mapped[TransactionJob] = relationship(back_populates='tx', single_parent=True)
receipt: Mapped[Optional[Dict]] # todo handle forks that didnt confirm: receipts are per-fork!
tx_id: Mapped[Optional[Bytes]] = mapped_column(index=True)
tx_data: Mapped[Optional[Bytes]]
receipt: Mapped[Optional[Dict]]

View File

@@ -244,6 +244,7 @@ async def process_execution_requests():
if pending is None or height-pending >= 30:
# todo execution timeout => retry ; should we use timestamps? configure per-chain.
# todo check balances
log.warning(f're-sending unconfirmed transaction {tk} is pending execution')
execs[tk] = er
else:
log.debug(f'tranche {tk} is pending execution')

View File

@@ -208,7 +208,7 @@ class Order:
@staticmethod
def pub_order_fills(_s, k, v):
log.debug(f'pub_order_fills {k} {v}')
# log.debug(f'pub_order_fills {k} {v}')
# publish status updates (on placing and completion) to web clients
if v is DELETE:
return None

View File

@@ -56,7 +56,7 @@ async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool:
if b == 0 and m == 0:
return True
limit = m * current_clock.get().timestamp + b
log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}')
# log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}')
# todo ratios
# prices AT the limit get zero volume, so we only trigger on >, not >=
return is_min and limit < price or not is_min and limit > price
@@ -119,7 +119,7 @@ class TrancheTrigger:
time_triggers.remove(self.time_trigger)
def time_trigger(self, now):
log.debug(f'time_trigger {now} {self.status} {self.time_constraint}')
# log.debug(f'time_trigger {now} {self.status} {self.time_constraint}')
if self.closed:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
@@ -150,7 +150,7 @@ class TrancheTrigger:
if self.closed:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
log.debug(f'price trigger {cur}')
# log.debug(f'price trigger {cur}')
addr = pool_address(self.order.order)
pool = await get_pool(addr)
if cur is None and self.has_line_constraint:

View File

@@ -1,6 +1,6 @@
import asyncio
import logging
from asyncio import Event, CancelledError
from asyncio import Event
from datetime import timedelta
from typing import Any, Iterable, Callable, Optional
@@ -12,14 +12,14 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi
from dexorder.base.block import Block, latest_block
from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number
from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number, current_block
from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate.branch import Branch
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.progressor import BlockProgressor
from dexorder.transaction import create_and_send_transactions
from dexorder.util import hexstr, hexbytes, hexint
from dexorder.util import hexstr, hexbytes
from dexorder.util.async_util import maywait, Maywaitable
from dexorder.util.shutdown import fatal
@@ -64,7 +64,7 @@ class BlockStateRunner(BlockProgressor):
# this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling
self.running = True
# this run() process discovers new heads and puts them on a queue for the worker to process
_worker_task = asyncio.create_task(self.worker())
_worker_task = asyncio.create_task(self.worker(), name='worker')
return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws())
async def run_ws(self):
@@ -214,9 +214,14 @@ class BlockStateRunner(BlockProgressor):
chain = current_chain.get()
assert chain.id == await w3.eth.chain_id
current_clock.set(BlockClock())
fork = None
while self.running:
try:
await self.new_head_event.wait()
await asyncio.wait_for(self.new_head_event.wait(), timeout=1) # todo configure
except TimeoutError:
if fork is not None:
await self.handle_time_tick(fork)
continue
except asyncio.CancelledError:
break
self.new_head_event.clear()
@@ -261,6 +266,7 @@ class BlockStateRunner(BlockProgressor):
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
block = await get_block(blockhash)
current_block.set(block)
bloom = BloomFilter(int.from_bytes(hexbytes(block.data['logsBloom'])))
for callback, event, log_filter in self.events:
if log_filter is None:
@@ -352,9 +358,6 @@ class BlockStateRunner(BlockProgressor):
async def handle_time_tick(self, fork: Fork):
# todo re-enable time ticks
if current_blockstate.get() is None:
return
current_fork.set(fork)
session = db.session
session.begin()

View File

@@ -2,14 +2,18 @@ import logging
from abc import abstractmethod
from uuid import uuid4
from sqlalchemy import select
from web3.exceptions import TransactionNotFound
from dexorder import db, current_w3
from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest
from dexorder.blockstate.fork import current_fork
from dexorder.blocks import current_block
from dexorder.blockstate import BlockDict
from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.fork import current_fork, Fork
from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction, TransactionJobState
from dexorder.database.model.transaction import TransactionJob, TransactionJobState
log = logging.getLogger(__name__)
@@ -32,7 +36,8 @@ class TransactionHandler:
def submit_transaction_request(tr: TransactionRequest):
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, state=TransactionJobState.Requested, request=tr)
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height,
state=TransactionJobState.Requested, request=tr)
db.session.add(job)
return job
@@ -64,9 +69,9 @@ async def create_transaction(job: TransactionJob):
log.warning(f'unable to send transaction for job {job.id}')
return
job.state = TransactionJobState.Signed # todo lazy signing
job.tx_id = ctx.id_bytes
job.tx_data = ctx.data
db.session.add(job)
dbtx = DbTransaction(id=ctx.id_bytes, job=job, data=ctx.data, receipt=None)
db.session.add(dbtx)
log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}')
# todo sign-and-send should be a single phase. if the send fails due to lack of wallet gas, or because gas price went up suddenly,
@@ -79,37 +84,52 @@ async def send_transactions():
TransactionJob.state == TransactionJobState.Signed
):
log.debug(f'sending transaction for job {job.id}')
sent = await w3.eth.send_raw_transaction(job.tx.data)
assert sent == job.tx.id
sent = await w3.eth.send_raw_transaction(job.tx_data)
assert sent == job.tx_id
job.state = TransactionJobState.Sent
db.session.add(job)
async def handle_transaction_receipts():
w3 = current_w3.get()
for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Sent,
):
if job.tx and not job.tx.receipt:
await check_receipt(job)
assert job.tx_id and not job.receipt
block = current_block.get()
if job.tx_id in block.data['transactions']:
try:
receipt = await w3.eth.get_transaction_receipt(job.tx_id)
except TransactionNotFound:
pass
else:
# don't set the database yet because we could get reorged
completed_transactions[job.tx_id] = receipt
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
await handler.complete_transaction(job)
async def check_receipt(job: TransactionJob):
if not job.tx:
return
w3 = current_w3.get()
try:
receipt = await w3.eth.get_transaction_receipt(job.tx.id)
except TransactionNotFound:
pass
else:
job.tx.receipt = receipt
job.state = TransactionJobState.Mined
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
await handler.complete_transaction(job)
def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]):
open_txs = set(db.session.execute(select(TransactionJob.tx_id).where(
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Sent
)).all())
for diff in diffs:
if diff.series == 'mined_txs' and diff.key in open_txs:
job = db.session.scalar(TransactionJob).where(
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Sent,
TransactionJob.tx_id == diff.key
).one()
job.state = TransactionJobState.Mined
job.receipt = diff.value
db.session.add(job)
completed_transactions = BlockDict[bytes, dict]('mined_txs') # stores the transaction receipt