Run rustfmt

This commit is contained in:
kayibal
2024-03-14 00:27:52 +00:00
parent 425628ae97
commit 6b6e42e2f8
6 changed files with 92 additions and 83 deletions

View File

@@ -3,21 +3,21 @@
//! //!
//! To aggregate relative balances changes to absolute balances the general approach is: //! To aggregate relative balances changes to absolute balances the general approach is:
//! //!
//! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas //! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas within
//! within this message are required to have increasing ordinals so that //! this message are required to have increasing ordinals so that the order of relative balance
//! the order of relative balance changes is unambiguous. //! changes is unambiguous.
//! 2. Store the balances changes with a store handler. You can use the //! 2. Store the balances changes with a store handler. You can use the `store_balance_changes`
//! `store_balance_changes` library method directly for this. //! library method directly for this.
//! 3. In the output module, use aggregate_balance_changes to receive an //! 3. In the output module, use aggregate_balance_changes to receive an aggregated map of absolute
//! aggregated map of absolute balances. //! balances.
//!
use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas, Transaction}; use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas, Transaction};
use itertools::Itertools; use itertools::Itertools;
use std::collections::HashMap; use std::{collections::HashMap, str::FromStr};
use std::str::FromStr; use substreams::{
use substreams::key; key,
use substreams::pb::substreams::StoreDeltas; pb::substreams::StoreDeltas,
use substreams::prelude::{BigInt, StoreAdd}; prelude::{BigInt, StoreAdd},
};
/// Store relative balances changes in a additive manner. /// Store relative balances changes in a additive manner.
/// ///
@@ -25,9 +25,9 @@ use substreams::prelude::{BigInt, StoreAdd};
/// ///
/// ## Arguments /// ## Arguments
/// ///
/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes. /// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes. Note:
/// Note: relative balance deltas must have strictly increasing ordinals per token /// relative balance deltas must have strictly increasing ordinals per token address, will panic
/// address, will panic otherwise. /// otherwise.
/// * `store` - An AddStore that will add relative balance changes. /// * `store` - An AddStore that will add relative balance changes.
/// ///
/// This method is meant to be used in combination with `aggregate_balances_changes` /// This method is meant to be used in combination with `aggregate_balances_changes`
@@ -68,8 +68,7 @@ type TxAggregatedBalances = HashMap<Vec<u8>, (Transaction, HashMap<Vec<u8>, Bala
/// Aggregates absolute balances per transaction and token. /// Aggregates absolute balances per transaction and token.
/// ///
/// ## Arguments /// ## Arguments
/// * `balance_store` - A `StoreDeltas` with all changes that occured in the source /// * `balance_store` - A `StoreDeltas` with all changes that occured in the source store module.
/// store module.
/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes. /// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes.
/// ///
/// Reads absolute balance values from the additive store (see `store_balance_changes` /// Reads absolute balance values from the additive store (see `store_balance_changes`
@@ -127,10 +126,14 @@ pub fn aggregate_balances_changes(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::mock_store::MockStore; use crate::{
use crate::pb::tycho::evm::v1::{BalanceDelta, Transaction}; mock_store::MockStore,
use substreams::pb::substreams::StoreDelta; pb::tycho::evm::v1::{BalanceDelta, Transaction},
use substreams::prelude::{StoreGet, StoreNew}; };
use substreams::{
pb::substreams::StoreDelta,
prelude::{StoreGet, StoreNew},
};
fn block_balance_deltas() -> BlockBalanceDeltas { fn block_balance_deltas() -> BlockBalanceDeltas {
let comp_id = "0x42c0ffee" let comp_id = "0x42c0ffee"

View File

@@ -10,9 +10,10 @@
/// more [here](https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425) /// more [here](https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425)
use std::collections::HashMap; use std::collections::HashMap;
use substreams_ethereum::pb::eth; use substreams_ethereum::pb::{
use substreams_ethereum::pb::eth::v2::block::DetailLevel; eth,
use substreams_ethereum::pb::eth::v2::StorageChange; eth::v2::{block::DetailLevel, StorageChange},
};
use crate::pb::tycho::evm::v1::{self as tycho}; use crate::pb::tycho::evm::v1::{self as tycho};
@@ -78,7 +79,8 @@ impl From<InterimContractChange> for tycho::ContractChange {
/// ## Arguments /// ## Arguments
/// ///
/// * `block` - The block to extract changes from. Must be the extended block model. /// * `block` - The block to extract changes from. Must be the extended block model.
/// * `inclusion_predicate` - A predicate function that determines if a contract address is relevant. /// * `inclusion_predicate` - A predicate function that determines if a contract address is
/// relevant.
/// * `transaction_contract_changes` - A mutable map to store the contract changes in. /// * `transaction_contract_changes` - A mutable map to store the contract changes in.
/// ///
/// ## Panics /// ## Panics
@@ -189,9 +191,9 @@ pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
.extend_from_slice(&code_change.new_code); .extend_from_slice(&code_change.new_code);
}); });
if !storage_changes.is_empty() if !storage_changes.is_empty() ||
|| !balance_changes.is_empty() !balance_changes.is_empty() ||
|| !code_changes.is_empty() !code_changes.is_empty()
{ {
transaction_contract_changes transaction_contract_changes
.entry(block_tx.index.into()) .entry(block_tx.index.into())

View File

@@ -1,11 +1,11 @@
//! Contains a mock store for internal testing. //! Contains a mock store for internal testing.
//! //!
//! Might make this public alter to users can test their store handlers. //! Might make this public alter to users can test their store handlers.
use std::cell::RefCell; use std::{cell::RefCell, collections::HashMap, rc::Rc};
use std::collections::HashMap; use substreams::{
use std::rc::Rc; prelude::{BigInt, StoreDelete, StoreGet, StoreNew},
use substreams::prelude::{BigInt, StoreDelete, StoreGet, StoreNew}; store::StoreAdd,
use substreams::store::StoreAdd; };
type BigIntStore = HashMap<String, Vec<(u64, BigInt)>>; type BigIntStore = HashMap<String, Vec<(u64, BigInt)>>;

View File

@@ -2,16 +2,15 @@ use crate::{abi, pool_factories};
use anyhow::Result; use anyhow::Result;
use itertools::Itertools; use itertools::Itertools;
use std::collections::HashMap; use std::collections::HashMap;
use substreams::hex; use substreams::{
use substreams::pb::substreams::StoreDeltas; hex,
use substreams::store::{ pb::substreams::StoreDeltas,
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew, store::{StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew},
};
use substreams_ethereum::{pb::eth, Event};
use tycho_substreams::{
balances::aggregate_balances_changes, contract::extract_contract_changes, prelude::*,
}; };
use substreams_ethereum::pb::eth;
use substreams_ethereum::Event;
use tycho_substreams::balances::aggregate_balances_changes;
use tycho_substreams::contract::extract_contract_changes;
use tycho_substreams::prelude::*;
const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8"); const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8");
@@ -37,10 +36,7 @@ pub fn map_pools_created(block: eth::v2::Block) -> Result<BlockTransactionProtoc
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if !components.is_empty() { if !components.is_empty() {
Some(TransactionProtocolComponents { Some(TransactionProtocolComponents { tx: Some(tx.into()), components })
tx: Some(tx.into()),
components,
})
} else { } else {
None None
} }
@@ -63,8 +59,8 @@ pub fn store_pools_created(map: BlockTransactionProtocolComponents, store: Store
); );
} }
/// Since the `PoolBalanceChanged` and `Swap` events administer only deltas, we need to leverage a map and a /// Since the `PoolBalanceChanged` and `Swap` events administer only deltas, we need to leverage a
/// store to be able to tally up final balances for tokens in a pool. /// map and a store to be able to tally up final balances for tokens in a pool.
#[substreams::handlers::map] #[substreams::handlers::map]
pub fn map_balance_deltas( pub fn map_balance_deltas(
block: eth::v2::Block, block: eth::v2::Block,
@@ -79,12 +75,10 @@ pub fn map_balance_deltas(
if let Some(ev) = if let Some(ev) =
abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log) abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log)
{ {
let component_id = format!( let component_id =
"0x{}", format!("0x{}", String::from_utf8(ev.pool_id[..20].to_vec()).unwrap())
String::from_utf8(ev.pool_id[..20].to_vec()).unwrap() .as_bytes()
) .to_vec();
.as_bytes()
.to_vec();
if store if store
.get_last(format!("pool:{}", hex::encode(&component_id))) .get_last(format!("pool:{}", hex::encode(&component_id)))
@@ -101,12 +95,10 @@ pub fn map_balance_deltas(
} }
} }
} else if let Some(ev) = abi::vault::events::Swap::match_and_decode(vault_log.log) { } else if let Some(ev) = abi::vault::events::Swap::match_and_decode(vault_log.log) {
let component_id = format!( let component_id =
"0x{}", format!("0x{}", String::from_utf8(ev.pool_id[..20].to_vec()).unwrap())
String::from_utf8(ev.pool_id[..20].to_vec()).unwrap() .as_bytes()
) .to_vec();
.as_bytes()
.to_vec();
if store if store
.get_last(format!("pool:{}", hex::encode(&component_id))) .get_last(format!("pool:{}", hex::encode(&component_id)))
@@ -149,8 +141,8 @@ pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: StoreAddBigInt)
/// Every contract change is grouped by transaction index via the `transaction_contract_changes` /// Every contract change is grouped by transaction index via the `transaction_contract_changes`
/// map. Each block of code will extend the `TransactionContractChanges` struct with the /// map. Each block of code will extend the `TransactionContractChanges` struct with the
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist. /// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
/// At the very end, the map can easily be sorted by index to ensure the final `BlockContractChanges` /// At the very end, the map can easily be sorted by index to ensure the final
/// is ordered by transactions properly. /// `BlockContractChanges` is ordered by transactions properly.
#[substreams::handlers::map] #[substreams::handlers::map]
pub fn map_changes( pub fn map_changes(
block: eth::v2::Block, block: eth::v2::Block,
@@ -178,9 +170,9 @@ pub fn map_changes(
}); });
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating // Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store` this // `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
// block. Then, these balance changes are merged onto the existing map of tx contract changes, // this block. Then, these balance changes are merged onto the existing map of tx contract
// inserting a new one if it doesn't exist. // changes, inserting a new one if it doesn't exist.
aggregate_balances_changes(balance_store, deltas) aggregate_balances_changes(balance_store, deltas)
.into_iter() .into_iter()
.for_each(|(_, (tx, balances))| { .for_each(|(_, (tx, balances))| {
@@ -210,9 +202,9 @@ pub fn map_changes(
.drain() .drain()
.sorted_unstable_by_key(|(index, _)| *index) .sorted_unstable_by_key(|(index, _)| *index)
.filter_map(|(_, change)| { .filter_map(|(_, change)| {
if change.contract_changes.is_empty() if change.contract_changes.is_empty() &&
&& change.component_changes.is_empty() change.component_changes.is_empty() &&
&& change.balance_changes.is_empty() change.balance_changes.is_empty()
{ {
None None
} else { } else {

View File

@@ -1,8 +1,9 @@
use crate::abi; use crate::abi;
use substreams::hex; use substreams::{hex, scalar::BigInt};
use substreams::scalar::BigInt; use substreams_ethereum::{
use substreams_ethereum::pb::eth::v2::{Call, Log}; pb::eth::v2::{Call, Log},
use substreams_ethereum::{Event, Function}; Event, Function,
};
use tycho_substreams::prelude::*; use tycho_substreams::prelude::*;
/// This trait defines some helpers for serializing and deserializing `Vec<BigInt` which is needed /// This trait defines some helpers for serializing and deserializing `Vec<BigInt` which is needed
@@ -59,7 +60,9 @@ pub fn address_map(
("pool_type", "WeightedPoolFactory".as_bytes()), ("pool_type", "WeightedPoolFactory".as_bytes()),
( (
"normalized_weights", "normalized_weights",
&create_call.normalized_weights.serialize_bytes(), &create_call
.normalized_weights
.serialize_bytes(),
), ),
]) ])
.as_swap_type("balancer_pool", ImplementationType::Vm), .as_swap_type("balancer_pool", ImplementationType::Vm),
@@ -91,7 +94,9 @@ pub fn address_map(
("pool_type", "ERC4626LinearPoolFactory".as_bytes()), ("pool_type", "ERC4626LinearPoolFactory".as_bytes()),
( (
"upper_target", "upper_target",
&create_call.upper_target.to_signed_bytes_be(), &create_call
.upper_target
.to_signed_bytes_be(),
), ),
]) ])
.as_swap_type("balancer_pool", ImplementationType::Vm), .as_swap_type("balancer_pool", ImplementationType::Vm),
@@ -110,7 +115,9 @@ pub fn address_map(
("pool_type", "EulerLinearPoolFactory".as_bytes()), ("pool_type", "EulerLinearPoolFactory".as_bytes()),
( (
"upper_target", "upper_target",
&create_call.upper_target.to_signed_bytes_be(), &create_call
.upper_target
.to_signed_bytes_be(),
), ),
]) ])
.as_swap_type("balancer_pool", ImplementationType::Vm), .as_swap_type("balancer_pool", ImplementationType::Vm),
@@ -142,10 +149,11 @@ pub fn address_map(
// change: tycho::ChangeType::Creation.into(), // change: tycho::ChangeType::Creation.into(),
// }) // })
// } // }
// ❌ The `ManagedPoolFactory` is a bit ✨ unique ✨, so we'll leave it commented out for now // ❌ The `ManagedPoolFactory` is a bit ✨ unique ✨, so we'll leave it commented out for
// Take a look at it's `Create` call to see how the params are structured. // now Take a look at it's `Create` call to see how the params are structured.
// hex!("BF904F9F340745B4f0c4702c7B6Ab1e808eA6b93") => { // hex!("BF904F9F340745B4f0c4702c7B6Ab1e808eA6b93") => {
// let create_call = abi::managed_pool_factory::functions::Create::match_and_decode(call)?; // let create_call =
// abi::managed_pool_factory::functions::Create::match_and_decode(call)?;
// let pool_created = // let pool_created =
// abi::managed_pool_factory::events::PoolCreated::match_and_decode(log)?; // abi::managed_pool_factory::events::PoolCreated::match_and_decode(log)?;
@@ -176,7 +184,9 @@ pub fn address_map(
("pool_type", "SiloLinearPoolFactory".as_bytes()), ("pool_type", "SiloLinearPoolFactory".as_bytes()),
( (
"upper_target", "upper_target",
&create_call.upper_target.to_signed_bytes_be(), &create_call
.upper_target
.to_signed_bytes_be(),
), ),
]) ])
.as_swap_type("balancer_pool", ImplementationType::Vm), .as_swap_type("balancer_pool", ImplementationType::Vm),
@@ -195,7 +205,9 @@ pub fn address_map(
("pool_type", "YearnLinearPoolFactory".as_bytes()), ("pool_type", "YearnLinearPoolFactory".as_bytes()),
( (
"upper_target", "upper_target",
&create_call.upper_target.to_signed_bytes_be(), &create_call
.upper_target
.to_signed_bytes_be(),
), ),
]) ])
.as_swap_type("balancer_pool", ImplementationType::Vm), .as_swap_type("balancer_pool", ImplementationType::Vm),

View File

@@ -9,6 +9,6 @@ trailing_semicolon = false
use_field_init_shorthand = true use_field_init_shorthand = true
chain_width = 40 chain_width = 40
ignore = [ ignore = [
"*/pb", "crates/tycho-substreams/src/pb",
"*/abi", "ethereum-balancer/src/abi",
] ]