diff --git a/alembic/versions/db62e7db828d_.py b/alembic/versions/db62e7db828d_initial_schema.py similarity index 59% rename from alembic/versions/db62e7db828d_.py rename to alembic/versions/db62e7db828d_initial_schema.py index 7c9e871..d97c56d 100644 --- a/alembic/versions/db62e7db828d_.py +++ b/alembic/versions/db62e7db828d_initial_schema.py @@ -1,9 +1,9 @@ -"""empty message +""" +initial schema Revision ID: db62e7db828d Revises: Create Date: 2023-09-28 23:04:41.020644 - """ from typing import Sequence, Union @@ -46,6 +46,25 @@ def upgrade() -> None: sa.Column('key', sa.String(), nullable=False), sa.PrimaryKeyConstraint('chain', 'series', 'key') ) + op.create_table('transactionjob', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('height', sa.Integer(), nullable=False), + sa.Column('completed', sa.Boolean(), nullable=False), + sa.Column('request', dexorder.database.column_types._DataclassDict(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False) + op.create_index(op.f('ix_transactionjob_completed'), 'transactionjob', ['completed'], unique=False) + op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False) + op.create_table('tx', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('tx', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.Column('hash', postgresql.BYTEA(), nullable=False), + sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.ForeignKeyConstraint(['id'], ['transactionjob.id'], ), + sa.PrimaryKeyConstraint('id') + ) def downgrade() -> None: @@ -53,3 +72,5 @@ def downgrade() -> None: op.drop_table('seriesdict') op.drop_table('keyvalue') op.drop_table('block') + op.drop_table('tx') + op.drop_table('transactionjob') diff --git a/bin/df.sh b/bin/df.sh index d0bca27..528f701 100755 --- a/bin/df.sh +++ b/bin/df.sh @@ -1,2 +1,2 @@ #!/bin/bash -docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly --maxmemory 1G --dbfilename '' "$@" +docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly:latest --maxmemory 1G --dbfilename '' "$@" diff --git a/src/dexorder/base/account.py b/src/dexorder/base/account.py index c25c37b..82ade0a 100644 --- a/src/dexorder/base/account.py +++ b/src/dexorder/base/account.py @@ -66,4 +66,4 @@ class Account (LocalAccount): return self.name -current_account: ContextVar[Account] = ContextVar('current_account') +current_account: ContextVar[Optional[Account]] = ContextVar('current_account', default=Account.get()) diff --git a/src/dexorder/base/order.py b/src/dexorder/base/order.py new file mode 100644 index 0000000..9b85a92 --- /dev/null +++ b/src/dexorder/base/order.py @@ -0,0 +1,65 @@ +import logging +from dataclasses import dataclass +from typing import Optional, Type, Union + +log = logging.getLogger(__name__) + +@dataclass(frozen=True, eq=True) +class OrderKey: + vault: str + order_index: int + + @staticmethod + def str2key(keystring: str): + vault, order_index = keystring.split('|') + return OrderKey(vault, int(order_index)) + + def __str__(self): + return f'{self.vault}|{self.order_index}' + + +@dataclass(frozen=True, eq=True) +class TrancheKey (OrderKey): + tranche_index: int + + @staticmethod + def str2key(keystring: str): + vault, order_index, tranche_index = keystring.split('|') + return TrancheKey(vault, int(order_index), int(tranche_index)) + + def __str__(self): + return f'{self.vault}|{self.order_index}|{self.tranche_index}' + + +@dataclass +class ExecutionRequest: + height: int + proof: None + + +@dataclass +class TransactionRequest: + type: str # 'te' for tranche execution + +@dataclass +class TrancheExecutionRequest (TransactionRequest): + # type: str # 'te' for tranche execution + vault: str + order_index: int + tranche_index: int + price_proof: Union[None,dict,tuple[int]] + +def new_tranche_execution_request(tk: TrancheKey, _proof: Optional[dict]) -> TrancheExecutionRequest: + return TrancheExecutionRequest('te', tk.vault, tk.order_index, tk.tranche_index, (0,)) # todo proof + +def deserialize_transaction_request(**d): + t = d['type'] + Class = transaction_request_registry.get(t) + if Class is None: + raise ValueError(f'No TransactionRequest for type "{t}"') + # noinspection PyArgumentList + return Class(**d) + +transaction_request_registry: dict[str, Type[TransactionRequest]] = dict( + te = TrancheExecutionRequest, +) diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index 2f84f58..f41336f 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -1,7 +1,7 @@ from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider -from dexorder.util.uniswap_util import get_contract_data +from ..contract import get_contract_data from .. import current_w3 from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url diff --git a/src/dexorder/blockchain/transaction.py b/src/dexorder/blockchain/transaction.py new file mode 100644 index 0000000..8041a11 --- /dev/null +++ b/src/dexorder/blockchain/transaction.py @@ -0,0 +1,76 @@ +import logging +from abc import abstractmethod +from uuid import uuid4 + +from dexorder import db, current_w3 +from dexorder.base.chain import current_chain +from dexorder.base.order import TransactionRequest +from dexorder.database.model.block import current_block +from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction + +log = logging.getLogger(__name__) + + +class TransactionHandler: + instances: dict[str,'TransactionHandler'] = {} + + @staticmethod + def of(tag: str): + return TransactionHandler.instances[tag] + + def __init__(self, tag): + TransactionHandler.instances[tag] = self + + @abstractmethod + async def send_transaction(self, job_id: int, tr: TransactionRequest) -> dict: ... + + @abstractmethod + async def complete_transaction(self, job: TransactionJob) -> None: ... + + +def submit_transaction(tr: TransactionRequest): + job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, completed=False, request=tr) + db.session.add(job) + return job + +async def handle_transactions(): + for job in db.session.query(TransactionJob).filter( + TransactionJob.chain == current_chain.get(), + TransactionJob.completed == False + ): + if not job.tx: + await create_transaction(job) + if job.tx and not job.tx.receipt: + await check_receipt(job) + + +async def create_transaction(job: TransactionJob): + try: + handler = TransactionHandler.of(job.request.type) + except KeyError: + # todo remove bad request? + log.warning(f'ignoring transaction request with bad type "{job.request.type}"') + else: + tx = await handler.send_transaction(job.id, job.request) + job.tx = dtx = DbTransaction(tx=tx, hash=tx['hash'], receipt=None) + db.session.add(dtx) + log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {tx}') + + +async def check_receipt(job: TransactionJob): + if not job.tx: + return + w3 = current_w3.get() + receipt = await w3.eth.get_transaction_receipt(job.tx.hash) + if receipt is not None: + job.tx.receipt = receipt + job.completed = True + db.session.add(job.tx) + try: + handler = TransactionHandler.of(job.request.type) + except KeyError: + # todo remove bad request? + log.warning(f'ignoring transaction request with bad type "{job.request.type}"') + else: + await handler.complete_transaction(job) + diff --git a/src/dexorder/blockchain/uniswap.py b/src/dexorder/blockchain/uniswap.py index 55f266e..83c24a5 100644 --- a/src/dexorder/blockchain/uniswap.py +++ b/src/dexorder/blockchain/uniswap.py @@ -31,7 +31,7 @@ def uniswap_pool_address(factory_addr: str, addr_a: str, addr_b: str, fee: int) + UNISWAPV3_POOL_INIT_CODE_HASH ).hex()[-40:] result = to_checksum_address(contract_address) - log.debug(f'uniswap pool address {factory_addr} {addr_a} {addr_b} {fee} => {result}') + # log.debug(f'uniswap pool address {factory_addr} {addr_a} {addr_b} {fee} => {result}') return result def uniswap_price(sqrt_price): diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index ccaacc2..41b1e1a 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -1,6 +1,10 @@ +import json + from eth_abi.codec import ABIDecoder, ABIEncoder from eth_abi.registry import registry as default_registry +from .. import current_w3 + abi_decoder = ABIDecoder(default_registry) abi_encoder = ABIEncoder(default_registry) @@ -10,5 +14,11 @@ from .pool_contract import UniswapV3Pool from .uniswap_contracts import uniswapV3 -def VaultContract(addr): - return ContractProxy(addr, 'Vault') +def get_contract_data(name): + with open(f'../contract/out/{name}.sol/{name}.json', 'rt') as file: + return json.load(file) + + +def get_contract_event(contract_name:str, event_name:str): + return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() + diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index cd389b3..78fcea6 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -15,13 +15,13 @@ class Transaction: self.account = account self.id_bytes = tx_id_bytes self._id = None - self._receipt: Optional[TxReceipt] = None + self.receipt: Optional[TxReceipt] = None - def wait(self) -> TxReceipt: - if self._receipt is None: - self._receipt = current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes) + async def wait(self) -> TxReceipt: + if self.receipt is None: + self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes) self.account.transaction_counter += 1 - return self._receipt + return self.receipt @property def id(self) -> str: @@ -29,35 +29,24 @@ class Transaction: self._id = self.id_bytes.hex() return self._id - @property - def receipt(self) -> TxReceipt: - if self._receipt is None: - try: - self._receipt = current_w3.get().eth.get_transaction_receipt(self.id_bytes) - self.account.transaction_counter += 1 - except TransactionNotFound: - pass - return self._receipt - def __repr__(self): - self.receipt() - receipt_status = 'IN_FLIGHT' if self._receipt is None else 'REVERTED' if self._receipt.status == 0 else self._receipt.blockNumber + receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber return f'Transaction({self.id},{receipt_status})' def call_wrapper(func): - def f(*args, **kwargs): - return func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash)) + async def f(*args, **kwargs): + return await func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash)) return f def transact_wrapper(func): - def f(*args, **kwargs): + async def f(*args, **kwargs): try: account = current_account.get() except LookupError: raise RuntimeError('Cannot invoke a transaction without setting an Account.') - return Transaction(account, func(*args, **kwargs).transact()) + return Transaction(account, await func(*args, **kwargs).transact()) return f @@ -91,6 +80,10 @@ class ContractProxy: self._contracts[w3] = found return found + @property + def events(self): + return self.contract.events + def deploy(self, *args): """ Calls the contract constructor transaction and waits to receive a transaction receipt. diff --git a/src/dexorder/contract/dexorder.py b/src/dexorder/contract/dexorder.py new file mode 100644 index 0000000..43acd6a --- /dev/null +++ b/src/dexorder/contract/dexorder.py @@ -0,0 +1,78 @@ +import json +import logging + +from eth_abi.packed import encode_packed +from eth_utils import keccak, to_bytes, to_checksum_address + +from dexorder import config +from dexorder.base.chain import current_chain +from dexorder.contract import ContractProxy +from dexorder.util import hexstr + +log = logging.getLogger(__name__) + +_factory = {} +_dexorder = {} + + +def _load_chain(chain_id: int): + deployment_tag = config.deployments.get(str(chain_id), 'latest') + try: + with open(f'../contract/broadcast/Deploy.sol/{chain_id}/run-{deployment_tag}.json', 'rt') as file: + deployment = json.load(file) + for tx in deployment['transactions']: + if tx['contractName'] == 'Factory': + addr = tx['contractAddress'] + _factory[chain_id] = ContractProxy(addr, 'Factory') + log.info(f'Factory {addr}') + elif tx['contractName'] == 'Dexorder': + addr = tx['contractAddress'] + _dexorder[chain_id] = DexorderContract(addr) + log.info(f'Dexorder {addr}') + except FileNotFoundError: + log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"') + + +def get_by_chain(d): + chain_id = current_chain.get().chain_id + try: + return d[chain_id] + except KeyError: + _load_chain(chain_id) + return d[chain_id] + +def get_factory_contract() -> ContractProxy: + return get_by_chain(_factory) + +def get_dexorder_contract() -> ContractProxy: + return get_by_chain(_dexorder) + + +VAULT_INIT_CODE_HASH = None + + +def vault_address(owner, num): + global VAULT_INIT_CODE_HASH + if VAULT_INIT_CODE_HASH is None: + with open('../contract/out/Vault.sol/Vault.json', 'rt') as _file: + vault_info = json.load(_file) + VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=vault_info['bytecode']['object'])) + log.info(f'VAULT_INIT_CODE_HASH {hexstr(VAULT_INIT_CODE_HASH)}') + salt = keccak(encode_packed(['address','uint8'],[owner,num])) + contract_address = keccak( + b"\xff" + + to_bytes(hexstr=get_factory_contract().address) + + salt + + VAULT_INIT_CODE_HASH + ).hex()[-40:] + addr = to_checksum_address(contract_address) + # log.debug(f'vault addr {owner} #{num} => {salt.hex()} {VAULT_INIT_CODE_HASH.hex()} = {addr}') + return addr + + +def VaultContract(addr): + return ContractProxy(addr, 'Vault') + + +def DexorderContract(addr): + return ContractProxy(addr, 'Dexorder') diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py index 6569b88..78d501f 100644 --- a/src/dexorder/data/__init__.py +++ b/src/dexorder/data/__init__.py @@ -1,10 +1,12 @@ from dexorder import dec from dexorder.blockstate import BlockSet, BlockDict +from dexorder.util import defaultdictk, hexstr # pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,args) # if pub is True, then event is the current series name, room is the key, and args is [value] # values of DELETE are serialized as nulls vault_owners: BlockDict[str,str] = BlockDict('v', db=True, redis=True) -vault_tokens: BlockDict[str,str] = BlockDict('vt', db=True, redis=True, pub=True) +vault_tokens: dict[str, BlockSet[str]] = defaultdictk(lambda vault: BlockSet(f'vt|{vault}', db=True, redis=True, pub=lambda k,v: ('vt', vault_owners[vault], [k]))) pool_prices: BlockDict[str,dec] = BlockDict('p', db=True, redis=True, pub=True, value2str=lambda d:f'{d:f}', str2value=dec) +underfunded_vaults: BlockDict[str, list[str]] = BlockDict('uv', db=True, redis=True, value2str=lambda v:','.join(v), str2value=lambda s: s.split(',')) diff --git a/src/dexorder/database/column.py b/src/dexorder/database/column.py index c803b83..98e46c1 100644 --- a/src/dexorder/database/column.py +++ b/src/dexorder/database/column.py @@ -1,6 +1,8 @@ +import uuid from typing import Union +from uuid import uuid4 -from hexbytes import HexBytes +import sqlalchemy as sa from sqlalchemy import SMALLINT, INTEGER, BIGINT from sqlalchemy.dialects.postgresql import BYTEA, JSONB from sqlalchemy.orm import mapped_column @@ -9,6 +11,9 @@ from typing_extensions import Annotated from dexorder import Fixed2, Blockchain as NativeBlockchain from . import column_types as t +UUID = Annotated[uuid.UUID, mapped_column(sa.UUID(as_uuid=True))] +UUID_PK = Annotated[uuid.UUID, mapped_column(sa.UUID(as_uuid=True), primary_key=True, default=uuid4)] + # noinspection DuplicatedCode Uint8 = Annotated[int, mapped_column(SMALLINT)] Uint16 = Annotated[int, mapped_column(SMALLINT)] @@ -87,6 +92,8 @@ Blockchain = Annotated[NativeBlockchain, mapped_column(t.Blockchain)] Json = Annotated[Union[str,int,float,list,dict,None], mapped_column(JSONB)] +Dict = Annotated[dict, mapped_column(JSONB)] + # Uniswap aliases Tick = Int24 SqrtPriceX96 = Uint160 diff --git a/src/dexorder/database/column_types.py b/src/dexorder/database/column_types.py index c3fdbb4..9d96657 100644 --- a/src/dexorder/database/column_types.py +++ b/src/dexorder/database/column_types.py @@ -1,7 +1,8 @@ +import dataclasses import math from sqlalchemy import TypeDecorator, BIGINT -from sqlalchemy.dialects.postgresql import BYTEA +from sqlalchemy.dialects.postgresql import BYTEA, JSONB from web3 import Web3 from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain @@ -26,7 +27,7 @@ class Blockchain(TypeDecorator): return value.chain_id def process_result_value(self, value: int, dialect): - return Blockchain.for_id(value) + return NativeBlockchain.for_id(value) @@ -69,3 +70,18 @@ def Fixed(bits, dbits, signed=False): result.dbits = dbits result.signed = signed return result + + +class _DataclassDict(TypeDecorator): + impl = JSONB + + def process_bind_param(self, value, dialect): + return dataclasses.asdict(value) + + def process_result_value(self, value, dialect): + return self.Constructor(**value) + +def DataclassDict(constructor): + result = _DataclassDict() + result.Constructor = constructor + return result diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index d8a6b48..e75a79f 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -1,3 +1,4 @@ from .base import Base from .block import Block from .series import SeriesSet, SeriesDict +from .transaction import Transaction, TransactionJob diff --git a/src/dexorder/database/model/transaction.py b/src/dexorder/database/model/transaction.py new file mode 100644 index 0000000..3aed731 --- /dev/null +++ b/src/dexorder/database/model/transaction.py @@ -0,0 +1,29 @@ +import logging + +from sqlalchemy import ForeignKey, UniqueConstraint +from sqlalchemy.orm import mapped_column, Mapped, relationship + +from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request +from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID +from dexorder.database.column_types import DataclassDict +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + + +class TransactionJob (Base): + id: Mapped[UUID_PK] + chain: Mapped[Blockchain] = mapped_column(index=True) + height: Mapped[int] = mapped_column(index=True) # to be used for data rolloff and/or by Timescale + completed: Mapped[bool] = mapped_column(default=False, index=True) + request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request)) + tx: Mapped["Transaction"] = relationship(back_populates="job", uselist=False) + +class Transaction (Base): + __tablename__ = 'tx' # avoid the keyword "transaction" + + id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id"), primary_key=True) + job: Mapped[TransactionJob] = relationship(back_populates="tx", single_parent=True) + tx: Mapped[Dict] + hash: Mapped[Bytes] + receipt: Mapped[Dict] diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index aae6527..df36e57 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,21 +1,28 @@ import logging +from uuid import UUID from web3.types import EventData -from dexorder import current_pub, current_w3 +from dexorder import current_pub, db from dexorder.base.chain import current_chain +from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request +from dexorder.blockchain.transaction import handle_transactions, submit_transaction from dexorder.blockchain.uniswap import uniswap_price -from dexorder.util.uniswap_util import vault_address, get_contract_event, get_factory, get_contract_data -from dexorder.contract import VaultContract, UniswapV3Pool -from dexorder.data import pool_prices, vault_owners, vault_tokens +from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract +from dexorder.contract import UniswapV3Pool, get_contract_event +from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults from dexorder.database.model.block import current_block - +from dexorder.database.model.transaction import TransactionJob from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus -from dexorder.order.orderstate import Order -from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers +from dexorder.order.orderstate import Order, active_orders +from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers, \ + unconstrained_price_triggers, execution_requests, inflight_execution_requests +from dexorder.util import hexbytes log = logging.getLogger(__name__) +LOG_ALL_EVENTS = False # for debug + async def ensure_pool_price(pool_addr): if pool_addr not in pool_prices: @@ -32,24 +39,36 @@ def setup_logevent_triggers(runner): # code ordering here is also the trigger order: e.g. we process all vault creation events # before any order creations - # DEBUG - runner.add_event_trigger(dump_log, None, {}) + if LOG_ALL_EVENTS: + runner.add_event_trigger(dump_log, None, {}) - factory = get_factory() + factory = get_factory_contract() if factory is None: log.warning(f'No Factory for {current_chain.get()}') vault_created = get_contract_event('Factory', 'VaultCreated') else: - vault_created = current_w3.get().eth.contract(factory.address, abi=get_contract_data('Factory')['abi']).events.VaultCreated() + vault_created = factory.events.VaultCreated() + + dexorder = get_dexorder_contract() + if dexorder is None: + log.warning(f'No Dexorder for {current_chain.get()}') + executions = get_contract_event('Dexorder', 'DexorderExecutions') + else: + executions = dexorder.events.DexorderExecutions() + + runner.add_event_trigger(handle_vault_created, vault_created) runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) - runner.add_event_trigger(activate_time_triggers) runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted')) runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError')) + runner.add_event_trigger(handle_dexorderexecutions, executions) + runner.add_event_trigger(activate_time_triggers) runner.add_event_trigger(activate_price_triggers) + runner.add_event_trigger(process_execution_requests) + runner.add_event_trigger(handle_transactions) async def handle_order_placed(event: EventData): @@ -89,13 +108,17 @@ def handle_order_error(event: EventData): log.debug(f'DexorderError {event}') def handle_transfer(transfer: EventData): + from_address = transfer['args']['from'] to_address = transfer['args']['to'] log.debug(f'transfer {to_address}') if to_address in vault_owners: token_address = transfer['address'] - vault_tokens.add(token_address) + vault_tokens[to_address].add(token_address) if to_address in underfunded_vaults: - # todo flag underfunded vault (check token type?) + # todo possibly funded now + pass + if from_address in active_orders: + # todo possibly underfunded now pass @@ -113,7 +136,6 @@ def handle_uniswap_swap(swap: EventData): def handle_vault_created(created: EventData): - log.debug(f'VaultCreated {created}') try: owner = created['args']['owner'] num = created['args']['num'] @@ -135,18 +157,68 @@ def handle_vault_created(created: EventData): current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults) +def handle_dexorderexecutions(event: EventData): + log.debug(f'executions {event}') + exe_id = UUID(hexbytes(event['args']['id'])) + errors = event['args']['errors'] + job = db.session.get(TransactionJob, exe_id) + req: TrancheExecutionRequest = job.request + tk = TrancheKey( req.vault, req.order_index, req.tranche_index ) + order = active_orders[tk] + if job is None: + log.warning(f'Job {exe_id} not found!') + return + if len(errors) == 0: + log.warning(f'No errors found in DexorderExecutions event: {event}') + return + if len(errors) > 1: + log.warning(f'Multiple executions not yet implemented') + error = errors[0] + log.debug(f'job {exe_id} had error "{error}"') + if error == '': + pass # execution success + elif error == 'IIA': + # insufficient input amount: suspend execution until new funds are sent + token = order.order.tokenIn + underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token] + log.debug(f'insufficient funds {req.vault} {token} ') + else: + log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"') + + def activate_time_triggers(): now = current_block.get().timestamp + log.debug(f'activating time triggers') # time triggers for tt in time_triggers: tt(now) def activate_price_triggers(): + log.debug('activating price triggers') for pool, price in new_pool_prices.items(): for pt in price_triggers[pool]: pt(price) + new_pool_prices.clear() + for t in unconstrained_price_triggers: + # noinspection PyTypeChecker + t(None) -def execute_requests(): - log.info('execute requests: todo') - pass # todo +async def process_execution_requests(): + height = current_block.get().height + execs = [] # which requests to act on + for tk, er in execution_requests.items(): + tk: TrancheKey + er: ExecutionRequest + pending = inflight_execution_requests.get(tk) + if pending is None or pending > height or height-pending >= 30: # todo execution timeout => retry ; should we use timestamps? configure per-chain. + execs.append((tk,er)) + else: + log.debug(f'tranche {tk} is pending execution') + # execute the list + # todo batch execution + for tk, er in execs: + log.info(f'executing tranche {tk}') + job = submit_transaction(new_tranche_execution_request(tk, er.proof)) + inflight_execution_requests[tk] = height + log.info(f'executing tranche {tk} with job {job.id}') diff --git a/src/dexorder/order/executionhandler.py b/src/dexorder/order/executionhandler.py new file mode 100644 index 0000000..f5efff4 --- /dev/null +++ b/src/dexorder/order/executionhandler.py @@ -0,0 +1,21 @@ +import logging + +from dexorder.base.order import TrancheExecutionRequest +from dexorder.blockchain.transaction import TransactionHandler +from dexorder.contract.dexorder import get_dexorder_contract +from dexorder.database.model.transaction import TransactionJob + +log = logging.getLogger(__name__) + +class TrancheExecutionHandler (TransactionHandler): + def __init__(self): + super().__init__('te') + + async def send_transaction(self, job_id: int, ter: TrancheExecutionRequest) -> dict: + return await get_dexorder_contract().transact.execute(job_id, (ter.vault, ter.order_index, ter.tranche_index, ter.price_proof)) + + async def complete_transaction(self, job: TransactionJob) -> None: + # anything to do? + pass + +TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index a68ebf2..a8348e2 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -2,36 +2,12 @@ import logging from dataclasses import dataclass from typing import overload +from dexorder.base.order import OrderKey, TrancheKey from dexorder.blockstate import BlockDict, BlockSet from dexorder.order.orderlib import SwapOrderStatus, SwapOrderState log = logging.getLogger(__name__) -@dataclass(frozen=True, eq=True) -class OrderKey: - vault: str - order_index: int - - @staticmethod - def str2key(keystring: str): - vault, order_index = keystring.split('|') - return OrderKey(vault, int(order_index)) - - def __str__(self): - return f'{self.vault}|{self.order_index}' - -@dataclass(frozen=True, eq=True) -class TrancheKey (OrderKey): - tranche_index: int - - @staticmethod - def str2key(keystring: str): - vault, order_index, tranche_index = keystring.split('|') - return TrancheKey(vault, int(order_index), int(tranche_index)) - - def __str__(self): - return f'{self.vault}|{self.order_index}|{self.tranche_index}' - @dataclass class Filled: @@ -66,7 +42,7 @@ class Order: @staticmethod def of(a, b=None): - return Order.instances[a if b is None else OrderKey(a,b)] + return Order.instances[a if b is None else OrderKey(a, b)] @staticmethod @@ -76,7 +52,7 @@ class Order: Order._statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable order = Order(key) if order.is_open: - Order._open_keys.add(key) + Order.open_keys.add(key) Order._order_filled[key] = Filled(status.filledIn, status.filledOut) for i, tk in enumerate(order.tranche_keys): Order._tranche_filled[tk] = Filled(status.trancheFilledIn[i], status.trancheFilledOut[i]) @@ -90,7 +66,7 @@ class Order: def __init__(self, a, b=None): """ references an existing Order in the system. to create a new order, use create() """ - key = a if b is None else OrderKey(a,b) + key = a if b is None else OrderKey(a, b) assert key not in Order.instances self.key = key self.status: SwapOrderStatus = Order._statuses[key].copy() @@ -157,7 +133,7 @@ class Order: status = self.status status.state = final_state if self.is_open: - Order._open_keys.remove(self.key) + Order.open_keys.remove(self.key) # set final fill values in the status filled = Order._order_filled[self.key] try: @@ -178,19 +154,17 @@ class Order: Order._statuses[self.key] = final_status # set the status in order to save it Order._statuses.unload(self.key) # but then unload from memory after root promotion + # ORDER STATE # various blockstate fields hold different aspects of an order's state. + # open orders = the set of unfilled, not-canceled orders + open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key) + # this series holds "everything" about an order in the canonical format specified by the contract orderlib, except # the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series. _statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict('o', db='lazy', str2key=OrderKey.str2key) - # open orders = the set of unfilled, not-canceled orders - _open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key) - - # underfunded vaults - _underfunded: BlockSet[str] = BlockSet('uv', db=True, redis=True) - # total remaining amount per order, for all unfilled, not-canceled orders _order_filled: BlockDict[OrderKey, Filled] = BlockDict( 'of', db=True, redis=True, str2key=OrderKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining) diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 29830f0..e9e7735 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -1,11 +1,13 @@ import logging from enum import Enum -from typing import Callable +from typing import Callable, Optional from dexorder.blockstate import BlockSet, BlockDict -from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState +from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState, PriceProof from dexorder.util import defaultdictk -from .orderstate import TrancheKey, Order, OrderKey +from .orderstate import Order +from ..base.order import OrderKey, TrancheKey, new_tranche_execution_request, ExecutionRequest +from ..blockchain.transaction import submit_transaction from ..database.model.block import current_block log = logging.getLogger(__name__) @@ -16,8 +18,9 @@ time_triggers:BlockSet[TimeTrigger] = BlockSet('tt') PriceTrigger = Callable[[int], None] # func(pool_price) price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address - -execution_requests:BlockDict[TrancheKey,int] = BlockDict('te') # value is block height of the request +unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled +execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # value is block height when the request was placed +inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent def intersect_ranges( a_low, a_high, b_low, b_high): low, high = max(a_low,b_low), min(a_high,b_high) @@ -90,18 +93,24 @@ class TrancheTrigger: self.enable_price_trigger() def enable_price_trigger(self): - price_triggers[self.order.pool_address].add(self.price_trigger) + if self.price_constraints: + price_triggers[self.order.pool_address].add(self.price_trigger) + else: + unconstrained_price_triggers.add(self.price_trigger) def disable_price_trigger(self): price_triggers[self.order.pool_address].remove(self.price_trigger) def price_trigger(self, cur): - if all(pc.passes(cur) for pc in self.price_constraints): + if not self.price_constraints or all(pc.passes(cur) for pc in self.price_constraints): self.execute() - def execute(self): - log.info(f'execution request for {self.tk}') - execution_requests[self.tk] = current_block.get().height + def execute(self, proof: PriceProof = None): + old_req = execution_requests.get(self.tk) + height = current_block.get().height + if old_req is None or old_req.height <= height: + log.info(f'execution request for {self.tk}') + execution_requests[self.tk] = ExecutionRequest(height, proof) def disable(self): self.disable_time_trigger() @@ -124,6 +133,7 @@ class OrderTriggers: self.order = order self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys] OrderTriggers.instances[order.key] = self + log.debug(f'created OrderTriggers for {order.key}') def disable(self): for t in self.triggers: diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 5fce22f..001ce00 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -4,7 +4,8 @@ from typing import Callable, Union, Any, Iterable, Optional from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI -from dexorder import Blockchain, db, blockchain, NARG, current_pub +from dexorder import Blockchain, db, blockchain, NARG, current_pub, Account +from dexorder.base.account import current_account from dexorder.base.chain import current_chain from dexorder.base.fork import current_fork, Fork from dexorder.blockchain.connection import create_w3_ws @@ -126,7 +127,6 @@ class BlockStateRunner: else: lf = dict(log_filter) lf['blockHash'] = hexstr(block.hash) - print(lf) batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) # set up for callbacks diff --git a/src/dexorder/util/uniswap_util.py b/src/dexorder/util/uniswap_util.py deleted file mode 100644 index 86aa5d6..0000000 --- a/src/dexorder/util/uniswap_util.py +++ /dev/null @@ -1,62 +0,0 @@ -import json -import logging - -from eth_abi.packed import encode_packed -from eth_utils import keccak, to_bytes, to_checksum_address - -from dexorder import config, current_w3 -from dexorder.base.chain import current_chain -from dexorder.contract import ContractProxy -from dexorder.util import hexstr - -log = logging.getLogger(__name__) - -factory = {} - -def get_factory() -> ContractProxy: - chain_id = current_chain.get().chain_id - found = factory.get(chain_id) - if found is None: - deployment_tag = config.deployments.get(str(chain_id), 'latest') - try: - with open(f'../contract/broadcast/Deploy.sol/{chain_id}/run-{deployment_tag}.json', 'rt') as file: - deployment = json.load(file) - for tx in deployment['transactions']: - if tx['contractName'] == 'Factory': - addr = tx['contractAddress'] - found = factory[chain_id] = ContractProxy(addr, 'Factory') - log.info(f'Factory {addr}') - break - except FileNotFoundError: - log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"') - return found - - -def get_contract_data(name): - with open(f'../contract/out/{name}.sol/{name}.json', 'rt') as file: - return json.load(file) - -def get_contract_event(contract_name:str, event_name:str): - return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() - - -VAULT_INIT_CODE_HASH = None - -def vault_address(owner, num): - global VAULT_INIT_CODE_HASH - if VAULT_INIT_CODE_HASH is None: - with open('../contract/out/Vault.sol/Vault.json', 'rt') as _file: - vault_info = json.load(_file) - VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=vault_info['bytecode']['object'])) - log.info(f'VAULT_INIT_CODE_HASH {hexstr(VAULT_INIT_CODE_HASH)}') - salt = keccak(encode_packed(['address','uint8'],[owner,num])) - contract_address = keccak( - b"\xff" - + to_bytes(hexstr=get_factory().address) - + salt - + VAULT_INIT_CODE_HASH - ).hex()[-40:] - addr = to_checksum_address(contract_address) - # log.debug(f'vault addr {owner} #{num} => {salt.hex()} {VAULT_INIT_CODE_HASH.hex()} = {addr}') - return addr -