diff --git a/substreams/crates/tycho-substreams/src/contract.rs b/substreams/crates/tycho-substreams/src/contract.rs index 250da36..7a00283 100644 --- a/substreams/crates/tycho-substreams/src/contract.rs +++ b/substreams/crates/tycho-substreams/src/contract.rs @@ -83,9 +83,8 @@ impl From for tycho::ContractChange { /// model. /// * `inclusion_predicate` - A closure that determines if a contract's address is of interest for /// the collection of changes. Only contracts satisfying this predicate are included. -/// * `transaction_contract_changes` - A mutable reference to a map where extracted contract changes -/// are stored. Keyed by transaction index, it aggregates changes into -/// `tycho::TransactionContractChanges`. +/// * `transaction_changes` - A mutable reference to a map where extracted contract changes are +/// stored. Keyed by transaction index, it aggregates changes into `tycho::TransactionChanges`. /// /// ## Panics /// Panics if the provided block is not an extended block model, as indicated by its detail level. @@ -94,7 +93,7 @@ impl From for tycho::ContractChange { /// The function iterates over transactions and their calls within the block, collecting contract /// changes (storage, balance, code) that pass the inclusion predicate. Changes are then sorted by /// their ordinals to maintain the correct sequence of events. Aggregated changes for each contract -/// are stored in `transaction_contract_changes`, categorized by transaction index. +/// are stored in `transaction_changes`, categorized by transaction index. /// /// Contracts created within the block are tracked to differentiate between new and existing /// contracts. The aggregation process respects transaction boundaries, ensuring that changes are @@ -102,7 +101,7 @@ impl From for tycho::ContractChange { pub fn extract_contract_changes bool>( block: ð::v2::Block, inclusion_predicate: F, - transaction_contract_changes: &mut HashMap, + transaction_changes: &mut HashMap, ) { if block.detail_level != Into::::into(DetailLevel::DetaillevelExtended) { panic!("Only extended blocks are supported"); @@ -209,9 +208,9 @@ pub fn extract_contract_changes bool>( !balance_changes.is_empty() || !code_changes.is_empty() { - transaction_contract_changes + transaction_changes .entry(block_tx.index.into()) - .or_insert_with(|| tycho::TransactionContractChanges::new(&(block_tx.into()))) + .or_insert_with(|| tycho::TransactionChanges::new(&(block_tx.into()))) .contract_changes .extend( changed_contracts diff --git a/substreams/crates/tycho-substreams/src/models.rs b/substreams/crates/tycho-substreams/src/models.rs index fa0fa07..a176d2d 100644 --- a/substreams/crates/tycho-substreams/src/models.rs +++ b/substreams/crates/tycho-substreams/src/models.rs @@ -6,12 +6,14 @@ pub use crate::pb::tycho::evm::v1::*; impl TransactionContractChanges { /// Creates a new empty `TransactionContractChanges` instance. pub fn new(tx: &Transaction) -> Self { - Self { - tx: Some(tx.clone()), - contract_changes: vec![], - component_changes: vec![], - balance_changes: vec![], - } + Self { tx: Some(tx.clone()), ..Default::default() } + } +} + +impl TransactionChanges { + /// Creates a new empty `TransactionChanges` instance. + pub fn new(tx: &Transaction) -> Self { + Self { tx: Some(tx.clone()), ..Default::default() } } } diff --git a/substreams/ethereum-balancer/src/modules.rs b/substreams/ethereum-balancer/src/modules.rs index 565c672..62fcdf6 100644 --- a/substreams/ethereum-balancer/src/modules.rs +++ b/substreams/ethereum-balancer/src/modules.rs @@ -24,7 +24,6 @@ pub fn map_components(block: eth::v2::Block) -> Result Result { +) -> Result { // We merge contract changes by transaction (identified by transaction index) making it easy to // sort them at the very end. - let mut transaction_contract_changes: HashMap<_, TransactionContractChanges> = HashMap::new(); + let mut transaction_changes: HashMap<_, TransactionChanges> = HashMap::new(); // `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to - // convert into `TransactionContractChanges` + // convert into `TransactionChanges` grouped_components .tx_components .iter() .for_each(|tx_component| { let tx = tx_component.tx.as_ref().unwrap(); - transaction_contract_changes + transaction_changes .entry(tx.index) - .or_insert_with(|| TransactionContractChanges::new(tx)) + .or_insert_with(|| TransactionChanges::new(tx)) .component_changes .extend_from_slice(&tx_component.components); }); + block + .transactions() + .flat_map(|tx| { + let components = tx + .logs_with_calls() + .filter(|(log, _)| log.address == VAULT_ADDRESS) + .filter_map(|(log, _)| { + let registered = abi::vault::events::PoolRegistered::match_and_decode(log)?; + substreams::log::info!("{:?}", log); + Some(( + tx.clone(), + EntityChanges { + component_id: hex::encode(registered.pool_address), + attributes: vec![ + Attribute { + name: "pool_id".to_string(), + value: format!("0x{}", hex::encode(registered.pool_id)) + .as_bytes() + .to_vec(), + change: ChangeType::Update.into(), + }, + Attribute { + name: "balance_owner".to_string(), + value: "0xBA12222222228d8Ba445958a75a0704d566BF2C8" + .to_string() + .as_bytes() + .to_vec(), + change: ChangeType::Creation.into(), + }, + ], + }, + )) + }); + components + }) + .for_each(|(tx, state_change)| { + transaction_changes + .entry(tx.index.into()) + .or_insert_with(|| TransactionChanges::new(&(&tx).into())) + .entity_changes + .push(state_change); + }); + // Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating // `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store` // this block. Then, these balance changes are merged onto the existing map of tx contract @@ -170,9 +212,9 @@ pub fn map_protocol_changes( aggregate_balances_changes(balance_store, deltas) .into_iter() .for_each(|(_, (tx, balances))| { - transaction_contract_changes + transaction_changes .entry(tx.index) - .or_insert_with(|| TransactionContractChanges::new(&tx)) + .or_insert_with(|| TransactionChanges::new(&tx)) .balance_changes .extend(balances.into_values()); }); @@ -185,20 +227,21 @@ pub fn map_protocol_changes( .get_last(format!("pool:0x{0}", hex::encode(addr))) .is_some() }, - &mut transaction_contract_changes, + &mut transaction_changes, ); - // Process all `transaction_contract_changes` for final output in the `BlockContractChanges`, + // Process all `transaction_changes` for final output in the `BlockChanges`, // sorted by transaction index (the key). - Ok(BlockContractChanges { + Ok(BlockChanges { block: Some((&block).into()), - changes: transaction_contract_changes + changes: transaction_changes .drain() .sorted_unstable_by_key(|(index, _)| *index) .filter_map(|(_, change)| { if change.contract_changes.is_empty() && change.component_changes.is_empty() && - change.balance_changes.is_empty() + change.balance_changes.is_empty() && + change.entity_changes.is_empty() { None } else { diff --git a/substreams/ethereum-balancer/substreams.yaml b/substreams/ethereum-balancer/substreams.yaml index 99810e2..5d61749 100644 --- a/substreams/ethereum-balancer/substreams.yaml +++ b/substreams/ethereum-balancer/substreams.yaml @@ -23,7 +23,7 @@ modules: inputs: - source: sf.ethereum.type.v2.Block output: - type: proto:tycho.evm.v1.GroupedTransactionProtocolComponents + type: proto:tycho.evm.v1.BlockTransactionProtocolComponents - name: store_components kind: store @@ -40,7 +40,7 @@ modules: - source: sf.ethereum.type.v2.Block - store: store_components output: - type: proto:tycho.evm.v1.BalanceDeltas + type: proto:tycho.evm.v1.BlockBalanceDeltas - name: store_balances kind: store @@ -61,4 +61,4 @@ modules: - store: store_balances mode: deltas # This is the key property that simplifies `BalanceChange` handling output: - type: proto:tycho.evm.v1.BlockContractChanges + type: proto:tycho.evm.v1.BlockChanges