From c2abf7dc3293ff7700cf137255188aa28f7f4840 Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Sun, 8 Oct 2023 01:10:51 -0400 Subject: [PATCH] orderlib, parse order status from chain --- src/dexorder/__init__.py | 14 +- src/dexorder/base/account.py | 7 +- src/dexorder/base/chain.py | 4 +- src/dexorder/base/event_manager.py | 26 --- src/dexorder/base/token.py | 18 +- src/dexorder/bin/main.py | 8 +- src/dexorder/blockchain/__init__.py | 1 - src/dexorder/blockchain/chain_singletons.py | 2 - src/dexorder/blockchain/connection.py | 6 +- src/dexorder/blockchain/uniswap.py | 28 +++ src/dexorder/blockchain/util.py | 51 +++++- src/dexorder/configuration/schema.py | 2 + src/dexorder/contract/__init__.py | 13 ++ src/dexorder/contract/abi.py | 3 + src/dexorder/contract/contract_proxy.py | 114 ++++++++++++ src/dexorder/contract/pool_contract.py | 6 + src/dexorder/contract/uniswap_contracts.py | 19 ++ src/dexorder/data/__init__.py | 2 +- src/dexorder/event_handler.py | 113 ++++++++++++ src/dexorder/memcache/__init__.py | 2 +- src/dexorder/memcache/memcache_state.py | 25 +-- src/dexorder/orderlib/orders.py | 185 ++++++++++++++++++++ src/dexorder/runner.py | 140 ++++++--------- 23 files changed, 640 insertions(+), 149 deletions(-) delete mode 100644 src/dexorder/base/event_manager.py delete mode 100644 src/dexorder/blockchain/chain_singletons.py create mode 100644 src/dexorder/blockchain/uniswap.py create mode 100644 src/dexorder/contract/__init__.py create mode 100644 src/dexorder/contract/abi.py create mode 100644 src/dexorder/contract/contract_proxy.py create mode 100644 src/dexorder/contract/pool_contract.py create mode 100644 src/dexorder/contract/uniswap_contracts.py create mode 100644 src/dexorder/event_handler.py create mode 100644 src/dexorder/orderlib/orders.py diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index 60b104c..efc2e38 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -1,4 +1,10 @@ -from decimal import Decimal as dec +from contextvars import ContextVar +from decimal import Decimal +from typing import Callable, Any + +from web3 import AsyncWeb3 + +dec = Decimal # NARG is used in argument defaults to mean "not specified" rather than "specified as None" class _NARG: @@ -10,6 +16,10 @@ WEI = 1 GWEI = 1_000_000_000 ETH = 1_000_000_000_000_000_000 +current_w3 = ContextVar[AsyncWeb3]('current_w3') +current_pub = ContextVar[Callable[[str,str,Any],None]]('current_pub') + + # noinspection PyProtectedMember from .util.cwd import _cwd _cwd() # do this first so that config has the right current working directory @@ -19,6 +29,6 @@ from .base.chain import Blockchain # the singletons are loaded into the dexorder from .util import async_yield from .base.fixed import Fixed2, FixedDecimals, Dec18 from .configuration import config -from .base.account import Account # must come before context +from .base.account import Account from .base.token import Token, tokens from .database import db diff --git a/src/dexorder/base/account.py b/src/dexorder/base/account.py index 97d0242..c25c37b 100644 --- a/src/dexorder/base/account.py +++ b/src/dexorder/base/account.py @@ -1,10 +1,11 @@ +from contextvars import ContextVar from typing import Union, Optional import eth_account from eth_account.signers.local import LocalAccount from web3.middleware import construct_sign_and_send_raw_middleware -from dexorder import NARG, config +from dexorder import NARG, config, current_w3 # this is just here for typing the extra .name. the __new__() function returns an eth_account...LocalAccount @@ -59,8 +60,10 @@ class Account (LocalAccount): w3.middleware_onion.add(self.signing_middleware, 'account_signer') def balance(self): - return ctx.w3.eth.get_balance(self.address) + return current_w3.get().eth.get_balance(self.address) def __str__(self): return self.name + +current_account: ContextVar[Account] = ContextVar('current_account') diff --git a/src/dexorder/base/chain.py b/src/dexorder/base/chain.py index 1cd076c..e37405f 100644 --- a/src/dexorder/base/chain.py +++ b/src/dexorder/base/chain.py @@ -24,8 +24,8 @@ class Blockchain: """ self.chain_id = chain_id self.name = name - self.confirms = confirms - self.batch_size = batch_size + self.confirms = confirms # todo configure + self.batch_size = batch_size # todo configure Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_name[name] = self diff --git a/src/dexorder/base/event_manager.py b/src/dexorder/base/event_manager.py deleted file mode 100644 index d8053b0..0000000 --- a/src/dexorder/base/event_manager.py +++ /dev/null @@ -1,26 +0,0 @@ -from typing import Union - -from defaultlist import defaultlist -from eth_utils import keccak - -from dexorder.base.blockstate import BlockDict - - -class EventManager: - def __init__(self): - self.all_topics = set() - self.triggers:dict[str,BlockDict] = {} - self.rings = defaultlist(list) - - def add_handler(self, topic: Union[bytes,str], callback): - if type(topic) is str: - topic = bytes.fromhex(topic[2:]) if topic.startswith('0x') else keccak(text=topic) - triggers = self.triggers.get(topic) - if triggers is None: - triggers = self.triggers[topic] = BlockDict(topic) - triggers.add(callback) - self.all_topics.add(topic) - - def publish_topic(self, topic, data): - for callback, _ in self.triggers.get(topic, {}).items(): - callback(data) diff --git a/src/dexorder/base/token.py b/src/dexorder/base/token.py index f7999eb..64b92f4 100644 --- a/src/dexorder/base/token.py +++ b/src/dexorder/base/token.py @@ -4,9 +4,10 @@ from decimal import Decimal from sqlalchemy.orm import Mapped from web3 import Web3 -from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0 +from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0, current_w3 +from dexorder.base.account import current_account from dexorder.blockchain import ByBlockchainDict -from dexorder.base.chain import Polygon, Arbitrum, Ethereum +from dexorder.base.chain import current_chain from dexorder.contract import ContractProxy, abis import dexorder.database.column as col @@ -28,7 +29,7 @@ class Token (ContractProxy, FixedDecimals): # noinspection PyTypeChecker return Web3.to_checksum_address(name_or_address) except ValueError: - raise ValueError(f'Could not resolve token {name_or_address} for chain {ctx.chain_id}') + raise ValueError(f'Could not resolve token {name_or_address} for chain {current_chain.get().chain_id}') def __init__(self, chain_id, address, decimals, symbol, name, *, abi=None): FixedDecimals.__init__(self, decimals) @@ -39,6 +40,7 @@ class Token (ContractProxy, FixedDecimals): abi = abis.get(abi,abi) ContractProxy.__init__(self, address, load, abi=abi) self.chain_id = chain_id + # noinspection PyTypeChecker self.address = address self.decimals = decimals self.symbol = symbol @@ -46,7 +48,7 @@ class Token (ContractProxy, FixedDecimals): def balance(self, address: str = None) -> int: if address is None: - address = ctx.address + address = current_account.get().address return self.balanceOf(address) def balance_dec(self, address: str = None) -> Decimal: @@ -72,7 +74,7 @@ class NativeToken (FixedDecimals): @staticmethod def get( chain_id = None) -> 'NativeToken': if chain_id is None: - chain_id = ctx.chain_id + chain_id = current_chain.get().chain_id return _native_tokens[chain_id] def __init__(self, chain_id, decimals, symbol, name, *, wrapper_token = None): @@ -85,9 +87,9 @@ class NativeToken (FixedDecimals): def balance(self, address: str = None) -> int: if address is None: - address = ctx.address - assert ctx.chain_id == self.chain_id - return ctx.w3.eth.get_balance(address) + address = current_account.get() + assert current_chain.get().chain_id == self.chain_id + return current_w3.get().eth.get_balance(address) def balance_dec(self, address: str = None) -> Decimal: return self.dec(self.balance(address)) diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index 5a504a0..2c9323f 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -1,14 +1,14 @@ import logging +import sys from asyncio import CancelledError from dexorder import db, config, Blockchain from dexorder.base.chain import current_chain from dexorder.bin.executable import execute -from dexorder.blockstate import DiffItem, DiffEntry from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.db_state import DbState from dexorder.configuration import parse_args -from dexorder.memcache.memcache_state import RedisState +from dexorder.memcache.memcache_state import RedisState, publish_all from dexorder.memcache import memcache from dexorder.runner import BlockStateRunner @@ -16,7 +16,7 @@ log = logging.getLogger('dexorder') async def main(): - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.INFO, stream=sys.stdout) log.setLevel(logging.DEBUG) parse_args() current_chain.set(Blockchain.get(config.chain)) @@ -35,7 +35,7 @@ async def main(): await redis_state.init(state) log.info(f'loaded state from db for root block {state.root_block}') - runner = BlockStateRunner(state) + runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None) if db: # noinspection PyUnboundLocalVariable runner.on_promotion.append(db_state.save) diff --git a/src/dexorder/blockchain/__init__.py b/src/dexorder/blockchain/__init__.py index ec80d01..593380b 100644 --- a/src/dexorder/blockchain/__init__.py +++ b/src/dexorder/blockchain/__init__.py @@ -1,3 +1,2 @@ from .by_blockchain import ByBlockchainDict, ByBlockchainList, ByBlockchainCollection from .connection import connect -from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, Arbitrum, BSC diff --git a/src/dexorder/blockchain/chain_singletons.py b/src/dexorder/blockchain/chain_singletons.py deleted file mode 100644 index ffe1c28..0000000 --- a/src/dexorder/blockchain/chain_singletons.py +++ /dev/null @@ -1,2 +0,0 @@ -from dexorder import Blockchain - diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index c21651f..0c58511 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -1,16 +1,12 @@ -from contextvars import ContextVar - from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from dexorder.blockchain.util import get_contract_data +from .. import current_w3 from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url -current_w3 = ContextVar('current_w3') - - def connect(rpc_url=None): """ connects to the rpc_url and configures the context diff --git a/src/dexorder/blockchain/uniswap.py b/src/dexorder/blockchain/uniswap.py new file mode 100644 index 0000000..2f5b968 --- /dev/null +++ b/src/dexorder/blockchain/uniswap.py @@ -0,0 +1,28 @@ +class Fee: + LOWEST = 100 + LOW = 500 + MEDIUM = 3000 + HIGH = 10000 + + +def ordered_addresses(addr_a:str, addr_b:str): + return (addr_a, addr_b) if addr_a.lower() <= addr_b.lower() else (addr_b, addr_a) + + +def pool_address(_factory_addr:str, _addr_a:str, _addr_b:str): + # todo compute pool address + raise NotImplementedError + # addr0, addr1 = ordered_addresses(addr_a, addr_b) + # get_create2_address() + +# use the util.liquidity or util.simple_liquidity package instead + +# def liquidity_for_amount_0(lower: int, upper: int, amount_0: int): +# lower = convert.tick_to_price(lower) +# upper = convert.tick_to_price(upper) +# return amount_0 * upper * lower / (upper - lower) +# +# def liquidity_for_amount_1(lower: int, upper: int, amount_1: int ): +# lower = convert.tick_to_price(lower) +# upper = convert.tick_to_price(upper) +# return amount_1 / (upper - lower) diff --git a/src/dexorder/blockchain/util.py b/src/dexorder/blockchain/util.py index 94cf53f..a070821 100644 --- a/src/dexorder/blockchain/util.py +++ b/src/dexorder/blockchain/util.py @@ -1,7 +1,56 @@ import json +import logging + +from eth_abi.packed import encode_packed +from eth_utils import keccak, to_bytes, to_checksum_address + +from dexorder import config, current_w3 +from dexorder.base.chain import current_chain +from dexorder.contract import ContractProxy + +log = logging.getLogger(__name__) + +factory = {} + +def get_factory() -> ContractProxy: + chain_id = current_chain.get().chain_id + found = factory.get(chain_id) + if found is None: + deployment_tag = config.deployments.get(str(chain_id), 'latest') + try: + with open(f'contract/broadcast/Deploy.sol/{chain_id}/run-{deployment_tag}.json', 'rt') as file: + deployment = json.load(file) + for tx in deployment['transactions']: + if tx['contractName'] == 'Factory': + addr = tx['contractAddress'] + found = factory[chain_id] = ContractProxy(addr, 'Factory') + break + except FileNotFoundError: + log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"') + return found def get_contract_data(name): - with open(f'contract/out/{name}.sol/{name}.json') as file: + with open(f'contract/out/{name}.sol/{name}.json', 'rt') as file: return json.load(file) +def get_contract_event(contract_name:str, event_name:str): + return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() + + +with open('contract/out/Vault.sol/Vault.json', 'rt') as _file: + _vault_info = json.load(_file) +VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=_vault_info['bytecode']['object'])) + +def vault_address(owner, num): + salt = keccak(encode_packed(['address','uint8'],[owner,num])) + contract_address = keccak( + b"\xff" + + to_bytes(hexstr=get_factory().address) + + salt + + VAULT_INIT_CODE_HASH + ).hex()[-40:] + addr = to_checksum_address(contract_address) + # log.debug(f'vault addr {owner} #{num} => {salt.hex()} {VAULT_INIT_CODE_HASH.hex()} = {addr}') + return addr + diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 3195c78..6335b8d 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -1,3 +1,4 @@ +from collections import defaultdict from dataclasses import dataclass, field from typing import Optional, Union @@ -22,6 +23,7 @@ class Config: account: Optional[str] = None # may be a private key or an account alias accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases + deployments: Optional[dict[str,str]] = field(default_factory=dict) min_gas: str = '0' diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py new file mode 100644 index 0000000..aa74b32 --- /dev/null +++ b/src/dexorder/contract/__init__.py @@ -0,0 +1,13 @@ +from .abi import abis +from .contract_proxy import ContractProxy, Transaction +from .pool_contract import UniswapV3Pool +from .uniswap_contracts import uniswap + +from eth_abi.codec import ABIDecoder, ABIEncoder +from eth_abi.registry import registry as default_registry + +abi_decoder = ABIDecoder(default_registry) +abi_encoder = ABIEncoder(default_registry) + +def VaultContract(addr): + return ContractProxy(addr, 'Vault') diff --git a/src/dexorder/contract/abi.py b/src/dexorder/contract/abi.py new file mode 100644 index 0000000..8fd5a8c --- /dev/null +++ b/src/dexorder/contract/abi.py @@ -0,0 +1,3 @@ +abis = { + # 'WMATIC': '''[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]''', +} diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py new file mode 100644 index 0000000..4d9adbc --- /dev/null +++ b/src/dexorder/contract/contract_proxy.py @@ -0,0 +1,114 @@ +import json +from typing import Optional + +from web3.exceptions import TransactionNotFound +from web3.types import TxReceipt + +from dexorder import Account, current_w3 +from dexorder.base.account import current_account +from dexorder.database.model.block import current_block +from dexorder.util import hexstr + + +class Transaction: + def __init__(self, account: Account, tx_id_bytes:bytes): + self.account = account + self.id_bytes = tx_id_bytes + self._id = None + self._receipt: Optional[TxReceipt] = None + + def wait(self) -> TxReceipt: + if self._receipt is None: + self._receipt = current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes) + self.account.transaction_counter += 1 + return self._receipt + + @property + def id(self) -> str: + if self._id is None: + self._id = self.id_bytes.hex() + return self._id + + @property + def receipt(self) -> TxReceipt: + if self._receipt is None: + try: + self._receipt = current_w3.get().eth.get_transaction_receipt(self.id_bytes) + self.account.transaction_counter += 1 + except TransactionNotFound: + pass + return self._receipt + + def __repr__(self): + self.receipt() + receipt_status = 'IN_FLIGHT' if self._receipt is None else 'REVERTED' if self._receipt.status == 0 else self._receipt.blockNumber + return f'Transaction({self.id},{receipt_status})' + + +def call_wrapper(func): + def f(*args, **kwargs): + return func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash)) + return f + + +def transact_wrapper(func): + def f(*args, **kwargs): + try: + account = current_account.get() + except LookupError: + raise RuntimeError('Cannot invoke a transaction without setting an Account.') + return Transaction(account, func(*args, **kwargs).transact()) + + return f + + +class ContractProxy: + def __init__(self, address: str = None, name=None, *, _contracts=None, _wrapper=call_wrapper, abi=None): + """ + For regular contract use, either name or abi must be supplied. If abi is None, then name is used to find + the ABI in the project's contract/out/ directory. Otherwise, abi may be either a string or preparsed dict. + If address is not supplied, this proxy may still be used to construct a new contract via deploy(). After + deploy() completes, the address member will be populated. + """ + self.address = address + self._interface_name = name if name is not None else self.__class__.__name__ + if abi is not None and type(abi) is str: + abi = json.loads(abi) + self._abi = abi + # contracts hold a ref to their w3, so we lazy-construct a contract for each unique w3 we find in the ctx + self._contracts = _contracts if _contracts is not None else {} + self._wrapper = _wrapper + + @property + def contract(self): + # lazy construction of the contract based on the current context's w3 + w3 = current_w3.get() + assert w3 is not None + found = self._contracts.get(w3) + if not found: + # this constructor interacts with _make_contract(address, abi_or_name) in dexorder.blockchain.connection() + found = w3.eth.Contract(self.address, self._abi or self._interface_name) + self._contracts[w3] = found + return found + + def deploy(self, *args): + """ + Calls the contract constructor transaction and waits to receive a transaction receipt. + """ + tx: Transaction = self.transact.constructor(*args) + receipt = tx.wait() + self.address = receipt.contractAddress + self._contracts.clear() + return receipt + + @property + def transact(self): + # noinspection PyTypeChecker + return ContractProxy(self.address, self._interface_name, _contracts=self._contracts, _wrapper=transact_wrapper, abi=self._abi) + + def __getattr__(self, item): + return self._wrapper(self.contract.constructor if item == 'constructor' else self.contract.functions[item]) + + def __repr__(self): + addr = self.contract.address + return f'{self._interface_name}({addr or ""})' diff --git a/src/dexorder/contract/pool_contract.py b/src/dexorder/contract/pool_contract.py new file mode 100644 index 0000000..32c1394 --- /dev/null +++ b/src/dexorder/contract/pool_contract.py @@ -0,0 +1,6 @@ +from .contract_proxy import ContractProxy + + +class UniswapV3Pool (ContractProxy): + def __init__(self, address: str = None): + super().__init__(address, 'IUniswapV3Pool') diff --git a/src/dexorder/contract/uniswap_contracts.py b/src/dexorder/contract/uniswap_contracts.py new file mode 100644 index 0000000..0966396 --- /dev/null +++ b/src/dexorder/contract/uniswap_contracts.py @@ -0,0 +1,19 @@ +from dexorder.base.chain import Ethereum, Goerli, Polygon, Mumbai +from dexorder.contract.contract_proxy import ContractProxy +from dexorder.blockchain import ByBlockchainDict + + +class _UniswapContracts (ByBlockchainDict[ContractProxy]): + + def __init__(self): + std = { + 'factory': ContractProxy('0x1F98431c8aD98523631AE4a59f267346ea31F984', 'IUniswapV3Factory'), + 'nfpm': ContractProxy('0xC36442b4a4522E871399CD717aBDD847Ab11FE88', 'INonfungiblePositionManager'), + 'quoter': ContractProxy('0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6', 'IQuoter'), + 'swap_router': ContractProxy('0xE592427A0AEce92De3Edee1F18E0157C05861564', 'ISwapRouter'), + } + super().__init__({chain.chain_id:std for chain in (Ethereum, Polygon, Goerli, Mumbai)}) + + +uniswap = _UniswapContracts() + diff --git a/src/dexorder/data/__init__.py b/src/dexorder/data/__init__.py index 328c9df..c65edff 100644 --- a/src/dexorder/data/__init__.py +++ b/src/dexorder/data/__init__.py @@ -4,7 +4,7 @@ from dexorder.blockstate import BlockSet, BlockDict # if pub is True, then event is the current series name, room is the key, and value is passed through # values of DELETE are serialized as nulls -vault_addresses = BlockSet('v', db=True, redis=True, pub=True) +vault_addresses = BlockSet('v', db=True, redis=True) vault_tokens = BlockDict('vt', db=True, redis=True, pub=True) pool_prices = BlockDict('p', db=True, redis=True, pub=True) underfunded_vaults = BlockSet('uv', db=True) diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py new file mode 100644 index 0000000..7a2156f --- /dev/null +++ b/src/dexorder/event_handler.py @@ -0,0 +1,113 @@ +import logging + +from web3.types import EventData + +from dexorder import dec, current_pub, current_w3 +from dexorder.base.chain import current_chain +from dexorder.blockchain.util import vault_address, get_contract_event, get_factory, get_contract_data +from dexorder.contract import VaultContract +from dexorder.data import pool_prices, vault_addresses, vault_tokens, underfunded_vaults +from dexorder.database.model.block import current_block +from dexorder.orderlib.orders import SwapOrderStatus + +log = logging.getLogger(__name__) + + +def setup_logevent_triggers(runner): + runner.events.clear() + + # the triggers for each log events are triggered in the order of event registry, so the + # code ordering here is also the trigger order: e.g. we process all vault creation events + # before any order creations + + vault_created = current_w3.get().eth.contract(get_factory().address, abi=get_contract_data('Factory')['abi']).events.VaultCreated() + runner.add_event_trigger(handle_vault_created, vault_created) + runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer')) + runner.add_event_trigger(handle_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap')) + runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced')) + runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled')) + runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted')) + runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError')) + + +async def handle_order_placed(event: EventData): + # event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders); + addr = event['address'] + start_index = int(event['args']['startOrderIndex']) + num_orders = int(event['args']['numOrders']) + log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}') + if addr not in vault_addresses: + log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs + # return todo discard rogues + vault = VaultContract(addr) + log.debug(await vault.orderList()) + for index in range(start_index, start_index+num_orders): + obj = await vault.swapOrderStatus(index) + log.debug(f'raw order status {obj}') + order_status = SwapOrderStatus.load(obj) + log.debug(f'order status {order_status}') + assert order_status == SwapOrderStatus.load(order_status.dump()) + log.debug('assert ok') + # todo record order + +def handle_swap_filled(event: EventData): + # event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut); + log.debug(f'DexorderSwapFilled {event}') + +def handle_order_completed(event: EventData): + # event DexorderCompleted (uint64 orderIndex); // todo remove? + log.debug(f'DexorderCompleted {event}') + +def handle_order_error(event: EventData): + # event DexorderError (uint64 orderIndex, string reason); + log.debug(f'DexorderError {event}') + + + +def handle_transfer(transfer: EventData): + to_address = transfer['args']['to'] + log.debug(f'transfer {to_address}') + if to_address in vault_addresses: + token_address = transfer['address'] + vault_tokens.add(token_address) + if to_address in underfunded_vaults: + # todo flag underfunded vault (check token type?) + pass + + +def handle_swap(swap: EventData): + try: + sqrt_price = swap['args']['sqrtPriceX96'] + except KeyError: + return + addr = swap['address'] + d = dec(sqrt_price) + price = d * d / dec(2 ** (96 * 2)) + log.debug(f'pool {addr} {price}') + pool_prices[addr] = price + + +def handle_vault_created(created: EventData): + try: + owner = created['args']['owner'] + num = created['args']['num'] + except KeyError: + log.debug('couldnt parse event data for VaultCreated', created) + return + vault = vault_address(owner,num) + log.debug(f'VaultCreated {owner} #{num} => {vault}') + vault_addresses.add(vault) + vaults = [] + for num in range(256): + addr = vault_address(owner, num) + # log.debug(f'v{num}? {addr}') + if addr in vault_addresses: + vaults.append(addr) + else: + break + # log.debug(f'updated vaults: {vaults}') + current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults) + +def handle_order_created(event:EventData): + print('order', event) + diff --git a/src/dexorder/memcache/__init__.py b/src/dexorder/memcache/__init__.py index c1d419e..c0c8fb7 100644 --- a/src/dexorder/memcache/__init__.py +++ b/src/dexorder/memcache/__init__.py @@ -5,6 +5,7 @@ from contextvars import ContextVar import redis.asyncio as redis from redis.asyncio import Redis from redis.asyncio.client import Pipeline +from socket_io_emitter import Emitter from dexorder import config @@ -42,4 +43,3 @@ class Memcache: memcache = Memcache() current_redis = ContextVar[Redis]('current_redis') - diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index bb0372e..b8b8329 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -41,7 +41,7 @@ class RedisState (SeriesCollection): # noinspection PyAsyncCall - async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, publish=False ): + async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]): # the diffs must be already compressed such that there is only one action per key chain = current_chain.get() assert block.chain == chain.chain_id @@ -50,7 +50,7 @@ class RedisState (SeriesCollection): sdels: dict[str,set[str]] = defaultdict(set) hsets: dict[str,dict[str,str]] = defaultdict(dict) hdels: dict[str,set[str]] = defaultdict(set) - pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value + pubs: list[tuple[str,str,list[Any]]] = [] # series, key, value => room, event, value for diff in compress_diffs(diffs): try: d = self.datas[diff.series] @@ -67,8 +67,9 @@ class RedisState (SeriesCollection): elif callable(pub_kv): pub_kv = pub_kv((key,value)) if pub_kv is not None: + k, v = pub_kv # noinspection PyTypeChecker - pubs.append((series,*pub_kv)) + pubs.append((series,k,[v])) if diff.value is DELETE: if t == DataType.SET: sdels[series].add(key) @@ -95,11 +96,15 @@ class RedisState (SeriesCollection): r.hdel(series, *keys) block_series = f'{chain_id}|block.latest' r.json(json_encoder).set(block_series,'$',block.data) - pubs.append((str(chain_id), 'block.latest', block.data)) - # separate batch for + pubs.append((str(chain_id), 'block.latest', [block.data])) + # separate batch for pubs if pubs: - async with memcache.batch() as r: - r: Pipeline - io = Emitter(dict(client=r)) - for s,k,v in pubs: - io.To(s).Emit(k,v) + await publish_all(pubs) + + +async def publish_all(pubs: list[tuple[str,str,list[Any]]]): + async with memcache.batch() as r: + r: Pipeline + io = Emitter(dict(client=r)) + for room, event, args in pubs: + io.To(room).Emit(event, *args) diff --git a/src/dexorder/orderlib/orders.py b/src/dexorder/orderlib/orders.py new file mode 100644 index 0000000..db85e76 --- /dev/null +++ b/src/dexorder/orderlib/orders.py @@ -0,0 +1,185 @@ +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from dexorder.contract import abi_decoder, abi_encoder + +log = logging.getLogger(__name__) + + +# enum SwapOrderState { +# Open, Canceled, Filled, Template +# } + +class SwapOrderState (Enum): + Open = 0 + Canceled = 1 + Filled = 2 + Template = 3 + +class Exchange (Enum): + UniswapV2 = 0 + UniswapV3 = 1 + +@dataclass +class Route: + exchange: Exchange + fee: int + + @staticmethod + def load(obj): + return Route(Exchange(obj[0]), obj[1]) + + def dump(self): + return self.exchange.value, self.fee + +@dataclass +class SwapOrder: + tokenIn: str + tokenOut: str + route: Route + amount: int + amountIsInput: bool + outputDirectlyToOwner: bool + chainOrder: int + tranches: list['Tranche'] + + @staticmethod + def load(obj): + return SwapOrder(obj[0], obj[1], Route.load(obj[2]), obj[3], obj[4], obj[5], obj[6], [Tranche.load(t) for t in obj[7]]) + + def dump(self): + return (self.tokenIn, self.tokenOut, self.route.dump(), self.amount, self.amountIsInput, + self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches]) + +@dataclass +class SwapOrderStatus: + order: SwapOrder + state: SwapOrderState + start: int + ocoGroup: Optional[int] + filledIn: int + filledOut: int + + @staticmethod + def load(obj): + order = SwapOrder.load(obj[0]) + state = SwapOrderState(obj[1]) + start = obj[2] + ocoGroup = None if obj[3] == NO_OCO else obj[3] + filledIn = obj[4] + filledOut = obj[5] + return SwapOrderStatus(order, state, start, ocoGroup, filledIn, filledOut) + + def dump(self): + return self.order.dump(), self.state.value, self.start, self.ocoGroup, self.filledIn, self.filledOut + + +NO_OCO = 18446744073709551615 # max uint64 + +class ConstraintMode (Enum): + Time = 0 + Limit = 1 + Trailing = 2 + Barrier = 3 + Line = 4 + +@dataclass +class Constraint (ABC): + @staticmethod + def load(obj): + mode = ConstraintMode(obj[0]) + if mode == ConstraintMode.Time: + return TimeConstraint.load(obj[1]) + else: + raise NotImplementedError + + @abstractmethod + def dump(self): ... + + @staticmethod + def _dump(mode, types, values): + return mode, abi_encoder.encode(types, values) + +@dataclass +class PriceConstraint (Constraint): + isAbove: bool + isRatio: bool + valueSqrtX96: int + + def dump(self): + return Constraint._dump(ConstraintMode.Limit, ('bool','bool','uint160'), (self.isAbove, self.isRatio, self.valueSqrtX96)) + + +@dataclass +class LineConstraint (Constraint): + isAbove: bool + isRatio: bool + time: int + valueSqrtX96: int + slopeSqrtX96: int + + +class TimeMode (Enum): + Timestamp = 0 + SinceOrderStart = 1 + + +@dataclass +class Time: + mode: TimeMode + time: int + + +DISTANT_PAST = 0 +DISTANT_FUTURE = 4294967295 # max uint32 + + +@dataclass +class TimeConstraint (Constraint): + earliest: Time + latest: Time + + TYPES = ['uint8', 'uint32', 'uint8', 'uint32'] + + @staticmethod + def load(obj: bytes): + earliest_mode, earliest_time, latest_mode, latest_time = abi_decoder.decode(TimeConstraint.TYPES, obj) + return TimeConstraint(Time(TimeMode(earliest_mode),earliest_time), Time(TimeMode(latest_mode),latest_time)) + + def dump(self): + return Constraint._dump(ConstraintMode.Time, TimeConstraint.TYPES, + (self.earliest.mode.value, self.earliest.time, self.latest.mode.value, self.latest.time)) + + +@dataclass +class Tranche: + fraction: int # 18-decimal fraction of the order amount which is available to this tranche. must be <= 1 + constraints: list[Constraint] + + @staticmethod + def load(obj): + return Tranche(obj[0], [Constraint.load(c) for c in obj[1]]) + + def dump(self): + return self.fraction, [c.dump() for c in self.constraints] + + +@dataclass +class PriceProof: + proof: int + + +class OcoMode (Enum): + NO_OCO = 0 + CANCEL_ON_PARTIAL_FILL = 1 + CANCEL_ON_COMPLETION = 2 + + +@dataclass +class OcoGroup: + mode: OcoMode + startIndex: int + num: int diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index dd37516..ca08f7b 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -1,22 +1,18 @@ import logging -from typing import Callable, Union +from typing import Callable, Union, Any, Iterable -from web3 import AsyncWeb3 from web3.contract.contract import ContractEvents from web3.exceptions import LogTopicError, MismatchedABI -from web3.types import EventData -from dexorder import Blockchain, db, blockchain, NARG, dec +from dexorder import Blockchain, db, blockchain, NARG, current_pub from dexorder.base.chain import current_chain -from dexorder.base.fork import Fork, current_fork +from dexorder.base.fork import current_fork, Fork from dexorder.blockchain.connection import create_w3_ws -from dexorder.blockchain.util import get_contract_data -from dexorder.blockstate import DiffItem, BlockState, current_blockstate +from dexorder.blockstate import BlockState, current_blockstate from dexorder.blockstate.diff import DiffEntryItem -from dexorder.blockstate.state import compress_diffs -from dexorder.data import pool_prices, vault_tokens, underfunded_vaults, vault_addresses from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block +from dexorder.event_handler import setup_logevent_triggers from dexorder.util import hexstr, topic from dexorder.util.async_util import maywait @@ -26,7 +22,7 @@ log = logging.getLogger(__name__) # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas class BlockStateRunner: - def __init__(self, state: BlockState = None): + def __init__(self, state: BlockState = None, *, publish_all=None): """ If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling. """ @@ -41,6 +37,8 @@ class BlockStateRunner: # onPromotion callbacks are invoked with a list of DiffItems used to advance the root state self.on_promotion: list[Callable[[Block,list[DiffEntryItem]],None]] = [] + self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],None] = publish_all + async def run(self): """ @@ -67,12 +65,13 @@ class BlockStateRunner: w3 = blockchain.connect() w3ws = create_w3_ws() - self.setup_triggers(w3) chain_id = await w3ws.eth.chain_id chain = Blockchain.for_id(chain_id) current_chain.set(chain) + setup_logevent_triggers(self) + state = self.state async with w3ws as w3ws: @@ -94,50 +93,53 @@ class BlockStateRunner: state = BlockState(block) current_blockstate.set(state) log.info('Created new empty root state') + fork = Fork([block.hash], height=block.height) else: fork = state.add_block(block) - if fork is None: - log.debug(f'discarded late-arriving head {block}') - else: - batches = [] - if fork.disjoint: - # backfill batches - for callback, event, log_filter in self.events: - from_height = state.root_block.height + 1 - end_height = block.height - while from_height <= end_height: - to_height = min(end_height, from_height + chain.batch_size - 1) - lf = dict(log_filter) - lf['fromBlock'] = from_height - lf['toBlock'] = to_height - log.debug(f'batch backfill {from_height} - {to_height}') - batches.append((w3.eth.get_logs(log_filter), callback, event, lf)) - from_height += chain.batch_size - else: - # event callbacks are triggered in the order in which they're registered. the events passed to - # each callback are in block transaction order - for callback, event, log_filter in self.events: + if fork is None: + log.debug(f'discarded late-arriving head {block}') + else: + batches = [] + if fork.disjoint: + # backfill batches + for callback, event, log_filter in self.events: + from_height = state.root_block.height + 1 + end_height = block.height + while from_height <= end_height: + to_height = min(end_height, from_height + chain.batch_size - 1) lf = dict(log_filter) - lf['blockhash'] = w3.to_hex(block.hash) - batches.append((w3.eth.get_logs(log_filter), callback, event, log_filter)) + lf['fromBlock'] = from_height + lf['toBlock'] = to_height + log.debug(f'batch backfill {from_height} - {to_height}') + batches.append((w3.eth.get_logs(lf), callback, event, lf)) + from_height += chain.batch_size + else: + # event callbacks are triggered in the order in which they're registered. the events passed to + # each callback are in block transaction order + for callback, event, log_filter in self.events: + lf = dict(log_filter) + lf['blockHash'] = hexstr(block.hash) + batches.append((w3.eth.get_logs(lf), callback, event, log_filter)) - # set up for callbacks - current_block.set(block) - current_fork.set(fork) - session = db.session # todo move session creation to here? - session.begin() - session.add(block) - # callbacks - for future,callback,event,filter_args in batches: - log_events = await future - for log_event in log_events: - try: - parsed = event.process_log(log_event) - except (LogTopicError, MismatchedABI): - pass - else: - # todo try/except for known retryable errors - callback(parsed) + # set up for callbacks + current_block.set(block) + current_fork.set(fork) + session = db.session + session.begin() + session.add(block) + pubs = [] + current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args))) + # callbacks + for future,callback,event,filter_args in batches: + log_events = await future + for log_event in log_events: + try: + parsed = event.process_log(log_event) + except (LogTopicError, MismatchedABI): + pass + else: + # todo try/except for known retryable errors + await maywait(callback(parsed)) # todo check for reorg and generate a reorg diff list diff_items = state.diffs_by_hash[block.hash] @@ -151,6 +153,9 @@ class BlockStateRunner: for callback in self.on_promotion: # todo try/except for known retryable errors callback(state.root_block, diff_items) + + if pubs and self.publish_all: + await maywait(self.publish_all(pubs)) except: if session is not None: session.rollback() @@ -163,41 +168,8 @@ class BlockStateRunner: log.info(f'completed block {block}') - @staticmethod - def handle_transfer(transfer: EventData): - to_address = transfer['args']['to'] - log.debug(f'transfer {to_address}') - if to_address in vault_addresses: - token_address = transfer['address'] - vault_tokens.add(token_address) - if to_address in underfunded_vaults: - # todo flag underfunded vault (check token type?) - pass - - - @staticmethod - def handle_swap(swap: EventData): - try: - sqrt_price = swap['args']['sqrtPriceX96'] - except KeyError: - return - addr = swap['address'] - d = dec(sqrt_price) - price = d*d / dec(2**(96*2)) - log.debug(f'pool {addr} {price}') - pool_prices[addr] = price - def add_event_trigger(self, callback:Callable[[dict],None], event: ContractEvents, log_filter: Union[dict,str]=None): if log_filter is None: log_filter = {'topics':[topic(event.abi)]} self.events.append((callback, event, log_filter)) - - def setup_triggers(self, w3: AsyncWeb3): - self.events.clear() - - transfer = w3.eth.contract(abi=get_contract_data('ERC20')['abi']).events.Transfer() - self.add_event_trigger(self.handle_transfer, transfer) - - swap = w3.eth.contract(abi=get_contract_data('IUniswapV3PoolEvents')['abi']).events.Swap() - self.add_event_trigger(self.handle_swap, swap)