complete rework of pool and token metadata into the address metadata blockdict

This commit is contained in:
Tim
2024-02-13 23:25:03 -04:00
parent 2313cf018e
commit c8f65a6306
19 changed files with 222 additions and 216 deletions

View File

@@ -54,5 +54,4 @@ from .util import async_yield
from .base.fixed import Fixed2, FixedDecimals, Dec18
from .configuration import config
from .base.account import Account
from .base.token import Token, tokens
from .database import db

31
src/dexorder/addrmeta.py Normal file
View File

@@ -0,0 +1,31 @@
import logging
from typing import TypedDict
from dexorder import db
from dexorder.blockstate import BlockDict
from dexorder.database.model import Pool
from dexorder.database.model.pool import PoolDict
from dexorder.database.model.token import Token, TokenDict
log = logging.getLogger(__name__)
# address_metadata is a polymorphic BlockDict which maps address keys to a dict of metadata describing the address
# used for Tokens and Pools
class AddressMetadata (TypedDict):
type: str
def save_addrmeta(address: str, meta: AddressMetadata):
if meta['type'] == 'Token':
meta: TokenDict
db.session.add(Token.load(meta))
elif meta['type'] == 'Pool':
meta: PoolDict
db.session.add(Pool.load(meta))
else:
log.warning(f'Address {address} had unknown metadata type {meta["type"]}')
address_metadata: BlockDict[str,AddressMetadata] = BlockDict('a', redis=True, db=True, savecb=save_addrmeta)

View File

@@ -1,121 +0,0 @@
from collections import defaultdict
from decimal import Decimal
from sqlalchemy.orm import Mapped
from web3 import Web3
from dexorder import config, Blockchain, NARG, FixedDecimals, ADDRESS_0, current_w3
from dexorder.base.account import current_account
from dexorder.blockchain import ByBlockchainDict
from dexorder.base.chain import current_chain
from dexorder.contract import ContractProxy, abis
import dexorder.database.column as col
class Token (ContractProxy, FixedDecimals):
chain: Mapped[col.Blockchain]
address: Mapped[col.Address]
decimals: Mapped[col.Uint8]
name: Mapped[str]
symbol: Mapped[str]
@staticmethod
def get(name_or_address:str, *, chain_id=None) -> 'Token':
try:
return tokens.get(name_or_address, default=NARG, chain_id=chain_id)
except KeyError:
try:
# noinspection PyTypeChecker
return Web3.to_checksum_address(name_or_address)
except ValueError:
raise ValueError(f'Could not resolve token {name_or_address} for chain {current_chain.get().chain_id}')
def __init__(self, chain_id, address, decimals, symbol, name, *, abi=None):
FixedDecimals.__init__(self, decimals)
if abi is None:
load = 'ERC20'
else:
load = None
abi = abis.get(abi,abi)
ContractProxy.__init__(self, address, load, abi=abi)
self.chain_id = chain_id
# noinspection PyTypeChecker
self.address = address
self.decimals = decimals
self.symbol = symbol
self.name = name
def balance(self, address: str = None) -> int:
if address is None:
address = current_account.get().address
return self.balanceOf(address)
def balance_dec(self, address: str = None) -> Decimal:
return self.dec(self.balance(address))
def __str__(self):
return self.symbol
def __repr__(self):
return f'{self.symbol}({self.address},{self.decimals})'
def __eq__(self, other):
return self.chain_id == other.chain_id and self.address == other.address
def __hash__(self):
return hash((self.chain_id,self.address))
class NativeToken (FixedDecimals):
""" Token-like but not a contract. """
@staticmethod
def get( chain_id = None) -> 'NativeToken':
if chain_id is None:
chain_id = current_chain.get().chain_id
return _native_tokens[chain_id]
def __init__(self, chain_id, decimals, symbol, name, *, wrapper_token = None):
self.chain_id = chain_id
self.address = ADDRESS_0 # todo i think there's actually an address? like 0x11 or something?
super().__init__(decimals)
self.symbol = symbol
self.name = name
self._wrapper_token = wrapper_token if wrapper_token is not None else _tokens_by_chain[chain_id]['W'+symbol]
def balance(self, address: str = None) -> int:
if address is None:
address = current_account.get()
assert current_chain.get().chain_id == self.chain_id
return current_w3.get().eth.get_balance(address)
def balance_dec(self, address: str = None) -> Decimal:
return self.dec(self.balance(address))
@property
def wrapper(self) -> Token:
return self._wrapper_token
def __repr__(self):
return self.symbol
# convert TokenConfigs to Tokens
_tokens_by_chain:dict[int,dict[str,Token]] = defaultdict(dict)
for _c in config.tokens:
_chain_id = Blockchain.get(_c.chain).chain_id
_tokens_by_chain[_chain_id][_c.symbol] = Token(_chain_id, _c.address, _c.decimals, _c.symbol, _c.name, abi=_c.abi)
_native_tokens: dict[int, NativeToken] = {
# Ethereum.chain_id: NativeToken(Ethereum.chain_id, 18, 'ETH', 'Ether'), # todo need WETH on Ethereum
# Polygon.chain_id: NativeToken(Polygon.chain_id, 18, 'MATIC', 'Polygon'),
}
for _chain_id, _native in _native_tokens.items():
# noinspection PyTypeChecker
_tokens_by_chain[_chain_id][_native.symbol] = _native
tokens = ByBlockchainDict[Token](_tokens_by_chain)

View File

@@ -41,13 +41,13 @@ async def main():
config.ohlc_dir = './ohlc'
ohlcs.dir = config.ohlc_dir
await blockchain.connect()
redis_state = None
state = None
if memcache:
await memcache.connect()
redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else.
if not memcache:
log.error('backfill requires a memcache server')
await memcache.connect()
redis_state = RedisState([recent_ohlcs]) # NOTE: ONLY the ohlc's are pushed to Redis. We do not want to touch anything else.
if db:
db.connect(url=config.datadb_url) # our main database is the data db
db.connect()
db_state = DbState(BlockData.by_opt('db'))
with db.session:
state = db_state.load()

View File

@@ -13,7 +13,6 @@ class Config:
ws_url: Optional[str] = 'ws://localhost:8545'
rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
datadb_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorderdata'
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
dump_sql: bool = False
redis_url: Optional[str] = 'redis://localhost:6379'

View File

@@ -99,4 +99,3 @@ class Db:
raise Exception(f'{url} database version not found')
db = Db()
datadb = Db('datadb_url')

View File

@@ -5,3 +5,4 @@ from .series import SeriesSet, SeriesDict
from .transaction import Transaction, TransactionJob
from .orderindex import OrderIndex
from .pool import Pool
from .token import Token

View File

@@ -1,6 +1,5 @@
import logging
from sqlalchemy import SMALLINT
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.column import Blockchain

View File

@@ -1,4 +1,5 @@
import logging
from typing import TypedDict
from sqlalchemy.orm import Mapped, mapped_column
@@ -8,12 +9,40 @@ from dexorder.database.model import Base
log = logging.getLogger(__name__)
class PoolDict (TypedDict):
type: str
chain: int
address: str
exchange: int
base: str
quote: str
fee: int
decimals: int
class Pool (Base):
__tablename__ = 'pool'
chain: Mapped[Blockchain] = mapped_column(primary_key=True)
address: Mapped[Address] = mapped_column(primary_key=True)
exchange: Mapped[Exchange]
base: Mapped[Address]
quote: Mapped[Address]
base: Mapped[Address] = mapped_column(index=True) # index for searching by token addr
quote: Mapped[Address] = mapped_column(index=True) # index for searching by token addr
fee: Mapped[int] # in millionths aka 100ths of a bip
decimals: Mapped[int]
@staticmethod
def load(pool_dict: PoolDict) -> 'Pool':
return Pool(chain=Blockchain.get(pool_dict['chain']), address=pool_dict['address'],
exchange=Exchange(pool_dict['exchange']),
base=pool_dict['base'], quote=pool_dict['quote'],
fee=pool_dict['fee'], decimals=pool_dict['decimals'])
def dump(self):
return PoolDict(chain=self.chain.chain_id, address=self.address,
exchange=self.exchange.value,
base=self.base.address, quote=self.quote,
fee=self.fee, decimals=self.decimals)

View File

@@ -0,0 +1,41 @@
import logging
from typing import TypedDict
from sqlalchemy.orm import Mapped, mapped_column
from dexorder.database.column import Address, Blockchain, Uint8
from dexorder.database.model import Base
log = logging.getLogger(__name__)
# TokenDict is the primary dict we use in-memory, with basic JSON-able types
class TokenDict (TypedDict):
type: str
chain: int
address: str
symbol: str
decimals: int
# the database object is primarily write-only so we are able to index queries for pools-by-token from the nodejs server
class Token (Base):
__tablename__ = 'token'
chain: Mapped[Blockchain] = mapped_column(primary_key=True)
address: Mapped[Address] = mapped_column(primary_key=True)
symbol: Mapped[str]
decimals: Mapped[Uint8]
@staticmethod
def load(token_dict: TokenDict) -> 'Token':
return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'],
symbol=token_dict['symbol'], decimals=token_dict['decimals'])
def dump(self):
return TokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
symbol=self.symbol, decimals=self.decimals)

View File

@@ -10,10 +10,10 @@ from dexorder.base.chain import current_chain, current_clock, get_block_timestam
from dexorder.base.order import TrancheExecutionRequest, TrancheKey, ExecutionRequest, new_tranche_execution_request, OrderKey
from dexorder.ohlc import ohlcs, recent_ohlcs
from dexorder.transaction import submit_transaction_request
from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, Pools
from dexorder.pools import uniswap_price, new_pool_prices, pool_prices, get_pool
from dexorder.contract.dexorder import vault_address, VaultContract
from dexorder.contract import ERC20
from dexorder.data import vault_owners, vault_balances
from dexorder.vault_blockdata import vault_owners, vault_balances
from dexorder.database.model.block import current_block
from dexorder.database.model.transaction import TransactionJob
from dexorder.base.orderlib import SwapOrderState, Exchange
@@ -158,7 +158,7 @@ async def handle_uniswap_swap_old(swap: EventData):
except KeyError:
return
addr = swap['address']
price: dec = await uniswap_price(await Pools.get(addr), sqrt_price)
price: dec = await uniswap_price(await get_pool(addr), sqrt_price)
pool_prices[addr] = price
@@ -168,11 +168,9 @@ async def handle_uniswap_swap(swap: EventData):
except KeyError:
return
addr = swap['address']
pool = await Pools.get(addr)
if pool is None:
return
if pool.exchange != Exchange.UniswapV3:
log.debug(f'Ignoring {pool.exchange} pool {addr}')
pool = await get_pool(addr)
if pool['exchange'] != Exchange.UniswapV3.value:
log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
return
price: dec = await uniswap_price(pool, sqrt_price)
timestamp = await get_block_timestamp(swap['blockHash'])

View File

@@ -115,5 +115,5 @@ async def publish_all(pubs: list[tuple[str,str,list[Any]]]):
r: Pipeline
io = Emitter(dict(client=r))
for room, event, args in pubs:
log.debug(f'publishing {room} {event} {args}')
# log.debug(f'publishing {room} {event} {args}')
io.To(room).Emit(event, *args)

View File

@@ -7,7 +7,7 @@ from dexorder import DELETE, db
from dexorder.base.chain import current_chain
from dexorder.base.order import OrderKey, TrancheKey
from dexorder.blockstate import BlockDict, BlockSet
from dexorder.data import vault_owners
from dexorder.vault_blockdata import vault_owners
from dexorder.database.model.orderindex import OrderIndex
from dexorder.base.orderlib import SwapOrderStatus, SwapOrderState

View File

@@ -11,7 +11,7 @@ from .orderstate import Order
from .. import dec
from ..base.chain import current_clock
from ..base.order import OrderKey, TrancheKey, ExecutionRequest
from ..pools import ensure_pool_price, Pools, pool_decimals, pool_prices
from ..pools import ensure_pool_price, pool_prices, get_pool
from ..routing import pool_address
log = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ async def activate_order(order: Order):
Call this to enable triggers on an order which is already in the state.
"""
address = pool_address(order.status.order)
pool = await Pools.get(address)
pool = await get_pool(address)
await ensure_pool_price(pool)
triggers = OrderTriggers(order)
if triggers.closed:
@@ -161,14 +161,14 @@ class TrancheTrigger:
log.debug(f'price trigger ignored because trigger status is {self.status}')
return
log.debug(f'price trigger {cur}')
addr = pool_address(self.order.order)
pool = await get_pool(addr)
if cur is None and self.has_line_constraint:
await ensure_pool_price(self.order.pool_address)
cur = pool_prices[self.order.pool_address]
await ensure_pool_price(pool)
cur = pool_prices[addr]
if cur is not None:
if self.pool_price_multiplier is None:
pool = await Pools.get(pool_address(self.order.order))
pool_dec = await pool_decimals(pool)
self.pool_price_multiplier = dec(10) ** dec(-pool_dec)
self.pool_price_multiplier = dec(10) ** dec(-pool['decimals'])
log.debug(f'adjusted cur price from {cur} => {cur*self.pool_price_multiplier}')
cur *= self.pool_price_multiplier
if cur is None or not self.has_line_constraint or all(await asyncio.gather(

View File

@@ -1,55 +1,55 @@
import asyncio
import logging
from typing import Optional
from web3.exceptions import ContractLogicError
from dexorder import dec, db, ADDRESS_0
from dexorder import dec, ADDRESS_0
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.base.orderlib import Exchange
from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V
from dexorder.contract.decimals import token_decimals
from dexorder.database.model.pool import Pool
from dexorder.database.model.pool import PoolDict
from dexorder.tokens import get_token
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
log = logging.getLogger(__name__)
class Pools:
instances: dict[tuple[int,str], Pool] = {}
@staticmethod
async def get(address, *, chain=None) -> Optional[Pool]:
if not chain:
chain = current_chain.get()
key = (chain, address)
found = Pools.instances.get(key)
if not found:
found = db.session.get(Pool, key)
if not found:
# todo other exchanges
try:
v3 = UniswapV3Pool(address)
t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust
log.debug(f'new UniswapV3 pool at {address}')
found = Pool(chain=chain, address=address, exchange=Exchange.UniswapV3, base=t0, quote=t1, fee=fee)
db.session.add(found)
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
log.debug(f'new Unknown pool at {address}')
found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0)
except ContractLogicError:
log.debug(f'new Unknown pool at {address}')
found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0)
except ValueError as v:
if v.args[0].get('code') == -32000:
log.debug(f'new Unknown pool at {address}')
found = Pool(chain=chain, address=address, exchange=Exchange.Unknown, base=ADDRESS_0, quote=ADDRESS_0, fee=0)
else:
raise
db.session.add(found)
Pools.instances[key] = found
return None if found.exchange == Exchange.Unknown else found
async def get_pool(address: str) -> PoolDict:
try:
return address_metadata[address]
except KeyError:
result = address_metadata[address] = await load_pool(address)
return result
async def load_pool(address: str) -> PoolDict:
found = None
chain_id = current_chain.get().chain_id
# todo other exchanges
try:
v3 = UniswapV3Pool(address)
t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool
log.debug(f'new UniswapV3 pool at {address}')
token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
decimals = token0['decimals'] - token1['decimals']
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
base=t0, quote=t1, fee=fee, decimals=decimals)
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
log.debug(f'new Unknown pool at {address}')
except ContractLogicError:
log.debug(f'new Unknown pool at {address}')
except ValueError as v:
if v.args[0].get('code') == -32000:
log.debug(f'new Unknown pool at {address}')
else:
raise
if found is None:
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.Unknown.value,
base=ADDRESS_0, quote=ADDRESS_0, fee=0, decimals=0)
return found
class PoolPrices (BlockDict[str, dec]):
@@ -67,38 +67,22 @@ new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the c
pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec)
async def uniswap_price(pool, sqrt_price=None) -> dec:
async def uniswap_price(pool: PoolDict, sqrt_price=None) -> dec:
if sqrt_price is None:
sqrt_price = (await UniswapV3Pool(pool.address).slot0())[0]
pool_dec = await pool_decimals(pool)
sqrt_price = (await UniswapV3Pool(pool['address']).slot0())[0]
pool_dec = pool['decimals']
price = dec(sqrt_price*sqrt_price) / 2 ** (96 * 2)
result = price * dec(10) ** dec(pool_dec)
log.debug(f'pool sqrtX96 {sqrt_price} with {pool_dec} decimals = {result}')
return result
async def ensure_pool_price(pool):
# todo refactor to accept a Pool and switch on exchange type
if pool not in pool_prices:
log.debug(f'querying price for pool {pool.address}')
if pool.exchange == Exchange.UniswapV3:
pool_prices[pool.address] = await uniswap_price(pool)
async def ensure_pool_price(pool: PoolDict):
addr = pool['address']
if addr not in pool_prices:
log.debug(f'querying price for pool {addr}')
if pool['exchange'] == Exchange.UniswapV3.value:
pool_prices[addr] = await uniswap_price(pool)
else:
raise ValueError
_pool_decimals = {}
async def pool_decimals(pool):
if pool.exchange != Exchange.UniswapV3:
raise ValueError
found = _pool_decimals.get(pool)
if found is None:
key = f'pd|{pool.address}'
try:
found = db.kv[key]
except KeyError:
decimals0 = await token_decimals(pool.base)
decimals1 = await token_decimals(pool.quote)
found = _pool_decimals[pool] = db.kv[key] = decimals0 - decimals1
return found
raise ValueError(f'Unsupported exchange type {pool["exchange"]}')
# todo other exchanges

View File

@@ -19,7 +19,6 @@ from dexorder.database.model import Block
from dexorder.database.model.block import current_block, latest_block
from dexorder.util import hexstr, topic
from dexorder.util.async_util import maywait, Maywaitable
from dexorder.util.shutdown import fatal
log = logging.getLogger(__name__)
@@ -343,7 +342,9 @@ class BlockStateRunner:
except ValueError as e:
if e.args[0].get('code') == -32602:
# too many logs were returned in the batch, so decrease the batch size.
fatal(f'Decrease batch size for {chain}')
# fatal(f'Decrease batch size for {chain}')
log.warning(f'Decrease batch size for {chain}')
return
raise
for log_event in log_events:
try:

33
src/dexorder/tokens.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
from eth_abi.exceptions import InsufficientDataBytes
from web3.exceptions import ContractLogicError, BadFunctionCallOutput
from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain
from dexorder.contract import ERC20
from dexorder.database.model.token import TokenDict
log = logging.getLogger(__name__)
async def get_token(address):
try:
return address_metadata[address]
except KeyError:
result = address_metadata[address] = await load_token(address)
return result
async def load_token(address: str) -> TokenDict:
contract = ERC20(address)
symbolProm = contract.symbol()
try:
decimals = await contract.decimals()
except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
log.warning(f'token {address} has no decimals()')
decimals = 0
symbol = await symbolProm
log.debug(f'new token {symbol} {address}')
return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address,
symbol=symbol, decimals=decimals)