From 9f82671082d635fc595324e448d41cb88a4a5632 Mon Sep 17 00:00:00 2001 From: 0xMochan Date: Mon, 29 Jan 2024 11:11:06 -0500 Subject: [PATCH] feat: lots of refactoring (file splitting - `lib.rs` split into multiple files with `modules.rs` being the main entrypoint for substreams - contract changes are tracked similiarly to ambient (generalized to use a store of tracked contracts) - static attributes were pruned of the dynamic ones --- substreams/ethereum-balancer/abi/get_abis.py | 3 +- .../proto/tycho/evm/v1/common.proto | 9 +- .../ethereum-balancer/src/contract_changes.rs | 190 +++++++ substreams/ethereum-balancer/src/lib.rs | 492 +----------------- substreams/ethereum-balancer/src/modules.rs | 239 +++++++++ substreams/ethereum-balancer/src/pb/mod.rs | 1 + .../ethereum-balancer/src/pb/tycho.evm.v1.rs | 21 +- .../ethereum-balancer/src/pool_factories.rs | 273 ++++++++++ substreams/ethereum-balancer/substreams.yaml | 18 + 9 files changed, 744 insertions(+), 502 deletions(-) create mode 100644 substreams/ethereum-balancer/src/contract_changes.rs create mode 100644 substreams/ethereum-balancer/src/modules.rs create mode 100644 substreams/ethereum-balancer/src/pool_factories.rs diff --git a/substreams/ethereum-balancer/abi/get_abis.py b/substreams/ethereum-balancer/abi/get_abis.py index 6649c17..6b941e8 100644 --- a/substreams/ethereum-balancer/abi/get_abis.py +++ b/substreams/ethereum-balancer/abi/get_abis.py @@ -9,12 +9,13 @@ import requests # Exports contract ABI in JSON abis = { + # Factories "WeightedPoolFactory (v4)": "0x897888115Ada5773E02aA29F775430BFB5F34c51", "WeightedPool2TokensFactory": "0xA5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0", # 80Bal-20WETH "ComposableStablePoolFactory (v5)": "0xDB8d758BCb971e482B2C45f7F8a7740283A1bd3A", "ERC4626LinearPoolFactory (v4)": "0x813EE7a840CE909E7Fea2117A44a90b8063bd4fd", "EulerLinearPoolFactory": "0x5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347", - "GearboxLinearPoolFactory (v2)": "0x39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B", + # "GearboxLinearPoolFactory (v2)": "0x39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B", "ManagedPoolFactory (v2)": "0xBF904F9F340745B4f0c4702c7B6Ab1e808eA6b93", "SiloLinearPoolFactory (v2)": "0x4E11AEec21baF1660b1a46472963cB3DA7811C89", "YearnLinearPoolFactory (v2)": "0x5F5222Ffa40F2AEd6380D022184D6ea67C776eE0", diff --git a/substreams/ethereum-balancer/proto/tycho/evm/v1/common.proto b/substreams/ethereum-balancer/proto/tycho/evm/v1/common.proto index b88d0f6..1a5caf4 100644 --- a/substreams/ethereum-balancer/proto/tycho/evm/v1/common.proto +++ b/substreams/ethereum-balancer/proto/tycho/evm/v1/common.proto @@ -70,8 +70,13 @@ message ProtocolComponent { ChangeType change = 5; } -message ProtocolComponents { - repeated ProtocolComponent components = 1; +message TransactionProtocolComponents { + Transaction tx = 1; + repeated ProtocolComponent components = 2; +} + +message GroupedTransactionProtocolComponents { + repeated TransactionProtocolComponents tx_components = 1; } // A struct for following the changes of Total Value Locked (TVL) of a protocol component. diff --git a/substreams/ethereum-balancer/src/contract_changes.rs b/substreams/ethereum-balancer/src/contract_changes.rs new file mode 100644 index 0000000..4ab49fe --- /dev/null +++ b/substreams/ethereum-balancer/src/contract_changes.rs @@ -0,0 +1,190 @@ +/// This file contains helpers to capture contract changes from the expanded block model. These +/// leverage the `code_changes`, `balance_changes`, and `storage_changes` fields available on the +/// `Call` type provided by block model in a substream (i.e. `logs_and_calls`, etc). +/// +/// ⚠️ These helpers *only* work if the **expanded block model** is available, more info blow. +/// https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425 +use std::collections::HashMap; + +use substreams_ethereum::pb::eth; + +use pb::tycho::evm::v1::{self as tycho}; + +use substreams::store::{StoreGet, StoreGetInt64}; + +use crate::pb; + +struct SlotValue { + new_value: Vec, + start_value: Vec, +} + +impl SlotValue { + fn has_changed(&self) -> bool { + self.start_value != self.new_value + } +} + +// Uses a map for slots, protobuf does not allow bytes in hashmap keys +pub struct InterimContractChange { + address: Vec, + balance: Vec, + code: Vec, + slots: HashMap, SlotValue>, + change: tycho::ChangeType, +} + +impl From for tycho::ContractChange { + fn from(value: InterimContractChange) -> Self { + tycho::ContractChange { + address: value.address, + balance: value.balance, + code: value.code, + slots: value + .slots + .into_iter() + .filter(|(_, value)| value.has_changed()) + .map(|(slot, value)| tycho::ContractSlot { + slot, + value: value.new_value, + }) + .collect(), + change: value.change.into(), + } + } +} + +pub fn extract_contract_changes( + block: ð::v2::Block, + contracts: StoreGetInt64, + transaction_contract_changes: &mut HashMap, +) { + let mut changed_contracts: HashMap, InterimContractChange> = HashMap::new(); + + // Collect all accounts created in this block + let created_accounts: HashMap<_, _> = block + .transactions() + .flat_map(|tx| { + tx.calls.iter().flat_map(|call| { + call.account_creations + .iter() + .map(|ac| (&ac.account, ac.ordinal)) + }) + }) + .collect(); + + block.transactions().for_each(|block_tx| { + let mut storage_changes = Vec::new(); + let mut balance_changes = Vec::new(); + let mut code_changes = Vec::new(); + + block_tx + .calls + .iter() + .filter(|call| { + !call.state_reverted + && contracts + .get_last(format!("pool:{0}", hex::encode(&call.address))) + .is_some() + }) + .for_each(|call| { + storage_changes.extend(call.storage_changes.iter()); + balance_changes.extend(call.balance_changes.iter()); + code_changes.extend(call.code_changes.iter()); + }); + + storage_changes.sort_unstable_by_key(|change| change.ordinal); + balance_changes.sort_unstable_by_key(|change| change.ordinal); + code_changes.sort_unstable_by_key(|change| change.ordinal); + + storage_changes.iter().for_each(|storage_change| { + let contract_change = changed_contracts + .entry(storage_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: storage_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&storage_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); + + let slot_value = contract_change + .slots + .entry(storage_change.key.clone()) + .or_insert_with(|| SlotValue { + new_value: storage_change.new_value.clone(), + start_value: storage_change.old_value.clone(), + }); + + slot_value + .new_value + .copy_from_slice(&storage_change.new_value); + }); + + balance_changes.iter().for_each(|balance_change| { + let contract_change = changed_contracts + .entry(balance_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: balance_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&balance_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); + + if let Some(new_balance) = &balance_change.new_value { + contract_change.balance.clear(); + contract_change + .balance + .extend_from_slice(&new_balance.bytes); + } + }); + + code_changes.iter().for_each(|code_change| { + let contract_change = changed_contracts + .entry(code_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: code_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&code_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); + + contract_change.code.clear(); + contract_change + .code + .extend_from_slice(&code_change.new_code); + }); + + if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() { + transaction_contract_changes + .entry(block_tx.index.into()) + .or_insert_with(|| tycho::TransactionContractChanges { + tx: Some(tycho::Transaction { + hash: block_tx.hash.clone(), + from: block_tx.from.clone(), + to: block_tx.to.clone(), + index: block_tx.index as u64, + }), + contract_changes: vec![], + component_changes: vec![], + balance_changes: vec![], + }) + .contract_changes + .extend(changed_contracts.drain().map(|(_, change)| change.into())); + } + }); +} diff --git a/substreams/ethereum-balancer/src/lib.rs b/substreams/ethereum-balancer/src/lib.rs index 7a27d81..88ecc50 100644 --- a/substreams/ethereum-balancer/src/lib.rs +++ b/substreams/ethereum-balancer/src/lib.rs @@ -1,491 +1,5 @@ -use std::collections::HashMap; - -use anyhow::Result; -use substreams::hex; -use substreams::pb::substreams::StoreDeltas; -use substreams::store::{StoreAdd, StoreAddBigInt, StoreNew}; - -use substreams::key; -use substreams::scalar::BigInt; - -use substreams_ethereum::pb::eth; -use substreams_ethereum::pb::eth::v2::{Call, Log}; -use substreams_ethereum::{Event, Function}; - -use itertools::Itertools; -use pb::tycho::evm::v1::{self as tycho}; - mod abi; +mod contract_changes; +mod modules; mod pb; - -const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8"); - -/// This trait defines some helpers for serializing and deserializing `Vec Vec; - fn deserialize_bytes(bytes: &[u8]) -> Vec; -} - -impl SerializableVecBigInt for Vec { - fn serialize_bytes(&self) -> Vec { - self.iter() - .flat_map(|big_int| big_int.to_signed_bytes_be()) - .collect() - } - fn deserialize_bytes(bytes: &[u8]) -> Vec { - bytes - .chunks_exact(32) - .map(|chunk| BigInt::from_signed_bytes_be(chunk)) - .collect::>() - } -} - -/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in -/// a later groupby operation. -struct TransactionWrapper(tycho::Transaction); - -impl PartialEq for TransactionWrapper { - fn eq(&self, other: &Self) -> bool { - self.0.hash == other.0.hash - } -} - -/// This is the main function that handles the creation of `ProtocolComponent`s with `Attribute`s -/// based on the specific factory address. There's 3 factory groups that are represented here: -/// - Weighted Pool Factories -/// - Linear Pool Factories -/// - Stable Pool Factories -/// (Balancer does have a bit more (esp. in the deprecated section) that could be implemented as -/// desired.) -/// We use the specific ABIs to decode both the log event and cooresponding call to gather -/// `PoolCreated` event information alongside the `Create` calldata that provide us details to -/// fufill both the required details + any extra `Attributes` -/// Ref: https://docs.balancer.fi/reference/contracts/deployment-addresses/mainnet.html -fn pool_factory_map(pool_addr: &[u8], log: &Log, call: &Call) -> Option { - match *pool_addr { - hex!("897888115Ada5773E02aA29F775430BFB5F34c51") => { - let create_call = - abi::weighted_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::weighted_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: create_call.tokens, - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "WeightedPoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "normalized_weights".into(), - value: create_call.normalized_weights.serialize_bytes(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - hex!("DB8d758BCb971e482B2C45f7F8a7740283A1bd3A") => { - let create_call = - abi::composable_stable_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::composable_stable_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: create_call.tokens, - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "ComposableStablePoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "amplification_parameter".into(), - value: create_call.amplification_parameter.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - hex!("813EE7a840CE909E7Fea2117A44a90b8063bd4fd") => { - let create_call = - abi::erc_linear_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::erc_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: vec![create_call.main_token, create_call.wrapped_token], - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "ERC4626LinearPoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "upper_target".into(), - value: create_call.upper_target.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - // Note, `lower_target` is generally hardcoded for all pools, not located in call data - // Note, rate provider might be provided as `create.protocol_id`, but as a BigInt. needs investigation - ], - change: tycho::ChangeType::Creation.into(), - }) - } - hex!("5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347") => { - let create_call = - abi::euler_linear_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::euler_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: vec![create_call.main_token, create_call.wrapped_token], - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "EulerLinearPoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "upper_target".into(), - value: create_call.upper_target.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - hex!("39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B") => { - let create_call = - abi::gearbox_linear_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::gearbox_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: vec![create_call.main_token, create_call.wrapped_token], - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "GearboxLinearPoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "upper_target".into(), - value: create_call.upper_target.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - // The `ManagedPoolFactory` is a bit ✨ unique ✨, so we'll leave it commented out for now - // Take a look at it's `Create` call to see how the params are structured. - // hex!("BF904F9F340745B4f0c4702c7B6Ab1e808eA6b93") => { - // let create_call = abi::managed_pool_factory::functions::Create::match_and_decode(call)?; - // let pool_created = - // abi::managed_pool_factory::events::PoolCreated::match_and_decode(log)?; - - // Some(tycho::ProtocolComponent { - // id: hex::encode(&pool_created.pool), - // tokens: create_call.tokens, - // contracts: vec![pool_addr.into(), pool_created.pool], - // static_att: vec![ - // tycho::Attribute { - // name: "pool_type".into(), - // value: "ManagedPoolFactory".into(), - // change: tycho::ChangeType::Creation.into(), - // }, - // tycho::Attribute { - // name: "swap_fee_percentage".into(), - // value: create_call.swap_fee_percentage.to_signed_bytes_be(), - // change: tycho::ChangeType::Creation.into(), - // }, - // ], - // change: tycho::ChangeType::Creation.into(), - // }) - // } - hex!("4E11AEec21baF1660b1a46472963cB3DA7811C89") => { - let create_call = - abi::silo_linear_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::silo_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: vec![create_call.main_token, create_call.wrapped_token], - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "SiloLinearPoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "upper_target".into(), - value: create_call.upper_target.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - hex!("5F5222Ffa40F2AEd6380D022184D6ea67C776eE0") => { - let create_call = - abi::yearn_linear_pool_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::yearn_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: vec![create_call.main_token, create_call.wrapped_token], - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "YearnLinearPoolFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "upper_target".into(), - value: create_call.upper_target.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - // The `WeightedPool2TokenFactory` is a deprecated contract but we've included it since one - // of the highest TVL pools, 80BAL-20WETH, is able to be tracked. - hex!("A5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0") => { - let create_call = - abi::weighted_pool_tokens_factory::functions::Create::match_and_decode(call)?; - let pool_created = - abi::weighted_pool_tokens_factory::events::PoolCreated::match_and_decode(log)?; - - Some(tycho::ProtocolComponent { - id: hex::encode(&pool_created.pool), - tokens: create_call.tokens, - contracts: vec![pool_addr.into(), pool_created.pool], - static_att: vec![ - tycho::Attribute { - name: "pool_type".into(), - value: "WeightedPool2TokensFactory".into(), - change: tycho::ChangeType::Creation.into(), - }, - tycho::Attribute { - name: "swap_fee_percentage".into(), - value: create_call.swap_fee_percentage.to_signed_bytes_be(), - change: tycho::ChangeType::Creation.into(), - }, - // TODO - tycho::Attribute { - name: "weights".into(), - value: create_call.weights.serialize_bytes(), - change: tycho::ChangeType::Creation.into(), - }, - ], - change: tycho::ChangeType::Creation.into(), - }) - } - _ => None, - } -} - -/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a -/// store to be able to tally up final balances for tokens in a pool. -#[substreams::handlers::map] -pub fn map_balance_deltas(block: eth::v2::Block) -> Result { - Ok(tycho::BalanceDeltas { - balance_deltas: block - .events::(&[&VAULT_ADDRESS]) - .flat_map(|(event, log)| { - event - .tokens - .iter() - .zip(event.deltas.iter()) - .map(|(token, delta)| tycho::BalanceDelta { - ord: log.log.ordinal, - tx: Some(tycho::Transaction { - hash: log.receipt.transaction.hash.clone(), - from: log.receipt.transaction.from.clone(), - to: log.receipt.transaction.to.clone(), - index: Into::::into(log.receipt.transaction.index), - }), - token: token.clone(), - delta: delta.to_signed_bytes_be(), - component_id: event.pool_id.into(), - }) - .collect::>() - }) - .collect::>(), - }) -} - -/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the -/// store key to ensure that there's a unique balance being tallied for each. -#[substreams::handlers::store] -pub fn store_balance_changes(deltas: tycho::BalanceDeltas, store: StoreAddBigInt) { - deltas.balance_deltas.iter().for_each(|delta| { - store.add( - delta.ord, - format!( - "pool:{0}:token:{1}", - hex::encode(&delta.component_id), - hex::encode(&delta.token) - ), - BigInt::from_signed_bytes_be(&delta.delta), - ); - }); -} - -/// This is the main map that handles most of the indexing of this substream. -#[substreams::handlers::map] -pub fn map_changes( - block: eth::v2::Block, - deltas: tycho::BalanceDeltas, - store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store. -) -> Result { - // Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call - // We store these as a hashmap by tx hash since we need to agg by tx hash later - let mut transaction_contract_changes = block - .transactions() - .flat_map(|tx| { - tx.logs_with_calls() - .filter(|(_, call)| !call.call.state_reverted) - .filter_map(|(log, call)| { - let pool_factory_address = call.call.address.as_slice(); - - Some(( - tx.hash.clone(), - tycho::TransactionContractChanges { - tx: Some(tycho::Transaction { - hash: tx.hash.clone(), - from: tx.from.clone(), - to: tx.to.clone(), - index: Into::::into(tx.index), - }), - contract_changes: vec![], - balance_changes: vec![], - component_changes: vec![pool_factory_map( - pool_factory_address, - log, - call.call, - )?], - }, - )) - }) - }) - .collect::>(); - - // Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating `BalanceDeltas` - // We essentially just process the changes that occured to the `store` this block - // Then, these balance changes are merged onto the existing map of tx contract changes, - // inserting a new one if it doesn't exist. - store - .deltas - .into_iter() - .zip(deltas.balance_deltas) - .map(|(store_delta, balance_delta)| { - let pool_id = key::segment_at(&store_delta.key, 1); - let token_id = key::segment_at(&store_delta.key, 3); - ( - balance_delta.tx.unwrap(), - tycho::BalanceChange { - token: hex::decode(token_id).expect("Token ID not valid hex"), - balance: store_delta.new_value, - component_id: hex::decode(pool_id).expect("Token ID not valid hex"), - }, - ) - }) - // We need to group the balance changes by tx hash for the `TransactionContractChanges` agg - .group_by(|(tx, _)| TransactionWrapper(tx.clone())) - .into_iter() - .for_each(|(tx_wrapped, group)| { - let tx = tx_wrapped.0; - - if let Some(tx_change) = transaction_contract_changes.get_mut(&tx.hash) { - tx_change - .balance_changes - .extend(group.map(|(_, change)| change.clone())); - } else { - transaction_contract_changes.insert( - tx.hash.clone(), - tycho::TransactionContractChanges { - tx: Some(tx), - contract_changes: vec![], - component_changes: vec![], - balance_changes: group - .map(|(_, change)| change.clone()) - .collect::>(), - }, - ); - } - }); - - Ok(tycho::BlockContractChanges { - block: Some(tycho::Block { - number: block.number, - hash: block.hash.clone(), - parent_hash: block - .header - .as_ref() - .expect("Block header not present") - .parent_hash - .clone(), - ts: block.timestamp_seconds(), - }), - changes: transaction_contract_changes - .into_values() - .sorted_unstable_by_key(|tx_change| tx_change.tx.clone().unwrap().index) - .collect::>(), - }) -} +mod pool_factories; diff --git a/substreams/ethereum-balancer/src/modules.rs b/substreams/ethereum-balancer/src/modules.rs new file mode 100644 index 0000000..7471c36 --- /dev/null +++ b/substreams/ethereum-balancer/src/modules.rs @@ -0,0 +1,239 @@ +use std::collections::HashMap; + +use anyhow::Result; +use substreams::hex; +use substreams::pb::substreams::StoreDeltas; +use substreams::store::{ + StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew, +}; + +use substreams::key; +use substreams::scalar::BigInt; + +use substreams_ethereum::pb::eth; + +use itertools::Itertools; +use pb::tycho::evm::v1::{self as tycho}; + +use contract_changes::extract_contract_changes; + +use crate::{abi, contract_changes, pb, pool_factories}; + +const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8"); + +/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in +/// a later groupby operation. +#[derive(Debug)] +struct TransactionWrapper(tycho::Transaction); + +impl PartialEq for TransactionWrapper { + fn eq(&self, other: &Self) -> bool { + self.0.hash == other.0.hash + } +} + +#[substreams::handlers::map] +pub fn map_pools_created( + block: eth::v2::Block, +) -> Result { + // Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call + // We store these as a hashmap by tx hash since we need to agg by tx hash later + Ok(tycho::GroupedTransactionProtocolComponents { + tx_components: block + .transactions() + .map(|tx| tycho::TransactionProtocolComponents { + tx: Some(tycho::Transaction { + hash: tx.hash.clone(), + from: tx.from.clone(), + to: tx.to.clone(), + index: Into::::into(tx.index), + }), + components: tx + .logs_with_calls() + .filter(|(_, call)| !call.call.state_reverted) + .filter_map(|(log, call)| { + Some(pool_factories::address_map( + call.call.address.as_slice(), + log, + call.call, + )?) + }) + .collect::>(), + }) + .collect::>(), + }) +} + +/// Simply stores the `ProtocolComponent`s with the pool id as the key +#[substreams::handlers::store] +pub fn store_pools_created(map: tycho::GroupedTransactionProtocolComponents, store: StoreAddInt64) { + store.add_many( + 0, + &map.tx_components + .iter() + .flat_map(|tx_components| &tx_components.components) + .map(|component| format!("pool:{0}", component.id)) + .collect::>(), + 1, + ); +} + +/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a +/// store to be able to tally up final balances for tokens in a pool. +#[substreams::handlers::map] +pub fn map_balance_deltas(block: eth::v2::Block) -> Result { + Ok(tycho::BalanceDeltas { + balance_deltas: block + .events::(&[&VAULT_ADDRESS]) + .flat_map(|(event, log)| { + event + .tokens + .iter() + .zip(event.deltas.iter()) + .map(|(token, delta)| tycho::BalanceDelta { + ord: log.log.ordinal, + tx: Some(tycho::Transaction { + hash: log.receipt.transaction.hash.clone(), + from: log.receipt.transaction.from.clone(), + to: log.receipt.transaction.to.clone(), + index: Into::::into(log.receipt.transaction.index), + }), + token: token.clone(), + delta: delta.to_signed_bytes_be(), + component_id: event.pool_id.into(), + }) + .collect::>() + }) + .collect::>(), + }) +} + +/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the +/// store key to ensure that there's a unique balance being tallied for each. +#[substreams::handlers::store] +pub fn store_balance_changes(deltas: tycho::BalanceDeltas, store: StoreAddBigInt) { + deltas.balance_deltas.iter().for_each(|delta| { + store.add( + delta.ord, + format!( + "pool:{0}:token:{1}", + hex::encode(&delta.component_id), + hex::encode(&delta.token) + ), + BigInt::from_signed_bytes_be(&delta.delta), + ); + }); +} + +/// This is the main map that handles most of the indexing of this substream. +/// Every contract change is grouped by transaction index via the `transaction_contract_changes` +/// map. Each block of code will extend the `TransactionContractChanges` struct with the +/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist. +/// At the very end, the map can easily be sorted by index to ensure the final `BlockContractChanges` +/// is ordered by transactions properly. +#[substreams::handlers::map] +pub fn map_changes( + block: eth::v2::Block, + grouped_components: tycho::GroupedTransactionProtocolComponents, + deltas: tycho::BalanceDeltas, + components_store: StoreGetInt64, + balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store. +) -> Result { + // We merge contract changes by transaction (identified by transaction index) making it easy to + // sort them at the very end. + let mut transaction_contract_changes: HashMap<_, tycho::TransactionContractChanges> = + HashMap::new(); + + // `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to + // convert into `TransactionContractChanges` + grouped_components + .tx_components + .iter() + .for_each(|tx_component| { + let tx = tx_component.tx.as_ref().unwrap(); + + transaction_contract_changes + .entry(tx.index) + .or_insert_with(|| tycho::TransactionContractChanges { + tx: Some(tx.clone()), + contract_changes: vec![], + component_changes: vec![], + balance_changes: vec![], + }) + .component_changes + .extend_from_slice(&tx_component.components); + }); + + // Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating + // `BalanceDeltas`. We essentially just process the changes that occured to the `store` this + // block. Then, these balance changes are merged onto the existing map of tx contract changes, + // inserting a new one if it doesn't exist. + balance_store + .deltas + .into_iter() + .zip(deltas.balance_deltas) + .map(|(store_delta, balance_delta)| { + let pool_id = key::segment_at(&store_delta.key, 1); + let token_id = key::segment_at(&store_delta.key, 3); + ( + balance_delta.tx.unwrap(), + tycho::BalanceChange { + token: hex::decode(token_id).expect("Token ID not valid hex"), + balance: store_delta.new_value, + component_id: hex::decode(pool_id).expect("Token ID not valid hex"), + }, + ) + }) + // We need to group the balance changes by tx hash for the `TransactionContractChanges` agg + .group_by(|(tx, _)| TransactionWrapper(tx.clone())) + .into_iter() + .for_each(|(tx_wrapped, group)| { + let tx = tx_wrapped.0; + + transaction_contract_changes + .entry(tx.index) + .or_insert_with(|| tycho::TransactionContractChanges { + tx: Some(tx.clone()), + contract_changes: vec![], + component_changes: vec![], + balance_changes: vec![], + }) + .balance_changes + .extend(group.map(|(_, change)| change)); + }); + + // General helper for extracting contract changes. Uses block, our component store which holds + // all of our tracked deployed pool addresses, and the map of tx contract changes which we + // output into for final processing later. + extract_contract_changes(&block, components_store, &mut transaction_contract_changes); + + // Process all `transaction_contract_changes` for final output in the `BlockContractChanges`, + // sorted by transaction index (the key). + Ok(tycho::BlockContractChanges { + block: Some(tycho::Block { + number: block.number, + hash: block.hash.clone(), + parent_hash: block + .header + .as_ref() + .expect("Block header not present") + .parent_hash + .clone(), + ts: block.timestamp_seconds(), + }), + changes: transaction_contract_changes + .drain() + .sorted_unstable_by_key(|(index, _)| index.clone()) + .filter_map(|(_, change)| { + if change.contract_changes.is_empty() + && change.component_changes.is_empty() + && change.balance_changes.is_empty() + { + None + } else { + Some(change) + } + }) + .collect::>(), + }) +} diff --git a/substreams/ethereum-balancer/src/pb/mod.rs b/substreams/ethereum-balancer/src/pb/mod.rs index abb6b01..43d8838 100644 --- a/substreams/ethereum-balancer/src/pb/mod.rs +++ b/substreams/ethereum-balancer/src/pb/mod.rs @@ -1,3 +1,4 @@ +// @generated pub mod tycho { pub mod evm { // @@protoc_insertion_point(attribute:tycho.evm.v1) diff --git a/substreams/ethereum-balancer/src/pb/tycho.evm.v1.rs b/substreams/ethereum-balancer/src/pb/tycho.evm.v1.rs index a2c781a..affc411 100644 --- a/substreams/ethereum-balancer/src/pb/tycho.evm.v1.rs +++ b/substreams/ethereum-balancer/src/pb/tycho.evm.v1.rs @@ -32,6 +32,7 @@ pub struct Transaction { #[prost(bytes="vec", tag="3")] pub to: ::prost::alloc::vec::Vec, /// The transactions index within the block. + /// TODO: should this be uint32? to match the type from the native substream type? #[prost(uint64, tag="4")] pub index: u64, } @@ -80,10 +81,18 @@ pub struct ProtocolComponent { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ProtocolComponents { - #[prost(message, repeated, tag="1")] +pub struct TransactionProtocolComponents { + #[prost(message, optional, tag="1")] + pub tx: ::core::option::Option, + #[prost(message, repeated, tag="2")] pub components: ::prost::alloc::vec::Vec, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GroupedTransactionProtocolComponents { + #[prost(message, repeated, tag="1")] + pub tx_components: ::prost::alloc::vec::Vec, +} /// A struct for following the changes of Total Value Locked (TVL) of a protocol component. /// Note that if a ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole. /// E.g. for UniswapV2 pair WETH/USDC, this tracks the USDC and WETH balance of the pair contract. @@ -129,14 +138,6 @@ pub struct BalanceDeltas { #[prost(message, repeated, tag="1")] pub balance_deltas: ::prost::alloc::vec::Vec, } -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BlockBalanceChanges { - #[prost(message, optional, tag="1")] - pub block: ::core::option::Option, - #[prost(message, repeated, tag="2")] - pub balance_changes: ::prost::alloc::vec::Vec, -} /// Enum to specify the type of a change. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] diff --git a/substreams/ethereum-balancer/src/pool_factories.rs b/substreams/ethereum-balancer/src/pool_factories.rs new file mode 100644 index 0000000..0956103 --- /dev/null +++ b/substreams/ethereum-balancer/src/pool_factories.rs @@ -0,0 +1,273 @@ +use substreams_ethereum::pb::eth::v2::{Call, Log}; +use substreams_ethereum::{Event, Function}; + +use crate::abi; +use crate::pb; +use pb::tycho::evm::v1::{self as tycho}; +use substreams::hex; + +use substreams::scalar::BigInt; + +/// This trait defines some helpers for serializing and deserializing `Vec Vec; + fn deserialize_bytes(bytes: &[u8]) -> Vec; +} + +impl SerializableVecBigInt for Vec { + fn serialize_bytes(&self) -> Vec { + self.iter() + .flat_map(|big_int| big_int.to_signed_bytes_be()) + .collect() + } + fn deserialize_bytes(bytes: &[u8]) -> Vec { + bytes + .chunks_exact(32) + .map(|chunk| BigInt::from_signed_bytes_be(chunk)) + .collect::>() + } +} + +/// This is the main function that handles the creation of `ProtocolComponent`s with `Attribute`s +/// based on the specific factory address. There's 3 factory groups that are represented here: +/// - Weighted Pool Factories +/// - Linear Pool Factories +/// - Stable Pool Factories +/// (Balancer does have a bit more (esp. in the deprecated section) that could be implemented as +/// desired.) +/// We use the specific ABIs to decode both the log event and cooresponding call to gather +/// `PoolCreated` event information alongside the `Create` calldata that provide us details to +/// fufill both the required details + any extra `Attributes` +/// Ref: https://docs.balancer.fi/reference/contracts/deployment-addresses/mainnet.html +pub fn address_map( + pool_addr: &[u8], + log: &Log, + call: &Call, +) -> Option { + match *pool_addr { + hex!("897888115Ada5773E02aA29F775430BFB5F34c51") => { + let create_call = + abi::weighted_pool_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::weighted_pool_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: create_call.tokens, + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "WeightedPoolFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + tycho::Attribute { + name: "normalized_weights".into(), + value: create_call.normalized_weights.serialize_bytes(), + change: tycho::ChangeType::Creation.into(), + }, + ], + change: tycho::ChangeType::Creation.into(), + }) + } + hex!("DB8d758BCb971e482B2C45f7F8a7740283A1bd3A") => { + let create_call = + abi::composable_stable_pool_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::composable_stable_pool_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: create_call.tokens, + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "ComposableStablePoolFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + ], + change: tycho::ChangeType::Creation.into(), + }) + } + hex!("813EE7a840CE909E7Fea2117A44a90b8063bd4fd") => { + let create_call = + abi::erc_linear_pool_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::erc_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: vec![create_call.main_token, create_call.wrapped_token], + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "ERC4626LinearPoolFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + tycho::Attribute { + name: "upper_target".into(), + value: create_call.upper_target.to_signed_bytes_be(), + change: tycho::ChangeType::Creation.into(), + }, + // Note, `lower_target` is generally hardcoded for all pools, not located in call data + // Note, rate provider might be provided as `create.protocol_id`, but as a BigInt. needs investigation + ], + change: tycho::ChangeType::Creation.into(), + }) + } + hex!("5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347") => { + let create_call = + abi::euler_linear_pool_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::euler_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: vec![create_call.main_token, create_call.wrapped_token], + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "EulerLinearPoolFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + tycho::Attribute { + name: "upper_target".into(), + value: create_call.upper_target.to_signed_bytes_be(), + change: tycho::ChangeType::Creation.into(), + }, + ], + change: tycho::ChangeType::Creation.into(), + }) + } + // ❌ Reading the deployed factory for Gearbox showcases that it's currently disabled + // hex!("39A79EB449Fc05C92c39aA6f0e9BfaC03BE8dE5B") => { + // let create_call = + // abi::gearbox_linear_pool_factory::functions::Create::match_and_decode(call)?; + // let pool_created = + // abi::gearbox_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; + + // Some(tycho::ProtocolComponent { + // id: hex::encode(&pool_created.pool), + // tokens: vec![create_call.main_token, create_call.wrapped_token], + // contracts: vec![pool_addr.into(), pool_created.pool], + // static_att: vec![ + // tycho::Attribute { + // name: "pool_type".into(), + // value: "GearboxLinearPoolFactory".into(), + // change: tycho::ChangeType::Creation.into(), + // }, + // tycho::Attribute { + // name: "upper_target".into(), + // value: create_call.upper_target.to_signed_bytes_be(), + // change: tycho::ChangeType::Creation.into(), + // }, + // ], + // change: tycho::ChangeType::Creation.into(), + // }) + // } + // ❌ The `ManagedPoolFactory` is a bit ✨ unique ✨, so we'll leave it commented out for now + // Take a look at it's `Create` call to see how the params are structured. + // hex!("BF904F9F340745B4f0c4702c7B6Ab1e808eA6b93") => { + // let create_call = abi::managed_pool_factory::functions::Create::match_and_decode(call)?; + // let pool_created = + // abi::managed_pool_factory::events::PoolCreated::match_and_decode(log)?; + + // Some(tycho::ProtocolComponent { + // id: hex::encode(&pool_created.pool), + // tokens: create_call.tokens, + // contracts: vec![pool_addr.into(), pool_created.pool], + // static_att: vec![ + // tycho::Attribute { + // name: "pool_type".into(), + // value: "ManagedPoolFactory".into(), + // change: tycho::ChangeType::Creation.into(), + // }, + // ], + // change: tycho::ChangeType::Creation.into(), + // }) + // } + hex!("4E11AEec21baF1660b1a46472963cB3DA7811C89") => { + let create_call = + abi::silo_linear_pool_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::silo_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: vec![create_call.main_token, create_call.wrapped_token], + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "SiloLinearPoolFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + tycho::Attribute { + name: "upper_target".into(), + value: create_call.upper_target.to_signed_bytes_be(), + change: tycho::ChangeType::Creation.into(), + }, + ], + change: tycho::ChangeType::Creation.into(), + }) + } + hex!("5F5222Ffa40F2AEd6380D022184D6ea67C776eE0") => { + let create_call = + abi::yearn_linear_pool_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::yearn_linear_pool_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: vec![create_call.main_token, create_call.wrapped_token], + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "YearnLinearPoolFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + tycho::Attribute { + name: "upper_target".into(), + value: create_call.upper_target.to_signed_bytes_be(), + change: tycho::ChangeType::Creation.into(), + }, + ], + change: tycho::ChangeType::Creation.into(), + }) + } + // The `WeightedPool2TokenFactory` is a deprecated contract but we've included it since one + // of the highest TVL pools, 80BAL-20WETH, is able to be tracked. + hex!("A5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0") => { + let create_call = + abi::weighted_pool_tokens_factory::functions::Create::match_and_decode(call)?; + let pool_created = + abi::weighted_pool_tokens_factory::events::PoolCreated::match_and_decode(log)?; + + Some(tycho::ProtocolComponent { + id: hex::encode(&pool_created.pool), + tokens: create_call.tokens, + contracts: vec![pool_addr.into(), pool_created.pool], + static_att: vec![ + tycho::Attribute { + name: "pool_type".into(), + value: "WeightedPool2TokensFactory".into(), + change: tycho::ChangeType::Creation.into(), + }, + tycho::Attribute { + name: "weights".into(), + value: create_call.weights.serialize_bytes(), + change: tycho::ChangeType::Creation.into(), + }, + ], + change: tycho::ChangeType::Creation.into(), + }) + } + _ => None, + } +} diff --git a/substreams/ethereum-balancer/substreams.yaml b/substreams/ethereum-balancer/substreams.yaml index 98ea7e0..7006218 100644 --- a/substreams/ethereum-balancer/substreams.yaml +++ b/substreams/ethereum-balancer/substreams.yaml @@ -17,6 +17,22 @@ binaries: file: target/wasm32-unknown-unknown/release/substreams_balancer.wasm modules: + - name: map_pools_created + kind: map + initialBlock: 12369300 + inputs: + - source: sf.ethereum.type.v2.Block + output: + type: proto:tycho.evm.v1.GroupedTransactionProtocolComponents + + - name: store_pools_created + kind: store + initialBlock: 12369300 + updatePolicy: add + valueType: int64 + inputs: + - map: map_pools_created + - name: map_balance_deltas kind: map initialBlock: 12369300 # An arbitrary block that should change based on your requirements @@ -38,7 +54,9 @@ modules: initialBlock: 12369300 inputs: - source: sf.ethereum.type.v2.Block + - map: map_pools_created - map: map_balance_deltas + - store: store_pools_created - store: store_balance_changes mode: deltas # This is the key property that simplifies `BalanceChange` handling output: