Files
backend/src/dexorder/event_handler.py
2025-02-24 09:47:34 -04:00

227 lines
8.9 KiB
Python

import asyncio
import logging
from web3.types import EventData
from dexorder import db, metric, current_w3
from dexorder.accounting import accounting_fill, accounting_placement, accounting_transfer, is_tracked_address, \
accounting_lock
from dexorder.base.chain import current_chain
from dexorder.base.order import TrancheKey, OrderKey
from dexorder.base.orderlib import SwapOrderState
from dexorder.blocks import get_block_timestamp
from dexorder.blockstate import current_blockstate
from dexorder.contract.dexorder import VaultContract, get_factory_contract
from dexorder.database.model import VaultCreationRequest
from dexorder.impls import get_impl_version
from dexorder.ohlc import ohlcs
from dexorder.order.orderstate import Order
from dexorder.order.triggers import (OrderTriggers, activate_order, update_balance_triggers, start_trigger_updates,
update_price_triggers, TimeTrigger, PriceLineTrigger)
from dexorder.pools import new_pool_prices, pool_prices, get_uniswap_data
from dexorder.util import hexstr
from dexorder.vault_blockdata import vault_owners, adjust_balance, verify_vault, publish_vaults
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
def dump_log(eventlog):
    """Write ``eventlog`` to the debug log, prefixed with a tab character."""
    line = f'\t{eventlog}'
    log.debug(line)
def init():
    """Reset the handler state used by this module.

    Clears ``new_pool_prices``, then calls ``start_trigger_updates()`` and
    ``accounting_lock()``, in that order.
    """
    # Discard any pool prices collected so far.
    new_pool_prices.clear()

    start_trigger_updates()
    accounting_lock()
async def handle_order_placed(event: EventData):
    """Handle a DexorderSwapPlaced event by creating and activating its orders.

    The event arguments ``startOrderIndex`` and ``numOrders`` describe a
    contiguous range of order indexes on the emitting vault. Events missing
    either argument, and events from vaults that fail ``verify_vault``, are
    logged and dropped. For every index in the range that is not already in
    ``Order.instances``, the order status is read from the vault contract, an
    ``Order`` is created from it and activated.

    :param event: decoded event log; ``event['address']`` is used as the vault.
    """
    # event DexorderSwapPlaced (uint64 startOrderIndex, uint8 numOrders, uint);
    addr = event['address']
    try:
        args = event['args']
        start_index = int(args['startOrderIndex'])
        num_orders = int(args['numOrders'])
    except KeyError:
        log.warning(f'Rogue DexorderSwapPlaced in tx {hexstr(event["transactionHash"])}')
        return
    log.debug(f'DexorderSwapPlaced {addr} {start_index} {num_orders}')

    verified = await verify_vault(addr)
    if not verified:
        log.warning(f'Discarding order from rogue vault {addr}.')
        return

    await accounting_placement(event)
    metric.orders.inc()

    # The contract wrapper is built lazily: only when at least one order in
    # the range is not yet known.
    vault_contract = None
    stop_index = start_index + num_orders
    for order_index in range(start_index, stop_index):
        if OrderKey(addr, order_index) in Order.instances:
            continue
        if vault_contract is None:
            vault_contract = VaultContract(addr)
        obj = await vault_contract.swapOrderStatus(order_index)
        log.debug(f'raw order status {obj}')
        order = Order.create(addr, order_index, event['transactionHash'], obj)
        await activate_order(order)
        log.debug(f'new order {order.key}{order}')
async def handle_swap_filled(event: EventData):
    """Handle a DexorderSwapFilled event: account for the fill and notify triggers.

    Reads ``orderIndex``, ``trancheIndex``, ``amountIn``, ``amountOut``,
    ``fillFee`` and ``nextExecutionTime`` from the event arguments. Events
    missing any of them, or referring to an order unknown to ``Order.of``, are
    logged and ignored.

    :param event: decoded event log; ``event['address']`` is used as the vault.
    """
    # Fields read from the event: orderIndex, trancheIndex, amountIn,
    # amountOut, fillFee, nextExecutionTime.
    log.debug(f'DexorderSwapFilled {event}')
    args = event['args']
    vault = event['address']

    field_names = ('orderIndex', 'trancheIndex', 'amountIn', 'amountOut', 'fillFee', 'nextExecutionTime')
    try:
        (order_index, tranche_index, amount_in,
         amount_out, fill_fee, next_execution_time) = [args[name] for name in field_names]
    except KeyError:
        log.warning(f'Rogue DexorderSwapFilled in tx {hexstr(event["transactionHash"])}')
        return

    try:
        order: Order = Order.of(vault, order_index)
    except KeyError:
        log.warning(f'DexorderSwapFilled IGNORED due to missing order {vault} {order_index}')
        return

    fill_value = await accounting_fill(event, order.order.tokenOut)
    if fill_value is not None:
        metric.volume.inc(float(fill_value))

    # update rate limit
    tranche_status = order.status.trancheStatus[tranche_index]
    tranche_status.activationTime = next_execution_time

    try:
        triggers = OrderTriggers.instances[order.key]
    except KeyError:
        log.warning(f'No order triggers for fill of {TrancheKey(order.key.vault, order.key.order_index, tranche_index)}')
        return

    time = await get_block_timestamp(event['blockHash'])
    tx_id = hexstr(event['transactionHash'])
    triggers.fill(tx_id, time, tranche_index, amount_in, amount_out, fill_fee, next_execution_time)
async def handle_order_canceled(event: EventData):
    """Handle a DexorderCanceled event by completing the order as Canceled.

    Events that lack the ``orderIndex`` argument are logged and dropped, in the
    same way the placed/filled handlers treat malformed events. Events that
    refer to an order unknown to ``Order.of`` are logged and ignored.

    :param event: decoded event log; ``event['address']`` is used as the vault.
    """
    # event DexorderCanceled (uint64 orderIndex);
    log.debug(f'DexorderCanceled {event}')
    vault = event['address']
    try:
        order_index = event['args']['orderIndex']
    except KeyError:
        # Malformed event: warn and drop it instead of letting the KeyError
        # escape to the caller.
        log.warning(f'Rogue DexorderCanceled in tx {hexstr(event["transactionHash"])}')
        return
    try:
        order: Order = Order.of(vault, order_index)
        order.complete(SwapOrderState.Canceled)
        log.debug(f'Canceled order {OrderKey(vault,order_index)}')
    except KeyError:
        log.warning(f'DexorderCanceled IGNORED due to missing order {vault} {order_index}')
async def handle_order_cancel_all(event: EventData):
    """Handle a DexorderCancelAll event by canceling older open orders of the vault.

    Every index listed for the vault in ``Order.vault_open_orders`` that is
    strictly less than the event's ``cancelAllIndex`` argument is completed
    with ``SwapOrderState.Canceled``. Events lacking ``cancelAllIndex`` are
    logged and dropped; indexes that ``Order.of`` cannot resolve are logged and
    skipped.

    :param event: decoded event log; ``event['address']`` is used as the vault.
    """
    # The threshold is read from the event argument named `cancelAllIndex`.
    log.debug(f'DexorderCancelAll {event}')
    vault = event['address']
    try:
        cancel_all_index = event['args']['cancelAllIndex']
    except KeyError:
        # Malformed event: warn and drop it instead of letting the KeyError
        # escape to the caller.
        log.warning(f'Rogue DexorderCancelAll in tx {hexstr(event["transactionHash"])}')
        return

    # Iterate over a snapshot: completing an order may change the collection of
    # open orders for this vault, and mutating a collection while iterating it
    # is unsafe.
    open_indexes = tuple(Order.vault_open_orders.get(vault, []))
    for order_index in open_indexes:
        if order_index >= cancel_all_index:
            continue
        try:
            order: Order = Order.of(vault, order_index)
            order.complete(SwapOrderState.Canceled)
            log.debug(f'Canceled order {OrderKey(vault,order_index)}')
        except KeyError:
            log.warning(f'DexorderCancelAll IGNORED due to missing order {vault} {order_index}')
async def handle_transfer(transfer: EventData):
    """Handle a token Transfer event touching a known vault or a tracked address.

    If the recipient is in ``vault_owners`` it is treated as a deposit to that
    vault; otherwise, if the sender is in ``vault_owners`` it is treated as a
    withdrawal from that vault. In either case the vault's balance and balance
    triggers are updated. Independently, transfers to an address accepted by
    ``is_tracked_address`` are passed to ``accounting_transfer``.

    :param transfer: decoded event log; ``transfer['address']`` is used as the
        token address.
    """
    # log.debug(f'Transfer {transfer}')
    args = transfer['args']
    from_address = args['from']
    to_address = args['to']
    amount = int(args['value'])
    token = transfer['address']

    # The recipient is checked first, so when both ends are vaults only the
    # receiving vault is updated here.
    vault = None
    if to_address in vault_owners:
        log.debug(f'deposit {to_address} {amount}')
        vault = to_address
    elif from_address in vault_owners:
        # NOTE(review): this logs the recipient rather than the withdrawing
        # vault, and the same non-negated amount is passed on for withdrawals
        # as for deposits -- confirm both are intended.
        log.debug(f'withdraw {to_address} {amount}')
        vault = from_address

    if vault is not None:
        await adjust_balance(vault, token, amount)
        await update_balance_triggers(vault, token, amount)

    if is_tracked_address(to_address):
        # noinspection PyTypeChecker
        await accounting_transfer(transfer, token, from_address, to_address, amount, adjust_decimals=True)
async def handle_uniswap_swaps(swaps: list[EventData]):
    """Process a batch of swap events one at a time, in the order given.

    :param swaps: decoded swap event logs.
    """
    # asynchronously prefetch the block timestamps we'll need
    block_hashes = {swap['blockHash'] for swap in swaps}
    timestamp_fetches = [get_block_timestamp(block_hash) for block_hash in block_hashes]
    await asyncio.gather(*timestamp_fetches)

    # now handle the swaps sequentially
    for swap in swaps:
        await handle_uniswap_swap(swap)
async def handle_uniswap_swap(swap: EventData):
    """Record the price carried by one swap event and propagate it.

    Does nothing when ``get_uniswap_data`` returns ``None``. Otherwise stores
    the price in ``pool_prices`` under the pool's address, then updates the
    OHLC series and the price triggers.

    :param swap: decoded swap event log.
    """
    data = await get_uniswap_data(swap)
    if data is not None:
        pool, time, price = data
        addr = pool['address']
        pool_prices[addr] = price
        await ohlcs.update_all(addr, time, price)
        await update_price_triggers(pool, price)
        # log.debug(f'pool {addr} {minutely(time)} {price}')
async def handle_vault_created(created: EventData):
    """Handle a VaultCreated event: register the new vault and its owner.

    Reads ``owner`` and ``num`` from the event arguments and uses the emitting
    address as the vault. If a matching ``VaultCreationRequest`` row exists,
    its ``vault`` field is set to that address. The vault is then verified;
    vaults that fail verification are logged and not registered.

    :param created: decoded event log.
    """
    try:
        owner = created['args']['owner']
        num = created['args']['num']
        addr = created['address']
    except KeyError:
        # The event is interpolated into the message. Passing it as an extra
        # positional argument without a %-placeholder makes the logging module
        # fail to format the record.
        log.debug(f'couldnt parse event data for VaultCreated {created}')
        return

    # stop trying to create the vault
    chain_id = current_chain.get().id
    db_req = db.session.get(VaultCreationRequest, (chain_id, owner, num))
    if db_req is None:
        log.warning(f'could not find vault creation request {chain_id}|{owner}|{num}')
    else:
        # NOTE(review): the request row is updated before the vault has been
        # verified below -- confirm this ordering is intended.
        db_req.vault = addr

    # Verify the authenticity of the vault.
    if not await verify_vault(addr, owner, num):
        log.warning(f'Discarding rogue vault {addr}')
        return

    vault_owners[addr] = owner
    log.debug(f'VaultCreated {owner} #{num} => {addr}')
    publish_vaults(chain_id, owner)

    # BlockData doesn't have an easy way to calculate exact sizes because some keys could hold DELETE values, so
    # this is actually an upper limit on the size.
    approx_size = len(current_blockstate.get().diffs_by_series)
    metric.vaults.set(approx_size)
async def handle_vault_impl_changed(upgrade: EventData):
    """Handle a VaultImplChanged event from either the factory or a vault.

    When the emitter is the factory contract, the change is logged and nothing
    else happens. Otherwise the emitter is treated as a vault: the event must
    carry an ``impl`` argument and the vault must pass ``verify_vault``, after
    which the implementation's version is looked up and logged.

    :param upgrade: decoded event log.
    """
    addr = upgrade['address']
    try:
        impl = upgrade['args']['impl']
    except KeyError:
        impl = None

    # this event could come from the VaultFactory
    if addr == get_factory_contract().address:
        # Report the new implementation address, not the factory address twice.
        log.info(f'Default VaultImpl changed for VaultFactory {addr} to implementation {impl}')
        return

    # otherwise it's from a Vault
    if impl is None:
        # The event is interpolated into the message. Passing it as an extra
        # positional argument without a %-placeholder makes the logging module
        # fail to format the record.
        log.debug(f'Could not parse VaultImplChanged {upgrade}')
        return
    if not await verify_vault(addr):
        log.warning(f'Ignoring "upgrade" of rogue vault {addr}')
        return
    version = await get_impl_version(impl)
    log.debug(f'Vault {addr} upgraded to impl version {version}')
async def get_gas_price():
    """Return the awaited ``eth.gas_price`` of the current web3 instance."""
    w3 = current_w3.get()
    return await w3.eth.gas_price
def initialize_metrics():
    """Attach callback functions to the vault, order, trigger and gas-price metrics."""

    def time_trigger_count():
        return len(TimeTrigger.all)

    def line_trigger_count():
        return len(PriceLineTrigger.triggers_set)

    metric.vaults.set_function(vault_owners.upper_len)
    metric.open_orders.set_function(Order.open_orders.upper_len)
    metric.triggers_time.set_function(time_trigger_count)
    metric.triggers_line.set_function(line_trigger_count)
    # NOTE(review): get_gas_price is a coroutine function, so calling it yields
    # a coroutine rather than a number -- confirm the metric accepts that.
    metric.gas_price.set_function(get_gas_price)