diff --git a/substreams/crates/tycho-substreams/src/balances.rs b/substreams/crates/tycho-substreams/src/balances.rs index afda1ff..bed19af 100644 --- a/substreams/crates/tycho-substreams/src/balances.rs +++ b/substreams/crates/tycho-substreams/src/balances.rs @@ -88,7 +88,8 @@ pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd, (Transaction, HashMap, BalanceChange>)>; +type TxAggregatedBalances = + HashMap, (Transaction, HashMap, HashMap, BalanceChange>>)>; /// Aggregates absolute balances per transaction and token. /// @@ -98,13 +99,14 @@ type TxAggregatedBalances = HashMap, (Transaction, HashMap, Bala /// /// This function reads absolute balance values from an additive store (see `store_balance_changes` /// for how to create such a store). It zips these values with the relative balance deltas to -/// associate balance values with tokens and components, ensuring the last balance change per token -/// per transaction is kept if there are multiple changes. Negative balances are set to 0, adhering -/// to the expectation that absolute balances must be non-negative. +/// associate balance values with tokens and components, ensuring the last balance change for each +/// unique combination of component, token, and transaction is kept if there are multiple changes. +/// Negative balances are set to 0, adhering to the expectation that absolute balances must be +/// non-negative. /// -/// Will keep the last balance change per token per transaction if there are multiple -/// changes. In case a balance ends up being negative, it will be clipped to 0 since -/// absolute balances are expected to be either zero or positive. +/// Will keep the last balance change for each unique combination of component, token, and +/// transaction if there are multiple changes. In case a balance ends up being negative, it will be +/// clipped to 0 since absolute balances are expected to be either zero or positive. /// /// ## Panics /// May panic if the store deltas values are not in the correct format. Values are @@ -153,11 +155,16 @@ pub fn aggregate_balances_changes( .into_iter() .map(|(txh, group)| { let (mut transactions, balance_changes): (Vec<_>, Vec<_>) = group.into_iter().unzip(); + let mut balances: HashMap, HashMap, BalanceChange>> = HashMap::new(); - let balances = balance_changes - .into_iter() - .map(|balance_change| (balance_change.token.clone(), balance_change)) - .collect(); + for balance_change in balance_changes.into_iter() { + let component_entry = balances + .entry(balance_change.component_id.clone()) + .or_default(); + + // Insert or overwrite the balance change for the specific token + component_entry.insert(balance_change.token.clone(), balance_change); + } (txh, (transactions.pop().unwrap(), balances)) }) .collect() @@ -420,26 +427,31 @@ mod tests { vec![0, 1], ( Transaction { hash: vec![0, 1], from: vec![9, 9], to: vec![8, 8], index: 0 }, - [ - ( - token_0.clone(), - BalanceChange { - token: token_0, - balance: BigInt::from(999) - .to_signed_bytes_be() - .to_vec(), - component_id: comp_id.clone(), - }, - ), - ( - token_1.clone(), - BalanceChange { - token: token_1, - balance: vec![150], - component_id: comp_id.clone(), - }, - ), - ] + [( + comp_id.clone(), + [ + ( + token_0.clone(), + BalanceChange { + token: token_0, + balance: BigInt::from(999) + .to_signed_bytes_be() + .to_vec(), + component_id: comp_id.clone(), + }, + ), + ( + token_1.clone(), + BalanceChange { + token: token_1, + balance: vec![150], + component_id: comp_id.clone(), + }, + ), + ] + .into_iter() + .collect::>(), + )] .into_iter() .collect::>(), ), diff --git a/substreams/ethereum-balancer/src/modules.rs b/substreams/ethereum-balancer/src/modules.rs index 89ef7d7..d0feaa3 100644 --- a/substreams/ethereum-balancer/src/modules.rs +++ b/substreams/ethereum-balancer/src/modules.rs @@ -208,7 +208,11 @@ pub fn map_protocol_changes( .or_insert_with(|| TransactionChangesBuilder::new(&tx)); balances .values() - .for_each(|bc| builder.add_balance_change(bc)); + .for_each(|token_bc_map| { + token_bc_map + .values() + .for_each(|bc| builder.add_balance_change(bc)) + }); }); // Extract and insert any storage changes that happened for any of the components. diff --git a/substreams/ethereum-sfraxeth/src/modules.rs b/substreams/ethereum-sfraxeth/src/modules.rs index 8638886..dee48c0 100644 --- a/substreams/ethereum-sfraxeth/src/modules.rs +++ b/substreams/ethereum-sfraxeth/src/modules.rs @@ -305,9 +305,13 @@ pub fn map_protocol_changes( let builder = transaction_changes .entry(tx.index) .or_insert_with(|| TransactionChangesBuilder::new(&tx)); - balances.values().for_each(|bc| { - builder.add_balance_change(bc); - }); + balances + .values() + .for_each(|token_bc_map| { + token_bc_map + .values() + .for_each(|bc| builder.add_balance_change(bc)) + }); }); // Extract and insert any storage changes that happened for any of the components.