orderlib, parse order status from chain

This commit is contained in:
Tim Olson
2023-10-08 01:10:51 -04:00
parent 5c71305293
commit c2abf7dc32
23 changed files with 640 additions and 149 deletions

View File

@@ -1,4 +1,10 @@
from decimal import Decimal as dec from contextvars import ContextVar
from decimal import Decimal
from typing import Callable, Any
from web3 import AsyncWeb3
dec = Decimal
# NARG is used in argument defaults to mean "not specified" rather than "specified as None" # NARG is used in argument defaults to mean "not specified" rather than "specified as None"
class _NARG: class _NARG:
@@ -10,6 +16,10 @@ WEI = 1
GWEI = 1_000_000_000 GWEI = 1_000_000_000
ETH = 1_000_000_000_000_000_000 ETH = 1_000_000_000_000_000_000
current_w3 = ContextVar[AsyncWeb3]('current_w3')
current_pub = ContextVar[Callable[[str,str,Any],None]]('current_pub')
# noinspection PyProtectedMember # noinspection PyProtectedMember
from .util.cwd import _cwd from .util.cwd import _cwd
_cwd() # do this first so that config has the right current working directory _cwd() # do this first so that config has the right current working directory
@@ -19,6 +29,6 @@ from .base.chain import Blockchain # the singletons are loaded into the dexorder
from .util import async_yield from .util import async_yield
from .base.fixed import Fixed2, FixedDecimals, Dec18 from .base.fixed import Fixed2, FixedDecimals, Dec18
from .configuration import config from .configuration import config
from .base.account import Account # must come before context from .base.account import Account
from .base.token import Token, tokens from .base.token import Token, tokens
from .database import db from .database import db

View File

@@ -1,10 +1,11 @@
from contextvars import ContextVar
from typing import Union, Optional from typing import Union, Optional
import eth_account import eth_account
from eth_account.signers.local import LocalAccount from eth_account.signers.local import LocalAccount
from web3.middleware import construct_sign_and_send_raw_middleware from web3.middleware import construct_sign_and_send_raw_middleware
from dexorder import NARG, config from dexorder import NARG, config, current_w3
# this is just here for typing the extra .name. the __new__() function returns an eth_account...LocalAccount # this is just here for typing the extra .name. the __new__() function returns an eth_account...LocalAccount
@@ -59,8 +60,10 @@ class Account (LocalAccount):
w3.middleware_onion.add(self.signing_middleware, 'account_signer') w3.middleware_onion.add(self.signing_middleware, 'account_signer')
def balance(self): def balance(self):
return ctx.w3.eth.get_balance(self.address) return current_w3.get().eth.get_balance(self.address)
def __str__(self): def __str__(self):
return self.name return self.name
current_account: ContextVar[Account] = ContextVar('current_account')

View File

@@ -24,8 +24,8 @@ class Blockchain:
""" """
self.chain_id = chain_id self.chain_id = chain_id
self.name = name self.name = name
self.confirms = confirms self.confirms = confirms # todo configure
self.batch_size = batch_size self.batch_size = batch_size # todo configure
Blockchain._instances_by_id[chain_id] = self Blockchain._instances_by_id[chain_id] = self
Blockchain._instances_by_name[name] = self Blockchain._instances_by_name[name] = self

View File

@@ -1,26 +0,0 @@
from typing import Union
from defaultlist import defaultlist
from eth_utils import keccak
from dexorder.base.blockstate import BlockDict
class EventManager:
def __init__(self):
self.all_topics = set()
self.triggers:dict[str,BlockDict] = {}
self.rings = defaultlist(list)
def add_handler(self, topic: Union[bytes,str], callback):
if type(topic) is str:
topic = bytes.fromhex(topic[2:]) if topic.startswith('0x') else keccak(text=topic)
triggers = self.triggers.get(topic)
if triggers is None:
triggers = self.triggers[topic] = BlockDict(topic)
triggers.add(callback)
self.all_topics.add(topic)
def publish_topic(self, topic, data):
for callback, _ in self.triggers.get(topic, {}).items():
callback(data)

View File

@@ -4,9 +4,10 @@ from decimal import Decimal
from sqlalchemy.orm import Mapped from sqlalchemy.orm import Mapped
from web3 import Web3 from web3 import Web3
from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0 from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0, current_w3
from dexorder.base.account import current_account
from dexorder.blockchain import ByBlockchainDict from dexorder.blockchain import ByBlockchainDict
from dexorder.base.chain import Polygon, Arbitrum, Ethereum from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy, abis from dexorder.contract import ContractProxy, abis
import dexorder.database.column as col import dexorder.database.column as col
@@ -28,7 +29,7 @@ class Token (ContractProxy, FixedDecimals):
# noinspection PyTypeChecker # noinspection PyTypeChecker
return Web3.to_checksum_address(name_or_address) return Web3.to_checksum_address(name_or_address)
except ValueError: except ValueError:
raise ValueError(f'Could not resolve token {name_or_address} for chain {ctx.chain_id}') raise ValueError(f'Could not resolve token {name_or_address} for chain {current_chain.get().chain_id}')
def __init__(self, chain_id, address, decimals, symbol, name, *, abi=None): def __init__(self, chain_id, address, decimals, symbol, name, *, abi=None):
FixedDecimals.__init__(self, decimals) FixedDecimals.__init__(self, decimals)
@@ -39,6 +40,7 @@ class Token (ContractProxy, FixedDecimals):
abi = abis.get(abi,abi) abi = abis.get(abi,abi)
ContractProxy.__init__(self, address, load, abi=abi) ContractProxy.__init__(self, address, load, abi=abi)
self.chain_id = chain_id self.chain_id = chain_id
# noinspection PyTypeChecker
self.address = address self.address = address
self.decimals = decimals self.decimals = decimals
self.symbol = symbol self.symbol = symbol
@@ -46,7 +48,7 @@ class Token (ContractProxy, FixedDecimals):
def balance(self, address: str = None) -> int: def balance(self, address: str = None) -> int:
if address is None: if address is None:
address = ctx.address address = current_account.get().address
return self.balanceOf(address) return self.balanceOf(address)
def balance_dec(self, address: str = None) -> Decimal: def balance_dec(self, address: str = None) -> Decimal:
@@ -72,7 +74,7 @@ class NativeToken (FixedDecimals):
@staticmethod @staticmethod
def get( chain_id = None) -> 'NativeToken': def get( chain_id = None) -> 'NativeToken':
if chain_id is None: if chain_id is None:
chain_id = ctx.chain_id chain_id = current_chain.get().chain_id
return _native_tokens[chain_id] return _native_tokens[chain_id]
def __init__(self, chain_id, decimals, symbol, name, *, wrapper_token = None): def __init__(self, chain_id, decimals, symbol, name, *, wrapper_token = None):
@@ -85,9 +87,9 @@ class NativeToken (FixedDecimals):
def balance(self, address: str = None) -> int: def balance(self, address: str = None) -> int:
if address is None: if address is None:
address = ctx.address address = current_account.get()
assert ctx.chain_id == self.chain_id assert current_chain.get().chain_id == self.chain_id
return ctx.w3.eth.get_balance(address) return current_w3.get().eth.get_balance(address)
def balance_dec(self, address: str = None) -> Decimal: def balance_dec(self, address: str = None) -> Decimal:
return self.dec(self.balance(address)) return self.dec(self.balance(address))

View File

@@ -1,14 +1,14 @@
import logging import logging
import sys
from asyncio import CancelledError from asyncio import CancelledError
from dexorder import db, config, Blockchain from dexorder import db, config, Blockchain
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.bin.executable import execute from dexorder.bin.executable import execute
from dexorder.blockstate import DiffItem, DiffEntry
from dexorder.blockstate.blockdata import BlockData from dexorder.blockstate.blockdata import BlockData
from dexorder.blockstate.db_state import DbState from dexorder.blockstate.db_state import DbState
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.memcache.memcache_state import RedisState from dexorder.memcache.memcache_state import RedisState, publish_all
from dexorder.memcache import memcache from dexorder.memcache import memcache
from dexorder.runner import BlockStateRunner from dexorder.runner import BlockStateRunner
@@ -16,7 +16,7 @@ log = logging.getLogger('dexorder')
async def main(): async def main():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO, stream=sys.stdout)
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
parse_args() parse_args()
current_chain.set(Blockchain.get(config.chain)) current_chain.set(Blockchain.get(config.chain))
@@ -35,7 +35,7 @@ async def main():
await redis_state.init(state) await redis_state.init(state)
log.info(f'loaded state from db for root block {state.root_block}') log.info(f'loaded state from db for root block {state.root_block}')
runner = BlockStateRunner(state) runner = BlockStateRunner(state, publish_all=publish_all if redis_state else None)
if db: if db:
# noinspection PyUnboundLocalVariable # noinspection PyUnboundLocalVariable
runner.on_promotion.append(db_state.save) runner.on_promotion.append(db_state.save)

View File

@@ -1,3 +1,2 @@
from .by_blockchain import ByBlockchainDict, ByBlockchainList, ByBlockchainCollection from .by_blockchain import ByBlockchainDict, ByBlockchainList, ByBlockchainCollection
from .connection import connect from .connection import connect
from dexorder.base.chain import Ethereum, Polygon, Goerli, Mumbai, Arbitrum, BSC

View File

@@ -1,2 +0,0 @@
from dexorder import Blockchain

View File

@@ -1,16 +1,12 @@
from contextvars import ContextVar
from hexbytes import HexBytes from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from dexorder.blockchain.util import get_contract_data from dexorder.blockchain.util import get_contract_data
from .. import current_w3
from ..configuration import resolve_rpc_url from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url from ..configuration.resolve import resolve_ws_url
current_w3 = ContextVar('current_w3')
def connect(rpc_url=None): def connect(rpc_url=None):
""" """
connects to the rpc_url and configures the context connects to the rpc_url and configures the context

View File

@@ -0,0 +1,28 @@
class Fee:
LOWEST = 100
LOW = 500
MEDIUM = 3000
HIGH = 10000
def ordered_addresses(addr_a:str, addr_b:str):
return (addr_a, addr_b) if addr_a.lower() <= addr_b.lower() else (addr_b, addr_a)
def pool_address(_factory_addr:str, _addr_a:str, _addr_b:str):
# todo compute pool address
raise NotImplementedError
# addr0, addr1 = ordered_addresses(addr_a, addr_b)
# get_create2_address()
# use the util.liquidity or util.simple_liquidity package instead
# def liquidity_for_amount_0(lower: int, upper: int, amount_0: int):
# lower = convert.tick_to_price(lower)
# upper = convert.tick_to_price(upper)
# return amount_0 * upper * lower / (upper - lower)
#
# def liquidity_for_amount_1(lower: int, upper: int, amount_1: int ):
# lower = convert.tick_to_price(lower)
# upper = convert.tick_to_price(upper)
# return amount_1 / (upper - lower)

View File

@@ -1,7 +1,56 @@
import json import json
import logging
from eth_abi.packed import encode_packed
from eth_utils import keccak, to_bytes, to_checksum_address
from dexorder import config, current_w3
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy
log = logging.getLogger(__name__)
factory = {}
def get_factory() -> ContractProxy:
chain_id = current_chain.get().chain_id
found = factory.get(chain_id)
if found is None:
deployment_tag = config.deployments.get(str(chain_id), 'latest')
try:
with open(f'contract/broadcast/Deploy.sol/{chain_id}/run-{deployment_tag}.json', 'rt') as file:
deployment = json.load(file)
for tx in deployment['transactions']:
if tx['contractName'] == 'Factory':
addr = tx['contractAddress']
found = factory[chain_id] = ContractProxy(addr, 'Factory')
break
except FileNotFoundError:
log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"')
return found
def get_contract_data(name): def get_contract_data(name):
with open(f'contract/out/{name}.sol/{name}.json') as file: with open(f'contract/out/{name}.sol/{name}.json', 'rt') as file:
return json.load(file) return json.load(file)
def get_contract_event(contract_name:str, event_name:str):
return getattr(current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
with open('contract/out/Vault.sol/Vault.json', 'rt') as _file:
_vault_info = json.load(_file)
VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=_vault_info['bytecode']['object']))
def vault_address(owner, num):
salt = keccak(encode_packed(['address','uint8'],[owner,num]))
contract_address = keccak(
b"\xff"
+ to_bytes(hexstr=get_factory().address)
+ salt
+ VAULT_INIT_CODE_HASH
).hex()[-40:]
addr = to_checksum_address(contract_address)
# log.debug(f'vault addr {owner} #{num} => {salt.hex()} {VAULT_INIT_CODE_HASH.hex()} = {addr}')
return addr

View File

@@ -1,3 +1,4 @@
from collections import defaultdict
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Optional, Union from typing import Optional, Union
@@ -22,6 +23,7 @@ class Config:
account: Optional[str] = None # may be a private key or an account alias account: Optional[str] = None # may be a private key or an account alias
accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases accounts: Optional[dict[str,str]] = field(default_factory=dict) # account aliases
deployments: Optional[dict[str,str]] = field(default_factory=dict)
min_gas: str = '0' min_gas: str = '0'

View File

@@ -0,0 +1,13 @@
from .abi import abis
from .contract_proxy import ContractProxy, Transaction
from .pool_contract import UniswapV3Pool
from .uniswap_contracts import uniswap
from eth_abi.codec import ABIDecoder, ABIEncoder
from eth_abi.registry import registry as default_registry
abi_decoder = ABIDecoder(default_registry)
abi_encoder = ABIEncoder(default_registry)
def VaultContract(addr):
return ContractProxy(addr, 'Vault')

View File

@@ -0,0 +1,3 @@
abis = {
# 'WMATIC': '''[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]''',
}

View File

@@ -0,0 +1,114 @@
import json
from typing import Optional
from web3.exceptions import TransactionNotFound
from web3.types import TxReceipt
from dexorder import Account, current_w3
from dexorder.base.account import current_account
from dexorder.database.model.block import current_block
from dexorder.util import hexstr
class Transaction:
def __init__(self, account: Account, tx_id_bytes:bytes):
self.account = account
self.id_bytes = tx_id_bytes
self._id = None
self._receipt: Optional[TxReceipt] = None
def wait(self) -> TxReceipt:
if self._receipt is None:
self._receipt = current_w3.get().eth.wait_for_transaction_receipt(self.id_bytes)
self.account.transaction_counter += 1
return self._receipt
@property
def id(self) -> str:
if self._id is None:
self._id = self.id_bytes.hex()
return self._id
@property
def receipt(self) -> TxReceipt:
if self._receipt is None:
try:
self._receipt = current_w3.get().eth.get_transaction_receipt(self.id_bytes)
self.account.transaction_counter += 1
except TransactionNotFound:
pass
return self._receipt
def __repr__(self):
self.receipt()
receipt_status = 'IN_FLIGHT' if self._receipt is None else 'REVERTED' if self._receipt.status == 0 else self._receipt.blockNumber
return f'Transaction({self.id},{receipt_status})'
def call_wrapper(func):
def f(*args, **kwargs):
return func(*args, **kwargs).call(block_identifier=hexstr(current_block.get().hash))
return f
def transact_wrapper(func):
def f(*args, **kwargs):
try:
account = current_account.get()
except LookupError:
raise RuntimeError('Cannot invoke a transaction without setting an Account.')
return Transaction(account, func(*args, **kwargs).transact())
return f
class ContractProxy:
def __init__(self, address: str = None, name=None, *, _contracts=None, _wrapper=call_wrapper, abi=None):
"""
For regular contract use, either name or abi must be supplied. If abi is None, then name is used to find
the ABI in the project's contract/out/ directory. Otherwise, abi may be either a string or preparsed dict.
If address is not supplied, this proxy may still be used to construct a new contract via deploy(). After
deploy() completes, the address member will be populated.
"""
self.address = address
self._interface_name = name if name is not None else self.__class__.__name__
if abi is not None and type(abi) is str:
abi = json.loads(abi)
self._abi = abi
# contracts hold a ref to their w3, so we lazy-construct a contract for each unique w3 we find in the ctx
self._contracts = _contracts if _contracts is not None else {}
self._wrapper = _wrapper
@property
def contract(self):
# lazy construction of the contract based on the current context's w3
w3 = current_w3.get()
assert w3 is not None
found = self._contracts.get(w3)
if not found:
# this constructor interacts with _make_contract(address, abi_or_name) in dexorder.blockchain.connection()
found = w3.eth.Contract(self.address, self._abi or self._interface_name)
self._contracts[w3] = found
return found
def deploy(self, *args):
"""
Calls the contract constructor transaction and waits to receive a transaction receipt.
"""
tx: Transaction = self.transact.constructor(*args)
receipt = tx.wait()
self.address = receipt.contractAddress
self._contracts.clear()
return receipt
@property
def transact(self):
# noinspection PyTypeChecker
return ContractProxy(self.address, self._interface_name, _contracts=self._contracts, _wrapper=transact_wrapper, abi=self._abi)
def __getattr__(self, item):
return self._wrapper(self.contract.constructor if item == 'constructor' else self.contract.functions[item])
def __repr__(self):
addr = self.contract.address
return f'{self._interface_name}({addr or ""})'

View File

@@ -0,0 +1,6 @@
from .contract_proxy import ContractProxy
class UniswapV3Pool (ContractProxy):
def __init__(self, address: str = None):
super().__init__(address, 'IUniswapV3Pool')

View File

@@ -0,0 +1,19 @@
from dexorder.base.chain import Ethereum, Goerli, Polygon, Mumbai
from dexorder.contract.contract_proxy import ContractProxy
from dexorder.blockchain import ByBlockchainDict
class _UniswapContracts (ByBlockchainDict[ContractProxy]):
def __init__(self):
std = {
'factory': ContractProxy('0x1F98431c8aD98523631AE4a59f267346ea31F984', 'IUniswapV3Factory'),
'nfpm': ContractProxy('0xC36442b4a4522E871399CD717aBDD847Ab11FE88', 'INonfungiblePositionManager'),
'quoter': ContractProxy('0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6', 'IQuoter'),
'swap_router': ContractProxy('0xE592427A0AEce92De3Edee1F18E0157C05861564', 'ISwapRouter'),
}
super().__init__({chain.chain_id:std for chain in (Ethereum, Polygon, Goerli, Mumbai)})
uniswap = _UniswapContracts()

View File

@@ -4,7 +4,7 @@ from dexorder.blockstate import BlockSet, BlockDict
# if pub is True, then event is the current series name, room is the key, and value is passed through # if pub is True, then event is the current series name, room is the key, and value is passed through
# values of DELETE are serialized as nulls # values of DELETE are serialized as nulls
vault_addresses = BlockSet('v', db=True, redis=True, pub=True) vault_addresses = BlockSet('v', db=True, redis=True)
vault_tokens = BlockDict('vt', db=True, redis=True, pub=True) vault_tokens = BlockDict('vt', db=True, redis=True, pub=True)
pool_prices = BlockDict('p', db=True, redis=True, pub=True) pool_prices = BlockDict('p', db=True, redis=True, pub=True)
underfunded_vaults = BlockSet('uv', db=True) underfunded_vaults = BlockSet('uv', db=True)

View File

@@ -0,0 +1,113 @@
import logging
from web3.types import EventData
from dexorder import dec, current_pub, current_w3
from dexorder.base.chain import current_chain
from dexorder.blockchain.util import vault_address, get_contract_event, get_factory, get_contract_data
from dexorder.contract import VaultContract
from dexorder.data import pool_prices, vault_addresses, vault_tokens, underfunded_vaults
from dexorder.database.model.block import current_block
from dexorder.orderlib.orders import SwapOrderStatus
log = logging.getLogger(__name__)
def setup_logevent_triggers(runner):
runner.events.clear()
# the triggers for each log events are triggered in the order of event registry, so the
# code ordering here is also the trigger order: e.g. we process all vault creation events
# before any order creations
vault_created = current_w3.get().eth.contract(get_factory().address, abi=get_contract_data('Factory')['abi']).events.VaultCreated()
runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted'))
runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError'))
async def handle_order_placed(event: EventData):
# event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders);
addr = event['address']
start_index = int(event['args']['startOrderIndex'])
num_orders = int(event['args']['numOrders'])
log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
if addr not in vault_addresses:
log.warning(f'block {current_block.get()} order from unknown vault {addr}') # todo insert (short) block hash into all logs
# return todo discard rogues
vault = VaultContract(addr)
log.debug(await vault.orderList())
for index in range(start_index, start_index+num_orders):
obj = await vault.swapOrderStatus(index)
log.debug(f'raw order status {obj}')
order_status = SwapOrderStatus.load(obj)
log.debug(f'order status {order_status}')
assert order_status == SwapOrderStatus.load(order_status.dump())
log.debug('assert ok')
# todo record order
def handle_swap_filled(event: EventData):
# event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut);
log.debug(f'DexorderSwapFilled {event}')
def handle_order_completed(event: EventData):
# event DexorderCompleted (uint64 orderIndex); // todo remove?
log.debug(f'DexorderCompleted {event}')
def handle_order_error(event: EventData):
# event DexorderError (uint64 orderIndex, string reason);
log.debug(f'DexorderError {event}')
def handle_transfer(transfer: EventData):
to_address = transfer['args']['to']
log.debug(f'transfer {to_address}')
if to_address in vault_addresses:
token_address = transfer['address']
vault_tokens.add(token_address)
if to_address in underfunded_vaults:
# todo flag underfunded vault (check token type?)
pass
def handle_swap(swap: EventData):
try:
sqrt_price = swap['args']['sqrtPriceX96']
except KeyError:
return
addr = swap['address']
d = dec(sqrt_price)
price = d * d / dec(2 ** (96 * 2))
log.debug(f'pool {addr} {price}')
pool_prices[addr] = price
def handle_vault_created(created: EventData):
try:
owner = created['args']['owner']
num = created['args']['num']
except KeyError:
log.debug('couldnt parse event data for VaultCreated', created)
return
vault = vault_address(owner,num)
log.debug(f'VaultCreated {owner} #{num} => {vault}')
vault_addresses.add(vault)
vaults = []
for num in range(256):
addr = vault_address(owner, num)
# log.debug(f'v{num}? {addr}')
if addr in vault_addresses:
vaults.append(addr)
else:
break
# log.debug(f'updated vaults: {vaults}')
current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)
def handle_order_created(event:EventData):
print('order', event)

View File

@@ -5,6 +5,7 @@ from contextvars import ContextVar
import redis.asyncio as redis import redis.asyncio as redis
from redis.asyncio import Redis from redis.asyncio import Redis
from redis.asyncio.client import Pipeline from redis.asyncio.client import Pipeline
from socket_io_emitter import Emitter
from dexorder import config from dexorder import config
@@ -42,4 +43,3 @@ class Memcache:
memcache = Memcache() memcache = Memcache()
current_redis = ContextVar[Redis]('current_redis') current_redis = ContextVar[Redis]('current_redis')

View File

@@ -41,7 +41,7 @@ class RedisState (SeriesCollection):
# noinspection PyAsyncCall # noinspection PyAsyncCall
async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]], *, publish=False ): async def save(self, block: Block, diffs: Reversible[Union[DiffItem, DiffEntryItem]]):
# the diffs must be already compressed such that there is only one action per key # the diffs must be already compressed such that there is only one action per key
chain = current_chain.get() chain = current_chain.get()
assert block.chain == chain.chain_id assert block.chain == chain.chain_id
@@ -50,7 +50,7 @@ class RedisState (SeriesCollection):
sdels: dict[str,set[str]] = defaultdict(set) sdels: dict[str,set[str]] = defaultdict(set)
hsets: dict[str,dict[str,str]] = defaultdict(dict) hsets: dict[str,dict[str,str]] = defaultdict(dict)
hdels: dict[str,set[str]] = defaultdict(set) hdels: dict[str,set[str]] = defaultdict(set)
pubs: list[tuple[str,str,Any]] = [] # series, key, value => room, event, value pubs: list[tuple[str,str,list[Any]]] = [] # series, key, value => room, event, value
for diff in compress_diffs(diffs): for diff in compress_diffs(diffs):
try: try:
d = self.datas[diff.series] d = self.datas[diff.series]
@@ -67,8 +67,9 @@ class RedisState (SeriesCollection):
elif callable(pub_kv): elif callable(pub_kv):
pub_kv = pub_kv((key,value)) pub_kv = pub_kv((key,value))
if pub_kv is not None: if pub_kv is not None:
k, v = pub_kv
# noinspection PyTypeChecker # noinspection PyTypeChecker
pubs.append((series,*pub_kv)) pubs.append((series,k,[v]))
if diff.value is DELETE: if diff.value is DELETE:
if t == DataType.SET: if t == DataType.SET:
sdels[series].add(key) sdels[series].add(key)
@@ -95,11 +96,15 @@ class RedisState (SeriesCollection):
r.hdel(series, *keys) r.hdel(series, *keys)
block_series = f'{chain_id}|block.latest' block_series = f'{chain_id}|block.latest'
r.json(json_encoder).set(block_series,'$',block.data) r.json(json_encoder).set(block_series,'$',block.data)
pubs.append((str(chain_id), 'block.latest', block.data)) pubs.append((str(chain_id), 'block.latest', [block.data]))
# separate batch for # separate batch for pubs
if pubs: if pubs:
async with memcache.batch() as r: await publish_all(pubs)
r: Pipeline
io = Emitter(dict(client=r))
for s,k,v in pubs: async def publish_all(pubs: list[tuple[str,str,list[Any]]]):
io.To(s).Emit(k,v) async with memcache.batch() as r:
r: Pipeline
io = Emitter(dict(client=r))
for room, event, args in pubs:
io.To(room).Emit(event, *args)

View File

@@ -0,0 +1,185 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from dexorder.contract import abi_decoder, abi_encoder
log = logging.getLogger(__name__)
# enum SwapOrderState {
# Open, Canceled, Filled, Template
# }
class SwapOrderState (Enum):
Open = 0
Canceled = 1
Filled = 2
Template = 3
class Exchange (Enum):
UniswapV2 = 0
UniswapV3 = 1
@dataclass
class Route:
exchange: Exchange
fee: int
@staticmethod
def load(obj):
return Route(Exchange(obj[0]), obj[1])
def dump(self):
return self.exchange.value, self.fee
@dataclass
class SwapOrder:
tokenIn: str
tokenOut: str
route: Route
amount: int
amountIsInput: bool
outputDirectlyToOwner: bool
chainOrder: int
tranches: list['Tranche']
@staticmethod
def load(obj):
return SwapOrder(obj[0], obj[1], Route.load(obj[2]), obj[3], obj[4], obj[5], obj[6], [Tranche.load(t) for t in obj[7]])
def dump(self):
return (self.tokenIn, self.tokenOut, self.route.dump(), self.amount, self.amountIsInput,
self.outputDirectlyToOwner, self.chainOrder, [t.dump() for t in self.tranches])
@dataclass
class SwapOrderStatus:
order: SwapOrder
state: SwapOrderState
start: int
ocoGroup: Optional[int]
filledIn: int
filledOut: int
@staticmethod
def load(obj):
order = SwapOrder.load(obj[0])
state = SwapOrderState(obj[1])
start = obj[2]
ocoGroup = None if obj[3] == NO_OCO else obj[3]
filledIn = obj[4]
filledOut = obj[5]
return SwapOrderStatus(order, state, start, ocoGroup, filledIn, filledOut)
def dump(self):
return self.order.dump(), self.state.value, self.start, self.ocoGroup, self.filledIn, self.filledOut
NO_OCO = 18446744073709551615 # max uint64
class ConstraintMode (Enum):
Time = 0
Limit = 1
Trailing = 2
Barrier = 3
Line = 4
@dataclass
class Constraint (ABC):
@staticmethod
def load(obj):
mode = ConstraintMode(obj[0])
if mode == ConstraintMode.Time:
return TimeConstraint.load(obj[1])
else:
raise NotImplementedError
@abstractmethod
def dump(self): ...
@staticmethod
def _dump(mode, types, values):
return mode, abi_encoder.encode(types, values)
@dataclass
class PriceConstraint (Constraint):
isAbove: bool
isRatio: bool
valueSqrtX96: int
def dump(self):
return Constraint._dump(ConstraintMode.Limit, ('bool','bool','uint160'), (self.isAbove, self.isRatio, self.valueSqrtX96))
@dataclass
class LineConstraint (Constraint):
isAbove: bool
isRatio: bool
time: int
valueSqrtX96: int
slopeSqrtX96: int
class TimeMode (Enum):
Timestamp = 0
SinceOrderStart = 1
@dataclass
class Time:
mode: TimeMode
time: int
DISTANT_PAST = 0
DISTANT_FUTURE = 4294967295 # max uint32
@dataclass
class TimeConstraint (Constraint):
earliest: Time
latest: Time
TYPES = ['uint8', 'uint32', 'uint8', 'uint32']
@staticmethod
def load(obj: bytes):
earliest_mode, earliest_time, latest_mode, latest_time = abi_decoder.decode(TimeConstraint.TYPES, obj)
return TimeConstraint(Time(TimeMode(earliest_mode),earliest_time), Time(TimeMode(latest_mode),latest_time))
def dump(self):
return Constraint._dump(ConstraintMode.Time, TimeConstraint.TYPES,
(self.earliest.mode.value, self.earliest.time, self.latest.mode.value, self.latest.time))
@dataclass
class Tranche:
fraction: int # 18-decimal fraction of the order amount which is available to this tranche. must be <= 1
constraints: list[Constraint]
@staticmethod
def load(obj):
return Tranche(obj[0], [Constraint.load(c) for c in obj[1]])
def dump(self):
return self.fraction, [c.dump() for c in self.constraints]
@dataclass
class PriceProof:
proof: int
class OcoMode (Enum):
NO_OCO = 0
CANCEL_ON_PARTIAL_FILL = 1
CANCEL_ON_COMPLETION = 2
@dataclass
class OcoGroup:
mode: OcoMode
startIndex: int
num: int

View File

@@ -1,22 +1,18 @@
import logging import logging
from typing import Callable, Union from typing import Callable, Union, Any, Iterable
from web3 import AsyncWeb3
from web3.contract.contract import ContractEvents from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI from web3.exceptions import LogTopicError, MismatchedABI
from web3.types import EventData
from dexorder import Blockchain, db, blockchain, NARG, dec from dexorder import Blockchain, db, blockchain, NARG, current_pub
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.base.fork import Fork, current_fork from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws from dexorder.blockchain.connection import create_w3_ws
from dexorder.blockchain.util import get_contract_data from dexorder.blockstate import BlockState, current_blockstate
from dexorder.blockstate import DiffItem, BlockState, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.diff import DiffEntryItem
from dexorder.blockstate.state import compress_diffs
from dexorder.data import pool_prices, vault_tokens, underfunded_vaults, vault_addresses
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block from dexorder.database.model.block import current_block, latest_block
from dexorder.event_handler import setup_logevent_triggers
from dexorder.util import hexstr, topic from dexorder.util import hexstr, topic
from dexorder.util.async_util import maywait from dexorder.util.async_util import maywait
@@ -26,7 +22,7 @@ log = logging.getLogger(__name__)
# todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas # todo detect reorgs and generate correct onHeadUpdate set by unioning the changes along the two forks, not including their common ancestor deltas
class BlockStateRunner: class BlockStateRunner:
def __init__(self, state: BlockState = None): def __init__(self, state: BlockState = None, *, publish_all=None):
""" """
If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling. If state is None, then it is initialized as empty using the first block seen as the root block. Then the second block begins log event handling.
""" """
@@ -41,6 +37,8 @@ class BlockStateRunner:
# onPromotion callbacks are invoked with a list of DiffItems used to advance the root state # onPromotion callbacks are invoked with a list of DiffItems used to advance the root state
self.on_promotion: list[Callable[[Block,list[DiffEntryItem]],None]] = [] self.on_promotion: list[Callable[[Block,list[DiffEntryItem]],None]] = []
self.publish_all: Callable[[Iterable[tuple[str,str,Any]]],None] = publish_all
async def run(self): async def run(self):
""" """
@@ -67,12 +65,13 @@ class BlockStateRunner:
w3 = blockchain.connect() w3 = blockchain.connect()
w3ws = create_w3_ws() w3ws = create_w3_ws()
self.setup_triggers(w3)
chain_id = await w3ws.eth.chain_id chain_id = await w3ws.eth.chain_id
chain = Blockchain.for_id(chain_id) chain = Blockchain.for_id(chain_id)
current_chain.set(chain) current_chain.set(chain)
setup_logevent_triggers(self)
state = self.state state = self.state
async with w3ws as w3ws: async with w3ws as w3ws:
@@ -94,50 +93,53 @@ class BlockStateRunner:
state = BlockState(block) state = BlockState(block)
current_blockstate.set(state) current_blockstate.set(state)
log.info('Created new empty root state') log.info('Created new empty root state')
fork = Fork([block.hash], height=block.height)
else: else:
fork = state.add_block(block) fork = state.add_block(block)
if fork is None: if fork is None:
log.debug(f'discarded late-arriving head {block}') log.debug(f'discarded late-arriving head {block}')
else: else:
batches = [] batches = []
if fork.disjoint: if fork.disjoint:
# backfill batches # backfill batches
for callback, event, log_filter in self.events: for callback, event, log_filter in self.events:
from_height = state.root_block.height + 1 from_height = state.root_block.height + 1
end_height = block.height end_height = block.height
while from_height <= end_height: while from_height <= end_height:
to_height = min(end_height, from_height + chain.batch_size - 1) to_height = min(end_height, from_height + chain.batch_size - 1)
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(log_filter), callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
for callback, event, log_filter in self.events:
lf = dict(log_filter) lf = dict(log_filter)
lf['blockhash'] = w3.to_hex(block.hash) lf['fromBlock'] = from_height
batches.append((w3.eth.get_logs(log_filter), callback, event, log_filter)) lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(lf), callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
for callback, event, log_filter in self.events:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
# set up for callbacks # set up for callbacks
current_block.set(block) current_block.set(block)
current_fork.set(fork) current_fork.set(fork)
session = db.session # todo move session creation to here? session = db.session
session.begin() session.begin()
session.add(block) session.add(block)
# callbacks pubs = []
for future,callback,event,filter_args in batches: current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args)))
log_events = await future # callbacks
for log_event in log_events: for future,callback,event,filter_args in batches:
try: log_events = await future
parsed = event.process_log(log_event) for log_event in log_events:
except (LogTopicError, MismatchedABI): try:
pass parsed = event.process_log(log_event)
else: except (LogTopicError, MismatchedABI):
# todo try/except for known retryable errors pass
callback(parsed) else:
# todo try/except for known retryable errors
await maywait(callback(parsed))
# todo check for reorg and generate a reorg diff list # todo check for reorg and generate a reorg diff list
diff_items = state.diffs_by_hash[block.hash] diff_items = state.diffs_by_hash[block.hash]
@@ -151,6 +153,9 @@ class BlockStateRunner:
for callback in self.on_promotion: for callback in self.on_promotion:
# todo try/except for known retryable errors # todo try/except for known retryable errors
callback(state.root_block, diff_items) callback(state.root_block, diff_items)
if pubs and self.publish_all:
await maywait(self.publish_all(pubs))
except: except:
if session is not None: if session is not None:
session.rollback() session.rollback()
@@ -163,41 +168,8 @@ class BlockStateRunner:
log.info(f'completed block {block}') log.info(f'completed block {block}')
@staticmethod
def handle_transfer(transfer: EventData):
to_address = transfer['args']['to']
log.debug(f'transfer {to_address}')
if to_address in vault_addresses:
token_address = transfer['address']
vault_tokens.add(token_address)
if to_address in underfunded_vaults:
# todo flag underfunded vault (check token type?)
pass
@staticmethod
def handle_swap(swap: EventData):
try:
sqrt_price = swap['args']['sqrtPriceX96']
except KeyError:
return
addr = swap['address']
d = dec(sqrt_price)
price = d*d / dec(2**(96*2))
log.debug(f'pool {addr} {price}')
pool_prices[addr] = price
def add_event_trigger(self, callback:Callable[[dict],None], event: ContractEvents, log_filter: Union[dict,str]=None): def add_event_trigger(self, callback:Callable[[dict],None], event: ContractEvents, log_filter: Union[dict,str]=None):
if log_filter is None: if log_filter is None:
log_filter = {'topics':[topic(event.abi)]} log_filter = {'topics':[topic(event.abi)]}
self.events.append((callback, event, log_filter)) self.events.append((callback, event, log_filter))
def setup_triggers(self, w3: AsyncWeb3):
self.events.clear()
transfer = w3.eth.contract(abi=get_contract_data('ERC20')['abi']).events.Transfer()
self.add_event_trigger(self.handle_transfer, transfer)
swap = w3.eth.contract(abi=get_contract_data('IUniswapV3PoolEvents')['abi']).events.Swap()
self.add_event_trigger(self.handle_swap, swap)