diagonal line support

This commit is contained in:
Tim Olson
2023-12-19 17:06:50 -04:00
parent 9c0f09ea62
commit 41ff02b9ba
4 changed files with 94 additions and 30 deletions

View File

@@ -1,4 +1,6 @@
import math
from contextvars import ContextVar
from datetime import datetime
class Blockchain:
@@ -48,3 +50,19 @@ Mock = Blockchain(31337, 'Mock', 3, batch_size=10000)
Alpha = Blockchain(53261, 'Dexorder Alpha', 3, batch_size=10000)
current_chain = ContextVar[Blockchain]('current_chain')
class BlockClock:
def __init__(self):
self.timestamp = 0
self.adjustment = 0
def set(self, timestamp):
self.timestamp = timestamp
self.adjustment = timestamp - datetime.now().timestamp()
def now(self):
return math.ceil(datetime.now().timestamp() + self.adjustment)
current_clock = ContextVar[BlockClock]('clock') # current estimated timestamp of the blockchain. will be different than current_block.get().timestamp when evaluating time triggers in-between blocks

View File

@@ -6,7 +6,7 @@ from uuid import UUID
from web3.types import EventData
from dexorder import current_pub, db, dec
from dexorder.base.chain import current_chain
from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.transaction import create_transactions, submit_transaction_request, handle_transaction_receipts, send_transactions
from dexorder.pools import uniswap_price
@@ -66,12 +66,14 @@ def setup_logevent_triggers(runner):
runner.add_event_trigger(handle_order_cancel_all, get_contract_event('OrderLib', 'DexorderCancelAll'))
runner.add_event_trigger(handle_transaction_receipts)
runner.add_event_trigger(handle_dexorderexecutions, executions)
runner.add_event_trigger(activate_time_triggers)
runner.add_event_trigger(activate_price_triggers)
runner.add_event_trigger(process_active_tranches)
runner.add_event_trigger(process_execution_requests)
runner.add_event_trigger(create_transactions)
runner.add_event_trigger(send_transactions)
# these callbacks run after the ones above on each block, plus these also run every second
runner.add_postprocess_trigger(activate_time_triggers)
runner.add_postprocess_trigger(activate_price_triggers)
runner.add_postprocess_trigger(process_active_tranches)
runner.add_postprocess_trigger(process_execution_requests)
runner.add_postprocess_trigger(create_transactions)
runner.add_postprocess_trigger(send_transactions)
def dump_log(eventlog):
@@ -236,8 +238,8 @@ def handle_vault_created(created: EventData):
async def activate_time_triggers():
now = current_block.get().timestamp
log.debug(f'activating time triggers')
now = current_clock.get().now()
log.debug(f'activating time triggers at {now}')
# time triggers
for tt in tuple(time_triggers):
await maywait(tt(now))

View File

@@ -9,9 +9,9 @@ from dexorder.base.orderlib import SwapOrderState, PriceProof, DISTANT_FUTURE, D
from dexorder.util import defaultdictk
from .orderstate import Order
from .. import dec
from ..base.chain import current_clock
from ..base.order import OrderKey, TrancheKey, ExecutionRequest
from ..pools import ensure_pool_price, Pools, pool_decimals
from ..database.model.block import current_block
from ..pools import ensure_pool_price, Pools, pool_decimals, pool_prices
from ..routing import pool_address
log = logging.getLogger(__name__)
@@ -38,10 +38,7 @@ async def activate_order(order: Order):
address = pool_address(order.status.order)
pool = await Pools.get(address)
await ensure_pool_price(pool)
inverted = pool.base != order.order.tokenIn
if inverted:
assert pool.base == order.order.tokenOut
triggers = OrderTriggers(order, inverted)
triggers = OrderTriggers(order)
if triggers.closed:
log.debug(f'order {order.key} was immediately closed')
close_order_and_disable_triggers(order, SwapOrderState.Filled if order.remaining == 0 or order.remaining <= order.min_fill_amount else SwapOrderState.Expired)
@@ -58,10 +55,11 @@ async def line_passes(lc: tuple[float,float], is_min: bool, price: dec) -> bool:
b, m = lc
if b == 0 and m == 0:
return True
limit = m * current_block.get().timestamp + b
limit = m * current_clock.get().now() + b
log.debug(f'line passes {limit} {"<" if is_min else ">"} {price}')
# todo ratios
# prices AT the limit get zero volume, so we only trigger on >, not >=
return is_min and price > limit or not is_min and price < limit
return is_min and limit < price or not is_min and limit > price
class TrancheStatus (Enum):
@@ -71,7 +69,7 @@ class TrancheStatus (Enum):
Expired = auto() # time deadline has past and this tranche cannot be filled
class TrancheTrigger:
def __init__(self, order: Order, tranche_key: TrancheKey, inverted: bool):
def __init__(self, order: Order, tranche_key: TrancheKey):
assert order.key.vault == tranche_key.vault and order.key.order_index == tranche_key.order_index
self.order = order
self.tk = tranche_key
@@ -93,8 +91,7 @@ class TrancheTrigger:
self.min_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.minIntercept, tranche.minSlope)
self.max_line_constraint = (0.,0.) if tranche.marketOrder else (tranche.maxIntercept, tranche.maxSlope)
self.has_line_constraint = any( a or b for a,b in (self.min_line_constraint, self.max_line_constraint))
if not tranche.marketOrder and inverted:
self.min_line_constraint, self.max_line_constraint = self.max_line_constraint, self.min_line_constraint
self.has_sloped_line_constraint = any(m!=0 for b,m in (self.min_line_constraint, self.max_line_constraint))
self.slippage = tranche.minIntercept if tranche.marketOrder else 0
self.pool_price_multiplier = None
@@ -102,8 +99,12 @@ class TrancheTrigger:
if tranche_remaining == 0 or tranche_remaining < self.order.min_fill_amount: # min_fill_amount could be 0 (disabled) so we also check for the 0 case separately
self._status = TrancheStatus.Filled
return
timestamp = current_block.get().timestamp
self._status = TrancheStatus.Early if timestamp < self.time_constraint[0] else TrancheStatus.Expired if timestamp > self.time_constraint[1] else TrancheStatus.Pricing
timestamp = current_clock.get().now()
self._status = \
TrancheStatus.Pricing if self.time_constraint is None else \
TrancheStatus.Early if timestamp < self.time_constraint[0] else \
TrancheStatus.Expired if timestamp > self.time_constraint[1] else \
TrancheStatus.Pricing
self.enable_time_trigger()
if self._status == TrancheStatus.Pricing:
self.enable_price_trigger()
@@ -142,14 +143,14 @@ class TrancheTrigger:
self.enable_price_trigger()
def enable_price_trigger(self):
if self.has_line_constraint:
if self.has_line_constraint and not self.has_sloped_line_constraint: # sloped constraints must be triggered every tick, not just on pool price changes
price_triggers[self.order.pool_address].add(self.price_trigger)
new_price_triggers[self.order.pool_address].add(self.price_trigger)
else:
unconstrained_price_triggers.add(self.price_trigger)
def disable_price_trigger(self):
if self.has_line_constraint:
if self.has_line_constraint and not self.has_sloped_line_constraint:
price_triggers[self.order.pool_address].remove(self.price_trigger)
else:
unconstrained_price_triggers.remove(self.price_trigger)
@@ -159,11 +160,16 @@ class TrancheTrigger:
if self.closed:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
log.debug(f'price trigger {cur}')
if cur is None and self.has_line_constraint:
await ensure_pool_price(self.order.pool_address)
cur = pool_prices[self.order.pool_address]
if cur is not None:
if self.pool_price_multiplier is None:
pool = await Pools.get(pool_address(self.order.order))
pool_dec = await pool_decimals(pool)
self.pool_price_multiplier = dec(10) ** dec(-pool_dec)
log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}')
cur *= self.pool_price_multiplier
if cur is None or not self.has_line_constraint or all(await asyncio.gather(
line_passes(self.min_line_constraint, True, cur),
@@ -199,10 +205,10 @@ class TrancheTrigger:
class OrderTriggers:
instances: dict[OrderKey, 'OrderTriggers'] = {}
def __init__(self, order: Order, inverted: bool):
def __init__(self, order: Order):
assert order.key not in OrderTriggers.instances
self.order = order
self.triggers = [TrancheTrigger(order, tk, inverted) for tk in self.order.tranche_keys]
self.triggers = [TrancheTrigger(order, tk) for tk in self.order.tranche_keys]
OrderTriggers.instances[order.key] = self
log.debug(f'created OrderTriggers for {order.key}')

View File

@@ -1,15 +1,17 @@
import asyncio
import logging
from asyncio import Queue
from datetime import datetime
from typing import Callable, Union, Any, Iterable
from sqlalchemy.sql.functions import current_timestamp
from web3.contract.contract import ContractEvents
from web3.exceptions import LogTopicError, MismatchedABI
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, current_pub, async_yield, current_w3, config
from dexorder.base.chain import current_chain
from dexorder.base.chain import current_chain, current_clock, BlockClock
from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws, create_w3
from dexorder.blockstate import BlockState, current_blockstate
@@ -35,6 +37,9 @@ class BlockStateRunner:
# items are (callback, event, log_filter). The callback is invoked with web3 EventData for every detected event
self.events:list[tuple[Callable[[dict],None],ContractEvents,dict]] = []
# these callbacks are invoked after every block and also every second if there wasnt a block
self.postprocess_cbs:list[Callable[[],None]] = []
# onStateInit callbacks are invoked after the initial state is loaded or created
self.on_state_init: list[Callable[[],None]] = []
self.state_initialized = False
@@ -60,6 +65,8 @@ class BlockStateRunner:
log_filter = {'topics': [topic(event.abi)]}
self.events.append((callback, event, log_filter))
def add_postprocess_trigger(self, callback: Callable[[dict], None]):
self.postprocess_cbs.append(callback)
async def run(self):
return await (self.run_polling() if config.polling > 0 else self.run_ws())
@@ -171,16 +178,21 @@ class BlockStateRunner:
w3 = current_w3.get()
chain = current_chain.get()
assert chain.chain_id == await w3.eth.chain_id
current_clock.set(BlockClock())
prev_head = None
while self.running:
try:
async with asyncio.timeout(1): # check running flag every second
start = datetime.now()
head = await self.queue.get()
log.debug(f'got head {hexstr(head)}')
except TimeoutError:
pass
# 1 second has passed without a new head. Run the postprocess callbacks to check for activated time-based triggers
if prev_head is not None:
await self.handle_time_tick(head)
else:
try:
await self.handle_head(chain, head, w3)
prev_head = head
except Exception as x:
log.exception(x)
log.debug('runner worker exiting')
@@ -203,6 +215,7 @@ class BlockStateRunner:
block = Block(chain=chain_id, height=int(block_data['number'], 0),
hash=bytes.fromhex(block_data['hash'][2:]), parent=bytes.fromhex(block_data['parentHash'][2:]), data=block_data)
latest_block.set(block)
current_clock.get().set(block.timestamp)
if self.state is None:
# initialize
self.state = BlockState(block)
@@ -234,6 +247,8 @@ class BlockStateRunner:
get_logs = await get_logs
batches.append((get_logs, callback, event, lf))
from_height += chain.batch_size
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
else:
# event callbacks are triggered in the order in which they're registered. the events passed to
# each callback are in block transaction order
@@ -248,10 +263,12 @@ class BlockStateRunner:
if not config.parallel_logevent_queries:
get_logs = await get_logs
batches.append((get_logs, callback, event, log_filter))
for callback in self.postprocess_cbs:
batches.append((None, callback, None, None))
# set up for callbacks
current_block.set(block)
current_fork.set(fork) # this is set earlier
current_fork.set(fork)
session = db.session
session.begin()
session.add(block)
@@ -305,6 +322,27 @@ class BlockStateRunner:
if session is not None:
session.close()
async def handle_time_tick(self, blockhash):
# similar to handle_head, but we only call the postprocess events, since there was only a time tick and no new block data
block = self.state.by_hash[blockhash]
fork = self.state.fork(block)
current_block.set(block)
current_fork.set(fork)
session = db.session
session.begin()
try:
for callback in self.postprocess_cbs:
await maywait(callback())
except:
session.rollback()
raise
else:
session.commit()
finally:
if session is not None:
session.close()
async def do_state_init_cbs(self):
if self.state_initialized:
return