BalancerV3: SwapAdapter and Substreams (#126)

* feat: add balancer swapAdapter and Substreams

* fix: undo tycho-substreams logs, ignore abi on rustmft

* ci: prevent warnings from failing CI

* ci: skip size check on CI

* chore: forge fmt

* feat: vault balance from storage

Vault contract tokenBalance message are set according to the vault
storage changes in the `_reserveOf` storage variable VaultStorage.sol
contract
This was the culprit that caused the failure in simulation since
balancer enforces the invariant that `token.balanceOf(vault_addr) == _reservesOf[token]`

* ci: warnings

* fix: avoid duplicated balance changes

* fix: order by ordinal

* chore: format

* feat: extract new contracts before extracting balance changes

* feat: skip unnecessary steps if no balance change is found

* refactor: filter out account balances for tokens that aren't part of any protocol components.

On the indexer side, when we receive an account balance, we need to know about the token. This commit ensure that the token was introduced before we emit any account balance with it.

* refactor: don't index liquidity buffers.

Liquidity buffers rely on rate providers. Therefore we need DCI (feature to be able to index previously created contract) to deal with them.

* refactor: cleanup tests and add docstrings

* chore: lock tycho-substreams version

* ci: set Foundry workflow to use stable foundry

* feat(DCI): Add DCI Entrypoints to BalancerV3 components (#218)

* refactor: fix typo in weighted_pool_factory_contract name

* feat: add rate_providers static attributes

* feat: add DCI entrypoints to BalancerV3 components

* fix: set default trade price to Fraction(0, 1)

* feat: remove buffers as components

Buffers are to be used internally by Boosted pools (stable/weighted pools that use ERC4626 tokens). They are not to be treated as a separate swap component.

* test: update test blocks

Extend tests some tests block range to ensure liquidity was added to the pool and can be simulated on

* feat: remove buffers as components

Remove balance updates for buffer components

* feat: listen for pool pause/unpause events

* chore: formating

* fix: encoding call data

* test: update Balancer V3 tests to use DCI

* test: set indexer log level to info

* docs: add comment on support of boosted pools

* feat: update balancer v3 package version

---------

Co-authored-by: Thales <thales@datarevenue.com>
Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
Co-authored-by: Louise Poole <louise@datarevenue.com>
Co-authored-by: Louise Poole <louisecarmenpoole@gmail.com>
This commit is contained in:
mrBovo
2025-06-26 12:19:39 +02:00
committed by GitHub
parent dfa87f662d
commit fc0fb1e540
43 changed files with 25137 additions and 14 deletions

67
substreams/Cargo.lock generated
View File

@@ -174,7 +174,7 @@ dependencies = [
"serde_json",
"sha3",
"thiserror 1.0.69",
"uint",
"uint 0.9.5",
]
[[package]]
@@ -191,7 +191,7 @@ dependencies = [
"serde_json",
"sha3",
"thiserror 1.0.69",
"uint",
"uint 0.9.5",
]
[[package]]
@@ -253,6 +253,29 @@ dependencies = [
"tycho-substreams 0.2.2",
]
[[package]]
name = "ethereum-balancer-v3"
version = "0.4.0"
dependencies = [
"anyhow",
"bytes",
"ethabi 18.0.0",
"getrandom",
"hex",
"hex-literal 0.4.1",
"itertools 0.13.0",
"keccak-hash",
"num-bigint",
"prost 0.11.9",
"prost-types 0.12.6",
"regex",
"serde",
"serde_qs",
"substreams",
"substreams-ethereum",
"tycho-substreams 0.2.1 (git+https://github.com/propeller-heads/tycho-protocol-sdk.git?rev=51995f9)",
]
[[package]]
name = "ethereum-curve"
version = "0.3.3"
@@ -417,7 +440,7 @@ dependencies = [
"impl-rlp",
"impl-serde 0.3.2",
"primitive-types 0.11.1",
"uint",
"uint 0.9.5",
]
[[package]]
@@ -431,7 +454,7 @@ dependencies = [
"impl-rlp",
"impl-serde 0.4.0",
"primitive-types 0.12.2",
"uint",
"uint 0.9.5",
]
[[package]]
@@ -766,6 +789,16 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "keccak-hash"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e1b8590eb6148af2ea2d75f38e7d29f5ca970d5a4df456b3ef19b8b415d0264"
dependencies = [
"primitive-types 0.13.1",
"tiny-keccak",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -973,7 +1006,7 @@ dependencies = [
"impl-codec",
"impl-rlp",
"impl-serde 0.3.2",
"uint",
"uint 0.9.5",
]
[[package]]
@@ -986,7 +1019,17 @@ dependencies = [
"impl-codec",
"impl-rlp",
"impl-serde 0.4.0",
"uint",
"uint 0.9.5",
]
[[package]]
name = "primitive-types"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d15600a7d856470b7d278b3fe0e311fe28c2526348549f8ef2ff7db3299c87f5"
dependencies = [
"fixed-hash 0.8.0",
"uint 0.10.0",
]
[[package]]
@@ -1738,6 +1781,18 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "uint"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "909988d098b2f738727b161a106cfc7cab00c539c2687a8836f8e565976fb53e"
dependencies = [
"byteorder",
"crunchy",
"hex",
"static_assertions",
]
[[package]]
name = "unicode-ident"
version = "1.0.14"

View File

@@ -16,6 +16,7 @@ members = [
"ethereum-uniswap-v4",
"ethereum-ekubo-v2",
"ethereum-maverick-v2",
"ethereum-balancer-v3",
]
resolver = "2"

View File

@@ -14,9 +14,9 @@ use crate::{
models::{InterimContractChange, TransactionChanges},
prelude::TransactionChangesBuilder,
};
use substreams_ethereum::pb::{
eth,
eth::v2::{block::DetailLevel, CallType, TransactionTrace},
use substreams_ethereum::pb::eth::{
self,
v2::{block::DetailLevel, CallType, TransactionTrace},
};
/// Extracts and aggregates contract changes from a block.

1330
substreams/ethereum-balancer-v3/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,34 @@
[package]
name = "ethereum-balancer-v3"
version = "0.4.0"
edition = "2021"
[lib]
name = "ethereum_balancer_v3"
crate-type = ["cdylib"]
[dependencies]
substreams = "0.5.22"
substreams-ethereum = "0.9.9"
prost = "0.11"
prost-types = "0.12.3"
hex-literal = "0.4.1"
ethabi = "18.0.0"
hex = "0.4.3"
bytes = "1.5.0"
anyhow = "1.0.75"
num-bigint = "0.4.4"
tycho-substreams = { git = "https://github.com/propeller-heads/tycho-protocol-sdk.git", rev = "51995f9" }
serde = { version = "1.0", features = ["derive"] }
serde_qs = "0.13.0"
itertools = "0.13.0"
keccak-hash = "0.11.0"
[build-dependencies]
anyhow = "1"
substreams-ethereum = "0.9"
regex = "1.10.0"
# Required so that ethabi > ethereum-types build correctly under wasm32-unknown-unknown
[target.wasm32-unknown-unknown.dependencies]
getrandom = { version = "0.2", features = ["custom"] }

View File

@@ -0,0 +1,38 @@
# balancer_v3 Substreams modules
This package was initialized via `substreams init`, using the `evm-events-calls` template.
## Usage
```bash
substreams build
substreams auth
substreams gui # Get streaming!
substreams registry login # Login to substreams.dev
substreams registry publish # Publish your Substreams to substreams.dev
```
## Modules
All of these modules produce data filtered by these contracts:
- _vault_ at **0xba1333333333a1ba1108e8412f11850a5c319ba9**
- _stable_pool_factory_ at **0xb9d01ca61b9c181da1051bfdd28e1097e920ab14**
- _weighted_pool_factory_ at **0x201efd508c8dfe9de1a13c2452863a78cb2a86cc**
- stable_pool contracts created from _stable_pool_factory_
- weighted_pool contracts created from _weighted_pool_factory_
### `map_events_calls`
This module gets you events _and_ calls
### `map_events`
This module gets you only events that matched.
### `map_calls`
This module gets you only calls that matched.

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,11 @@
version: v1
plugins:
- plugin: buf.build/community/neoeinstein-prost:v0.4.0
out: src/pb
opt:
- file_descriptor_set=false
- plugin: buf.build/community/neoeinstein-prost-crate:v0.4.1
out: src/pb
opt:
- no_features

View File

@@ -0,0 +1,50 @@
#![allow(clippy::all)]
use anyhow::{Ok, Result};
use regex::Regex;
use std::fs;
use substreams_ethereum::Abigen;
fn main() -> Result<(), anyhow::Error> {
let file_names = [
"abi/vault_contract.abi.json",
"abi/stable_pool_factory_contract.abi.json",
"abi/weighted_pool_factory_contract.abi.json",
"abi/stable_pool_contract.abi.json",
"abi/weighted_pool_contract.abi.json",
];
let file_output_names = [
"src/abi/vault_contract.rs",
"src/abi/stable_pool_factory_contract.rs",
"src/abi/weighted_pool_factory_contract.rs",
"src/abi/stable_pool_contract.rs",
"src/abi/weighted_pool_contract.rs",
];
let mut i = 0;
for f in file_names {
let contents = fs::read_to_string(f).expect("Should have been able to read the file");
// sanitize fields and attributes starting with an underscore
let regex = Regex::new(r#"("\w+"\s?:\s?")_(\w+")"#).unwrap();
let sanitized_abi_file = regex.replace_all(contents.as_str(), "${1}u_${2}");
// sanitize fields and attributes with multiple consecutive underscores
let re = Regex::new(r"_+").unwrap();
let re_sanitized_abi_file =
re.replace_all(&sanitized_abi_file, |caps: &regex::Captures| {
let count = caps[0].len();
let replacement = format!("{}_", "_u".repeat(count - 1));
replacement
});
Abigen::from_bytes("Contract", re_sanitized_abi_file.as_bytes())?
.generate()?
.write_to_file(file_output_names[i])?;
i = i + 1;
}
Ok(())
}

View File

@@ -0,0 +1,110 @@
substreams_yaml_path: ./substreams.yaml
protocol_type_names:
- "balancer_v3_pool"
adapter_contract: "BalancerV3SwapAdapter"
adapter_build_signature: "constructor(address,address,address,address)"
adapter_build_args: "0xbA1333333333a1BA1108E8412f11850A5C319bA9,0x136f1EFcC3f8f88516B9E94110D56FDBfB1778d1,0x000000000022D473030F116dDEE9F6B43aC78BA3,0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
skip_balance_check: true # Skipped because this Balancer V3 uses a Vault, the current version of the testing module doesn't support this.
# vault address for v3 needed for the router contract
initialized_accounts:
- "0xbA1333333333a1BA1108E8412f11850A5C319bA9"
tests:
- name: test_stable_pool
start_block: 21374757
stop_block: 21374777
expected_components:
- id: "0xc4ce391d82d164c166df9c8336ddf84206b2f812"
tokens:
- "0x0FE906e030a44eF24CA8c7dC7B7c53A6C4F00ce9"
- "0x775F661b0bD1739349b9A2A3EF60be277c5d2D29"
static_attributes:
manual_updates: "0x01"
pool_type: "0x537461626c65506f6f6c466163746f7279"
bpt: "0xc4ce391d82d164c166df9c8336ddf84206b2f812"
fee: "0x5af3107a4000"
rate_providers: "0x5b22307866346235643163323266333561343630623931656464376633336365666536313965326661616634222c22307863646161363863653332323732386665343138356136306631303363313934663165326334376263225d"
skip_simulation: false
creation_tx: "0xecfe638ea155e78ca2698bea0bee5f86c2f77abe3e9dc93ed19ab6e674165b95"
- name: test_weighted_pool_creation
start_block: 21701410
stop_block: 21701420
expected_components:
- id: "0x4Fd081923824D6AbdaCc862d8449e124A8634b12"
tokens:
- "0x249ca82617ec3dfb2589c4c17ab7ec9765350a18"
- "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
static_attributes:
manual_updates: "0x01"
normalized_weights: "0x5b22307830623161326263326563353030303030222c22307830326336386166306262313430303030225d"
fee: "0x038d7ea4c68000"
pool_type: "0x5765696768746564506f6f6c466163746f7279"
skip_simulation: true # This test is only for creation, simulation tests are below
creation_tx: "0xaf0f8da4b7b148f156605673c6ac56657f67c47001aadd5b3b64d6cb787daad4"
- name: test_erc4626_pool_creation # stable pool
start_block: 21375196
stop_block: 21375216
expected_components:
- id: "0x89BB794097234E5E930446C0CeC0ea66b35D7570"
tokens:
- "0x7bc3485026ac48b6cf9baf0a377477fff5703af8"
- "0xd4fa2d31b7968e448877f69a96de69f5de8cd23e"
static_attributes:
manual_updates: "0x01"
pool_type: "0x537461626c65506f6f6c466163746f7279"
bpt: "0x89bb794097234e5e930446c0cec0ea66b35d7570"
fee: "0x2d79883d2000"
rate_providers: "0x5b22307865646636336363653462613730636265373430363462373638373838326537316562623065393838222c22307838663465383433396239373033363336343834323163363932646438393766623963306264316439225d"
skip_simulation: false
creation_tx: "0x536baa8f18a5f9e32b08f695593f47d996a509edc886ccab6f89b001f2d2b4e4"
- name: test_swap_g_usdc # weighted pool
start_block: 21389576
stop_block: 21389676
expected_components:
- id: "0xf91c11BA4220b7a72E1dc5E92f2b48D3fdF62726"
tokens:
- "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
- "0x440017A1b021006d556d7fc06A54c32E42Eb745B"
static_attributes:
pool_type: "0x5765696768746564506f6f6c466163746f7279"
manual_updates: "0x01"
normalized_weights: "0x5b22307830366630356235396433623230303030222c22307830366630356235396433623230303030225d"
fee: "0x0aa87bee538000"
skip_simulation: false
creation_tx: "0x50a58ca41af90029a67bc7f4305cceb1e85800246f9c559fa74d8625ddfe14ed"
- name: test_obs_weth # weighted pool
start_block: 21420490
stop_block: 21420516
expected_components:
- id: "0x4403a2721A9A9956584dc19F553720CEf0Df35b0"
tokens:
- "0xafd9268fdfebee905f7439b12c943bc18ad293c2"
- "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
static_attributes:
pool_type: "0x5765696768746564506f6f6c466163746f7279"
manual_updates: "0x01"
normalized_weights: "0x5b22307830646264326663313337613330303030222c2230783233383666323666633130303030225d"
fee: "0x038d7ea4c68000"
skip_simulation: false
creation_tx: "0x8540c2bfbc16dc2db44bc96dd32c2901eb2080d27f959b5831d96cd99903523e"
- name: test_tbc_verse
start_block: 21722567
stop_block: 21722950
expected_components:
- id: "0x82074b99f3E927658BcAFd81F9948d89192CDF91"
tokens:
- "0x18084fba666a33d37592fa2633fd49a74dd93a88"
- "0x249ca82617ec3dfb2589c4c17ab7ec9765350a18"
static_attributes:
pool_type: "0x5765696768746564506f6f6c466163746f7279"
manual_updates: "0x01"
normalized_weights: "0x5b22307830326336386166306262313430303030222c22307830623161326263326563353030303030225d"
fee: "0x038d7ea4c68000"
skip_simulation: false
creation_tx: "0x214064cfcf41f7b94b4f9cdd35644b851e19ba246822b7d8f2bdcc3d6c06074e"

View File

@@ -0,0 +1,7 @@
#![allow(clippy::all)]
pub mod stable_pool_contract;
pub mod stable_pool_factory_contract;
pub mod vault_contract;
pub mod weighted_pool_contract;
pub mod weighted_pool_factory_contract;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,3 @@
mod abi;
mod modules;
mod pool_factories;

View File

@@ -0,0 +1,541 @@
use crate::{
abi::vault_contract::{
events::{LiquidityAdded, LiquidityRemoved, PoolPausedStateChanged, Swap},
functions::{Erc4626BufferWrapOrUnwrap, SendTo, Settle},
},
pool_factories,
};
use anyhow::Result;
use itertools::Itertools;
use keccak_hash::keccak;
use std::collections::HashMap;
use substreams::{
hex, log,
pb::substreams::StoreDeltas,
store::{
StoreAddBigInt, StoreGet, StoreGetInt64, StoreGetProto, StoreNew, StoreSetIfNotExists,
StoreSetIfNotExistsInt64, StoreSetIfNotExistsProto,
},
};
use substreams_ethereum::{
pb::eth::{self, v2::StorageChange},
Event, Function,
};
use tycho_substreams::{
attributes::json_deserialize_address_list, balances::aggregate_balances_changes,
block_storage::get_block_storage_changes, contract::extract_contract_changes_builder,
entrypoint::create_entrypoint, models::entry_point_params::TraceData, prelude::*,
};
pub const VAULT_ADDRESS: &[u8] = &hex!("bA1333333333a1BA1108E8412f11850A5C319bA9");
pub const VAULT_EXTENSION_ADDRESS: &[u8; 20] = &hex!("0E8B07657D719B86e06bF0806D6729e3D528C9A9");
pub const BATCH_ROUTER_ADDRESS: &[u8; 20] = &hex!("136f1efcc3f8f88516b9e94110d56fdbfb1778d1");
pub const PERMIT_2_ADDRESS: &[u8; 20] = &hex!("000000000022D473030F116dDEE9F6B43aC78BA3");
#[substreams::handlers::map]
pub fn map_components(block: eth::v2::Block) -> Result<BlockTransactionProtocolComponents> {
let mut tx_components = Vec::new();
for tx in block.transactions() {
let mut components = Vec::new();
for (log, call) in tx.logs_with_calls() {
if let Some(component) =
pool_factories::address_map(log.address.as_slice(), log, call.call)
{
components.push(component);
}
}
if !components.is_empty() {
tx_components.push(TransactionProtocolComponents { tx: Some(tx.into()), components });
}
}
Ok(BlockTransactionProtocolComponents { tx_components })
}
/// Simply stores the `ProtocolComponent`s with the pool address as the key and the pool id as value
#[substreams::handlers::store]
pub fn store_components(
map: BlockTransactionProtocolComponents,
store: StoreSetIfNotExistsProto<ProtocolComponent>,
) {
map.tx_components
.into_iter()
.for_each(|tx_pc| {
tx_pc
.components
.into_iter()
.for_each(|pc| store.set_if_not_exists(0, format!("pool:{0}", &pc.id), &pc))
});
}
/// Set of token that are used by BalancerV3. This is used to filter out account balances updates
/// for unknown tokens.
#[substreams::handlers::store]
pub fn store_token_set(map: BlockTransactionProtocolComponents, store: StoreSetIfNotExistsInt64) {
map.tx_components
.into_iter()
.for_each(|tx_pc| {
tx_pc
.components
.into_iter()
.for_each(|pc| {
pc.tokens
.into_iter()
.for_each(|token| store.set_if_not_exists(0, hex::encode(token), &1))
})
});
}
#[substreams::handlers::map]
pub fn map_relative_balances(
block: eth::v2::Block,
store: StoreGetProto<ProtocolComponent>,
) -> Result<BlockBalanceDeltas, anyhow::Error> {
let balance_deltas = block
.logs()
.filter(|log| log.address() == VAULT_ADDRESS)
.flat_map(|vault_log| {
let mut deltas = Vec::new();
if let Some(Swap { pool, token_in, token_out, amount_in, amount_out, .. }) =
Swap::match_and_decode(vault_log.log)
{
let component_id = format!("0x{}", hex::encode(pool));
log::info!(
"swap at component id: {:?} with key: {:?}",
component_id,
format!("pool:{}", &component_id)
);
if store
.get_last(format!("pool:{}", &component_id))
.is_some()
{
deltas.extend_from_slice(&[
BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(vault_log.receipt.transaction.into()),
token: token_in.to_vec(),
delta: amount_in.to_signed_bytes_be(),
component_id: component_id.as_bytes().to_vec(),
},
BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(vault_log.receipt.transaction.into()),
token: token_out.to_vec(),
delta: amount_out.neg().to_signed_bytes_be(),
component_id: component_id.as_bytes().to_vec(),
},
]);
}
}
if let Some(LiquidityAdded { pool, amounts_added_raw, .. }) =
LiquidityAdded::match_and_decode(vault_log.log)
{
let component_id = format!("0x{}", hex::encode(pool));
if let Some(component) = store.get_last(format!("pool:{}", &component_id)) {
if component.tokens.len() != amounts_added_raw.len() {
panic!(
"liquidity added to pool with different number of tokens than expected"
);
}
log::info!(
"liquidity added at component id: {:?} with key: {:?} with tokens: {:?}",
component_id,
format!("pool:{}", &component_id),
component.tokens
);
let deltas_from_added_liquidity = amounts_added_raw
.into_iter()
.zip(component.tokens.iter())
.map(|(amount, token)| BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(vault_log.receipt.transaction.into()),
token: token.to_vec(),
delta: amount.to_signed_bytes_be(),
component_id: component_id.as_bytes().to_vec(),
})
.collect::<Vec<_>>();
deltas.extend_from_slice(&deltas_from_added_liquidity);
}
}
if let Some(LiquidityRemoved { pool, amounts_removed_raw, .. }) =
LiquidityRemoved::match_and_decode(vault_log.log)
{
let component_id = format!("0x{}", hex::encode(pool));
log::info!(
"liquidity removed at component id: {:?} with key: {:?}",
component_id,
format!("pool:{}", &component_id)
);
if let Some(component) = store.get_last(format!("pool:{}", &component_id)) {
if component.tokens.len() != amounts_removed_raw.len() {
panic!(
"liquidity removed from pool with different number of tokens than expected"
);
}
let deltas_from_removed_liquidity = amounts_removed_raw
.into_iter()
.zip(component.tokens.iter())
.map(|(amount, token)| BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(vault_log.receipt.transaction.into()),
token: token.to_vec(),
delta: amount.neg().to_signed_bytes_be(),
component_id: component_id.as_bytes().to_vec(),
})
.collect::<Vec<_>>();
deltas.extend_from_slice(&deltas_from_removed_liquidity);
}
}
deltas
})
.collect::<Vec<_>>();
Ok(BlockBalanceDeltas { balance_deltas })
}
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the
/// store key to ensure that there's a unique balance being tallied for each.
#[substreams::handlers::store]
pub fn store_balances(deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
tycho_substreams::balances::store_balance_changes(deltas, store);
}
/// This is the main map that handles most of the indexing of this substream.
/// Every contract change is grouped by transaction index via the `transaction_changes`
/// map. Each block of code will extend the `TransactionChanges` struct with the
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
/// At the very end, the map can easily be sorted by index to ensure the final
/// `BlockChanges` is ordered by transactions properly.
#[substreams::handlers::map]
pub fn map_protocol_changes(
block: eth::v2::Block,
grouped_components: BlockTransactionProtocolComponents,
deltas: BlockBalanceDeltas,
components_store: StoreGetProto<ProtocolComponent>,
tokens_store: StoreGetInt64,
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
) -> Result<BlockChanges> {
// We merge contract changes by transaction (identified by transaction index) making it easy to
// sort them at the very end.
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
// Handle pool pause state changes
block
.logs()
.filter(|log| log.address() == VAULT_ADDRESS)
.for_each(|log| {
if let Some(PoolPausedStateChanged { pool, paused }) =
PoolPausedStateChanged::match_and_decode(log)
{
let component_id = format!("0x{}", hex::encode(&pool));
let tx: Transaction = log.receipt.transaction.into();
if components_store
.get_last(format!("pool:{}", &component_id))
.is_some()
{
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
let entity_change = EntityChanges {
component_id,
attributes: vec![Attribute {
name: "paused".to_string(),
value: vec![1u8],
change: if paused {
ChangeType::Creation.into()
} else {
ChangeType::Deletion.into()
},
}],
};
builder.add_entity_change(&entity_change);
}
}
});
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
// convert into `TransactionChanges`
let default_attributes = vec![
Attribute {
// TODO: remove this and track account_balances instead
name: "balance_owner".to_string(),
value: VAULT_ADDRESS.to_vec(),
change: ChangeType::Creation.into(),
},
Attribute {
name: "stateless_contract_addr_0".into(),
value: address_to_bytes_with_0x(VAULT_EXTENSION_ADDRESS),
change: ChangeType::Creation.into(),
},
Attribute {
name: "stateless_contract_addr_1".into(),
value: address_to_bytes_with_0x(BATCH_ROUTER_ADDRESS),
change: ChangeType::Creation.into(),
},
Attribute {
name: "stateless_contract_addr_2".into(),
value: address_to_bytes_with_0x(PERMIT_2_ADDRESS),
change: ChangeType::Creation.into(),
},
Attribute {
name: "update_marker".to_string(),
value: vec![1u8],
change: ChangeType::Creation.into(),
},
];
grouped_components
.tx_components
.iter()
.for_each(|tx_component| {
// initialise builder if not yet present for this tx
let tx = tx_component.tx.as_ref().unwrap();
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(tx));
// iterate over individual components created within this tx
tx_component
.components
.iter()
.for_each(|component| {
let rate_providers = component
.static_att
.iter()
.find(|att| att.name == "rate_providers")
.map(|att| json_deserialize_address_list(&att.value));
if let Some(rate_providers) = rate_providers {
for rate_provider in rate_providers {
let trace_data = TraceData::Rpc(RpcTraceData {
caller: None,
calldata: hex::decode("679aefce").unwrap(), // getRate()
});
let (entrypoint, entrypoint_params) = create_entrypoint(
rate_provider,
"getRate()".to_string(),
component.id.clone(),
trace_data,
);
builder.add_entrypoint(&entrypoint);
builder.add_entrypoint_params(&entrypoint_params);
}
}
builder.add_protocol_component(component);
let entity_change = EntityChanges {
component_id: component.id.clone(),
attributes: default_attributes.clone(),
};
builder.add_entity_change(&entity_change)
});
});
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
// this block. Then, these balance changes are merged onto the existing map of tx contract
// changes, inserting a new one if it doesn't exist.
aggregate_balances_changes(balance_store, deltas)
.iter()
.for_each(|(_, (tx, balances))| {
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(tx));
balances
.values()
.for_each(|token_bc_map| {
token_bc_map.values().for_each(|bc| {
builder.add_balance_change(bc);
})
});
});
// Extract and insert any storage changes that happened for any of the components.
extract_contract_changes_builder(
&block,
|addr| {
components_store
.get_last(format!("pool:0x{0}", hex::encode(addr)))
.is_some() ||
addr.eq(VAULT_ADDRESS)
},
&mut transaction_changes,
);
// Extract token balances for balancer v3 vault
block
.transaction_traces
.iter()
.for_each(|tx| {
let vault_balance_change_per_tx =
get_vault_reserves(tx, &components_store, &tokens_store);
if !vault_balance_change_per_tx.is_empty() {
let tycho_tx = Transaction::from(tx);
let builder = transaction_changes
.entry(tx.index.into())
.or_insert_with(|| TransactionChangesBuilder::new(&tycho_tx));
let mut vault_contract_tlv_changes =
InterimContractChange::new(VAULT_ADDRESS, false);
for (token_addr, reserve_value) in vault_balance_change_per_tx {
vault_contract_tlv_changes.upsert_token_balance(
token_addr.as_slice(),
reserve_value.value.as_slice(),
);
}
builder.add_contract_changes(&vault_contract_tlv_changes);
}
});
transaction_changes
.iter_mut()
.for_each(|(_, change)| {
// this indirection is necessary due to borrowing rules.
let addresses = change
.changed_contracts()
.map(|e| e.to_vec())
.collect::<Vec<_>>();
addresses
.into_iter()
.for_each(|address| {
if address != VAULT_ADDRESS {
// We reconstruct the component_id from the address here
let id = components_store
.get_last(format!("pool:0x{}", hex::encode(address)))
.map(|c| c.id)
.unwrap(); // Shouldn't happen because we filter by known components
// in `extract_contract_changes_builder`
change.mark_component_as_updated(&id);
}
})
});
let block_storage_changes = get_block_storage_changes(&block);
// Process all `transaction_changes` for final output in the `BlockChanges`,
// sorted by transaction index (the key).
Ok(BlockChanges {
block: Some((&block).into()),
changes: transaction_changes
.drain()
.sorted_unstable_by_key(|(index, _)| *index)
.filter_map(|(_, builder)| builder.build())
.collect::<Vec<_>>(),
storage_changes: block_storage_changes,
})
}
/// Converts address bytes into a Vec<u8> containing a leading `0x`.
fn address_to_bytes_with_0x(address: &[u8; 20]) -> Vec<u8> {
address_to_string_with_0x(address).into_bytes()
}
/// Converts address bytes into a string containing a leading `0x`.
fn address_to_string_with_0x(address: &[u8]) -> String {
format!("0x{}", hex::encode(address))
}
// function needed to match reservesOf in vault storage, which by definition
// they should always be equal to the `token.balanceOf(this)` except during unlock
fn get_vault_reserves(
transaction: &eth::v2::TransactionTrace,
store: &StoreGetProto<ProtocolComponent>,
token_store: &StoreGetInt64,
) -> HashMap<Vec<u8>, ReserveValue> {
// reservesOf mapping for the current block Address -> Balance
let mut reserves_of = HashMap::new();
transaction
.calls
.iter()
.filter(|call| !call.state_reverted)
.filter(|call| call.address == VAULT_ADDRESS)
.for_each(|call| {
if let Some(Settle { token, .. }) = Settle::match_and_decode(call) {
for change in &call.storage_changes {
add_change_if_accounted(
&mut reserves_of,
change,
token.as_slice(),
token_store,
);
}
}
if let Some(SendTo { token, .. }) = SendTo::match_and_decode(call) {
for change in &call.storage_changes {
add_change_if_accounted(
&mut reserves_of,
change,
token.as_slice(),
token_store,
);
}
}
if let Some(Erc4626BufferWrapOrUnwrap { params }) =
Erc4626BufferWrapOrUnwrap::match_and_decode(call)
{
for change in &call.storage_changes {
let wrapped_token = params.2.clone();
let component_id = format!("0x{}", hex::encode(&wrapped_token));
if let Some(component) = store.get_last(component_id) {
let underlying_token = component.tokens[1].clone();
add_change_if_accounted(
&mut reserves_of,
change,
wrapped_token.as_slice(),
token_store,
);
add_change_if_accounted(
&mut reserves_of,
change,
underlying_token.as_slice(),
token_store,
);
}
}
}
});
reserves_of
}
struct ReserveValue {
ordinal: u64,
value: Vec<u8>,
}
fn add_change_if_accounted(
reserves_of: &mut HashMap<Vec<u8>, ReserveValue>,
change: &StorageChange,
token_address: &[u8],
token_store: &StoreGetInt64,
) {
let slot_key = get_storage_key_for_token(token_address);
// record changes happening on vault contract at reserves_of storage key
if change.key == slot_key && token_store.has_last(hex::encode(token_address)) {
reserves_of
.entry(token_address.to_vec())
.and_modify(|v| {
if v.ordinal < change.ordinal {
v.value = change.new_value.clone();
v.ordinal = change.ordinal;
}
})
.or_insert(ReserveValue { value: change.new_value.clone(), ordinal: change.ordinal });
}
}
// token_addr -> keccak256(abi.encode(token_address, 8)) as 8 is the order in which reserves of are
// declared
fn get_storage_key_for_token(token_address: &[u8]) -> Vec<u8> {
let mut input = [0u8; 64];
input[12..32].copy_from_slice(token_address);
input[63] = 8u8;
let result = keccak(input.as_slice())
.as_bytes()
.to_vec();
result
}

View File

@@ -0,0 +1,119 @@
use crate::{abi, modules::VAULT_ADDRESS};
use abi::{
stable_pool_factory_contract::{
events::PoolCreated as StablePoolCreated, functions::Create as StablePoolCreate,
},
weighted_pool_factory_contract::{
events::PoolCreated as WeightedPoolCreated, functions::Create as WeightedPoolCreate,
},
};
use substreams::{hex, scalar::BigInt};
use substreams_ethereum::{
pb::eth::v2::{Call, Log},
Event, Function,
};
use tycho_substreams::{
attributes::{json_serialize_address_list, json_serialize_bigint_list},
prelude::*,
};
// Token config: (token_address, rate, rate_provider_address, is_exempt_from_yield_fees)
type TokenConfig = Vec<(Vec<u8>, substreams::scalar::BigInt, Vec<u8>, bool)>;
pub fn collect_rate_providers(tokens: &TokenConfig) -> Vec<Vec<u8>> {
tokens
.iter()
.filter(|token| token.1 == BigInt::from(1)) // WITH_RATE == 1
.map(|token| token.2.clone())
.collect::<Vec<_>>()
}
pub fn address_map(
pool_factory_address: &[u8],
log: &Log,
call: &Call,
) -> Option<ProtocolComponent> {
match *pool_factory_address {
hex!("201efd508c8DfE9DE1a13c2452863A78CB2a86Cc") => {
let WeightedPoolCreate {
tokens: token_config,
normalized_weights,
swap_fee_percentage,
..
} = WeightedPoolCreate::match_and_decode(call)?;
let WeightedPoolCreated { pool } = WeightedPoolCreated::match_and_decode(log)?;
let rate_providers = collect_rate_providers(&token_config);
// TODO: to add "buffers" support for boosted pools, we need to add the unwrapped
// version of all ERC4626 tokens to the pool tokens list. Skipped for now - we need
// to test that the adapter supports it correctly and ERC4626 overwrites are handled
// correctly in simulation.
let tokens = token_config
.into_iter()
.map(|t| t.0)
.collect::<Vec<_>>();
let normalized_weights_bytes =
json_serialize_bigint_list(normalized_weights.as_slice());
let fee_bytes = swap_fee_percentage.to_signed_bytes_be();
let rate_providers_bytes = json_serialize_address_list(rate_providers.as_slice());
let mut attributes = vec![
("pool_type", "WeightedPoolFactory".as_bytes()),
("normalized_weights", &normalized_weights_bytes),
("fee", &fee_bytes),
("manual_updates", &[1u8]),
];
if !rate_providers.is_empty() {
attributes.push(("rate_providers", &rate_providers_bytes));
}
Some(
ProtocolComponent::new(&format!("0x{}", hex::encode(&pool)))
.with_contracts(&[pool, VAULT_ADDRESS.to_vec()])
.with_tokens(tokens.as_slice())
.with_attributes(&attributes)
.as_swap_type("balancer_v3_pool", ImplementationType::Vm),
)
}
hex!("B9d01CA61b9C181dA1051bFDd28e1097e920AB14") => {
let StablePoolCreate { tokens: token_config, swap_fee_percentage, .. } =
StablePoolCreate::match_and_decode(call)?;
let StablePoolCreated { pool } = StablePoolCreated::match_and_decode(log)?;
let rate_providers = collect_rate_providers(&token_config);
// TODO: to add "buffers" support for boosted pools, we need to add the unwrapped
// version of all ERC4626 tokens to the pool tokens list. Skipped for now - we need
// to test that the adapter supports it correctly and ERC4626 overwrites are handled
// correctly in simulation.
let tokens = token_config
.into_iter()
.map(|t| t.0)
.collect::<Vec<_>>();
let fee_bytes = swap_fee_percentage.to_signed_bytes_be();
let rate_providers_bytes = json_serialize_address_list(rate_providers.as_slice());
let mut attributes = vec![
("pool_type", "StablePoolFactory".as_bytes()),
("bpt", &pool),
("fee", &fee_bytes),
("manual_updates", &[1u8]),
];
if !rate_providers.is_empty() {
attributes.push(("rate_providers", &rate_providers_bytes));
}
Some(
ProtocolComponent::new(&format!("0x{}", hex::encode(&pool)))
.with_contracts(&[pool.to_owned(), VAULT_ADDRESS.to_vec()])
.with_tokens(tokens.as_slice())
.with_attributes(&attributes)
.as_swap_type("balancer_v3_pool", ImplementationType::Vm),
)
}
_ => None,
}
}

View File

@@ -0,0 +1,78 @@
specVersion: v0.1.0
package:
name: "ethereum_balancer_v3"
version: v0.4.0
protobuf:
files:
- tycho/evm/v1/vm.proto
- tycho/evm/v1/common.proto
- tycho/evm/v1/utils.proto
importPaths:
- ../../proto
excludePaths:
- sf/substreams
- google
binaries:
default:
type: wasm/rust-v1
file: ../target/wasm32-unknown-unknown/release/ethereum_balancer_v3.wasm
modules:
- name: map_components
kind: map
initialBlock: 21332121
inputs:
- source: sf.ethereum.type.v2.Block
output:
type: proto:tycho.evm.v1.BlockTransactionProtocolComponents
- name: store_components
kind: store
initialBlock: 21332121
updatePolicy: set_if_not_exists
valueType: proto:tycho.evm.v1.ProtocolComponents
inputs:
- map: map_components
- name: store_token_set
kind: store
initialBlock: 21332121
updatePolicy: set_if_not_exists
valueType: int64
inputs:
- map: map_components
- name: map_relative_balances
kind: map
initialBlock: 21332121
inputs:
- source: sf.ethereum.type.v2.Block
- store: store_components
output:
type: proto:tycho.evm.v1.BlockBalanceDeltas
- name: store_balances
kind: store
initialBlock: 21332121
updatePolicy: add
valueType: bigint
inputs:
- map: map_relative_balances
- name: map_protocol_changes
kind: map
initialBlock: 21332121
inputs:
- source: sf.ethereum.type.v2.Block
- map: map_components
- map: map_relative_balances
- store: store_components
- store: store_token_set
- store: store_balances
mode: deltas # This is the key property that simplifies `BalanceChange` handling
output:
type: proto:tycho.evm.v1.BlockChanges
network: mainnet

View File

@@ -21,4 +21,5 @@ ignore = [
"ethereum-pancakeswap-v3/src/abi",
"ethereum-uniswap-v4/src/abi",
"ethereum-maverick-v2/src/abi",
]
"ethereum-balancer-v3/src/abi"
]