From 54f0687f64793ccce363269fb5f2a2ded19b5d57 Mon Sep 17 00:00:00 2001 From: Tim Olson <> Date: Tue, 7 Nov 2023 12:58:27 -0400 Subject: [PATCH] bugfixes --- src/dexorder/blockstate/blockdata.py | 14 +++++--- src/dexorder/contract/__init__.py | 4 +++ src/dexorder/contract/decimals.py | 4 +-- src/dexorder/event_handler.py | 47 ++++++++++++++----------- src/dexorder/memcache/__init__.py | 4 +-- src/dexorder/memcache/memcache_state.py | 8 +++-- src/dexorder/runner.py | 6 ++-- 7 files changed, 52 insertions(+), 35 deletions(-) diff --git a/src/dexorder/blockstate/blockdata.py b/src/dexorder/blockstate/blockdata.py index 20ed475..1e83154 100644 --- a/src/dexorder/blockstate/blockdata.py +++ b/src/dexorder/blockstate/blockdata.py @@ -42,6 +42,10 @@ class BlockData: self.str2value = str2value self.lazy_getitem = None + @property + def seriesstr(self): + return self.series2str(self.series) + def setitem(self, item, value, overwrite=True): state = current_blockstate.get() fork = current_fork.get() @@ -159,10 +163,12 @@ class BlockDict(Generic[K,V], BlockData): return self.getitem(item, default) def modify(self, item: K, func: Callable[[V],V], default: V=NARG) -> V: - try: - result = func(self.getitem(item, default)) - except KeyError: - result = func(NARG) + result = func(self.getitem(item, default)) + self.setitem(item, result) + return result + + async def async_modify(self, item: K, func: Callable[[V],V], default: V=NARG) -> V: + result = await func(self.getitem(item, default)) self.setitem(item, result) return result diff --git a/src/dexorder/contract/__init__.py b/src/dexorder/contract/__init__.py index 62c0e23..d3bbfc3 100644 --- a/src/dexorder/contract/__init__.py +++ b/src/dexorder/contract/__init__.py @@ -19,3 +19,7 @@ def get_contract_data(name): def get_contract_event(contract_name:str, event_name:str): return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)() + +class ERC20 (ContractProxy): + def __init__(self, addr): + super().__init__(addr, 'ERC20') diff --git a/src/dexorder/contract/decimals.py b/src/dexorder/contract/decimals.py index cc16240..f5f4446 100644 --- a/src/dexorder/contract/decimals.py +++ b/src/dexorder/contract/decimals.py @@ -1,7 +1,7 @@ import logging from dexorder import db -from dexorder.contract import ContractProxy +from dexorder.contract import ERC20 log = logging.getLogger(__name__) @@ -11,6 +11,6 @@ async def token_decimals(addr): try: return db.kv[key] except KeyError: - decimals = await ContractProxy(addr, 'ERC20').decimals() + decimals = await ERC20(addr).decimals() db.kv[key] = decimals return decimals diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index 54bba0a..25977fd 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -1,3 +1,4 @@ +import functools import logging from uuid import UUID @@ -9,7 +10,7 @@ from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRe from dexorder.transaction import create_transactions, submit_transaction_request, handle_transaction_receipts, send_transactions from dexorder.uniswap import UniswapV3Pool, uniswap_price from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract -from dexorder.contract import get_contract_event +from dexorder.contract import get_contract_event, ERC20 from dexorder.data import pool_prices, vault_owners, vault_balances, new_pool_prices from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob @@ -131,7 +132,7 @@ def handle_swap_filled(event: EventData): triggers.fill(tranche_index, amount_in, amount_out) # check for fill if order.remaining <= 0: - close_order_and_disable_triggers(order.key, SwapOrderState.Filled) + close_order_and_disable_triggers(order, SwapOrderState.Filled) async def handle_order_completed(event: EventData): # event DexorderCompleted (uint64 orderIndex); // todo remove? @@ -150,7 +151,24 @@ def handle_order_error(event: EventData): # event DexorderError (uint64 orderIndex, string reason); log.debug(f'DexorderError {event}') -def handle_transfer(transfer: EventData): + +def balance_adjuster(vault, token_address, amount): + async def _adjust(vaddr, taddr, amt, old_balances): + result = dict(old_balances) # copy + try: + old = old_balances[vaddr] + new_amt = old + amt + if new_amt < 0: + log.warning(f'NEGATIVE BALANCE for vault {current_chain.get()} {vault} token {taddr} {old} {amt:+} = {new_amt}') + new_amt = 0 + except KeyError: + new_amt = await ERC20(taddr).balanceOf(vaddr) + result[taddr] = new_amt + return result + return functools.partial(_adjust, vault, token_address, amount) + + +async def handle_transfer(transfer: EventData): # todo handle native transfers incl gas for token transfers log.debug(f'Transfer {transfer}') from_address = transfer['args']['from'] @@ -160,28 +178,15 @@ def handle_transfer(transfer: EventData): log.debug(f'deposit {to_address} {amount}') vault = to_address token_address = transfer['address'] - def transfer_in(d): - result = dict(d) - result[token_address] = result.get(token_address, 0) + amount - return result - vault_balances.modify(vault, transfer_in, default={}) + await vault_balances.async_modify(vault, balance_adjuster(vault, token_address, +amount), default={}) if from_address in vault_owners and to_address != from_address: log.debug(f'withdraw {to_address} {amount}') vault = from_address token_address = transfer['address'] - def transfer_out(d): - result = dict(d) - result[token_address] = new_value = result.get(token_address, 0) - amount - if new_value < 0: - log.warning(f'Negative balance in vault {vault}:\n{d} - {token_address} : {amount}') - # value = await ContractProxy(from_address, 'ERC20').balanceOf(from_address) - return result - vault_balances.modify(vault, transfer_out, default={}) - # todo check for negative balances. - - if to_address not in vault_owners and from_address not in vault_owners: - vaults = vault_owners.keys() - log.debug(f'vaults: {list(vaults)}') + await vault_balances.async_modify(vault, balance_adjuster(vault, token_address, amount), default={}) + # if to_address not in vault_owners and from_address not in vault_owners: + # vaults = vault_owners.keys() + # log.debug(f'vaults: {list(vaults)}') diff --git a/src/dexorder/memcache/__init__.py b/src/dexorder/memcache/__init__.py index ad40c5e..27d7839 100644 --- a/src/dexorder/memcache/__init__.py +++ b/src/dexorder/memcache/__init__.py @@ -2,7 +2,7 @@ import logging from contextlib import asynccontextmanager from contextvars import ContextVar -import redis.asyncio as redis +import redis.asyncio as redis_async from redis.asyncio import Redis from redis.asyncio.client import Pipeline @@ -29,7 +29,7 @@ class Memcache: async def connect(redis_url=None): if redis_url is None: redis_url = config.redis_url - r = await redis.from_url(redis_url, decode_responses=True, protocol=3) + r = await redis_async.from_url(redis_url, decode_responses=True, protocol=3) current_redis.set(r) return r diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 7e8c52c..1ba036a 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -26,9 +26,12 @@ class RedisState (SeriesCollection): super().__init__(series_or_datavars) self.exists:set[str] = set() + # noinspection PyMethodMayBeStatic async def clear(self): + log.debug('clearing memcache') r = current_redis.get() - await r.delete(f'{current_chain.get().chain_id}|latest_block', *self.datas.keys()) + result = await r.delete(*[f'{current_chain.get().chain_id}|{k}' for k in ['latest_block', *self.datas.keys()]]) + print(result) async def init(self, state: BlockState): @@ -61,8 +64,7 @@ class RedisState (SeriesCollection): series = f'{chain_id}|{d.series2str(diff.series)}' key = d.key2str(diff.key) value = d.value2str(diff.value) - if type(value) is not str: - raise RuntimeError + # pub/sub socketio/redis pub_era = d.opts.get('pub') # event, room, args if pub_era is True: diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index 6a7c1f9..c85ea0a 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -8,7 +8,7 @@ from web3.exceptions import LogTopicError, MismatchedABI # noinspection PyPackageRequirements from websockets.exceptions import ConnectionClosedError -from dexorder import Blockchain, db, blockchain, current_pub, async_yield, current_w3 +from dexorder import Blockchain, db, current_pub, async_yield, current_w3 from dexorder.base.chain import current_chain from dexorder.base.fork import current_fork, Fork from dexorder.blockchain.connection import create_w3_ws @@ -209,8 +209,8 @@ class BlockStateRunner: for log_event in log_events: try: parsed = event.process_log(log_event) if event is not None else log_event - except (LogTopicError, MismatchedABI): - pass + except (LogTopicError, MismatchedABI) as x: + log.warning(f'logevent parse error {x}\n{log_event}') else: # todo try/except for known retryable errors await maywait(callback(parsed))