finaldata bugfixes
This commit is contained in:
@@ -29,8 +29,10 @@ async def handle_backfill_uniswap_swaps(swaps: list[EventData]):
|
|||||||
asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache
|
asyncio.gather(*[get_block_timestamp(h) for h in hashes]).add_done_callback(lambda _:None) # fire and forget, just to build cache
|
||||||
# now execute the swaps synchronously
|
# now execute the swaps synchronously
|
||||||
for swap in swaps:
|
for swap in swaps:
|
||||||
pool, time, price = await get_uniswap_data(swap)
|
data = await get_uniswap_data(swap)
|
||||||
ohlcs.light_update_all(pool['address'], time, price)
|
if data is not None:
|
||||||
|
pool, time, price = data
|
||||||
|
ohlcs.light_update_all(pool['address'], time, price)
|
||||||
|
|
||||||
def flush_callback():
|
def flush_callback():
|
||||||
# start = now()
|
# start = now()
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from hexbytes import HexBytes
|
|||||||
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
|
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
|
||||||
from web3.types import RPCEndpoint, RPCResponse
|
from web3.types import RPCEndpoint, RPCResponse
|
||||||
|
|
||||||
from .. import current_w3, Blockchain
|
from .. import current_w3, Blockchain, config
|
||||||
from ..base.chain import current_chain
|
from ..base.chain import current_chain
|
||||||
from ..configuration import resolve_rpc_url
|
from ..configuration import resolve_rpc_url
|
||||||
from ..configuration.resolve import resolve_ws_url
|
from ..configuration.resolve import resolve_ws_url
|
||||||
@@ -109,27 +109,34 @@ def _make_contract(w3_eth):
|
|||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
MAX_CONCURRENCY = 6 # todo configure
|
MAX_CONCURRENCY = config.concurrent_rpc_connections
|
||||||
|
|
||||||
class RetryHTTPProvider (AsyncHTTPProvider):
|
class RetryHTTPProvider (AsyncHTTPProvider):
|
||||||
|
|
||||||
def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
|
def __init__(self, endpoint_uri: Optional[Union[URI, str]] = None, request_kwargs: Optional[Any] = None) -> None:
|
||||||
super().__init__(endpoint_uri, request_kwargs)
|
super().__init__(endpoint_uri, request_kwargs)
|
||||||
self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY)
|
self.in_flight = asyncio.Semaphore(MAX_CONCURRENCY)
|
||||||
|
self.rate_allowed = asyncio.Event()
|
||||||
|
self.rate_allowed.set()
|
||||||
|
|
||||||
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
|
async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
|
||||||
wait = 0
|
wait = 0
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with self.in_flight:
|
async with self.in_flight:
|
||||||
|
await self.rate_allowed.wait()
|
||||||
return await super().make_request(method, params)
|
return await super().make_request(method, params)
|
||||||
except ClientResponseError as e:
|
except ClientResponseError as e:
|
||||||
if e.status != 429:
|
if e.status != 429:
|
||||||
raise
|
raise
|
||||||
retry_after = e.headers.get('retry-after', None)
|
self.rate_allowed.clear()
|
||||||
if retry_after is not None:
|
try:
|
||||||
wait = float(retry_after)
|
retry_after = e.headers.get('retry-after', None)
|
||||||
else:
|
if retry_after is not None:
|
||||||
wait += 1 + .125 * random()
|
wait = float(retry_after)
|
||||||
log.debug('rate limiting')
|
else:
|
||||||
await asyncio.sleep(wait)
|
wait += 1 + .125 * random()
|
||||||
|
log.debug('rate limiting')
|
||||||
|
await asyncio.sleep(wait)
|
||||||
|
finally:
|
||||||
|
self.rate_allowed.set()
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ class Config:
|
|||||||
redis_url: Optional[str] = 'redis://localhost:6379'
|
redis_url: Optional[str] = 'redis://localhost:6379'
|
||||||
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
|
ohlc_dir: Optional[str] = None # if empty string or None, then OHLC's are not saved to disk
|
||||||
|
|
||||||
|
concurrent_rpc_connections: int = 4
|
||||||
parallel_logevent_queries: bool = True
|
parallel_logevent_queries: bool = True
|
||||||
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
|
polling: float = 0 # seconds between queries for a new block. 0 disables polling and uses a websocket subscription on ws_url instead
|
||||||
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height
|
backfill: int = 0 # if not 0, then runner will initialize an empty database by backfilling from the given block height
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ from dexorder.base.chain import current_chain
|
|||||||
from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
|
from dexorder.blockstate import BlockDict, DiffItem, current_blockstate
|
||||||
from dexorder.blockstate.diff import DiffEntryItem
|
from dexorder.blockstate.diff import DiffEntryItem
|
||||||
from dexorder.database.model import Block
|
from dexorder.database.model import Block
|
||||||
|
from dexorder.util.shutdown import fatal
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -121,7 +122,9 @@ def update_ohlc(prev: NativeOHLC, period: timedelta, time: datetime, price: Opti
|
|||||||
"""
|
"""
|
||||||
# log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
|
# log.debug(f'\tupdating {prev} with {minutely(time)} {price}')
|
||||||
cur = prev
|
cur = prev
|
||||||
assert time >= cur.start
|
if time < cur.start:
|
||||||
|
# data corruption. just shut down
|
||||||
|
fatal(f'update_ohlc({prev}, {period}, {time}, {price}) failed because time is before the start of the candle')
|
||||||
result = []
|
result = []
|
||||||
# advance time and finalize any past OHLC's into the result array
|
# advance time and finalize any past OHLC's into the result array
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
@@ -35,11 +35,14 @@ async def load_pool(address: str) -> PoolDict:
|
|||||||
t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
|
t0, t1, fee = await asyncio.gather(v3.token0(), v3.token1(), v3.fee())
|
||||||
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool
|
if uniswapV3_pool_address(t0, t1, fee) == address: # VALIDATE don't just trust that it's a Uniswap pool
|
||||||
token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
|
token0, token1 = await asyncio.gather(get_token(t0), get_token(t1))
|
||||||
decimals = token0['decimals'] - token1['decimals']
|
if token0 is None or token1 is None:
|
||||||
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
|
found = None
|
||||||
base=t0, quote=t1, fee=fee, decimals=decimals)
|
else:
|
||||||
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
|
decimals = token0['decimals'] - token1['decimals']
|
||||||
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
|
found = PoolDict(type='Pool', chain=chain_id, address=address, exchange=Exchange.UniswapV3.value,
|
||||||
|
base=t0, quote=t1, fee=fee, decimals=decimals)
|
||||||
|
log.debug(f'new UniswapV3 pool {token0["symbol"]}\\{token1["symbol"]} '
|
||||||
|
f'{("."+str(decimals)) if decimals >= 0 else (str(-decimals)+".")} {address}')
|
||||||
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
|
else: # NOT a genuine Uniswap V3 pool if the address test doesn't pass
|
||||||
log.debug(f'new Unknown pool at {address}')
|
log.debug(f'new Unknown pool at {address}')
|
||||||
except ContractLogicError:
|
except ContractLogicError:
|
||||||
@@ -99,7 +102,7 @@ async def get_uniswap_data(swap: EventData):
|
|||||||
addr = swap['address']
|
addr = swap['address']
|
||||||
pool = await get_pool(addr)
|
pool = await get_pool(addr)
|
||||||
if pool['exchange'] != Exchange.UniswapV3.value:
|
if pool['exchange'] != Exchange.UniswapV3.value:
|
||||||
log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
|
# log.debug(f'Ignoring {Exchange(pool["exchange"])} pool {addr}')
|
||||||
return None
|
return None
|
||||||
price: dec = await uniswap_price(pool, sqrt_price)
|
price: dec = await uniswap_price(pool, sqrt_price)
|
||||||
timestamp = await get_block_timestamp(swap['blockHash'])
|
timestamp = await get_block_timestamp(swap['blockHash'])
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from eth_abi.exceptions import InsufficientDataBytes
|
from eth_abi.exceptions import InsufficientDataBytes
|
||||||
from web3.exceptions import ContractLogicError, BadFunctionCallOutput
|
from web3.exceptions import ContractLogicError, BadFunctionCallOutput
|
||||||
@@ -12,7 +13,7 @@ from dexorder.database.model.token import TokenDict
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def get_token(address) -> TokenDict:
|
async def get_token(address) -> Optional[TokenDict]:
|
||||||
try:
|
try:
|
||||||
return address_metadata[address]
|
return address_metadata[address]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
@@ -20,7 +21,7 @@ async def get_token(address) -> TokenDict:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
async def load_token(address: str) -> TokenDict:
|
async def load_token(address: str) -> Optional[TokenDict]:
|
||||||
contract = ERC20(address)
|
contract = ERC20(address)
|
||||||
prom = asyncio.gather(contract.name(), contract.symbol())
|
prom = asyncio.gather(contract.name(), contract.symbol())
|
||||||
try:
|
try:
|
||||||
@@ -28,7 +29,11 @@ async def load_token(address: str) -> TokenDict:
|
|||||||
except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
|
except (InsufficientDataBytes, ContractLogicError, BadFunctionCallOutput):
|
||||||
log.warning(f'token {address} has no decimals()')
|
log.warning(f'token {address} has no decimals()')
|
||||||
decimals = 0
|
decimals = 0
|
||||||
name, symbol = await prom
|
try:
|
||||||
|
name, symbol = await prom
|
||||||
|
except OverflowError:
|
||||||
|
# this happens when the token returns bytes32 instead of a string
|
||||||
|
return None
|
||||||
log.debug(f'new token {name} {symbol} {address}')
|
log.debug(f'new token {name} {symbol} {address}')
|
||||||
return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address,
|
return TokenDict(type='Token', chain=current_chain.get().chain_id, address=address,
|
||||||
name=name, symbol=symbol, decimals=decimals)
|
name=name, symbol=symbol, decimals=decimals)
|
||||||
|
|||||||
Reference in New Issue
Block a user