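"""Dexorder on-chain event handlers.

The functions in this module react to Dexorder and ERC-20/Uniswap events
(order placement, fills, cancellations, vault creation, token transfers,
pool swaps), maintain the derived in-memory state (orders, triggers, vault
balances, pool prices, OHLC bars), and turn triggered tranches into
transaction jobs.
"""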
import asyncio
import functools
import logging
from uuid import UUID

from web3.types import EventData

from dexorder import current_pub, db, from_timestamp, minutely
from dexorder.base.chain import current_chain, current_clock
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.blocktime import get_block_timestamp
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.contract import ERC20
from dexorder.vault_blockdata import vault_owners, vault_balances
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
from dexorder.base.orderlib import SwapOrderState
from dexorder.order.orderstate import Order
from dexorder.order.triggers import (OrderTriggers, price_triggers, time_triggers,
                                     unconstrained_price_triggers, execution_requests, inflight_execution_requests,
                                     TrancheStatus, active_tranches, new_price_triggers, activate_order)
from dexorder.util.async_util import maywait

log = logging.getLogger(__name__)


def dump_log(eventlog):
    log.debug(f'\t{eventlog}')


async def init_order_triggers():
    # State-init callback: runs only once, after the state has been loaded from the db or created fresh.
    log.debug('activating orders')
    orders = [Order.of(key) for key in Order.open_orders]
    futures = [activate_order(order) for order in orders]
    # return_exceptions=True keeps one bad order from aborting startup,
    # but failures should still be logged rather than silently dropped.
    results = await asyncio.gather(*futures, return_exceptions=True)
    for order, result in zip(orders, results):
        if isinstance(result, Exception):
            log.error(f'failed to activate order {order.key}', exc_info=result)
    log.debug(f'activated {len(futures)} orders')


def init():
    new_pool_prices.clear()
    new_price_triggers.clear()


async def handle_order_placed(event: EventData):
    # event DexorderPlaced (uint64 startOrderIndex, uint8 numOrders);
    log.debug(f'handle order placed {event}')
    addr = event['address']
    start_index = int(event['args']['startOrderIndex'])
    num_orders = int(event['args']['numOrders'])
    log.debug(f'DexorderPlaced {addr} {start_index} {num_orders}')
    if addr not in vault_owners:
        log.warning(f'block {current_block.get()} order from unknown vault {addr}')  # todo insert (short) block hash into all logs
        # return  # todo always discard rogues
        # noinspection PyBroadException
        try:
            vault_owners[addr] = await VaultContract(addr).owner()
        except Exception:
            log.warning(f'vault owner for {addr} could not be found.', exc_info=True)
            return
    vault = VaultContract(addr)
    for index in range(start_index, start_index + num_orders):
        obj = await vault.swapOrderStatus(index)
        log.debug(f'raw order status {obj}')
        order = Order.create(vault.address, index, obj)
        await activate_order(order)
        log.debug(f'new order {order} {order.order}')


def handle_swap_filled(event: EventData):
    # event DexorderSwapFilled (uint64 orderIndex, uint8 trancheIndex, uint256 amountIn, uint256 amountOut);
    log.debug(f'DexorderSwapFilled {event}')
    args = event['args']
    vault = event['address']
    order_index = args['orderIndex']
    tranche_index = args['trancheIndex']
    amount_in = args['amountIn']
    amount_out = args['amountOut']
    try:
        order: Order = Order.of(vault, order_index)
    except KeyError:
        log.warning(f'DexorderSwapFilled IGNORED due to missing order {vault} {order_index}')
        return
    try:
        triggers = OrderTriggers.instances[order.key]
        triggers.fill(tranche_index, amount_in, amount_out)
    except KeyError:
        log.warning(f'No order triggers for fill of {TrancheKey(order.key.vault, order.key.order_index, tranche_index)}')


async def handle_order_canceled(event: EventData):
    # event DexorderCanceled (uint64 orderIndex);
    log.debug(f'DexorderCanceled {event}')
    vault = event['address']
    order_index = event['args']['orderIndex']
    try:
        order: Order = Order.of(vault, order_index)
        order.complete(SwapOrderState.Canceled)
        log.debug(f'Canceled order {OrderKey(vault, order_index)}')
    except KeyError:
        log.warning(f'DexorderCanceled IGNORED due to missing order {vault} {order_index}')


async def handle_order_cancel_all(event: EventData):
    # event DexorderCancelAll (uint64 numSwapOrders);
    log.debug(f'DexorderCancelAll {event}')
    vault = event['address']
    cancel_all_index = event['args']['cancelAllIndex']
    # iterate over a copy: completing an order may mutate the vault's open-order collection
    for order_index in tuple(Order.vault_open_orders[vault]):
        if order_index < cancel_all_index:
            try:
                order: Order = Order.of(vault, order_index)
                order.complete(SwapOrderState.Canceled)
                log.debug(f'Canceled order {OrderKey(vault, order_index)}')
            except KeyError:
                log.warning(f'DexorderCancelAll IGNORED due to missing order {vault} {order_index}')
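
# Cancellation is exclusive of the index itself: e.g. cancelAllIndex == 3 cancels
# open orders 0, 1 and 2 and leaves order 3 (and any later orders) untouched.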


def balance_adjuster(vault, token_address, amount):
    # Returns an async callback for vault_balances.async_modify that applies a signed
    # balance change to the vault's per-token balance mapping.
    async def _adjust(vaddr, taddr, amt, old_balances):
        result = dict(old_balances)  # copy
        try:
            old = old_balances[taddr]  # balances are keyed by token address
            new_amt = old + amt
            if new_amt < 0:
                log.warning(f'NEGATIVE BALANCE for vault {current_chain.get()} {vaddr} token {taddr} {old} {amt:+} = {new_amt}')
                new_amt = 0
        except KeyError:
            # no cached balance for this token yet: fetch the authoritative value on-chain
            new_amt = await ERC20(taddr).balanceOf(vaddr)
        result[taddr] = new_amt
        return result
    return functools.partial(_adjust, vault, token_address, amount)
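
# A minimal usage sketch (hypothetical addresses and amounts): the partial returned
# above still expects the old balance mapping as its final argument.
#
#   adjust = balance_adjuster(vault_addr, token_addr, +100)
#   new_balances = await adjust({token_addr: 250})   # -> {token_addr: 350}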


async def handle_transfer(transfer: EventData):
    # todo handle native transfers incl gas for token transfers
    log.debug(f'Transfer {transfer}')
    from_address = transfer['args']['from']
    to_address = transfer['args']['to']
    amount = int(transfer['args']['value'])
    if to_address in vault_owners and to_address != from_address:
        log.debug(f'deposit {to_address} {amount}')
        vault = to_address
        token_address = transfer['address']
        await vault_balances.async_modify(vault, balance_adjuster(vault, token_address, +amount), default={})
    if from_address in vault_owners and to_address != from_address:
        log.debug(f'withdraw {from_address} {amount}')
        vault = from_address
        token_address = transfer['address']
        # a withdrawal decreases the sending vault's balance
        await vault_balances.async_modify(vault, balance_adjuster(vault, token_address, -amount), default={})
    # if to_address not in vault_owners and from_address not in vault_owners:
    #     vaults = vault_owners.keys()
    #     log.debug(f'vaults: {list(vaults)}')
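
# A transfer between two tracked vaults takes both branches above: the amount is
# credited to the receiving vault and debited from the sender.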


async def handle_uniswap_swaps(swaps: list[EventData]):
    # asynchronously prefetch the block timestamps we'll need
    hashes = set(swap['blockHash'] for swap in swaps)
    await asyncio.gather(*[get_block_timestamp(h) for h in hashes])
    # then process the swaps sequentially, in order
    for swap in swaps:
        await handle_uniswap_swap(swap)


async def handle_uniswap_swap(swap: EventData):
    data = await get_uniswap_data(swap)
    if data is None:
        return
    pool, time, price = data
    addr = pool['address']
    pool_prices[addr] = price
    ohlcs.update_all(addr, time, price)
    log.debug(f'pool {addr} {minutely(time)} {price}')


def handle_vault_created(created: EventData):
    try:
        owner = created['args']['owner']
        num = created['args']['num']
    except KeyError:
        log.debug(f'could not parse event data for VaultCreated {created}')
        return
    vault = vault_address(owner, num)
    log.debug(f'VaultCreated {owner} #{num} => {vault}')
    vault_owners[vault] = owner
    # rebuild the owner's vault list; vault numbers are sequential, so stop at the first gap
    vaults = []
    for i in range(256):
        addr = vault_address(owner, i)
        # log.debug(f'v{i}? {addr}')
        if addr in vault_owners:
            vaults.append(addr)
        else:
            break
    # log.debug(f'updated vaults: {vaults}')
    current_pub.get()(f'{current_chain.get().chain_id}|{owner}', 'vaults', vaults)


async def activate_time_triggers():
    now = current_clock.get().timestamp
    # log.debug(f'activating time triggers at {now}')
    for tt in tuple(time_triggers):
        await maywait(tt(now))


async def activate_price_triggers():
    # log.debug(f'activating price triggers')
    pools_triggered = set()
    # fire every trigger registered on a pool whose price changed in this pass
    for pool, price in new_pool_prices.items():
        pools_triggered.add(pool)
        for pt in tuple(price_triggers[pool]):
            await maywait(pt(price))
    # newly registered triggers on pools without a fresh price get the last known price
    for pool, triggers in new_price_triggers.items():
        if pool not in pools_triggered:
            price = pool_prices[pool]
            for pt in triggers:
                await maywait(pt(price))
    # unconstrained price triggers fire on every pass
    for t in tuple(unconstrained_price_triggers):
        await maywait(t(None))


def process_active_tranches():
    height = current_block.get().height
    for tk, proof in active_tranches.items():
        old_req = execution_requests.get(tk)
        if old_req is None or old_req.height <= height:  # '<=' so the proof is refreshed with more recent values
            log.info(f'execution request for {tk}')
            execution_requests[tk] = ExecutionRequest(height, proof)


async def process_execution_requests():
    height = current_block.get().height
    execs = {}  # which requests to act on
    for tk, er in execution_requests.items():
        tk: TrancheKey
        er: ExecutionRequest
        pending = inflight_execution_requests.get(tk)
        if pending is None or height - pending >= 30:
            # todo execution timeout => retry; should we use timestamps? configure per-chain.
            # todo check balances
            execs[tk] = er
        else:
            log.debug(f'tranche {tk} is pending execution')

    # execute the list
    # todo batch execution
    for tk, er in execs.items():
        job = submit_transaction_request(new_tranche_execution_request(tk, er.proof))
        inflight_execution_requests[tk] = height
        log.info(f'created job {job.id} to execute tranche {tk}')
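
# The in-flight window is measured in blocks: e.g. a tranche submitted at height 1000
# whose job has produced no result yet becomes eligible for resubmission at height 1030.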


def handle_dexorderexecutions(event: EventData):
    log.debug(f'executions {event}')
    exe_id = UUID(bytes=event['args']['id'])
    errors = event['args']['errors']  # one entry per execution; the empty string means success
    if len(errors) == 0:
        log.warning(f'No errors found in DexorderExecutions event: {event}')
        return
    if len(errors) > 1:
        log.warning('Multiple executions not yet implemented')
    job: TransactionJob = db.session.get(TransactionJob, exe_id)
    if job is None:
        log.warning(f'Job {exe_id} not found!')
        return
    finish_execution_request(job.request, errors[0])


def finish_execution_request(req: TrancheExecutionRequest, error: str):
    try:
        order: Order = Order.of(req.vault, req.order_index)
    except KeyError:
        log.error(f'Could not get order {OrderKey(req.vault, req.order_index)}')
        return
    tk = TrancheKey(req.vault, req.order_index, req.tranche_index)
    if error != '':
        log.debug(f'execution request for tranche {tk} had error "{error}"')
    if error == '':
        log.debug(f'execution request for tranche {tk} was successful!')
    elif error in ('IIA', 'STF'):  # todo not STF
        # Insufficient Input Amount or Safe Transfer Failure: suspend execution until new funds are sent
        # todo vault balance checks
        token = order.order.tokenIn
        log.debug(f'insufficient funds {req.vault} {token}')
    elif error == 'TF':
        # Tranche Filled: mark the trigger as filled and stop retrying this tranche
        log.warning(f'tranche already filled {tk}')
        try:
            triggers = OrderTriggers.instances[order.key]
            tranche_trigger = triggers.triggers[tk.tranche_index]
        except KeyError:
            pass
        else:
            tranche_trigger.status = TrancheStatus.Filled
            tranche_trigger.disable()
    elif error == 'Too little received':
        # from UniswapV3 SwapRouter when not even 1 satoshi of output was gained
        log.debug('warning: de minimis liquidity in pool')
        # todo dont keep trying
    else:
        log.error(f'Unhandled execution error for transaction request {req} ERROR: "{error}"')
    # clear the request unless a newer one was filed at a later block
    try:
        er = execution_requests[tk]
    except KeyError:
        pass
    else:
        if er.height < current_block.get().height:
            del execution_requests[tk]


last_ohlc_rollover = 0


def check_ohlc_rollover():
    global last_ohlc_rollover
    time = current_block.get().timestamp
    dt = from_timestamp(time)
    diff = time - last_ohlc_rollover
    # roll the bars when at least a minute has elapsed or a minute boundary was crossed
    if diff >= 60 or dt.minute != from_timestamp(last_ohlc_rollover).minute:
        for (symbol, period) in recent_ohlcs.keys():
            ohlcs.update(symbol, period, dt)
        last_ohlc_rollover = time
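
# A minimal per-block driver sketch (hypothetical; the actual scheduling lives elsewhere
# in dexorder) showing the intended ordering of the hooks in this module: time and price
# triggers fire first, active tranches become execution requests, requests are submitted,
# OHLC bars roll over, and init() resets the per-pass new-price/new-trigger collections.
#
#   async def on_new_block():
#       await activate_time_triggers()
#       await activate_price_triggers()
#       process_active_tranches()
#       await process_execution_requests()
#       check_ohlc_rollover()
#       init()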