archive_url and multiple rpc_url support

This commit is contained in:
tim
2025-02-10 20:28:24 -04:00
parent b22c044028
commit 416cff80b0
6 changed files with 208 additions and 66 deletions

View File

@@ -1,53 +1,75 @@
import asyncio import asyncio
import itertools
import logging import logging
from random import random from random import random
from typing import Any, Optional, Union from typing import Any, Optional, Union, Callable
# noinspection PyPackageRequirements # noinspection PyPackageRequirements
from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector from aiohttp import ClientResponseError, ClientSession, ClientTimeout, TCPConnector
from eth_typing import URI from eth_typing import URI
from hexbytes import HexBytes from hexbytes import HexBytes
from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider from web3 import WebsocketProviderV2, AsyncWeb3, AsyncHTTPProvider
from web3.exceptions import Web3Exception
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
from web3.types import RPCEndpoint, RPCResponse from web3.types import RPCEndpoint, RPCResponse
from .. import current_w3, Blockchain, config, Account, NARG from .. import current_w3, Blockchain, config, Account, NARG
from ..base.chain import current_chain from ..base.chain import current_chain
from ..configuration import resolve_rpc_url
from ..configuration.resolve import resolve_ws_url
from ..contract import get_contract_data from ..contract import get_contract_data
async def connect(rpc_url=None, account=NARG, autosign=True, name='default'): log = logging.getLogger(__name__)
async def connect(rpc_url: Union[str, list[str]] = None, account=NARG, autosign=True, name='default', *, archive_url: Union[str, list[str]] = None):
    """
    Build a w3 for rpc_url (see create_w3) and install it, along with its
    blockchain, into the context vars current_w3 / current_chain.
    """
    instance = await create_w3(rpc_url, account, autosign, name, archive_url=archive_url)
    current_w3.set(instance)
    current_chain.set(Blockchain.get(await instance.eth.chain_id))
    return instance
async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'): async def create_w3(rpc_url: Union[str,list[str]]=None, account=NARG, autosign=True, name='default', *, archive_url: Union[str,list[str]] = None) -> AsyncWeb3:
# todo create a proxy w3 that rotates among rpc urls if rpc_url is None:
# self.w3s = tuple(await create_w3(url) for url in rpc_url_or_tag) rpc_urls = [config.rpc_url.strip()]
# chain_id = self.w3s[0].eth.chain_id if isinstance(rpc_url, str):
# assert all(w3.eth.chain_id == chain_id for w3 in self.w3s) # all rpc urls must be the same blockchain rpc_urls = [resolve_rpc_url(s) for url in rpc_url.split(',') if (s:=url.strip()) != '']
# self.w3iter = itertools.cycle(self.w3s) elif isinstance(rpc_url, list):
url = resolve_rpc_url(rpc_url) rpc_urls = [resolve_rpc_url(s) for url in rpc_url if (s:=url.strip()) != '']
else:
raise ValueError("rpc_url must be a string or list of strings")
if archive_url is None:
archive_urls = config.archive_url.strip()
if archive_url is None:
archive_urls = []
elif isinstance(archive_url, str):
archive_urls = [resolve_rpc_url(s) for url in archive_url.split(',') if (s:=url.strip()) != '']
elif isinstance(archive_url, list):
archive_urls = [resolve_rpc_url(s) for url in archive_url if (s:=url.strip()) != '']
else:
raise ValueError("archive_url must be a string or list of strings")
if not rpc_urls:
raise ValueError("No rpc_url configured")
w3_instances = []
archive_instances = []
for (url, archive) in itertools.chain(((url, False) for url in rpc_urls), ((url, True) for url in archive_urls)):
connector = TCPConnector(limit=config.concurrent_rpc_connections) connector = TCPConnector(limit=config.concurrent_rpc_connections)
session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout)) session = ClientSession(connector=connector, timeout=ClientTimeout(config.rpc_timeout))
http_provider = RetryHTTPProvider(url) http_provider = RetryHTTPProvider(url)
await http_provider.cache_async_session(session) await http_provider.cache_async_session(session)
w3 = AsyncWeb3(http_provider) w3 = AsyncWeb3(http_provider)
# w3.middleware_onion.inject(geth_poa_middleware, layer=0) # todo is this line needed? w3.middleware_onion.add(archive_intercept_middleware, 'block_number_intercept_middleware')
# w3.middleware_onion.add(simple_cache_middleware)
# log.debug(f'middleware {list(w3.middleware_onion.middlewares)}')
w3.middleware_onion.remove('attrdict') w3.middleware_onion.remove('attrdict')
w3.middleware_onion.add(clean_input_async, 'clean_input') w3.middleware_onion.add(clean_input_async, 'clean_input')
w3.eth.Contract = _make_contract(w3.eth) w3.eth.Contract = _make_contract(w3.eth)
has_account = False # Highest block number that has reported a -32000 error indicating a lack of history that far back
w3.archive_fault_height = -1
if autosign: if autosign:
if account is NARG: if account is NARG:
account = Account.get() account = Account.get()
@@ -55,11 +77,18 @@ async def create_w3(rpc_url=None, account=NARG, autosign=True, name='default'):
# noinspection PyTypeChecker # noinspection PyTypeChecker
w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account)) w3.middleware_onion.add(await async_construct_sign_and_send_raw_middleware(account))
w3.eth.default_account = account.address w3.eth.default_account = account.address
has_account = True if archive:
log.info(f'{name} w3 configured to autosign as {account.address}') archive_instances.append(w3)
if not has_account: else:
log.info(f'No account set for {name} w3') w3_instances.append(w3)
return w3
# Ensure all instances share the same chain ID
chain_ids = await asyncio.gather(*(w3.eth.chain_id for w3 in itertools.chain(w3_instances, archive_instances)))
if len(set(chain_ids)) != 1:
raise RuntimeError("All RPC URLs must belong to the same blockchain")
# noinspection PyTypeChecker
return RoundRobinWebProxy(w3_instances, archive_instances) if len(w3_instances) > 1 or archive_instances else w3_instances[0]
async def create_w3_ws(ws_url=None) -> AsyncWeb3: async def create_w3_ws(ws_url=None) -> AsyncWeb3:
@@ -80,6 +109,30 @@ async def create_w3_ws(ws_url=None) -> AsyncWeb3:
return w3 return w3
def resolve_rpc_url(rpc_url=None):
    """
    Resolve an RPC URL: default to config.rpc_url when omitted, map the
    special name 'test' to the local node, and expand aliases declared in
    config.rpc_urls. Anything else passes through unchanged.
    """
    if rpc_url is None:
        rpc_url = config.rpc_url
    if rpc_url == 'test':
        return 'http://localhost:8545'
    if rpc_url in config.rpc_urls:  # alias lookup
        return config.rpc_urls[rpc_url]
    return rpc_url
def resolve_ws_url(ws_url=None):
    """
    Resolve a websocket URL: default to config.ws_url when omitted, map the
    special name 'test' to the local node, and expand aliases declared in
    config.rpc_urls. Anything else passes through unchanged.
    """
    if ws_url is None:
        ws_url = config.ws_url
    if ws_url == 'test':
        return 'ws://localhost:8545'
    if ws_url in config.rpc_urls:  # alias lookup
        return config.rpc_urls[ws_url]
    return ws_url
def _clean(obj): def _clean(obj):
if type(obj) is HexBytes: if type(obj) is HexBytes:
return bytes(obj) return bytes(obj)
@@ -123,7 +176,118 @@ def _make_contract(w3_eth):
return f return f
log = logging.getLogger(__name__) #
# ARCHIVE NODE MANAGEMENT
#
# Regular RPC nodes do not necessarily have the full state history available. The code below tracks the block heights
# of historical data lookup failures on rpc nodes and automatically retries failed history requests on an archive_rpc
# node. Archive RPC nodes are otherwise not used unless they are also listed in the rpc_url config.
# JSON-RPC methods whose final positional argument may be a block identifier,
# mapped to the number of arguments a call must have for that final argument
# to actually be the block identifier.
ARCHIVE_METHODS = {
    "eth_getBalance": 2,    # e.g., get_balance(address, block_identifier)
    "eth_call": 2,          # e.g., contract.call(params, block_identifier)
    "eth_getStorageAt": 3,  # e.g., get_storage_at(address, position, block_identifier)
    # Add more methods as needed
}

# RPC error codes that indicate the node lacks state history that far back.
ARCHIVE_ERRORS = {
    -32000,
}


async def archive_intercept_middleware(make_request, w3):
    """
    Middleware that watches historical-state lookups and maintains
    `w3.archive_fault_height`, the highest block height known to fail on this
    node with an archive-type error.

    Raises ArchiveException either when the node reports such an error, or
    pre-emptively (without issuing the request) when the requested block is at
    or below a height that already failed, so callers can retry on an archive
    node.
    """
    async def middleware(method, params):
        # Only intercept methods whose final positional argument is a block identifier.
        expected_args = ARCHIVE_METHODS.get(method, -1)
        is_archive_method = len(params) == expected_args
        block_identifier = None
        if is_archive_method:
            raw = params[-1]
            if isinstance(raw, str):
                try:
                    block_identifier = int(raw, 16)  # hex-encoded height, e.g. '0x10'
                except ValueError:
                    # Named tags ('latest', 'pending', ...) never need archive
                    # state; previously these crashed int() — let them through.
                    is_archive_method = False
            else:
                block_identifier = int(raw)
        if is_archive_method and block_identifier <= w3.archive_fault_height:
            # at least as old as a block that already failed to fetch history
            # from this RPC, so skip the doomed request entirely
            raise ArchiveException(method, block_identifier)
        # NOTE: the original issued a second, unreachable make_request after the
        # raise below; the request is now made exactly once per call.
        resp = await make_request(method, params)
        if is_archive_method and 'error' in resp and resp['error']['code'] in ARCHIVE_ERRORS:
            w3.archive_fault_height = max(w3.archive_fault_height, block_identifier)
            raise ArchiveException(method, block_identifier)
        return resp
    return middleware


class ArchiveException (Exception):
    """Raised when an RPC node cannot serve state for the requested historical block."""
    def __init__(self, method, block_number):
        super().__init__(f"Archive fault for method {method} at block {block_number}", block_number)
        self.method = method        # the intercepted RPC method name
        self.block_number = block_number  # the block height that faulted (int)
class RoundRobinWebProxy:
    """
    Proxy over several AsyncWeb3 instances that fans attribute access out to
    them in round-robin order, and retries history lookups that raised
    ArchiveException on the configured archive instances.

    NOTE(review): __getattr__ rotates on *every* attribute access, so a
    multi-step interaction (e.g. two consecutive `w3.eth` accesses) may hit
    different underlying nodes — presumably acceptable because the caller
    verified all instances share a chain id; confirm for stateful uses such
    as filters or nonce tracking.
    """

    def __init__(self, w3_instances, archive_instances):
        # w3_instances: the primary round-robin pool (must be non-empty).
        # archive_instances: fallbacks used only after an ArchiveException.
        if not w3_instances:
            raise ValueError("At least one w3 instance is required")
        self._w3_instances = w3_instances
        self._archive_instances = archive_instances
        self._index = 0
        self._archive_index = 0
        # Monkeypatch each primary instance's request manager so archive
        # faults are transparently replayed on an archive node.
        for w3 in self._w3_instances:
            w3.manager.coro_request = self.make_coro_request_function(w3)

    def __getattr__(self, name):
        # proxy in a round-robin fashion
        return getattr(self._current(), name)

    def _current(self):
        # Return the next primary instance, wrapping the index around.
        if self._index >= len(self._w3_instances):
            self._index = 0
        current_instance = self._w3_instances[self._index]
        self._index += 1
        return current_instance

    def _current_archive(self):
        # Return the next archive instance, wrapping the index around.
        # NOTE(review): raises IndexError when archive_instances is empty —
        # confirm that is the intended failure mode for archive faults with
        # no archive_url configured.
        if self._archive_index >= len(self._archive_instances):
            self._archive_index = 0
        current_instance = self._archive_instances[self._archive_index]
        self._archive_index += 1
        return current_instance

    def make_coro_request_function(rrwp, w3):
        # This replaces w3.manager.coro_request with our own version that catches ArchiveExceptions and retries them
        # on an archive w3
        ### NOTE!!! ###
        # we use `self` to mean the RequestManager so we can copy that code directly over here.
        # instead we rename the RoundRobinWebProxy rrwp and name the w3 instance too
        self = w3.manager
        async def RequestManager__coro_request(
            method: Union[RPCEndpoint, Callable[..., RPCEndpoint]],
            params: Any,
            error_formatters: Optional[Callable[..., Any]] = None,
            null_result_formatters: Optional[Callable[..., Any]] = None,
        ) -> Any:
            """
            Coroutine for making a request using the provider
            """
            try:
                response = await self._coro_make_request(method, params)
                return self.formatted_response(
                    response, params, error_formatters, null_result_formatters
                )
            except ArchiveException as e:
                # Remember the failing height on the originating node, then
                # replay the identical call on the next archive node.
                w3.archive_fault_height = max(w3.archive_fault_height, e.block_number)
                return await rrwp._current_archive().manager.coro_request(method, params, error_formatters, null_result_formatters)
        return RequestManager__coro_request
class RetryHTTPProvider (AsyncHTTPProvider): class RetryHTTPProvider (AsyncHTTPProvider):
@@ -141,7 +305,7 @@ class RetryHTTPProvider (AsyncHTTPProvider):
try: try:
async with self.in_flight: async with self.in_flight:
await self.rate_allowed.wait() await self.rate_allowed.wait()
# log.debug(f'Requesting RPC call {method}') log.debug(f'Requesting RPC call {method}')
return await super().make_request(method, params) return await super().make_request(method, params)
except ClientResponseError as e: except ClientResponseError as e:
if e.status != 429: if e.status != 429:

View File

@@ -1,3 +1,2 @@
from .standard_accounts import test_accounts from .standard_accounts import test_accounts
from .load import config, parse_args from .load import config, parse_args
from .resolve import resolve_rpc_url

View File

@@ -69,7 +69,7 @@ class ConfigDict (Config, DictConfig): # give type hints from Config plus method
pass pass
# Special command-line argument handling to get a config file. The -c/--config flag MUST BE FIRST. # Special command-line argument handling to get a config file. The -c/--config flag MUST BE FIRST.
if sys.argv[1] == '-c' or sys.argv[1] == '--config': if len(sys.argv) > 1 and (sys.argv[1] == '-c' or sys.argv[1] == '--config'):
if len(sys.argv) < 3: if len(sys.argv) < 3:
raise ConfigException('Missing config file argument') raise ConfigException('Missing config file argument')
else: else:

View File

@@ -1,25 +1,3 @@
from .load import config from .load import config
def resolve_rpc_url(rpc_url=None):
if rpc_url is None:
rpc_url = config.rpc_url
if rpc_url == 'test':
return 'http://localhost:8545'
try:
return config.rpc_urls[rpc_url] # look up aliases
except KeyError:
pass
return rpc_url
def resolve_ws_url(ws_url=None):
if ws_url is None:
ws_url = config.ws_url
if ws_url == 'test':
return 'ws://localhost:8545'
try:
return config.rpc_urls[ws_url] # look up aliases
except KeyError:
pass
return ws_url

View File

@@ -11,14 +11,15 @@ from typing import Optional
class Config: class Config:
confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used confirms: Optional[int] = None # number of blocks before data is considered finalized. if None then the chain's default setting is used
batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request batch_size: Optional[int] = None # max number of blocks to query in a single backfill rpc request
rpc_url: str = 'http://localhost:8545' rpc_url: str = 'http://localhost:8545' # may be a comma-separated list. may include names of entries in rpc_urls.
archive_url: str = '' # these RPC URLs are only used when a query requests a block older than what the currently-assigned rpc_url can provide
ws_url: Optional[str] = 'ws://localhost:8545' ws_url: Optional[str] = 'ws://localhost:8545'
rpc_urls: Optional[dict[str,str]] = field(default_factory=dict) rpc_urls: Optional[dict[str,str]] = field(default_factory=dict)
db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder' db_url: Optional[str] = 'postgresql://dexorder:redroxed@localhost/dexorder'
dump_sql: bool = False dump_sql: bool = False
redis_url: Optional[str] = 'redis://localhost:6379' redis_url: Optional[str] = 'redis://localhost:6379'
metrics_port: int = 9090 metrics_port: Optional[int] = None
cache_blocks_in_db: bool = False cache_blocks_in_db: bool = False
metadata: Optional[str] = None metadata: Optional[str] = None

View File

@@ -7,7 +7,7 @@ from web3.exceptions import BadFunctionCallOutput
from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3 from dexorder import ADDRESS_0, db, NATIVE_TOKEN, dec, current_w3
from dexorder.addrmeta import address_metadata from dexorder.addrmeta import address_metadata
from dexorder.base.chain import current_chain from dexorder.base.chain import current_chain
from dexorder.blockstate.fork import current_fork from dexorder.blocks import current_block
from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS from dexorder.contract import ERC20, ContractProxy, CONTRACT_ERRORS
from dexorder.database.model import Token from dexorder.database.model import Token
from dexorder.database.model.token import OldTokenDict from dexorder.database.model.token import OldTokenDict
@@ -36,7 +36,7 @@ async def get_erc20_balance(addr, token_addr, *, adjust_decimals=True):
# noinspection PyShadowingNames # noinspection PyShadowingNames
async def get_native_balance(addr, *, adjust_decimals=True) -> dec: async def get_native_balance(addr, *, adjust_decimals=True) -> dec:
try: try:
block_id = current_fork.get().height block_id = current_block.get().height
except LookupError: except LookupError:
block_id = 'latest' block_id = 'latest'
value = dec(await current_w3.get().eth.get_balance(addr, block_identifier=block_id)) value = dec(await current_w3.get().eth.get_balance(addr, block_identifier=block_id))