refactor: Several testing SDK improvements such as:
fix db reset, use latest wheel, print trace in case of failure and add logic to pull stateless contracts
This commit is contained in:
@@ -11,6 +11,7 @@ import eth_abi
|
||||
from eth_typing import HexStr
|
||||
from hexbytes import HexBytes
|
||||
from protosim_py import SimulationEngine, AccountInfo
|
||||
import requests
|
||||
from web3 import Web3
|
||||
|
||||
from .constants import EXTERNAL_ACCOUNT, MAX_BALANCE, ASSETS_FOLDER
|
||||
@@ -299,3 +300,42 @@ def maybe_coerce_error(
|
||||
repr(pool_state),
|
||||
)
|
||||
return err
|
||||
|
||||
|
||||
def exec_rpc_method(url, method, params, timeout=240) -> dict:
|
||||
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
r = requests.post(url, data=json.dumps(payload), headers=headers, timeout=timeout)
|
||||
|
||||
if r.status_code >= 400:
|
||||
raise RuntimeError(
|
||||
"RPC failed: status_code not ok. (method {}: {})".format(
|
||||
method, r.status_code
|
||||
)
|
||||
)
|
||||
data = r.json()
|
||||
|
||||
if "result" in data:
|
||||
return data["result"]
|
||||
elif "error" in data:
|
||||
raise RuntimeError(
|
||||
"RPC failed with Error {} - {}".format(data["error"], method)
|
||||
)
|
||||
|
||||
|
||||
def get_code_for_address(address: str, connection_string: str = None):
|
||||
if connection_string is None:
|
||||
connection_string = os.getenv("RPC_URL")
|
||||
if connection_string is None:
|
||||
raise EnvironmentError("RPC_URL environment variable is not set")
|
||||
|
||||
method = "eth_getCode"
|
||||
params = [address, "latest"]
|
||||
|
||||
try:
|
||||
code = exec_rpc_method(connection_string, method, params)
|
||||
return bytes.fromhex(code[2:])
|
||||
except RuntimeError as e:
|
||||
print(f"Error fetching code for address {address}: {e}")
|
||||
return None
|
||||
Reference in New Issue
Block a user