use std::collections::HashMap; use anyhow::Result; use itertools::Itertools; use substreams::{ pb::substreams::StoreDeltas, scalar::BigInt, store::{StoreAddBigInt, StoreGet, StoreGetString, StoreNew, StoreSet, StoreSetString}, }; use substreams_ethereum::pb::eth; use crate::{ pool_changes::{emit_deltas, emit_eth_deltas}, pool_factories, pools::emit_specific_pools, }; use tycho_substreams::{ balances::store_balance_changes, contract::extract_contract_changes, prelude::*, }; /// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in /// a later groupby operation. #[derive(Debug)] struct TransactionWrapper(Transaction); impl PartialEq for TransactionWrapper { fn eq(&self, other: &Self) -> bool { self.0.hash == other.0.hash } } #[substreams::handlers::map] pub fn map_components( params: String, block: eth::v2::Block, ) -> Result { // Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call // We store these as a hashmap by tx hash since we need to agg by tx hash later Ok(BlockTransactionProtocolComponents { tx_components: block .transactions() .filter_map(|tx| { let mut components = tx .logs_with_calls() .filter(|(_, call)| !call.call.state_reverted) .filter_map(|(log, call)| { Some(pool_factories::address_map( call.call .address .as_slice() .try_into() .ok()?, // this shouldn't fail log, call.call, tx, )?) }) .collect::>(); if let Some(component) = emit_specific_pools(¶ms, &tx).expect( "An unexpected error occured when parsing params for emitting specific pools", ) { components.push(component) } if !components.is_empty() { Some(TransactionProtocolComponents { tx: Some(Transaction { hash: tx.hash.clone(), from: tx.from.clone(), to: tx.to.clone(), index: Into::::into(tx.index), }), components, }) } else { None } }) .collect::>(), }) } /// Simply stores the `ProtocolComponent`s with the pool id as the key and tokens as the value #[substreams::handlers::store] pub fn store_component_tokens(map: BlockTransactionProtocolComponents, store: StoreSetString) { map.tx_components .iter() .flat_map(|tx_components| &tx_components.components) .for_each(|component| { store.set( 0, format!("pool:{0}", component.id), &component .tokens .iter() .map(|token| hex::encode(token)) .join(":".into()), ); }); } /// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a /// store to be able to tally up final balances for tokens in a pool. #[substreams::handlers::map] pub fn map_relative_balances( block: eth::v2::Block, tokens_store: StoreGetString, ) -> Result { Ok(BlockBalanceDeltas { balance_deltas: { block .transactions() .into_iter() .flat_map(|tx| { emit_eth_deltas(&tx, &tokens_store) .into_iter() .chain(emit_deltas(&tx, &tokens_store)) }) .collect::>() }, }) } /// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the /// store key to ensure that there's a unique balance being tallied for each. #[substreams::handlers::store] pub fn store_balances(deltas: BlockBalanceDeltas, store: StoreAddBigInt) { store_balance_changes(deltas, store) } /// This is the main map that handles most of the indexing of this substream. /// Every contract change is grouped by transaction index via the `transaction_contract_changes` /// map. Each block of code will extend the `TransactionContractChanges` struct with the /// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist. /// At the very end, the map can easily be sorted by index to ensure the final /// `BlockContractChanges` is ordered by transactions properly. #[substreams::handlers::map] pub fn map_protocol_changes( block: eth::v2::Block, grouped_components: BlockTransactionProtocolComponents, deltas: BlockBalanceDeltas, components_store: StoreGetString, balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store. ) -> Result { // We merge contract changes by transaction (identified by transaction index) making it easy to // sort them at the very end. let mut transaction_contract_changes: HashMap<_, TransactionContractChanges> = HashMap::new(); // `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to // convert into `TransactionContractChanges` grouped_components .tx_components .into_iter() .for_each(|tx_component| { let tx = tx_component.tx.as_ref().unwrap(); transaction_contract_changes .entry(tx.index) .or_insert_with(|| TransactionContractChanges { tx: Some(tx.clone()), contract_changes: vec![], component_changes: vec![], balance_changes: vec![], }) .component_changes .extend_from_slice( &(tx_component .components .into_iter() .map(|mut component| { component.id = format!("0x{}", component.id); component }) .collect::>()), ); }); // Balance changes are gathered by the `StoreDelta` based on `TokenExchange`, etc. creating // `BalanceDeltas`. We essentially just process the changes that occured to the `store` this // block. Then, these balance changes are merged onto the existing map of tx contract changes, // inserting a new one if it doesn't exist. balance_store .deltas .into_iter() .zip(deltas.balance_deltas) .map(|(store_delta, balance_delta)| { let new_value_string = String::from_utf8(store_delta.new_value) .unwrap() .to_string(); ( balance_delta.tx.unwrap(), BalanceChange { token: balance_delta.token, balance: BigInt::try_from(new_value_string) .unwrap() .to_signed_bytes_be(), component_id: format!( "0x{}", String::from_utf8(balance_delta.component_id).unwrap() ) .into(), }, ) }) // We need to group the balance changes by tx hash for the `TransactionContractChanges` agg .chunk_by(|(tx, _)| TransactionWrapper(tx.clone())) .into_iter() .for_each(|(tx_wrapped, group)| { let tx = tx_wrapped.0; transaction_contract_changes .entry(tx.index) .or_insert_with(|| TransactionContractChanges { tx: Some(tx.clone()), contract_changes: vec![], component_changes: vec![], balance_changes: vec![], }) .balance_changes .extend(group.map(|(_, change)| change)); }); // General helper for extracting contract changes. Uses block, our component store which holds // all of our tracked deployed pool addresses, and the map of tx contract changes which we // output into for final processing later. extract_contract_changes( &block, |addr| { components_store .get_last(format!("pool:{0}", hex::encode(addr))) .is_some() }, &mut transaction_contract_changes, ); // Process all `transaction_contract_changes` for final output in the `BlockContractChanges`, // sorted by transaction index (the key). Ok(BlockContractChanges { block: Some(Block { number: block.number, hash: block.hash.clone(), parent_hash: block .header .as_ref() .expect("Block header not present") .parent_hash .clone(), ts: block.timestamp_seconds(), }), changes: transaction_contract_changes .drain() .sorted_unstable_by_key(|(index, _)| index.clone()) .filter_map(|(_, change)| { if change.contract_changes.is_empty() && change.component_changes.is_empty() && change.balance_changes.is_empty() { None } else { Some(change) } }) .collect::>(), }) }