order placement doesnt crash

This commit is contained in:
Tim Olson
2023-10-14 22:06:07 -04:00
parent 1ed6b759bc
commit 6b15634ddc
18 changed files with 248 additions and 127 deletions

View File

@@ -1,6 +1,7 @@
from typing import Generic, TypeVar, Any, Iterator
from dexorder import NARG
from dexorder.base.chain import current_chain
_T = TypeVar('_T')
@@ -18,18 +19,18 @@ class ByBlockchainCollection (Generic[_T]):
self.by_blockchain = by_blockchain if by_blockchain is not None else {}
def __getitem__(self, item) -> _T:
return self.by_blockchain[ctx.chain_id][item]
return self.by_blockchain[current_chain.get().chain_id][item]
class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]):
def __getattr__(self, name: str) -> _T:
return self.by_blockchain[ctx.chain_id][name]
return self.by_blockchain[current_chain.get().chain_id][name]
def get(self, item, default=None, *, chain_id=None) -> _T:
# will raise if default is NARG
if chain_id is None:
chain_id = ctx.chain_id
chain_id = current_chain.get().chain_id
if chain_id is None:
raise KeyError('no ctx.chain_id set')
found = self.by_blockchain.get(chain_id, {}).get(item, default)
@@ -40,16 +41,16 @@ class ByBlockchainDict (ByBlockchainCollection[_T], Generic[_T]):
class ByBlockchainList (ByBlockchainCollection[_T], Generic[_T]):
def __iter__(self) -> Iterator[_T]:
return iter(self.by_blockchain[ctx.chain_id])
return iter(self.by_blockchain[current_chain.get().chain_id])
def iter(self, *, chain_id=None) -> Iterator[_T]:
if chain_id is None:
chain_id = ctx.chain_id
chain_id = current_chain.get().chain_id
return iter(self.by_blockchain[chain_id])
def get(self, index, *, chain_id=None) -> _T:
if chain_id is None:
chain_id = ctx.chain_id
chain_id = current_chain.get().chain_id
if chain_id is None:
raise KeyError('no ctx.chain_id set')
return self.by_blockchain[chain_id][index]

View File

@@ -1,7 +1,7 @@
from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from dexorder.blockchain.util import get_contract_data
from dexorder.util.uniswap_util import get_contract_data
from .. import current_w3
from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url

View File

@@ -1,12 +1,14 @@
from charset_normalizer.md import getLogger
from eth_abi.packed import encode_packed
from eth_utils import keccak, to_bytes, to_checksum_address
from dexorder import dec
from dexorder.contract import uniswapV3
from dexorder.contract import abi_encoder
from dexorder.util import hexbytes
UNISWAPV3_POOL_INIT_CODE_HASH = hexbytes('0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54')
log = getLogger(__name__)
class Fee:
LOWEST = 100
@@ -18,19 +20,19 @@ class Fee:
def ordered_addresses(addr_a:str, addr_b:str):
return (addr_a, addr_b) if addr_a.lower() <= addr_b.lower() else (addr_b, addr_a)
def uniswapV3_pool_address( addr_a: str, addr_b: str, fee: int):
return uniswap_pool_address(uniswapV3['factory'], addr_a, addr_b, fee)
def uniswap_pool_address(factory_addr: str, addr_a: str, addr_b: str, fee: int) -> str:
token0, token1 = ordered_addresses(addr_a, addr_b)
salt = keccak(encode_packed(['address','address','uint24'],[token0, token1, fee]))
salt = keccak(abi_encoder.encode(['address','address','uint24'],[token0, token1, fee]))
contract_address = keccak(
b"\xff"
+ to_bytes(hexstr=factory_addr)
+ salt
+ UNISWAPV3_POOL_INIT_CODE_HASH
).hex()[-40:]
return to_checksum_address(contract_address)
result = to_checksum_address(contract_address)
log.debug(f'uniswap pool address {factory_addr} {addr_a} {addr_b} {fee} => {result}')
return result
def uniswap_price(sqrt_price):
d = dec(sqrt_price)

View File

@@ -28,7 +28,7 @@ class BlockData:
def __init__(self, data_type: DataType, series: Any, *,
series2str=None, series2key=None, # defaults to key2str and str2key
key2str=util_key2str, str2key=util_str2key,
value2basic=lambda x:x, basic2value=lambda x:x, # serialize/deserialize value to something JSON-able
value2str=lambda x:x, str2value=lambda x:x, # serialize/deserialize value to something JSON-able
**opts):
assert series not in BlockData.registry
BlockData.registry[series] = self
@@ -39,8 +39,8 @@ class BlockData:
self.str2key = str2key
self.series2str = series2str or self.key2str
self.series2key = series2key or self.str2key
self.value2basic = value2basic
self.basic2value = basic2value
self.value2str = value2str
self.str2value = str2value
self.lazy_getitem = None
def setitem(self, item, value, overwrite=True):

View File

@@ -63,7 +63,7 @@ class DbState(SeriesCollection):
elif t == DataType.DICT:
found = db.session.get(SeriesDict, key)
if found is None:
db.session.add(SeriesDict(**key, value=d.value2basic(diff.value)))
db.session.add(SeriesDict(**key, value=d.value2str(diff.value)))
else:
found.value = diff.value
else:
@@ -97,6 +97,6 @@ class DbState(SeriesCollection):
# noinspection PyTypeChecker
var: BlockDict = BlockData.registry[series]
for row in db.session.query(SeriesDict).where(SeriesDict.series == data.series2str(series)):
var[data.str2key(row.key)] = data.basic2value(row.value)
var[data.str2key(row.key)] = data.str2value(row.value)
completed_block.set(root_block)
return state

View File

@@ -1,13 +1,14 @@
from .abi import abis
from .contract_proxy import ContractProxy, Transaction
from .pool_contract import UniswapV3Pool
from .uniswap_contracts import uniswapV3
from eth_abi.codec import ABIDecoder, ABIEncoder
from eth_abi.registry import registry as default_registry
abi_decoder = ABIDecoder(default_registry)
abi_encoder = ABIEncoder(default_registry)
from .abi import abis
from .contract_proxy import ContractProxy, Transaction
from .pool_contract import UniswapV3Pool
from .uniswap_contracts import uniswapV3
def VaultContract(addr):
return ContractProxy(addr, 'Vault')

View File

@@ -1,6 +1,10 @@
from .contract_proxy import ContractProxy
from ..blockchain.uniswap import uniswap_price
class UniswapV3Pool (ContractProxy):
def __init__(self, address: str = None):
super().__init__(address, 'IUniswapV3Pool')
async def price(self):
return uniswap_price((await self.slot0())[0])

View File

@@ -1,4 +1,5 @@
from dexorder.base.chain import Ethereum, Goerli, Polygon, Mumbai, Arbitrum, Mock
from dexorder.blockchain.uniswap import uniswap_pool_address
from dexorder.contract.contract_proxy import ContractProxy
from dexorder.blockchain import ByBlockchainDict
@@ -17,3 +18,6 @@ class _UniswapContracts (ByBlockchainDict[ContractProxy]):
uniswapV3 = _UniswapContracts()
def uniswapV3_pool_address( addr_a: str, addr_b: str, fee: int):
return uniswap_pool_address(uniswapV3['factory'].address, addr_a, addr_b, fee)

View File

@@ -1,11 +1,10 @@
from dexorder import dec
from dexorder.blockstate import BlockSet, BlockDict
# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,value)
# if pub is True, then event is the current series name, room is the key, and value is passed through
# pub=... publishes to a channel for web clients to consume. argument is (key,value) and return must be (event,room,args)
# if pub is True, then event is the current series name, room is the key, and args is [value]
# values of DELETE are serialized as nulls
vault_owners = BlockDict('v', db=True, redis=True)
vault_tokens = BlockDict('vt', db=True, redis=True, pub=True)
pool_prices = BlockDict('p', db=True, redis=True, pub=True)
underfunded_vaults = BlockSet('uv', db=True)
active_orders = BlockSet('a', db=True)
vault_owners: BlockDict[str,str] = BlockDict('v', db=True, redis=True)
vault_tokens: BlockDict[str,str] = BlockDict('vt', db=True, redis=True, pub=True)
pool_prices: BlockDict[str,dec] = BlockDict('p', db=True, redis=True, pub=True, value2str=lambda d:f'{d:f}', str2value=dec)

View File

@@ -4,6 +4,7 @@ from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.model import Base
from dexorder.util import hexint
class Block(Base):
@@ -13,6 +14,11 @@ class Block(Base):
parent: Mapped[bytes]
data: Mapped[dict] = mapped_column(JSONB)
@property
def timestamp(self) -> int:
# noinspection PyTypeChecker
return hexint(self.data['timestamp'])
def __str__(self):
return f'{self.height}_{self.hash.hex()[:5]}'

View File

@@ -5,15 +5,26 @@ from web3.types import EventData
from dexorder import current_pub, current_w3
from dexorder.base.chain import current_chain
from dexorder.blockchain.uniswap import uniswap_price
from dexorder.blockchain.util import vault_address, get_contract_event, get_factory, get_contract_data
from dexorder.contract import VaultContract
from dexorder.data import pool_prices, vault_owners, vault_tokens, underfunded_vaults
from dexorder.util.uniswap_util import vault_address, get_contract_event, get_factory, get_contract_data
from dexorder.contract import VaultContract, UniswapV3Pool
from dexorder.data import pool_prices, vault_owners, vault_tokens
from dexorder.database.model.block import current_block
from dexorder.orderlib.orderlib import SwapOrderStatus
from dexorder.order.orderlib import SwapOrderState, SwapOrderStatus
from dexorder.order.orderstate import Order
from dexorder.order.triggers import OrderTriggers, close_order_and_disable_triggers, price_triggers, time_triggers
log = logging.getLogger(__name__)
async def ensure_pool_price(pool_addr):
if pool_addr not in pool_prices:
log.debug(f'querying price for pool {pool_addr}')
pool_prices[pool_addr] = await UniswapV3Pool(pool_addr).price()
def dump_log(eventlog):
log.debug(f'eventlog {eventlog}')
def setup_logevent_triggers(runner):
runner.events.clear()
@@ -21,17 +32,28 @@ def setup_logevent_triggers(runner):
# code ordering here is also the trigger order: e.g. we process all vault creation events
# before any order creations
vault_created = current_w3.get().eth.contract(get_factory().address, abi=get_contract_data('Factory')['abi']).events.VaultCreated()
# DEBUG
runner.add_event_trigger(dump_log, None, {})
factory = get_factory()
if factory is None:
log.warning(f'No Factory for {current_chain.get()}')
vault_created = get_contract_event('Factory', 'VaultCreated')
else:
vault_created = current_w3.get().eth.contract(factory.address, abi=get_contract_data('Factory')['abi']).events.VaultCreated()
runner.add_event_trigger(handle_vault_created, vault_created)
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
runner.add_event_trigger(handle_order_placed, get_contract_event('OrderLib', 'DexorderSwapPlaced'))
runner.add_event_trigger(activate_time_triggers)
runner.add_event_trigger(handle_transfer, get_contract_event('ERC20', 'Transfer'))
runner.add_event_trigger(handle_uniswap_swap, get_contract_event('IUniswapV3PoolEvents', 'Swap'))
runner.add_event_trigger(handle_swap_filled, get_contract_event('OrderLib', 'DexorderSwapFilled'))
runner.add_event_trigger(handle_order_completed, get_contract_event('OrderLib', 'DexorderSwapCompleted'))
runner.add_event_trigger(handle_order_error, get_contract_event('OrderLib', 'DexorderSwapError'))
runner.add_event_trigger(activate_price_triggers)
async def handle_order_placed(event: EventData):
log.debug(f'handle order placed {event}')
# event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders);
addr = event['address']
start_index = int(event['args']['startOrderIndex'])
@@ -46,10 +68,13 @@ async def handle_order_placed(event: EventData):
obj = await vault.swapOrderStatus(index)
log.debug(f'raw order status {obj}')
order_status = SwapOrderStatus.load(obj)
log.debug(f'order status {order_status}')
assert order_status == SwapOrderStatus.load(order_status.dump())
log.debug('assert ok')
# todo record order
order = Order.create(vault.address, index, order_status)
await ensure_pool_price(order.pool_address)
triggers = OrderTriggers(order)
log.debug(f'created order {order_status}')
if triggers.closed:
log.warning(f'order {order.key} was immediately closed')
close_order_and_disable_triggers(order.key, SwapOrderState.Filled if not order.remaining else SwapOrderState.Expired)
def handle_swap_filled(event: EventData):
# event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut);
@@ -63,8 +88,6 @@ def handle_order_error(event: EventData):
# event DexorderError (uint64 orderIndex, string reason);
log.debug(f'DexorderError {event}')
def handle_transfer(transfer: EventData):
to_address = transfer['args']['to']
log.debug(f'transfer {to_address}')
@@ -76,7 +99,9 @@ def handle_transfer(transfer: EventData):
pass
def handle_swap(swap: EventData):
new_pool_prices: dict[str, int] = {}
def handle_uniswap_swap(swap: EventData):
try:
sqrt_price = swap['args']['sqrtPriceX96']
except KeyError:
@@ -84,10 +109,11 @@ def handle_swap(swap: EventData):
addr = swap['address']
price = uniswap_price(sqrt_price)
log.debug(f'pool {addr} {price}')
pool_prices[addr] = price
new_pool_prices[addr] = price
def handle_vault_created(created: EventData):
log.debug(f'VaultCreated {created}')
try:
owner = created['args']['owner']
num = created['args']['num']
@@ -108,6 +134,19 @@ def handle_vault_created(created: EventData):
# log.debug(f'updated vaults: {vaults}')
current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)
def handle_order_created(event:EventData):
print('order', event)
def activate_time_triggers():
now = current_block.get().timestamp
# time triggers
for tt in time_triggers:
tt(now)
def activate_price_triggers():
for pool, price in new_pool_prices.items():
for pt in price_triggers[pool]:
pt(price)
def execute_requests():
log.info('execute requests: todo')
pass # todo

View File

@@ -58,17 +58,17 @@ class RedisState (SeriesCollection):
t = d.type
series = f'{chain_id}|{d.series2str(diff.series)}'
key = d.key2str(diff.key)
value = diff.value
value = d.value2str(diff.value)
# pub/sub socketio/redis
pub_kv = d.opts.get('pub')
if pub_kv is True:
pub_kv = key, value
elif callable(pub_kv):
pub_kv = pub_kv((key,value))
if pub_kv is not None:
k, v = pub_kv
pub_era = d.opts.get('pub') # event, room, args
if pub_era is True:
pub_era = series, key, [value]
elif callable(pub_era):
pub_era = pub_era(diff.key, diff.value)
if pub_era is not None:
e, r, a = pub_era
# noinspection PyTypeChecker
pubs.append((series,k,[v]))
pubs.append((e,r,a))
if diff.value is DELETE:
if t == DataType.SET:
sdels[series].add(key)

View File

@@ -5,7 +5,8 @@ from enum import Enum
from typing import Optional
from dexorder import dec
from dexorder.blockchain.uniswap import uniswap_pool_address, uniswap_price, uniswapV3_pool_address
from dexorder.blockchain.uniswap import uniswap_pool_address, uniswap_price
from dexorder.contract.uniswap_contracts import uniswapV3_pool_address
from dexorder.contract import abi_decoder, abi_encoder, uniswapV3
log = logging.getLogger(__name__)
@@ -131,7 +132,7 @@ class Constraint (ABC):
class PriceConstraint (Constraint, ABC):
@abstractmethod
def passes(self, old_price: dec, new_price: dec) -> bool:...
def passes(self, price: dec) -> bool:...
@dataclass
@@ -153,8 +154,8 @@ class LimitConstraint (PriceConstraint):
def dump(self):
return self._dump(LimitConstraint.TYPES, (self.isAbove, self.isRatio, self.valueSqrtX96))
def passes(self, old_price: dec, new_price: dec) -> bool:
return self.isAbove and new_price >= self.limit or not self.isAbove and new_price <= self.limit
def passes(self, price: dec) -> bool:
return self.isAbove and price >= self.limit or not self.isAbove and price <= self.limit
@dataclass
@@ -176,6 +177,9 @@ class Time:
mode: TimeMode
time: int
def timestamp(self, order_start: int):
return self.time if self.mode is TimeMode.Timestamp else order_start + self.time
DISTANT_PAST = 0
DISTANT_FUTURE = 4294967295 # max uint32
@@ -194,8 +198,7 @@ class TimeConstraint (Constraint):
return TimeConstraint(ConstraintMode.Time, Time(TimeMode(earliest_mode),earliest_time), Time(TimeMode(latest_mode),latest_time))
def dump(self):
return Constraint._dump(ConstraintMode.Time, TimeConstraint.TYPES,
(self.earliest.mode.value, self.earliest.time, self.latest.mode.value, self.latest.time))
return self._dump(TimeConstraint.TYPES, (self.earliest.mode.value, self.earliest.time, self.latest.mode.value, self.latest.time))
@dataclass
@@ -203,6 +206,9 @@ class Tranche:
fraction: int # 18-decimal fraction of the order amount which is available to this tranche. must be <= 1
constraints: list[Constraint]
def fraction_of(self, amount):
return amount * self.fraction // 65535
@staticmethod
def load(obj):
return Tranche(obj[0], [Constraint.load(c) for c in obj[1]])

View File

@@ -7,7 +7,7 @@ from dexorder.order.orderlib import SwapOrderStatus, SwapOrderState
log = logging.getLogger(__name__)
@dataclass
@dataclass(frozen=True, eq=True)
class OrderKey:
vault: str
order_index: int
@@ -20,7 +20,7 @@ class OrderKey:
def __str__(self):
return f'{self.vault}|{self.order_index}'
@dataclass
@dataclass(frozen=True, eq=True)
class TrancheKey (OrderKey):
tranche_index: int
@@ -39,11 +39,11 @@ class Filled:
filled_out: int
@staticmethod
def basic2remaining(basic):
return Filled(*basic)
def str2remaining(basic):
return Filled(*map(int,basic.split(','))) if basic else Filled(0,0)
def remaining2basic(self):
return self.filled_in, self.filled_out
def remaining2str(self):
return f'{self.filled_in},{self.filled_out}'
# todo oco groups
@@ -96,19 +96,16 @@ class Order:
self.status: SwapOrderStatus = Order._statuses[key].copy()
self.pool_address: str = self.status.order.pool_address
self.tranche_keys = [TrancheKey(key.vault, key.order_index, i) for i in range(len(self.status.trancheFilledIn))]
# various flattenings
self.order = self.status.order
self.amount = self.status.order.amount
self.amount_is_input = self.status.order.amountIsInput
self.tranche_amounts = [t.fraction_of(self.amount) for t in self.order.tranches]
@property
def state(self):
return self.status.state
@property
def order(self):
return self.status.order
@property
def amount(self):
return self.order.amount
@property
def remaining(self):
return self.amount - self.filled
@@ -134,10 +131,6 @@ class Order:
def filled(self):
return self.filled_in if self.amount_is_input else self.filled_out
@property
def amount_is_input(self):
return self.order.amountIsInput
@property
def is_open(self):
@@ -156,6 +149,7 @@ class Order:
fin = old.filled_in + filled_in
fout = old.filled_out + filled_out
Order._tranche_filled[tk] = Filled(fin, fout)
# todo check for completion
def complete(self, final_state: SwapOrderState):
""" updates the static order record with its final values, then deletes all its dynamic blockstate and removes the Order from the actives list """
@@ -163,16 +157,23 @@ class Order:
status = self.status
status.state = final_state
if self.is_open:
del Order._open_keys[self.key]
Order._open_keys.remove(self.key)
# set final fill values in the status
filled = Order._order_filled[self.key]
del Order._order_filled[self.key]
try:
del Order._order_filled[self.key]
except KeyError:
pass
status.filledIn = filled.filled_in
status.filledOut = filled.filled_out
for i, tk in enumerate(self.tranche_keys):
filled = Order._tranche_filled[tk]
del Order._tranche_filled[tk]
status.trancheFilledIn[i] = filled.filled_in
status.trancheFilledOut[i] = filled.filled_out
try:
filled = Order._tranche_filled[tk]
del Order._tranche_filled[tk]
status.trancheFilledIn[i] = filled.filled_in
status.trancheFilledOut[i] = filled.filled_out
except KeyError:
pass
final_status = status.copy()
Order._statuses[self.key] = final_status # set the status in order to save it
Order._statuses.unload(self.key) # but then unload from memory after root promotion
@@ -187,13 +188,16 @@ class Order:
# open orders = the set of unfilled, not-canceled orders
_open_keys: BlockSet[OrderKey] = BlockSet('oo', db=True, redis=True, str2key=OrderKey.str2key)
# underfunded vaults
_underfunded: BlockSet[str] = BlockSet('uv', db=True, redis=True)
# total remaining amount per order, for all unfilled, not-canceled orders
_order_filled: BlockDict[OrderKey, Filled] = BlockDict(
'of', db=True, redis=True, str2key=OrderKey.str2key, value2basic=Filled.remaining2basic, basic2value=Filled.basic2remaining)
'of', db=True, redis=True, str2key=OrderKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining)
# total remaining amount per tranche
_tranche_filled: BlockDict[TrancheKey, Filled] = BlockDict(
'tf', db=True, redis=True, str2key=TrancheKey.str2key, value2basic=Filled.remaining2basic, basic2value=Filled.basic2remaining)
'tf', db=True, redis=True, str2key=TrancheKey.str2key, value2str=Filled.remaining2str, str2value=Filled.str2remaining)
active_orders: dict[OrderKey,Order] = {}

View File

@@ -3,18 +3,18 @@ from enum import Enum
from typing import Callable
from dexorder.blockstate import BlockSet, BlockDict
from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode
from .orderlib import TimeConstraint, LimitConstraint, ConstraintMode, SwapOrderState
from dexorder.util import defaultdictk
from .orderstate import TrancheKey, Order
from .orderstate import TrancheKey, Order, OrderKey
from ..database.model.block import current_block
log = logging.getLogger(__name__)
# todo time and price triggers should be BlockSortedSets that support range queries
TimeTrigger = Callable[[int, int], None] # func(previous_timestamp, current_timestamp)
# todo time and price triggers should be BlockSortedSets that support range queries, for efficient lookup of triggers
TimeTrigger = Callable[[int], None] # func(timestamp)
time_triggers:BlockSet[TimeTrigger] = BlockSet('tt')
PriceTrigger = Callable[[int, int], None] # pool previous price, pool new price
PriceTrigger = Callable[[int], None] # func(pool_price)
price_triggers:dict[str, BlockSet[PriceTrigger]] = defaultdictk(lambda addr:BlockSet(f'pt|{addr}')) # different BlockSet per pool address
execution_requests:BlockDict[TrancheKey,int] = BlockDict('te') # value is block height of the request
@@ -37,9 +37,10 @@ class TrancheTrigger:
self.order = order
self.tk = tranche_key
self.status = TrancheStatus.Early
start = self.order.status.start
tranche = order.order.tranches[self.tk.tranche_index]
tranche_amount = order.amount * tranche.fraction // 10**18
tranche_amount = tranche.fraction_of(order.amount)
tranche_filled = order.tranche_filled(self.tk)
tranche_remaining = tranche_amount - tranche_filled
@@ -47,12 +48,14 @@ class TrancheTrigger:
self.status = TrancheStatus.Filled
return
self.time_constraint = time_constraint = None
self.time_constraint = time_constraint = None # stored as a tuple of two ints for earliest and latest absolute timestamps
self.price_constraints = []
for c in tranche.constraints:
if c.mode == ConstraintMode.Time:
c: TimeConstraint
time_constraint = (c.earliest, c.latest) if time_constraint is None else intersect_ranges(*time_constraint, c.earliest, c.latest)
earliest = c.earliest.timestamp(start)
latest = c.latest.timestamp(start)
time_constraint = (earliest, latest) if time_constraint is None else intersect_ranges(*time_constraint, earliest, latest)
elif c.mode == ConstraintMode.Limit:
c: LimitConstraint
raise NotImplementedError
@@ -70,13 +73,15 @@ class TrancheTrigger:
def enable_time_trigger(self):
if self.time_constraint:
log.debug(f'enable_time_trigger')
time_triggers.add(self.time_trigger)
def disable_time_trigger(self):
if self.time_constraint:
time_triggers.remove(self.time_trigger)
def time_trigger(self, _prev, now):
def time_trigger(self, now):
log.debug(f'time_trigger {now}')
if now >= self.time_constraint[1]:
self.disable()
self.status = TrancheStatus.Expired
@@ -90,8 +95,8 @@ class TrancheTrigger:
def disable_price_trigger(self):
price_triggers[self.order.pool_address].remove(self.price_trigger)
def price_trigger(self, prev, cur):
if all(pc.passes(prev,cur) for pc in self.price_constraints):
def price_trigger(self, cur):
if all(pc.passes(cur) for pc in self.price_constraints):
self.execute()
def execute(self):
@@ -102,13 +107,44 @@ class TrancheTrigger:
self.disable_time_trigger()
self.disable_price_trigger()
@property
def closed(self):
return self.status in (TrancheStatus.Filled, TrancheStatus.Expired)
@property
def open(self):
return not self.closed
class OrderTriggers:
instances: dict[OrderKey, 'OrderTriggers'] = {}
def __init__(self, order: Order):
assert order.key not in OrderTriggers.instances
self.order = order
self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys]
OrderTriggers.instances[order.key] = self
def disable(self):
for t in self.triggers:
t.disable()
del OrderTriggers.instances[self.order.key]
@property
def closed(self):
return all(t.closed for t in self.triggers)
@property
def open(self):
return not self.closed
def close_order_and_disable_triggers(order_key: OrderKey, final_state: SwapOrderState):
Order.instances[order_key].complete(final_state)
try:
triggers = OrderTriggers.instances[order_key]
except KeyError:
pass
else:
triggers.disable()

View File

@@ -1,5 +1,5 @@
import logging
from typing import Callable, Union, Any, Iterable
from typing import Callable, Union, Any, Iterable, Optional
from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
@@ -13,6 +13,7 @@ from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
from dexorder.event_handler import setup_logevent_triggers
from dexorder.order.triggers import time_triggers
from dexorder.util import hexstr, topic
from dexorder.util.async_util import maywait
@@ -103,23 +104,30 @@ class BlockStateRunner:
if fork.disjoint:
# backfill batches
for callback, event, log_filter in self.events:
from_height = state.root_block.height + 1
end_height = block.height
while from_height <= end_height:
to_height = min(end_height, from_height + chain.batch_size - 1)
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(lf), callback, event, lf))
from_height += chain.batch_size
if event is None:
batches.append(None)
else:
from_height = state.root_block.height + 1
end_height = block.height
while from_height <= end_height:
to_height = min(end_height, from_height + chain.batch_size - 1)
lf = dict(log_filter)
lf['fromBlock'] = from_height
lf['toBlock'] = to_height
log.debug(f'batch backfill {from_height} - {to_height}')
batches.append((w3.eth.get_logs(lf), callback, event, lf))
from_height += chain.batch_size
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
for callback, event, log_filter in self.events:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
if log_filter is None:
batches.append((None, callback, event, None))
else:
lf = dict(log_filter)
lf['blockHash'] = hexstr(block.hash)
print(lf)
batches.append((w3.eth.get_logs(lf), callback, event, log_filter))
# set up for callbacks
current_block.set(block)
@@ -129,17 +137,20 @@ class BlockStateRunner:
session.add(block)
pubs = []
current_pub.set(lambda room, evnt, *args: pubs.append((room, evnt, args)))
# callbacks
# logevent callbacks
for future,callback,event,filter_args in batches:
log_events = await future
for log_event in log_events:
try:
parsed = event.process_log(log_event)
except (LogTopicError, MismatchedABI):
pass
else:
# todo try/except for known retryable errors
await maywait(callback(parsed))
if future is None:
await maywait(callback()) # non-log callback
else:
log_events = await future
for log_event in log_events:
try:
parsed = event.process_log(log_event) if event is not None else log_event
except (LogTopicError, MismatchedABI):
pass
else:
# todo try/except for known retryable errors
await maywait(callback(parsed))
# todo check for reorg and generate a reorg diff list
diff_items = state.diffs_by_hash[block.hash]
@@ -167,9 +178,10 @@ class BlockStateRunner:
session.commit()
log.info(f'completed block {block}')
def add_event_trigger(self, callback:Callable[[dict],None], event: ContractEvents, log_filter: Union[dict,str]=None):
if log_filter is None:
log_filter = {'topics':[topic(event.abi)]}
def add_event_trigger(self, callback: Callable[[dict], None], event: ContractEvents = None, log_filter: Union[dict, str] = None):
"""
if event is None, the callback is still invoked in the series of log handlers but with no argument instead of logs
"""
if log_filter is None and event is not None:
log_filter = {'topics': [topic(event.abi)]}
self.events.append((callback, event, log_filter))

View File

@@ -35,6 +35,10 @@ def hexbytes(value: str):
return bytes.fromhex(value[2:] if value.startswith('0x') else value)
def hexint(value: str):
return int(value[2:] if value.startswith('0x') else value, 16)
def _keystr1(value):
t = type(value)
return value if t is str else value.hex() if t is HexBytes else '0x' + value.hex() if t is bytes else str(value)

View File

@@ -7,6 +7,7 @@ from eth_utils import keccak, to_bytes, to_checksum_address
from dexorder import config, current_w3
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy
from dexorder.util import hexstr
log = logging.getLogger(__name__)
@@ -24,6 +25,7 @@ def get_factory() -> ContractProxy:
if tx['contractName'] == 'Factory':
addr = tx['contractAddress']
found = factory[chain_id] = ContractProxy(addr, 'Factory')
log.info(f'Factory {addr}')
break
except FileNotFoundError:
log.warning(f'Could not find deployment for chain {chain_id} "{deployment_tag}"')
@@ -46,6 +48,7 @@ def vault_address(owner, num):
with open('../contract/out/Vault.sol/Vault.json', 'rt') as _file:
vault_info = json.load(_file)
VAULT_INIT_CODE_HASH = keccak(to_bytes(hexstr=vault_info['bytecode']['object']))
log.info(f'VAULT_INIT_CODE_HASH {hexstr(VAULT_INIT_CODE_HASH)}')
salt = keccak(encode_packed(['address','uint8'],[owner,num]))
contract_address = keccak(
b"\xff"