From 062085a79f6d83b7ecbe96d812bc9e54943a8f57 Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Thu, 26 Oct 2023 17:05:20 -0400 Subject: [PATCH] reworked to optionally use Hardhat in mock; chain id 31337; refactored TransactionJob management; execute() mostly commented out for minimalism --- .../versions/db62e7db828d_initial_schema.py | 7 +- requirements.txt | 16 +- src/dexorder/base/__init__.py | 14 + src/dexorder/base/account.py | 1 - src/dexorder/base/chain.py | 4 +- src/dexorder/bin/main.py | 3 +- src/dexorder/blockchain/connection.py | 4 +- src/dexorder/blockstate/db_state.py | 8 +- src/dexorder/blockstate/state.py | 2 +- src/dexorder/configuration/load.py | 10 +- src/dexorder/contract/__init__.py | 6 +- src/dexorder/contract/contract_proxy.py | 51 ++-- src/dexorder/database/model/transaction.py | 25 +- src/dexorder/event_handler.py | 14 +- src/dexorder/order/executionhandler.py | 19 +- src/dexorder/order/triggers.py | 7 +- src/dexorder/runner.py | 275 +++++++++++------- src/dexorder/{blockchain => }/transaction.py | 66 ++++- 18 files changed, 328 insertions(+), 204 deletions(-) rename src/dexorder/{blockchain => }/transaction.py (51%) diff --git a/alembic/versions/db62e7db828d_initial_schema.py b/alembic/versions/db62e7db828d_initial_schema.py index b37ca1a..cea645a 100644 --- a/alembic/versions/db62e7db828d_initial_schema.py +++ b/alembic/versions/db62e7db828d_initial_schema.py @@ -50,15 +50,17 @@ def upgrade() -> None: sa.Column('id', sa.UUID(), nullable=False), sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('height', sa.Integer(), nullable=False), - sa.Column('completed', sa.Boolean(), nullable=False), + # sa.Column('state', sa.Enum(name='transactionjobstate'), nullable=False), + sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False), sa.Column('request', dexorder.database.column_types._DataclassDict(astext_type=sa.Text()), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) - op.create_index(op.f('ix_transactionjob_completed'), 'transactionjob', ['completed'], unique=False) op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) + op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False) op.create_table('tx', sa.Column('id', postgresql.BYTEA(), nullable=False), + sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False), sa.Column('job_id', sa.UUID(), nullable=False), sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ), @@ -73,3 +75,4 @@ def downgrade() -> None: op.drop_table('block') op.drop_table('tx') op.drop_table('transactionjob') + op.execute('drop type transactionjobstate') # enum type \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5d926be..64f75ac 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ -sqlalchemy~=2.0.20 -alembic~=1.11.3 -omegaconf~=2.3.0 -web3==6.9.0 +sqlalchemy +alembic +omegaconf +web3 psycopg2-binary -orjson~=3.9.7 -sortedcontainers~=2.4.0 -hexbytes~=0.3.1 -defaultlist~=1.0.0 +orjson +sortedcontainers +hexbytes +defaultlist redis[hiredis] socket.io-emitter diff --git a/src/dexorder/base/__init__.py b/src/dexorder/base/__init__.py index e69de29..3d32c0e 100644 --- a/src/dexorder/base/__init__.py +++ b/src/dexorder/base/__init__.py @@ -0,0 +1,14 @@ +from typing import TypedDict, Union + +Address = str +Quantity = Union[str,int] + +TransactionDict = TypedDict( 'TransactionDict', { + 'from': Address, + 'to': Address, + 'gas': Quantity, + 'gasPrice': Quantity, + 'value': Quantity, + 'data': Union[bytes,str], + 'nonce': Quantity, +}) diff --git a/src/dexorder/base/account.py b/src/dexorder/base/account.py index 82ade0a..201c5ef 100644 --- a/src/dexorder/base/account.py +++ b/src/dexorder/base/account.py @@ -47,7 +47,6 @@ class Account (LocalAccount): def __init__(self, local_account: LocalAccount, key_str, name: str): # todo chain_id? super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code self.name = name - self.transaction_counter = 0 # used by GasHandler to detect when new transactions were fired self.key_str = key_str self.signing_middleware = construct_sign_and_send_raw_middleware(self) diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 43edf98..86a4118 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -43,7 +43,7 @@ Goerli = Blockchain(5, 'Goerli') Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Mumbai = Blockchain(80001, 'Mumbai') BSC = Blockchain(56, 'BSC') -Arbitrum = Blockchain(42161, 'Arbitrum', 10, batch_size=1000) # todo configure batch size... does it depend on log count? :( -Mock = Blockchain(1338, 'Mock', 10) +Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=1000) # todo configure batch size... does it depend on log count? :( +Mock = Blockchain(31337, 'Mock', 3) current_chain = ContextVar[Blockchain]('current_chain') diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 2c9323f..a3d087e 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -2,7 +2,7 @@ import logging import sys from asyncio import CancelledError -from dexorder import db, config, Blockchain +from dexorder import db, config, Blockchain, blockchain from dexorder.base.chain import current_chain from dexorder.bin.executable import execute from dexorder.blockstate.blockdata import BlockData @@ -20,6 +20,7 @@ async def main(): log.setLevel(logging.DEBUG) parse_args() current_chain.set(Blockchain.get(config.chain)) + blockchain.connect() redis_state = None state = None if memcache: diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index f41336f..44b742f 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -35,7 +35,7 @@ def create_w3(rpc_url=None): return w3 -def create_w3_ws(ws_url=None): +async def create_w3_ws(ws_url=None) -> AsyncWeb3: """ this constructs a Web3 object but does NOT attach it to the context. consider using connect(...) instead this does *not* attach any signer to the w3. make sure to inject the proper middleware with Account.attach(w3) @@ -46,7 +46,7 @@ def create_w3_ws(ws_url=None): # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain # self.w3iter = itertools.cycle(self.w3s) ws_provider = WebsocketProviderV2(resolve_ws_url(ws_url)) - w3 = AsyncWeb3.persistent_websocket(ws_provider) + w3 = await AsyncWeb3.persistent_websocket(ws_provider) w3.middleware_onion.remove('attrdict') # w3.middleware_onion.add(clean_input, 'clean_input') w3.eth.Contract = _make_contract(w3.eth) diff --git a/src/dexorder/blockstate/db_state.py b/src/dexorder/blockstate/db_state.py index fbc77b5..fd4bb31 100644 --- a/src/dexorder/blockstate/db_state.py +++ b/src/dexorder/blockstate/db_state.py @@ -31,8 +31,8 @@ class DbState(SeriesCollection): series = var.series2str(var.series) key = var.key2str(item) try: - height, blockhash = db.kv[f'root_block.{chain_id}'] - except: + height, blockhash = db.kv[f'root_block|{chain_id}'] + except Exception: return None fork = Fork([hexbytes(blockhash)], height=height) value = db.session.get(Entity, (chain_id, series, key)) @@ -68,13 +68,13 @@ class DbState(SeriesCollection): found.value = diff.value else: raise NotImplementedError - db.kv[f'root_block.{root_block.chain}'] = [root_block.height, root_block.hash] + db.kv[f'root_block|{root_block.chain}'] = [root_block.height, root_block.hash] # noinspection PyShadowingBuiltins def load(self) -> Optional[BlockState]: chain_id = current_chain.get().chain_id try: - height, hash = db.kv[f'root_block.{chain_id}'] + height, hash = db.kv[f'root_block|{chain_id}'] except (KeyError, ValueError): return None root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash))) diff --git a/src/dexorder/blockstate/state.py b/src/dexorder/blockstate/state.py index dd4b955..cc7a0c3 100644 --- a/src/dexorder/blockstate/state.py +++ b/src/dexorder/blockstate/state.py @@ -72,7 +72,7 @@ class BlockState: return self.fork(block) - def delete_block(self, block: Union[Block, Fork,bytes]): + def delete_block(self, block: Union[Block, Fork, bytes]): """ if there was an error during block processing, we need to remove the incomplete block data """ try: block = block.hash diff --git a/src/dexorder/configuration/load.py b/src/dexorder/configuration/load.py index 41e5117..461c25b 100644 --- a/src/dexorder/configuration/load.py +++ b/src/dexorder/configuration/load.py @@ -17,8 +17,7 @@ class ConfigException (Exception): def load_config(): - -# noinspection PyTypeChecker + # noinspection PyTypeChecker result:ConfigDict = OmegaConf.merge( schema, load_tokens(), @@ -51,12 +50,13 @@ def load_accounts(): def from_env(prefix='DEXORDER_'): - dotlist = [] + merge = {} for key, value in os.environ.items(): if key.startswith(prefix): key = key[len(prefix):].lower().replace('__','.') - dotlist.append(key+'='+value) - result = OmegaConf.from_dotlist(dotlist) + if key in schema: + merge[key] = value + result = OmegaConf.create(merge) try: OmegaConf.merge(schema, result) return result diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index 41b1e1a..c59f9a6 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -3,13 +3,13 @@ import json from eth_abi.codec import ABIDecoder, ABIEncoder from eth_abi.registry import registry as default_registry -from .. import current_w3 +from .. import current_w3 as _current_w3 abi_decoder = ABIDecoder(default_registry) abi_encoder = ABIEncoder(default_registry) from .abi import abis -from .contract_proxy import ContractProxy, Transaction +from .contract_proxy import ContractProxy from .pool_contract import UniswapV3Pool from .uniswap_contracts import uniswapV3 @@ -20,5 +20,5 @@ def get_contract_data(name): def get_contract_event(contract_name:str, event_name:str): - return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() + return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 78fcea6..7479206 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -1,39 +1,16 @@ import json from typing import Optional -from web3.exceptions import TransactionNotFound +from eth_utils import keccak from web3.types import TxReceipt -from dexorder import Account, current_w3 +from dexorder import current_w3, Account from dexorder.base.account import current_account from dexorder.database.model.block import current_block +from dexorder.base import TransactionDict from dexorder.util import hexstr -class Transaction: - def __init__(self, account: Account, tx_id_bytes:bytes): - self.account = account - self.id_bytes = tx_id_bytes - self._id = None - self.receipt: Optional[TxReceipt] = None - - async def wait(self) -> TxReceipt: - if self.receipt is None: - self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes) - self.account.transaction_counter += 1 - return self.receipt - - @property - def id(self) -> str: - if self._id is None: - self._id = self.id_bytes.hex() - return self._id - - def __repr__(self): - receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber - return f'Transaction({self.id},{receipt_status})' - - def call_wrapper(func): async def f(*args, **kwargs): return await func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash)) @@ -42,12 +19,15 @@ def call_wrapper(func): def transact_wrapper(func): async def f(*args, **kwargs): + w3 = current_w3.get() try: account = current_account.get() except LookupError: raise RuntimeError('Cannot invoke a transaction without setting an Account.') - return Transaction(account, await func(*args, **kwargs).transact()) - + tx = await func(*args, **kwargs).build_transaction() + tx['from'] = account.address + signed = w3.eth.account.sign_transaction(tx, private_key=account.key) + return ContractTransaction(signed) return f @@ -88,7 +68,7 @@ class ContractProxy: """ Calls the contract constructor transaction and waits to receive a transaction receipt. """ - tx: Transaction = self.transact.constructor(*args) + tx: ContractTransaction = self.transact.constructor(*args) receipt = tx.wait() self.address = receipt.contractAddress self._contracts.clear() @@ -105,3 +85,16 @@ class ContractProxy: def __repr__(self): addr = self.contract.address return f'{self._interface_name}({addr or ""})' + + +class ContractTransaction: + def __init__(self, rawtx: bytes): + self.data = rawtx + self.id_bytes = keccak(rawtx) + self.id = hexstr(self.id_bytes) + self.receipt: Optional[TxReceipt] = None + + def __repr__(self): + # todo this is from an old status system + receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber + return f'Transaction({self.id},{receipt_status})' diff --git a/src/dexorder/database/model/transaction.py b/src/dexorder/database/model/transaction.py index 7842ed3..c4f02c6 100644 --- a/src/dexorder/database/model/transaction.py +++ b/src/dexorder/database/model/transaction.py @@ -1,29 +1,46 @@ import logging +from enum import Enum from typing import Optional -from sqlalchemy import ForeignKey, UniqueConstraint +import sqlalchemy as sa +from sqlalchemy import ForeignKey +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import mapped_column, Mapped, relationship from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request +from dexorder.base import TransactionDict from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID from dexorder.database.column_types import DataclassDict from dexorder.database.model import Base log = logging.getLogger(__name__) +class TransactionJobState (Enum): + Requested = 'a' # request exists as a job but the tx has not been created and signed yet + Signed = 'n' # tx has been signed + Sent = 's' # tx has been delivered to a node + Mined = 'z' # mined on at least one fork, whether reverted or not. todo handle forks that didnt confirm: receipts are per-fork! + + +# noinspection PyProtectedMember +# TransactionJobStateColumnType = sa.Enum(*(e.value for e in TransactionJobState.__members__.values()), name='transactionjobstate') +TransactionJobStateColumnType = sa.Enum(TransactionJobState) + class TransactionJob (Base): id: Mapped[UUID_PK] chain: Mapped[Blockchain] = mapped_column(index=True) - height: Mapped[int] = mapped_column(index=True) # to be used for data rolloff and/or by Timescale - completed: Mapped[bool] = mapped_column(default=False, index=True) + height: Mapped[int] = mapped_column(index=True) # the height at which the job was created, to be used for timeout/ data rolloff and/or by Timescale + state: Mapped[TransactionJobState] = mapped_column(TransactionJobStateColumnType, default=TransactionJobState.Requested, index=True) request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request)) tx: Mapped[list["Transaction"]] = relationship(back_populates='job', uselist=False) + class Transaction (Base): __tablename__ = 'tx' # avoid the keyword "transaction" id: Mapped[Bytes] = mapped_column(primary_key=True) + data: Mapped[Bytes] # the signed tx data job_id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id")) job: Mapped[TransactionJob] = relationship(back_populates='tx', single_parent=True) - receipt: Mapped[Optional[Dict]] + receipt: Mapped[Optional[Dict]] # todo handle forks that didnt confirm: receipts are per-fork! diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 98cd38e..7852c19 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -5,8 +5,8 @@ from web3.types import EventData from dexorder import current_pub, db from dexorder.base.chain import current_chain -from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey -from dexorder.blockchain.transaction import handle_transactions, submit_transaction +from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request +from dexorder.transaction import handle_create_transactions, submit_transaction_request, handle_transaction_receipts from dexorder.blockchain.uniswap import uniswap_price from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract from dexorder.contract import UniswapV3Pool, get_contract_event @@ -17,7 +17,6 @@ from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus from dexorder.order.orderstate import Order from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers, \ unconstrained_price_triggers, execution_requests, inflight_execution_requests -from dexorder.util import hexbytes log = logging.getLogger(__name__) @@ -57,6 +56,9 @@ def setup_logevent_triggers(runner): else: executions = dexorder.events.DexorderExecutions() + # + # THIS IS BASICALLY THE MAIN RUN LOOP FOR EVERY BLOCK + # runner.add_event_trigger(handle_vault_created, vault_created) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) @@ -65,11 +67,12 @@ def setup_logevent_triggers(runner): runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted')) runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError')) + runner.add_event_trigger(handle_transaction_receipts) runner.add_event_trigger(handle_dexorderexecutions, executions) runner.add_event_trigger(activate_time_triggers) runner.add_event_trigger(activate_price_triggers) runner.add_event_trigger(process_execution_requests) - runner.add_event_trigger(handle_transactions) + runner.add_event_trigger(handle_create_transactions) async def handle_order_placed(event: EventData): @@ -190,7 +193,7 @@ async def process_execution_requests(): # todo batch execution for tk, er in execs: log.info(f'executing tranche {tk}') - job = submit_transaction(new_tranche_execution_request(tk, er.proof)) + job = submit_transaction_request(new_tranche_execution_request(tk, er.proof)) inflight_execution_requests[tk] = height log.info(f'executing tranche {tk} with job {job.id}') @@ -215,7 +218,6 @@ def handle_dexorderexecutions(event: EventData): def finish_execution_request(req: TrancheExecutionRequest, error: str): order = Order.of(req.vault, req.order_index) tk = TrancheKey(req.vault, req.order_index, req.tranche_index) - del inflight_execution_requests[tk] # no longer in-flight if error != '': log.debug(f'execution request for tranche {tk} had error "{error}"') if error == '': diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py index 2c477d8..46782d7 100644 --- a/src/dexorder/order/executionhandler.py +++ b/src/dexorder/order/executionhandler.py @@ -1,10 +1,11 @@ import logging from uuid import UUID -from dexorder.base.order import TrancheExecutionRequest -from dexorder.blockchain.transaction import TransactionHandler +from dexorder.base.order import TrancheExecutionRequest, TrancheKey +from dexorder.transaction import TransactionHandler from dexorder.contract.dexorder import get_dexorder_contract from dexorder.database.model.transaction import TransactionJob +from dexorder.order.triggers import inflight_execution_requests log = logging.getLogger(__name__) @@ -12,11 +13,17 @@ class TrancheExecutionHandler (TransactionHandler): def __init__(self): super().__init__('te') - async def send_transaction(self, job_id: UUID, ter: TrancheExecutionRequest) -> dict: - return await get_dexorder_contract().transact.execute(job_id.bytes, (ter.vault, ter.order_index, ter.tranche_index, ter.price_proof)) + async def build_transaction(self, job_id: UUID, req: TrancheExecutionRequest) -> dict: + # noinspection PyBroadException + try: + return await get_dexorder_contract().transact.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof)) + except Exception: + log.exception(f'Could not send execution request {req}') async def complete_transaction(self, job: TransactionJob) -> None: - # anything to do? - pass + req: TrancheExecutionRequest = job.request + tk = TrancheKey(req.vault, req.order_index, req.tranche_index) + del inflight_execution_requests[tk] # no longer in-flight + TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index e9e7735..243ea24 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -1,13 +1,12 @@ import logging from enum import Enum -from typing import Callable, Optional +from typing import Callable from dexorder.blockstate import BlockSet, BlockDict from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState, PriceProof from dexorder.util import defaultdictk from .orderstate import Order -from ..base.order import OrderKey, TrancheKey, new_tranche_execution_request, ExecutionRequest -from ..blockchain.transaction import submit_transaction +from ..base.order import OrderKey, TrancheKey, ExecutionRequest from ..database.model.block import current_block log = logging.getLogger(__name__) @@ -20,6 +19,8 @@ PriceTrigger = Callable[[int], None] # func(pool_price) price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # value is block height when the request was placed + +# todo should this really be blockdata? inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent def intersect_ranges( a_low, a_high, b_low, b_high): diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 001ce00..cae3b67 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,11 +1,13 @@ +import asyncio import logging -from typing import Callable, Union, Any, Iterable, Optional +from asyncio import Queue +from typing import Callable, Union, Any, Iterable from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI +from websockets.exceptions import ConnectionClosedError -from dexorder import Blockchain, db, blockchain, NARG, current_pub, Account -from dexorder.base.account import current_account +from dexorder import Blockchain, db, blockchain, current_pub, async_yield, current_w3 from dexorder.base.chain import current_chain from dexorder.base.fork import current_fork, Fork from dexorder.blockchain.connection import create_w3_ws @@ -14,7 +16,6 @@ from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block from dexorder.event_handler import setup_logevent_triggers -from dexorder.order.triggers import time_triggers from dexorder.util import hexstr, topic from dexorder.util.async_util import maywait @@ -41,6 +42,19 @@ class BlockStateRunner: self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],None] = publish_all + self.queue: Queue = Queue() + + self.running = False + + + def add_event_trigger(self, callback: Callable[[dict], None], event: ContractEvents = None, log_filter: Union[dict, str] = None): + """ + if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs + """ + if log_filter is None and event is not None: + log_filter = {'topics': [topic(event.abi)]} + self.events.append((callback, event, log_filter)) + async def run(self): """ @@ -65,123 +79,162 @@ class BlockStateRunner: 15. on tx confirmation, the block height of all executed trigger requests is set to the tx block """ - w3 = blockchain.connect() - w3ws = create_w3_ws() + self.running = True + w3ws = await create_w3_ws() chain_id = await w3ws.eth.chain_id chain = Blockchain.for_id(chain_id) current_chain.set(chain) setup_logevent_triggers(self) + _worker_task = asyncio.create_task(self.worker()) - state = self.state + while self.running: + try: + async with w3ws as w3ws: + await w3ws.provider.connect() + subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it. + log.debug(f'subscribed to newHeads {subscription}') + while self.running: + async for message in w3ws.ws.listen_to_websocket(): + head = message['result'] + log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}') + await self.queue.put(head) + if not self.running: + break + except (ConnectionClosedError, TimeoutError): + pass + finally: + try: + await w3ws.provider.disconnect() + except Exception: + pass + await async_yield() - async with w3ws as w3ws: - await w3ws.eth.subscribe('newHeads') - while True: - async for head in w3ws.listen_to_websocket(): - session = None - fork = None - try: - log.debug(f'head {head["hash"]}') - # block_data = await w3.eth.get_block(head['hash'], True) - block_data = (await w3.provider.make_request('eth_getBlockByHash',[hexstr(head['hash']),False]))['result'] - block = Block(chain=chain_id, height=int(block_data['number'],0), - hash=bytes.fromhex(block_data['hash'][2:]), parent=bytes.fromhex(block_data['parentHash'][2:]), data=block_data) - latest_block.set(block) - fork = NARG - if state is None: - # initialize - state = BlockState(block) - current_blockstate.set(state) - log.info('Created new empty root state') - fork = Fork([block.hash], height=block.height) + + async def worker(self): + log.debug(f'runner worker started {self.running}') + w3 = current_w3.get() + chain = current_chain.get() + assert chain.chain_id == await w3.eth.chain_id + while self.running: + try: + async with asyncio.timeout(1): # check running flag every second + head = await self.queue.get() + log.debug(f'got head {hexstr(head["hash"])}') + except TimeoutError: + pass + else: + try: + await self.handle_head(chain, head, w3) + except Exception as x: + log.exception(x) + + + async def handle_head(self, chain, head, w3): + log.debug(f'processing block {head["number"]} {hexstr(head["hash"])}') + chain_id = chain.chain_id + session = None + blockhash = None + try: + blockhash = hexstr(head["hash"]) + if self.state is not None and blockhash in self.state.by_hash: + return + # block_data = await w3.eth.get_block(head['hash'], True) + response = await w3.provider.make_request('eth_getBlockByHash', [blockhash, False]) + block_data = response['result'] + if block_data is None: + log.warning(f'block data for {blockhash} was None') + return # todo get block when hardhat stops responding to getBlockByHash + block = Block(chain=chain_id, height=int(block_data['number'], 0), + hash=bytes.fromhex(block_data['hash'][2:]), parent=bytes.fromhex(block_data['parentHash'][2:]), data=block_data) + latest_block.set(block) + if self.state is None: + # initialize + self.state = BlockState(block) + current_blockstate.set(self.state) + log.info('Created new empty root state') + fork = Fork([block.hash], height=block.height) + else: + fork = self.state.add_block(block) + if fork is None: + log.debug(f'discarded late-arriving head {block}') + else: + batches = [] + if fork.disjoint: + # backfill batches + for callback, event, log_filter in self.events: + if event is None: + batches.append(None) else: - fork = state.add_block(block) - if fork is None: - log.debug(f'discarded late-arriving head {block}') + from_height = self.state.root_block.height + 1 + end_height = block.height + while from_height <= end_height: + to_height = min(end_height, from_height + chain.batch_size - 1) + lf = dict(log_filter) + lf['fromBlock'] = from_height + lf['toBlock'] = to_height + log.debug(f'batch backfill {from_height} - {to_height}') + batches.append((w3.eth.get_logs(lf), callback, event, lf)) + from_height += chain.batch_size + else: + # event callbacks are triggered in the order in which they're registered. the events passed to + # each callback are in block transaction order + for callback, event, log_filter in self.events: + if log_filter is None: + batches.append((None, callback, event, None)) else: - batches = [] - if fork.disjoint: - # backfill batches - for callback, event, log_filter in self.events: - if event is None: - batches.append(None) - else: - from_height = state.root_block.height + 1 - end_height = block.height - while from_height <= end_height: - to_height = min(end_height, from_height + chain.batch_size - 1) - lf = dict(log_filter) - lf['fromBlock'] = from_height - lf['toBlock'] = to_height - log.debug(f'batch backfill {from_height} - {to_height}') - batches.append((w3.eth.get_logs(lf), callback, event, lf)) - from_height += chain.batch_size - else: - # event callbacks are triggered in the order in which they're registered. the events passed to - # each callback are in block transaction order - for callback, event, log_filter in self.events: - if log_filter is None: - batches.append((None, callback, event, None)) - else: - lf = dict(log_filter) - lf['blockHash'] = hexstr(block.hash) - batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) + # todo use head['logsBloom'] to skip unnecessary log queries + lf = dict(log_filter) + lf['blockHash'] = hexstr(block.hash) + batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) - # set up for callbacks - current_block.set(block) - current_fork.set(fork) - session = db.session - session.begin() - session.add(block) - pubs = [] - current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) - # logevent callbacks - for future,callback,event,filter_args in batches: - if future is None: - await maywait(callback()) # non-log callback - else: - log_events = await future - for log_event in log_events: - try: - parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI): - pass - else: - # todo try/except for known retryable errors - await maywait(callback(parsed)) - - # todo check for reorg and generate a reorg diff list - diff_items = state.diffs_by_hash[block.hash] - for callback in self.on_head_update: - await maywait(callback(block, diff_items)) - - # check for root promotion - promotion_height = fork.height - chain.confirms - if not fork.disjoint and promotion_height > state.root_block.height and (new_root_fork := fork.for_height(promotion_height)): - diff_items = state.promote_root(new_root_fork) - for callback in self.on_promotion: - # todo try/except for known retryable errors - callback(state.root_block, diff_items) - - if pubs and self.publish_all: - await maywait(self.publish_all(pubs)) - except: - if session is not None: - session.rollback() - if fork is not None: - state.delete_block(fork) - raise + # set up for callbacks + current_block.set(block) + current_fork.set(fork) + session = db.session + session.begin() + session.add(block) + pubs = [] + current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) + # logevent callbacks + for future, callback, event, filter_args in batches: + if future is None: + await maywait(callback()) # non-log callback else: - if session is not None: - session.commit() - log.info(f'completed block {block}') + log_events = await future + for log_event in log_events: + try: + parsed = event.process_log(log_event) if event is not None else log_event + except (LogTopicError, MismatchedABI): + pass + else: + # todo try/except for known retryable errors + await maywait(callback(parsed)) - def add_event_trigger(self, callback: Callable[[dict], None], event: ContractEvents = None, log_filter: Union[dict, str] = None): - """ - if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs - """ - if log_filter is None and event is not None: - log_filter = {'topics': [topic(event.abi)]} - self.events.append((callback, event, log_filter)) + # todo check for reorg and generate a reorg diff list + diff_items = self.state.diffs_by_hash[block.hash] + for callback in self.on_head_update: + await maywait(callback(block, diff_items)) + + # check for root promotion + promotion_height = fork.height - chain.confirms + if not fork.disjoint and promotion_height > self.state.root_block.height and ( + new_root_fork := fork.for_height(promotion_height)): + diff_items = self.state.promote_root(new_root_fork) + for callback in self.on_promotion: + # todo try/except for known retryable errors + callback(self.state.root_block, diff_items) + + if pubs and self.publish_all: + await maywait(self.publish_all(pubs)) + except: # legitimately catch EVERYTHING because we re-raise + if session is not None: + session.rollback() + if blockhash is not None and self.state is not None: + self.state.delete_block(blockhash) + raise + else: + if session is not None: + session.commit() + log.info(f'completed block {block}') diff --git a/src/dexorder/blockchain/transaction.py b/src/dexorder/transaction.py similarity index 51% rename from src/dexorder/blockchain/transaction.py rename to src/dexorder/transaction.py index 7a8f57b..3ee977d 100644 --- a/src/dexorder/blockchain/transaction.py +++ b/src/dexorder/transaction.py @@ -1,15 +1,18 @@ import logging from abc import abstractmethod +from asyncio import Queue +from contextvars import ContextVar from uuid import uuid4 from web3.exceptions import TransactionNotFound from dexorder import db, current_w3 +from dexorder.base import TransactionDict from dexorder.base.chain import current_chain from dexorder.base.order import TransactionRequest -from dexorder.contract import Transaction +from dexorder.contract.contract_proxy import ContractTransaction from dexorder.database.model.block import current_block -from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction +from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction, TransactionJobState log = logging.getLogger(__name__) @@ -25,26 +28,34 @@ class TransactionHandler: TransactionHandler.instances[tag] = self @abstractmethod - async def send_transaction(self, job_id: int, tr: TransactionRequest) -> dict: ... + async def build_transaction(self, job_id: int, tr: TransactionRequest) -> ContractTransaction: ... @abstractmethod async def complete_transaction(self, job: TransactionJob) -> None: ... -def submit_transaction(tr: TransactionRequest): - job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, completed=False, request=tr) +class TransactionSender: + def __init__(self): + self.queue = Queue[TransactionDict]() + def run(self): + while True: + pass # todo + +current_transaction_sender: ContextVar[TransactionSender] = ContextVar('current_transaction_sender') + + +def submit_transaction_request(tr: TransactionRequest): + job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, state=TransactionJobState.Requested, request=tr) db.session.add(job) return job -async def handle_transactions(): + +async def handle_create_transactions(): for job in db.session.query(TransactionJob).filter( TransactionJob.chain == current_chain.get(), - TransactionJob.completed == False + TransactionJob.state == TransactionJobState.Requested ): - if not job.tx: - await create_transaction(job) - if job.tx and not job.tx.receipt: - await check_receipt(job) + await create_transaction(job) async def create_transaction(job: TransactionJob): @@ -54,11 +65,34 @@ async def create_transaction(job: TransactionJob): # todo remove bad request? log.warning(f'ignoring transaction request with bad type "{job.request.type}": {",".join(TransactionHandler.instances.keys())}') else: - # noinspection PyTypeChecker - tx: Transaction = await handler.send_transaction(job.id, job.request) - dbtx = DbTransaction(id=tx.id_bytes, job=job, receipt=None) + ctx: ContractTransaction = await handler.build_transaction(job.id, job.request) + if ctx is None: + log.warning(f'unable to build transaction for job {job.id}') + return + job.state = TransactionJobState.Signed # todo lazy signing + dbtx = DbTransaction(id=ctx.id_bytes, job=job, data=ctx.data, receipt=None) db.session.add(dbtx) - log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {tx}') + log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}') + + +async def handle_send_transactions(): + w3 = current_w3.get() + for job in db.session.query(TransactionJob).filter( + TransactionJob.chain == current_chain.get(), + TransactionJob.state == TransactionJobState.Signed + ): + sent = await w3.eth.send_raw_transaction(job.tx.data) + assert sent == job.tx.id + job.state = TransactionJobState.Sent + + +async def handle_transaction_receipts(): + for job in db.session.query(TransactionJob).filter( + TransactionJob.chain == current_chain.get(), + TransactionJob.state == TransactionJobState.Sent, + ): + if job.tx and not job.tx.receipt: + await check_receipt(job) async def check_receipt(job: TransactionJob): @@ -71,7 +105,7 @@ async def check_receipt(job: TransactionJob): pass else: job.tx.receipt = receipt - job.completed = True + job.state = TransactionJobState.Mined try: handler = TransactionHandler.of(job.request.type) except KeyError: