From 91dee5c0309b2ac2ab531a4cbf2b06ddd3587613 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 18 Mar 2024 18:06:16 -0400 Subject: [PATCH] backend metadata; approved tokens; logging.toml; --- .gitignore | 1 + .../versions/db62e7db828d_initial_schema.py | 1 + logging-default.toml | 27 ++++++ metadata-finaldata.json | 92 +++++++++++++++++++ src/dexorder/bin/executable.py | 21 ++++- src/dexorder/bin/finaldata.py | 4 +- src/dexorder/bin/main.py | 3 - src/dexorder/bin/mirror.py | 11 ++- src/dexorder/bin/tokenlist_metadata.py | 29 ++++++ src/dexorder/blockchain/connection.py | 4 +- src/dexorder/configuration/schema.py | 4 +- src/dexorder/contract/abi.py | 2 + src/dexorder/contract/contract_proxy.py | 25 +++-- src/dexorder/database/model/pool.py | 7 +- src/dexorder/database/model/token.py | 7 +- src/dexorder/metadata.py | 48 +++++++++- src/dexorder/ohlc.py | 88 +++++++++++++----- src/dexorder/pools.py | 34 ++++--- src/dexorder/tokens.py | 42 +++++++-- 19 files changed, 375 insertions(+), 75 deletions(-) create mode 100644 logging-default.toml create mode 100644 metadata-finaldata.json create mode 100644 src/dexorder/bin/tokenlist_metadata.py diff --git a/.gitignore b/.gitignore index 588c514..cc0926a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ venv *secret* !/.secret-mock.toml dexorder.toml +logging.toml /contract __pycache__ .idea diff --git a/alembic/versions/db62e7db828d_initial_schema.py b/alembic/versions/db62e7db828d_initial_schema.py index 5c26284..d6f32d7 100644 --- a/alembic/versions/db62e7db828d_initial_schema.py +++ b/alembic/versions/db62e7db828d_initial_schema.py @@ -80,6 +80,7 @@ def upgrade() -> None: sa.Column('name', sa.String(), nullable=False), sa.Column('symbol', sa.String(), nullable=False), sa.Column('decimals', sa.SMALLINT(), nullable=False), + sa.Column('approved', sa.Boolean(), nullable=False), sa.PrimaryKeyConstraint('chain', 'address') ) op.create_table('pool', diff --git a/logging-default.toml b/logging-default.toml new file mode 100644 index 0000000..ac2dd7f --- /dev/null +++ b/logging-default.toml @@ -0,0 +1,27 @@ +# https://docs.python.org/3/library/logging.config.html#logging-config-dictschema +version=1 + +[loggers.''] +level='INFO' +handlers=['console',] + +[loggers.dexorder] +level='DEBUG' + +[handlers.console] +class='logging.StreamHandler' +formatter='default' +level='INFO' +stream='ext://sys.stdout' + +[formatters.default] +# https://docs.python.org/3/library/logging.html#logrecord-attributes +format='%(asctime)s %(name)s %(message)s' +# https://docs.python.org/3/library/time.html#time.strftime +datefmt='%Y-%m-%d %H:%M:%S' + +[formatters.notime] +# https://docs.python.org/3/library/logging.html#logrecord-attributes +format='%(name)s %(message)s' +# https://docs.python.org/3/library/time.html#time.strftime +datefmt='%Y-%m-%d %H:%M:%S' diff --git a/metadata-finaldata.json b/metadata-finaldata.json new file mode 100644 index 0000000..3f4d49d --- /dev/null +++ b/metadata-finaldata.json @@ -0,0 +1,92 @@ +{ + "42161": { + "t": [ + { + "a": "0x2653aB60a6fD19d3e1f4E729412C219257CA6e24", + "n": "USD Coin (Arbitrum Native)", + "s": "USDC", + "d": 6 + }, + { + "a": "0x87b60276434d5cB5CE46d2316B596c7Bc5f87cCA", + "n": "Wrapped Ether", + "s": "WETH", + "d": 18 + }, + { + "a": "0xEaD55ce1cC577C8020F5FcE39D4e5C643F591a0c", + "n": "USD Coin (Ethereum Bridged)", + "s": "USDC.e", + "d": 6 + }, + { + "a": "0x2f2a2543B76A4166549F7aaB2e75Bef0aefC5B0f", + "s": "WBTC" + }, + { + "a": "0x912CE59144191C1204E64559FE8253a0e49E6548", + "s": "ARB" + }, + { + "a": "0x9623063377AD1B27544C965cCd7342f7EA7e88C7", + "s": "GRT" + }, + { + "a": "0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9", + "s": "USDT" + }, + { + "a": "0xDA10009cBd5D07dd0CeCc66161FC93D7c9000da1", + "s": "DAI" + }, + { + "a": "0xf97f4df75117a78c1A5a0DBb814Af92458539FB4", + "s": "LINK" + }, + { + "a": "0xFa7F8980b0f1E64A2062791cc3b0871572f1F7f0", + "s": "UNI" + }, + { + "a": "0xba5DdD1f9d7F570dc94a51479a000E3BCE967196", + "s": "AAVE" + }, + { + "a": "0x371c7ec6D8039ff7933a2AA28EB827Ffe1F52f07", + "s": "JOE" + }, + { + "a": "0xfc5A1A6EB076a2C7aD06eD22C90d7E710E35ad0a", + "s": "GMX" + }, + { + "a": "0x11cDb42B0EB46D95f990BeDD4695A6e3fA034978", + "s": "CRV" + }, + { + "a": "0x13Ad51ed4F1B7e9Dc168d8a00cB3f4dDD85EfA60", + "s": "LDO" + }, + { + "a": "0xba0Dda8762C24dA9487f5FA026a9B64b695A07Ea", + "s": "OX" + }, + { + "a": "0x561877b6b3DD7651313794e5F2894B2F18bE0766", + "s": "MATIC" + }, + { + "a": "0x5979D7b546E38E414F7E9822514be443A4800529", + "s": "wstETH" + }, + { + "a": "0x539bdE0d7Dbd336b79148AA742883198BBF60342", + "s": "MAGIC" + }, + { + "a": "0x0c880f6761F1af8d9Aa9C466984b80DAb9a8c9e8", + "s": "PENDLE" + } + ] + } +} \ No newline at end of file diff --git a/src/dexorder/bin/executable.py b/src/dexorder/bin/executable.py index 26326bf..87e75b9 100644 --- a/src/dexorder/bin/executable.py +++ b/src/dexorder/bin/executable.py @@ -1,10 +1,14 @@ import logging +import logging.config +import tomllib from asyncio import CancelledError from traceback import print_exception import asyncio from signal import Signals from typing import Coroutine +import sys + from dexorder import configuration if __name__ == '__main__': @@ -30,7 +34,22 @@ async def _shutdown_coro(_sig, loop, extra_shutdown): print_exception(x) -def execute(main:Coroutine, shutdown=None, parse_args=True): +def execute(main:Coroutine, shutdown=None, *, parse_logging=True, parse_args=True): + configured = False + if parse_logging: + try: + with open('logging.toml', 'rb') as file: + dictconf = tomllib.load(file) + except FileNotFoundError: + pass + else: + logging.config.dictConfig(dictconf) + log.info('Logging configured from logging.toml') + configured = True + if not configured: + logging.basicConfig(level=logging.INFO, stream=sys.stdout) + log.setLevel(logging.DEBUG) + log.info('Logging configured to default') if parse_args: configuration.parse_args() loop = asyncio.get_event_loop() diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 9eea7a9..a057de9 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -13,7 +13,7 @@ from dexorder.blocktime import get_block_timestamp from dexorder.configuration import parse_args from dexorder.contract import get_contract_event from dexorder.database.model.block import current_block, latest_block -from dexorder.ohlc import LightOHLCRepository +from dexorder.ohlc import FinalOHLCRepository from dexorder.pools import get_uniswap_data from dexorder.util import hexstr from dexorder.util.shutdown import fatal @@ -22,7 +22,7 @@ from dexorder.walker import BlockWalker log = logging.getLogger('dexorder') -ohlcs = LightOHLCRepository() +ohlcs = FinalOHLCRepository() async def handle_backfill_uniswap_swaps(swaps: list[EventData]): diff --git a/src/dexorder/bin/main.py b/src/dexorder/bin/main.py index b7d0ae9..61ec5a2 100644 --- a/src/dexorder/bin/main.py +++ b/src/dexorder/bin/main.py @@ -74,9 +74,6 @@ def setup_logevent_triggers(runner): # noinspection DuplicatedCode async def main(): - logging.basicConfig(level=logging.INFO, stream=sys.stdout) - log.setLevel(logging.DEBUG) - parse_args() await blockchain.connect() redis_state = None state = None diff --git a/src/dexorder/bin/mirror.py b/src/dexorder/bin/mirror.py index 52ffecd..8ced0df 100644 --- a/src/dexorder/bin/mirror.py +++ b/src/dexorder/bin/mirror.py @@ -1,7 +1,6 @@ import asyncio import logging import os -import sys from dexorder import config, blockchain, current_w3 from dexorder.bin.executable import execute @@ -9,7 +8,7 @@ from dexorder.blockchain.connection import create_w3 from dexorder.blockstate import current_blockstate from dexorder.blockstate.state import FinalizedBlockState from dexorder.contract import get_deployment_address, ContractProxy, ERC20 -from dexorder.metadata import generate_metadata +from dexorder.metadata import generate_metadata, init_generating_metadata from dexorder.pools import get_pool from dexorder.tokens import get_token from dexorder.uniswap import UniswapV3Pool @@ -71,7 +70,7 @@ async def get_pool_info( pool ): return [pool, t0, t1, fee, price, amount0, amount1] async def write_metadata( pools, mirror_pools ): - filename = config.mirror_metadata + filename = config.metadata if filename is None: return pool_dicts = [get_pool(addr) for (addr,_inverted) in mirror_pools] @@ -92,6 +91,10 @@ async def await_mirror(tx, pool_addr, mirror_addr, mirror_inverted ): async def main(): + init_generating_metadata() + if config.metadata is None: + log.error('Must configure metadata (filename to write)') + return if config.mirror_source_rpc_url is None: log.error('Must configure mirror_source_rpc_url') return @@ -139,6 +142,4 @@ async def main(): if __name__ == '__main__': - logging.basicConfig(level=logging.INFO, stream=sys.stdout) - log.setLevel(logging.DEBUG) execute(main()) diff --git a/src/dexorder/bin/tokenlist_metadata.py b/src/dexorder/bin/tokenlist_metadata.py new file mode 100644 index 0000000..26d1d86 --- /dev/null +++ b/src/dexorder/bin/tokenlist_metadata.py @@ -0,0 +1,29 @@ +# Prints a JSON string to stdout containing metadata information for all the known tokens and pools +# +# see metadata.py + +import logging +import sys + +from sqlalchemy import select + +from dexorder import db +from dexorder.configuration import parse_args +from dexorder.database.model import Pool, Token +from dexorder.metadata import generate_metadata + +log = logging.getLogger(__name__) + + +def main(): + logging.basicConfig(level=logging.INFO, stream=sys.stderr) + log.setLevel(logging.DEBUG) + parse_args() + db.connect(migrate=False) + tokens = db.session.scalars(select(Token)) + pools = db.session.scalars(select(Pool)) + generate_metadata(tokens, pools) + + +if __name__ == '__main__': + main() diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index d5c6eb1..5575b4d 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -34,7 +34,6 @@ async def create_w3(rpc_url=None, account=NARG, autosign=False): # chain_id = self.w3s[0].eth.chain_id # assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain # self.w3iter = itertools.cycle(self.w3s) - url = resolve_rpc_url(rpc_url) w3 = AsyncWeb3(RetryHTTPProvider(url)) # w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? @@ -114,13 +113,12 @@ def _make_contract(w3_eth): log = logging.getLogger(__name__) -MAX_CONCURRENCY = config.concurrent_rpc_connections class RetryHTTPProvider (AsyncHTTPProvider): def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None: super().__init__(endpoint_uri, request_kwargs) - self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY) + self.in_flight = asyncio.Semaphore(config.concurrent_rpc_connections) self.rate_allowed = asyncio.Event() self.rate_allowed.set() diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index 38a0e40..34a27e0 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -33,5 +33,5 @@ class Config: mirror_source_rpc_url: Optional[str] = None # source RPC for original pools mirror_env: Optional[str] = None - mirror_pools: Optional[list[str]] = field(default_factory=list) - mirror_metadata: str = field(default='metadata.json') + mirror_pools: list[str] = field(default_factory=list) + metadata: Optional[str] = field(default='../web/public/metadata.json') diff --git a/src/dexorder/contract/abi.py b/src/dexorder/contract/abi.py index 8fd5a8c..cdffb5e 100644 --- a/src/dexorder/contract/abi.py +++ b/src/dexorder/contract/abi.py @@ -1,3 +1,5 @@ abis = { + # ERC20 where symbol() returns a bytes32 instead of a string + 'ERC20.sb': '''[{"type":"function","name":"symbol","inputs":[],"outputs":[{"name":"","type":"bytes32","internalType":"bytes32"}],"stateMutability":"view"}]''' # 'WMATIC': '''[{"constant":true,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"guy","type":"address"},{"name":"wad","type":"uint256"}],"name":"approve","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"totalSupply","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"src","type":"address"},{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transferFrom","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[{"name":"wad","type":"uint256"}],"name":"withdraw","outputs":[],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":true,"inputs":[],"name":"decimals","outputs":[{"name":"","type":"uint8"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"}],"name":"balanceOf","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":true,"inputs":[],"name":"symbol","outputs":[{"name":"","type":"string"}],"payable":false,"stateMutability":"view","type":"function"},{"constant":false,"inputs":[{"name":"dst","type":"address"},{"name":"wad","type":"uint256"}],"name":"transfer","outputs":[{"name":"","type":"bool"}],"payable":false,"stateMutability":"nonpayable","type":"function"},{"constant":false,"inputs":[],"name":"deposit","outputs":[],"payable":true,"stateMutability":"payable","type":"function"},{"constant":true,"inputs":[{"name":"","type":"address"},{"name":"","type":"address"}],"name":"allowance","outputs":[{"name":"","type":"uint256"}],"payable":false,"stateMutability":"view","type":"function"},{"payable":true,"stateMutability":"payable","type":"fallback"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"guy","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Approval","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Transfer","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"dst","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Deposit","type":"event"},{"anonymous":false,"inputs":[{"indexed":true,"name":"src","type":"address"},{"indexed":false,"name":"wad","type":"uint256"}],"name":"Withdrawal","type":"event"}]''', } diff --git a/src/dexorder/contract/contract_proxy.py b/src/dexorder/contract/contract_proxy.py index 1118b0e..1628bd1 100644 --- a/src/dexorder/contract/contract_proxy.py +++ b/src/dexorder/contract/contract_proxy.py @@ -1,7 +1,9 @@ import json +import logging from typing import Optional import eth_account +from web3.exceptions import BadFunctionCallOutput, Web3Exception from web3.types import TxReceipt from dexorder import current_w3 @@ -9,6 +11,7 @@ from dexorder.base.account import current_account from dexorder.database.model.block import current_block from dexorder.util import hexstr +log = logging.getLogger(__name__) class ContractTransaction: def __init__(self, id_bytes: bytes, rawtx: Optional[bytes] = None): @@ -42,29 +45,37 @@ class DeployTransaction (ContractTransaction): -def call_wrapper(func): +def call_wrapper(addr, name, func): async def f(*args, **kwargs): try: blockhash = hexstr(current_block.get().hash) except LookupError: blockhash = 'latest' - return await func(*args, **kwargs).call(block_identifier=blockhash) + try: + return await func(*args, **kwargs).call(block_identifier=blockhash) + except Web3Exception as e: + log.error(f"Exception calling {addr}.{name}()") + raise e return f -def transact_wrapper(func): +def transact_wrapper(addr, name, func): async def f(*args, **kwargs): - tx_id = await func(*args, **kwargs).transact() + try: + tx_id = await func(*args, **kwargs).transact() + except Web3Exception as e: + log.error(f'Exception transacting {addr}.{name}()') + raise e return ContractTransaction(tx_id) return f -def build_wrapper(func): +def build_wrapper(addr, name, func): async def f(*args, **kwargs): try: account = current_account.get() except LookupError: - raise RuntimeError('Cannot invoke a transaction without setting an Account.') + raise RuntimeError(f'Cannot invoke transaction {addr}.{name}() without setting an Account.') tx = await func(*args, **kwargs).build_transaction() tx['from'] = account.address tx['nonce'] = await account.next_nonce() @@ -128,7 +139,7 @@ class ContractProxy: found = self.contract.functions[item] else: raise AttributeError(item) - return self._wrapper(found) + return self._wrapper(self.address, item, found) def __repr__(self): addr = self.contract.address diff --git a/src/dexorder/database/model/pool.py b/src/dexorder/database/model/pool.py index 3b9156e..58a98d1 100644 --- a/src/dexorder/database/model/pool.py +++ b/src/dexorder/database/model/pool.py @@ -1,5 +1,5 @@ import logging -from typing import TypedDict +from typing import TypedDict, Optional from sqlalchemy.orm import Mapped, mapped_column @@ -19,6 +19,8 @@ class PoolDict (TypedDict): quote: str fee: int decimals: int + approved: bool # whether this pool has only whitelisted tokens + liquidity: Optional[int] class Pool (Base): @@ -42,7 +44,8 @@ class Pool (Base): def dump(self): - return PoolDict(chain=self.chain.chain_id, address=self.address, + return PoolDict(type='Pool', + chain=self.chain.chain_id, address=self.address, exchange=self.exchange.value, base=self.base.address, quote=self.quote, fee=self.fee, decimals=self.decimals) diff --git a/src/dexorder/database/model/token.py b/src/dexorder/database/model/token.py index 8c1ac2d..50adfda 100644 --- a/src/dexorder/database/model/token.py +++ b/src/dexorder/database/model/token.py @@ -17,6 +17,7 @@ class TokenDict (TypedDict): name: str symbol: str decimals: int + approved: bool # whether this token is in the whitelist or not # the database object is primarily write-only so we are able to index queries for pools-by-token from the nodejs server @@ -29,15 +30,17 @@ class Token (Base): name: Mapped[str] symbol: Mapped[str] decimals: Mapped[Uint8] + approved: Mapped[bool] @staticmethod def load(token_dict: TokenDict) -> 'Token': return Token(chain=Blockchain.get(token_dict['chain']), address=token_dict['address'], - name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals']) + name=token_dict['name'], symbol=token_dict['symbol'], decimals=token_dict['decimals'], + approved=token_dict['approved']) def dump(self): return TokenDict(type='Token', chain=self.chain.chain_id, address=self.address, - name=self.name, symbol=self.symbol, decimals=self.decimals) + name=self.name, symbol=self.symbol, decimals=self.decimals, approved=self.approved) diff --git a/src/dexorder/metadata.py b/src/dexorder/metadata.py index 6844afb..d89228d 100644 --- a/src/dexorder/metadata.py +++ b/src/dexorder/metadata.py @@ -21,18 +21,23 @@ # ] # } # - +import logging import sys from typing import Union, Iterable +from dexorder import config, NARG +from dexorder.base.chain import current_chain from dexorder.database.model import Token, Pool from dexorder.database.model.pool import PoolDict from dexorder.database.model.token import TokenDict from dexorder.util import json +log = logging.getLogger(__name__) + token_map: dict[str, Token] = {} +# noinspection PyShadowingNames def dump(file, *args): print(*args, end='', file=file) @@ -96,10 +101,47 @@ def dump_pools(out, pools): json_dump(out,**data) +generating_metadata = False + +def init_generating_metadata(): + """ + Calling this will prevent the metadata whitelist from squelching unknown pools + """ + global generating_metadata + generating_metadata = True + +def is_generating_metadata(): + return generating_metadata + + +# noinspection PyShadowingNames def generate_metadata(tokens: Iterable[Union[Token, TokenDict]], pools: Iterable[Union[Pool, PoolDict]], file=sys.stdout): - dump(file, '{"t":[') + dump(file, '{"'+str(current_chain.get().chain_id)+'":{"t":[') dump_tokens(file, tokens) dump(file, '],"p":[') dump_pools(file, pools) - dump(file, ']}') + dump(file, ']}}') + + +metadata = NARG +metadata_by_chainaddr = {} + +def get_metadata(addr=None, *, chain_id=None): + if chain_id is None: + chain_id = current_chain.get().chain_id + global metadata + if metadata is NARG: + if config.metadata is None or generating_metadata: + metadata = None + else: + with open(config.metadata) as file: + metadata = json.load(file) + for chain_id, chain_info in metadata.items(): + chain_id = int(chain_id) + for t in chain_info['t']: + metadata_by_chainaddr[chain_id,t['a']] = t + for p in chain_info['p']: + metadata_by_chainaddr[chain_id,p['a']] = p + log.info(f'Loaded metadata from {config.metadata}') + return metadata if addr is None else metadata_by_chainaddr.get((chain_id,addr)) diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 348cd77..589cdf7 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -1,8 +1,9 @@ import logging import os +from collections import defaultdict from datetime import datetime, timedelta, timezone from decimal import InvalidOperation -from typing import Optional, NamedTuple, Reversible, Union +from typing import Optional, NamedTuple, Reversible, Union, TypedDict from cachetools import LFUCache @@ -49,6 +50,9 @@ def dt(v): return v if isinstance(v, datetime) else from_timestamp(v) class NativeOHLC: + """ + in-memory version of OHLC data using native python types of datetime and decimal + """ @staticmethod def from_ohlc(ohlc: OHLC) -> 'NativeOHLC': return NativeOHLC(*[cast(value) for value, cast in zip(ohlc,(dt, opt_dec, opt_dec, opt_dec, try_dec))], ohlc=ohlc) @@ -175,6 +179,11 @@ def quotes_path(chain_id: int = None): chain_id = current_chain.get().chain_id return f'{chain_id}/quotes.json' +def series_path(chain_id: int = None): + if chain_id is None: + chain_id = current_chain.get().chain_id + return f'{chain_id}/series.json' + def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int = None) -> str: if chain_id is None: @@ -189,7 +198,10 @@ def chunk_path(symbol: str, period: timedelta, time: datetime, *, chain_id: int class Chunk: - """ Chunks map to files of OHLC's on disk """ + """ + Chunks map to files of OHLC's on disk. If an OHLC contains 6 fields instead of just 5, the 6th field is a + timestamp pointing to the next + """ def __init__(self, repo_dir: str, symbol: str, period: timedelta, time: datetime, *, bars: Optional[list[NativeOHLC]] = None, chain_id: int = None): self.repo_dir = repo_dir @@ -251,14 +263,7 @@ class Chunk: def save(self): - for _ in range(2): - try: - with open(self.fullpath, 'w') as file: - json.dump([n.ohlc for n in self.bars], file) - return - except FileNotFoundError: - os.makedirs(os.path.dirname(self.fullpath), exist_ok=True) - raise IOError(f'Could not write chunk {self.fullpath}') + save_json([n.ohlc for n in self.bars], self.fullpath) def __eq__(self, other): @@ -270,12 +275,23 @@ class Chunk: class OHLCRepository: - def __init__(self, base_dir: str = None): + """ + this is only used for the backend now and should not write to disk. the finaldata process runs LightOHLCRepository + instead to create json files. + OHLCRepository still functions for in-memory recent ohlc's + """ + + def __init__(self, base_dir: str = None, *, chain_id: int = None): """ can't actually make more than one of these because there's a global recent_ohlcs BlockDict """ self._dir = base_dir + self._chain_id = chain_id self.cache = LFUCache(len(OHLC_PERIODS) * 1024) self.dirty_chunks = set() - self._quotes:Optional[dict[str,tuple[int,str]]] = None # todo tim + self._quotes = None + + @property + def chain_id(self): + return self._chain_id if self._chain_id is not None else current_chain.get().chain_id @property def dir(self): @@ -291,7 +307,7 @@ class OHLCRepository: self._quotes = {} else: try: - with open(os.path.join(self.dir, quotes_path()), 'r') as f: + with open(os.path.join(self.dir, quotes_path(self.chain_id)), 'r') as f: self._quotes = json.load(f) except FileNotFoundError: self._quotes = {} @@ -318,7 +334,7 @@ class OHLCRepository: """ if price is None, then bars are advanced based on the time but no new price is added to the series. """ - logname = f'{symbol} {period_name(period)}' + # logname = f'{symbol} {period_name(period)}' # log.debug(f'Updating OHLC {logname} {minutely(time)} {price}') if price is not None: self.quotes[symbol] = timestamp(time), str(price) @@ -338,6 +354,7 @@ class OHLCRepository: # drop any historical bars that are older than we need # oldest_needed = cover the root block time plus one period prior oldest_needed = from_timestamp(current_blockstate.get().root_block.timestamp) - period + # noinspection PyTypeChecker trim = (oldest_needed - historical[0].start) // period if trim > 0: historical = historical[trim:] @@ -375,10 +392,9 @@ class OHLCRepository: chunk.update(native) self.dirty_chunks.add(chunk) - def get_chunk(self, symbol: str, period: timedelta, time: datetime) -> Chunk: - start_time = ohlc_start_time(time, period) + def get_chunk(self, symbol: str, period: timedelta, start_time: datetime) -> Chunk: chain_id = current_chain.get().chain_id - key = chunk_path(symbol, period, time, chain_id=chain_id) + key = chunk_path(symbol, period, start_time, chain_id=chain_id) found = self.cache.get(key) if found is None: found = self.cache[key] = Chunk(self.dir, symbol, period, start_time, chain_id=chain_id) @@ -397,14 +413,20 @@ class OHLCRepository: os.makedirs(os.path.dirname(filepath), exist_ok=True) -class LightOHLCRepository (OHLCRepository): +class SeriesDict (TypedDict): + start: int # timestamp of the start of the series + + +class FinalOHLCRepository (OHLCRepository): """ Used for backfill when all data is known to be final. Keeps a simple table of current NativeOHLC candles. """ - def __init__(self, base_dir: str = None): - super().__init__(base_dir) + def __init__(self, base_dir: str = None, *, chain_id: int = None): + super().__init__(base_dir, chain_id=chain_id) self.current:dict[tuple[str,timedelta],NativeOHLC] = {} # current bars keyed by symbol self.dirty_bars = set() + self.series:dict[int,dict[str,SeriesDict]] = defaultdict(dict) # keyed by [chain_id][symbol] + self._series_dirty = False def light_update_all(self, symbol: str, time: datetime, price: Optional[dec]): for period in OHLC_PERIODS: @@ -423,8 +445,11 @@ class LightOHLCRepository (OHLCRepository): # cache miss. load from chunk. prev = self.current[key] = chunk.bar_at(start) # log.debug(f'loaded prev bar from chunk {prev}') + if prev is None and symbol in self.quotes: + latest_bar_time = ohlc_start_time(self.quotes[symbol][0], period) + prev = self.current[key] = self.get_chunk(symbol, period, latest_bar_time).bar_at(latest_bar_time) if prev is None: - # not in cache or chunk. create new bar. + # never seen before. create new bar. # log.debug(f'no prev bar') if price is not None: close = price @@ -437,6 +462,8 @@ class LightOHLCRepository (OHLCRepository): bar = self.current[key] = NativeOHLC(start, price, price, price, close) chunk.update(bar, backfill=backfill) self.dirty_chunks.add(chunk) + self.series[current_chain.get().chain_id][f'{key[0]}|{key[1]}'] = {'start': timestamp(start)} + self._series_dirty = True else: updated = update_ohlc(prev, period, time, price) for bar in updated: @@ -445,6 +472,25 @@ class LightOHLCRepository (OHLCRepository): self.dirty_chunks.add(chunk) self.current[key] = updated[-1] + def flush(self) -> None: + # flush chunks + super().flush() + # flush series.json if needed + if self._series_dirty: + save_json(self.series, os.path.join(self.dir, series_path(self.chain_id))) + self._series_dirty = False + + +def save_json(obj, filename): + for _ in range(2): + try: + with open(filename, 'w') as file: + json.dump(obj, file) + return + except FileNotFoundError: + os.makedirs(os.path.dirname(filename), exist_ok=True) + raise IOError(f'Could not write {filename}') + def pub_ohlc(_series:str, key: OHLCKey, bars: list[NativeOHLC]): pool_addr, period = key diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 9571d1a..1e274d5 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -1,5 +1,7 @@ import asyncio import logging +from datetime import datetime +from typing import Optional from web3.exceptions import ContractLogicError from web3.types import EventData @@ -12,6 +14,7 @@ from dexorder.blockstate import BlockDict from dexorder.blockstate.blockdata import K, V from dexorder.blocktime import get_block_timestamp from dexorder.database.model.pool import PoolDict +from dexorder.metadata import generating_metadata, is_generating_metadata from dexorder.tokens import get_token from dexorder.uniswap import UniswapV3Pool, uniswapV3_pool_address @@ -32,27 +35,31 @@ async def load_pool(address: str) -> PoolDict: # todo other exchanges try: v3 = UniswapV3Pool(address) - t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) - if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool - token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) - if token0 is None or token1 is None: - found = None - else: + t0, t1 = await asyncio.gather(v3.token0(), v3.token1()) + token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) + if (token0 is not None and token1 is not None + and (token0['approved'] and token1['approved'] or is_generating_metadata())): + fee = await v3.fee() + if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool decimals = token0['decimals'] - token1['decimals'] found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, base=t0, quote=t1, fee=fee, decimals=decimals) log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') - else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass - log.debug(f'new Unknown pool at {address}') except ContractLogicError: - log.debug(f'new Unknown pool at {address}') + pass except ValueError as v: - if v.args[0].get('code') == -32000: - log.debug(f'new Unknown pool at {address}') + try: + code = v.args[0].get('code') + except: + raise v else: - raise + if code == -32000: + pass + else: + raise v if found is None: + log.debug(f'new Unknown pool at {address}') found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.Unknown.value, base=ADDRESS_0, quote=ADDRESS_0, fee=0, decimals=0) return found @@ -94,7 +101,7 @@ async def ensure_pool_price(pool: PoolDict): # todo other exchanges -async def get_uniswap_data(swap: EventData): +async def get_uniswap_data(swap: EventData) -> Optional[tuple[PoolDict, datetime, dec]]: try: sqrt_price = swap['args']['sqrtPriceX96'] except KeyError: @@ -102,7 +109,6 @@ async def get_uniswap_data(swap: EventData): addr = swap['address'] pool = await get_pool(addr) if pool['exchange'] != Exchange.UniswapV3.value: - # log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') return None price: dec = await uniswap_price(pool, sqrt_price) timestamp = await get_block_timestamp(swap['blockHash']) diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index 9b2e4b0..f00e6dd 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -5,15 +5,19 @@ from typing import Optional from eth_abi.exceptions import InsufficientDataBytes from web3.exceptions import ContractLogicError, BadFunctionCallOutput +from dexorder import ADDRESS_0 from dexorder.addrmeta import address_metadata from dexorder.base.chain import current_chain -from dexorder.contract import ERC20 +from dexorder.contract import ERC20, ContractProxy from dexorder.database.model.token import TokenDict +from dexorder.metadata import get_metadata log = logging.getLogger(__name__) async def get_token(address) -> Optional[TokenDict]: + if address == ADDRESS_0: + raise ValueError('No token at address 0') try: return address_metadata[address] except KeyError: @@ -23,17 +27,35 @@ async def get_token(address) -> Optional[TokenDict]: async def load_token(address: str) -> Optional[TokenDict]: contract = ERC20(address) - prom = asyncio.gather(contract.name(), contract.symbol()) + name_prom = contract.name() + dec_prom = contract.decimals() try: - decimals = await contract.decimals() + symbol = await contract.symbol() + except (InsufficientDataBytes, BadFunctionCallOutput): + # this happens when the token returns bytes32 instead of a string + try: + symbol = await ContractProxy(address, 'ERC20.sb').symbol() + log.info(f'got bytes32 symbol {symbol}') + except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): + log.warning(f'token {address} has broken symbol()') + return None + try: + decimals = await dec_prom except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): log.warning(f'token {address} has no decimals()') decimals = 0 - try: - name, symbol = await prom - except OverflowError: - # this happens when the token returns bytes32 instead of a string - return None + name = await name_prom log.debug(f'new token {name} {symbol} {address}') - return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address, - name=name, symbol=symbol, decimals=decimals) + chain_id = current_chain.get().chain_id + td = TokenDict(type='Token', chain=chain_id, address=address, + name=name, symbol=symbol, decimals=decimals, approved=False) + md = get_metadata(address, chain_id=chain_id) + if md is not None: + td['approved'] = True + if 'n' in md: + td['name'] = md['n'] + if 's' in md: + td['symbol'] = md['s'] + if 'd' in md: + td['decimals'] = md['d'] + return td