diff --git a/src/dexorder/bin/finaldata.py b/src/dexorder/bin/finaldata.py index 7975b47..be0271c 100644 --- a/src/dexorder/bin/finaldata.py +++ b/src/dexorder/bin/finaldata.py @@ -29,8 +29,10 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]): asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache # now execute the swaps synchronously for swap in swaps: - pool, time, price = await get_uniswap_data(swap) - ohlcs.light_update_all(pool['address'], time, price) + data = await get_uniswap_data(swap) + if data is not None: + pool, time, price = data + ohlcs.light_update_all(pool['address'], time, price) def flush_callback(): # start = now() diff --git a/src/dexorder/blockchain/connection.py b/src/dexorder/blockchain/connection.py index ed368e4..4f56f46 100644 --- a/src/dexorder/blockchain/connection.py +++ b/src/dexorder/blockchain/connection.py @@ -10,7 +10,7 @@ from hexbytes import HexBytes from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3.types import RPCEndpoint, RPCResponse -from .. import current_w3, Blockchain +from .. import current_w3, Blockchain, config from ..base.chain import current_chain from ..configuration import resolve_rpc_url from ..configuration.resolve import resolve_ws_url @@ -109,27 +109,34 @@ def _make_contract(w3_eth): log = logging.getLogger(__name__) -MAX_CONCURRENCY = 6 # todo configure +MAX_CONCURRENCY = config.concurrent_rpc_connections class RetryHTTPProvider (AsyncHTTPProvider): def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None: super().__init__(endpoint_uri, request_kwargs) self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY) + self.rate_allowed = asyncio.Event() + self.rate_allowed.set() async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: wait = 0 while True: try: async with self.in_flight: + await self.rate_allowed.wait() return await super().make_request(method, params) except ClientResponseError as e: if e.status != 429: raise - retry_after = e.headers.get('retry-after', None) - if retry_after is not None: - wait = float(retry_after) - else: - wait += 1 + .125 * random() - log.debug('rate limiting') - await asyncio.sleep(wait) + self.rate_allowed.clear() + try: + retry_after = e.headers.get('retry-after', None) + if retry_after is not None: + wait = float(retry_after) + else: + wait += 1 + .125 * random() + log.debug('rate limiting') + await asyncio.sleep(wait) + finally: + self.rate_allowed.set() diff --git a/src/dexorder/configuration/schema.py b/src/dexorder/configuration/schema.py index c708a1a..603dd3a 100644 --- a/src/dexorder/configuration/schema.py +++ b/src/dexorder/configuration/schema.py @@ -19,6 +19,7 @@ class Config: redis_url: Optional[str] = 'redis://localhost:6379' ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk + concurrent_rpc_connections: int = 4 parallel_logevent_queries: bool = True polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height diff --git a/src/dexorder/ohlc.py b/src/dexorder/ohlc.py index 717e7f8..fc094e6 100644 --- a/src/dexorder/ohlc.py +++ b/src/dexorder/ohlc.py @@ -11,6 +11,7 @@ from dexorder.base.chain import current_chain from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate.diff import DiffEntryItem from dexorder.database.model import Block +from dexorder.util.shutdown import fatal log = logging.getLogger(__name__) @@ -121,7 +122,9 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti """ # log.debug(f'\tupdating {prev} with {minutely(time)} {price}') cur = prev - assert time >= cur.start + if time < cur.start: + # data corruption. just shut down + fatal(f'update_ohlc({prev}, {period}, {time}, {price}) failed because time is before the start of the candle') result = [] # advance time and finalize any past OHLC's into the result array while True: diff --git a/src/dexorder/pools.py b/src/dexorder/pools.py index 54e02f7..9571d1a 100644 --- a/src/dexorder/pools.py +++ b/src/dexorder/pools.py @@ -35,11 +35,14 @@ async def load_pool(address: str) -> PoolDict: t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) - decimals = token0['decimals'] - token1['decimals'] - found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, - base=t0, quote=t1, fee=fee, decimals=decimals) - log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' - f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') + if token0 is None or token1 is None: + found = None + else: + decimals = token0['decimals'] - token1['decimals'] + found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, + base=t0, quote=t1, fee=fee, decimals=decimals) + log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' + f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass log.debug(f'new Unknown pool at {address}') except ContractLogicError: @@ -99,7 +102,7 @@ async def get_uniswap_data(swap: EventData): addr = swap['address'] pool = await get_pool(addr) if pool['exchange'] != Exchange.UniswapV3.value: - log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') + # log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') return None price: dec = await uniswap_price(pool, sqrt_price) timestamp = await get_block_timestamp(swap['blockHash']) diff --git a/src/dexorder/tokens.py b/src/dexorder/tokens.py index a743b29..9b2e4b0 100644 --- a/src/dexorder/tokens.py +++ b/src/dexorder/tokens.py @@ -1,5 +1,6 @@ import asyncio import logging +from typing import Optional from eth_abi.exceptions import InsufficientDataBytes from web3.exceptions import ContractLogicError, BadFunctionCallOutput @@ -12,7 +13,7 @@ from dexorder.database.model.token import TokenDict log = logging.getLogger(__name__) -async def get_token(address) -> TokenDict: +async def get_token(address) -> Optional[TokenDict]: try: return address_metadata[address] except KeyError: @@ -20,7 +21,7 @@ async def get_token(address) -> TokenDict: return result -async def load_token(address: str) -> TokenDict: +async def load_token(address: str) -> Optional[TokenDict]: contract = ERC20(address) prom = asyncio.gather(contract.name(), contract.symbol()) try: @@ -28,7 +29,11 @@ async def load_token(address: str) -> TokenDict: except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): log.warning(f'token {address} has no decimals()') decimals = 0 - name, symbol = await prom + try: + name, symbol = await prom + except OverflowError: + # this happens when the token returns bytes32 instead of a string + return None log.debug(f'new token {name} {symbol} {address}') return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address, name=name, symbol=symbol, decimals=decimals)