diff --git a/alembic/versions/516b55c83144_initial_schema.py b/alembic/versions/516b55c83144_initial_schema.py index 1b6f6d0..c1e3266 100644 --- a/alembic/versions/516b55c83144_initial_schema.py +++ b/alembic/versions/516b55c83144_initial_schema.py @@ -74,24 +74,20 @@ def upgrade() -> None: sa.Column('height', sa.Integer(), nullable=False), sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), sa.Column('request', dexorder.database.column_types.DataclassDictBase(astext_type=sa.Text()), nullable=False), + sa.Column('tx_id', postgresql.BYTEA(), nullable=True), + sa.Column('tx_data', postgresql.BYTEA(), nullable=True), + sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) - op.create_table('tx', - sa.Column('id', postgresql.BYTEA(), nullable=False), - sa.Column('data', postgresql.BYTEA(), nullable=False), - sa.Column('job_id', sa.UUID(), nullable=False), - sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), - sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ), - sa.PrimaryKeyConstraint('id') - ) + op.create_index(op.f('ix_transactionjob_tx_id'), 'transactionjob', ['tx_id'], unique=False) # ### end Alembic commands ### def downgrade() -> None: - op.drop_table('tx') + op.drop_index(op.f('ix_transactionjob_tx_id'), table_name='transactionjob') op.drop_index(op.f('ix_transactionjob_state'), table_name='transactionjob') op.drop_index(op.f('ix_transactionjob_height'), table_name='transactionjob') op.drop_index(op.f('ix_transactionjob_chain'), table_name='transactionjob') diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index 065b73e..d52bb63 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -24,7 +24,7 @@ log = logging.getLogger('dexorder.backfill') async def finalize_callback(fork: Fork, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): - ohlc_save(diffs) + ohlc_save(fork, diffs) ts = await get_block_timestamp(fork.head) log.info(f'backfill completed through block {fork.height} {from_timestamp(ts):%Y-%m-%d %H:%M:%S}') @@ -62,7 +62,7 @@ async def main(): runner.on_promotion.append(finalize_callback) if db: # noinspection PyUnboundLocalVariable - runner.on_promotion.append(db_state.save) + runner.on_promotion.append(db_state.finalize) if redis_state: runner.on_head_update.append(redis_state.save) diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index 87e75b9..a1afe30 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -55,8 +55,8 @@ def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=Tru loop = asyncio.get_event_loop() signals = Signals.SIGQUIT, Signals.SIGTERM, Signals.SIGINT for s in signals: - loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop, shutdown))) - task = loop.create_task(main) + loop.add_signal_handler(s, lambda sig=s: asyncio.create_task(_shutdown_coro(sig, loop, shutdown), name=f'{s.name} handler')) + task = loop.create_task(main, name='main') loop.run_until_complete(task) x = task.exception() if x is not None: diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index f9d3e5d..89eb87c 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -16,7 +16,7 @@ from dexorder.event_handler import init_order_triggers, init, dump_log, handle_v from dexorder.memcache import memcache from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.runner import BlockStateRunner -from dexorder.transaction import handle_transaction_receipts +from dexorder.transaction import handle_transaction_receipts, finalize_transactions log = logging.getLogger('dexorder') LOG_ALL_EVENTS = False # for debug todo config @@ -57,7 +57,7 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_order_canceled, get_contract_event('OrderLib', 'DexorderSwapCanceled')) runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll')) - runner.add_event_trigger(handle_transaction_receipts) + runner.add_event_trigger(handle_transaction_receipts) # todo handle only the transactions that were posted to this block runner.add_event_trigger(handle_dexorderexecutions, executions) # these callbacks run after the ones above on each block, plus these also run every second @@ -97,9 +97,10 @@ async def main(): if db: runner.on_state_init.append(init_order_triggers) # noinspection PyUnboundLocalVariable - runner.on_promotion.append(db_state.save) + runner.on_promotion.append(db_state.finalize) if redis_state: runner.on_head_update.append(redis_state.save) + runner.on_promotion.append(finalize_transactions) try: await runner.run() diff --git a/src/dexorder/blocks.py b/src/dexorder/blocks.py index 76e8044..8442a68 100644 --- a/src/dexorder/blocks.py +++ b/src/dexorder/blocks.py @@ -6,6 +6,7 @@ Use `await get_block()` to retreive a Block from a given hash using the full cac Use `await fetch_block()` to force an RPC query for the Block, adding that block to the LRU cache. """ import logging +from contextvars import ContextVar from typing import Union from cachetools import LRUCache @@ -74,3 +75,6 @@ async def fetch_block(blockhash, *, chain_id=None): def promotion_height(chain, latest_height): confirm_offset = config.confirms if config.confirms is not None else chain.confirms return latest_height - confirm_offset + + +current_block = ContextVar[Block]('current_block') diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index c5cd7b3..a0047ce 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -25,10 +25,10 @@ class BlockData (Generic[T]): registry: dict[Any,'BlockData'] = {} # series name and instance def __init__(self, data_type: DataType, series: Any, *, - series2str=None, series2key=None, # defaults to key2str and str2key + series2str=None, series2key=None, # defaults to key2str and str2key key2str=util_key2str, str2key=util_str2key, - value2str=json.dumps, str2value=json.loads, # serialize/deserialize value to something JSON-able - savecb:Callable[[Any,Any],None]=None, # callback(key, value) where value may be DELETE + value2str=json.dumps, str2value=json.loads, # serialize/deserialize value to something JSON-able + finalize_cb:Callable[[Any, Any],None]=None, # callback(key, value) where value may be DELETE **opts): assert series not in BlockData.registry BlockData.registry[series] = self @@ -41,7 +41,7 @@ class BlockData (Generic[T]): self.series2key = series2key or self.str2key self.value2str = value2str self.str2value = str2value - self.savecb = savecb + self.finalize_cb = finalize_cb # set this to a method which fetches final data (e.g. database) self.lazy_getitem: Optional[Callable[['BlockData',Any],Union[NARG,T]]] = None diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index aa2406c..7b0e910 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -32,7 +32,7 @@ class DbState(SeriesCollection): value = db.session.get(Entity, (chain_id, series, key)) return var.str2value(value.value) - def save(self, fork: Fork, diffs: Iterable[Union[DiffItem,DiffEntryItem]] ): + def finalize(self, fork: Fork, diffs: Iterable[Union[DiffItem,DiffEntryItem]]): chain_id = current_chain.get().id for diff in diffs: try: @@ -61,8 +61,8 @@ class DbState(SeriesCollection): found.value = value else: raise NotImplementedError - if d.savecb: - d.savecb(diff.key, diff.value) + if d.finalize_cb: + d.finalize_cb(diff.key, diff.value) # save root block info db.kv[f'root_block|{chain_id}'] = [fork.height, fork.head] diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index de6a48a..7858b49 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -1,10 +1,8 @@ from .base import Base from .kv import KeyValue from .series import SeriesSet, SeriesDict -from .transaction import Transaction, TransactionJob +from .transaction import TransactionJob from .orderindex import OrderIndex from .pool import Pool from .token import Token - -class Block: pass diff --git a/src/dexorder/database/model/transaction.py b/src/dexorder/database/model/transaction.py index 3df9566..4a6f48a 100644 --- a/src/dexorder/database/model/transaction.py +++ b/src/dexorder/database/model/transaction.py @@ -31,14 +31,6 @@ class TransactionJob (Base): height: Mapped[int] = mapped_column(index=True) # the height at which the job was created, to be used for timeout/ data rolloff and/or by Timescale state: Mapped[TransactionJobState] = mapped_column(TransactionJobStateColumnType, default=TransactionJobState.Requested, index=True) request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request)) - tx: Mapped[list["Transaction"]] = relationship(back_populates='job', uselist=False) - - -class Transaction (Base): - __tablename__ = 'tx' # avoid the keyword "transaction" - - id: Mapped[Bytes] = mapped_column(primary_key=True) - data: Mapped[Bytes] # the signed tx data - job_id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id")) - job: Mapped[TransactionJob] = relationship(back_populates='tx', single_parent=True) - receipt: Mapped[Optional[Dict]] # todo handle forks that didnt confirm: receipts are per-fork! + tx_id: Mapped[Optional[Bytes]] = mapped_column(index=True) + tx_data: Mapped[Optional[Bytes]] + receipt: Mapped[Optional[Dict]] diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 1169c4a..4f2c157 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -244,6 +244,7 @@ async def process_execution_requests(): if pending is None or height-pending >= 30: # todo execution timeout => retry ; should we use timestamps? configure per-chain. # todo check balances + log.warning(f're-sending unconfirmed transaction {tk} is pending execution') execs[tk] = er else: log.debug(f'tranche {tk} is pending execution') diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 3d962d2..a8c34b2 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -208,7 +208,7 @@ class Order: @staticmethod def pub_order_fills(_s, k, v): - log.debug(f'pub_order_fills {k} {v}') + # log.debug(f'pub_order_fills {k} {v}') # publish status updates (on placing and completion) to web clients if v is DELETE: return None diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index b23aeb0..f55bfd9 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -56,7 +56,7 @@ async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool: if b == 0 and m == 0: return True limit = m * current_clock.get().timestamp + b - log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}') + # log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}') # todo ratios # prices AT the limit get zero volume, so we only trigger on >, not >= return is_min and limit < price or not is_min and limit > price @@ -119,7 +119,7 @@ class TrancheTrigger: time_triggers.remove(self.time_trigger) def time_trigger(self, now): - log.debug(f'time_trigger {now} {self.status} {self.time_constraint}') + # log.debug(f'time_trigger {now} {self.status} {self.time_constraint}') if self.closed: log.debug(f'price trigger ignored because trigger status is {self.status}') return @@ -150,7 +150,7 @@ class TrancheTrigger: if self.closed: log.debug(f'price trigger ignored because trigger status is {self.status}') return - log.debug(f'price trigger {cur}') + # log.debug(f'price trigger {cur}') addr = pool_address(self.order.order) pool = await get_pool(addr) if cur is None and self.has_line_constraint: diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index d6add0f..de2d9eb 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,6 +1,6 @@ import asyncio import logging -from asyncio import Event, CancelledError +from asyncio import Event from datetime import timedelta from typing import Any, Iterable, Callable, Optional @@ -12,14 +12,14 @@ from dexorder import Blockchain, db, current_pub, async_yield, current_w3, confi from dexorder.base.block import Block, latest_block from dexorder.base.chain import current_chain, current_clock, BlockClock from dexorder.blockchain.connection import create_w3_ws, create_w3 -from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number +from dexorder.blocks import cache_block, get_block, promotion_height, get_block_by_number, current_block from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.branch import Branch from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.fork import current_fork, Fork from dexorder.progressor import BlockProgressor from dexorder.transaction import create_and_send_transactions -from dexorder.util import hexstr, hexbytes, hexint +from dexorder.util import hexstr, hexbytes from dexorder.util.async_util import maywait, Maywaitable from dexorder.util.shutdown import fatal @@ -64,7 +64,7 @@ class BlockStateRunner(BlockProgressor): # this run() process discovers new heads and puts them on a queue for the worker to process. the discovery is ether websockets or polling self.running = True # this run() process discovers new heads and puts them on a queue for the worker to process - _worker_task = asyncio.create_task(self.worker()) + _worker_task = asyncio.create_task(self.worker(), name='worker') return await (self.run_polling() if config.polling > 0 or not config.ws_url else self.run_ws()) async def run_ws(self): @@ -214,9 +214,14 @@ class BlockStateRunner(BlockProgressor): chain = current_chain.get() assert chain.id == await w3.eth.chain_id current_clock.set(BlockClock()) + fork = None while self.running: try: - await self.new_head_event.wait() + await asyncio.wait_for(self.new_head_event.wait(), timeout=1) # todo configure + except TimeoutError: + if fork is not None: + await self.handle_time_tick(fork) + continue except asyncio.CancelledError: break self.new_head_event.clear() @@ -261,6 +266,7 @@ class BlockStateRunner(BlockProgressor): # event callbacks are triggered in the order in which they're registered. the events passed to # each callback are in block transaction order block = await get_block(blockhash) + current_block.set(block) bloom = BloomFilter(int.from_bytes(hexbytes(block.data['logsBloom']))) for callback, event, log_filter in self.events: if log_filter is None: @@ -352,9 +358,6 @@ class BlockStateRunner(BlockProgressor): async def handle_time_tick(self, fork: Fork): - # todo re-enable time ticks - if current_blockstate.get() is None: - return current_fork.set(fork) session = db.session session.begin() diff --git a/src/dexorder/transaction.py b/src/dexorder/transaction.py index b990619..53af11f 100644 --- a/src/dexorder/transaction.py +++ b/src/dexorder/transaction.py @@ -2,14 +2,18 @@ import logging from abc import abstractmethod from uuid import uuid4 +from sqlalchemy import select from web3.exceptions import TransactionNotFound from dexorder import db, current_w3 from dexorder.base.chain import current_chain from dexorder.base.order import TransactionRequest -from dexorder.blockstate.fork import current_fork +from dexorder.blocks import current_block +from dexorder.blockstate import BlockDict +from dexorder.blockstate.diff import DiffEntryItem +from dexorder.blockstate.fork import current_fork, Fork from dexorder.contract.contract_proxy import ContractTransaction -from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction, TransactionJobState +from dexorder.database.model.transaction import TransactionJob, TransactionJobState log = logging.getLogger(__name__) @@ -32,7 +36,8 @@ class TransactionHandler: def submit_transaction_request(tr: TransactionRequest): - job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, state=TransactionJobState.Requested, request=tr) + job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_fork.get().height, + state=TransactionJobState.Requested, request=tr) db.session.add(job) return job @@ -64,9 +69,9 @@ async def create_transaction(job: TransactionJob): log.warning(f'unable to send transaction for job {job.id}') return job.state = TransactionJobState.Signed # todo lazy signing + job.tx_id = ctx.id_bytes + job.tx_data = ctx.data db.session.add(job) - dbtx = DbTransaction(id=ctx.id_bytes, job=job, data=ctx.data, receipt=None) - db.session.add(dbtx) log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}') # todo sign-and-send should be a single phase. if the send fails due to lack of wallet gas, or because gas price went up suddenly, @@ -79,37 +84,52 @@ async def send_transactions(): TransactionJob.state == TransactionJobState.Signed ): log.debug(f'sending transaction for job {job.id}') - sent = await w3.eth.send_raw_transaction(job.tx.data) - assert sent == job.tx.id + sent = await w3.eth.send_raw_transaction(job.tx_data) + assert sent == job.tx_id job.state = TransactionJobState.Sent db.session.add(job) async def handle_transaction_receipts(): + w3 = current_w3.get() for job in db.session.query(TransactionJob).filter( TransactionJob.chain == current_chain.get(), TransactionJob.state == TransactionJobState.Sent, ): - if job.tx and not job.tx.receipt: - await check_receipt(job) + assert job.tx_id and not job.receipt + block = current_block.get() + if job.tx_id in block.data['transactions']: + try: + receipt = await w3.eth.get_transaction_receipt(job.tx_id) + except TransactionNotFound: + pass + else: + # don't set the database yet because we could get reorged + completed_transactions[job.tx_id] = receipt + try: + handler = TransactionHandler.of(job.request.type) + except KeyError: + # todo remove bad request? + log.warning(f'ignoring transaction request with bad type "{job.request.type}"') + else: + await handler.complete_transaction(job) -async def check_receipt(job: TransactionJob): - if not job.tx: - return - w3 = current_w3.get() - try: - receipt = await w3.eth.get_transaction_receipt(job.tx.id) - except TransactionNotFound: - pass - else: - job.tx.receipt = receipt - job.state = TransactionJobState.Mined - try: - handler = TransactionHandler.of(job.request.type) - except KeyError: - # todo remove bad request? - log.warning(f'ignoring transaction request with bad type "{job.request.type}"') - else: - await handler.complete_transaction(job) +def finalize_transactions(_fork: Fork, diffs: list[DiffEntryItem]): + open_txs = set(db.session.execute(select(TransactionJob.tx_id).where( + TransactionJob.chain == current_chain.get(), + TransactionJob.state == TransactionJobState.Sent + )).all()) + for diff in diffs: + if diff.series == 'mined_txs' and diff.key in open_txs: + job = db.session.scalar(TransactionJob).where( + TransactionJob.chain == current_chain.get(), + TransactionJob.state == TransactionJobState.Sent, + TransactionJob.tx_id == diff.key + ).one() + job.state = TransactionJobState.Mined + job.receipt = diff.value + db.session.add(job) + +completed_transactions = BlockDict[bytes, dict]('mined_txs') # stores the transaction receipt