feat: Remove python suite (deprecated) (#284)

Update readme

#time 10m
This commit is contained in:
dianacarvalho1
2025-09-26 16:47:21 +01:00
committed by GitHub
parent e00063053d
commit 9a7e6a1cf7
15 changed files with 23 additions and 1165 deletions

View File

@@ -1,8 +1,29 @@
# Protocol Testing
Rust-based integration testing framework for Tycho protocol implementations.
Rust-based integration testing framework for Tycho protocol implementations. See our full
docs [here](https://docs.propellerheads.xyz/tycho/for-dexs/protocol-integration/3.-testing).
## How to Run
## How to Run Locally
```bash
# Setup Environment Variables
export RPC_URL=..
export SUBSTREAMS_API_TOKEN=..
export RUST_LOG=protocol_testing=info,tycho_client=error
# Build Substreams wasm for BalancerV2
cd substreams
cargo build --release --package ethereum-balancer-v2 --target wasm32-unknown-unknown
cd ../protocol-testing
# Run Postgres DB using Docker compose
docker compose -f ./docker-compose.yaml up -d db
# Run test
cargo run -- --package ethereum-balancer-v2
```
## How to Run with Docker
```bash
# Build the images, from the project root dir
@@ -20,72 +41,3 @@ docker compose up -d && docker compose logs test-runner --follow
# Clean up
docker compose down
```
## Test Output Formatting
The test runner outputs results similar to:
```
Running 2 tests ...
--------------------------------
TEST 1: balancer_weighted_pool_test
✅ Protocol component validation passed.
✅ Token balance validation passed.
Amount out for 0x5c6ee304399dbdb9c8ef030ab642b10820db8f56000200000000000000000014: calculating for tokens "BAL"/"WETH"
Spot price "BAL"/"WETH": 0.123456
✅ Simulation validation passed.
✅ balancer_weighted_pool_test passed.
--------------------------------
Tests finished!
RESULTS: 2/2 passed.
```
## Module-specific Logging
```bash
# Enable debug logs for specific modules
export RUST_LOG=protocol_testing=debug,tycho_client=info
# Disable logs for noisy modules
export RUST_LOG=info,hyper=warn,reqwest=warn
```
## Running with Different Log Levels
```bash
# Standard test run with progress output
RUST_LOG=info cargo run -- --package uniswap-v2
# Detailed debug output
RUST_LOG=debug cargo run -- --package uniswap-v2
# Minimal output (errors only)
RUST_LOG=error cargo run -- --package uniswap-v2
```
## Test Configuration
Tests are configured via YAML files located in the substreams package directory:
- Test configuration: `../substreams/<package>/integration_test.tycho.yaml`
- Substreams configuration: `../substreams/<package>/substreams.yaml`
## What the Tests Do
1. **Component Validation**: Verifies that all expected protocol components are present in Tycho after indexing
2. **State Validation**: Compares indexed component states against expected values
3. **Balance Verification**: Validates token balances by querying the blockchain directly (can be skipped)
4. **Simulation Testing**: Runs Tycho simulation engine to verify protocol functionality
## Troubleshooting
- **Database Connection Issues**: Ensure PostgreSQL is running via `docker-compose up -d`
- **RPC Errors**: Verify `RPC_URL` is set and accessible
- **Missing Substreams**: Check that the package directory exists in `../substreams/<package>/`
- **Build Failures**: Ensure all dependencies are installed and environment variables are set

View File

@@ -1,2 +0,0 @@
export RPC_URL=https://mainnet.infura.io/v3/your-infura-key
export SUBSTREAMS_API_TOKEN="changeme"

View File

@@ -1,6 +0,0 @@
# Substreams Testing
This package provides a comprehensive testing suite for Substreams modules. The testing suite is designed to facilitate
end-to-end testing, ensuring that your Substreams modules function as expected.
For more information on Substreams, please refer to the [Testing documentation](https://docs.propellerheads.xyz/tycho/for-dexs/protocol-integration-sdk/indexing/general-integration-steps/4.-testing)

View File

View File

@@ -1,19 +0,0 @@
version: "3.1"
services:
db:
build:
context: .
dockerfile: postgres.Dockerfile
restart: "always"
environment:
POSTGRES_PASSWORD: mypassword
POSTGRES_DATABASE: tycho_indexer_0
POSTGRES_USERNAME: postgres
POSTGRESQL_SHARED_PRELOAD_LIBRARIES: pg_cron
ports:
- "5431:5432"
shm_size: "1gb"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:

View File

@@ -1,25 +0,0 @@
# This Dockerfile creates a custom postgres image used for CI and local deployment.
# This is required because we use some postgres extensions that aren't in the generic
# Postgres image such as pg_partman or pg_cron.
# As an image with pg_partman already exist, we start from this one and add pg_cron
# and possibly other extensions on top of that.
FROM ghcr.io/dbsystel/postgresql-partman:15-5
ARG PGCRON_VERSION="1.6.2"
USER root
RUN apk update && apk add --no-cache wget build-base clang19 llvm19
RUN cd /tmp \
&& wget "https://github.com/citusdata/pg_cron/archive/refs/tags/v${PGCRON_VERSION}.tar.gz" \
&& tar zxf v${PGCRON_VERSION}.tar.gz \
&& cd pg_cron-${PGCRON_VERSION} \
&& make \
&& make install \
&& cd .. && rm -r pg_cron-${PGCRON_VERSION} v${PGCRON_VERSION}.tar.gz
# Add configuration to postgresql.conf template
# Start with postgres database, then switch to tycho_indexer_0 after it's created
RUN echo "shared_preload_libraries = 'pg_partman_bgw,pg_cron'" >> /usr/local/share/postgresql/postgresql.conf.sample \
&& echo "cron.database_name = 'tycho_indexer_0'" >> /usr/local/share/postgresql/postgresql.conf.sample
# Stay as root user for PostgreSQL to work properly
# USER 1001

View File

@@ -1,6 +0,0 @@
psycopg2==2.9.9
PyYAML==6.0.1
Requests==2.32.2
web3==5.31.3
git+https://github.com/propeller-heads/tycho-indexer.git@0.74.0#subdirectory=tycho-client-py
git+https://github.com/propeller-heads/tycho-simulation.git@0.118.0#subdirectory=tycho_simulation_py

View File

@@ -1,45 +0,0 @@
#!/bin/bash
# To run: ./setup_env.sh
set -e
command_exists() {
command -v "$1" >/dev/null 2>&1
}
# Check each dependency is installed
deps=("git" "rustc" "gcc" "openssl" "conda" "pip" "pg_config")
names=("git" "rust" "gcc" "openssl" "conda" "pip" "libpq")
for i in "${!deps[@]}"; do
if ! command_exists "${deps[$i]}"; then
echo "Error: '${names[$i]}' is not installed."
exit 1
fi
done
echo "All dependencies are installed. Proceeding with setup..."
# Variables
ENV_NAME="tycho-protocol-sdk-testing"
PYTHON_VERSION="3.9"
# Get the directory where this script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
ROOT_DIR="$( cd "$SCRIPT_DIR/.." && pwd )" # Assuming the script is in a subdirectory of the root
REQUIREMENTS_FILE="$ROOT_DIR/testing/requirements.txt"
# Create conda environment
echo "Creating conda environment ${ENV_NAME} with Python ${PYTHON_VERSION}..."
conda create --name $ENV_NAME python=$PYTHON_VERSION -y
# Activate the environment
echo "Activating the environment..."
eval "$(conda shell.bash hook)"
conda activate $ENV_NAME
# Install the requirements
echo "Installing the requirements from ${REQUIREMENTS_FILE}..."
pip install -r $REQUIREMENTS_FILE --index-url https://pypi.org/simple
conda activate $ENV_NAME
echo "----------------------------------------"
echo "SETUP COMPLETE."
echo "Run 'conda activate $ENV_NAME' to activate the environment."

View File

@@ -1,65 +0,0 @@
import os
import subprocess
from typing import Optional
class AdapterContractBuilder:
def __init__(self, src_path: str):
self.src_path = src_path
def find_contract(self, adapter_contract: str):
"""
Finds the contract file in the provided source path.
:param adapter_contract: The contract name to be found.
:return: The path to the contract file.
"""
contract_path = os.path.join(
self.src_path,
"out",
f"{adapter_contract}.sol",
f"{adapter_contract}.evm.runtime",
)
if not os.path.exists(contract_path):
raise FileNotFoundError(f"Contract {adapter_contract} not found.")
return contract_path
def build_target(
self, adapter_contract: str, signature: Optional[str], args: Optional[str]
) -> str:
"""
Runs the buildRuntime Bash script in a subprocess with the provided arguments.
:param src_path: Path to the script to be executed.
:param adapter_contract: The contract name to be passed to the script.
:param signature: The constructor signature to be passed to the script.
:param args: The constructor arguments to be passed to the script.
:return: The path to the contract file.
"""
script_path = "scripts/buildRuntime.sh"
cmd = [script_path, "-c", adapter_contract]
if signature:
cmd.extend(["-s", signature, "-a", args])
try:
# Running the bash script with the provided arguments
result = subprocess.run(
cmd,
cwd=self.src_path,
capture_output=True,
text=True,
check=True,
)
# Print standard output and error for debugging
print("Output:\n", result.stdout)
if result.stderr:
print("Errors:\n", result.stderr)
return self.find_contract(adapter_contract)
except subprocess.CalledProcessError as e:
print(f"An error occurred: {e}")
print("Error Output:\n", e.stderr)

View File

@@ -1,29 +0,0 @@
import argparse
from runner import TestRunner
def main() -> None:
parser = argparse.ArgumentParser(
description="Run indexer within a specified range of blocks"
)
parser.add_argument("--package", type=str, help="Name of the package to test.")
parser.add_argument("--tycho-logs", action="store_true", help="Enable Tycho logs.")
parser.add_argument(
"--db-url",
default="postgres://postgres:mypassword@localhost:5431/tycho_indexer_0",
type=str,
help="Postgres database URL for the Tycho indexer. Default: postgres://postgres:mypassword@localhost:5431/tycho_indexer_0",
)
parser.add_argument(
"--vm-traces", action="store_true", help="Enable tracing during vm simulations."
)
args = parser.parse_args()
test_runner = TestRunner(
args.package, args.tycho_logs, db_url=args.db_url, vm_traces=args.vm_traces
)
test_runner.run_tests()
if __name__ == "__main__":
main()

View File

@@ -1,61 +0,0 @@
import os
from web3 import Web3
native_aliases = [
"0x0000000000000000000000000000000000000000",
"0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
]
erc20_abi = [
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"type": "function",
}
]
def get_token_balance(token_address, wallet_address, block_number):
rpc_url = os.getenv("RPC_URL")
if rpc_url is None:
raise EnvironmentError("RPC_URL environment variable not set")
web3 = Web3(Web3.HTTPProvider(rpc_url))
if not web3.isConnected():
raise ConnectionError("Failed to connect to the Ethereum node")
# Check if the token_address is a native token alias
if token_address.lower() in native_aliases:
balance = web3.eth.get_balance(
Web3.toChecksumAddress(wallet_address), block_identifier=block_number
)
else:
contract = web3.eth.contract(
address=Web3.toChecksumAddress(token_address), abi=erc20_abi
)
balance = contract.functions.balanceOf(
Web3.toChecksumAddress(wallet_address)
).call(block_identifier=block_number)
return balance
def get_block_header(block_number):
rpc_url = os.getenv("RPC_URL")
if rpc_url is None:
raise EnvironmentError("RPC_URL environment variable not set")
web3 = Web3(Web3.HTTPProvider(rpc_url))
if not web3.isConnected():
raise ConnectionError("Failed to connect to the Ethereum node")
block = web3.eth.get_block(block_number)
return block

View File

@@ -1,128 +0,0 @@
import difflib
from hexbytes import HexBytes
from pydantic import BaseModel, Field, validator
from typing import List, Dict, Optional
class ProtocolComponentExpectation(BaseModel):
"""Represents a ProtocolComponent with its main attributes."""
id: str = Field(..., description="Identifier of the protocol component")
tokens: List[HexBytes] = Field(
...,
description="List of token addresses associated with the protocol component",
)
static_attributes: Optional[Dict[str, HexBytes]] = Field(
default_factory=dict, description="Static attributes of the protocol component"
)
creation_tx: HexBytes = Field(
..., description="Hash of the transaction that created the protocol component"
)
@validator("id", pre=True, always=True)
def lower_id(cls, v):
return v.lower()
@validator("tokens", pre=True, always=True)
def convert_tokens_to_hexbytes(cls, v):
return sorted(HexBytes(t.lower()) for t in v)
@validator("static_attributes", pre=True, always=True)
def convert_static_attributes_to_hexbytes(cls, v):
if v:
return {
k: v[k] if isinstance(v[k], HexBytes) else HexBytes(v[k].lower())
for k in v
}
return {}
@validator("creation_tx", pre=True, always=True)
def convert_creation_tx_to_hexbytes(cls, v):
return HexBytes(v.lower())
def compare(
self, other: "ProtocolComponentExpectation", colorize_output: bool = True
) -> Optional[str]:
"""Compares the current instance with another ProtocolComponent instance and returns a message with the
differences or None if there are no differences."""
def colorize_diff(diff):
colored_diff = []
for line in diff:
if line.startswith("-"):
colored_diff.append(f"\033[91m{line}\033[0m") # Red
elif line.startswith("+"):
colored_diff.append(f"\033[92m{line}\033[0m") # Green
elif line.startswith("?"):
colored_diff.append(f"\033[93m{line}\033[0m") # Yellow
else:
colored_diff.append(line)
return "\n".join(colored_diff)
differences = []
for field_name, field_value in self.__dict__.items():
other_value = getattr(other, field_name, None)
if field_value != other_value:
diff = list(difflib.ndiff([str(field_value)], [str(other_value)]))
highlighted_diff = (
colorize_diff(diff) if colorize_output else "\n".join(diff)
)
differences.append(
f"Field '{field_name}' mismatch for {self.id}:\n{highlighted_diff}"
)
if not differences:
return None
return "\n".join(differences)
class ProtocolComponentWithTestConfig(ProtocolComponentExpectation):
"""Represents a ProtocolComponent with its main attributes and test configuration."""
skip_simulation: Optional[bool] = Field(
False,
description="Flag indicating whether to skip simulation for this component",
)
class IntegrationTest(BaseModel):
"""Configuration for an individual test."""
name: str = Field(..., description="Name of the test")
start_block: int = Field(..., description="Starting block number for the test")
stop_block: int = Field(..., description="Stopping block number for the test")
initialized_accounts: Optional[List[str]] = Field(
None, description="List of initialized account addresses"
)
expected_components: List[ProtocolComponentWithTestConfig] = Field(
..., description="List of protocol components expected in the indexed state"
)
class IntegrationTestsConfig(BaseModel):
"""Main integration test configuration."""
substreams_yaml_path: str = Field(
"./substreams.yaml", description="Path of the Substreams YAML file"
)
adapter_contract: str = Field(
..., description="Name of the SwapAdapter contract for this protocol"
)
adapter_build_signature: Optional[str] = Field(
None, description="SwapAdapter's constructor signature"
)
adapter_build_args: Optional[str] = Field(
None, description="Arguments for the SwapAdapter constructor"
)
initialized_accounts: Optional[List[str]] = Field(
None,
description="List of initialized account addresses. These accounts will be initialized for every tests",
)
skip_balance_check: bool = Field(
..., description="Flag to skip balance check for all tests"
)
protocol_type_names: List[str] = Field(
..., description="List of protocol type names for the tested protocol"
)
tests: List[IntegrationTest] = Field(..., description="List of integration tests")

View File

@@ -1,418 +0,0 @@
import itertools
import os
import shutil
import subprocess
import traceback
from datetime import datetime
from decimal import Decimal
from pathlib import Path
from typing import Optional, Callable, Any
import yaml
from tycho_simulation_py.evm.decoders import ThirdPartyPoolTychoDecoder
from tycho_simulation_py.evm.storage import TychoDBSingleton
from tycho_simulation_py.models import EVMBlock
from pydantic import BaseModel
from tycho_indexer_client.dto import (
Chain,
ProtocolComponentsParams,
ProtocolStateParams,
ContractStateParams,
ProtocolComponent,
ResponseProtocolState,
HexBytes,
ResponseAccount,
Snapshot,
TracedEntryPointParams,
)
from tycho_indexer_client.rpc_client import TychoRPCClient
from models import (
IntegrationTestsConfig,
ProtocolComponentWithTestConfig,
ProtocolComponentExpectation,
)
from adapter_builder import AdapterContractBuilder
from evm import get_token_balance, get_block_header
from tycho import TychoRunner
from utils import build_snapshot_message, token_factory
class TestResult:
def __init__(
self, success: bool, step: Optional[str] = None, message: Optional[str] = None
):
self.success = success
self.step = step
self.message = message
@classmethod
def Passed(cls):
return cls(success=True)
@classmethod
def Failed(cls, step: str, message: str):
return cls(success=False, step=step, message=message)
def parse_config(yaml_path: str) -> IntegrationTestsConfig:
with open(yaml_path, "r") as file:
yaml_content = yaml.safe_load(file)
return IntegrationTestsConfig(**yaml_content)
class SimulationFailure(BaseModel):
pool_id: str
sell_token: str
buy_token: str
error: str
class TestRunner:
def __init__(
self, package: str, with_binary_logs: bool, db_url: str, vm_traces: bool
):
self.repo_root = os.getcwd()
config_path = os.path.join(
self.repo_root, "substreams", package, "integration_test.tycho.yaml"
)
self.config: IntegrationTestsConfig = parse_config(config_path)
self.spkg_src = os.path.join(self.repo_root, "substreams", package)
self.adapter_contract_builder = AdapterContractBuilder(
os.path.join(self.repo_root, "evm")
)
self.tycho_runner = TychoRunner(
db_url, with_binary_logs, self.config.initialized_accounts
)
self.tycho_rpc_client = TychoRPCClient()
self._token_factory_func = token_factory(self.tycho_rpc_client)
self.db_url = db_url
self._vm_traces = vm_traces
self._chain = Chain.ethereum
def run_tests(self) -> None:
"""Run all tests specified in the configuration."""
print(f"Running {len(self.config.tests)} tests ...\n")
print("--------------------------------\n")
failed_tests: list[str] = []
count = 1
for test in self.config.tests:
print(f"TEST {count}: {test.name}")
self.tycho_runner.empty_database(self.db_url)
spkg_path = self.build_spkg(
os.path.join(self.spkg_src, self.config.substreams_yaml_path),
lambda data: self.update_initial_block(data, test.start_block),
)
self.tycho_runner.run_tycho(
spkg_path,
test.start_block,
test.stop_block,
self.config.protocol_type_names,
test.initialized_accounts or [],
)
result: TestResult = self.tycho_runner.run_with_rpc_server(
self.validate_state,
test.expected_components,
test.stop_block,
test.initialized_accounts or [],
)
if result.success:
print(f"\n{test.name} passed.\n")
else:
failed_tests.append(test.name)
print(f"\n❗️ {test.name} failed on {result.step}: {result.message}\n")
print("--------------------------------\n")
count += 1
print(
"\nTests finished! \n"
f"RESULTS: {len(self.config.tests) - len(failed_tests)}/{len(self.config.tests)} passed.\n"
)
if failed_tests:
print("Failed tests:")
for failed_test in failed_tests:
print(f"- {failed_test}")
print("\n")
def validate_state(
self,
expected_components: list[ProtocolComponentWithTestConfig],
stop_block: int,
initialized_accounts: list[str],
) -> TestResult:
"""Validate the current protocol state against the expected state."""
protocol_components = self.tycho_rpc_client.get_protocol_components(
ProtocolComponentsParams(protocol_system="test_protocol")
).protocol_components
protocol_states = self.tycho_rpc_client.get_protocol_state(
ProtocolStateParams(protocol_system="test_protocol")
).states
components_by_id: dict[str, ProtocolComponent] = {
component.id: component for component in protocol_components
}
try:
# Step 1: Validate the protocol components
step = "Protocol component validation"
for expected_component in expected_components:
comp_id = expected_component.id.lower()
if comp_id not in components_by_id:
return TestResult.Failed(
step=step,
message=f"'{comp_id}' not found in protocol components. "
f"Available components: {set(components_by_id.keys())}",
)
diff = ProtocolComponentExpectation(
**components_by_id[comp_id].dict()
).compare(ProtocolComponentExpectation(**expected_component.dict()))
if diff is not None:
return TestResult.Failed(step=step, message=diff)
print(f"\n{step} passed.\n")
# Step 2: Validate the token balances
step = "Token balance validation"
if not self.config.skip_balance_check:
for component in protocol_components:
comp_id = component.id.lower()
for token in component.tokens:
state = next(
(
s
for s in protocol_states
if s.component_id.lower() == comp_id
),
None,
)
if state:
balance_hex = state.balances.get(token, HexBytes("0x00"))
else:
balance_hex = HexBytes("0x00")
tycho_balance = int(balance_hex)
node_balance = get_token_balance(token, comp_id, stop_block)
if node_balance != tycho_balance:
return TestResult.Failed(
step=step,
message=f"Balance mismatch for {comp_id}:{token} at block {stop_block}: got {node_balance} "
f"from rpc call and {tycho_balance} from Substreams",
)
print(f"\n{step} passed.\n")
else:
print(f"\n {step} skipped. \n")
# Step 3: Validate the simulation
step = "Simulation validation"
# Loads from Tycho-Indexer the state of all the contracts that are related to the protocol components.
simulation_components: list[str] = [
c.id for c in expected_components if c.skip_simulation is False
]
related_contracts: set[str] = set()
for account in self.config.initialized_accounts or []:
related_contracts.add(account)
for account in initialized_accounts or []:
related_contracts.add(account)
# Collect all contracts that are related to the simulation components
filtered_components: list[ProtocolComponent] = []
component_related_contracts: set[str] = set()
for component in protocol_components:
# Filter out components that are not set to be used for the simulation
if component.id in simulation_components:
# Collect component contracts
for a in component.contract_ids:
component_related_contracts.add(a.hex())
# Collect DCI detected contracts
traces_results = self.tycho_rpc_client.get_traced_entry_points(
TracedEntryPointParams(
protocol_system="test_protocol",
component_ids=[component.id],
)
).traced_entry_points.values()
for traces in traces_results:
for _, trace in traces:
component_related_contracts.update(
trace["accessed_slots"].keys()
)
filtered_components.append(component)
# Check if any of the initialized contracts are not listed as component contract dependencies
unspecified_contracts: list[str] = [
c for c in related_contracts if c not in component_related_contracts
]
related_contracts.update(component_related_contracts)
contract_states = self.tycho_rpc_client.get_contract_state(
ContractStateParams(contract_ids=list(related_contracts))
).accounts
if len(filtered_components):
if len(unspecified_contracts):
print(
f"⚠️ The following initialized contracts are not listed as component contract dependencies: {unspecified_contracts}. "
f"Please ensure that, if they are required for this component's simulation, they are specified under the Protocol Component's contract field."
)
simulation_failures = self.simulate_get_amount_out(
stop_block, protocol_states, filtered_components, contract_states
)
if len(simulation_failures):
error_msgs: list[str] = []
for pool_id, failures in simulation_failures.items():
failures_formatted: list[str] = [
f"{f.sell_token} -> {f.buy_token}: {f.error}"
for f in failures
]
error_msgs.append(
f"Pool {pool_id} failed simulations: {', '.join(failures_formatted)}"
)
return TestResult.Failed(step=step, message="\n".join(error_msgs))
print(f"\n{step} passed.\n")
else:
print(f"\n {step} skipped.\n")
return TestResult.Passed()
except Exception as e:
error_message = f"An error occurred: {str(e)}\n" + traceback.format_exc()
return TestResult.Failed(step=step, message=error_message)
def simulate_get_amount_out(
self,
block_number: int,
protocol_states: list[ResponseProtocolState],
protocol_components: list[ProtocolComponent],
contract_states: list[ResponseAccount],
) -> dict[str, list[SimulationFailure]]:
TychoDBSingleton.initialize()
block_header = get_block_header(block_number)
block: EVMBlock = EVMBlock(
id=block_number,
ts=datetime.fromtimestamp(block_header.timestamp),
hash_=block_header.hash.hex(),
)
failed_simulations: dict[str, list[SimulationFailure]] = {}
try:
adapter_contract = self.adapter_contract_builder.find_contract(
self.config.adapter_contract
)
except FileNotFoundError:
adapter_contract = self.adapter_contract_builder.build_target(
self.config.adapter_contract,
self.config.adapter_build_signature,
self.config.adapter_build_args,
)
TychoDBSingleton.clear_instance()
decoder = ThirdPartyPoolTychoDecoder(
token_factory_func=self._token_factory_func,
adapter_contract=adapter_contract,
minimum_gas=0,
trace=self._vm_traces,
)
snapshot_message: Snapshot = build_snapshot_message(
protocol_states, protocol_components, contract_states
)
decoded = decoder.decode_snapshot(snapshot_message, block)
for component in protocol_components:
if component.id not in decoded:
failed_simulations[component.id] = [
SimulationFailure(
pool_id=component.id,
sell_token=component.tokens[0].hex(),
buy_token=component.tokens[1].hex(),
error="Pool not found in decoded state.",
)
]
for pool_state in decoded.values():
pool_id = pool_state.id_
if not pool_state.balances:
raise ValueError(f"Missing balances for pool {pool_id}")
for sell_token, buy_token in itertools.permutations(pool_state.tokens, 2):
for prctg in ["0.001", "0.01", "0.1"]:
# Try to sell 0.1% of the protocol balance
try:
sell_amount = (
Decimal(prctg) * pool_state.balances[sell_token.address]
)
amount_out, gas_used, _ = pool_state.get_amount_out(
sell_token, sell_amount, buy_token
)
print(
f"Amount out for {pool_id}: {sell_amount} {sell_token} -> {amount_out} {buy_token} - "
f"Gas used: {gas_used}"
)
except Exception as e:
print(
f"Error simulating get_amount_out for {pool_id}: {sell_token} -> {buy_token} at block {block_number}. "
f"Error: {e}"
)
if pool_id not in failed_simulations:
failed_simulations[pool_id] = []
failed_simulations[pool_id].append(
SimulationFailure(
pool_id=pool_id,
sell_token=str(sell_token),
buy_token=str(buy_token),
error=str(e),
)
)
continue
return failed_simulations
@staticmethod
def build_spkg(
yaml_file_path: str, modify_func: Callable[[dict[str, Any]], None]
) -> str:
"""Build a Substreams package with modifications to the YAML file."""
backup_file_path = f"{yaml_file_path}.backup"
shutil.copy(yaml_file_path, backup_file_path)
with open(yaml_file_path, "r") as file:
data = yaml.safe_load(file)
modify_func(data)
spkg_name = f"{yaml_file_path.rsplit('/', 1)[0]}/{data['package']['name'].replace('_', '-')}-{data['package']['version']}.spkg"
with open(yaml_file_path, "w") as file:
yaml.dump(data, file, default_flow_style=False)
try:
result = subprocess.run(
["substreams", "pack", yaml_file_path], capture_output=True, text=True
)
if result.returncode != 0:
print("Substreams pack command failed:", result.stderr)
except Exception as e:
print(f"Error running substreams pack command: {e}")
shutil.copy(backup_file_path, yaml_file_path)
Path(backup_file_path).unlink()
return spkg_name
@staticmethod
def update_initial_block(data: dict[str, Any], start_block: int) -> None:
"""Update the initial block for all modules in the configuration data."""
for module in data["modules"]:
module["initialBlock"] = start_block

View File

@@ -1,211 +0,0 @@
import signal
import subprocess
import threading
import time
import psycopg2
from psycopg2 import sql
import os
def find_binary_file(file_name):
# Define usual locations for binary files in Unix-based systems
locations = [
"/bin",
"/sbin",
"/usr/bin",
"/usr/sbin",
"/usr/local/bin",
"/usr/local/sbin",
]
# Add user's local bin directory if it exists
home = os.path.expanduser("~")
if os.path.exists(home + "/.local/bin"):
locations.append(home + "/.local/bin")
# Check each location
for location in locations:
potential_path = os.path.join(location, file_name)
if os.path.exists(potential_path):
return potential_path
# If binary is not found in the usual locations, return None
searched_paths = "\n".join(locations)
raise RuntimeError(
f"Unable to locate {file_name} binary. Searched paths:\n{searched_paths}"
)
binary_path = find_binary_file("tycho-indexer")
class TychoRunner:
def __init__(
self,
db_url: str,
with_binary_logs: bool = False,
initialized_accounts: list[str] = None,
):
self.with_binary_logs = with_binary_logs
self._db_url = db_url
self._initialized_accounts = initialized_accounts or []
def run_tycho(
self,
spkg_path: str,
start_block: int,
end_block: int,
protocol_type_names: list,
initialized_accounts: list,
protocol_system: str = "test_protocol",
) -> None:
"""Run the Tycho indexer with the specified SPKG and block range."""
env = os.environ.copy()
env["RUST_LOG"] = "tycho_indexer=info"
all_accounts = self._initialized_accounts + initialized_accounts
try:
process = subprocess.Popen(
[
binary_path,
"--database-url",
self._db_url,
"run",
"--spkg",
spkg_path,
"--module",
"map_protocol_changes",
"--protocol-type-names",
",".join(protocol_type_names),
"--protocol-system",
protocol_system,
"--start-block",
str(start_block),
"--stop-block",
# +2 is to make up for the cache in the index side.
str(end_block + 2),
"--dci-plugin",
"rpc",
]
+ (
[
"--initialized-accounts",
",".join(all_accounts),
"--initialization-block",
str(start_block),
]
if all_accounts
else []
),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=env,
)
with process.stdout:
for line in iter(process.stdout.readline, ""):
if line and self.with_binary_logs:
print(line.strip())
with process.stderr:
for line in iter(process.stderr.readline, ""):
if line and self.with_binary_logs:
print(line.strip())
process.wait()
except Exception as e:
print(f"Error running Tycho indexer: {e}")
def run_with_rpc_server(self, func: callable, *args, **kwargs):
"""
Run a function with Tycho RPC running in background.
This function is a wrapper around a target function. It starts Tycho RPC as a background task, executes the target function and stops Tycho RPC.
"""
stop_event = threading.Event()
process = None
def run_rpc_server():
nonlocal process
try:
env = os.environ.copy()
env["RUST_LOG"] = "info"
process = subprocess.Popen(
[binary_path, "--database-url", self._db_url, "rpc"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=env,
)
# Read remaining stdout and stderr
if self.with_binary_logs:
for output in process.stdout:
if output:
print(output.strip())
for error_output in process.stderr:
if error_output:
print(error_output.strip())
process.wait()
if process.returncode != 0:
print("Command failed with return code:", process.returncode)
except Exception as e:
print(f"An error occurred while running the command: {e}")
finally:
if process and process.poll() is None:
process.terminate()
process.wait()
# Start the RPC server in a separate thread
rpc_thread = threading.Thread(target=run_rpc_server)
rpc_thread.start()
time.sleep(3) # Wait for the RPC server to start
try:
# Run the provided function
result = func(*args, **kwargs)
return result
finally:
stop_event.set()
if process and process.poll() is None:
process.send_signal(signal.SIGINT)
if rpc_thread.is_alive():
rpc_thread.join()
@staticmethod
def empty_database(db_url: str) -> None:
"""Drop and recreate the Tycho indexer database."""
try:
conn = psycopg2.connect(db_url[: db_url.rfind("/")])
conn.autocommit = True
cursor = conn.cursor()
cursor.execute(
sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
sql.Identifier("tycho_indexer_0")
)
)
cursor.execute(
sql.SQL("CREATE DATABASE {}").format(sql.Identifier("tycho_indexer_0"))
)
except psycopg2.Error as e:
print(f"Database error: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()

View File

@@ -1,79 +0,0 @@
from logging import getLogger
from typing import Union
from eth_utils import to_checksum_address
from tycho_simulation_py.models import EthereumToken
from tycho_indexer_client.dto import (
ResponseProtocolState,
ProtocolComponent,
ResponseAccount,
ComponentWithState,
Snapshot,
HexBytes,
TokensParams,
PaginationParams,
)
from tycho_indexer_client.rpc_client import TychoRPCClient
log = getLogger(__name__)
def build_snapshot_message(
protocol_states: list[ResponseProtocolState],
protocol_components: list[ProtocolComponent],
account_states: list[ResponseAccount],
) -> Snapshot:
vm_storage = {state.address: state for state in account_states}
states = {}
for component in protocol_components:
pool_id = component.id
states[pool_id] = {"component": component}
for state in protocol_states:
pool_id = state.component_id
if pool_id not in states:
continue
states[pool_id]["state"] = state
states = {id_: ComponentWithState(**state) for id_, state in states.items()}
return Snapshot(states=states, vm_storage=vm_storage)
def token_factory(rpc_client: TychoRPCClient) -> callable(HexBytes):
_client = rpc_client
_token_cache: dict[str, EthereumToken] = {}
def factory(requested_addresses: Union[str, list[str]]) -> list[EthereumToken]:
if not isinstance(requested_addresses, list):
requested_addresses = [to_checksum_address(requested_addresses)]
else:
requested_addresses = [to_checksum_address(a) for a in requested_addresses]
response = dict()
to_fetch = []
for address in requested_addresses:
if address in _token_cache:
response[address] = _token_cache[address]
else:
to_fetch.append(address)
if to_fetch:
pagination = PaginationParams(page_size=len(to_fetch), page=0)
params = TokensParams(token_addresses=to_fetch, pagination=pagination)
tokens = _client.get_tokens(params).tokens
for token in tokens:
address = to_checksum_address(token.address)
eth_token = EthereumToken(
symbol=token.symbol,
address=address,
decimals=token.decimals,
gas=token.gas,
)
response[address] = eth_token
_token_cache[address] = eth_token
return [response[address] for address in requested_addresses]
return factory