backend metadata; approved tokens; logging.toml;

commit 91dee5c030 (parent 94839ce37a)
Author: Tim
Date: 2024-03-18 18:06:16 -04:00
19 changed files with 375 additions and 75 deletions

.gitignore (1 change)

@@ -2,6 +2,7 @@ venv
 *secret*
 !/.secret-mock.toml
 dexorder.toml
+logging.toml
 /contract
 __pycache__
 .idea


@@ -80,6 +80,7 @@ def upgrade() -> None:
     sa.Column('name', sa.String(), nullable=False),
     sa.Column('symbol', sa.String(), nullable=False),
     sa.Column('decimals', sa.SMALLINT(), nullable=False),
+    sa.Column('approved', sa.Boolean(), nullable=False),
     sa.PrimaryKeyConstraint('chain', 'address')
     )
     op.create_table('pool',

logging-default.toml (new file, 27 lines)

@@ -0,0 +1,27 @@
# https://docs.python.org/3/library/logging.config.html#logging-config-dictschema
version=1
[loggers.'']
level='INFO'
handlers=['console',]
[loggers.dexorder]
level='DEBUG'
[handlers.console]
class='logging.StreamHandler'
formatter='default'
level='INFO'
stream='ext://sys.stdout'
[formatters.default]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(asctime)s %(name)s %(message)s'
# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S'
[formatters.notime]
# https://docs.python.org/3/library/logging.html#logrecord-attributes
format='%(name)s %(message)s'
# https://docs.python.org/3/library/time.html#time.strftime
datefmt='%Y-%m-%d %H:%M:%S'
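
execute() (below) loads this file with tomllib and applies it through logging.config.dictConfig; the standalone equivalent of that flow:

import logging.config
import tomllib

with open('logging.toml', 'rb') as file:   # tomllib requires a binary-mode file
    logging.config.dictConfig(tomllib.load(file))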

metadata-finaldata.json (new file, 92 lines)

@@ -0,0 +1,92 @@
{
"42161": {
"t": [
{
"a": "0x2653aB60a6fD19d3e1f4E729412C219257CA6e24",
"n": "USD Coin (Arbitrum Native)",
"s": "USDC",
"d": 6
},
{
"a": "0x87b60276434d5cB5CE46d2316B596c7Bc5f87cCA",
"n": "Wrapped Ether",
"s": "WETH",
"d": 18
},
{
"a": "0xEaD55ce1cC577C8020F5FcE39D4e5C643F591a0c",
"n": "USD Coin (Ethereum Bridged)",
"s": "USDC.e",
"d": 6
},
{
"a": "0x2f2a2543B76A4166549F7aaB2e75Bef0aefC5B0f",
"s": "WBTC"
},
{
"a": "0x912CE59144191C1204E64559FE8253a0e49E6548",
"s": "ARB"
},
{
"a": "0x9623063377AD1B27544C965cCd7342f7EA7e88C7",
"s": "GRT"
},
{
"a": "0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9",
"s": "USDT"
},
{
"a": "0xDA10009cBd5D07dd0CeCc66161FC93D7c9000da1",
"s": "DAI"
},
{
"a": "0xf97f4df75117a78c1A5a0DBb814Af92458539FB4",
"s": "LINK"
},
{
"a": "0xFa7F8980b0f1E64A2062791cc3b0871572f1F7f0",
"s": "UNI"
},
{
"a": "0xba5DdD1f9d7F570dc94a51479a000E3BCE967196",
"s": "AAVE"
},
{
"a": "0x371c7ec6D8039ff7933a2AA28EB827Ffe1F52f07",
"s": "JOE"
},
{
"a": "0xfc5A1A6EB076a2C7aD06eD22C90d7E710E35ad0a",
"s": "GMX"
},
{
"a": "0x11cDb42B0EB46D95f990BeDD4695A6e3fA034978",
"s": "CRV"
},
{
"a": "0x13Ad51ed4F1B7e9Dc168d8a00cB3f4dDD85EfA60",
"s": "LDO"
},
{
"a": "0xba0Dda8762C24dA9487f5FA026a9B64b695A07Ea",
"s": "OX"
},
{
"a": "0x561877b6b3DD7651313794e5F2894B2F18bE0766",
"s": "MATIC"
},
{
"a": "0x5979D7b546E38E414F7E9822514be443A4800529",
"s": "wstETH"
},
{
"a": "0x539bdE0d7Dbd336b79148AA742883198BBF60342",
"s": "MAGIC"
},
{
"a": "0x0c880f6761F1af8d9Aa9C466984b80DAb9a8c9e8",
"s": "PENDLE"
}
]
}
}
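
Top-level keys are chain ids; "t" lists tokens with abbreviated fields a=address, n=name, s=symbol, d=decimals (n and d are optional, as most entries show). A minimal reader sketch:

import json

with open('metadata-finaldata.json') as file:
    metadata = json.load(file)
for chain_id, chain_info in metadata.items():
    for t in chain_info['t']:
        print(int(chain_id), t['a'], t['s'], t.get('d'))   # 'n' and 'd' may be absent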


@@ -1,10 +1,14 @@
 import logging
+import logging.config
+import tomllib
 from asyncio import CancelledError
 from traceback import print_exception
 import asyncio
 from signal import Signals
 from typing import Coroutine
+import sys
 
 from dexorder import configuration
 
 if __name__ == '__main__':
@@ -30,7 +34,22 @@ async def _shutdown_coro(_sig, loop, extra_shutdown):
         print_exception(x)
 
-def execute(main:Coroutine, shutdown=None, parse_args=True):
+def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=True):
+    configured = False
+    if parse_logging:
+        try:
+            with open('logging.toml', 'rb') as file:
+                dictconf = tomllib.load(file)
+        except FileNotFoundError:
+            pass
+        else:
+            logging.config.dictConfig(dictconf)
+            log.info('Logging configured from logging.toml')
+            configured = True
+    if not configured:
+        logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+        log.setLevel(logging.DEBUG)
+        log.info('Logging configured to default')
     if parse_args:
         configuration.parse_args()
     loop = asyncio.get_event_loop()
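
A minimal entrypoint using the new keyword (a sketch; run() is a hypothetical coroutine, the module path matches the import used by the mirror script below):

from dexorder.bin.executable import execute

async def run():
    ...   # application logic

if __name__ == '__main__':
    execute(run())   # logging.toml wins if present, else the basicConfig default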


@@ -13,7 +13,7 @@ from dexorder.blocktime import get_block_timestamp
 from dexorder.configuration import parse_args
 from dexorder.contract import get_contract_event
 from dexorder.database.model.block import current_block, latest_block
-from dexorder.ohlc import LightOHLCRepository
+from dexorder.ohlc import FinalOHLCRepository
 from dexorder.pools import get_uniswap_data
 from dexorder.util import hexstr
 from dexorder.util.shutdown import fatal
@@ -22,7 +22,7 @@ from dexorder.walker import BlockWalker
 
 log = logging.getLogger('dexorder')
 
-ohlcs = LightOHLCRepository()
+ohlcs = FinalOHLCRepository()
 
 async def handle_backfill_uniswap_swaps(swaps: list[EventData]):


@@ -74,9 +74,6 @@ def setup_logevent_triggers(runner):
 # noinspection DuplicatedCode
 async def main():
-    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
-    log.setLevel(logging.DEBUG)
-    parse_args()
     await blockchain.connect()
     redis_state = None
     state = None


@@ -1,7 +1,6 @@
 import asyncio
 import logging
 import os
-import sys
 
 from dexorder import config, blockchain, current_w3
 from dexorder.bin.executable import execute
@@ -9,7 +8,7 @@ from dexorder.blockchain.connection import create_w3
 from dexorder.blockstate import current_blockstate
 from dexorder.blockstate.state import FinalizedBlockState
 from dexorder.contract import get_deployment_address, ContractProxy, ERC20
-from dexorder.metadata import generate_metadata
+from dexorder.metadata import generate_metadata, init_generating_metadata
 from dexorder.pools import get_pool
 from dexorder.tokens import get_token
 from dexorder.uniswap import UniswapV3Pool
@@ -71,7 +70,7 @@ async def get_pool_info( pool ):
     return [pool, t0, t1, fee, price, amount0, amount1]
 
 async def write_metadata( pools, mirror_pools ):
-    filename = config.mirror_metadata
+    filename = config.metadata
     if filename is None:
         return
     pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools]
@@ -92,6 +91,10 @@ async def await_mirror(tx, pool_addr, mirror_addr, mirror_inverted ):
 async def main():
+    init_generating_metadata()
+    if config.metadata is None:
+        log.error('Must configure metadata (filename to write)')
+        return
     if config.mirror_source_rpc_url is None:
         log.error('Must configure mirror_source_rpc_url')
         return
@@ -139,6 +142,4 @@ async def main():
 if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
-    log.setLevel(logging.DEBUG)
     execute(main())


@@ -0,0 +1,29 @@
# Prints a JSON string to stdout containing metadata information for all the known tokens and pools
#
# see metadata.py

import logging
import sys

from sqlalchemy import select

from dexorder import db
from dexorder.configuration import parse_args
from dexorder.database.model import Pool, Token
from dexorder.metadata import generate_metadata

log = logging.getLogger(__name__)

def main():
    logging.basicConfig(level=logging.INFO, stream=sys.stderr)
    log.setLevel(logging.DEBUG)
    parse_args()
    db.connect(migrate=False)
    tokens = db.session.scalars(select(Token))
    pools = db.session.scalars(select(Pool))
    generate_metadata(tokens, pools)

if __name__ == '__main__':
    main()


@@ -34,7 +34,6 @@ async def create_w3(rpc_url=None, account=NARG, autosign=False):
     # chain_id = self.w3s[0].eth.chain_id
     # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s)  # all rpc urls must be the same blockchain
     # self.w3iter = itertools.cycle(self.w3s)
     url = resolve_rpc_url(rpc_url)
     w3 = AsyncWeb3(RetryHTTPProvider(url))
     # w3.middleware_onion.inject(geth_poa_middleware, layer=0)  # todo is this line needed?
@@ -114,13 +113,12 @@ def _make_contract(w3_eth):
 log = logging.getLogger(__name__)
 
-MAX_CONCURRENCY = config.concurrent_rpc_connections
 
 class RetryHTTPProvider (AsyncHTTPProvider):
     def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
         super().__init__(endpoint_uri, request_kwargs)
-        self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY)
+        self.in_flight = asyncio.Semaphore(config.concurrent_rpc_connections)
         self.rate_allowed = asyncio.Event()
         self.rate_allowed.set()


@@ -33,5 +33,5 @@ class Config:
     mirror_source_rpc_url: Optional[str] = None  # source RPC for original pools
     mirror_env: Optional[str] = None
-    mirror_pools: Optional[list[str]] = field(default_factory=list)
-    mirror_metadata: str = field(default='metadata.json')
+    mirror_pools: list[str] = field(default_factory=list)
+    metadata: Optional[str] = field(default='../web/public/metadata.json')


@@ -1,3 +1,5 @@
 abis = {
+    # ERC20 where symbol() returns a bytes32 instead of a string
+    'ERC20.sb': '''[{"type":"function","name":"symbol","inputs":[],"outputs":[{"name":"","type":"bytes32","internalType":"bytes32"}],"stateMutability":"view"}]'''
# 'WMATIC': '''[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]''',
}
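
tokens.py (further down) retries symbol() with this ABI when the string decode fails; a hedged sketch of turning the returned bytes32 into text (the trim step is not part of this commit):

from dexorder.contract import ContractProxy

async def bytes32_symbol(address: str) -> str:
    raw = await ContractProxy(address, 'ERC20.sb').symbol()   # 32 zero-padded bytes
    return raw.rstrip(b'\x00').decode('utf-8', errors='replace')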


@@ -1,7 +1,9 @@
 import json
+import logging
 from typing import Optional
 
 import eth_account
 from web3.exceptions import BadFunctionCallOutput, Web3Exception
 from web3.types import TxReceipt
 
 from dexorder import current_w3
@@ -9,6 +11,7 @@ from dexorder.base.account import current_account
 from dexorder.database.model.block import current_block
 from dexorder.util import hexstr
 
+log = logging.getLogger(__name__)
 
 class ContractTransaction:
     def __init__(self, id_bytes: bytes, rawtx: Optional[bytes] = None):
@@ -42,29 +45,37 @@ class DeployTransaction (ContractTransaction):
 
-def call_wrapper(func):
+def call_wrapper(addr, name, func):
     async def f(*args, **kwargs):
         try:
             blockhash = hexstr(current_block.get().hash)
         except LookupError:
             blockhash = 'latest'
-        return await func(*args, **kwargs).call(block_identifier=blockhash)
+        try:
+            return await func(*args, **kwargs).call(block_identifier=blockhash)
+        except Web3Exception as e:
+            log.error(f"Exception calling {addr}.{name}()")
+            raise e
     return f
 
-def transact_wrapper(func):
+def transact_wrapper(addr, name, func):
     async def f(*args, **kwargs):
-        tx_id = await func(*args, **kwargs).transact()
+        try:
+            tx_id = await func(*args, **kwargs).transact()
+        except Web3Exception as e:
+            log.error(f'Exception transacting {addr}.{name}()')
+            raise e
         return ContractTransaction(tx_id)
     return f
 
-def build_wrapper(func):
+def build_wrapper(addr, name, func):
     async def f(*args, **kwargs):
         try:
             account = current_account.get()
         except LookupError:
-            raise RuntimeError('Cannot invoke a transaction without setting an Account.')
+            raise RuntimeError(f'Cannot invoke transaction {addr}.{name}() without setting an Account.')
         tx = await func(*args, **kwargs).build_transaction()
         tx['from'] = account.address
         tx['nonce'] = await account.next_nonce()
@@ -128,7 +139,7 @@ class ContractProxy:
             found = self.contract.functions[item]
         else:
             raise AttributeError(item)
-        return self._wrapper(found)
+        return self._wrapper(self.address, item, found)
 
     def __repr__(self):
         addr = self.contract.address
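
Since the wrappers now receive addr and name, a failed call is logged with its target before re-raising; a usage sketch:

from dexorder.contract import ERC20

async def read_symbol(addr: str) -> str:
    # on Web3Exception this logs "Exception calling {addr}.symbol()" and re-raises
    return await ERC20(addr).symbol()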


@@ -1,5 +1,5 @@
 import logging
-from typing import TypedDict
+from typing import TypedDict, Optional
 
 from sqlalchemy.orm import Mapped, mapped_column
@@ -19,6 +19,8 @@ class PoolDict (TypedDict):
     quote: str
     fee: int
     decimals: int
+    approved: bool  # whether this pool has only whitelisted tokens
+    liquidity: Optional[int]
 
 class Pool (Base):
@@ -42,7 +44,8 @@ class Pool (Base):
     def dump(self):
-        return PoolDict(chain=self.chain.chain_id, address=self.address,
+        return PoolDict(type='Pool',
+                        chain=self.chain.chain_id, address=self.address,
                         exchange=self.exchange.value,
                         base=self.base.address, quote=self.quote,
                         fee=self.fee, decimals=self.decimals)


@@ -17,6 +17,7 @@ class TokenDict (TypedDict):
     name: str
     symbol: str
     decimals: int
+    approved: bool  # whether this token is in the whitelist or not
 
 # the database object is primarily write-only so we are able to index queries for pools-by-token from the nodejs server
@@ -29,15 +30,17 @@ class Token (Base):
     name: Mapped[str]
     symbol: Mapped[str]
     decimals: Mapped[Uint8]
+    approved: Mapped[bool]
 
     @staticmethod
     def load(token_dict: TokenDict) -> 'Token':
         return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'],
-                     name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'])
+                     name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'],
+                     approved=token_dict['approved'])
 
     def dump(self):
         return TokenDict(type='Token', chain=self.chain.chain_id, address=self.address,
-                         name=self.name, symbol=self.symbol, decimals=self.decimals)
+                         name=self.name, symbol=self.symbol, decimals=self.decimals, approved=self.approved)
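
load() now requires the flag and dump() round-trips it; a sketch (values taken from the USDC entry above, assumes a configured chain registry):

from dexorder.database.model import Token
from dexorder.database.model.token import TokenDict

td = TokenDict(type='Token', chain=42161, address='0x2653aB60a6fD19d3e1f4E729412C219257CA6e24',
               name='USD Coin (Arbitrum Native)', symbol='USDC', decimals=6, approved=True)
token = Token.load(td)
assert token.dump()['approved'] is True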


@@ -21,18 +21,23 @@
 # ]
 # }
 #
+import logging
 import sys
 from typing import Union, Iterable
 
+from dexorder import config, NARG
+from dexorder.base.chain import current_chain
 from dexorder.database.model import Token, Pool
 from dexorder.database.model.pool import PoolDict
 from dexorder.database.model.token import TokenDict
 from dexorder.util import json
 
+log = logging.getLogger(__name__)
 
 token_map: dict[str, Token] = {}
 
 # noinspection PyShadowingNames
 def dump(file, *args):
     print(*args, end='', file=file)
@@ -96,10 +101,47 @@ def dump_pools(out, pools):
     json_dump(out,**data)
 
+generating_metadata = False
+
+def init_generating_metadata():
+    """
+    Calling this will prevent the metadata whitelist from squelching unknown pools
+    """
+    global generating_metadata
+    generating_metadata = True
+
+def is_generating_metadata():
+    return generating_metadata
+
 # noinspection PyShadowingNames
 def generate_metadata(tokens: Iterable[Union[Token, TokenDict]], pools: Iterable[Union[Pool, PoolDict]],
                       file=sys.stdout):
-    dump(file, '{"t":[')
+    dump(file, '{"'+str(current_chain.get().chain_id)+'":{"t":[')
     dump_tokens(file, tokens)
     dump(file, '],"p":[')
     dump_pools(file, pools)
-    dump(file, ']}')
+    dump(file, ']}}')
 
+metadata = NARG
+metadata_by_chainaddr = {}
+
+def get_metadata(addr=None, *, chain_id=None):
+    if chain_id is None:
+        chain_id = current_chain.get().chain_id
+    global metadata
+    if metadata is NARG:
+        if config.metadata is None or generating_metadata:
+            metadata = None
+        else:
+            with open(config.metadata) as file:
+                metadata = json.load(file)
+            for chain_id, chain_info in metadata.items():
+                chain_id = int(chain_id)
+                for t in chain_info['t']:
+                    metadata_by_chainaddr[chain_id,t['a']] = t
+                for p in chain_info['p']:
+                    metadata_by_chainaddr[chain_id,p['a']] = p
+            log.info(f'Loaded metadata from {config.metadata}')
+    return metadata if addr is None else metadata_by_chainaddr.get((chain_id,addr))
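
A hedged lookup against metadata-finaldata.json above (requires config.metadata to point at it):

from dexorder.metadata import get_metadata

md = get_metadata('0x912CE59144191C1204E64559FE8253a0e49E6548', chain_id=42161)
if md is not None:       # present means the token is whitelisted
    print(md['s'])       # 'ARB'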


@@ -1,8 +1,9 @@
 import logging
 import os
+from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from decimal import InvalidOperation
-from typing import Optional, NamedTuple, Reversible, Union
+from typing import Optional, NamedTuple, Reversible, Union, TypedDict
 
 from cachetools import LFUCache
@@ -49,6 +50,9 @@ def dt(v):
     return v if isinstance(v, datetime) else from_timestamp(v)
 
 class NativeOHLC:
+    """
+    in-memory version of OHLC data using native python types of datetime and decimal
+    """
     @staticmethod
     def from_ohlc(ohlc: OHLC) -> 'NativeOHLC':
         return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, try_dec))], ohlc=ohlc)
@@ -175,6 +179,11 @@ def quotes_path(chain_id: int = None):
         chain_id = current_chain.get().chain_id
     return f'{chain_id}/quotes.json'
 
+def series_path(chain_id: int = None):
+    if chain_id is None:
+        chain_id = current_chain.get().chain_id
+    return f'{chain_id}/series.json'
+
 def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str:
     if chain_id is None:
@@ -189,7 +198,10 @@ def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int
 class Chunk:
-    """ Chunks map to files of OHLC's on disk """
+    """
+    Chunks map to files of OHLC's on disk. If an OHLC contains 6 fields instead of just 5, the 6th field is a
+    timestamp pointing to the next bar
+    """
     def __init__(self, repo_dir: str, symbol: str, period: timedelta, time: datetime,
                  *, bars: Optional[list[NativeOHLC]] = None, chain_id: int = None):
         self.repo_dir = repo_dir
@@ -251,14 +263,7 @@
     def save(self):
-        for _ in range(2):
-            try:
-                with open(self.fullpath, 'w') as file:
-                    json.dump([n.ohlc for n in self.bars], file)
-                return
-            except FileNotFoundError:
-                os.makedirs(os.path.dirname(self.fullpath), exist_ok=True)
-        raise IOError(f'Could not write chunk {self.fullpath}')
+        save_json([n.ohlc for n in self.bars], self.fullpath)
 
     def __eq__(self, other):
@@ -270,12 +275,23 @@ class Chunk:
 class OHLCRepository:
-    def __init__(self, base_dir: str = None):
+    """
+    this is only used for the backend now and should not write to disk. the finaldata process runs FinalOHLCRepository
+    instead to create json files.
+    OHLCRepository still functions for in-memory recent ohlc's
+    """
+    def __init__(self, base_dir: str = None, *, chain_id: int = None):
         """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """
         self._dir = base_dir
+        self._chain_id = chain_id
         self.cache = LFUCache(len(OHLC_PERIODS) * 1024)
         self.dirty_chunks = set()
-        self._quotes:Optional[dict[str,tuple[int,str]]] = None  # todo tim
+        self._quotes = None
+
+    @property
+    def chain_id(self):
+        return self._chain_id if self._chain_id is not None else current_chain.get().chain_id
 
     @property
     def dir(self):
@@ -291,7 +307,7 @@ class OHLCRepository:
             self._quotes = {}
         else:
             try:
-                with open(os.path.join(self.dir, quotes_path()), 'r') as f:
+                with open(os.path.join(self.dir, quotes_path(self.chain_id)), 'r') as f:
                     self._quotes = json.load(f)
             except FileNotFoundError:
                 self._quotes = {}
@@ -318,7 +334,7 @@
         """
         if price is None, then bars are advanced based on the time but no new price is added to the series.
         """
-        logname = f'{symbol} {period_name(period)}'
+        # logname = f'{symbol} {period_name(period)}'
         # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}')
         if price is not None:
             self.quotes[symbol] = timestamp(time), str(price)
@@ -338,6 +354,7 @@
         # drop any historical bars that are older than we need
         # oldest_needed = cover the root block time plus one period prior
         oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period
+        # noinspection PyTypeChecker
         trim = (oldest_needed - historical[0].start) // period
         if trim > 0:
             historical = historical[trim:]
@@ -375,10 +392,9 @@
             chunk.update(native)
             self.dirty_chunks.add(chunk)
 
-    def get_chunk(self, symbol: str, period: timedelta, time: datetime) -> Chunk:
-        start_time = ohlc_start_time(time, period)
+    def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> Chunk:
         chain_id = current_chain.get().chain_id
-        key = chunk_path(symbol, period, time, chain_id=chain_id)
+        key = chunk_path(symbol, period, start_time, chain_id=chain_id)
         found = self.cache.get(key)
         if found is None:
             found = self.cache[key] = Chunk(self.dir, symbol, period, start_time, chain_id=chain_id)
@@ -397,14 +413,20 @@
                 os.makedirs(os.path.dirname(filepath), exist_ok=True)
 
-class LightOHLCRepository (OHLCRepository):
+class SeriesDict (TypedDict):
+    start: int  # timestamp of the start of the series
+
+class FinalOHLCRepository (OHLCRepository):
     """
     Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles.
     """
-    def __init__(self, base_dir: str = None):
-        super().__init__(base_dir)
+    def __init__(self, base_dir: str = None, *, chain_id: int = None):
+        super().__init__(base_dir, chain_id=chain_id)
         self.current:dict[tuple[str,timedelta],NativeOHLC] = {}  # current bars keyed by symbol
         self.dirty_bars = set()
+        self.series:dict[int,dict[str,SeriesDict]] = defaultdict(dict)  # keyed by [chain_id][symbol]
+        self._series_dirty = False
 
     def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]):
         for period in OHLC_PERIODS:
@@ -423,8 +445,11 @@
             # cache miss. load from chunk.
             prev = self.current[key] = chunk.bar_at(start)
             # log.debug(f'loaded prev bar from chunk {prev}')
+        if prev is None and symbol in self.quotes:
+            latest_bar_time = ohlc_start_time(self.quotes[symbol][0], period)
+            prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time)
         if prev is None:
-            # not in cache or chunk. create new bar.
+            # never seen before. create new bar.
             # log.debug(f'no prev bar')
             if price is not None:
                 close = price
@@ -437,6 +462,8 @@
             bar = self.current[key] = NativeOHLC(start, price, price, price, close)
             chunk.update(bar, backfill=backfill)
             self.dirty_chunks.add(chunk)
+            self.series[current_chain.get().chain_id][f'{key[0]}|{key[1]}'] = {'start': timestamp(start)}
+            self._series_dirty = True
         else:
             updated = update_ohlc(prev, period, time, price)
             for bar in updated:
@@ -445,6 +472,25 @@
                 self.dirty_chunks.add(chunk)
             self.current[key] = updated[-1]
 
+    def flush(self) -> None:
+        # flush chunks
+        super().flush()
+        # flush series.json if needed
+        if self._series_dirty:
+            save_json(self.series, os.path.join(self.dir, series_path(self.chain_id)))
+            self._series_dirty = False
+
+def save_json(obj, filename):
+    for _ in range(2):
+        try:
+            with open(filename, 'w') as file:
+                json.dump(obj, file)
+            return
+        except FileNotFoundError:
+            os.makedirs(os.path.dirname(filename), exist_ok=True)
+    raise IOError(f'Could not write {filename}')
+
 def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]):
     pool_addr, period = key
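
How the finaldata process is expected to drive the new repository (a hedged sketch; pool_addr, time and price stand in for values decoded from swap events):

from dexorder.ohlc import FinalOHLCRepository

repo = FinalOHLCRepository(chain_id=42161)
repo.light_update_all(pool_addr, time, price)   # advances every period's current bar
repo.flush()   # writes dirty chunks, then 42161/series.json if a new series appeared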


@@ -1,5 +1,7 @@
 import asyncio
 import logging
+from datetime import datetime
+from typing import Optional
 
 from web3.exceptions import ContractLogicError
 from web3.types import EventData
@@ -12,6 +14,7 @@ from dexorder.blockstate import BlockDict
 from dexorder.blockstate.blockdata import K, V
 from dexorder.blocktime import get_block_timestamp
 from dexorder.database.model.pool import PoolDict
+from dexorder.metadata import generating_metadata, is_generating_metadata
 from dexorder.tokens import get_token
 from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address
@@ -32,27 +35,31 @@ async def load_pool(address: str) -> PoolDict:
     # todo other exchanges
     try:
         v3 = UniswapV3Pool(address)
-        t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
-        if uniswapV3_pool_address(t0, t1, fee) == address:  # VALIDATE don't just trust that it's a Uniswap pool
-            token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
-            if token0 is None or token1 is None:
-                found = None
-            else:
+        t0, t1 = await asyncio.gather(v3.token0(), v3.token1())
+        token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
+        if (token0 is not None and token1 is not None
+                and (token0['approved'] and token1['approved'] or is_generating_metadata())):
+            fee = await v3.fee()
+            if uniswapV3_pool_address(t0, t1, fee) == address:  # VALIDATE don't just trust that it's a Uniswap pool
                 decimals = token0['decimals'] - token1['decimals']
                 found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
                                  base=t0, quote=t1, fee=fee, decimals=decimals)
                 log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
                           f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
-        else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
-            log.debug(f'new Unknown pool at {address}')
     except ContractLogicError:
-        log.debug(f'new Unknown pool at {address}')
+        pass
     except ValueError as v:
-        if v.args[0].get('code') == -32000:
-            log.debug(f'new Unknown pool at {address}')
+        try:
+            code = v.args[0].get('code')
+        except:
+            raise v
-        else:
-            raise
+        if code == -32000:
+            pass
+        else:
+            raise v
+    if found is None:
+        log.debug(f'new Unknown pool at {address}')
+        found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.Unknown.value,
+                         base=ADDRESS_0, quote=ADDRESS_0, fee=0, decimals=0)
     return found
@@ -94,7 +101,7 @@ async def ensure_pool_price(pool: PoolDict):
 # todo other exchanges
-async def get_uniswap_data(swap: EventData):
+async def get_uniswap_data(swap: EventData) -> Optional[tuple[PoolDict, datetime, dec]]:
     try:
         sqrt_price = swap['args']['sqrtPriceX96']
     except KeyError:
@@ -102,7 +109,6 @@
     addr = swap['address']
     pool = await get_pool(addr)
     if pool['exchange'] != Exchange.UniswapV3.value:
-        # log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
         return None
     price: dec = await uniswap_price(pool, sqrt_price)
     timestamp = await get_block_timestamp(swap['blockHash'])
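
Downstream code can now unpack the annotated return; a minimal sketch inside an async handler:

data = await get_uniswap_data(swap)   # swap: EventData for a Swap log
if data is not None:
    pool, time, price = data          # PoolDict, block datetime, dec price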


@@ -5,15 +5,19 @@ from typing import Optional
 from eth_abi.exceptions import InsufficientDataBytes
 from web3.exceptions import ContractLogicError, BadFunctionCallOutput
 
 from dexorder import ADDRESS_0
 from dexorder.addrmeta import address_metadata
 from dexorder.base.chain import current_chain
-from dexorder.contract import ERC20
+from dexorder.contract import ERC20, ContractProxy
 from dexorder.database.model.token import TokenDict
+from dexorder.metadata import get_metadata
 
 log = logging.getLogger(__name__)
 
 async def get_token(address) -> Optional[TokenDict]:
+    if address == ADDRESS_0:
+        raise ValueError('No token at address 0')
     try:
         return address_metadata[address]
     except KeyError:
@@ -23,17 +27,35 @@ async def get_token(address) -> Optional[TokenDict]:
 
 async def load_token(address: str) -> Optional[TokenDict]:
     contract = ERC20(address)
-    prom = asyncio.gather(contract.name(), contract.symbol())
+    name_prom = contract.name()
+    dec_prom = contract.decimals()
     try:
-        decimals = await contract.decimals()
+        symbol = await contract.symbol()
     except (InsufficientDataBytes, BadFunctionCallOutput):
+        # this happens when the token returns bytes32 instead of a string
+        try:
+            symbol = await ContractProxy(address, 'ERC20.sb').symbol()
+            log.info(f'got bytes32 symbol {symbol}')
+        except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
+            log.warning(f'token {address} has broken symbol()')
+            return None
+    try:
+        decimals = await dec_prom
+    except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
+        log.warning(f'token {address} has no decimals()')
+        decimals = 0
-    try:
-        name, symbol = await prom
-    except OverflowError:
-        # this happens when the token returns bytes32 instead of a string
-        return None
+    name = await name_prom
     log.debug(f'new token {name} {symbol} {address}')
-    return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address,
-                     name=name, symbol=symbol, decimals=decimals)
+    chain_id = current_chain.get().chain_id
+    td = TokenDict(type='Token', chain=chain_id, address=address,
+                   name=name, symbol=symbol, decimals=decimals, approved=False)
+    md = get_metadata(address, chain_id=chain_id)
+    if md is not None:
+        td['approved'] = True
+        if 'n' in md:
+            td['name'] = md['n']
+        if 's' in md:
+            td['symbol'] = md['s']
+        if 'd' in md:
+            td['decimals'] = md['d']
+    return td
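
Worked example (hedged; assumes a connected chain context and config.metadata pointing at the file above): the USDC address from metadata-finaldata.json comes back approved with the curated fields, while an unlisted token keeps approved=False.

from dexorder.tokens import load_token

async def demo():
    td = await load_token('0x2653aB60a6fD19d3e1f4E729412C219257CA6e24')
    assert td['approved'] and td['symbol'] == 'USDC' and td['decimals'] == 6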