finaldata bugfixes

This commit is contained in:
Tim
2024-02-21 14:48:52 -04:00
parent ffbbf89607
commit dabcfcb2ae
6 changed files with 42 additions and 21 deletions

View File

@@ -29,8 +29,10 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache
# now execute the swaps synchronously # now execute the swaps synchronously
for swap in swaps: for swap in swaps:
pool, time, price = await get_uniswap_data(swap) data = await get_uniswap_data(swap)
ohlcs.light_update_all(pool['address'], time, price) if data is not None:
pool, time, price = data
ohlcs.light_update_all(pool['address'], time, price)
def flush_callback(): def flush_callback():
# start = now() # start = now()

View File

@@ -10,7 +10,7 @@ from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.types import RPCEndpoint, RPCResponse from web3.types import RPCEndpoint, RPCResponse
from .. import current_w3, Blockchain from .. import current_w3, Blockchain, config
from ..base.chain import current_chain from ..base.chain import current_chain
from ..configuration import resolve_rpc_url from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url from ..configuration.resolve import resolve_ws_url
@@ -109,27 +109,34 @@ def _make_contract(w3_eth):
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
MAX_CONCURRENCY = 6 # todo configure MAX_CONCURRENCY = config.concurrent_rpc_connections
class RetryHTTPProvider (AsyncHTTPProvider): class RetryHTTPProvider (AsyncHTTPProvider):
def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None: def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
super().__init__(endpoint_uri, request_kwargs) super().__init__(endpoint_uri, request_kwargs)
self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY) self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY)
self.rate_allowed = asyncio.Event()
self.rate_allowed.set()
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse: async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
wait = 0 wait = 0
while True: while True:
try: try:
async with self.in_flight: async with self.in_flight:
await self.rate_allowed.wait()
return await super().make_request(method, params) return await super().make_request(method, params)
except ClientResponseError as e: except ClientResponseError as e:
if e.status != 429: if e.status != 429:
raise raise
retry_after = e.headers.get('retry-after', None) self.rate_allowed.clear()
if retry_after is not None: try:
wait = float(retry_after) retry_after = e.headers.get('retry-after', None)
else: if retry_after is not None:
wait += 1 + .125 * random() wait = float(retry_after)
log.debug('rate limiting') else:
await asyncio.sleep(wait) wait += 1 + .125 * random()
log.debug('rate limiting')
await asyncio.sleep(wait)
finally:
self.rate_allowed.set()

View File

@@ -19,6 +19,7 @@ class Config:
redis_url: Optional[str] = 'redis://localhost:6379' redis_url: Optional[str] = 'redis://localhost:6379'
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
concurrent_rpc_connections: int = 4
parallel_logevent_queries: bool = True parallel_logevent_queries: bool = True
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height

View File

@@ -11,6 +11,7 @@ from dexorder.base.chain import current_chain
from dexorder.blockstate import BlockDict, DiffItem, current_blockstate from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
from dexorder.blockstate.diff import DiffEntryItem from dexorder.blockstate.diff import DiffEntryItem
from dexorder.database.model import Block from dexorder.database.model import Block
from dexorder.util.shutdown import fatal
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -121,7 +122,9 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
""" """
# log.debug(f'\tupdating {prev} with {minutely(time)} {price}') # log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
cur = prev cur = prev
assert time >= cur.start if time < cur.start:
# data corruption. just shut down
fatal(f'update_ohlc({prev}, {period}, {time}, {price}) failed because time is before the start of the candle')
result = [] result = []
# advance time and finalize any past OHLC's into the result array # advance time and finalize any past OHLC's into the result array
while True: while True:

View File

@@ -35,11 +35,14 @@ async def load_pool(address: str) -> PoolDict:
t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee()) t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool
token0, token1 = await asyncio.gather(get_token(t0), get_token(t1)) token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
decimals = token0['decimals'] - token1['decimals'] if token0 is None or token1 is None:
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value, found = None
base=t0, quote=t1, fee=fee, decimals=decimals) else:
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} ' decimals = token0['decimals'] - token1['decimals']
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}') found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
base=t0, quote=t1, fee=fee, decimals=decimals)
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
log.debug(f'new Unknown pool at {address}') log.debug(f'new Unknown pool at {address}')
except ContractLogicError: except ContractLogicError:
@@ -99,7 +102,7 @@ async def get_uniswap_data(swap: EventData):
addr = swap['address'] addr = swap['address']
pool = await get_pool(addr) pool = await get_pool(addr)
if pool['exchange'] != Exchange.UniswapV3.value: if pool['exchange'] != Exchange.UniswapV3.value:
log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}') # log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
return None return None
price: dec = await uniswap_price(pool, sqrt_price) price: dec = await uniswap_price(pool, sqrt_price)
timestamp = await get_block_timestamp(swap['blockHash']) timestamp = await get_block_timestamp(swap['blockHash'])

View File

@@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
from typing import Optional
from eth_abi.exceptions import InsufficientDataBytes from eth_abi.exceptions import InsufficientDataBytes
from web3.exceptions import ContractLogicError, BadFunctionCallOutput from web3.exceptions import ContractLogicError, BadFunctionCallOutput
@@ -12,7 +13,7 @@ from dexorder.database.model.token import TokenDict
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
async def get_token(address) -> TokenDict: async def get_token(address) -> Optional[TokenDict]:
try: try:
return address_metadata[address] return address_metadata[address]
except KeyError: except KeyError:
@@ -20,7 +21,7 @@ async def get_token(address) -> TokenDict:
return result return result
async def load_token(address: str) -> TokenDict: async def load_token(address: str) -> Optional[TokenDict]:
contract = ERC20(address) contract = ERC20(address)
prom = asyncio.gather(contract.name(), contract.symbol()) prom = asyncio.gather(contract.name(), contract.symbol())
try: try:
@@ -28,7 +29,11 @@ async def load_token(address: str) -> TokenDict:
except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput): except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
log.warning(f'token {address} has no decimals()') log.warning(f'token {address} has no decimals()')
decimals = 0 decimals = 0
name, symbol = await prom try:
name, symbol = await prom
except OverflowError:
# this happens when the token returns bytes32 instead of a string
return None
log.debug(f'new token {name} {symbol} {address}') log.debug(f'new token {name} {symbol} {address}')
return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address, return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address,
name=name, symbol=symbol, decimals=decimals) name=name, symbol=symbol, decimals=decimals)