Compare commits

...

10 Commits

Author SHA1 Message Date
TAMARA LIPOWSKI
1287d8c5c5 fix: Ekubo test 2025-10-09 17:52:31 +02:00
zach
07315294ce fix: add traced entrypoint 2025-10-09 16:42:51 +02:00
Zizou
568f26116e feat: add native balance in StorageChanges (#292)
* feat: add native balance in StorageChanges

This will be used by the next update of DCI to properly track the native balance for all the contracts it is indexing. Also adds a performance optimization: we now ignore slot updates if the value after the transaction is the same as before.
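The slot-deduplication optimization described above can be sketched as follows. This is a minimal, hypothetical stand-in: the `ContractSlot` struct here is simplified from tycho-substreams' generated model, and `filter_unchanged_slots` is an illustrative helper, not the actual function name.

```rust
// Hypothetical, simplified stand-in for tycho-substreams' ContractSlot model.
#[derive(Clone, Debug, PartialEq)]
struct ContractSlot {
    slot: Vec<u8>,
    value: Vec<u8>,
    previous_value: Vec<u8>,
}

/// The optimization described above: drop slot updates whose value after the
/// transaction equals the value before it (no net change).
fn filter_unchanged_slots(slots: Vec<ContractSlot>) -> Vec<ContractSlot> {
    slots
        .into_iter()
        .filter(|s| s.previous_value != s.value)
        .collect()
}

fn main() {
    let slots = vec![
        ContractSlot { slot: vec![1], previous_value: vec![7], value: vec![7] }, // net no-op
        ContractSlot { slot: vec![2], previous_value: vec![3], value: vec![9] }, // real change
    ];
    let kept = filter_unchanged_slots(slots);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].slot, vec![2]);
}
```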

* docs: add docs on how to generate test assets

---------

Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
2025-10-06 10:12:37 +00:00
dianacarvalho1
128de3037e fix: Misc improvements (#291)
* fix: Misc improvements

Make module_name optional and default to map_protocol_changes
Add build instructions for the DB image inside the docker-compose
Improve error message
Remove unnecessary module_name from yaml files
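The module_name defaulting described above can be sketched as follows — a minimal sketch assuming the config field becomes `Option<String>`, as this change makes it; `resolve_module_name` is an illustrative helper, not the actual function name.

```rust
// Minimal sketch of the defaulting behavior: a missing module_name falls back
// to "map_protocol_changes".
fn resolve_module_name(module_name: Option<String>) -> String {
    module_name.unwrap_or_else(|| "map_protocol_changes".to_string())
}

fn main() {
    assert_eq!(resolve_module_name(None), "map_protocol_changes");
    assert_eq!(resolve_module_name(Some("map_changes".into())), "map_changes");
}
```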

#time 39m

* fix: Bring back module_name in balancer v3 tests

#time 8m

* fix: Fix balance and allowance overwrites

For tokens with proxies we need to use the address that is returned by the detector

#time 13m
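The proxy-token fix above can be sketched as follows. This is a hypothetical, simplified version: the real detector is `EVMBalanceSlotDetector` and the overrides are alloy `AccountOverride`s; here plain byte arrays and maps stand in for both, and `apply_balance_override` is an illustrative name.

```rust
use std::collections::HashMap;

// For tokens behind a proxy, the contract that actually holds the balance
// storage can differ from the token address. The detector returns both the
// storage-holding address and the slot key, and the override must be keyed
// by that storage address, not by the token address.
type Address = [u8; 20];
type Slot = [u8; 32];

fn apply_balance_override(
    detector_results: &HashMap<Address, (Address, Slot)>,
    token: Address,
    new_balance: Slot,
    overwrites: &mut HashMap<Address, Vec<(Slot, Slot)>>,
) -> Result<(), String> {
    let (storage_addr, slot) = detector_results
        .get(&token)
        .ok_or_else(|| "no balance slot found for token".to_string())?;
    // Write the override against the storage-holding contract.
    overwrites.entry(*storage_addr).or_default().push((*slot, new_balance));
    Ok(())
}

fn main() {
    let token: Address = [0xAA; 20];
    let proxy_storage: Address = [0xBB; 20]; // differs from the token address
    let slot: Slot = [0x01; 32];
    let mut detector = HashMap::new();
    detector.insert(token, (proxy_storage, slot));

    let mut overwrites: HashMap<Address, Vec<(Slot, Slot)>> = HashMap::new();
    apply_balance_override(&detector, token, [0xFF; 32], &mut overwrites).unwrap();

    // The override lands on the proxy's storage contract, not the token address.
    assert!(overwrites.contains_key(&proxy_storage));
    assert!(!overwrites.contains_key(&token));
}
```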
2025-10-03 14:37:49 +00:00
dianacarvalho1
243cf14c3e feat: Upgrade tycho dependencies (#289)
Don't use tycho_execution from tycho_simulation

#time 8m
2025-09-30 14:33:01 +01:00
TAMARA LIPOWSKI
8de285a2ee feat: PancakeV3 test
- There are only two post-Cancun PancakeV3 pools.
- One was ORDER-WETH, which seems to have had a liquidity issue; it failed with `StateDecodingFailure pool="0xb2dc4d7627501338b578985c214208eb32283086" error=Missing attributes tick_liquidities`
- The second one is the USDT-USDf pool used for this test, though we fail to detect the balance slot of USDT: `WrongSlotError("Slot override didn't change balance.")`
- For this reason, we skip execution.
- Also includes the stop_block fix (explained in more detail in the Uniswap V3 test PR)
2025-09-29 12:04:07 -04:00
TAMARA LIPOWSKI
f9b4b5c223 fix: UniswapV2 test - use post-cancun created pool
- Also fix the balance check: it should use the stop block, not the start block, since the start block is before the component was even created.
2025-09-29 09:29:44 -04:00
Zizou
3c6f9c2083 fix: update balancer_v2 dependency after a fix in tycho-substreams (#281)
Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
2025-09-26 21:28:39 +00:00
dianacarvalho1
ea10bfa99a feat: Add state_registry.rs (#285)
This is to make it easier for users to add new protocol states

#time 34m


#time 0m
2025-09-26 17:10:44 +01:00
dianacarvalho1
9a7e6a1cf7 feat: Remove python suite (deprecated) (#284)
Update readme

#time 10m
2025-09-26 16:47:21 +01:00
48 changed files with 843 additions and 3443 deletions


@@ -192,6 +192,8 @@ message StorageChanges {
bytes address = 1;
// The contract's storage changes
repeated ContractSlot slots = 2;
// [optional] The contract's balance change
optional bytes native_balance = 3;
}
// Aggregate entities

File diff suppressed because it is too large


@@ -8,7 +8,8 @@ glob = "0.3.0"
miette = { version = "7.6.0", features = ["fancy"] }
# Logging & Tracing
tracing = "0.1.37"
tycho-simulation = { git = "https://github.com/propeller-heads/tycho-simulation.git", rev = "f73c2ef28328abdde791edf1fb21748f78dbee6a", features = ["evm"] }
tycho-simulation = { git = "https://github.com/propeller-heads/tycho-simulation.git", tag = "0.167.0", features = ["evm"] }
tycho-execution = "0.129.0"
num-bigint = "0.4"
num-traits = "0.2"
num-rational = "0.4.2"


@@ -1,8 +1,29 @@
# Protocol Testing
Rust-based integration testing framework for Tycho protocol implementations.
Rust-based integration testing framework for Tycho protocol implementations. See our full
docs [here](https://docs.propellerheads.xyz/tycho/for-dexs/protocol-integration/3.-testing).
## How to Run
## How to Run Locally
```bash
# Setup Environment Variables
export RPC_URL=..
export SUBSTREAMS_API_TOKEN=..
export RUST_LOG=protocol_testing=info,tycho_client=error
# Build Substreams wasm for BalancerV2
cd substreams
cargo build --release --package ethereum-balancer-v2 --target wasm32-unknown-unknown
cd ../protocol-testing
# Run Postgres DB using Docker compose
docker compose -f ./docker-compose.yaml up -d db
# Run test
cargo run -- --package ethereum-balancer-v2
```
## How to Run with Docker
```bash
# Build the images, from the project root dir
@@ -20,72 +41,3 @@ docker compose up -d && docker compose logs test-runner --follow
# Clean up
docker compose down
```
## Test Output Formatting
The test runner outputs results similar to:
```
Running 2 tests ...
--------------------------------
TEST 1: balancer_weighted_pool_test
✅ Protocol component validation passed.
✅ Token balance validation passed.
Amount out for 0x5c6ee304399dbdb9c8ef030ab642b10820db8f56000200000000000000000014: calculating for tokens "BAL"/"WETH"
Spot price "BAL"/"WETH": 0.123456
✅ Simulation validation passed.
✅ balancer_weighted_pool_test passed.
--------------------------------
Tests finished!
RESULTS: 2/2 passed.
```
## Module-specific Logging
```bash
# Enable debug logs for specific modules
export RUST_LOG=protocol_testing=debug,tycho_client=info
# Disable logs for noisy modules
export RUST_LOG=info,hyper=warn,reqwest=warn
```
## Running with Different Log Levels
```bash
# Standard test run with progress output
RUST_LOG=info cargo run -- --package uniswap-v2
# Detailed debug output
RUST_LOG=debug cargo run -- --package uniswap-v2
# Minimal output (errors only)
RUST_LOG=error cargo run -- --package uniswap-v2
```
## Test Configuration
Tests are configured via YAML files located in the substreams package directory:
- Test configuration: `../substreams/<package>/integration_test.tycho.yaml`
- Substreams configuration: `../substreams/<package>/substreams.yaml`
## What the Tests Do
1. **Component Validation**: Verifies that all expected protocol components are present in Tycho after indexing
2. **State Validation**: Compares indexed component states against expected values
3. **Balance Verification**: Validates token balances by querying the blockchain directly (can be skipped)
4. **Simulation Testing**: Runs Tycho simulation engine to verify protocol functionality
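The four steps above run in sequence, and a failure at any step aborts the test. A minimal sketch of that control flow, with hypothetical stub functions standing in for the real runner's RPC-, DB-, and simulation-backed checks:

```rust
// Hypothetical stubs sketching the order of the four checks; the real runner
// wires these to Tycho RPC, the Postgres DB, and the simulation engine.
fn validate_components() -> Result<(), String> { Ok(()) }
fn validate_state() -> Result<(), String> { Ok(()) }
fn validate_balances() -> Result<(), String> { Ok(()) }
fn run_simulation() -> Result<(), String> { Ok(()) }

fn run_test(skip_balance_check: bool) -> Result<(), String> {
    validate_components()?; // 1. components present after indexing
    validate_state()?;      // 2. indexed state matches expectations
    if !skip_balance_check {
        validate_balances()?; // 3. balances match the chain (skippable)
    }
    run_simulation()        // 4. simulation engine sanity check
}

fn main() {
    assert!(run_test(true).is_ok());
    assert!(run_test(false).is_ok());
}
```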
## Troubleshooting
- **Database Connection Issues**: Ensure PostgreSQL is running via `docker-compose up -d`
- **RPC Errors**: Verify `RPC_URL` is set and accessible
- **Missing Substreams**: Check that the package directory exists in `../substreams/<package>/`
- **Build Failures**: Ensure all dependencies are installed and environment variables are set


@@ -1,5 +1,8 @@
services:
db:
build:
context: .
dockerfile: postgres.Dockerfile
image: protocol-testing-db:latest
restart: "always"
healthcheck:
@@ -30,7 +33,7 @@ services:
SUBSTREAMS_API_TOKEN: "${SUBSTREAMS_API_TOKEN}"
# PROTOCOLS to test separated by space and with optional filter
# e.g. "ethereum-balancer-v2=weighted_legacy_creation ethereum-ekubo-v2"
entrypoint: ["/entrypoint.sh", "${PROTOCOLS}"]
entrypoint: [ "/entrypoint.sh", "${PROTOCOLS}" ]
volumes:
postgres_data:


@@ -149,6 +149,6 @@ pub struct IntegrationTestsConfig {
pub skip_balance_check: bool,
pub protocol_type_names: Vec<String>,
pub protocol_system: String,
pub module_name: String,
pub module_name: Option<String>,
pub tests: Vec<IntegrationTest>,
}


@@ -10,17 +10,15 @@ use alloy::{primitives::Keccak256, sol_types::SolValue};
use miette::{IntoDiagnostic, WrapErr};
use num_bigint::BigUint;
use serde_json::json;
use tycho_execution::encoding::{
errors::EncodingError,
evm::{encoder_builders::TychoRouterEncoderBuilder, utils::bytes_to_address},
models::{EncodedSolution, NativeAction, Solution, SwapBuilder, Transaction, UserTransferType},
};
use tycho_simulation::{
evm::protocol::u256_num::biguint_to_u256,
protocol::models::ProtocolComponent,
tycho_common::{dto::Chain, Bytes},
tycho_execution::encoding::{
errors::EncodingError,
evm::{encoder_builders::TychoRouterEncoderBuilder, utils::bytes_to_address},
models::{
EncodedSolution, NativeAction, Solution, SwapBuilder, Transaction, UserTransferType,
},
},
};
use crate::execution::EXECUTOR_ADDRESS;


@@ -13,6 +13,7 @@ use alloy::{
use miette::{miette, IntoDiagnostic, WrapErr};
use num_bigint::BigUint;
use tracing::info;
use tycho_execution::encoding::models::Solution;
use tycho_simulation::{
evm::protocol::u256_num::{biguint_to_u256, u256_to_biguint},
tycho_common::{
@@ -23,7 +24,6 @@ use tycho_simulation::{
allowance_slot_detector::{AllowanceSlotDetectorConfig, EVMAllowanceSlotDetector},
balance_slot_detector::{BalanceSlotDetectorConfig, EVMBalanceSlotDetector},
},
tycho_execution::encoding::models::Solution,
};
use crate::rpc::RPCProvider;
@@ -236,19 +236,18 @@ pub async fn setup_router_overwrites(
/// - RPC queries for storage detection fail
async fn setup_user_overwrites(
solution: &Solution,
transaction: &tycho_simulation::tycho_execution::encoding::models::Transaction,
transaction: &tycho_execution::encoding::models::Transaction,
user_address: Address,
rpc_url: String,
block: &Block,
) -> miette::Result<AddressHashMap<AccountOverride>> {
let mut overwrites = AddressHashMap::default();
// Add ETH balance override for the user to ensure they have enough gas funds
let mut eth_balance = U256::from_str("100000000000000000000").unwrap(); // 100 ETH
let token_address = Address::from_slice(&solution.given_token[..20]);
// If given token is ETH, add the given amount to the balance
// If given token is ETH, add the given amount to the balance + 100 ETH for gas
if solution.given_token == Bytes::zero(20) {
eth_balance += biguint_to_u256(&solution.given_amount);
let eth_balance = biguint_to_u256(&solution.given_amount) +
U256::from_str("100000000000000000000").unwrap(); // given_amount + 100 ETH for gas
overwrites.insert(user_address, AccountOverride::default().with_balance(eth_balance));
// if the given token is not ETH, do balance and allowance slots overwrites
} else {
let detector = EVMBalanceSlotDetector::new(BalanceSlotDetectorConfig {
@@ -265,9 +264,9 @@ async fn setup_user_overwrites(
)
.await;
let balance_slot =
if let Some(Ok((_storage_addr, slot))) = results.get(&solution.given_token.clone()) {
slot
let (balance_storage_addr, balance_slot) =
if let Some(Ok((storage_addr, slot))) = results.get(&solution.given_token.clone()) {
(storage_addr, slot)
} else {
return Err(miette!("Couldn't find balance storage slot for token {token_address}"));
};
@@ -287,39 +286,66 @@ async fn setup_user_overwrites(
)
.await;
let allowance_slot = if let Some(Ok((_storage_addr, slot))) =
let (allowance_storage_addr, allowance_slot) = if let Some(Ok((storage_addr, slot))) =
results.get(&solution.given_token.clone())
{
slot
(storage_addr, slot)
} else {
return Err(miette!("Couldn't find allowance storage slot for token {token_address}"));
};
// Use the exact given amount for balance and allowance (no buffer, no max)
let token_balance = biguint_to_u256(&solution.given_amount);
let token_allowance = biguint_to_u256(&solution.given_amount);
let balance_storage_address = Address::from_slice(&balance_storage_addr[..20]);
let allowance_storage_address = Address::from_slice(&allowance_storage_addr[..20]);
// Apply balance and allowance overrides
// If both storage addresses are the same, combine them into one override
if balance_storage_address == allowance_storage_address {
overwrites.insert(
token_address,
balance_storage_address,
AccountOverride::default().with_state_diff(vec![
(
alloy::primitives::B256::from_slice(allowance_slot),
alloy::primitives::B256::from_slice(&U256::MAX.to_be_bytes::<32>()),
alloy::primitives::B256::from_slice(balance_slot),
alloy::primitives::B256::from_slice(&token_balance.to_be_bytes::<32>()),
),
(
alloy::primitives::B256::from_slice(balance_slot),
alloy::primitives::B256::from_slice(
&biguint_to_u256(&solution.given_amount).to_be_bytes::<32>(),
),
alloy::primitives::B256::from_slice(allowance_slot),
alloy::primitives::B256::from_slice(&token_allowance.to_be_bytes::<32>()),
),
]),
);
} else {
// Different storage addresses, apply separately
overwrites.insert(
balance_storage_address,
AccountOverride::default().with_state_diff(vec![(
alloy::primitives::B256::from_slice(balance_slot),
alloy::primitives::B256::from_slice(&token_balance.to_be_bytes::<32>()),
)]),
);
overwrites.insert(
allowance_storage_address,
AccountOverride::default().with_state_diff(vec![(
alloy::primitives::B256::from_slice(allowance_slot),
alloy::primitives::B256::from_slice(&token_allowance.to_be_bytes::<32>()),
)]),
);
}
overwrites.insert(user_address, AccountOverride::default().with_balance(eth_balance));
// Add 100 ETH for gas for non-ETH token swaps
let eth_balance = U256::from_str("100000000000000000000").unwrap(); // 100 ETH for gas
overwrites.insert(user_address, AccountOverride::default().with_balance(eth_balance));
}
Ok(overwrites)
}
/// Simulate a trade using eth_call for historical blocks
pub async fn simulate_trade_with_eth_call(
rpc_provider: &RPCProvider,
transaction: &tycho_simulation::tycho_execution::encoding::models::Transaction,
transaction: &tycho_execution::encoding::models::Transaction,
solution: &Solution,
block: &Block,
) -> miette::Result<BigUint> {


@@ -3,6 +3,7 @@ mod config;
mod encoding;
mod execution;
mod rpc;
mod state_registry;
mod test_runner;
mod traces;
mod tycho_rpc;


@@ -0,0 +1,50 @@
use tycho_simulation::{
evm::{
decoder::TychoStreamDecoder,
engine_db::tycho_db::PreCachedDB,
protocol::{
ekubo::state::EkuboState, pancakeswap_v2::state::PancakeswapV2State,
uniswap_v2::state::UniswapV2State, uniswap_v3::state::UniswapV3State,
vm::state::EVMPoolState,
},
},
protocol::models::DecoderContext,
tycho_client::feed::BlockHeader,
};
/// Register decoder based on protocol system. Defaults to EVMPoolState.
/// To add a new protocol, just add a case to the match statement.
pub fn register_decoder_for_protocol(
decoder: &mut TychoStreamDecoder<BlockHeader>,
protocol_system: &str,
decoder_context: DecoderContext,
) -> miette::Result<()> {
match protocol_system {
"uniswap_v2" | "sushiswap_v2" => {
decoder
.register_decoder_with_context::<UniswapV2State>(protocol_system, decoder_context);
}
"pancakeswap_v2" => {
decoder.register_decoder_with_context::<PancakeswapV2State>(
protocol_system,
decoder_context,
);
}
"uniswap_v3" | "pancakeswap_v3" => {
decoder
.register_decoder_with_context::<UniswapV3State>(protocol_system, decoder_context);
}
"ekubo_v2" => {
decoder.register_decoder_with_context::<EkuboState>(protocol_system, decoder_context);
}
// Default to EVMPoolState for all other protocols
_ => {
decoder.register_decoder_with_context::<EVMPoolState<PreCachedDB>>(
protocol_system,
decoder_context,
);
}
}
Ok(())
}


@@ -21,27 +21,22 @@ use num_traits::{Signed, ToPrimitive, Zero};
use postgres::{Client, Error, NoTls};
use tokio::runtime::Runtime;
use tracing::{debug, error, info, warn};
use tycho_execution::encoding::evm::utils::bytes_to_address;
use tycho_simulation::{
evm::{
decoder::TychoStreamDecoder,
engine_db::tycho_db::PreCachedDB,
protocol::{
ekubo::state::EkuboState, pancakeswap_v2::state::PancakeswapV2State,
u256_num::bytes_to_u256, uniswap_v2::state::UniswapV2State,
uniswap_v3::state::UniswapV3State, vm::state::EVMPoolState,
},
},
evm::{decoder::TychoStreamDecoder, protocol::u256_num::bytes_to_u256},
protocol::models::{DecoderContext, Update},
tycho_client::feed::{
synchronizer::{ComponentWithState, Snapshot, StateSyncMessage},
BlockHeader, FeedMessage,
},
tycho_common::{
dto::{Chain, ProtocolComponent, ResponseAccount, ResponseProtocolState},
dto::{
Chain, EntryPointWithTracingParams, ProtocolComponent, ResponseAccount,
ResponseProtocolState, TracingResult,
},
models::token::Token,
Bytes,
},
tycho_execution::encoding::evm::utils::bytes_to_address,
};
use crate::{
@@ -50,6 +45,7 @@ use crate::{
encoding::encode_swap,
execution,
rpc::RPCProvider,
state_registry::register_decoder_for_protocol,
tycho_rpc::TychoClient,
tycho_runner::TychoRunner,
utils::build_spkg,
@@ -241,7 +237,7 @@ impl TestRunner {
test.stop_block,
&config.protocol_type_names,
&config.protocol_system,
&config.module_name,
config.module_name.clone(),
)
.wrap_err("Failed to run Tycho")?;
@@ -274,7 +270,7 @@ impl TestRunner {
self.validate_token_balances(
&component_tokens,
&response_protocol_states_by_id,
test.start_block,
test.stop_block,
)?;
info!("All token balances match the values found onchain")
}
@@ -376,6 +372,16 @@ impl TestRunner {
.into_diagnostic()
.wrap_err("Failed to get contract state")?;
let traced_entry_points = self
.runtime
.block_on(tycho_client.get_traced_entry_points(
protocol_system,
expected_component_ids.clone(),
chain,
))
.into_diagnostic()
.wrap_err("Failed to get trace points")?;
// Create a map of component IDs to components for easy lookup
let mut components_by_id: HashMap<String, ProtocolComponent> = protocol_components
.clone()
@@ -393,6 +399,7 @@ impl TestRunner {
debug!("Found {} protocol components", components_by_id.len());
debug!("Found {} protocol states", protocol_states_by_id.len());
debug!("Found {} traced entry points", traced_entry_points.len());
let adapter_contract_path;
let mut adapter_contract_path_str: Option<&str> = None;
@@ -436,36 +443,7 @@ impl TestRunner {
if let Some(vm_adapter_path) = adapter_contract_path_str {
decoder_context = decoder_context.vm_adapter_path(vm_adapter_path);
}
match protocol_system {
"uniswap_v2" | "sushiswap_v2" => {
decoder.register_decoder_with_context::<UniswapV2State>(
protocol_system,
decoder_context,
);
}
"pancakeswap_v2" => {
decoder.register_decoder_with_context::<PancakeswapV2State>(
protocol_system,
decoder_context,
);
}
"uniswap_v3" | "pancakeswap_v3" => {
decoder.register_decoder_with_context::<UniswapV3State>(
protocol_system,
decoder_context,
);
}
"ekubo_v2" => {
decoder
.register_decoder_with_context::<EkuboState>(protocol_system, decoder_context);
}
_ => {
decoder.register_decoder_with_context::<EVMPoolState<PreCachedDB>>(
protocol_system,
decoder_context,
);
}
}
register_decoder_for_protocol(&mut decoder, protocol_system, decoder_context)?;
// Mock a stream message, with only a Snapshot and no deltas
let mut states: HashMap<String, ComponentWithState> = HashMap::new();
@@ -474,18 +452,27 @@ impl TestRunner {
let state = protocol_states_by_id
.get(component_id)
.wrap_err(format!(
"Component {id} does not exist in protocol_states_by_id {protocol_states_by_id:?}"
))?
.wrap_err(format!("No state found for component: {id}"))?
.clone();
let traced_entry_points: Vec<(EntryPointWithTracingParams, TracingResult)> =
traced_entry_points
.get(component_id)
.map(|inner| {
inner
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let component_with_state = ComponentWithState {
state,
component: component.clone(),
component_tvl: None,
// Neither UniswapV4 with hooks not certain balancer pools are currently supported
// for SDK testing
entrypoints: vec![],
entrypoints: traced_entry_points,
};
states.insert(component_id.clone(), component_with_state);
}
@@ -808,7 +795,7 @@ impl TestRunner {
&self,
component_tokens: &HashMap<String, Vec<Token>>,
protocol_states_by_id: &HashMap<String, ResponseProtocolState>,
start_block: u64,
stop_block: u64,
) -> miette::Result<()> {
for (id, component) in protocol_states_by_id.iter() {
let tokens = component_tokens.get(id);
@@ -833,12 +820,11 @@ impl TestRunner {
.block_on(self.rpc_provider.get_token_balance(
token_address,
component_address,
start_block,
stop_block,
))?;
if balance != node_balance {
return Err(miette!(
"Token balance mismatch for component {} and token {}",
id,
"Token balance mismatch for component {id} and token {}. Balance: {balance}, Node balance: {node_balance}",
token.symbol
));
}


@@ -5,8 +5,9 @@ use tycho_simulation::{
tycho_client::{rpc::RPCClient, HttpRPCClient},
tycho_common::{
dto::{
Chain, PaginationParams, ProtocolComponent, ProtocolComponentsRequestBody,
ResponseAccount, ResponseProtocolState, ResponseToken, StateRequestBody, VersionParam,
Chain, EntryPointWithTracingParams, PaginationParams, ProtocolComponent,
ProtocolComponentsRequestBody, ResponseAccount, ResponseProtocolState, ResponseToken,
StateRequestBody, TracedEntryPointRequestBody, TracingResult, VersionParam,
},
models::token::Token,
Bytes,
@@ -153,4 +154,26 @@ impl TychoClient {
Ok(res)
}
/// Gets traced entry points from the RPC server
pub async fn get_traced_entry_points(
&self,
protocol_system: &str,
component_ids: Vec<String>,
chain: Chain,
) -> Result<HashMap<String, Vec<(EntryPointWithTracingParams, TracingResult)>>, RpcError> {
let request_body = TracedEntryPointRequestBody {
protocol_system: protocol_system.to_string(),
chain,
pagination: PaginationParams { page: 0, page_size: 100 },
component_ids: Some(component_ids),
};
let traced_entry_points = self
.http_client
.get_traced_entry_points(&request_body)
.await?;
Ok(traced_entry_points.traced_entry_points)
}
}


@@ -31,7 +31,7 @@ impl TychoRunner {
end_block: u64,
protocol_type_names: &[String],
protocol_system: &str,
module_name: &str,
module_name: Option<String>,
) -> miette::Result<()> {
info!("Running Tycho indexer from block {start_block} to {end_block}...");
@@ -48,7 +48,9 @@ impl TychoRunner {
"--spkg",
spkg_path,
"--module",
module_name,
module_name
.as_deref()
.unwrap_or("map_protocol_changes"),
"--protocol-type-names",
&protocol_type_names.join(","),
"--protocol-system",

substreams/Cargo.lock (generated, 27 lines changed)

@@ -35,6 +35,12 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bigdecimal"
version = "0.3.1"
@@ -240,7 +246,7 @@ dependencies = [
[[package]]
name = "ethereum-balancer-v2"
version = "0.4.0"
version = "0.4.1"
dependencies = [
"anyhow",
"ethabi 18.0.0",
@@ -250,7 +256,7 @@ dependencies = [
"num-bigint",
"substreams",
"substreams-ethereum",
"tycho-substreams 0.5.0",
"tycho-substreams 0.5.1",
]
[[package]]
@@ -1415,7 +1421,7 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2c7fca123abff659d15ed30da5b605fa954a29e912c94260c488d0d18f9107d"
dependencies = [
"base64",
"base64 0.13.1",
"prost 0.11.9",
"prost-types 0.11.9",
"substreams",
@@ -1489,7 +1495,7 @@ name = "substreams-helper"
version = "0.0.2"
dependencies = [
"anyhow",
"base64",
"base64 0.13.1",
"bigdecimal",
"downcast-rs",
"ethabi 18.0.0",
@@ -1512,7 +1518,7 @@ version = "0.0.2"
source = "git+https://github.com/propeller-heads/tycho-protocol-sdk.git?tag=0.4.0#cfbf6812bdc9503ff51debcf5e171cd680b4d694"
dependencies = [
"anyhow",
"base64",
"base64 0.13.1",
"bigdecimal",
"downcast-rs",
"ethabi 18.0.0",
@@ -1535,7 +1541,7 @@ version = "0.0.2"
source = "git+https://github.com/propeller-heads/tycho-protocol-sdk.git?rev=52d5021#52d502198e9aa964814ef5f139df0886c3eb7bb0"
dependencies = [
"anyhow",
"base64",
"base64 0.13.1",
"bigdecimal",
"downcast-rs",
"ethabi 18.0.0",
@@ -1558,7 +1564,7 @@ version = "0.0.2"
source = "git+https://github.com/propeller-heads/tycho-protocol-sdk.git?rev=b8aeaa3#b8aeaa3dc6e7242a5dd23681921258ef2cb3c6dd"
dependencies = [
"anyhow",
"base64",
"base64 0.13.1",
"bigdecimal",
"downcast-rs",
"ethabi 18.0.0",
@@ -1760,9 +1766,9 @@ dependencies = [
[[package]]
name = "tycho-substreams"
version = "0.5.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "828cbe6f7b984fefe39d8fdb4c40311e329f30ded0a70e477e8f2add4d60483d"
checksum = "a164ecbc3f2d7515e9447d7c1933a01fba6e54a082bbe1c73eb7b827d2a45b47"
dependencies = [
"ethabi 18.0.0",
"hex",
@@ -1777,8 +1783,9 @@ dependencies = [
[[package]]
name = "tycho-substreams"
version = "0.5.1"
version = "0.6.0"
dependencies = [
"base64 0.22.1",
"ethabi 18.0.0",
"hex",
"itertools 0.12.1",


@@ -1,6 +1,6 @@
[package]
name = "tycho-substreams"
version = "0.5.1"
version = "0.6.0"
edition = "2021"
description = "Tycho substreams development kit, contains tycho-indexer block changes model and helper functions for common indexing tasks."
repository = "https://github.com/propeller-heads/tycho-protocol-sdk/tree/main/substreams/crates/tycho-substreams"
@@ -24,3 +24,4 @@ serde_json = "1.0.120"
[dev-dependencies]
rstest = "0.24.0"
base64 = "0.22.1"


@@ -10,3 +10,7 @@ directory:
```bash
buf generate --template substreams/crates/tycho-substreams/buf.gen.yaml --output substreams/crates/tycho-substreams/
```
## Generate block test assets
To be able to write complete unit tests, we rely on full block assets. These can be generated using the firecore tool from Substreams; see the [Substreams documentation](https://docs.substreams.dev/reference-material/log-and-debug#generating-the-input-of-the-test) for more details.

File diff suppressed because one or more lines are too long


@@ -248,7 +248,7 @@ pub fn extract_balance_deltas_from_tx<F: Fn(&[u8], &[u8]) -> bool>(
#[cfg(test)]
mod tests {
use super::*;
use crate::{mock_store::MockStore, pb::tycho::evm::v1::BalanceDelta};
use crate::{pb::tycho::evm::v1::BalanceDelta, testing::mock_store::MockStore};
use substreams::{
pb::substreams::StoreDelta,
prelude::{StoreGet, StoreNew},


@@ -1,12 +1,12 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use substreams_ethereum::pb::{
eth,
eth::v2::{block::DetailLevel, StorageChange},
use substreams_ethereum::pb::eth::{
self,
v2::{block::DetailLevel, BalanceChange, StorageChange},
};
use crate::{
models::{ContractSlot, StorageChanges, Transaction},
models::{ContractSlot, StorageChanges},
pb::tycho::evm::v1::TransactionStorageChanges,
};
@@ -30,8 +30,6 @@ pub fn get_block_storage_changes(block: &eth::v2::Block) -> Vec<TransactionStora
let mut block_storage_changes = Vec::with_capacity(block.transaction_traces.len());
for block_tx in block.transactions() {
let transaction: Transaction = block_tx.into();
let mut changes_by_address: HashMap<Vec<u8>, Vec<StorageChange>> = HashMap::new();
for storage_change in block_tx
.calls
@@ -45,10 +43,36 @@ pub fn get_block_storage_changes(block: &eth::v2::Block) -> Vec<TransactionStora
.push(storage_change.clone());
}
// For each address, sort by ordinal and collect latest changes per slot
let tx_storage_changes: Vec<StorageChanges> = changes_by_address
let mut native_balance_changes_by_address: HashMap<Vec<u8>, Vec<BalanceChange>> =
HashMap::new();
for balance_change in block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| call.balance_changes.iter())
{
native_balance_changes_by_address
.entry(balance_change.address.clone())
.or_default()
.push(balance_change.clone());
}
// Collect all unique addresses from both storage changes and balance changes
let mut all_addresses = HashSet::new();
all_addresses.extend(changes_by_address.keys().cloned());
all_addresses.extend(
native_balance_changes_by_address
.keys()
.cloned(),
);
// For each address, collect both storage changes and balance changes
let tx_storage_changes: Vec<StorageChanges> = all_addresses
.into_iter()
.map(|(address, mut changes)| {
.map(|address| {
// Process storage changes for this address
let slots = if let Some(changes) = changes_by_address.get(&address) {
let mut changes = changes.clone();
changes.sort_unstable_by_key(|change| change.ordinal);
// Collect latest change per slot
@@ -67,16 +91,222 @@ pub fn get_block_storage_changes(block: &eth::v2::Block) -> Vec<TransactionStora
previous_value: change.old_value,
});
}
latest_changes.into_values().collect()
} else {
vec![]
};
StorageChanges { address, slots: latest_changes.into_values().collect() }
// Filter out slots that have the same value before and after the transaction
let slots = slots
.into_iter()
.filter(|slot| slot.previous_value != slot.value)
.collect();
// Process native balance changes for this address
let native_balance = native_balance_changes_by_address
.get(&address)
.and_then(|balance_changes| {
let (first, last) = balance_changes.iter().fold(
(None, None),
|(min, max): (Option<&BalanceChange>, Option<&BalanceChange>),
change| {
let new_min = match min {
None => Some(change),
Some(m) if change.ordinal < m.ordinal => Some(change),
_ => min,
};
let new_max = match max {
None => Some(change),
Some(m) if change.ordinal > m.ordinal => Some(change),
_ => max,
};
(new_min, new_max)
},
);
let balance_before_tx = first.map(|f| {
f.old_value
.as_ref()
.map(|b| b.bytes.clone())
.unwrap_or_default()
});
let balance_after_tx = last.map(|l| {
l.new_value
.as_ref()
.map(|b| b.bytes.clone())
.unwrap_or_default()
});
(balance_before_tx != balance_after_tx).then_some(balance_after_tx.clone())
})
.flatten();
StorageChanges { address, slots, native_balance }
})
.collect();
block_storage_changes.push(TransactionStorageChanges {
tx: Some(transaction),
tx: Some(block_tx.into()),
storage_changes: tx_storage_changes,
});
}
block_storage_changes
}
#[cfg(test)]
mod test {
use super::*;
use crate::testing::assets::read_block;
#[test]
fn test_get_block_storage_changes_ethereum_block_23490768() {
let block = read_block("./assets/ethereum-block-23490768.binpb.base64");
let changes = get_block_storage_changes(&block);
let mut balance_map: HashMap<String, HashMap<String, String>> = HashMap::new();
let mut storage_map: HashMap<String, HashMap<String, HashMap<String, (String, String)>>> =
HashMap::new();
for change in changes {
let tx_hash = change.tx.unwrap().hash.clone();
let balance_tx_entry = balance_map
.entry(hex::encode(tx_hash.clone()))
.or_default();
let storage_tx_entry = storage_map
.entry(hex::encode(tx_hash.clone()))
.or_default();
for storage_change in change.storage_changes {
if let Some(native_balance) = storage_change.native_balance {
balance_tx_entry.insert(
hex::encode(storage_change.address.clone()),
hex::encode(native_balance.clone()),
);
}
for slot in storage_change.slots {
let contract_tx_entry = storage_tx_entry
.entry(hex::encode(storage_change.address.clone()))
.or_default();
contract_tx_entry.insert(
hex::encode(slot.slot.clone()),
(hex::encode(slot.previous_value.clone()), hex::encode(slot.value.clone())),
);
}
}
}
// Assertions for https://etherscan.io/tx/0x44a34ba7400fa7004ec5037aeb1103a7c0cd8a83a95c4cd5cf9561c3c38db326#statechange
// Check balance changes
let balance_tx_entry = balance_map
.get("44a34ba7400fa7004ec5037aeb1103a7c0cd8a83a95c4cd5cf9561c3c38db326")
.unwrap();
assert_eq!(balance_tx_entry.len(), 4);
assert_eq!(
balance_tx_entry
.get("dadb0d80178819f2319190d340ce9a924f783711")
.unwrap(),
"052196f442fadb8314"
);
assert_eq!(
balance_tx_entry
.get("c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2")
.unwrap(),
"0207150b274902c5e7871c"
);
assert_eq!(
balance_tx_entry
.get("ad01c20d5886137e056775af56915de824c8fce5")
.unwrap(),
"c83a1d6287cb5e"
);
assert_eq!(
balance_tx_entry
.get("638f1db9881a84af9835c6625d17b0af034234ad")
.unwrap(),
"0f69303da21468"
);
// Check storage changes
let storage_tx_entry = storage_map
.get("44a34ba7400fa7004ec5037aeb1103a7c0cd8a83a95c4cd5cf9561c3c38db326")
.unwrap();
assert_eq!(storage_tx_entry.len(), 3);
let storage_tx_entry_0f9e3401a5155a02c86353c3d9b24214876779dd = HashMap::from([
(
"0000000000000000000000000000000000000000000000000000000000000009".to_string(),
(
"00000000000000000000000000000000009faeae5180599c05015fcfa242d3b0".to_string(),
"00000000000000000000000000000000009faebb96f403f1913f425b3ea446e0".to_string(),
),
),
(
"000000000000000000000000000000000000000000000000000000000000000a".to_string(),
(
"00000000000000000000000000f94f053f65617829584571d9de584cd219fb88".to_string(),
"00000000000000000000000000f94f66e6e9d8f6688d6ca53ff9baae52e11cd8".to_string(),
),
),
(
"0000000000000000000000000000000000000000000000000000000000000008".to_string(),
(
"68de8f37000000000001fb7a6a5bb2b548080000000560989aab8af59d9be89b".to_string(),
"68de8f5b000000000001fb8b2909997ca55100000005606b52e81f19442026af".to_string(),
),
),
]);
assert_eq!(
storage_tx_entry
.get("0f9e3401a5155a02c86353c3d9b24214876779dd")
.unwrap(),
&storage_tx_entry_0f9e3401a5155a02c86353c3d9b24214876779dd
);
let storage_tx_entry_11dfc652eb62c723ad8c2ae731fcede58ab07564 = HashMap::from([
(
"654f44e59f538551b5124259a61eaadb863c6c10cc9d43aa550237a76a7de0b0".to_string(),
(
"000000000000000000000000000000000000000000000077c1c5e25db942af6a".to_string(),
"0000000000000000000000000000000000000000000000a2c5f2bc08a7dea7a4".to_string(),
),
),
(
"6b12653da4ae5b17258ea9b02a62123c9305455af47b7dceea1b7137f7c69671".to_string(),
(
"0000000000000000000000000000000000000000000001454f7d5d0ce8d4a21e".to_string(),
"0000000000000000000000000000000000000000000001479313ef3e53b46bd0".to_string(),
),
),
(
"8f60e36f69a92730149f231ad2475b4aa8a8e50f4072f62a1f099ffc11d0f647".to_string(),
(
"0000000000000000000000000000000000000000000560989aab8af59d9be89b".to_string(),
"00000000000000000000000000000000000000000005606b52e81f19442026af".to_string(),
),
),
]);
assert_eq!(
storage_tx_entry
.get("11dfc652eb62c723ad8c2ae731fcede58ab07564")
.unwrap(),
&storage_tx_entry_11dfc652eb62c723ad8c2ae731fcede58ab07564
);
let storage_tx_entry_c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2 = HashMap::from([(
"77f05379c72cc19907ba9648dcd0bda409fabc68ca111b532de62ffdb67e868f".to_string(),
(
"000000000000000000000000000000000000000000000001fb7a6a5bb2b54808".to_string(),
"000000000000000000000000000000000000000000000001fb8b2909997ca551".to_string(),
),
)]);
assert_eq!(
storage_tx_entry
.get("c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2")
.unwrap(),
&storage_tx_entry_c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2
);
}
}
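Each storage assertion above pairs a slot's `previous_value` with its `value`. A hedged Python sketch (the helper name is illustrative, not from the repository) of reading such a pair as a numeric delta, using the WETH balance slot checked above:

```python
def slot_delta(previous_value: str, value: str) -> int:
    """Interpret a (before, after) pair of hex storage words as a plain
    subtraction; no wraparound handling, so this is only a sketch."""
    return int(value, 16) - int(previous_value, 16)

# WETH balance slot of the pool, taken from the assertions above.
before = "000000000000000000000000000000000000000000000001fb7a6a5bb2b54808"
after = "000000000000000000000000000000000000000000000001fb8b2909997ca551"
delta = slot_delta(before, after)  # positive: the pool's WETH balance grew
```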

View File

@@ -4,11 +4,12 @@ pub mod balances;
pub mod block_storage;
pub mod contract;
pub mod entrypoint;
#[cfg(test)]
mod mock_store;
pub mod models;
pub mod pb;
#[cfg(test)]
pub mod testing;
pub mod prelude {
pub use super::models::*;
}

View File

@@ -245,6 +245,9 @@ pub struct StorageChanges {
/// The contract's storage changes
#[prost(message, repeated, tag="2")]
pub slots: ::prost::alloc::vec::Vec<ContractSlot>,
/// \[optional\] The contract's balance change
#[prost(bytes="vec", optional, tag="3")]
pub native_balance: ::core::option::Option<::prost::alloc::vec::Vec<u8>>,
}
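The new optional `native_balance` field carries raw bytes. A hedged sketch of turning the payload into an integer, assuming big-endian encoding (consistent with the hex balance values asserted in the test above):

```python
def decode_native_balance(raw):
    """Return the native balance as an int, or None when the optional
    field is unset. Big-endian interpretation is an assumption here."""
    if raw is None:
        return None
    return int.from_bytes(raw, byteorder="big")

# One of the balance values asserted in the Rust test above.
wei = decode_native_balance(bytes.fromhex("052196f442fadb8314"))
```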
// Aggregate entities

View File

@@ -0,0 +1,12 @@
// Read a base64 encoded asset and return a decoded protobuf struct
// Panics if the file does not exist or the base64 decoding fails
pub fn read_block<B: prost::Message + Default>(filename: &str) -> B {
use base64::Engine;
let encoded = std::fs::read_to_string(filename).expect("Failed to read file");
let raw_bytes = base64::prelude::BASE64_STANDARD
.decode(&encoded)
.expect("Failed to decode base64");
B::decode(&*raw_bytes).expect("Not able to decode Block")
}
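`read_block` expects a file holding the base64 of the raw protobuf bytes. A hedged Python sketch of the matching asset round-trip (file path and payload are illustrative, not repository assets):

```python
import base64

def write_asset(path, raw):
    """Persist raw protobuf bytes as a base64 text file,
    the format read_block consumes."""
    with open(path, "w") as f:
        f.write(base64.standard_b64encode(raw).decode("ascii"))

def read_asset(path):
    """Inverse of write_asset: decode the base64 text back into raw bytes."""
    with open(path) as f:
        return base64.standard_b64decode(f.read())
```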

View File

@@ -0,0 +1,2 @@
pub mod assets;
pub mod mock_store;

View File

@@ -1,6 +1,6 @@
[package]
name = "ethereum-balancer-v2"
version = "0.4.0"
version = "0.4.1"
edition = "2021"
[lib]
@@ -15,7 +15,7 @@ hex = "0.4.3"
anyhow = "1.0.75"
num-bigint = "0.4.4"
itertools = "0.12.0"
tycho-substreams = "0.5.0"
tycho-substreams = "0.5.1"
[build-dependencies]
anyhow = "1"

View File

@@ -1,6 +1,5 @@
substreams_yaml_path: ./substreams.yaml
protocol_system: "vm:balancer_v2"
module_name: "map_protocol_changes"
protocol_type_names:
- "balancer_v2_pool"
adapter_contract: "BalancerV2SwapAdapter"

View File

@@ -1,7 +1,7 @@
specVersion: v0.1.0
package:
name: "ethereum_balancer_v2"
version: v0.4.0
version: v0.4.1
url: "https://github.com/propeller-heads/tycho-protocol-sdk/tree/main/substreams/ethereum-balancer-v2"
protobuf:

View File

@@ -1,6 +1,5 @@
substreams_yaml_path: ./substreams.yaml
protocol_system: "vm:curve"
module_name: "map_protocol_changes"
protocol_type_names:
- "curve_pool"
adapter_contract: "CurveAdapter"

View File

@@ -1,7 +1,6 @@
substreams_yaml_path: ./substreams.yaml
protocol_system: "ekubo_v2"
module_name: "map_protocol_changes"
adapter_contract: "EkuboSwapAdapter"
adapter_build_signature: "constructor(address)"
adapter_build_args: "0x16e186ecdc94083fff53ef2a41d46b92a54f61e2"
skip_balance_check: true # Fails because the pool id is not the owner of the tokens. Needs to be addressed in the testing framework.

View File

@@ -1,6 +1,5 @@
substreams_yaml_path: ./ethereum-maverick-v2.yaml
protocol_system: "vm:maverick_v2"
module_name: "map_protocol_changes"
adapter_contract: "MaverickV2SwapAdapter"
adapter_build_signature: "constructor(address,address)"
adapter_build_args: "0x0A7e848Aca42d879EF06507Fca0E7b33A0a63c1e,0xb40AfdB85a07f37aE217E7D6462e609900dD8D7A"

View File

@@ -0,0 +1,20 @@
substreams_yaml_path: ./ethereum-pancakeswap-v3.yaml
protocol_type_names:
- "pancakeswap_v3_pool"
module_name: "map_protocol_changes"
skip_balance_check: false
initialized_accounts:
tests:
- name: test_usdt_usdf_pool
start_block: 22187893
stop_block: 22187895
expected_components:
- id: "0x0d9ea0d5e3f400b1df8f695be04292308c041e77"
tokens:
- "0xfa2b947eec368f42195f24f36d2af29f7c24cec2" # USDf
- "0xdac17f958d2ee523a2206206994597c13d831ec7" # USDT
static_attributes:
fee: "0x64"
creation_tx: "0x87a9c643b0836ee7e7d7863d4f4d97310d14c438cb49bc8771c3d7a9d5a2749f"
skip_simulation: false
skip_execution: true

View File

@@ -6,7 +6,7 @@ adapter_contract: "SwapAdapter"
adapter_build_signature: "constructor(address)"
# A comma-separated list of args to be passed to the constructor of the Adapter contract
adapter_build_args: "0x0000000000000000000000000000000000000000"
# Whether or not the testing script should skip checking balances of the protocol components.
# Whether the testing script should skip checking balances of the protocol components.
# If set to `true` please always add a reason why it's skipped.
skip_balance_check: false
# Accounts that will be automatically initialized at test start
@@ -29,6 +29,8 @@ protocol_type_names:
- "type_name_1"
- "type_name_2"
# The name of the protocol system
protocol_system: "protocol_name"
# A list of tests.
tests:
# Name of the test
- name: test_pool_creation
@@ -45,11 +47,14 @@ tests:
- "0xdac17f958d2ee523a2206206994597c13d831ec7"
- "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
- "0x6b175474e89094c44da98b954eedeac495271d0f"
static_attributes: {}
static_attributes: { }
creation_tx: "0x20793bbf260912aae189d5d261ff003c9b9166da8191d8f9d63ff1c7722f3ac6"
# Whether or not the script should skip trying to simulate a swap on this component.
# Whether the script should skip trying to simulate a swap on this component.
# If set to `true` please always add a reason why it's skipped.
skip_simulation: false
# Whether the script should skip trying to simulate execution of a swap on this component.
# If set to `true` please always add a reason why it's skipped.
skip_execution: false
- name: test_something_else
start_block: 123
stop_block: 456
@@ -58,6 +63,7 @@ tests:
tokens:
- "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE"
- "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84"
static_attributes: {}
static_attributes: { }
creation_tx: "0xfac67ecbd423a5b915deff06045ec9343568edaec34ae95c43d35f2c018afdaa"
skip_simulation: true # If true, always add a reason
skip_execution: true # If true, always add a reason

View File

@@ -6,7 +6,7 @@ adapter_contract: "SwapAdapter"
adapter_build_signature: "constructor(address)"
# A comma-separated list of args to be passed to the constructor of the Adapter contract
adapter_build_args: "0x0000000000000000000000000000000000000000"
# Whether or not the testing script should skip checking balances of the protocol components.
# Whether the testing script should skip checking balances of the protocol components.
# If set to `true` please always add a reason why it's skipped.
skip_balance_check: false
# A list of accounts that need to be indexed to run the tests properly.
@@ -20,6 +20,8 @@ initialized_accounts:
protocol_type_names:
- "type_name_1"
- "type_name_2"
# The name of the protocol system
protocol_system: "protocol_name"
# A list of tests.
tests:
# Name of the test
@@ -37,11 +39,14 @@ tests:
- "0xdac17f958d2ee523a2206206994597c13d831ec7"
- "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
- "0x6b175474e89094c44da98b954eedeac495271d0f"
static_attributes: {}
static_attributes: { }
creation_tx: "0x20793bbf260912aae189d5d261ff003c9b9166da8191d8f9d63ff1c7722f3ac6"
# Whether or not the script should skip trying to simulate a swap on this component.
# Whether the script should skip trying to simulate a swap on this component.
# If set to `true` please always add a reason why it's skipped.
skip_simulation: false
# Whether the script should skip trying to simulate execution of a swap on this component.
# If set to `true` please always add a reason why it's skipped.
skip_execution: false
- name: test_something_else
start_block: 123
stop_block: 456
@@ -50,6 +55,7 @@ tests:
tokens:
- "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE"
- "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84"
static_attributes: {}
static_attributes: { }
creation_tx: "0xfac67ecbd423a5b915deff06045ec9343568edaec34ae95c43d35f2c018afdaa"
skip_simulation: true # If true, always add a reason
skip_execution: true # If true, always add a reason

View File

@@ -3,18 +3,18 @@ protocol_system: "uniswap_v2"
protocol_type_names:
- "uniswap_v2_pool"
module_name: "map_pool_events"
skip_balance_check: true
skip_balance_check: false
initialized_accounts:
tests:
- name: test_spx_weth_pool
start_block: 17924533
stop_block: 17924534
- name: test_wolf_weth_pool
start_block: 19701395
stop_block: 19701397
expected_components:
- id: "0x52c77b0CB827aFbAD022E6d6CAF2C44452eDbc39"
- id: "0x67324985b5014b36b960273353deb3d96f2f18c2"
tokens:
- "0xe0f63a424a4439cbe457d80e4f4b51ad25b2c56c"
- "0x67466BE17df832165F8C80a5A120CCc652bD7E69"
- "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
static_attributes:
fee: "0x1e"
creation_tx: "0xf09ac8ad7e21d15ded627a176ec718903baae5e5a9ce671a611bd852691b24f9"
creation_tx: "0x5e01ae1522722340871708a3c55e1395dda647a57767781230317319fa36ea7b"
skip_simulation: false

View File

@@ -1,2 +0,0 @@
export RPC_URL=https://mainnet.infura.io/v3/your-infura-key
export SUBSTREAMS_API_TOKEN="changeme"

View File

@@ -1,6 +0,0 @@
# Substreams Testing
This package provides a comprehensive testing suite for Substreams modules. The testing suite is designed to facilitate
end-to-end testing, ensuring that your Substreams modules function as expected.
For more information on Substreams, please refer to the [Testing documentation](https://docs.propellerheads.xyz/tycho/for-dexs/protocol-integration-sdk/indexing/general-integration-steps/4.-testing)

View File

@@ -1,19 +0,0 @@
version: "3.1"
services:
db:
build:
context: .
dockerfile: postgres.Dockerfile
restart: "always"
environment:
POSTGRES_PASSWORD: mypassword
POSTGRES_DATABASE: tycho_indexer_0
POSTGRES_USERNAME: postgres
POSTGRESQL_SHARED_PRELOAD_LIBRARIES: pg_cron
ports:
- "5431:5432"
shm_size: "1gb"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:

View File

@@ -1,25 +0,0 @@
# This Dockerfile creates a custom postgres image used for CI and local deployment.
# This is required because we use some postgres extensions that aren't in the generic
# Postgres image such as pg_partman or pg_cron.
# As an image with pg_partman already exist, we start from this one and add pg_cron
# and possibly other extensions on top of that.
FROM ghcr.io/dbsystel/postgresql-partman:15-5
ARG PGCRON_VERSION="1.6.2"
USER root
RUN apk update && apk add --no-cache wget build-base clang19 llvm19
RUN cd /tmp \
&& wget "https://github.com/citusdata/pg_cron/archive/refs/tags/v${PGCRON_VERSION}.tar.gz" \
&& tar zxf v${PGCRON_VERSION}.tar.gz \
&& cd pg_cron-${PGCRON_VERSION} \
&& make \
&& make install \
&& cd .. && rm -r pg_cron-${PGCRON_VERSION} v${PGCRON_VERSION}.tar.gz
# Add configuration to postgresql.conf template
# Start with postgres database, then switch to tycho_indexer_0 after it's created
RUN echo "shared_preload_libraries = 'pg_partman_bgw,pg_cron'" >> /usr/local/share/postgresql/postgresql.conf.sample \
&& echo "cron.database_name = 'tycho_indexer_0'" >> /usr/local/share/postgresql/postgresql.conf.sample
# Stay as root user for PostgreSQL to work properly
# USER 1001

View File

@@ -1,6 +0,0 @@
psycopg2==2.9.9
PyYAML==6.0.1
Requests==2.32.2
web3==5.31.3
git+https://github.com/propeller-heads/tycho-indexer.git@0.74.0#subdirectory=tycho-client-py
git+https://github.com/propeller-heads/tycho-simulation.git@0.118.0#subdirectory=tycho_simulation_py

View File

@@ -1,45 +0,0 @@
#!/bin/bash
# To run: ./setup_env.sh
set -e
command_exists() {
command -v "$1" >/dev/null 2>&1
}
# Check each dependency is installed
deps=("git" "rustc" "gcc" "openssl" "conda" "pip" "pg_config")
names=("git" "rust" "gcc" "openssl" "conda" "pip" "libpq")
for i in "${!deps[@]}"; do
if ! command_exists "${deps[$i]}"; then
echo "Error: '${names[$i]}' is not installed."
exit 1
fi
done
echo "All dependencies are installed. Proceeding with setup..."
# Variables
ENV_NAME="tycho-protocol-sdk-testing"
PYTHON_VERSION="3.9"
# Get the directory where this script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
ROOT_DIR="$( cd "$SCRIPT_DIR/.." && pwd )" # Assuming the script is in a subdirectory of the root
REQUIREMENTS_FILE="$ROOT_DIR/testing/requirements.txt"
# Create conda environment
echo "Creating conda environment ${ENV_NAME} with Python ${PYTHON_VERSION}..."
conda create --name $ENV_NAME python=$PYTHON_VERSION -y
# Activate the environment
echo "Activating the environment..."
eval "$(conda shell.bash hook)"
conda activate $ENV_NAME
# Install the requirements
echo "Installing the requirements from ${REQUIREMENTS_FILE}..."
pip install -r $REQUIREMENTS_FILE --index-url https://pypi.org/simple
conda activate $ENV_NAME
echo "----------------------------------------"
echo "SETUP COMPLETE."
echo "Run 'conda activate $ENV_NAME' to activate the environment."

View File

@@ -1,65 +0,0 @@
import os
import subprocess
from typing import Optional
class AdapterContractBuilder:
def __init__(self, src_path: str):
self.src_path = src_path
def find_contract(self, adapter_contract: str):
"""
Finds the contract file in the provided source path.
:param adapter_contract: The contract name to be found.
:return: The path to the contract file.
"""
contract_path = os.path.join(
self.src_path,
"out",
f"{adapter_contract}.sol",
f"{adapter_contract}.evm.runtime",
)
if not os.path.exists(contract_path):
raise FileNotFoundError(f"Contract {adapter_contract} not found.")
return contract_path
def build_target(
self, adapter_contract: str, signature: Optional[str], args: Optional[str]
) -> str:
"""
Runs the buildRuntime Bash script in a subprocess with the provided arguments.
:param src_path: Path to the script to be executed.
:param adapter_contract: The contract name to be passed to the script.
:param signature: The constructor signature to be passed to the script.
:param args: The constructor arguments to be passed to the script.
:return: The path to the contract file.
"""
script_path = "scripts/buildRuntime.sh"
cmd = [script_path, "-c", adapter_contract]
if signature:
cmd.extend(["-s", signature, "-a", args])
try:
# Running the bash script with the provided arguments
result = subprocess.run(
cmd,
cwd=self.src_path,
capture_output=True,
text=True,
check=True,
)
# Print standard output and error for debugging
print("Output:\n", result.stdout)
if result.stderr:
print("Errors:\n", result.stderr)
return self.find_contract(adapter_contract)
except subprocess.CalledProcessError as e:
print(f"An error occurred: {e}")
print("Error Output:\n", e.stderr)

View File

@@ -1,29 +0,0 @@
import argparse
from runner import TestRunner
def main() -> None:
parser = argparse.ArgumentParser(
description="Run indexer within a specified range of blocks"
)
parser.add_argument("--package", type=str, help="Name of the package to test.")
parser.add_argument("--tycho-logs", action="store_true", help="Enable Tycho logs.")
parser.add_argument(
"--db-url",
default="postgres://postgres:mypassword@localhost:5431/tycho_indexer_0",
type=str,
help="Postgres database URL for the Tycho indexer. Default: postgres://postgres:mypassword@localhost:5431/tycho_indexer_0",
)
parser.add_argument(
"--vm-traces", action="store_true", help="Enable tracing during vm simulations."
)
args = parser.parse_args()
test_runner = TestRunner(
args.package, args.tycho_logs, db_url=args.db_url, vm_traces=args.vm_traces
)
test_runner.run_tests()
if __name__ == "__main__":
main()

View File

@@ -1,61 +0,0 @@
import os
from web3 import Web3
native_aliases = [
"0x0000000000000000000000000000000000000000",
"0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
]
erc20_abi = [
{
"constant": True,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"type": "function",
}
]
def get_token_balance(token_address, wallet_address, block_number):
rpc_url = os.getenv("RPC_URL")
if rpc_url is None:
raise EnvironmentError("RPC_URL environment variable not set")
web3 = Web3(Web3.HTTPProvider(rpc_url))
if not web3.isConnected():
raise ConnectionError("Failed to connect to the Ethereum node")
# Check if the token_address is a native token alias
if token_address.lower() in native_aliases:
balance = web3.eth.get_balance(
Web3.toChecksumAddress(wallet_address), block_identifier=block_number
)
else:
contract = web3.eth.contract(
address=Web3.toChecksumAddress(token_address), abi=erc20_abi
)
balance = contract.functions.balanceOf(
Web3.toChecksumAddress(wallet_address)
).call(block_identifier=block_number)
return balance
def get_block_header(block_number):
rpc_url = os.getenv("RPC_URL")
if rpc_url is None:
raise EnvironmentError("RPC_URL environment variable not set")
web3 = Web3(Web3.HTTPProvider(rpc_url))
if not web3.isConnected():
raise ConnectionError("Failed to connect to the Ethereum node")
block = web3.eth.get_block(block_number)
return block
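`get_token_balance` branches on the native-token aliases before touching a contract. A hedged, node-free sketch of that dispatch (the helper name is illustrative):

```python
NATIVE_ALIASES = {
    "0x0000000000000000000000000000000000000000",
    "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
}

def balance_source(token_address):
    """Return which lookup get_token_balance would perform for this token."""
    if token_address.lower() in NATIVE_ALIASES:
        return "eth.get_balance"
    return "erc20.balanceOf"
```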

View File

@@ -1,128 +0,0 @@
import difflib
from hexbytes import HexBytes
from pydantic import BaseModel, Field, validator
from typing import List, Dict, Optional
class ProtocolComponentExpectation(BaseModel):
"""Represents a ProtocolComponent with its main attributes."""
id: str = Field(..., description="Identifier of the protocol component")
tokens: List[HexBytes] = Field(
...,
description="List of token addresses associated with the protocol component",
)
static_attributes: Optional[Dict[str, HexBytes]] = Field(
default_factory=dict, description="Static attributes of the protocol component"
)
creation_tx: HexBytes = Field(
..., description="Hash of the transaction that created the protocol component"
)
@validator("id", pre=True, always=True)
def lower_id(cls, v):
return v.lower()
@validator("tokens", pre=True, always=True)
def convert_tokens_to_hexbytes(cls, v):
return sorted(HexBytes(t.lower()) for t in v)
@validator("static_attributes", pre=True, always=True)
def convert_static_attributes_to_hexbytes(cls, v):
if v:
return {
k: v[k] if isinstance(v[k], HexBytes) else HexBytes(v[k].lower())
for k in v
}
return {}
@validator("creation_tx", pre=True, always=True)
def convert_creation_tx_to_hexbytes(cls, v):
return HexBytes(v.lower())
def compare(
self, other: "ProtocolComponentExpectation", colorize_output: bool = True
) -> Optional[str]:
"""Compares the current instance with another ProtocolComponent instance and returns a message with the
differences or None if there are no differences."""
def colorize_diff(diff):
colored_diff = []
for line in diff:
if line.startswith("-"):
colored_diff.append(f"\033[91m{line}\033[0m") # Red
elif line.startswith("+"):
colored_diff.append(f"\033[92m{line}\033[0m") # Green
elif line.startswith("?"):
colored_diff.append(f"\033[93m{line}\033[0m") # Yellow
else:
colored_diff.append(line)
return "\n".join(colored_diff)
differences = []
for field_name, field_value in self.__dict__.items():
other_value = getattr(other, field_name, None)
if field_value != other_value:
diff = list(difflib.ndiff([str(field_value)], [str(other_value)]))
highlighted_diff = (
colorize_diff(diff) if colorize_output else "\n".join(diff)
)
differences.append(
f"Field '{field_name}' mismatch for {self.id}:\n{highlighted_diff}"
)
if not differences:
return None
return "\n".join(differences)
class ProtocolComponentWithTestConfig(ProtocolComponentExpectation):
"""Represents a ProtocolComponent with its main attributes and test configuration."""
skip_simulation: Optional[bool] = Field(
False,
description="Flag indicating whether to skip simulation for this component",
)
class IntegrationTest(BaseModel):
"""Configuration for an individual test."""
name: str = Field(..., description="Name of the test")
start_block: int = Field(..., description="Starting block number for the test")
stop_block: int = Field(..., description="Stopping block number for the test")
initialized_accounts: Optional[List[str]] = Field(
None, description="List of initialized account addresses"
)
expected_components: List[ProtocolComponentWithTestConfig] = Field(
..., description="List of protocol components expected in the indexed state"
)
class IntegrationTestsConfig(BaseModel):
"""Main integration test configuration."""
substreams_yaml_path: str = Field(
"./substreams.yaml", description="Path of the Substreams YAML file"
)
adapter_contract: str = Field(
..., description="Name of the SwapAdapter contract for this protocol"
)
adapter_build_signature: Optional[str] = Field(
None, description="SwapAdapter's constructor signature"
)
adapter_build_args: Optional[str] = Field(
None, description="Arguments for the SwapAdapter constructor"
)
initialized_accounts: Optional[List[str]] = Field(
None,
description="List of initialized account addresses. These accounts will be initialized for every test",
)
skip_balance_check: bool = Field(
..., description="Flag to skip balance check for all tests"
)
protocol_type_names: List[str] = Field(
..., description="List of protocol type names for the tested protocol"
)
tests: List[IntegrationTest] = Field(..., description="List of integration tests")
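The `tokens` validator above sorts addresses into canonical byte order, so an expected component and an indexed component compare equal regardless of token ordering or hex casing. A hedged sketch of that normalization without pydantic or HexBytes:

```python
def normalize_tokens(tokens):
    """Lowercase, strip the 0x prefix, and sort token addresses as raw bytes."""
    return sorted(bytes.fromhex(t.lower().removeprefix("0x")) for t in tokens)

usdt = "0xdac17f958d2ee523a2206206994597c13d831ec7"
weth = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
```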

View File

@@ -1,418 +0,0 @@
import itertools
import os
import shutil
import subprocess
import traceback
from datetime import datetime
from decimal import Decimal
from pathlib import Path
from typing import Optional, Callable, Any
import yaml
from tycho_simulation_py.evm.decoders import ThirdPartyPoolTychoDecoder
from tycho_simulation_py.evm.storage import TychoDBSingleton
from tycho_simulation_py.models import EVMBlock
from pydantic import BaseModel
from tycho_indexer_client.dto import (
Chain,
ProtocolComponentsParams,
ProtocolStateParams,
ContractStateParams,
ProtocolComponent,
ResponseProtocolState,
HexBytes,
ResponseAccount,
Snapshot,
TracedEntryPointParams,
)
from tycho_indexer_client.rpc_client import TychoRPCClient
from models import (
IntegrationTestsConfig,
ProtocolComponentWithTestConfig,
ProtocolComponentExpectation,
)
from adapter_builder import AdapterContractBuilder
from evm import get_token_balance, get_block_header
from tycho import TychoRunner
from utils import build_snapshot_message, token_factory
class TestResult:
def __init__(
self, success: bool, step: Optional[str] = None, message: Optional[str] = None
):
self.success = success
self.step = step
self.message = message
@classmethod
def Passed(cls):
return cls(success=True)
@classmethod
def Failed(cls, step: str, message: str):
return cls(success=False, step=step, message=message)
def parse_config(yaml_path: str) -> IntegrationTestsConfig:
with open(yaml_path, "r") as file:
yaml_content = yaml.safe_load(file)
return IntegrationTestsConfig(**yaml_content)
class SimulationFailure(BaseModel):
pool_id: str
sell_token: str
buy_token: str
error: str
class TestRunner:
def __init__(
self, package: str, with_binary_logs: bool, db_url: str, vm_traces: bool
):
self.repo_root = os.getcwd()
config_path = os.path.join(
self.repo_root, "substreams", package, "integration_test.tycho.yaml"
)
self.config: IntegrationTestsConfig = parse_config(config_path)
self.spkg_src = os.path.join(self.repo_root, "substreams", package)
self.adapter_contract_builder = AdapterContractBuilder(
os.path.join(self.repo_root, "evm")
)
self.tycho_runner = TychoRunner(
db_url, with_binary_logs, self.config.initialized_accounts
)
self.tycho_rpc_client = TychoRPCClient()
self._token_factory_func = token_factory(self.tycho_rpc_client)
self.db_url = db_url
self._vm_traces = vm_traces
self._chain = Chain.ethereum
def run_tests(self) -> None:
"""Run all tests specified in the configuration."""
print(f"Running {len(self.config.tests)} tests ...\n")
print("--------------------------------\n")
failed_tests: list[str] = []
count = 1
for test in self.config.tests:
print(f"TEST {count}: {test.name}")
self.tycho_runner.empty_database(self.db_url)
spkg_path = self.build_spkg(
os.path.join(self.spkg_src, self.config.substreams_yaml_path),
lambda data: self.update_initial_block(data, test.start_block),
)
self.tycho_runner.run_tycho(
spkg_path,
test.start_block,
test.stop_block,
self.config.protocol_type_names,
test.initialized_accounts or [],
)
result: TestResult = self.tycho_runner.run_with_rpc_server(
self.validate_state,
test.expected_components,
test.stop_block,
test.initialized_accounts or [],
)
if result.success:
print(f"\n{test.name} passed.\n")
else:
failed_tests.append(test.name)
print(f"\n❗️ {test.name} failed on {result.step}: {result.message}\n")
print("--------------------------------\n")
count += 1
print(
"\nTests finished! \n"
f"RESULTS: {len(self.config.tests) - len(failed_tests)}/{len(self.config.tests)} passed.\n"
)
if failed_tests:
print("Failed tests:")
for failed_test in failed_tests:
print(f"- {failed_test}")
print("\n")
def validate_state(
self,
expected_components: list[ProtocolComponentWithTestConfig],
stop_block: int,
initialized_accounts: list[str],
) -> TestResult:
"""Validate the current protocol state against the expected state."""
protocol_components = self.tycho_rpc_client.get_protocol_components(
ProtocolComponentsParams(protocol_system="test_protocol")
).protocol_components
protocol_states = self.tycho_rpc_client.get_protocol_state(
ProtocolStateParams(protocol_system="test_protocol")
).states
components_by_id: dict[str, ProtocolComponent] = {
component.id: component for component in protocol_components
}
try:
# Step 1: Validate the protocol components
step = "Protocol component validation"
for expected_component in expected_components:
comp_id = expected_component.id.lower()
if comp_id not in components_by_id:
return TestResult.Failed(
step=step,
message=f"'{comp_id}' not found in protocol components. "
f"Available components: {set(components_by_id.keys())}",
)
diff = ProtocolComponentExpectation(
**components_by_id[comp_id].dict()
).compare(ProtocolComponentExpectation(**expected_component.dict()))
if diff is not None:
return TestResult.Failed(step=step, message=diff)
print(f"\n{step} passed.\n")
# Step 2: Validate the token balances
step = "Token balance validation"
if not self.config.skip_balance_check:
for component in protocol_components:
comp_id = component.id.lower()
for token in component.tokens:
state = next(
(
s
for s in protocol_states
if s.component_id.lower() == comp_id
),
None,
)
if state:
balance_hex = state.balances.get(token, HexBytes("0x00"))
else:
balance_hex = HexBytes("0x00")
tycho_balance = int(balance_hex)
node_balance = get_token_balance(token, comp_id, stop_block)
if node_balance != tycho_balance:
return TestResult.Failed(
step=step,
message=f"Balance mismatch for {comp_id}:{token} at block {stop_block}: got {node_balance} "
f"from the RPC call and {tycho_balance} from Substreams",
)
print(f"\n{step} passed.\n")
else:
print(f"\n {step} skipped. \n")
# Step 3: Validate the simulation
step = "Simulation validation"
# Loads from Tycho-Indexer the state of all the contracts that are related to the protocol components.
simulation_components: list[str] = [
c.id for c in expected_components if c.skip_simulation is False
]
related_contracts: set[str] = set()
for account in self.config.initialized_accounts or []:
related_contracts.add(account)
for account in initialized_accounts or []:
related_contracts.add(account)
# Collect all contracts that are related to the simulation components
filtered_components: list[ProtocolComponent] = []
component_related_contracts: set[str] = set()
for component in protocol_components:
# Filter out components that are not set to be used for the simulation
if component.id in simulation_components:
# Collect component contracts
for a in component.contract_ids:
component_related_contracts.add(a.hex())
# Collect DCI detected contracts
traces_results = self.tycho_rpc_client.get_traced_entry_points(
TracedEntryPointParams(
protocol_system="test_protocol",
component_ids=[component.id],
)
).traced_entry_points.values()
for traces in traces_results:
for _, trace in traces:
component_related_contracts.update(
trace["accessed_slots"].keys()
)
filtered_components.append(component)
# Check if any of the initialized contracts are not listed as component contract dependencies
unspecified_contracts: list[str] = [
c for c in related_contracts if c not in component_related_contracts
]
related_contracts.update(component_related_contracts)
contract_states = self.tycho_rpc_client.get_contract_state(
ContractStateParams(contract_ids=list(related_contracts))
).accounts
if len(filtered_components):
if len(unspecified_contracts):
print(
f"⚠️ The following initialized contracts are not listed as component contract dependencies: {unspecified_contracts}. "
f"Please ensure that, if they are required for this component's simulation, they are specified under the Protocol Component's contract field."
)
simulation_failures = self.simulate_get_amount_out(
stop_block, protocol_states, filtered_components, contract_states
)
if len(simulation_failures):
error_msgs: list[str] = []
for pool_id, failures in simulation_failures.items():
failures_formatted: list[str] = [
f"{f.sell_token} -> {f.buy_token}: {f.error}"
for f in failures
]
error_msgs.append(
f"Pool {pool_id} failed simulations: {', '.join(failures_formatted)}"
)
return TestResult.Failed(step=step, message="\n".join(error_msgs))
print(f"\n{step} passed.\n")
else:
print(f"\n {step} skipped.\n")
return TestResult.Passed()
except Exception as e:
error_message = f"An error occurred: {str(e)}\n" + traceback.format_exc()
return TestResult.Failed(step=step, message=error_message)
def simulate_get_amount_out(
self,
block_number: int,
protocol_states: list[ResponseProtocolState],
protocol_components: list[ProtocolComponent],
contract_states: list[ResponseAccount],
) -> dict[str, list[SimulationFailure]]:
TychoDBSingleton.initialize()
block_header = get_block_header(block_number)
block: EVMBlock = EVMBlock(
id=block_number,
ts=datetime.fromtimestamp(block_header.timestamp),
hash_=block_header.hash.hex(),
)
failed_simulations: dict[str, list[SimulationFailure]] = {}
try:
adapter_contract = self.adapter_contract_builder.find_contract(
self.config.adapter_contract
)
except FileNotFoundError:
adapter_contract = self.adapter_contract_builder.build_target(
self.config.adapter_contract,
self.config.adapter_build_signature,
self.config.adapter_build_args,
)
TychoDBSingleton.clear_instance()
decoder = ThirdPartyPoolTychoDecoder(
token_factory_func=self._token_factory_func,
adapter_contract=adapter_contract,
minimum_gas=0,
trace=self._vm_traces,
)
snapshot_message: Snapshot = build_snapshot_message(
protocol_states, protocol_components, contract_states
)
decoded = decoder.decode_snapshot(snapshot_message, block)
for component in protocol_components:
if component.id not in decoded:
failed_simulations[component.id] = [
SimulationFailure(
pool_id=component.id,
sell_token=component.tokens[0].hex(),
buy_token=component.tokens[1].hex(),
error="Pool not found in decoded state.",
)
]
for pool_state in decoded.values():
pool_id = pool_state.id_
if not pool_state.balances:
raise ValueError(f"Missing balances for pool {pool_id}")
for sell_token, buy_token in itertools.permutations(pool_state.tokens, 2):
for prctg in ["0.001", "0.01", "0.1"]:
# Try selling 0.1%, 1% and 10% of the protocol balance
try:
sell_amount = (
Decimal(prctg) * pool_state.balances[sell_token.address]
)
amount_out, gas_used, _ = pool_state.get_amount_out(
sell_token, sell_amount, buy_token
)
print(
f"Amount out for {pool_id}: {sell_amount} {sell_token} -> {amount_out} {buy_token} - "
f"Gas used: {gas_used}"
)
except Exception as e:
print(
f"Error simulating get_amount_out for {pool_id}: {sell_token} -> {buy_token} at block {block_number}. "
f"Error: {e}"
)
if pool_id not in failed_simulations:
failed_simulations[pool_id] = []
failed_simulations[pool_id].append(
SimulationFailure(
pool_id=pool_id,
sell_token=str(sell_token),
buy_token=str(buy_token),
error=str(e),
)
)
continue
return failed_simulations
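The sweep above simulates a swap for every ordered token pair at three trade sizes. A minimal, self-contained sketch of that iteration pattern, with a toy constant-product pool standing in for the decoded `pool_state` (all names here are illustrative, not part of the framework):

```python
import itertools
from decimal import Decimal

# Toy pool balances; stands in for pool_state.balances.
balances = {"TOKA": Decimal("1000"), "TOKB": Decimal("2000")}

def get_amount_out(sell_token, sell_amount, buy_token):
    # Fee-less x*y=k swap, for illustration only.
    x, y = balances[sell_token], balances[buy_token]
    return y - (x * y) / (x + sell_amount)

quotes = {}
for sell, buy in itertools.permutations(balances, 2):
    for prctg in ("0.001", "0.01", "0.1"):
        # Sell 0.1%, 1% and 10% of the pool's balance of the sell token.
        sell_amount = Decimal(prctg) * balances[sell]
        quotes[(sell, buy, prctg)] = get_amount_out(sell, sell_amount, buy)

# Every ordered pair at every size: 2 tokens -> 2 pairs x 3 sizes = 6 quotes.
assert len(quotes) == 6
```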
@staticmethod
def build_spkg(
yaml_file_path: str, modify_func: Callable[[dict[str, Any]], None]
) -> str:
"""Build a Substreams package with modifications to the YAML file."""
backup_file_path = f"{yaml_file_path}.backup"
shutil.copy(yaml_file_path, backup_file_path)
with open(yaml_file_path, "r") as file:
data = yaml.safe_load(file)
modify_func(data)
spkg_name = f"{yaml_file_path.rsplit('/', 1)[0]}/{data['package']['name'].replace('_', '-')}-{data['package']['version']}.spkg"
with open(yaml_file_path, "w") as file:
yaml.dump(data, file, default_flow_style=False)
try:
result = subprocess.run(
["substreams", "pack", yaml_file_path], capture_output=True, text=True
)
if result.returncode != 0:
print("Substreams pack command failed:", result.stderr)
except Exception as e:
print(f"Error running substreams pack command: {e}")
shutil.copy(backup_file_path, yaml_file_path)
Path(backup_file_path).unlink()
return spkg_name
@staticmethod
def update_initial_block(data: dict[str, Any], start_block: int) -> None:
"""Update the initial block for all modules in the configuration data."""
for module in data["modules"]:
module["initialBlock"] = start_block
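`build_spkg` hands the parsed YAML to `modify_func` before re-serializing and packing it. A minimal sketch of that hook using `update_initial_block`, with an in-memory dict standing in for the manifest file (package name and version are placeholders):

```python
# Manifest-like dict standing in for the parsed substreams.yaml.
data = {
    "package": {"name": "my_protocol", "version": "v0.1.0"},
    "modules": [
        {"name": "map_protocol_changes", "initialBlock": 1},
        {"name": "store_balances", "initialBlock": 1},
    ],
}

def update_initial_block(data, start_block):
    # Same mutation as the static method: rewrite every module's start.
    for module in data["modules"]:
        module["initialBlock"] = start_block

update_initial_block(data, 19_000_000)
assert all(m["initialBlock"] == 19_000_000 for m in data["modules"])

# The spkg file name is derived the same way build_spkg derives it.
spkg_name = f"{data['package']['name'].replace('_', '-')}-{data['package']['version']}.spkg"
assert spkg_name == "my-protocol-v0.1.0.spkg"
```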


@@ -1,211 +0,0 @@
import signal
import subprocess
import threading
import time
import psycopg2
from psycopg2 import sql
import os
def find_binary_file(file_name):
# Define usual locations for binary files in Unix-based systems
locations = [
"/bin",
"/sbin",
"/usr/bin",
"/usr/sbin",
"/usr/local/bin",
"/usr/local/sbin",
]
# Add user's local bin directory if it exists
home = os.path.expanduser("~")
if os.path.exists(home + "/.local/bin"):
locations.append(home + "/.local/bin")
# Check each location
for location in locations:
potential_path = os.path.join(location, file_name)
if os.path.exists(potential_path):
return potential_path
# If binary is not found in the usual locations, return None
searched_paths = "\n".join(locations)
raise RuntimeError(
f"Unable to locate {file_name} binary. Searched paths:\n{searched_paths}"
)
binary_path = find_binary_file("tycho-indexer")
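The search above scans a fixed list of directories, much like `shutil.which` scans `PATH`. A self-contained sketch of the same scan over a temporary directory (function name is illustrative):

```python
import os
import tempfile

def find_in(locations, file_name):
    # Same scan as find_binary_file, over an explicit location list.
    for location in locations:
        potential_path = os.path.join(location, file_name)
        if os.path.exists(potential_path):
            return potential_path
    raise RuntimeError(
        f"Unable to locate {file_name} binary. Searched paths:\n" + "\n".join(locations)
    )

with tempfile.TemporaryDirectory() as tmp:
    # Create an empty file to play the role of the binary.
    open(os.path.join(tmp, "tycho-indexer"), "w").close()
    found = find_in(["/nonexistent", tmp], "tycho-indexer")
    assert found == os.path.join(tmp, "tycho-indexer")
```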
class TychoRunner:
def __init__(
self,
db_url: str,
with_binary_logs: bool = False,
initialized_accounts: list[str] | None = None,
):
self.with_binary_logs = with_binary_logs
self._db_url = db_url
self._initialized_accounts = initialized_accounts or []
def run_tycho(
self,
spkg_path: str,
start_block: int,
end_block: int,
protocol_type_names: list,
initialized_accounts: list,
protocol_system: str = "test_protocol",
) -> None:
"""Run the Tycho indexer with the specified SPKG and block range."""
env = os.environ.copy()
env["RUST_LOG"] = "tycho_indexer=info"
all_accounts = self._initialized_accounts + initialized_accounts
try:
process = subprocess.Popen(
[
binary_path,
"--database-url",
self._db_url,
"run",
"--spkg",
spkg_path,
"--module",
"map_protocol_changes",
"--protocol-type-names",
",".join(protocol_type_names),
"--protocol-system",
protocol_system,
"--start-block",
str(start_block),
"--stop-block",
# +2 compensates for the cache on the indexer side.
str(end_block + 2),
"--dci-plugin",
"rpc",
]
+ (
[
"--initialized-accounts",
",".join(all_accounts),
"--initialization-block",
str(start_block),
]
if all_accounts
else []
),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=env,
)
with process.stdout:
for line in iter(process.stdout.readline, ""):
if line and self.with_binary_logs:
print(line.strip())
with process.stderr:
for line in iter(process.stderr.readline, ""):
if line and self.with_binary_logs:
print(line.strip())
process.wait()
except Exception as e:
print(f"Error running Tycho indexer: {e}")
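The command line above is assembled by list concatenation, with the account-initialization flags appended as a complete group only when accounts are present. A sketch of that pattern (flag names match the invocation above; the helper name and values are placeholders):

```python
def build_args(spkg_path, start_block, end_block, accounts):
    # Base invocation; --stop-block is end_block + 2 to compensate for
    # the cache on the indexer side.
    args = [
        "run",
        "--spkg", spkg_path,
        "--start-block", str(start_block),
        "--stop-block", str(end_block + 2),
    ]
    # Optional flags are appended as a whole group, never half-built.
    if accounts:
        args += [
            "--initialized-accounts", ",".join(accounts),
            "--initialization-block", str(start_block),
        ]
    return args

assert "--initialized-accounts" not in build_args("pkg.spkg", 10, 20, [])
argv = build_args("pkg.spkg", 10, 20, ["0xabc", "0xdef"])
assert argv[argv.index("--initialized-accounts") + 1] == "0xabc,0xdef"
assert argv[argv.index("--stop-block") + 1] == "22"
```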
def run_with_rpc_server(self, func: callable, *args, **kwargs):
"""
Run a function with the Tycho RPC server running in the background.
This wrapper starts the Tycho RPC server as a background task, executes the target function, then shuts the server down.
"""
stop_event = threading.Event()
process = None
def run_rpc_server():
nonlocal process
try:
env = os.environ.copy()
env["RUST_LOG"] = "info"
process = subprocess.Popen(
[binary_path, "--database-url", self._db_url, "rpc"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=env,
)
# Read remaining stdout and stderr
if self.with_binary_logs:
for output in process.stdout:
if output:
print(output.strip())
for error_output in process.stderr:
if error_output:
print(error_output.strip())
process.wait()
if process.returncode != 0:
print("Command failed with return code:", process.returncode)
except Exception as e:
print(f"An error occurred while running the command: {e}")
finally:
if process and process.poll() is None:
process.terminate()
process.wait()
# Start the RPC server in a separate thread
rpc_thread = threading.Thread(target=run_rpc_server)
rpc_thread.start()
time.sleep(3) # Wait for the RPC server to start
try:
# Run the provided function
result = func(*args, **kwargs)
return result
finally:
stop_event.set()
if process and process.poll() is None:
process.send_signal(signal.SIGINT)
if rpc_thread.is_alive():
rpc_thread.join()
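A minimal sketch of the wrapper pattern `run_with_rpc_server` uses: start a background worker, run the target function, then signal the worker and join it. A toy ticking loop stands in for the RPC subprocess (all names are illustrative):

```python
import threading
import time

def run_with_background(func, *args, **kwargs):
    stop_event = threading.Event()

    def worker():
        # Stands in for the long-running RPC server process.
        while not stop_event.is_set():
            time.sleep(0.01)

    thread = threading.Thread(target=worker)
    thread.start()
    try:
        # Run the provided function while the worker is alive.
        return func(*args, **kwargs)
    finally:
        # Always stop and reap the worker, even if func raised.
        stop_event.set()
        thread.join()

result = run_with_background(lambda a, b: a + b, 2, 3)
assert result == 5
```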
@staticmethod
def empty_database(db_url: str) -> None:
"""Drop and recreate the Tycho indexer database."""
conn = None
cursor = None
try:
conn = psycopg2.connect(db_url[: db_url.rfind("/")])
conn.autocommit = True
cursor = conn.cursor()
cursor.execute(
sql.SQL("DROP DATABASE IF EXISTS {} WITH (FORCE)").format(
sql.Identifier("tycho_indexer_0")
)
)
cursor.execute(
sql.SQL("CREATE DATABASE {}").format(sql.Identifier("tycho_indexer_0"))
)
except psycopg2.Error as e:
print(f"Database error: {e}")
finally:
if cursor:
cursor.close()
if conn:
conn.close()
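`empty_database` connects to the server without selecting a database by trimming the database name off the connection URL. A sketch of that string manipulation (the URL is a placeholder):

```python
db_url = "postgresql://user:pass@localhost:5432/tycho_indexer_0"

# Strip everything after the last "/" to connect to the server itself,
# since a database cannot be dropped while it is the active connection.
maintenance_url = db_url[: db_url.rfind("/")]
assert maintenance_url == "postgresql://user:pass@localhost:5432"

# The database name is the trailing path segment.
db_name = db_url.rsplit("/", 1)[1]
assert db_name == "tycho_indexer_0"
```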


@@ -1,79 +0,0 @@
from logging import getLogger
from typing import Callable, Union
from eth_utils import to_checksum_address
from tycho_simulation_py.models import EthereumToken
from tycho_indexer_client.dto import (
ResponseProtocolState,
ProtocolComponent,
ResponseAccount,
ComponentWithState,
Snapshot,
HexBytes,
TokensParams,
PaginationParams,
)
from tycho_indexer_client.rpc_client import TychoRPCClient
log = getLogger(__name__)
def build_snapshot_message(
protocol_states: list[ResponseProtocolState],
protocol_components: list[ProtocolComponent],
account_states: list[ResponseAccount],
) -> Snapshot:
vm_storage = {state.address: state for state in account_states}
states = {}
for component in protocol_components:
pool_id = component.id
states[pool_id] = {"component": component}
for state in protocol_states:
pool_id = state.component_id
if pool_id not in states:
continue
states[pool_id]["state"] = state
states = {id_: ComponentWithState(**state) for id_, state in states.items()}
return Snapshot(states=states, vm_storage=vm_storage)
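The function joins components and states on the component id, silently dropping any state whose component is unknown. A sketch of that join with plain dicts in place of the DTO classes (ids and fields are illustrative):

```python
components = [{"id": "pool_a"}, {"id": "pool_b"}]
states = [
    {"component_id": "pool_a", "balance": 1},
    {"component_id": "pool_x", "balance": 9},  # no matching component -> dropped
]

# Seed the snapshot with one entry per component, then attach states.
snapshot = {c["id"]: {"component": c} for c in components}
for state in states:
    pool_id = state["component_id"]
    if pool_id not in snapshot:
        continue
    snapshot[pool_id]["state"] = state

assert "state" in snapshot["pool_a"]
assert "state" not in snapshot["pool_b"]  # component with no state entry
assert "pool_x" not in snapshot          # state with no component entry
```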
def token_factory(
    rpc_client: TychoRPCClient,
) -> "Callable[[Union[str, list[str]]], list[EthereumToken]]":
_client = rpc_client
_token_cache: dict[str, EthereumToken] = {}
def factory(requested_addresses: Union[str, list[str]]) -> list[EthereumToken]:
if not isinstance(requested_addresses, list):
requested_addresses = [to_checksum_address(requested_addresses)]
else:
requested_addresses = [to_checksum_address(a) for a in requested_addresses]
response = dict()
to_fetch = []
for address in requested_addresses:
if address in _token_cache:
response[address] = _token_cache[address]
else:
to_fetch.append(address)
if to_fetch:
pagination = PaginationParams(page_size=len(to_fetch), page=0)
params = TokensParams(token_addresses=to_fetch, pagination=pagination)
tokens = _client.get_tokens(params).tokens
for token in tokens:
address = to_checksum_address(token.address)
eth_token = EthereumToken(
symbol=token.symbol,
address=address,
decimals=token.decimals,
gas=token.gas,
)
response[address] = eth_token
_token_cache[address] = eth_token
return [response[address] for address in requested_addresses]
return factory
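The closure above memoizes tokens per address so repeated lookups skip the RPC round-trip. A self-contained sketch of the same caching-factory pattern with a stub fetcher in place of the paginated `get_tokens` call (all names are illustrative):

```python
calls = []

def stub_fetch(addresses):
    # Stands in for the paginated get_tokens RPC call; records each batch.
    calls.append(list(addresses))
    return {a: {"address": a, "decimals": 18} for a in addresses}

def make_factory(fetch):
    cache = {}

    def factory(addresses):
        # Only addresses missing from the cache are fetched.
        to_fetch = [a for a in addresses if a not in cache]
        if to_fetch:
            cache.update(fetch(to_fetch))
        return [cache[a] for a in addresses]

    return factory

factory = make_factory(stub_fetch)
factory(["0xaa", "0xbb"])
factory(["0xaa", "0xcc"])  # only 0xcc triggers a fetch the second time
assert calls == [["0xaa", "0xbb"], ["0xcc"]]
```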