db-based transaction management; checkin for juan

This commit is contained in:
Tim Olson
2023-10-19 17:41:37 -04:00
parent 6b15634ddc
commit 558b94bfb2
21 changed files with 473 additions and 160 deletions

View File

@@ -1,9 +1,9 @@
"""empty message
"""
initial schema
Revision ID: db62e7db828d
Revises:
Create Date: 2023-09-28 23:04:41.020644
"""
from typing import Sequence, Union
@@ -46,6 +46,25 @@ def upgrade() -> None:
sa.Column('key', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('chain', 'series', 'key')
)
op.create_table('transactionjob',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False),
sa.Column('height', sa.Integer(), nullable=False),
sa.Column('completed', sa.Boolean(), nullable=False),
sa.Column('request', dexorder.database.column_types._DataclassDict(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_transactionjob_chain'), 'transactionjob', ['chain'], unique=False)
op.create_index(op.f('ix_transactionjob_completed'), 'transactionjob', ['completed'], unique=False)
op.create_index(op.f('ix_transactionjob_height'), 'transactionjob', ['height'], unique=False)
op.create_table('tx',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('tx', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column('hash', postgresql.BYTEA(), nullable=False),
sa.Column('receipt', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.ForeignKeyConstraint(['id'], ['transactionjob.id'], ),
sa.PrimaryKeyConstraint('id')
)
def downgrade() -> None:
@@ -53,3 +72,5 @@ def downgrade() -> None:
op.drop_table('seriesdict')
op.drop_table('keyvalue')
op.drop_table('block')
op.drop_table('tx')
op.drop_table('transactionjob')

View File

@@ -1,2 +1,2 @@
#!/bin/bash
docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly --maxmemory 1G --dbfilename '' "$@"
docker run --network=host --ulimit memlock=-1 docker.dragonflydb.io/dragonflydb/dragonfly:latest --maxmemory 1G --dbfilename '' "$@"

View File

@@ -66,4 +66,4 @@ class Account (LocalAccount):
return self.name
current_account: ContextVar[Account] = ContextVar('current_account')
current_account: ContextVar[Optional[Account]] = ContextVar('current_account', default=Account.get())

View File

@@ -0,0 +1,65 @@
import logging
from dataclasses import dataclass
from typing import Optional, Type, Union
log = logging.getLogger(__name__)
@dataclass(frozen=True, eq=True)
class OrderKey:
vault: str
order_index: int
@staticmethod
def str2key(keystring: str):
vault, order_index = keystring.split('|')
return OrderKey(vault, int(order_index))
def __str__(self):
return f'{self.vault}|{self.order_index}'
@dataclass(frozen=True, eq=True)
class TrancheKey (OrderKey):
tranche_index: int
@staticmethod
def str2key(keystring: str):
vault, order_index, tranche_index = keystring.split('|')
return TrancheKey(vault, int(order_index), int(tranche_index))
def __str__(self):
return f'{self.vault}|{self.order_index}|{self.tranche_index}'
@dataclass
class ExecutionRequest:
height: int
proof: None
@dataclass
class TransactionRequest:
type: str # 'te' for tranche execution
@dataclass
class TrancheExecutionRequest (TransactionRequest):
# type: str # 'te' for tranche execution
vault: str
order_index: int
tranche_index: int
price_proof: Union[None,dict,tuple[int]]
def new_tranche_execution_request(tk: TrancheKey, _proof: Optional[dict]) -> TrancheExecutionRequest:
return TrancheExecutionRequest('te', tk.vault, tk.order_index, tk.tranche_index, (0,)) # todo proof
def deserialize_transaction_request(**d):
t = d['type']
Class = transaction_request_registry.get(t)
if Class is None:
raise ValueError(f'No TransactionRequest for type "{t}"')
# noinspection PyArgumentList
return Class(**d)
transaction_request_registry: dict[str, Type[TransactionRequest]] = dict(
te = TrancheExecutionRequest,
)

View File

@@ -1,7 +1,7 @@
from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from dexorder.util.uniswap_util import get_contract_data
from ..contract import get_contract_data
from .. import current_w3
from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url

View File

@@ -0,0 +1,76 @@
import logging
from abc import abstractmethod
from uuid import uuid4
from dexorder import db, current_w3
from dexorder.base.chain import current_chain
from dexorder.base.order import TransactionRequest
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob, Transaction as DbTransaction
log = logging.getLogger(__name__)
class TransactionHandler:
instances: dict[str,'TransactionHandler'] = {}
@staticmethod
def of(tag: str):
return TransactionHandler.instances[tag]
def __init__(self, tag):
TransactionHandler.instances[tag] = self
@abstractmethod
async def send_transaction(self, job_id: int, tr: TransactionRequest) -> dict: ...
@abstractmethod
async def complete_transaction(self, job: TransactionJob) -> None: ...
def submit_transaction(tr: TransactionRequest):
job = TransactionJob(id=uuid4(), chain=current_chain.get(), height=current_block.get().height, completed=False, request=tr)
db.session.add(job)
return job
async def handle_transactions():
for job in db.session.query(TransactionJob).filter(
TransactionJob.chain == current_chain.get(),
TransactionJob.completed == False
):
if not job.tx:
await create_transaction(job)
if job.tx and not job.tx.receipt:
await check_receipt(job)
async def create_transaction(job: TransactionJob):
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
tx = await handler.send_transaction(job.id, job.request)
job.tx = dtx = DbTransaction(tx=tx, hash=tx['hash'], receipt=None)
db.session.add(dtx)
log.info(f'servicing transaction request {job.request.__class__.__name__} {job.id} with tx {tx}')
async def check_receipt(job: TransactionJob):
if not job.tx:
return
w3 = current_w3.get()
receipt = await w3.eth.get_transaction_receipt(job.tx.hash)
if receipt is not None:
job.tx.receipt = receipt
job.completed = True
db.session.add(job.tx)
try:
handler = TransactionHandler.of(job.request.type)
except KeyError:
# todo remove bad request?
log.warning(f'ignoring transaction request with bad type "{job.request.type}"')
else:
await handler.complete_transaction(job)

View File

@@ -31,7 +31,7 @@ def uniswap_pool_address(factory_addr: str, addr_a: str, addr_b: str, fee: int)
+ UNISWAPV3_POOL_INIT_CODE_HASH
).hex()[-40:]
result = to_checksum_address(contract_address)
log.debug(f'uniswap pool address {factory_addr} {addr_a} {addr_b} {fee} => {result}')
# log.debug(f'uniswap pool address {factory_addr} {addr_a} {addr_b} {fee} => {result}')
return result
def uniswap_price(sqrt_price):

View File

@@ -1,6 +1,10 @@
import json
from eth_abi.codec import ABIDecoder, ABIEncoder
from eth_abi.registry import registry as default_registry
from .. import current_w3
abi_decoder = ABIDecoder(default_registry)
abi_encoder = ABIEncoder(default_registry)
@@ -10,5 +14,11 @@ from .pool_contract import UniswapV3Pool
from .uniswap_contracts import uniswapV3
def VaultContract(addr):
return ContractProxy(addr, 'Vault')
def get_contract_data(name):
with open(f'../contract/out/{name}.sol/{name}.json', 'rt') as file:
return json.load(file)
def get_contract_event(contract_name:str, event_name:str):
return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()

View File

@@ -15,13 +15,13 @@ class Transaction:
self.account = account
self.id_bytes = tx_id_bytes
self._id = None
self._receipt: Optional[TxReceipt] = None
self.receipt: Optional[TxReceipt] = None
def wait(self) -> TxReceipt:
if self._receipt is None:
self._receipt = current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes)
async def wait(self) -> TxReceipt:
if self.receipt is None:
self.receipt = await current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes)
self.account.transaction_counter += 1
return self._receipt
return self.receipt
@property
def id(self) -> str:
@@ -29,35 +29,24 @@ class Transaction:
self._id = self.id_bytes.hex()
return self._id
@property
def receipt(self) -> TxReceipt:
if self._receipt is None:
try:
self._receipt = current_w3.get().eth.get_transaction_receipt(self.id_bytes)
self.account.transaction_counter += 1
except TransactionNotFound:
pass
return self._receipt
def __repr__(self):
self.receipt()
receipt_status = 'IN_FLIGHT' if self._receipt is None else 'REVERTED' if self._receipt.status == 0 else self._receipt.blockNumber
receipt_status = 'IN_FLIGHT' if self.receipt is None else 'REVERTED' if self.receipt.status == 0 else self.receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'
def call_wrapper(func):
def f(*args, **kwargs):
return func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash))
async def f(*args, **kwargs):
return await func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash))
return f
def transact_wrapper(func):
def f(*args, **kwargs):
async def f(*args, **kwargs):
try:
account = current_account.get()
except LookupError:
raise RuntimeError('Cannot invoke a transaction without setting an Account.')
return Transaction(account, func(*args, **kwargs).transact())
return Transaction(account, await func(*args, **kwargs).transact())
return f
@@ -91,6 +80,10 @@ class ContractProxy:
self._contracts[w3] = found
return found
@property
def events(self):
return self.contract.events
def deploy(self, *args):
"""
Calls the contract constructor transaction and waits to receive a transaction receipt.

View File

@@ -0,0 +1,78 @@
import json
import logging
from eth_abi.packed import encode_packed
from eth_utils import keccak, to_bytes, to_checksum_address
from dexorder import config
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy
from dexorder.util import hexstr
log = logging.getLogger(__name__)
_factory = {}
_dexorder = {}
def _load_chain(chain_id: int):
deployment_tag = config.deployments.get(str(chain_id), 'latest')
try:
with open(f'../contract/broadcast/Deploy.sol/{chain_id}/run-{deployment_tag}.json', 'rt') as file:
deployment = json.load(file)
for tx in deployment['transactions']:
if tx['contractName'] == 'Factory':
addr = tx['contractAddress']
_factory[chain_id] = ContractProxy(addr, 'Factory')
log.info(f'Factory {addr}')
elif tx['contractName'] == 'Dexorder':
addr = tx['contractAddress']
_dexorder[chain_id] = DexorderContract(addr)
log.info(f'Dexorder {addr}')
except FileNotFoundError:
log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"')
def get_by_chain(d):
chain_id = current_chain.get().chain_id
try:
return d[chain_id]
except KeyError:
_load_chain(chain_id)
return d[chain_id]
def get_factory_contract() -> ContractProxy:
return get_by_chain(_factory)
def get_dexorder_contract() -> ContractProxy:
return get_by_chain(_dexorder)
VAULT_INIT_CODE_HASH = None
def vault_address(owner, num):
global VAULT_INIT_CODE_HASH
if VAULT_INIT_CODE_HASH is None:
with open('../contract/out/Vault.sol/Vault.json', 'rt') as _file:
vault_info = json.load(_file)
VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=vault_info['bytecode']['object']))
log.info(f'VAULT_INIT_CODE_HASH {hexstr(VAULT_INIT_CODE_HASH)}')
salt = keccak(encode_packed(['address','uint8'],[owner,num]))
contract_address = keccak(
b"\xff"
+ to_bytes(hexstr=get_factory_contract().address)
+ salt
+ VAULT_INIT_CODE_HASH
).hex()[-40:]
addr = to_checksum_address(contract_address)
# log.debug(f'vault addr {owner} #{num} => {salt.hex()} {VAULT_INIT_CODE_HASH.hex()} = {addr}')
return addr
def VaultContract(addr):
return ContractProxy(addr, 'Vault')
def DexorderContract(addr):
return ContractProxy(addr, 'Dexorder')

View File

@@ -1,10 +1,12 @@
from dexorder import dec
from dexorder.blockstate import BlockSet, BlockDict
from dexorder.util import defaultdictk, hexstr
# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,args)
# if pub is True, then event is the current series name, room is the key, and args is [value]
# values of DELETE are serialized as nulls
vault_owners: BlockDict[str,str] = BlockDict('v', db=True, redis=True)
vault_tokens: BlockDict[str,str] = BlockDict('vt', db=True, redis=True, pub=True)
vault_tokens: dict[str, BlockSet[str]] = defaultdictk(lambda vault: BlockSet(f'vt|{vault}', db=True, redis=True, pub=lambda k,v: ('vt', vault_owners[vault], [k])))
pool_prices: BlockDict[str,dec] = BlockDict('p', db=True, redis=True, pub=True, value2str=lambda d:f'{d:f}', str2value=dec)
underfunded_vaults: BlockDict[str, list[str]] = BlockDict('uv', db=True, redis=True, value2str=lambda v:','.join(v), str2value=lambda s: s.split(','))

View File

@@ -1,6 +1,8 @@
import uuid
from typing import Union
from uuid import uuid4
from hexbytes import HexBytes
import sqlalchemy as sa
from sqlalchemy import SMALLINT, INTEGER, BIGINT
from sqlalchemy.dialects.postgresql import BYTEA, JSONB
from sqlalchemy.orm import mapped_column
@@ -9,6 +11,9 @@ from typing_extensions import Annotated
from dexorder import Fixed2, Blockchain as NativeBlockchain
from . import column_types as t
UUID = Annotated[uuid.UUID, mapped_column(sa.UUID(as_uuid=True))]
UUID_PK = Annotated[uuid.UUID, mapped_column(sa.UUID(as_uuid=True), primary_key=True, default=uuid4)]
# noinspection DuplicatedCode
Uint8 = Annotated[int, mapped_column(SMALLINT)]
Uint16 = Annotated[int, mapped_column(SMALLINT)]
@@ -87,6 +92,8 @@ Blockchain = Annotated[NativeBlockchain, mapped_column(t.Blockchain)]
Json = Annotated[Union[str,int,float,list,dict,None], mapped_column(JSONB)]
Dict = Annotated[dict, mapped_column(JSONB)]
# Uniswap aliases
Tick = Int24
SqrtPriceX96 = Uint160

View File

@@ -1,7 +1,8 @@
import dataclasses
import math
from sqlalchemy import TypeDecorator, BIGINT
from sqlalchemy.dialects.postgresql import BYTEA
from sqlalchemy.dialects.postgresql import BYTEA, JSONB
from web3 import Web3
from dexorder import Fixed2 as NativeFixed, Blockchain as NativeBlockchain
@@ -26,7 +27,7 @@ class Blockchain(TypeDecorator):
return value.chain_id
def process_result_value(self, value: int, dialect):
return Blockchain.for_id(value)
return NativeBlockchain.for_id(value)
@@ -69,3 +70,18 @@ def Fixed(bits, dbits, signed=False):
result.dbits = dbits
result.signed = signed
return result
class _DataclassDict(TypeDecorator):
impl = JSONB
def process_bind_param(self, value, dialect):
return dataclasses.asdict(value)
def process_result_value(self, value, dialect):
return self.Constructor(**value)
def DataclassDict(constructor):
result = _DataclassDict()
result.Constructor = constructor
return result

View File

@@ -1,3 +1,4 @@
from .base import Base
from .block import Block
from .series import SeriesSet, SeriesDict
from .transaction import Transaction, TransactionJob

View File

@@ -0,0 +1,29 @@
import logging
from sqlalchemy import ForeignKey, UniqueConstraint
from sqlalchemy.orm import mapped_column, Mapped, relationship
from dexorder.base.order import TransactionRequest as TransactionRequestDict, deserialize_transaction_request
from dexorder.database.column import Dict, Bytes, UUID_PK, Blockchain, UUID
from dexorder.database.column_types import DataclassDict
from dexorder.database.model import Base
log = logging.getLogger(__name__)
class TransactionJob (Base):
id: Mapped[UUID_PK]
chain: Mapped[Blockchain] = mapped_column(index=True)
height: Mapped[int] = mapped_column(index=True) # to be used for data rolloff and/or by Timescale
completed: Mapped[bool] = mapped_column(default=False, index=True)
request: Mapped[TransactionRequestDict] = mapped_column(DataclassDict(deserialize_transaction_request))
tx: Mapped["Transaction"] = relationship(back_populates="job", uselist=False)
class Transaction (Base):
__tablename__ = 'tx' # avoid the keyword "transaction"
id: Mapped[UUID] = mapped_column(ForeignKey("transactionjob.id"), primary_key=True)
job: Mapped[TransactionJob] = relationship(back_populates="tx", single_parent=True)
tx: Mapped[Dict]
hash: Mapped[Bytes]
receipt: Mapped[Dict]

View File

@@ -1,21 +1,28 @@
import logging
from uuid import UUID
from web3.types import EventData
from dexorder import current_pub, current_w3
from dexorder import current_pub, db
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request
from dexorder.blockchain.transaction import handle_transactions, submit_transaction
from dexorder.blockchain.uniswap import uniswap_price
from dexorder.util.uniswap_util import vault_address, get_contract_event, get_factory, get_contract_data
from dexorder.contract import VaultContract, UniswapV3Pool
from dexorder.data import pool_prices, vault_owners, vault_tokens
from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract
from dexorder.contract import UniswapV3Pool, get_contract_event
from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus
from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers
from dexorder.order.orderstate import Order, active_orders
from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers, \
unconstrained_price_triggers, execution_requests, inflight_execution_requests
from dexorder.util import hexbytes
log = logging.getLogger(__name__)
LOG_ALL_EVENTS = False # for debug
async def ensure_pool_price(pool_addr):
if pool_addr not in pool_prices:
@@ -32,24 +39,36 @@ def setup_logevent_triggers(runner):
# code ordering here is also the trigger order: e.g. we process all vault creation events
# before any order creations
# DEBUG
if LOG_ALL_EVENTS:
runner.add_event_trigger(dump_log, None, {})
factory = get_factory()
factory = get_factory_contract()
if factory is None:
log.warning(f'No Factory for {current_chain.get()}')
vault_created = get_contract_event('Factory', 'VaultCreated')
else:
vault_created = current_w3.get().eth.contract(factory.address, abi=get_contract_data('Factory')['abi']).events.VaultCreated()
vault_created = factory.events.VaultCreated()
dexorder = get_dexorder_contract()
if dexorder is None:
log.warning(f'No Dexorder for {current_chain.get()}')
executions = get_contract_event('Dexorder', 'DexorderExecutions')
else:
executions = dexorder.events.DexorderExecutions()
runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
runner.add_event_trigger(activate_time_triggers)
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted'))
runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError'))
runner.add_event_trigger(handle_dexorderexecutions, executions)
runner.add_event_trigger(activate_time_triggers)
runner.add_event_trigger(activate_price_triggers)
runner.add_event_trigger(process_execution_requests)
runner.add_event_trigger(handle_transactions)
async def handle_order_placed(event: EventData):
@@ -89,13 +108,17 @@ def handle_order_error(event: EventData):
log.debug(f'DexorderError {event}')
def handle_transfer(transfer: EventData):
from_address = transfer['args']['from']
to_address = transfer['args']['to']
log.debug(f'transfer {to_address}')
if to_address in vault_owners:
token_address = transfer['address']
vault_tokens.add(token_address)
vault_tokens[to_address].add(token_address)
if to_address in underfunded_vaults:
# todo flag underfunded vault (check token type?)
# todo possibly funded now
pass
if from_address in active_orders:
# todo possibly underfunded now
pass
@@ -113,7 +136,6 @@ def handle_uniswap_swap(swap: EventData):
def handle_vault_created(created: EventData):
log.debug(f'VaultCreated {created}')
try:
owner = created['args']['owner']
num = created['args']['num']
@@ -135,18 +157,68 @@ def handle_vault_created(created: EventData):
current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)
def handle_dexorderexecutions(event: EventData):
log.debug(f'executions {event}')
exe_id = UUID(hexbytes(event['args']['id']))
errors = event['args']['errors']
job = db.session.get(TransactionJob, exe_id)
req: TrancheExecutionRequest = job.request
tk = TrancheKey( req.vault, req.order_index, req.tranche_index )
order = active_orders[tk]
if job is None:
log.warning(f'Job {exe_id} not found!')
return
if len(errors) == 0:
log.warning(f'No errors found in DexorderExecutions event: {event}')
return
if len(errors) > 1:
log.warning(f'Multiple executions not yet implemented')
error = errors[0]
log.debug(f'job {exe_id} had error "{error}"')
if error == '':
pass # execution success
elif error == 'IIA':
# insufficient input amount: suspend execution until new funds are sent
token = order.order.tokenIn
underfunded_vaults[req.vault] = underfunded_vaults.get(req.vault, []) + [token]
log.debug(f'insufficient funds {req.vault} {token} ')
else:
log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')
def activate_time_triggers():
now = current_block.get().timestamp
log.debug(f'activating time triggers')
# time triggers
for tt in time_triggers:
tt(now)
def activate_price_triggers():
log.debug('activating price triggers')
for pool, price in new_pool_prices.items():
for pt in price_triggers[pool]:
pt(price)
new_pool_prices.clear()
for t in unconstrained_price_triggers:
# noinspection PyTypeChecker
t(None)
def execute_requests():
log.info('execute requests: todo')
pass # todo
async def process_execution_requests():
height = current_block.get().height
execs = [] # which requests to act on
for tk, er in execution_requests.items():
tk: TrancheKey
er: ExecutionRequest
pending = inflight_execution_requests.get(tk)
if pending is None or pending > height or height-pending >= 30: # todo execution timeout => retry ; should we use timestamps? configure per-chain.
execs.append((tk,er))
else:
log.debug(f'tranche {tk} is pending execution')
# execute the list
# todo batch execution
for tk, er in execs:
log.info(f'executing tranche {tk}')
job = submit_transaction(new_tranche_execution_request(tk, er.proof))
inflight_execution_requests[tk] = height
log.info(f'executing tranche {tk} with job {job.id}')

View File

@@ -0,0 +1,21 @@
import logging
from dexorder.base.order import TrancheExecutionRequest
from dexorder.blockchain.transaction import TransactionHandler
from dexorder.contract.dexorder import get_dexorder_contract
from dexorder.database.model.transaction import TransactionJob
log = logging.getLogger(__name__)
class TrancheExecutionHandler (TransactionHandler):
def __init__(self):
super().__init__('te')
async def send_transaction(self, job_id: int, ter: TrancheExecutionRequest) -> dict:
return await get_dexorder_contract().transact.execute(job_id, (ter.vault, ter.order_index, ter.tranche_index, ter.price_proof))
async def complete_transaction(self, job: TransactionJob) -> None:
# anything to do?
pass
TrancheExecutionHandler() # map 'te' to a TrancheExecutionHandler

View File

@@ -2,36 +2,12 @@ import logging
from dataclasses import dataclass
from typing import overload
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.order.orderlib import SwapOrderStatus, SwapOrderState
log = logging.getLogger(__name__)
@dataclass(frozen=True, eq=True)
class OrderKey:
vault: str
order_index: int
@staticmethod
def str2key(keystring: str):
vault, order_index = keystring.split('|')
return OrderKey(vault, int(order_index))
def __str__(self):
return f'{self.vault}|{self.order_index}'
@dataclass(frozen=True, eq=True)
class TrancheKey (OrderKey):
tranche_index: int
@staticmethod
def str2key(keystring: str):
vault, order_index, tranche_index = keystring.split('|')
return TrancheKey(vault, int(order_index), int(tranche_index))
def __str__(self):
return f'{self.vault}|{self.order_index}|{self.tranche_index}'
@dataclass
class Filled:
@@ -76,7 +52,7 @@ class Order:
Order._statuses[key] = status.copy() # always copy the struct when setting. values in BlockData must be immutable
order = Order(key)
if order.is_open:
Order._open_keys.add(key)
Order.open_keys.add(key)
Order._order_filled[key] = Filled(status.filledIn, status.filledOut)
for i, tk in enumerate(order.tranche_keys):
Order._tranche_filled[tk] = Filled(status.trancheFilledIn[i], status.trancheFilledOut[i])
@@ -157,7 +133,7 @@ class Order:
status = self.status
status.state = final_state
if self.is_open:
Order._open_keys.remove(self.key)
Order.open_keys.remove(self.key)
# set final fill values in the status
filled = Order._order_filled[self.key]
try:
@@ -178,19 +154,17 @@ class Order:
Order._statuses[self.key] = final_status # set the status in order to save it
Order._statuses.unload(self.key) # but then unload from memory after root promotion
# ORDER STATE
# various blockstate fields hold different aspects of an order's state.
# open orders = the set of unfilled, not-canceled orders
open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key)
# this series holds "everything" about an order in the canonical format specified by the contract orderlib, except
# the filled amount fields for active orders are maintained in the order_remainings and tranche_remainings series.
_statuses: BlockDict[OrderKey, SwapOrderStatus] = BlockDict('o', db='lazy', str2key=OrderKey.str2key)
# open orders = the set of unfilled, not-canceled orders
_open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key)
# underfunded vaults
_underfunded: BlockSet[str] = BlockSet('uv', db=True, redis=True)
# total remaining amount per order, for all unfilled, not-canceled orders
_order_filled: BlockDict[OrderKey, Filled] = BlockDict(
'of', db=True, redis=True, str2key=OrderKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining)

View File

@@ -1,11 +1,13 @@
import logging
from enum import Enum
from typing import Callable
from typing import Callable, Optional
from dexorder.blockstate import BlockSet, BlockDict
from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState
from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState, PriceProof
from dexorder.util import defaultdictk
from .orderstate import TrancheKey, Order, OrderKey
from .orderstate import Order
from ..base.order import OrderKey, TrancheKey, new_tranche_execution_request, ExecutionRequest
from ..blockchain.transaction import submit_transaction
from ..database.model.block import current_block
log = logging.getLogger(__name__)
@@ -16,8 +18,9 @@ time_triggers:BlockSet[TimeTrigger] = BlockSet('tt')
PriceTrigger = Callable[[int], None] # func(pool_price)
price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address
execution_requests:BlockDict[TrancheKey,int] = BlockDict('te') # value is block height of the request
unconstrained_price_triggers: BlockSet[PriceTrigger] = BlockSet('upt') # tranches with no price constraints, whose time constraint is fulfilled
execution_requests:BlockDict[TrancheKey, ExecutionRequest] = BlockDict('e') # value is block height when the request was placed
inflight_execution_requests:BlockDict[TrancheKey, int] = BlockDict('ei') # value is block height when the request was sent
def intersect_ranges( a_low, a_high, b_low, b_high):
low, high = max(a_low,b_low), min(a_high,b_high)
@@ -90,18 +93,24 @@ class TrancheTrigger:
self.enable_price_trigger()
def enable_price_trigger(self):
if self.price_constraints:
price_triggers[self.order.pool_address].add(self.price_trigger)
else:
unconstrained_price_triggers.add(self.price_trigger)
def disable_price_trigger(self):
price_triggers[self.order.pool_address].remove(self.price_trigger)
def price_trigger(self, cur):
if all(pc.passes(cur) for pc in self.price_constraints):
if not self.price_constraints or all(pc.passes(cur) for pc in self.price_constraints):
self.execute()
def execute(self):
def execute(self, proof: PriceProof = None):
old_req = execution_requests.get(self.tk)
height = current_block.get().height
if old_req is None or old_req.height <= height:
log.info(f'execution request for {self.tk}')
execution_requests[self.tk] = current_block.get().height
execution_requests[self.tk] = ExecutionRequest(height, proof)
def disable(self):
self.disable_time_trigger()
@@ -124,6 +133,7 @@ class OrderTriggers:
self.order = order
self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys]
OrderTriggers.instances[order.key] = self
log.debug(f'created OrderTriggers for {order.key}')
def disable(self):
for t in self.triggers:

View File

@@ -4,7 +4,8 @@ from typing import Callable, Union, Any, Iterable, Optional
from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
from dexorder import Blockchain, db, blockchain, NARG, current_pub
from dexorder import Blockchain, db, blockchain, NARG, current_pub, Account
from dexorder.base.account import current_account
from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws
@@ -126,7 +127,6 @@ class BlockStateRunner:
else:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
print(lf)
batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
# set up for callbacks

View File

@@ -1,62 +0,0 @@
import json
import logging
from eth_abi.packed import encode_packed
from eth_utils import keccak, to_bytes, to_checksum_address
from dexorder import config, current_w3
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy
from dexorder.util import hexstr
log = logging.getLogger(__name__)
factory = {}
def get_factory() -> ContractProxy:
chain_id = current_chain.get().chain_id
found = factory.get(chain_id)
if found is None:
deployment_tag = config.deployments.get(str(chain_id), 'latest')
try:
with open(f'../contract/broadcast/Deploy.sol/{chain_id}/run-{deployment_tag}.json', 'rt') as file:
deployment = json.load(file)
for tx in deployment['transactions']:
if tx['contractName'] == 'Factory':
addr = tx['contractAddress']
found = factory[chain_id] = ContractProxy(addr, 'Factory')
log.info(f'Factory {addr}')
break
except FileNotFoundError:
log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"')
return found
def get_contract_data(name):
with open(f'../contract/out/{name}.sol/{name}.json', 'rt') as file:
return json.load(file)
def get_contract_event(contract_name:str, event_name:str):
return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
VAULT_INIT_CODE_HASH = None
def vault_address(owner, num):
global VAULT_INIT_CODE_HASH
if VAULT_INIT_CODE_HASH is None:
with open('../contract/out/Vault.sol/Vault.json', 'rt') as _file:
vault_info = json.load(_file)
VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=vault_info['bytecode']['object']))
log.info(f'VAULT_INIT_CODE_HASH {hexstr(VAULT_INIT_CODE_HASH)}')
salt = keccak(encode_packed(['address','uint8'],[owner,num]))
contract_address = keccak(
b"\xff"
+ to_bytes(hexstr=get_factory().address)
+ salt
+ VAULT_INIT_CODE_HASH
).hex()[-40:]
addr = to_checksum_address(contract_address)
# log.debug(f'vault addr {owner} #{num} => {salt.hex()} {VAULT_INIT_CODE_HASH.hex()} = {addr}')
return addr