From c8f65a630619b6b282e8a14ce56cde01e1c50f0d Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 13 Feb 2024 23:25:03 -0400 Subject: [PATCH] complete rework of pool and token metadata into the address metadata blockdict --- .../versions/db62e7db828d_initial_schema.py | 13 ++ src/dexorder/__init__.py | 1 - src/dexorder/addrmeta.py | 31 +++++ src/dexorder/base/token.py | 121 ------------------ src/dexorder/bin/backfill_ohlc.py | 10 +- src/dexorder/configuration/schema.py | 1 - src/dexorder/database/__init__.py | 1 - src/dexorder/database/model/__init__.py | 1 + src/dexorder/database/model/orderindex.py | 1 - src/dexorder/database/model/pool.py | 33 ++++- src/dexorder/database/model/token.py | 41 ++++++ src/dexorder/event_handler.py | 14 +- src/dexorder/memcache/memcache_state.py | 2 +- src/dexorder/order/orderstate.py | 2 +- src/dexorder/order/triggers.py | 14 +- src/dexorder/pools.py | 114 +++++++---------- src/dexorder/runner.py | 5 +- src/dexorder/tokens.py | 33 +++++ src/dexorder/{data.py => vault_blockdata.py} | 0 19 files changed, 222 insertions(+), 216 deletions(-) create mode 100644 src/dexorder/addrmeta.py delete mode 100644 src/dexorder/base/token.py create mode 100644 src/dexorder/database/model/token.py create mode 100644 src/dexorder/tokens.py rename src/dexorder/{data.py => vault_blockdata.py} (100%) diff --git a/alembic/versions/db62e7db828d_initial_schema.py b/alembic/versions/db62e7db828d_initial_schema.py index 6fc2a17..ef5121a 100644 --- a/alembic/versions/db62e7db828d_initial_schema.py +++ b/alembic/versions/db62e7db828d_initial_schema.py @@ -74,6 +74,13 @@ def upgrade() -> None: sa.Column('state', sa.Enum('Open', 'Canceled', 'Filled', 'Expired', 'Underfunded', name='swaporderstate'), nullable=False), sa.PrimaryKeyConstraint('chain', 'vault', 'order_index') ) + op.create_table('token', + sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), + sa.Column('address', dexorder.database.column_types.Address(), nullable=False), + sa.Column('symbol', sa.String(), nullable=False), + sa.Column('decimals', sa.SMALLINT(), nullable=False), + sa.PrimaryKeyConstraint('chain', 'address') + ) op.create_table('pool', sa.Column('chain', dexorder.database.column_types.Blockchain(), nullable=False), sa.Column('address', dexorder.database.column_types.Address(), nullable=False), @@ -81,11 +88,17 @@ def upgrade() -> None: sa.Column('base', dexorder.database.column_types.Address(), nullable=False), sa.Column('quote', dexorder.database.column_types.Address(), nullable=False), sa.Column('fee', sa.Integer(), nullable=False), + sa.Column('decimals', sa.Integer(), nullable=False), sa.PrimaryKeyConstraint('chain', 'address') ) + op.create_index(op.f('ix_pool_base'), 'pool', ['base'], unique=False) + op.create_index(op.f('ix_pool_quote'), 'pool', ['quote'], unique=False) def downgrade() -> None: + op.drop_table('token') + op.drop_index(op.f('ix_pool_quote'), table_name='pool') + op.drop_index(op.f('ix_pool_base'), table_name='pool') op.drop_table('pool') op.drop_table('orderindex') op.drop_table('seriesset') diff --git a/src/dexorder/__init__.py b/src/dexorder/__init__.py index 98945c2..4c3edbd 100644 --- a/src/dexorder/__init__.py +++ b/src/dexorder/__init__.py @@ -54,5 +54,4 @@ from .util import async_yield from .base.fixed import Fixed2, FixedDecimals, Dec18 from .configuration import config from .base.account import Account -from .base.token import Token, tokens from .database import db diff --git a/src/dexorder/addrmeta.py b/src/dexorder/addrmeta.py new file mode 100644 index 0000000..cd15203 --- /dev/null +++ b/src/dexorder/addrmeta.py @@ -0,0 +1,31 @@ +import logging +from typing import TypedDict + +from dexorder import db +from dexorder.blockstate import BlockDict +from dexorder.database.model import Pool +from dexorder.database.model.pool import PoolDict +from dexorder.database.model.token import Token, TokenDict + +log = logging.getLogger(__name__) + +# address_metadata is a polymorphic BlockDict which maps address keys to a dict of metadata describing the address +# used for Tokens and Pools + + +class AddressMetadata (TypedDict): + type: str + + +def save_addrmeta(address: str, meta: AddressMetadata): + if meta['type'] == 'Token': + meta: TokenDict + db.session.add(Token.load(meta)) + elif meta['type'] == 'Pool': + meta: PoolDict + db.session.add(Pool.load(meta)) + else: + log.warning(f'Address {address} had unknown metadata type {meta["type"]}') + + +address_metadata: BlockDict[str,AddressMetadata] = BlockDict('a', redis=True, db=True, savecb=save_addrmeta) diff --git a/src/dexorder/base/token.py b/src/dexorder/base/token.py deleted file mode 100644 index 4b71340..0000000 --- a/src/dexorder/base/token.py +++ /dev/null @@ -1,121 +0,0 @@ -from collections import defaultdict -from decimal import Decimal - -from sqlalchemy.orm import Mapped -from web3 import Web3 - -from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0, current_w3 -from dexorder.base.account import current_account -from dexorder.blockchain import ByBlockchainDict -from dexorder.base.chain import current_chain -from dexorder.contract import ContractProxy, abis -import dexorder.database.column as col - - -class Token (ContractProxy, FixedDecimals): - chain: Mapped[col.Blockchain] - address: Mapped[col.Address] - decimals: Mapped[col.Uint8] - name: Mapped[str] - symbol: Mapped[str] - - - @staticmethod - def get(name_or_address:str, *, chain_id=None) -> 'Token': - try: - return tokens.get(name_or_address, default=NARG, chain_id=chain_id) - except KeyError: - try: - # noinspection PyTypeChecker - return Web3.to_checksum_address(name_or_address) - except ValueError: - raise ValueError(f'Could not resolve token {name_or_address} for chain {current_chain.get().chain_id}') - - def __init__(self, chain_id, address, decimals, symbol, name, *, abi=None): - FixedDecimals.__init__(self, decimals) - if abi is None: - load = 'ERC20' - else: - load = None - abi = abis.get(abi,abi) - ContractProxy.__init__(self, address, load, abi=abi) - self.chain_id = chain_id - # noinspection PyTypeChecker - self.address = address - self.decimals = decimals - self.symbol = symbol - self.name = name - - def balance(self, address: str = None) -> int: - if address is None: - address = current_account.get().address - return self.balanceOf(address) - - def balance_dec(self, address: str = None) -> Decimal: - return self.dec(self.balance(address)) - - - def __str__(self): - return self.symbol - - def __repr__(self): - return f'{self.symbol}({self.address},{self.decimals})' - - def __eq__(self, other): - return self.chain_id == other.chain_id and self.address == other.address - - def __hash__(self): - return hash((self.chain_id,self.address)) - - -class NativeToken (FixedDecimals): - """ Token-like but not a contract. """ - - @staticmethod - def get( chain_id = None) -> 'NativeToken': - if chain_id is None: - chain_id = current_chain.get().chain_id - return _native_tokens[chain_id] - - def __init__(self, chain_id, decimals, symbol, name, *, wrapper_token = None): - self.chain_id = chain_id - self.address = ADDRESS_0 # todo i think there's actually an address? like 0x11 or something? - super().__init__(decimals) - self.symbol = symbol - self.name = name - self._wrapper_token = wrapper_token if wrapper_token is not None else _tokens_by_chain[chain_id]['W'+symbol] - - def balance(self, address: str = None) -> int: - if address is None: - address = current_account.get() - assert current_chain.get().chain_id == self.chain_id - return current_w3.get().eth.get_balance(address) - - def balance_dec(self, address: str = None) -> Decimal: - return self.dec(self.balance(address)) - - @property - def wrapper(self) -> Token: - return self._wrapper_token - - def __repr__(self): - return self.symbol - - - -# convert TokenConfigs to Tokens -_tokens_by_chain:dict[int,dict[str,Token]] = defaultdict(dict) -for _c in config.tokens: - _chain_id = Blockchain.get(_c.chain).chain_id - _tokens_by_chain[_chain_id][_c.symbol] = Token(_chain_id, _c.address, _c.decimals, _c.symbol, _c.name, abi=_c.abi) - -_native_tokens: dict[int, NativeToken] = { - # Ethereum.chain_id: NativeToken(Ethereum.chain_id, 18, 'ETH', 'Ether'), # todo need WETH on Ethereum - # Polygon.chain_id: NativeToken(Polygon.chain_id, 18, 'MATIC', 'Polygon'), -} - -for _chain_id, _native in _native_tokens.items(): - # noinspection PyTypeChecker - _tokens_by_chain[_chain_id][_native.symbol] = _native - -tokens = ByBlockchainDict[Token](_tokens_by_chain) diff --git a/src/dexorder/bin/backfill_ohlc.py b/src/dexorder/bin/backfill_ohlc.py index 8d517db..b6dda70 100644 --- a/src/dexorder/bin/backfill_ohlc.py +++ b/src/dexorder/bin/backfill_ohlc.py @@ -41,13 +41,13 @@ async def main(): config.ohlc_dir = './ohlc' ohlcs.dir = config.ohlc_dir await blockchain.connect() - redis_state = None state = None - if memcache: - await memcache.connect() - redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else. + if not memcache: + log.error('backfill requires a memcache server') + await memcache.connect() + redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else. if db: - db.connect(url=config.datadb_url) # our main database is the data db + db.connect() db_state = DbState(BlockData.by_opt('db')) with db.session: state = db_state.load() diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 3f43849..9853c40 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -13,7 +13,6 @@ class Config: ws_url: Optional[str] = 'ws://localhost:8545' rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' - datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata' ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk dump_sql: bool = False redis_url: Optional[str] = 'redis://localhost:6379' diff --git a/src/dexorder/database/__init__.py b/src/dexorder/database/__init__.py index 8908956..570296f 100644 --- a/src/dexorder/database/__init__.py +++ b/src/dexorder/database/__init__.py @@ -99,4 +99,3 @@ class Db: raise Exception(f'{url} database version not found') db = Db() -datadb = Db('datadb_url') diff --git a/src/dexorder/database/model/__init__.py b/src/dexorder/database/model/__init__.py index cc39fcb..eefb9d0 100644 --- a/src/dexorder/database/model/__init__.py +++ b/src/dexorder/database/model/__init__.py @@ -5,3 +5,4 @@ from .series import SeriesSet, SeriesDict from .transaction import Transaction, TransactionJob from .orderindex import OrderIndex from .pool import Pool +from .token import Token diff --git a/src/dexorder/database/model/orderindex.py b/src/dexorder/database/model/orderindex.py index 35fa33d..aa15fa4 100644 --- a/src/dexorder/database/model/orderindex.py +++ b/src/dexorder/database/model/orderindex.py @@ -1,6 +1,5 @@ import logging -from sqlalchemy import SMALLINT from sqlalchemy.orm import Mapped, mapped_column from dexorder.database.column import Blockchain diff --git a/src/dexorder/database/model/pool.py b/src/dexorder/database/model/pool.py index aa141b6..3b9156e 100644 --- a/src/dexorder/database/model/pool.py +++ b/src/dexorder/database/model/pool.py @@ -1,4 +1,5 @@ import logging +from typing import TypedDict from sqlalchemy.orm import Mapped, mapped_column @@ -8,12 +9,40 @@ from dexorder.database.model import Base log = logging.getLogger(__name__) + +class PoolDict (TypedDict): + type: str + chain: int + address: str + exchange: int + base: str + quote: str + fee: int + decimals: int + + class Pool (Base): __tablename__ = 'pool' chain: Mapped[Blockchain] = mapped_column(primary_key=True) address: Mapped[Address] = mapped_column(primary_key=True) exchange: Mapped[Exchange] - base: Mapped[Address] - quote: Mapped[Address] + base: Mapped[Address] = mapped_column(index=True) # index for searching by token addr + quote: Mapped[Address] = mapped_column(index=True) # index for searching by token addr fee: Mapped[int] # in millionths aka 100ths of a bip + decimals: Mapped[int] + + + @staticmethod + def load(pool_dict: PoolDict) -> 'Pool': + return Pool(chain=Blockchain.get(pool_dict['chain']), address=pool_dict['address'], + exchange=Exchange(pool_dict['exchange']), + base=pool_dict['base'], quote=pool_dict['quote'], + fee=pool_dict['fee'], decimals=pool_dict['decimals']) + + + def dump(self): + return PoolDict(chain=self.chain.chain_id, address=self.address, + exchange=self.exchange.value, + base=self.base.address, quote=self.quote, + fee=self.fee, decimals=self.decimals) diff --git a/src/dexorder/database/model/token.py b/src/dexorder/database/model/token.py new file mode 100644 index 0000000..88d564a --- /dev/null +++ b/src/dexorder/database/model/token.py @@ -0,0 +1,41 @@ +import logging +from typing import TypedDict + +from sqlalchemy.orm import Mapped, mapped_column + +from dexorder.database.column import Address, Blockchain, Uint8 +from dexorder.database.model import Base + +log = logging.getLogger(__name__) + +# TokenDict is the primary dict we use in-memory, with basic JSON-able types + +class TokenDict (TypedDict): + type: str + chain: int + address: str + symbol: str + decimals: int + + +# the database object is primarily write-only so we are able to index queries for pools-by-token from the nodejs server + +class Token (Base): + __tablename__ = 'token' + + chain: Mapped[Blockchain] = mapped_column(primary_key=True) + address: Mapped[Address] = mapped_column(primary_key=True) + symbol: Mapped[str] + decimals: Mapped[Uint8] + + + @staticmethod + def load(token_dict: TokenDict) -> 'Token': + return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'], + symbol=token_dict['symbol'], decimals=token_dict['decimals']) + + + def dump(self): + return TokenDict(type='Token', chain=self.chain.chain_id, address=self.address, + symbol=self.symbol, decimals=self.decimals) + diff --git a/src/dexorder/event_handler.py b/src/dexorder/event_handler.py index c0bcf6c..bcc4a09 100644 --- a/src/dexorder/event_handler.py +++ b/src/dexorder/event_handler.py @@ -10,10 +10,10 @@ from dexorder.base.chain import current_chain, current_clock, get_block_timestam from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey from dexorder.ohlc import ohlcs, recent_ohlcs from dexorder.transaction import submit_transaction_request -from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, Pools +from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool from dexorder.contract.dexorder import vault_address, VaultContract from dexorder.contract import ERC20 -from dexorder.data import vault_owners, vault_balances +from dexorder.vault_blockdata import vault_owners, vault_balances from dexorder.database.model.block import current_block from dexorder.database.model.transaction import TransactionJob from dexorder.base.orderlib import SwapOrderState, Exchange @@ -158,7 +158,7 @@ async def handle_uniswap_swap_old(swap: EventData): except KeyError: return addr = swap['address'] - price: dec = await uniswap_price(await Pools.get(addr), sqrt_price) + price: dec = await uniswap_price(await get_pool(addr), sqrt_price) pool_prices[addr] = price @@ -168,11 +168,9 @@ async def handle_uniswap_swap(swap: EventData): except KeyError: return addr = swap['address'] - pool = await Pools.get(addr) - if pool is None: - return - if pool.exchange != Exchange.UniswapV3: - log.debug(f'Ignoring {pool.exchange} pool {addr}') + pool = await get_pool(addr) + if pool['exchange'] != Exchange.UniswapV3.value: + log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') return price: dec = await uniswap_price(pool, sqrt_price) timestamp = await get_block_timestamp(swap['blockHash']) diff --git a/src/dexorder/memcache/memcache_state.py b/src/dexorder/memcache/memcache_state.py index 2fd5d1c..7b42bdd 100644 --- a/src/dexorder/memcache/memcache_state.py +++ b/src/dexorder/memcache/memcache_state.py @@ -115,5 +115,5 @@ async def publish_all(pubs: list[tuple[str,str,list[Any]]]): r: Pipeline io = Emitter(dict(client=r)) for room, event, args in pubs: - log.debug(f'publishing {room} {event} {args}') + # log.debug(f'publishing {room} {event} {args}') io.To(room).Emit(event, *args) diff --git a/src/dexorder/order/orderstate.py b/src/dexorder/order/orderstate.py index 122dc22..c71724d 100644 --- a/src/dexorder/order/orderstate.py +++ b/src/dexorder/order/orderstate.py @@ -7,7 +7,7 @@ from dexorder import DELETE, db from dexorder.base.chain import current_chain from dexorder.base.order import OrderKey, TrancheKey from dexorder.blockstate import BlockDict, BlockSet -from dexorder.data import vault_owners +from dexorder.vault_blockdata import vault_owners from dexorder.database.model.orderindex import OrderIndex from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState diff --git a/src/dexorder/order/triggers.py b/src/dexorder/order/triggers.py index 782f65e..e560ad2 100644 --- a/src/dexorder/order/triggers.py +++ b/src/dexorder/order/triggers.py @@ -11,7 +11,7 @@ from .orderstate import Order from .. import dec from ..base.chain import current_clock from ..base.order import OrderKey, TrancheKey, ExecutionRequest -from ..pools import ensure_pool_price, Pools, pool_decimals, pool_prices +from ..pools import ensure_pool_price, pool_prices, get_pool from ..routing import pool_address log = logging.getLogger(__name__) @@ -36,7 +36,7 @@ async def activate_order(order: Order): Call this to enable triggers on an order which is already in the state. """ address = pool_address(order.status.order) - pool = await Pools.get(address) + pool = await get_pool(address) await ensure_pool_price(pool) triggers = OrderTriggers(order) if triggers.closed: @@ -161,14 +161,14 @@ class TrancheTrigger: log.debug(f'price trigger ignored because trigger status is {self.status}') return log.debug(f'price trigger {cur}') + addr = pool_address(self.order.order) + pool = await get_pool(addr) if cur is None and self.has_line_constraint: - await ensure_pool_price(self.order.pool_address) - cur = pool_prices[self.order.pool_address] + await ensure_pool_price(pool) + cur = pool_prices[addr] if cur is not None: if self.pool_price_multiplier is None: - pool = await Pools.get(pool_address(self.order.order)) - pool_dec = await pool_decimals(pool) - self.pool_price_multiplier = dec(10) ** dec(-pool_dec) + self.pool_price_multiplier = dec(10) ** dec(-pool['decimals']) log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}') cur *= self.pool_price_multiplier if cur is None or not self.has_line_constraint or all(await asyncio.gather( diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 3847b03..e5b8f7a 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -1,55 +1,55 @@ import asyncio import logging -from typing import Optional from web3.exceptions import ContractLogicError -from dexorder import dec, db, ADDRESS_0 +from dexorder import dec, ADDRESS_0 +from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain from dexorder.base.orderlib import Exchange from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V -from dexorder.contract.decimals import token_decimals -from dexorder.database.model.pool import Pool +from dexorder.database.model.pool import PoolDict +from dexorder.tokens import get_token from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address log = logging.getLogger(__name__) -class Pools: - instances: dict[tuple[int,str], Pool] = {} - @staticmethod - async def get(address, *, chain=None) -> Optional[Pool]: - if not chain: - chain = current_chain.get() - key = (chain, address) - found = Pools.instances.get(key) - if not found: - found = db.session.get(Pool, key) - if not found: - # todo other exchanges - try: - v3 = UniswapV3Pool(address) - t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) - if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust - log.debug(f'new UniswapV3 pool at {address}') - found = Pool(chain=chain, address=address, exchange=Exchange.UniswapV3, base=t0, quote=t1, fee=fee) - db.session.add(found) - else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass - log.debug(f'new Unknown pool at {address}') - found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0) - except ContractLogicError: - log.debug(f'new Unknown pool at {address}') - found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0) - except ValueError as v: - if v.args[0].get('code') == -32000: - log.debug(f'new Unknown pool at {address}') - found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0) - else: - raise - db.session.add(found) - Pools.instances[key] = found - return None if found.exchange == Exchange.Unknown else found +async def get_pool(address: str) -> PoolDict: + try: + return address_metadata[address] + except KeyError: + result = address_metadata[address] = await load_pool(address) + return result + + +async def load_pool(address: str) -> PoolDict: + found = None + chain_id = current_chain.get().chain_id + # todo other exchanges + try: + v3 = UniswapV3Pool(address) + t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) + if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool + log.debug(f'new UniswapV3 pool at {address}') + token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) + decimals = token0['decimals'] - token1['decimals'] + found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, + base=t0, quote=t1, fee=fee, decimals=decimals) + else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass + log.debug(f'new Unknown pool at {address}') + except ContractLogicError: + log.debug(f'new Unknown pool at {address}') + except ValueError as v: + if v.args[0].get('code') == -32000: + log.debug(f'new Unknown pool at {address}') + else: + raise + if found is None: + found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.Unknown.value, + base=ADDRESS_0, quote=ADDRESS_0, fee=0, decimals=0) + return found class PoolPrices (BlockDict[str, dec]): @@ -67,38 +67,22 @@ new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the c pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec) - -async def uniswap_price(pool, sqrt_price=None) -> dec: +async def uniswap_price(pool: PoolDict, sqrt_price=None) -> dec: if sqrt_price is None: - sqrt_price = (await UniswapV3Pool(pool.address).slot0())[0] - pool_dec = await pool_decimals(pool) + sqrt_price = (await UniswapV3Pool(pool['address']).slot0())[0] + pool_dec = pool['decimals'] price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2) result = price * dec(10) ** dec(pool_dec) log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}') return result -async def ensure_pool_price(pool): - # todo refactor to accept a Pool and switch on exchange type - if pool not in pool_prices: - log.debug(f'querying price for pool {pool.address}') - if pool.exchange == Exchange.UniswapV3: - pool_prices[pool.address] = await uniswap_price(pool) +async def ensure_pool_price(pool: PoolDict): + addr = pool['address'] + if addr not in pool_prices: + log.debug(f'querying price for pool {addr}') + if pool['exchange'] == Exchange.UniswapV3.value: + pool_prices[addr] = await uniswap_price(pool) else: - raise ValueError - - -_pool_decimals = {} -async def pool_decimals(pool): - if pool.exchange != Exchange.UniswapV3: - raise ValueError - found = _pool_decimals.get(pool) - if found is None: - key = f'pd|{pool.address}' - try: - found = db.kv[key] - except KeyError: - decimals0 = await token_decimals(pool.base) - decimals1 = await token_decimals(pool.quote) - found = _pool_decimals[pool] = db.kv[key] = decimals0 - decimals1 - return found + raise ValueError(f'Unsupported exchange type {pool["exchange"]}') + # todo other exchanges diff --git a/src/dexorder/runner.py b/src/dexorder/runner.py index f5946a6..cfa2e20 100644 --- a/src/dexorder/runner.py +++ b/src/dexorder/runner.py @@ -19,7 +19,6 @@ from dexorder.database.model import Block from dexorder.database.model.block import current_block, latest_block from dexorder.util import hexstr, topic from dexorder.util.async_util import maywait, Maywaitable -from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) @@ -343,7 +342,9 @@ class BlockStateRunner: except ValueError as e: if e.args[0].get('code') == -32602: # too many logs were returned in the batch, so decrease the batch size. - fatal(f'Decrease batch size for {chain}') + # fatal(f'Decrease batch size for {chain}') + log.warning(f'Decrease batch size for {chain}') + return raise for log_event in log_events: try: diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py new file mode 100644 index 0000000..e29681c --- /dev/null +++ b/src/dexorder/tokens.py @@ -0,0 +1,33 @@ +import logging + +from eth_abi.exceptions import InsufficientDataBytes +from web3.exceptions import ContractLogicError, BadFunctionCallOutput + +from dexorder.addrmeta import address_metadata +from dexorder.base.chain import current_chain +from dexorder.contract import ERC20 +from dexorder.database.model.token import TokenDict + +log = logging.getLogger(__name__) + + +async def get_token(address): + try: + return address_metadata[address] + except KeyError: + result = address_metadata[address] = await load_token(address) + return result + + +async def load_token(address: str) -> TokenDict: + contract = ERC20(address) + symbolProm = contract.symbol() + try: + decimals = await contract.decimals() + except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): + log.warning(f'token {address} has no decimals()') + decimals = 0 + symbol = await symbolProm + log.debug(f'new token {symbol} {address}') + return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address, + symbol=symbol, decimals=decimals) diff --git a/src/dexorder/data.py b/src/dexorder/vault_blockdata.py similarity index 100% rename from src/dexorder/data.py rename to src/dexorder/vault_blockdata.py