reworked to optionally use Hardhat in mock; chain id 31337; refactored TransactionJob management; execute() mostly commented out for minimalism

This commit is contained in:
Tim Olson
2023-10-26 17:05:20 -04:00
parent 577efe77d6
commit 062085a79f
18 changed files with 328 additions and 204 deletions

View File

@@ -50,15 +50,17 @@ def upgrade() -> None:
sa.Column('id', sa.UUID(), nullable=False), sa.Column('id', sa.UUID(), nullable=False),
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False), sa.Column('height', sa.Integer(), nullable=False),
sa.Column('completed', sa.Boolean(), nullable=False), # sa.Column('state', sa.Enum(name='transactionjobstate'), nullable=False),
sa.Column('state', sa.Enum('Requested', 'Signed', 'Sent', 'Mined', name='transactionjobstate'), nullable=False),
sa.Column('request', dexorder.database.column_types._DataclassDict(astext_type=sa.Text()), nullable=False), sa.Column('request', dexorder.database.column_types._DataclassDict(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id')
) )
op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False)
op.create_index(op.f('ix_transactionjob_completed'), 'transactionjob', ['completed'], unique=False)
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_index(op.f('ix_transactionjob_state'), 'transactionjob', ['state'], unique=False)
op.create_table('tx', op.create_table('tx',
sa.Column('id', postgresql.BYTEA(), nullable=False), sa.Column('id', postgresql.BYTEA(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column('job_id', sa.UUID(), nullable=False), sa.Column('job_id', sa.UUID(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True), sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ), sa.ForeignKeyConstraint(['job_id'], ['transactionjob.id'], ),
@@ -73,3 +75,4 @@ def downgrade() -> None:
op.drop_table('block') op.drop_table('block')
op.drop_table('tx') op.drop_table('tx')
op.drop_table('transactionjob') op.drop_table('transactionjob')
op.execute('drop type transactionjobstate') # enum type

View File

@@ -1,11 +1,11 @@
sqlalchemy~=2.0.20 sqlalchemy
alembic~=1.11.3 alembic
omegaconf~=2.3.0 omegaconf
web3==6.9.0 web3
psycopg2-binary psycopg2-binary
orjson~=3.9.7 orjson
sortedcontainers~=2.4.0 sortedcontainers
hexbytes~=0.3.1 hexbytes
defaultlist~=1.0.0 defaultlist
redis[hiredis] redis[hiredis]
socket.io-emitter socket.io-emitter

View File

@@ -0,0 +1,14 @@
from typing import TypedDict, Union
Address = str
Quantity = Union[str,int]
TransactionDict = TypedDict( 'TransactionDict', {
'from': Address,
'to': Address,
'gas': Quantity,
'gasPrice': Quantity,
'value': Quantity,
'data': Union[bytes,str],
'nonce': Quantity,
})

View File

@@ -47,7 +47,6 @@ class Account (LocalAccount):
def __init__(self, local_account: LocalAccount, key_str, name: str): # todo chain_id? def __init__(self, local_account: LocalAccount, key_str, name: str): # todo chain_id?
super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code super().__init__(local_account._key_obj, local_account._publicapi) # from digging into the source code
self.name = name self.name = name
self.transaction_counter = 0 # used by GasHandler to detect when new transactions were fired
self.key_str = key_str self.key_str = key_str
self.signing_middleware = construct_sign_and_send_raw_middleware(self) self.signing_middleware = construct_sign_and_send_raw_middleware(self)

View File

@@ -43,7 +43,7 @@ Goerli = Blockchain(5, 'Goerli')
Polygon = Blockchain(137, 'Polygon') # POS not zkEVM Polygon = Blockchain(137, 'Polygon') # POS not zkEVM
Mumbai = Blockchain(80001, 'Mumbai') Mumbai = Blockchain(80001, 'Mumbai')
BSC = Blockchain(56, 'BSC') BSC = Blockchain(56, 'BSC')
Arbitrum = Blockchain(42161, 'Arbitrum', 10, batch_size=1000) # todo configure batch size... does it depend on log count? :( Arbitrum = Blockchain(42161, 'Arbitrum', 3, batch_size=1000) # todo configure batch size... does it depend on log count? :(
Mock = Blockchain(1338, 'Mock', 10) Mock = Blockchain(31337, 'Mock', 3)
current_chain = ContextVar[Blockchain]('current_chain') current_chain = ContextVar[Blockchain]('current_chain')

View File

@@ -2,7 +2,7 @@ import logging
import sys import sys
from asyncio import CancelledError from asyncio import CancelledError
from dexorder import db, config, Blockchain from dexorder import db, config, Blockchain, blockchain
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.blockdata import BlockData
@@ -20,6 +20,7 @@ async def main():
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
parse_args() parse_args()
current_chain.set(Blockchain.get(config.chain)) current_chain.set(Blockchain.get(config.chain))
blockchain.connect()
redis_state = None redis_state = None
state = None state = None
if memcache: if memcache:

View File

@@ -35,7 +35,7 @@ def create_w3(rpc_url=None):
return w3 return w3
def create_w3_ws(ws_url=None): async def create_w3_ws(ws_url=None) -> AsyncWeb3:
""" """
this constructs a Web3 object but does NOT attach it to the context. consider using connect(...) instead this constructs a Web3 object but does NOT attach it to the context. consider using connect(...) instead
this does *not* attach any signer to the w3. make sure to inject the proper middleware with Account.attach(w3) this does *not* attach any signer to the w3. make sure to inject the proper middleware with Account.attach(w3)
@@ -46,7 +46,7 @@ def create_w3_ws(ws_url=None):
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain
# self.w3iter = itertools.cycle(self.w3s) # self.w3iter = itertools.cycle(self.w3s)
ws_provider = WebsocketProviderV2(resolve_ws_url(ws_url)) ws_provider = WebsocketProviderV2(resolve_ws_url(ws_url))
w3 = AsyncWeb3.persistent_websocket(ws_provider) w3 = await AsyncWeb3.persistent_websocket(ws_provider)
w3.middleware_onion.remove('attrdict') w3.middleware_onion.remove('attrdict')
# w3.middleware_onion.add(clean_input, 'clean_input') # w3.middleware_onion.add(clean_input, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth) w3.eth.Contract = _make_contract(w3.eth)

View File

@@ -31,8 +31,8 @@ class DbState(SeriesCollection):
series = var.series2str(var.series) series = var.series2str(var.series)
key = var.key2str(item) key = var.key2str(item)
try: try:
height, blockhash = db.kv[f'root_block.{chain_id}'] height, blockhash = db.kv[f'root_block|{chain_id}']
except: except Exception:
return None return None
fork = Fork([hexbytes(blockhash)], height=height) fork = Fork([hexbytes(blockhash)], height=height)
value = db.session.get(Entity, (chain_id, series, key)) value = db.session.get(Entity, (chain_id, series, key))
@@ -68,13 +68,13 @@ class DbState(SeriesCollection):
found.value = diff.value found.value = diff.value
else: else:
raise NotImplementedError raise NotImplementedError
db.kv[f'root_block.{root_block.chain}'] = [root_block.height, root_block.hash] db.kv[f'root_block|{root_block.chain}'] = [root_block.height, root_block.hash]
# noinspection PyShadowingBuiltins # noinspection PyShadowingBuiltins
def load(self) -> Optional[BlockState]: def load(self) -> Optional[BlockState]:
chain_id = current_chain.get().chain_id chain_id = current_chain.get().chain_id
try: try:
height, hash = db.kv[f'root_block.{chain_id}'] height, hash = db.kv[f'root_block|{chain_id}']
except (KeyError, ValueError): except (KeyError, ValueError):
return None return None
root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash))) root_block = db.session.get(Block, dict(chain=chain_id, height=height, hash=hexbytes(hash)))

View File

@@ -72,7 +72,7 @@ class BlockState:
return self.fork(block) return self.fork(block)
def delete_block(self, block: Union[Block, Fork,bytes]): def delete_block(self, block: Union[Block, Fork, bytes]):
""" if there was an error during block processing, we need to remove the incomplete block data """ """ if there was an error during block processing, we need to remove the incomplete block data """
try: try:
block = block.hash block = block.hash

View File

@@ -17,8 +17,7 @@ class ConfigException (Exception):
def load_config(): def load_config():
# noinspection PyTypeChecker
# noinspection PyTypeChecker
result:ConfigDict = OmegaConf.merge( result:ConfigDict = OmegaConf.merge(
schema, schema,
load_tokens(), load_tokens(),
@@ -51,12 +50,13 @@ def load_accounts():
def from_env(prefix='DEXORDER_'): def from_env(prefix='DEXORDER_'):
dotlist = [] merge = {}
for key, value in os.environ.items(): for key, value in os.environ.items():
if key.startswith(prefix): if key.startswith(prefix):
key = key[len(prefix):].lower().replace('__','.') key = key[len(prefix):].lower().replace('__','.')
dotlist.append(key+'='+value) if key in schema:
result = OmegaConf.from_dotlist(dotlist) merge[key] = value
result = OmegaConf.create(merge)
try: try:
OmegaConf.merge(schema, result) OmegaConf.merge(schema, result)
return result return result

View File

@@ -3,13 +3,13 @@ import json
from eth_abi.codec import ABIDecoder, ABIEncoder from eth_abi.codec import ABIDecoder, ABIEncoder
from eth_abi.registry import registry as default_registry from eth_abi.registry import registry as default_registry
from .. import current_w3 from .. import current_w3 as _current_w3
abi_decoder = ABIDecoder(default_registry) abi_decoder = ABIDecoder(default_registry)
abi_encoder = ABIEncoder(default_registry) abi_encoder = ABIEncoder(default_registry)
from .abi import abis from .abi import abis
from .contract_proxy import ContractProxy, Transaction from .contract_proxy import ContractProxy
from .pool_contract import UniswapV3Pool from .pool_contract import UniswapV3Pool
from .uniswap_contracts import uniswapV3 from .uniswap_contracts import uniswapV3
@@ -20,5 +20,5 @@ def get_contract_data(name):
def get_contract_event(contract_name:str, event_name:str): def get_contract_event(contract_name:str, event_name:str):
return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()

View File

@@ -1,39 +1,16 @@
import json import json
from typing import Optional from typing import Optional
from web3.exceptions import TransactionNotFound from eth_utils import keccak
from web3.types import TxReceipt from web3.types import TxReceipt
from dexorder import Account, current_w3 from dexorder import current_w3, Account
from dexorder.base.account import current_account from dexorder.base.account import current_account
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block
from dexorder.base import TransactionDict
from dexorder.util import hexstr from dexorder.util import hexstr
class Transaction:
def __init__(self, account: Account, tx_id_bytes:bytes):
self.account = account
self.id_bytes = tx_id_bytes
self._id = None
self.receipt: Optional[TxReceipt] = None
async def wait(self) -> TxReceipt:
if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes)
self.account.transaction_counter += 1
return self.receipt
@property
def id(self) -> str:
if self._id is None:
self._id = self.id_bytes.hex()
return self._id
def __repr__(self):
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'
def call_wrapper(func): def call_wrapper(func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
return await func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash)) return await func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash))
@@ -42,12 +19,15 @@ def call_wrapper(func):
def transact_wrapper(func): def transact_wrapper(func):
async def f(*args, **kwargs): async def f(*args, **kwargs):
w3 = current_w3.get()
try: try:
account = current_account.get() account = current_account.get()
except LookupError: except LookupError:
raise RuntimeError('Cannot invoke a transaction without setting an Account.') raise RuntimeError('Cannot invoke a transaction without setting an Account.')
return Transaction(account, await func(*args, **kwargs).transact()) tx = await func(*args, **kwargs).build_transaction()
tx['from'] = account.address
signed = w3.eth.account.sign_transaction(tx, private_key=account.key)
return ContractTransaction(signed)
return f return f
@@ -88,7 +68,7 @@ class ContractProxy:
""" """
Calls the contract constructor transaction and waits to receive a transaction receipt. Calls the contract constructor transaction and waits to receive a transaction receipt.
""" """
tx: Transaction = self.transact.constructor(*args) tx: ContractTransaction = self.transact.constructor(*args)
receipt = tx.wait() receipt = tx.wait()
self.address = receipt.contractAddress self.address = receipt.contractAddress
self._contracts.clear() self._contracts.clear()
@@ -105,3 +85,16 @@ class ContractProxy:
def __repr__(self): def __repr__(self):
addr = self.contract.address addr = self.contract.address
return f'{self._interface_name}({addr or ""})' return f'{self._interface_name}({addr or ""})'
class ContractTransaction:
def __init__(self, rawtx: bytes):
self.data = rawtx
self.id_bytes = keccak(rawtx)
self.id = hexstr(self.id_bytes)
self.receipt: Optional[TxReceipt] = None
def __repr__(self):
# todo this is from an old status system
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'

View File

@@ -1,29 +1,46 @@
import logging import logging
from enum import Enum
from typing import Optional from typing import Optional
from sqlalchemy import ForeignKey, UniqueConstraint import sqlalchemy as sa
from sqlalchemy import ForeignKey
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import mapped_column, Mapped, relationship from sqlalchemy.orm import mapped_column, Mapped, relationship
from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request
from dexorder.base import TransactionDict
from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID
from dexorder.database.column_types import DataclassDict from dexorder.database.column_types import DataclassDict
from dexorder.database.model import Base from dexorder.database.model import Base
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class TransactionJobState (Enum):
Requested = 'a' # request exists as a job but the tx has not been created and signed yet
Signed = 'n' # tx has been signed
Sent = 's' # tx has been delivered to a node
Mined = 'z' # mined on at least one fork, whether reverted or not. todo handle forks that didnt confirm: receipts are per-fork!
# noinspection PyProtectedMember
# TransactionJobStateColumnType = sa.Enum(*(e.value for e in TransactionJobState.__members__.values()), name='transactionjobstate')
TransactionJobStateColumnType = sa.Enum(TransactionJobState)
class TransactionJob (Base): class TransactionJob (Base):
id: Mapped[UUID_PK] id: Mapped[UUID_PK]
chain: Mapped[Blockchain] = mapped_column(index=True) chain: Mapped[Blockchain] = mapped_column(index=True)
height: Mapped[int] = mapped_column(index=True) # to be used for data rolloff and/or by Timescale height: Mapped[int] = mapped_column(index=True) # the height at which the job was created, to be used for timeout/ data rolloff and/or by Timescale
completed: Mapped[bool] = mapped_column(default=False, index=True) state: Mapped[TransactionJobState] = mapped_column(TransactionJobStateColumnType, default=TransactionJobState.Requested, index=True)
request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request)) request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request))
tx: Mapped[list["Transaction"]] = relationship(back_populates='job', uselist=False) tx: Mapped[list["Transaction"]] = relationship(back_populates='job', uselist=False)
class Transaction (Base): class Transaction (Base):
__tablename__ = 'tx' # avoid the keyword "transaction" __tablename__ = 'tx' # avoid the keyword "transaction"
id: Mapped[Bytes] = mapped_column(primary_key=True) id: Mapped[Bytes] = mapped_column(primary_key=True)
data: Mapped[Bytes] # the signed tx data
job_id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id")) job_id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id"))
job: Mapped[TransactionJob] = relationship(back_populates='tx', single_parent=True) job: Mapped[TransactionJob] = relationship(back_populates='tx', single_parent=True)
receipt: Mapped[Optional[Dict]] receipt: Mapped[Optional[Dict]] # todo handle forks that didnt confirm: receipts are per-fork!

View File

@@ -5,8 +5,8 @@ from web3.types import EventData
from dexorder import current_pub, db from dexorder import current_pub, db
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request
from dexorder.blockchain.transaction import handle_transactions, submit_transaction from dexorder.transaction import handle_create_transactions, submit_transaction_request, handle_transaction_receipts
from dexorder.blockchain.uniswap import uniswap_price from dexorder.blockchain.uniswap import uniswap_price
from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract
from dexorder.contract import UniswapV3Pool, get_contract_event from dexorder.contract import UniswapV3Pool, get_contract_event
@@ -17,7 +17,6 @@ from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus
from dexorder.order.orderstate import Order from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers, \ from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers, \
unconstrained_price_triggers, execution_requests, inflight_execution_requests unconstrained_price_triggers, execution_requests, inflight_execution_requests
from dexorder.util import hexbytes
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -57,6 +56,9 @@ def setup_logevent_triggers(runner):
else: else:
executions = dexorder.events.DexorderExecutions() executions = dexorder.events.DexorderExecutions()
#
# THIS IS BASICALLY THE MAIN RUN LOOP FOR EVERY BLOCK
#
runner.add_event_trigger(handle_vault_created, vault_created) runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
@@ -65,11 +67,12 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted')) runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted'))
runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError')) runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError'))
runner.add_event_trigger(handle_transaction_receipts)
runner.add_event_trigger(handle_dexorderexecutions, executions) runner.add_event_trigger(handle_dexorderexecutions, executions)
runner.add_event_trigger(activate_time_triggers) runner.add_event_trigger(activate_time_triggers)
runner.add_event_trigger(activate_price_triggers) runner.add_event_trigger(activate_price_triggers)
runner.add_event_trigger(process_execution_requests) runner.add_event_trigger(process_execution_requests)
runner.add_event_trigger(handle_transactions) runner.add_event_trigger(handle_create_transactions)
async def handle_order_placed(event: EventData): async def handle_order_placed(event: EventData):
@@ -190,7 +193,7 @@ async def process_execution_requests():
# todo batch execution # todo batch execution
for tk, er in execs: for tk, er in execs:
log.info(f'executing tranche {tk}') log.info(f'executing tranche {tk}')
job = submit_transaction(new_tranche_execution_request(tk, er.proof)) job = submit_transaction_request(new_tranche_execution_request(tk, er.proof))
inflight_execution_requests[tk] = height inflight_execution_requests[tk] = height
log.info(f'executing tranche {tk} with job {job.id}') log.info(f'executing tranche {tk} with job {job.id}')
@@ -215,7 +218,6 @@ def handle_dexorderexecutions(event: EventData):
def finish_execution_request(req: TrancheExecutionRequest, error: str): def finish_execution_request(req: TrancheExecutionRequest, error: str):
order = Order.of(req.vault, req.order_index) order = Order.of(req.vault, req.order_index)
tk = TrancheKey(req.vault, req.order_index, req.tranche_index) tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
del inflight_execution_requests[tk] # no longer in-flight
if error != '': if error != '':
log.debug(f'execution request for tranche {tk} had error "{error}"') log.debug(f'execution request for tranche {tk} had error "{error}"')
if error == '': if error == '':

View File

@@ -1,10 +1,11 @@
import logging import logging
from uuid import UUID from uuid import UUID
from dexorder.base.order import TrancheExecutionRequest from dexorder.base.order import TrancheExecutionRequest, TrancheKey
from dexorder.blockchain.transaction import TransactionHandler from dexorder.transaction import TransactionHandler
from dexorder.contract.dexorder import get_dexorder_contract from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.database.model.transaction import TransactionJob from dexorder.database.model.transaction import TransactionJob
from dexorder.order.triggers import inflight_execution_requests
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -12,11 +13,17 @@ class TrancheExecutionHandler (TransactionHandler):
def __init__(self): def __init__(self):
super().__init__('te') super().__init__('te')
async def send_transaction(self, job_id: UUID, ter: TrancheExecutionRequest) -> dict: async def build_transaction(self, job_id: UUID, req: TrancheExecutionRequest) -> dict:
return await get_dexorder_contract().transact.execute(job_id.bytes, (ter.vault, ter.order_index, ter.tranche_index, ter.price_proof)) # noinspection PyBroadException
try:
return await get_dexorder_contract().transact.execute(job_id.bytes, (req.vault, req.order_index, req.tranche_index, req.price_proof))
except Exception:
log.exception(f'Could not send execution request {req}')
async def complete_transaction(self, job: TransactionJob) -> None: async def complete_transaction(self, job: TransactionJob) -> None:
# anything to do? req: TrancheExecutionRequest = job.request
pass tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
del inflight_execution_requests[tk] # no longer in-flight
TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler

View File

@@ -1,13 +1,12 @@
import logging import logging
from enum import Enum from enum import Enum
from typing import Callable, Optional from typing import Callable
from dexorder.blockstate import BlockSet, BlockDict from dexorder.blockstate import BlockSet, BlockDict
from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState, PriceProof from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState, PriceProof
from dexorder.util import defaultdictk from dexorder.util import defaultdictk
from .orderstate import Order from .orderstate import Order
from ..base.order import OrderKey, TrancheKey, new_tranche_execution_request, ExecutionRequest from ..base.order import OrderKey, TrancheKey, ExecutionRequest
from ..blockchain.transaction import submit_transaction
from ..database.model.block import current_block from ..database.model.block import current_block
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -20,6 +19,8 @@ PriceTrigger = Callable[[int], None] # func(pool_price)
price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address
unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # value is block height when the request was placed execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # value is block height when the request was placed
# todo should this really be blockdata?
inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent
def intersect_ranges( a_low, a_high, b_low, b_high): def intersect_ranges( a_low, a_high, b_low, b_high):

View File

@@ -1,11 +1,13 @@
import asyncio
import logging import logging
from typing import Callable, Union, Any, Iterable, Optional from asyncio import Queue
from typing import Callable, Union, Any, Iterable
from web3.contract.contract import ContractEvents from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI from web3.exceptions import LogTopicError, MismatchedABI
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, blockchain, NARG, current_pub, Account from dexorder import Blockchain, db, blockchain, current_pub, async_yield, current_w3
from dexorder.base.account import current_account
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork, Fork from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws from dexorder.blockchain.connection import create_w3_ws
@@ -14,7 +16,6 @@ from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block from dexorder.database.model.block import current_block, latest_block
from dexorder.event_handler import setup_logevent_triggers from dexorder.event_handler import setup_logevent_triggers
from dexorder.order.triggers import time_triggers
from dexorder.util import hexstr, topic from dexorder.util import hexstr, topic
from dexorder.util.async_util import maywait from dexorder.util.async_util import maywait
@@ -41,6 +42,19 @@ class BlockStateRunner:
self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],None] = publish_all self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],None] = publish_all
self.queue: Queue = Queue()
self.running = False
def add_event_trigger(self, callback: Callable[[dict], None], event: ContractEvents = None, log_filter: Union[dict, str] = None):
"""
if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
"""
if log_filter is None and event is not None:
log_filter = {'topics': [topic(event.abi)]}
self.events.append((callback, event, log_filter))
async def run(self): async def run(self):
""" """
@@ -65,123 +79,162 @@ class BlockStateRunner:
15. on tx confirmation, the block height of all executed trigger requests is set to the tx block 15. on tx confirmation, the block height of all executed trigger requests is set to the tx block
""" """
w3 = blockchain.connect() self.running = True
w3ws = create_w3_ws()
w3ws = await create_w3_ws()
chain_id = await w3ws.eth.chain_id chain_id = await w3ws.eth.chain_id
chain = Blockchain.for_id(chain_id) chain = Blockchain.for_id(chain_id)
current_chain.set(chain) current_chain.set(chain)
setup_logevent_triggers(self) setup_logevent_triggers(self)
_worker_task = asyncio.create_task(self.worker())
state = self.state while self.running:
try:
async with w3ws as w3ws:
await w3ws.provider.connect()
subscription = await w3ws.eth.subscribe('newHeads') # the return value of this call is not consistent between anvil/hardhat/rpc. do not use it.
log.debug(f'subscribed to newHeads {subscription}')
while self.running:
async for message in w3ws.ws.listen_to_websocket():
head = message['result']
log.debug(f'detected new block {head["number"]} {hexstr(head["hash"])}')
await self.queue.put(head)
if not self.running:
break
except (ConnectionClosedError, TimeoutError):
pass
finally:
try:
await w3ws.provider.disconnect()
except Exception:
pass
await async_yield()
async with w3ws as w3ws:
await w3ws.eth.subscribe('newHeads') async def worker(self):
while True: log.debug(f'runner worker started {self.running}')
async for head in w3ws.listen_to_websocket(): w3 = current_w3.get()
session = None chain = current_chain.get()
fork = None assert chain.chain_id == await w3.eth.chain_id
try: while self.running:
log.debug(f'head {head["hash"]}') try:
# block_data = await w3.eth.get_block(head['hash'], True) async with asyncio.timeout(1): # check running flag every second
block_data = (await w3.provider.make_request('eth_getBlockByHash',[hexstr(head['hash']),False]))['result'] head = await self.queue.get()
block = Block(chain=chain_id, height=int(block_data['number'],0), log.debug(f'got head {hexstr(head["hash"])}')
hash=bytes.fromhex(block_data['hash'][2:]), parent=bytes.fromhex(block_data['parentHash'][2:]), data=block_data) except TimeoutError:
latest_block.set(block) pass
fork = NARG else:
if state is None: try:
# initialize await self.handle_head(chain, head, w3)
state = BlockState(block) except Exception as x:
current_blockstate.set(state) log.exception(x)
log.info('Created new empty root state')
fork = Fork([block.hash], height=block.height)
async def handle_head(self, chain, head, w3):
log.debug(f'processing block {head["number"]} {hexstr(head["hash"])}')
chain_id = chain.chain_id
session = None
blockhash = None
try:
blockhash = hexstr(head["hash"])
if self.state is not None and blockhash in self.state.by_hash:
return
# block_data = await w3.eth.get_block(head['hash'], True)
response = await w3.provider.make_request('eth_getBlockByHash', [blockhash, False])
block_data = response['result']
if block_data is None:
log.warning(f'block data for {blockhash} was None')
return # todo get block when hardhat stops responding to getBlockByHash
block = Block(chain=chain_id, height=int(block_data['number'], 0),
hash=bytes.fromhex(block_data['hash'][2:]), parent=bytes.fromhex(block_data['parentHash'][2:]), data=block_data)
latest_block.set(block)
if self.state is None:
# initialize
self.state = BlockState(block)
current_blockstate.set(self.state)
log.info('Created new empty root state')
fork = Fork([block.hash], height=block.height)
else:
fork = self.state.add_block(block)
if fork is None:
log.debug(f'discarded late-arriving head {block}')
else:
batches = []
if fork.disjoint:
# backfill batches
for callback, event, log_filter in self.events:
if event is None:
batches.append(None)
else: else:
fork = state.add_block(block) from_height = self.state.root_block.height + 1
if fork is None: end_height = block.height
log.debug(f'discarded late-arriving head {block}') while from_height <= end_height:
to_height = min(end_height, from_height + chain.batch_size - 1)
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(lf), callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
for callback, event, log_filter in self.events:
if log_filter is None:
batches.append((None, callback, event, None))
else: else:
batches = [] # todo use head['logsBloom'] to skip unnecessary log queries
if fork.disjoint: lf = dict(log_filter)
# backfill batches lf['blockHash'] = hexstr(block.hash)
for callback, event, log_filter in self.events: batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
if event is None:
batches.append(None)
else:
from_height = state.root_block.height + 1
end_height = block.height
while from_height <= end_height:
to_height = min(end_height, from_height + chain.batch_size - 1)
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(lf), callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
for callback, event, log_filter in self.events:
if log_filter is None:
batches.append((None, callback, event, None))
else:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
# set up for callbacks # set up for callbacks
current_block.set(block) current_block.set(block)
current_fork.set(fork) current_fork.set(fork)
session = db.session session = db.session
session.begin() session.begin()
session.add(block) session.add(block)
pubs = [] pubs = []
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args)))
# logevent callbacks # logevent callbacks
for future,callback,event,filter_args in batches: for future, callback, event, filter_args in batches:
if future is None: if future is None:
await maywait(callback()) # non-log callback await maywait(callback()) # non-log callback
else:
log_events = await future
for log_event in log_events:
try:
parsed = event.process_log(log_event) if event is not None else log_event
except (LogTopicError, MismatchedABI):
pass
else:
# todo try/except for known retryable errors
await maywait(callback(parsed))
# todo check for reorg and generate a reorg diff list
diff_items = state.diffs_by_hash[block.hash]
for callback in self.on_head_update:
await maywait(callback(block, diff_items))
# check for root promotion
promotion_height = fork.height - chain.confirms
if not fork.disjoint and promotion_height > state.root_block.height and (new_root_fork := fork.for_height(promotion_height)):
diff_items = state.promote_root(new_root_fork)
for callback in self.on_promotion:
# todo try/except for known retryable errors
callback(state.root_block, diff_items)
if pubs and self.publish_all:
await maywait(self.publish_all(pubs))
except:
if session is not None:
session.rollback()
if fork is not None:
state.delete_block(fork)
raise
else: else:
if session is not None: log_events = await future
session.commit() for log_event in log_events:
log.info(f'completed block {block}') try:
parsed = event.process_log(log_event) if event is not None else log_event
except (LogTopicError, MismatchedABI):
pass
else:
# todo try/except for known retryable errors
await maywait(callback(parsed))
def add_event_trigger(self, callback: Callable[[dict], None], event: ContractEvents = None, log_filter: Union[dict, str] = None): # todo check for reorg and generate a reorg diff list
""" diff_items = self.state.diffs_by_hash[block.hash]
if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs for callback in self.on_head_update:
""" await maywait(callback(block, diff_items))
if log_filter is None and event is not None:
log_filter = {'topics': [topic(event.abi)]} # check for root promotion
self.events.append((callback, event, log_filter)) promotion_height = fork.height - chain.confirms
if not fork.disjoint and promotion_height > self.state.root_block.height and (
new_root_fork := fork.for_height(promotion_height)):
diff_items = self.state.promote_root(new_root_fork)
for callback in self.on_promotion:
# todo try/except for known retryable errors
callback(self.state.root_block, diff_items)
if pubs and self.publish_all:
await maywait(self.publish_all(pubs))
except: # legitimately catch EVERYTHING because we re-raise
if session is not None:
session.rollback()
if blockhash is not None and self.state is not None:
self.state.delete_block(blockhash)
raise
else:
if session is not None:
session.commit()
log.info(f'completed block {block}')

View File

@@ -1,15 +1,18 @@
import logging import logging
from abc import abstractmethod from abc import abstractmethod
from asyncio import Queue
from contextvars import ContextVar
from uuid import uuid4 from uuid import uuid4
from web3.exceptions import TransactionNotFound from web3.exceptions import TransactionNotFound
from dexorder import db, current_w3 from dexorder import db, current_w3
from dexorder.base import TransactionDict
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest from dexorder.base.order import TransactionRequest
from dexorder.contract import Transaction from dexorder.contract.contract_proxy import ContractTransaction
from dexorder.database.model.block import current_block from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction, TransactionJobState
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -25,26 +28,34 @@ class TransactionHandler:
TransactionHandler.instances[tag] = self TransactionHandler.instances[tag] = self
@abstractmethod @abstractmethod
async def send_transaction(self, job_id: int, tr: TransactionRequest) -> dict: ... async def build_transaction(self, job_id: int, tr: TransactionRequest) -> ContractTransaction: ...
@abstractmethod @abstractmethod
async def complete_transaction(self, job: TransactionJob) -> None: ... async def complete_transaction(self, job: TransactionJob) -> None: ...
def submit_transaction(tr: TransactionRequest): class TransactionSender:
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, completed=False, request=tr) def __init__(self):
self.queue = Queue[TransactionDict]()
def run(self):
while True:
pass # todo
current_transaction_sender: ContextVar[TransactionSender] = ContextVar('current_transaction_sender')
def submit_transaction_request(tr: TransactionRequest):
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, state=TransactionJobState.Requested, request=tr)
db.session.add(job) db.session.add(job)
return job return job
async def handle_transactions():
async def handle_create_transactions():
for job in db.session.query(TransactionJob).filter( for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(), TransactionJob.chain == current_chain.get(),
TransactionJob.completed == False TransactionJob.state == TransactionJobState.Requested
): ):
if not job.tx: await create_transaction(job)
await create_transaction(job)
if job.tx and not job.tx.receipt:
await check_receipt(job)
async def create_transaction(job: TransactionJob): async def create_transaction(job: TransactionJob):
@@ -54,11 +65,34 @@ async def create_transaction(job: TransactionJob):
# todo remove bad request? # todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}": {",".join(TransactionHandler.instances.keys())}') log.warning(f'ignoring transaction request with bad type "{job.request.type}": {",".join(TransactionHandler.instances.keys())}')
else: else:
# noinspection PyTypeChecker ctx: ContractTransaction = await handler.build_transaction(job.id, job.request)
tx: Transaction = await handler.send_transaction(job.id, job.request) if ctx is None:
dbtx = DbTransaction(id=tx.id_bytes, job=job, receipt=None) log.warning(f'unable to build transaction for job {job.id}')
return
job.state = TransactionJobState.Signed # todo lazy signing
dbtx = DbTransaction(id=ctx.id_bytes, job=job, data=ctx.data, receipt=None)
db.session.add(dbtx) db.session.add(dbtx)
log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {tx}') log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {ctx.id}')
async def handle_send_transactions():
w3 = current_w3.get()
for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Signed
):
sent = await w3.eth.send_raw_transaction(job.tx.data)
assert sent == job.tx.id
job.state = TransactionJobState.Sent
async def handle_transaction_receipts():
for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(),
TransactionJob.state == TransactionJobState.Sent,
):
if job.tx and not job.tx.receipt:
await check_receipt(job)
async def check_receipt(job: TransactionJob): async def check_receipt(job: TransactionJob):
@@ -71,7 +105,7 @@ async def check_receipt(job: TransactionJob):
pass pass
else: else:
job.tx.receipt = receipt job.tx.receipt = receipt
job.completed = True job.state = TransactionJobState.Mined
try: try:
handler = TransactionHandler.of(job.request.type) handler = TransactionHandler.of(job.request.type)
except KeyError: except KeyError: