This commit is contained in:
Tim Olson
2023-11-07 12:58:27 -04:00
parent b8b1a39dff
commit 54f0687f64
7 changed files with 52 additions and 35 deletions

View File

@@ -42,6 +42,10 @@ class BlockData:
self.str2value = str2value
self.lazy_getitem = None
@property
def seriesstr(self):
return self.series2str(self.series)
def setitem(self, item, value, overwrite=True):
state = current_blockstate.get()
fork = current_fork.get()
@@ -159,10 +163,12 @@ class BlockDict(Generic[K,V], BlockData):
return self.getitem(item, default)
def modify(self, item: K, func: Callable[[V],V], default: V=NARG) -> V:
try:
result = func(self.getitem(item, default))
except KeyError:
result = func(NARG)
result = func(self.getitem(item, default))
self.setitem(item, result)
return result
async def async_modify(self, item: K, func: Callable[[V],V], default: V=NARG) -> V:
result = await func(self.getitem(item, default))
self.setitem(item, result)
return result

View File

@@ -19,3 +19,7 @@ def get_contract_data(name):
def get_contract_event(contract_name:str, event_name:str):
return getattr(_current_w3.get().eth.contract(abi=get_contract_data(contract_name)['abi']).events, event_name)()
class ERC20 (ContractProxy):
def __init__(self, addr):
super().__init__(addr, 'ERC20')

View File

@@ -1,7 +1,7 @@
import logging
from dexorder import db
from dexorder.contract import ContractProxy
from dexorder.contract import ERC20
log = logging.getLogger(__name__)
@@ -11,6 +11,6 @@ async def token_decimals(addr):
try:
return db.kv[key]
except KeyError:
decimals = await ContractProxy(addr, 'ERC20').decimals()
decimals = await ERC20(addr).decimals()
db.kv[key] = decimals
return decimals

View File

@@ -1,3 +1,4 @@
import functools
import logging
from uuid import UUID
@@ -9,7 +10,7 @@ from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRe
from dexorder.transaction import create_transactions, submit_transaction_request, handle_transaction_receipts, send_transactions
from dexorder.uniswap import UniswapV3Pool, uniswap_price
from dexorder.contract.dexorder import get_factory_contract, vault_address, VaultContract, get_dexorder_contract
from dexorder.contract import get_contract_event
from dexorder.contract import get_contract_event, ERC20
from dexorder.data import pool_prices, vault_owners, vault_balances, new_pool_prices
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
@@ -131,7 +132,7 @@ def handle_swap_filled(event: EventData):
triggers.fill(tranche_index, amount_in, amount_out)
# check for fill
if order.remaining <= 0:
close_order_and_disable_triggers(order.key, SwapOrderState.Filled)
close_order_and_disable_triggers(order, SwapOrderState.Filled)
async def handle_order_completed(event: EventData):
# event DexorderCompleted (uint64 orderIndex); // todo remove?
@@ -150,7 +151,24 @@ def handle_order_error(event: EventData):
# event DexorderError (uint64 orderIndex, string reason);
log.debug(f'DexorderError {event}')
def handle_transfer(transfer: EventData):
def balance_adjuster(vault, token_address, amount):
async def _adjust(vaddr, taddr, amt, old_balances):
result = dict(old_balances) # copy
try:
old = old_balances[vaddr]
new_amt = old + amt
if new_amt < 0:
log.warning(f'NEGATIVE BALANCE for vault {current_chain.get()} {vault} token {taddr} {old} {amt:+} = {new_amt}')
new_amt = 0
except KeyError:
new_amt = await ERC20(taddr).balanceOf(vaddr)
result[taddr] = new_amt
return result
return functools.partial(_adjust, vault, token_address, amount)
async def handle_transfer(transfer: EventData):
# todo handle native transfers incl gas for token transfers
log.debug(f'Transfer {transfer}')
from_address = transfer['args']['from']
@@ -160,28 +178,15 @@ def handle_transfer(transfer: EventData):
log.debug(f'deposit {to_address} {amount}')
vault = to_address
token_address = transfer['address']
def transfer_in(d):
result = dict(d)
result[token_address] = result.get(token_address, 0) + amount
return result
vault_balances.modify(vault, transfer_in, default={})
await vault_balances.async_modify(vault, balance_adjuster(vault, token_address, +amount), default={})
if from_address in vault_owners and to_address != from_address:
log.debug(f'withdraw {to_address} {amount}')
vault = from_address
token_address = transfer['address']
def transfer_out(d):
result = dict(d)
result[token_address] = new_value = result.get(token_address, 0) - amount
if new_value < 0:
log.warning(f'Negative balance in vault {vault}:\n{d} - {token_address} : {amount}')
# value = await ContractProxy(from_address, 'ERC20').balanceOf(from_address)
return result
vault_balances.modify(vault, transfer_out, default={})
# todo check for negative balances.
if to_address not in vault_owners and from_address not in vault_owners:
vaults = vault_owners.keys()
log.debug(f'vaults: {list(vaults)}')
await vault_balances.async_modify(vault, balance_adjuster(vault, token_address, amount), default={})
# if to_address not in vault_owners and from_address not in vault_owners:
# vaults = vault_owners.keys()
# log.debug(f'vaults: {list(vaults)}')

View File

@@ -2,7 +2,7 @@ import logging
from contextlib import asynccontextmanager
from contextvars import ContextVar
import redis.asyncio as redis
import redis.asyncio as redis_async
from redis.asyncio import Redis
from redis.asyncio.client import Pipeline
@@ -29,7 +29,7 @@ class Memcache:
async def connect(redis_url=None):
if redis_url is None:
redis_url = config.redis_url
r = await redis.from_url(redis_url, decode_responses=True, protocol=3)
r = await redis_async.from_url(redis_url, decode_responses=True, protocol=3)
current_redis.set(r)
return r

View File

@@ -26,9 +26,12 @@ class RedisState (SeriesCollection):
super().__init__(series_or_datavars)
self.exists:set[str] = set()
# noinspection PyMethodMayBeStatic
async def clear(self):
log.debug('clearing memcache')
r = current_redis.get()
await r.delete(f'{current_chain.get().chain_id}|latest_block', *self.datas.keys())
result = await r.delete(*[f'{current_chain.get().chain_id}|{k}' for k in ['latest_block', *self.datas.keys()]])
print(result)
async def init(self, state: BlockState):
@@ -61,8 +64,7 @@ class RedisState (SeriesCollection):
series = f'{chain_id}|{d.series2str(diff.series)}'
key = d.key2str(diff.key)
value = d.value2str(diff.value)
if type(value) is not str:
raise RuntimeError
# pub/sub socketio/redis
pub_era = d.opts.get('pub') # event, room, args
if pub_era is True:

View File

@@ -8,7 +8,7 @@ from web3.exceptions import LogTopicError, MismatchedABI
# noinspection PyPackageRequirements
from websockets.exceptions import ConnectionClosedError
from dexorder import Blockchain, db, blockchain, current_pub, async_yield, current_w3
from dexorder import Blockchain, db, current_pub, async_yield, current_w3
from dexorder.base.chain import current_chain
from dexorder.base.fork import current_fork, Fork
from dexorder.blockchain.connection import create_w3_ws
@@ -209,8 +209,8 @@ class BlockStateRunner:
for log_event in log_events:
try:
parsed = event.process_log(log_event) if event is not None else log_event
except (LogTopicError, MismatchedABI):
pass
except (LogTopicError, MismatchedABI) as x:
log.warning(f'logevent parse error {x}\n{log_event}')
else:
# todo try/except for known retryable errors
await maywait(callback(parsed))