feat: lots of refactoring (file splitting

- `lib.rs` split into multiple files with `modules.rs` being the main entrypoint for substreams
- contract changes are tracked similiarly to ambient (generalized to use a store of tracked contracts)
- static attributes were pruned of the dynamic ones
This commit is contained in:
0xMochan
2024-01-29 11:11:06 -05:00
parent 4d56335f2e
commit 9f82671082
9 changed files with 744 additions and 502 deletions

View File

@@ -9,12 +9,13 @@ import requests
# Exports contract ABI in JSON # Exports contract ABI in JSON
abis = { abis = {
# Factories
"WeightedPoolFactory (v4)": "0x897888115Ada5773E02aA29F775430BFB5F34c51", "WeightedPoolFactory (v4)": "0x897888115Ada5773E02aA29F775430BFB5F34c51",
"WeightedPool2TokensFactory": "0xA5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0", # 80Bal-20WETH "WeightedPool2TokensFactory": "0xA5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0", # 80Bal-20WETH
"ComposableStablePoolFactory (v5)": "0xDB8d758BCb971e482B2C45f7F8a7740283A1bd3A", "ComposableStablePoolFactory (v5)": "0xDB8d758BCb971e482B2C45f7F8a7740283A1bd3A",
"ERC4626LinearPoolFactory (v4)": "0x813EE7a840CE909E7Fea2117A44a90b8063bd4fd", "ERC4626LinearPoolFactory (v4)": "0x813EE7a840CE909E7Fea2117A44a90b8063bd4fd",
"EulerLinearPoolFactory": "0x5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347", "EulerLinearPoolFactory": "0x5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347",
"GearboxLinearPoolFactory (v2)": "0x39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B", # "GearboxLinearPoolFactory (v2)": "0x39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B",
"ManagedPoolFactory (v2)": "0xBF904F9F340745B4f0c4702c7B6Ab1e808eA6b93", "ManagedPoolFactory (v2)": "0xBF904F9F340745B4f0c4702c7B6Ab1e808eA6b93",
"SiloLinearPoolFactory (v2)": "0x4E11AEec21baF1660b1a46472963cB3DA7811C89", "SiloLinearPoolFactory (v2)": "0x4E11AEec21baF1660b1a46472963cB3DA7811C89",
"YearnLinearPoolFactory (v2)": "0x5F5222Ffa40F2AEd6380D022184D6ea67C776eE0", "YearnLinearPoolFactory (v2)": "0x5F5222Ffa40F2AEd6380D022184D6ea67C776eE0",

View File

@@ -70,8 +70,13 @@ message ProtocolComponent {
ChangeType change = 5; ChangeType change = 5;
} }
message ProtocolComponents { message TransactionProtocolComponents {
repeated ProtocolComponent components = 1; Transaction tx = 1;
repeated ProtocolComponent components = 2;
}
message GroupedTransactionProtocolComponents {
repeated TransactionProtocolComponents tx_components = 1;
} }
// A struct for following the changes of Total Value Locked (TVL) of a protocol component. // A struct for following the changes of Total Value Locked (TVL) of a protocol component.

View File

@@ -0,0 +1,190 @@
/// This file contains helpers to capture contract changes from the expanded block model. These
/// leverage the `code_changes`, `balance_changes`, and `storage_changes` fields available on the
/// `Call` type provided by block model in a substream (i.e. `logs_and_calls`, etc).
///
/// ⚠️ These helpers *only* work if the **expanded block model** is available, more info blow.
/// https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425
use std::collections::HashMap;
use substreams_ethereum::pb::eth;
use pb::tycho::evm::v1::{self as tycho};
use substreams::store::{StoreGet, StoreGetInt64};
use crate::pb;
struct SlotValue {
new_value: Vec<u8>,
start_value: Vec<u8>,
}
impl SlotValue {
fn has_changed(&self) -> bool {
self.start_value != self.new_value
}
}
// Uses a map for slots, protobuf does not allow bytes in hashmap keys
pub struct InterimContractChange {
address: Vec<u8>,
balance: Vec<u8>,
code: Vec<u8>,
slots: HashMap<Vec<u8>, SlotValue>,
change: tycho::ChangeType,
}
impl From<InterimContractChange> for tycho::ContractChange {
fn from(value: InterimContractChange) -> Self {
tycho::ContractChange {
address: value.address,
balance: value.balance,
code: value.code,
slots: value
.slots
.into_iter()
.filter(|(_, value)| value.has_changed())
.map(|(slot, value)| tycho::ContractSlot {
slot,
value: value.new_value,
})
.collect(),
change: value.change.into(),
}
}
}
pub fn extract_contract_changes(
block: &eth::v2::Block,
contracts: StoreGetInt64,
transaction_contract_changes: &mut HashMap<u64, tycho::TransactionContractChanges>,
) {
let mut changed_contracts: HashMap<Vec<u8>, InterimContractChange> = HashMap::new();
// Collect all accounts created in this block
let created_accounts: HashMap<_, _> = block
.transactions()
.flat_map(|tx| {
tx.calls.iter().flat_map(|call| {
call.account_creations
.iter()
.map(|ac| (&ac.account, ac.ordinal))
})
})
.collect();
block.transactions().for_each(|block_tx| {
let mut storage_changes = Vec::new();
let mut balance_changes = Vec::new();
let mut code_changes = Vec::new();
block_tx
.calls
.iter()
.filter(|call| {
!call.state_reverted
&& contracts
.get_last(format!("pool:{0}", hex::encode(&call.address)))
.is_some()
})
.for_each(|call| {
storage_changes.extend(call.storage_changes.iter());
balance_changes.extend(call.balance_changes.iter());
code_changes.extend(call.code_changes.iter());
});
storage_changes.sort_unstable_by_key(|change| change.ordinal);
balance_changes.sort_unstable_by_key(|change| change.ordinal);
code_changes.sort_unstable_by_key(|change| change.ordinal);
storage_changes.iter().for_each(|storage_change| {
let contract_change = changed_contracts
.entry(storage_change.address.clone())
.or_insert_with(|| InterimContractChange {
address: storage_change.address.clone(),
balance: Vec::new(),
code: Vec::new(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&storage_change.address) {
tycho::ChangeType::Creation
} else {
tycho::ChangeType::Update
},
});
let slot_value = contract_change
.slots
.entry(storage_change.key.clone())
.or_insert_with(|| SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
});
slot_value
.new_value
.copy_from_slice(&storage_change.new_value);
});
balance_changes.iter().for_each(|balance_change| {
let contract_change = changed_contracts
.entry(balance_change.address.clone())
.or_insert_with(|| InterimContractChange {
address: balance_change.address.clone(),
balance: Vec::new(),
code: Vec::new(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&balance_change.address) {
tycho::ChangeType::Creation
} else {
tycho::ChangeType::Update
},
});
if let Some(new_balance) = &balance_change.new_value {
contract_change.balance.clear();
contract_change
.balance
.extend_from_slice(&new_balance.bytes);
}
});
code_changes.iter().for_each(|code_change| {
let contract_change = changed_contracts
.entry(code_change.address.clone())
.or_insert_with(|| InterimContractChange {
address: code_change.address.clone(),
balance: Vec::new(),
code: Vec::new(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&code_change.address) {
tycho::ChangeType::Creation
} else {
tycho::ChangeType::Update
},
});
contract_change.code.clear();
contract_change
.code
.extend_from_slice(&code_change.new_code);
});
if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() {
transaction_contract_changes
.entry(block_tx.index.into())
.or_insert_with(|| tycho::TransactionContractChanges {
tx: Some(tycho::Transaction {
hash: block_tx.hash.clone(),
from: block_tx.from.clone(),
to: block_tx.to.clone(),
index: block_tx.index as u64,
}),
contract_changes: vec![],
component_changes: vec![],
balance_changes: vec![],
})
.contract_changes
.extend(changed_contracts.drain().map(|(_, change)| change.into()));
}
});
}

View File

@@ -1,491 +1,5 @@
use std::collections::HashMap;
use anyhow::Result;
use substreams::hex;
use substreams::pb::substreams::StoreDeltas;
use substreams::store::{StoreAdd, StoreAddBigInt, StoreNew};
use substreams::key;
use substreams::scalar::BigInt;
use substreams_ethereum::pb::eth;
use substreams_ethereum::pb::eth::v2::{Call, Log};
use substreams_ethereum::{Event, Function};
use itertools::Itertools;
use pb::tycho::evm::v1::{self as tycho};
mod abi; mod abi;
mod contract_changes;
mod modules;
mod pb; mod pb;
mod pool_factories;
const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8");
/// This trait defines some helpers for serializing and deserializing `Vec<BigInt` which is needed
/// to be able to encode the `normalized_weights` and `weights` `Attribute`s. This should also be
/// handled by any downstream application.
trait SerializableVecBigInt {
fn serialize_bytes(&self) -> Vec<u8>;
fn deserialize_bytes(bytes: &[u8]) -> Vec<BigInt>;
}
impl SerializableVecBigInt for Vec<BigInt> {
fn serialize_bytes(&self) -> Vec<u8> {
self.iter()
.flat_map(|big_int| big_int.to_signed_bytes_be())
.collect()
}
fn deserialize_bytes(bytes: &[u8]) -> Vec<BigInt> {
bytes
.chunks_exact(32)
.map(|chunk| BigInt::from_signed_bytes_be(chunk))
.collect::<Vec<BigInt>>()
}
}
/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in
/// a later groupby operation.
struct TransactionWrapper(tycho::Transaction);
impl PartialEq for TransactionWrapper {
fn eq(&self, other: &Self) -> bool {
self.0.hash == other.0.hash
}
}
/// This is the main function that handles the creation of `ProtocolComponent`s with `Attribute`s
/// based on the specific factory address. There's 3 factory groups that are represented here:
/// - Weighted Pool Factories
/// - Linear Pool Factories
/// - Stable Pool Factories
/// (Balancer does have a bit more (esp. in the deprecated section) that could be implemented as
/// desired.)
/// We use the specific ABIs to decode both the log event and cooresponding call to gather
/// `PoolCreated` event information alongside the `Create` calldata that provide us details to
/// fufill both the required details + any extra `Attributes`
/// Ref: https://docs.balancer.fi/reference/contracts/deployment-addresses/mainnet.html
fn pool_factory_map(pool_addr: &[u8], log: &Log, call: &Call) -> Option<tycho::ProtocolComponent> {
match *pool_addr {
hex!("897888115Ada5773E02aA29F775430BFB5F34c51") => {
let create_call =
abi::weighted_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::weighted_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: create_call.tokens,
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "WeightedPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "normalized_weights".into(),
value: create_call.normalized_weights.serialize_bytes(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("DB8d758BCb971e482B2C45f7F8a7740283A1bd3A") => {
let create_call =
abi::composable_stable_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::composable_stable_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: create_call.tokens,
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "ComposableStablePoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "amplification_parameter".into(),
value: create_call.amplification_parameter.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("813EE7a840CE909E7Fea2117A44a90b8063bd4fd") => {
let create_call =
abi::erc_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::erc_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "ERC4626LinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
// Note, `lower_target` is generally hardcoded for all pools, not located in call data
// Note, rate provider might be provided as `create.protocol_id`, but as a BigInt. needs investigation
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347") => {
let create_call =
abi::euler_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::euler_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "EulerLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B") => {
let create_call =
abi::gearbox_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::gearbox_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "GearboxLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
// The `ManagedPoolFactory` is a bit ✨ unique ✨, so we'll leave it commented out for now
// Take a look at it's `Create` call to see how the params are structured.
// hex!("BF904F9F340745B4f0c4702c7B6Ab1e808eA6b93") => {
// let create_call = abi::managed_pool_factory::functions::Create::match_and_decode(call)?;
// let pool_created =
// abi::managed_pool_factory::events::PoolCreated::match_and_decode(log)?;
// Some(tycho::ProtocolComponent {
// id: hex::encode(&pool_created.pool),
// tokens: create_call.tokens,
// contracts: vec![pool_addr.into(), pool_created.pool],
// static_att: vec![
// tycho::Attribute {
// name: "pool_type".into(),
// value: "ManagedPoolFactory".into(),
// change: tycho::ChangeType::Creation.into(),
// },
// tycho::Attribute {
// name: "swap_fee_percentage".into(),
// value: create_call.swap_fee_percentage.to_signed_bytes_be(),
// change: tycho::ChangeType::Creation.into(),
// },
// ],
// change: tycho::ChangeType::Creation.into(),
// })
// }
hex!("4E11AEec21baF1660b1a46472963cB3DA7811C89") => {
let create_call =
abi::silo_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::silo_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "SiloLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("5F5222Ffa40F2AEd6380D022184D6ea67C776eE0") => {
let create_call =
abi::yearn_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::yearn_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "YearnLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
// The `WeightedPool2TokenFactory` is a deprecated contract but we've included it since one
// of the highest TVL pools, 80BAL-20WETH, is able to be tracked.
hex!("A5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0") => {
let create_call =
abi::weighted_pool_tokens_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::weighted_pool_tokens_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: create_call.tokens,
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "WeightedPool2TokensFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "swap_fee_percentage".into(),
value: create_call.swap_fee_percentage.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
// TODO
tycho::Attribute {
name: "weights".into(),
value: create_call.weights.serialize_bytes(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
_ => None,
}
}
/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a
/// store to be able to tally up final balances for tokens in a pool.
#[substreams::handlers::map]
pub fn map_balance_deltas(block: eth::v2::Block) -> Result<tycho::BalanceDeltas, anyhow::Error> {
Ok(tycho::BalanceDeltas {
balance_deltas: block
.events::<abi::vault::events::PoolBalanceChanged>(&[&VAULT_ADDRESS])
.flat_map(|(event, log)| {
event
.tokens
.iter()
.zip(event.deltas.iter())
.map(|(token, delta)| tycho::BalanceDelta {
ord: log.log.ordinal,
tx: Some(tycho::Transaction {
hash: log.receipt.transaction.hash.clone(),
from: log.receipt.transaction.from.clone(),
to: log.receipt.transaction.to.clone(),
index: Into::<u64>::into(log.receipt.transaction.index),
}),
token: token.clone(),
delta: delta.to_signed_bytes_be(),
component_id: event.pool_id.into(),
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>(),
})
}
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the
/// store key to ensure that there's a unique balance being tallied for each.
#[substreams::handlers::store]
pub fn store_balance_changes(deltas: tycho::BalanceDeltas, store: StoreAddBigInt) {
deltas.balance_deltas.iter().for_each(|delta| {
store.add(
delta.ord,
format!(
"pool:{0}:token:{1}",
hex::encode(&delta.component_id),
hex::encode(&delta.token)
),
BigInt::from_signed_bytes_be(&delta.delta),
);
});
}
/// This is the main map that handles most of the indexing of this substream.
#[substreams::handlers::map]
pub fn map_changes(
block: eth::v2::Block,
deltas: tycho::BalanceDeltas,
store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
) -> Result<tycho::BlockContractChanges> {
// Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call
// We store these as a hashmap by tx hash since we need to agg by tx hash later
let mut transaction_contract_changes = block
.transactions()
.flat_map(|tx| {
tx.logs_with_calls()
.filter(|(_, call)| !call.call.state_reverted)
.filter_map(|(log, call)| {
let pool_factory_address = call.call.address.as_slice();
Some((
tx.hash.clone(),
tycho::TransactionContractChanges {
tx: Some(tycho::Transaction {
hash: tx.hash.clone(),
from: tx.from.clone(),
to: tx.to.clone(),
index: Into::<u64>::into(tx.index),
}),
contract_changes: vec![],
balance_changes: vec![],
component_changes: vec![pool_factory_map(
pool_factory_address,
log,
call.call,
)?],
},
))
})
})
.collect::<HashMap<_, _>>();
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating `BalanceDeltas`
// We essentially just process the changes that occured to the `store` this block
// Then, these balance changes are merged onto the existing map of tx contract changes,
// inserting a new one if it doesn't exist.
store
.deltas
.into_iter()
.zip(deltas.balance_deltas)
.map(|(store_delta, balance_delta)| {
let pool_id = key::segment_at(&store_delta.key, 1);
let token_id = key::segment_at(&store_delta.key, 3);
(
balance_delta.tx.unwrap(),
tycho::BalanceChange {
token: hex::decode(token_id).expect("Token ID not valid hex"),
balance: store_delta.new_value,
component_id: hex::decode(pool_id).expect("Token ID not valid hex"),
},
)
})
// We need to group the balance changes by tx hash for the `TransactionContractChanges` agg
.group_by(|(tx, _)| TransactionWrapper(tx.clone()))
.into_iter()
.for_each(|(tx_wrapped, group)| {
let tx = tx_wrapped.0;
if let Some(tx_change) = transaction_contract_changes.get_mut(&tx.hash) {
tx_change
.balance_changes
.extend(group.map(|(_, change)| change.clone()));
} else {
transaction_contract_changes.insert(
tx.hash.clone(),
tycho::TransactionContractChanges {
tx: Some(tx),
contract_changes: vec![],
component_changes: vec![],
balance_changes: group
.map(|(_, change)| change.clone())
.collect::<Vec<_>>(),
},
);
}
});
Ok(tycho::BlockContractChanges {
block: Some(tycho::Block {
number: block.number,
hash: block.hash.clone(),
parent_hash: block
.header
.as_ref()
.expect("Block header not present")
.parent_hash
.clone(),
ts: block.timestamp_seconds(),
}),
changes: transaction_contract_changes
.into_values()
.sorted_unstable_by_key(|tx_change| tx_change.tx.clone().unwrap().index)
.collect::<Vec<_>>(),
})
}

View File

@@ -0,0 +1,239 @@
use std::collections::HashMap;
use anyhow::Result;
use substreams::hex;
use substreams::pb::substreams::StoreDeltas;
use substreams::store::{
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew,
};
use substreams::key;
use substreams::scalar::BigInt;
use substreams_ethereum::pb::eth;
use itertools::Itertools;
use pb::tycho::evm::v1::{self as tycho};
use contract_changes::extract_contract_changes;
use crate::{abi, contract_changes, pb, pool_factories};
const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8");
/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in
/// a later groupby operation.
#[derive(Debug)]
struct TransactionWrapper(tycho::Transaction);
impl PartialEq for TransactionWrapper {
fn eq(&self, other: &Self) -> bool {
self.0.hash == other.0.hash
}
}
#[substreams::handlers::map]
pub fn map_pools_created(
block: eth::v2::Block,
) -> Result<tycho::GroupedTransactionProtocolComponents> {
// Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call
// We store these as a hashmap by tx hash since we need to agg by tx hash later
Ok(tycho::GroupedTransactionProtocolComponents {
tx_components: block
.transactions()
.map(|tx| tycho::TransactionProtocolComponents {
tx: Some(tycho::Transaction {
hash: tx.hash.clone(),
from: tx.from.clone(),
to: tx.to.clone(),
index: Into::<u64>::into(tx.index),
}),
components: tx
.logs_with_calls()
.filter(|(_, call)| !call.call.state_reverted)
.filter_map(|(log, call)| {
Some(pool_factories::address_map(
call.call.address.as_slice(),
log,
call.call,
)?)
})
.collect::<Vec<_>>(),
})
.collect::<Vec<_>>(),
})
}
/// Simply stores the `ProtocolComponent`s with the pool id as the key
#[substreams::handlers::store]
pub fn store_pools_created(map: tycho::GroupedTransactionProtocolComponents, store: StoreAddInt64) {
store.add_many(
0,
&map.tx_components
.iter()
.flat_map(|tx_components| &tx_components.components)
.map(|component| format!("pool:{0}", component.id))
.collect::<Vec<_>>(),
1,
);
}
/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a
/// store to be able to tally up final balances for tokens in a pool.
#[substreams::handlers::map]
pub fn map_balance_deltas(block: eth::v2::Block) -> Result<tycho::BalanceDeltas, anyhow::Error> {
Ok(tycho::BalanceDeltas {
balance_deltas: block
.events::<abi::vault::events::PoolBalanceChanged>(&[&VAULT_ADDRESS])
.flat_map(|(event, log)| {
event
.tokens
.iter()
.zip(event.deltas.iter())
.map(|(token, delta)| tycho::BalanceDelta {
ord: log.log.ordinal,
tx: Some(tycho::Transaction {
hash: log.receipt.transaction.hash.clone(),
from: log.receipt.transaction.from.clone(),
to: log.receipt.transaction.to.clone(),
index: Into::<u64>::into(log.receipt.transaction.index),
}),
token: token.clone(),
delta: delta.to_signed_bytes_be(),
component_id: event.pool_id.into(),
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>(),
})
}
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the
/// store key to ensure that there's a unique balance being tallied for each.
#[substreams::handlers::store]
pub fn store_balance_changes(deltas: tycho::BalanceDeltas, store: StoreAddBigInt) {
deltas.balance_deltas.iter().for_each(|delta| {
store.add(
delta.ord,
format!(
"pool:{0}:token:{1}",
hex::encode(&delta.component_id),
hex::encode(&delta.token)
),
BigInt::from_signed_bytes_be(&delta.delta),
);
});
}
/// This is the main map that handles most of the indexing of this substream.
/// Every contract change is grouped by transaction index via the `transaction_contract_changes`
/// map. Each block of code will extend the `TransactionContractChanges` struct with the
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
/// At the very end, the map can easily be sorted by index to ensure the final `BlockContractChanges`
/// is ordered by transactions properly.
#[substreams::handlers::map]
pub fn map_changes(
block: eth::v2::Block,
grouped_components: tycho::GroupedTransactionProtocolComponents,
deltas: tycho::BalanceDeltas,
components_store: StoreGetInt64,
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
) -> Result<tycho::BlockContractChanges> {
// We merge contract changes by transaction (identified by transaction index) making it easy to
// sort them at the very end.
let mut transaction_contract_changes: HashMap<_, tycho::TransactionContractChanges> =
HashMap::new();
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
// convert into `TransactionContractChanges`
grouped_components
.tx_components
.iter()
.for_each(|tx_component| {
let tx = tx_component.tx.as_ref().unwrap();
transaction_contract_changes
.entry(tx.index)
.or_insert_with(|| tycho::TransactionContractChanges {
tx: Some(tx.clone()),
contract_changes: vec![],
component_changes: vec![],
balance_changes: vec![],
})
.component_changes
.extend_from_slice(&tx_component.components);
});
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
// `BalanceDeltas`. We essentially just process the changes that occured to the `store` this
// block. Then, these balance changes are merged onto the existing map of tx contract changes,
// inserting a new one if it doesn't exist.
balance_store
.deltas
.into_iter()
.zip(deltas.balance_deltas)
.map(|(store_delta, balance_delta)| {
let pool_id = key::segment_at(&store_delta.key, 1);
let token_id = key::segment_at(&store_delta.key, 3);
(
balance_delta.tx.unwrap(),
tycho::BalanceChange {
token: hex::decode(token_id).expect("Token ID not valid hex"),
balance: store_delta.new_value,
component_id: hex::decode(pool_id).expect("Token ID not valid hex"),
},
)
})
// We need to group the balance changes by tx hash for the `TransactionContractChanges` agg
.group_by(|(tx, _)| TransactionWrapper(tx.clone()))
.into_iter()
.for_each(|(tx_wrapped, group)| {
let tx = tx_wrapped.0;
transaction_contract_changes
.entry(tx.index)
.or_insert_with(|| tycho::TransactionContractChanges {
tx: Some(tx.clone()),
contract_changes: vec![],
component_changes: vec![],
balance_changes: vec![],
})
.balance_changes
.extend(group.map(|(_, change)| change));
});
// General helper for extracting contract changes. Uses block, our component store which holds
// all of our tracked deployed pool addresses, and the map of tx contract changes which we
// output into for final processing later.
extract_contract_changes(&block, components_store, &mut transaction_contract_changes);
// Process all `transaction_contract_changes` for final output in the `BlockContractChanges`,
// sorted by transaction index (the key).
Ok(tycho::BlockContractChanges {
block: Some(tycho::Block {
number: block.number,
hash: block.hash.clone(),
parent_hash: block
.header
.as_ref()
.expect("Block header not present")
.parent_hash
.clone(),
ts: block.timestamp_seconds(),
}),
changes: transaction_contract_changes
.drain()
.sorted_unstable_by_key(|(index, _)| index.clone())
.filter_map(|(_, change)| {
if change.contract_changes.is_empty()
&& change.component_changes.is_empty()
&& change.balance_changes.is_empty()
{
None
} else {
Some(change)
}
})
.collect::<Vec<_>>(),
})
}

View File

@@ -1,3 +1,4 @@
// @generated
pub mod tycho { pub mod tycho {
pub mod evm { pub mod evm {
// @@protoc_insertion_point(attribute:tycho.evm.v1) // @@protoc_insertion_point(attribute:tycho.evm.v1)

View File

@@ -32,6 +32,7 @@ pub struct Transaction {
#[prost(bytes="vec", tag="3")] #[prost(bytes="vec", tag="3")]
pub to: ::prost::alloc::vec::Vec<u8>, pub to: ::prost::alloc::vec::Vec<u8>,
/// The transactions index within the block. /// The transactions index within the block.
/// TODO: should this be uint32? to match the type from the native substream type?
#[prost(uint64, tag="4")] #[prost(uint64, tag="4")]
pub index: u64, pub index: u64,
} }
@@ -80,10 +81,18 @@ pub struct ProtocolComponent {
} }
#[allow(clippy::derive_partial_eq_without_eq)] #[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)] #[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtocolComponents { pub struct TransactionProtocolComponents {
#[prost(message, repeated, tag="1")] #[prost(message, optional, tag="1")]
pub tx: ::core::option::Option<Transaction>,
#[prost(message, repeated, tag="2")]
pub components: ::prost::alloc::vec::Vec<ProtocolComponent>, pub components: ::prost::alloc::vec::Vec<ProtocolComponent>,
} }
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GroupedTransactionProtocolComponents {
#[prost(message, repeated, tag="1")]
pub tx_components: ::prost::alloc::vec::Vec<TransactionProtocolComponents>,
}
/// A struct for following the changes of Total Value Locked (TVL) of a protocol component. /// A struct for following the changes of Total Value Locked (TVL) of a protocol component.
/// Note that if a ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole. /// Note that if a ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole.
/// E.g. for UniswapV2 pair WETH/USDC, this tracks the USDC and WETH balance of the pair contract. /// E.g. for UniswapV2 pair WETH/USDC, this tracks the USDC and WETH balance of the pair contract.
@@ -129,14 +138,6 @@ pub struct BalanceDeltas {
#[prost(message, repeated, tag="1")] #[prost(message, repeated, tag="1")]
pub balance_deltas: ::prost::alloc::vec::Vec<BalanceDelta>, pub balance_deltas: ::prost::alloc::vec::Vec<BalanceDelta>,
} }
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockBalanceChanges {
#[prost(message, optional, tag="1")]
pub block: ::core::option::Option<Block>,
#[prost(message, repeated, tag="2")]
pub balance_changes: ::prost::alloc::vec::Vec<BalanceChange>,
}
/// Enum to specify the type of a change. /// Enum to specify the type of a change.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)] #[repr(i32)]

View File

@@ -0,0 +1,273 @@
use substreams_ethereum::pb::eth::v2::{Call, Log};
use substreams_ethereum::{Event, Function};
use crate::abi;
use crate::pb;
use pb::tycho::evm::v1::{self as tycho};
use substreams::hex;
use substreams::scalar::BigInt;
/// This trait defines some helpers for serializing and deserializing `Vec<BigInt` which is needed
/// to be able to encode the `normalized_weights` and `weights` `Attribute`s. This should also be
/// handled by any downstream application.
trait SerializableVecBigInt {
fn serialize_bytes(&self) -> Vec<u8>;
fn deserialize_bytes(bytes: &[u8]) -> Vec<BigInt>;
}
impl SerializableVecBigInt for Vec<BigInt> {
fn serialize_bytes(&self) -> Vec<u8> {
self.iter()
.flat_map(|big_int| big_int.to_signed_bytes_be())
.collect()
}
fn deserialize_bytes(bytes: &[u8]) -> Vec<BigInt> {
bytes
.chunks_exact(32)
.map(|chunk| BigInt::from_signed_bytes_be(chunk))
.collect::<Vec<BigInt>>()
}
}
/// This is the main function that handles the creation of `ProtocolComponent`s with `Attribute`s
/// based on the specific factory address. There's 3 factory groups that are represented here:
/// - Weighted Pool Factories
/// - Linear Pool Factories
/// - Stable Pool Factories
/// (Balancer does have a bit more (esp. in the deprecated section) that could be implemented as
/// desired.)
/// We use the specific ABIs to decode both the log event and cooresponding call to gather
/// `PoolCreated` event information alongside the `Create` calldata that provide us details to
/// fufill both the required details + any extra `Attributes`
/// Ref: https://docs.balancer.fi/reference/contracts/deployment-addresses/mainnet.html
pub fn address_map(
pool_addr: &[u8],
log: &Log,
call: &Call,
) -> Option<tycho::ProtocolComponent> {
match *pool_addr {
hex!("897888115Ada5773E02aA29F775430BFB5F34c51") => {
let create_call =
abi::weighted_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::weighted_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: create_call.tokens,
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "WeightedPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "normalized_weights".into(),
value: create_call.normalized_weights.serialize_bytes(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("DB8d758BCb971e482B2C45f7F8a7740283A1bd3A") => {
let create_call =
abi::composable_stable_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::composable_stable_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: create_call.tokens,
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "ComposableStablePoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("813EE7a840CE909E7Fea2117A44a90b8063bd4fd") => {
let create_call =
abi::erc_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::erc_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "ERC4626LinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
// Note, `lower_target` is generally hardcoded for all pools, not located in call data
// Note, rate provider might be provided as `create.protocol_id`, but as a BigInt. needs investigation
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347") => {
let create_call =
abi::euler_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::euler_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "EulerLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
// ❌ Reading the deployed factory for Gearbox showcases that it's currently disabled
// hex!("39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B") => {
// let create_call =
// abi::gearbox_linear_pool_factory::functions::Create::match_and_decode(call)?;
// let pool_created =
// abi::gearbox_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
// Some(tycho::ProtocolComponent {
// id: hex::encode(&pool_created.pool),
// tokens: vec![create_call.main_token, create_call.wrapped_token],
// contracts: vec![pool_addr.into(), pool_created.pool],
// static_att: vec![
// tycho::Attribute {
// name: "pool_type".into(),
// value: "GearboxLinearPoolFactory".into(),
// change: tycho::ChangeType::Creation.into(),
// },
// tycho::Attribute {
// name: "upper_target".into(),
// value: create_call.upper_target.to_signed_bytes_be(),
// change: tycho::ChangeType::Creation.into(),
// },
// ],
// change: tycho::ChangeType::Creation.into(),
// })
// }
// ❌ The `ManagedPoolFactory` is a bit ✨ unique ✨, so we'll leave it commented out for now
// Take a look at it's `Create` call to see how the params are structured.
// hex!("BF904F9F340745B4f0c4702c7B6Ab1e808eA6b93") => {
// let create_call = abi::managed_pool_factory::functions::Create::match_and_decode(call)?;
// let pool_created =
// abi::managed_pool_factory::events::PoolCreated::match_and_decode(log)?;
// Some(tycho::ProtocolComponent {
// id: hex::encode(&pool_created.pool),
// tokens: create_call.tokens,
// contracts: vec![pool_addr.into(), pool_created.pool],
// static_att: vec![
// tycho::Attribute {
// name: "pool_type".into(),
// value: "ManagedPoolFactory".into(),
// change: tycho::ChangeType::Creation.into(),
// },
// ],
// change: tycho::ChangeType::Creation.into(),
// })
// }
hex!("4E11AEec21baF1660b1a46472963cB3DA7811C89") => {
let create_call =
abi::silo_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::silo_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "SiloLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
hex!("5F5222Ffa40F2AEd6380D022184D6ea67C776eE0") => {
let create_call =
abi::yearn_linear_pool_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::yearn_linear_pool_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: vec![create_call.main_token, create_call.wrapped_token],
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "YearnLinearPoolFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "upper_target".into(),
value: create_call.upper_target.to_signed_bytes_be(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
// The `WeightedPool2TokenFactory` is a deprecated contract but we've included it since one
// of the highest TVL pools, 80BAL-20WETH, is able to be tracked.
hex!("A5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0") => {
let create_call =
abi::weighted_pool_tokens_factory::functions::Create::match_and_decode(call)?;
let pool_created =
abi::weighted_pool_tokens_factory::events::PoolCreated::match_and_decode(log)?;
Some(tycho::ProtocolComponent {
id: hex::encode(&pool_created.pool),
tokens: create_call.tokens,
contracts: vec![pool_addr.into(), pool_created.pool],
static_att: vec![
tycho::Attribute {
name: "pool_type".into(),
value: "WeightedPool2TokensFactory".into(),
change: tycho::ChangeType::Creation.into(),
},
tycho::Attribute {
name: "weights".into(),
value: create_call.weights.serialize_bytes(),
change: tycho::ChangeType::Creation.into(),
},
],
change: tycho::ChangeType::Creation.into(),
})
}
_ => None,
}
}

View File

@@ -17,6 +17,22 @@ binaries:
file: target/wasm32-unknown-unknown/release/substreams_balancer.wasm file: target/wasm32-unknown-unknown/release/substreams_balancer.wasm
modules: modules:
- name: map_pools_created
kind: map
initialBlock: 12369300
inputs:
- source: sf.ethereum.type.v2.Block
output:
type: proto:tycho.evm.v1.GroupedTransactionProtocolComponents
- name: store_pools_created
kind: store
initialBlock: 12369300
updatePolicy: add
valueType: int64
inputs:
- map: map_pools_created
- name: map_balance_deltas - name: map_balance_deltas
kind: map kind: map
initialBlock: 12369300 # An arbitrary block that should change based on your requirements initialBlock: 12369300 # An arbitrary block that should change based on your requirements
@@ -38,7 +54,9 @@ modules:
initialBlock: 12369300 initialBlock: 12369300
inputs: inputs:
- source: sf.ethereum.type.v2.Block - source: sf.ethereum.type.v2.Block
- map: map_pools_created
- map: map_balance_deltas - map: map_balance_deltas
- store: store_pools_created
- store: store_balance_changes - store: store_balance_changes
mode: deltas # This is the key property that simplifies `BalanceChange` handling mode: deltas # This is the key property that simplifies `BalanceChange` handling
output: output: