mirror polling delay is per pool not for all pools

This commit is contained in:
Tim
2024-04-03 19:03:02 -04:00
parent 1eacc7de4e
commit e3ed893d81
12 changed files with 122 additions and 157 deletions

View File

@@ -6,3 +6,20 @@ redis_url='redis://redis:6379'
[deployments] [deployments]
1337='alpha' 1337='alpha'
mirror_pools=[
'0x2f5e87C9312fa29aed5c179E456625D79015299c', # WBTC/WETH 0.05%
'0x0d94947374cbc779a0FB4D1bfF795C0Af6Dfae25', # USDC/UNI 1.00%
'0x689C96ceAb93f5E131631D225D75DeA3fD37747E', # WBTC/ARB 0.30%
'0x0E4831319A50228B9e450861297aB92dee15B44F', # WBTC/USDC 0.05%
'0x2038eEAa7100E08739352a37Ed67852E8529E8ED', # ARB/UNI 1.00%
'0x468b88941e7Cc0B88c1869d68ab6b570bCEF62Ff', # WETH/LINK 0.30%
'0xC24f7d8E51A64dc1238880BD00bb961D54cbeb29', # WETH/UNI 0.30%
'0xbBe36e6f0331C6a36AB44Bc8421E28E1a1871C1e', # USDC/LINK 0.30%
'0xC6962004f452bE9203591991D15f6b388e09E8D0', # WETH/USDC 0.05%
'0xa79fD76cA2b24631Ec3151f10c0660a30Bc946E7', # WBTC/LINK 0.30%
'0xb0f6cA40411360c03d41C5fFc5F179b8403CdcF8', # ARB/USDC 0.05%
'0xC6F780497A95e246EB9449f5e4770916DCd6396A', # WETH/ARB 0.05%
'0x8b6149aF984140BD3F8e158CcDCD05984a4ad0f5', # ARB/LINK 0.30%
'0xEd701Ba0cec723d85B7d96c80C21148E49D2Bf05', # LINK/UNI 1.00%
]

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -4,8 +4,8 @@ from typing import TypedDict
from dexorder import db from dexorder import db
from dexorder.blockstate import BlockDict from dexorder.blockstate import BlockDict
from dexorder.database.model import Pool from dexorder.database.model import Pool
from dexorder.database.model.pool import PoolDict from dexorder.database.model.pool import OldPoolDict
from dexorder.database.model.token import Token, TokenDict from dexorder.database.model.token import Token, OldTokenDict
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -19,10 +19,10 @@ class AddressMetadata (TypedDict):
def save_addrmeta(address: str, meta: AddressMetadata): def save_addrmeta(address: str, meta: AddressMetadata):
if meta['type'] == 'Token': if meta['type'] == 'Token':
meta: TokenDict meta: OldTokenDict
db.session.add(Token.load(meta)) db.session.add(Token.load(meta))
elif meta['type'] == 'Pool': elif meta['type'] == 'Pool':
meta: PoolDict meta: OldPoolDict
db.session.add(Pool.load(meta)) db.session.add(Pool.load(meta))
else: else:
log.warning(f'Address {address} had unknown metadata type {meta["type"]}') log.warning(f'Address {address} had unknown metadata type {meta["type"]}')

View File

@@ -7,8 +7,6 @@ import logging
import sys import sys
from dexorder import config, blockchain from dexorder import config, blockchain
from dexorder.blockstate import current_blockstate
from dexorder.blockstate.state import FinalizedBlockState
from dexorder.configuration import parse_args from dexorder.configuration import parse_args
from dexorder.metadata import generate_metadata, get_metadata from dexorder.metadata import generate_metadata, get_metadata
from dexorder.pools import get_pool from dexorder.pools import get_pool
@@ -20,7 +18,6 @@ async def main():
logging.basicConfig(level=logging.INFO, stream=sys.stderr) logging.basicConfig(level=logging.INFO, stream=sys.stderr)
logging.getLogger('dexorder').setLevel(logging.DEBUG) logging.getLogger('dexorder').setLevel(logging.DEBUG)
parse_args() parse_args()
current_blockstate.set(FinalizedBlockState())
await blockchain.connect() await blockchain.connect()
if not config.metadata: if not config.metadata:
log.error("Must configure a metadata file") log.error("Must configure a metadata file")

View File

@@ -13,6 +13,7 @@ polling = [seconds between price updates. If zero or negative, prices are update
import asyncio import asyncio
import itertools
import logging import logging
import os import os
from datetime import timedelta from datetime import timedelta
@@ -110,12 +111,6 @@ async def write_metadata( pools, mirror_pools ):
last_prices = {} last_prices = {}
async def complete_update(_mirrorenv, pool, price, tx):
await tx.wait()
last_prices[pool] = price
log.debug(f'Mirrored {pool} {price}')
async def main(): async def main():
init_generating_metadata() init_generating_metadata()
if config.metadata is None: if config.metadata is None:
@@ -199,22 +194,29 @@ async def main():
log.info(f'Updating pools every {delay} seconds') log.info(f'Updating pools every {delay} seconds')
delay = timedelta(seconds=delay) delay = timedelta(seconds=delay)
to_update = pools to_update = pools
pool_iter = iter(to_update)
pool = next(pool_iter)
while True: while True:
wake_up = now() + delay wake_up = now() + delay
prices = await asyncio.gather(*[get_pool_price(pool) for pool in to_update]) # log.debug(f'querying {pool}')
updates = [(pool, price, await mirrorenv.transact.updatePool(pool, price, gas=LOTSA_GAS)) price = await get_pool_price(pool)
for pool, price in zip(to_update, prices) if price != last_prices.get(pool)] if price != last_prices.get(pool):
results = await asyncio.gather(*[complete_update(mirrorenv, pool, price, tx) try:
for (pool,price,tx) in updates], return_exceptions=True) tx = await mirrorenv.transact.updatePool(pool, price, gas=LOTSA_GAS)
failed = [] await tx.wait()
for result, pool, price in zip(results,to_update,prices): last_prices[pool] = price
if isinstance(result, Exception): log.debug(f'Mirrored {pool} {price}')
log.debug(f'Could not update {pool}: {result}') except Exception:
failed.append(pool) log.debug(f'Could not update {pool}')
if update_once and not failed: continue
log.info('mirror completed') try:
break pool = next(pool_iter)
to_update = failed if failed else pools except StopIteration:
if update_once:
log.info('mirror completed')
break
pool_iter = iter(to_update)
pool = next(pool_iter)
sleep = (wake_up - now()).total_seconds() sleep = (wake_up - now()).total_seconds()
if sleep > 0: if sleep > 0:
await asyncio.sleep(sleep) await asyncio.sleep(sleep)

View File

@@ -372,4 +372,4 @@ class FinalizedBlockState:
del self.data[series] del self.data[series]
current_blockstate = ContextVar[BlockState]('current_blockstate') current_blockstate = ContextVar[BlockState]('current_blockstate', default=FinalizedBlockState())

View File

@@ -1,5 +1,5 @@
import logging import logging
from typing import TypedDict from typing import TypedDict, Optional
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@@ -11,6 +11,16 @@ log = logging.getLogger(__name__)
class PoolDict (TypedDict): class PoolDict (TypedDict):
a: str
e: int
b: str
q: str
f: int
d: int
x: Optional[dict]
class OldPoolDict (TypedDict):
type: str type: str
chain: int chain: int
address: str address: str
@@ -34,7 +44,7 @@ class Pool (Base):
@staticmethod @staticmethod
def load(pool_dict: PoolDict) -> 'Pool': def load(pool_dict: OldPoolDict) -> 'Pool':
return Pool(chain=Blockchain.get(pool_dict['chain']), address=pool_dict['address'], return Pool(chain=Blockchain.get(pool_dict['chain']), address=pool_dict['address'],
exchange=Exchange(pool_dict['exchange']), exchange=Exchange(pool_dict['exchange']),
base=pool_dict['base'], quote=pool_dict['quote'], base=pool_dict['base'], quote=pool_dict['quote'],
@@ -42,8 +52,8 @@ class Pool (Base):
def dump(self): def dump(self):
return PoolDict(type='Pool', return OldPoolDict(type='Pool',
chain=self.chain.chain_id, address=self.address, chain=self.chain.chain_id, address=self.address,
exchange=self.exchange.value, exchange=self.exchange.value,
base=self.base.address, quote=self.quote, base=self.base.address, quote=self.quote,
fee=self.fee, decimals=self.decimals) fee=self.fee, decimals=self.decimals)

View File

@@ -1,5 +1,5 @@
import logging import logging
from typing import TypedDict from typing import TypedDict, Optional
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@@ -8,9 +8,19 @@ from dexorder.database.model import Base
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# TokenDict is the primary dict we use in-memory, with basic JSON-able types
class TokenDict (TypedDict): class TokenDict (TypedDict):
a: str
n: str
s: str
d: int
w: Optional[bool] # approved ("w"hitelisted)
x: Optional[dict] # extra data
# OldTokenDict is the primary dict we use in-memory, with basic JSON-able types
class OldTokenDict (TypedDict):
type: str type: str
chain: int chain: int
address: str address: str
@@ -18,6 +28,7 @@ class TokenDict (TypedDict):
symbol: str symbol: str
decimals: int decimals: int
approved: bool # whether this token is in the whitelist or not approved: bool # whether this token is in the whitelist or not
x: Optional[dict] # extra data
# the database object is primarily write-only so we are able to index queries for pools-by-token from the nodejs server # the database object is primarily write-only so we are able to index queries for pools-by-token from the nodejs server
@@ -34,13 +45,13 @@ class Token (Base):
@staticmethod @staticmethod
def load(token_dict: TokenDict) -> 'Token': def load(token_dict: OldTokenDict) -> 'Token':
return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'], return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'],
name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'], name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'],
approved=token_dict['approved']) approved=token_dict['approved'])
def dump(self): def dump(self):
return TokenDict(type='Token', chain=self.chain.chain_id, address=self.address, return OldTokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
name=self.name, symbol=self.symbol, decimals=self.decimals, approved=self.approved) name=self.name, symbol=self.symbol, decimals=self.decimals, approved=self.approved)

View File

@@ -28,8 +28,8 @@ from typing import Union, Iterable
from dexorder import config, NARG from dexorder import config, NARG
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.database.model import Token, Pool from dexorder.database.model import Token, Pool
from dexorder.database.model.pool import PoolDict from dexorder.database.model.pool import OldPoolDict, PoolDict
from dexorder.database.model.token import TokenDict from dexorder.database.model.token import OldTokenDict, TokenDict
from dexorder.util import json from dexorder.util import json
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -61,14 +61,22 @@ def dump_tokens(out, tokens):
s = token.symbol, s = token.symbol,
d = token.decimals, d = token.decimals,
x = None x = None
else: elif 'a' in token:
token: TokenDict token: TokenDict
a = token['a']
n = token['n']
s = token['s']
d = token['d']
x = token.get('x')
elif 'address' in token:
token: OldTokenDict
a = token['address'] a = token['address']
n = token['name'] n = token['name']
s = token['symbol'] s = token['symbol']
d = token['decimals'] d = token['decimals']
# noinspection PyTypedDict x = token.get('x')
x = None if 'x' not in token else token['x'] else:
raise ValueError('Unknown token type', token)
token_map[a] = token token_map[a] = token
data = dict(a=a, n=n, s=s, d=d) data = dict(a=a, n=n, s=s, d=d)
if x is not None: if x is not None:
@@ -91,8 +99,18 @@ def dump_pools(out, pools):
e = pool.exchange.value e = pool.exchange.value
d = pool.decimals d = pool.decimals
x = None x = None
else: elif 'a' in pool:
pool: PoolDict pool: PoolDict
a = pool['a']
b = pool['b']
q = pool['q']
f = pool['f']
e = pool['e']
d = pool['d']
# noinspection PyTypedDict
x = pool.get('x')
elif 'address' in pool:
pool: OldPoolDict
a = pool['address'] a = pool['address']
b = pool['base'] b = pool['base']
q = pool['quote'] q = pool['quote']
@@ -100,7 +118,9 @@ def dump_pools(out, pools):
e = pool['exchange'] e = pool['exchange']
d = pool['decimals'] d = pool['decimals']
# noinspection PyTypedDict # noinspection PyTypedDict
x = None if 'x' not in pool else pool['x'] x = pool.get('x')
else:
raise ValueError(f'Unknown pool value', pool)
data = dict(a=a, b=b, q=q, f=f, e=e, d=d) data = dict(a=a, b=b, q=q, f=f, e=e, d=d)
if x is not None: if x is not None:
data['x'] = x data['x'] = x
@@ -121,7 +141,7 @@ def is_generating_metadata():
# noinspection PyShadowingNames # noinspection PyShadowingNames
def generate_metadata(tokens: Iterable[Union[Token, TokenDict]], pools: Iterable[Union[Pool, PoolDict]], def generate_metadata(tokens: Iterable[Union[Token, OldTokenDict]], pools: Iterable[Union[Pool, OldPoolDict]],
file=sys.stdout): file=sys.stdout):
dump(file, '{"' + str(current_chain.get().id) + '":{"t":[') dump(file, '{"' + str(current_chain.get().id) + '":{"t":[')
dump_tokens(file, tokens) dump_tokens(file, tokens)

View File

@@ -13,7 +13,7 @@ from dexorder.base.orderlib import Exchange
from dexorder.blockstate import BlockDict from dexorder.blockstate import BlockDict
from dexorder.blockstate.blockdata import K, V from dexorder.blockstate.blockdata import K, V
from dexorder.blocks import get_block_timestamp from dexorder.blocks import get_block_timestamp
from dexorder.database.model.pool import PoolDict from dexorder.database.model.pool import OldPoolDict
from dexorder.metadata import is_generating_metadata from dexorder.metadata import is_generating_metadata
from dexorder.tokens import get_token from dexorder.tokens import get_token
from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
@@ -21,7 +21,7 @@ from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def get_pool(address: str) -> PoolDict: async def get_pool(address: str) -> OldPoolDict:
try: try:
return address_metadata[address] return address_metadata[address]
except KeyError: except KeyError:
@@ -29,7 +29,7 @@ async def get_pool(address: str) -> PoolDict:
return result return result
async def load_pool(address: str) -> PoolDict: async def load_pool(address: str) -> OldPoolDict:
found = None found = None
chain_id = current_chain.get().id chain_id = current_chain.get().id
# todo other exchanges # todo other exchanges
@@ -42,8 +42,8 @@ async def load_pool(address: str) -> PoolDict:
fee = await v3.fee() fee = await v3.fee()
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool
decimals = token0['decimals'] - token1['decimals'] decimals = token0['decimals'] - token1['decimals']
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, found = OldPoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
base=t0, quote=t1, fee=fee, decimals=decimals) base=t0, quote=t1, fee=fee, decimals=decimals)
log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%} ' log.debug(f'new UniswapV3 pool {token0["symbol"]}/{token1["symbol"]} {fee/1_000_000:.2%} '
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
except ContractLogicError: except ContractLogicError:
@@ -60,8 +60,8 @@ async def load_pool(address: str) -> PoolDict:
raise v raise v
if found is None: if found is None:
log.debug(f'new Unknown pool at {address}') log.debug(f'new Unknown pool at {address}')
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.Unknown.value, found = OldPoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.Unknown.value,
base=ADDRESS_0, quote=ADDRESS_0, fee=0, decimals=0) base=ADDRESS_0, quote=ADDRESS_0, fee=0, decimals=0)
return found return found
@@ -80,7 +80,7 @@ new_pool_prices: dict[str, dec] = {} # tracks which prices were set during the c
pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec) pool_prices: PoolPrices = PoolPrices('p', db=True, redis=True, pub=pub_pool_price, value2str=lambda d: f'{d:f}', str2value=dec)
async def uniswap_price(pool: PoolDict, sqrt_price=None) -> dec: async def uniswap_price(pool: OldPoolDict, sqrt_price=None) -> dec:
if sqrt_price is None: if sqrt_price is None:
sqrt_price = (await UniswapV3Pool(pool['address']).slot0())[0] sqrt_price = (await UniswapV3Pool(pool['address']).slot0())[0]
pool_dec = pool['decimals'] pool_dec = pool['decimals']
@@ -90,7 +90,7 @@ async def uniswap_price(pool: PoolDict, sqrt_price=None) -> dec:
return result return result
async def ensure_pool_price(pool: PoolDict): async def ensure_pool_price(pool: OldPoolDict):
addr = pool['address'] addr = pool['address']
if addr not in pool_prices: if addr not in pool_prices:
log.debug(f'querying price for pool {addr}') log.debug(f'querying price for pool {addr}')
@@ -101,7 +101,7 @@ async def ensure_pool_price(pool: PoolDict):
# todo other exchanges # todo other exchanges
async def get_uniswap_data(swap: EventData) -> Optional[tuple[PoolDict, datetime, dec]]: async def get_uniswap_data(swap: EventData) -> Optional[tuple[OldPoolDict, datetime, dec]]:
try: try:
sqrt_price = swap['args']['sqrtPriceX96'] sqrt_price = swap['args']['sqrtPriceX96']
except KeyError: except KeyError:

View File

@@ -8,14 +8,14 @@ from dexorder import ADDRESS_0, config
from dexorder.addrmeta import address_metadata from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.contract import ERC20, ContractProxy from dexorder.contract import ERC20, ContractProxy
from dexorder.database.model.token import TokenDict from dexorder.database.model.token import OldTokenDict
from dexorder.metadata import get_metadata from dexorder.metadata import get_metadata
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# todo needs chain_id # todo needs chain_id
async def get_token(address) -> Optional[TokenDict]: async def get_token(address) -> Optional[OldTokenDict]:
if address == ADDRESS_0: if address == ADDRESS_0:
raise ValueError('No token at address 0') raise ValueError('No token at address 0')
try: try:
@@ -25,7 +25,7 @@ async def get_token(address) -> Optional[TokenDict]:
return result return result
async def load_token(address: str) -> Optional[TokenDict]: async def load_token(address: str) -> Optional[OldTokenDict]:
contract = ERC20(address) contract = ERC20(address)
async def get_string_or_bytes32(func_name): async def get_string_or_bytes32(func_name):
@@ -56,8 +56,8 @@ async def load_token(address: str) -> Optional[TokenDict]:
chain_id = current_chain.get().id chain_id = current_chain.get().id
symbol = await symbol_prom symbol = await symbol_prom
name = await name_prom name = await name_prom
td = TokenDict(type='Token', chain=chain_id, address=address, td = OldTokenDict(type='Token', chain=chain_id, address=address,
name=name, symbol=symbol, decimals=decimals, approved=approved) name=name, symbol=symbol, decimals=decimals, approved=approved)
md = get_metadata(address, chain_id=chain_id) md = get_metadata(address, chain_id=chain_id)
if md is not None: if md is not None:
td['approved'] = True td['approved'] = True