From 2a1b83f6c5c317ac5e2f270904e9f944c9668f09 Mon Sep 17 00:00:00 2001 From: Florian Pellissier <111426680+flopell@users.noreply.github.com> Date: Tue, 12 Mar 2024 22:53:30 +0100 Subject: [PATCH] fix: index balance changes from `Swap` events --- substreams/ethereum-balancer/src/modules.rs | 90 +++++++++++++++------ 1 file changed, 65 insertions(+), 25 deletions(-) diff --git a/substreams/ethereum-balancer/src/modules.rs b/substreams/ethereum-balancer/src/modules.rs index f84a496..71f0e83 100644 --- a/substreams/ethereum-balancer/src/modules.rs +++ b/substreams/ethereum-balancer/src/modules.rs @@ -17,6 +17,7 @@ use itertools::Itertools; use pb::tycho::evm::v1::{self as tycho}; use contract_changes::extract_contract_changes; +use substreams_ethereum::Event; use crate::pb::balancer::{ BalanceDelta, BalanceDeltas, GroupedTransactionProtocolComponents, @@ -95,43 +96,82 @@ pub fn store_pools_created(map: GroupedTransactionProtocolComponents, store: Sto ); } -/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a +/// Since the `PoolBalanceChanged` and `Swap` events administer only deltas, we need to leverage a map and a /// store to be able to tally up final balances for tokens in a pool. #[substreams::handlers::map] pub fn map_balance_deltas( block: eth::v2::Block, store: StoreGetInt64, ) -> Result { - Ok(BalanceDeltas { - balance_deltas: block - .events::(&[VAULT_ADDRESS]) - .flat_map(|(event, log)| { - event - .tokens - .iter() - .zip(event.deltas.iter()) - .filter_map(|(token, delta)| { - let component_id = event.pool_id[..20].to_vec(); + let balance_deltas = block + .logs() + .filter(|log| log.address() == VAULT_ADDRESS) + .flat_map(|vault_log| { + let mut deltas = Vec::new(); - store.get_last(format!("pool:{0}", hex::encode(&component_id)))?; - - Some(BalanceDelta { - ord: log.log.ordinal, + if let Some(ev) = + abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log) + { + let component_id = ev.pool_id[..20].to_vec(); + if store + .get_last(format!("pool:{}", hex::encode(&component_id))) + .is_some() + { + for (token, delta) in ev.tokens.iter().zip(ev.deltas.iter()) { + deltas.push(BalanceDelta { + ord: vault_log.ordinal(), tx: Some(tycho::Transaction { - hash: log.receipt.transaction.hash.clone(), - from: log.receipt.transaction.from.clone(), - to: log.receipt.transaction.to.clone(), - index: log.receipt.transaction.index.into(), + hash: vault_log.receipt.transaction.hash.clone(), + from: vault_log.receipt.transaction.from.clone(), + to: vault_log.receipt.transaction.to.clone(), + index: vault_log.receipt.transaction.index.into(), }), token: token.to_vec(), delta: delta.to_signed_bytes_be(), + component_id: component_id.clone(), + }); + } + } + } else if let Some(ev) = abi::vault::events::Swap::match_and_decode(vault_log.log) { + let component_id = ev.pool_id[..20].to_vec(); + if store + .get_last(format!("pool:{}", hex::encode(&component_id))) + .is_some() + { + deltas.extend_from_slice(&[ + BalanceDelta { + ord: vault_log.ordinal(), + tx: Some(tycho::Transaction { + hash: vault_log.receipt.transaction.hash.clone(), + from: vault_log.receipt.transaction.from.clone(), + to: vault_log.receipt.transaction.to.clone(), + index: vault_log.receipt.transaction.index.into(), + }), + token: ev.token_in.to_vec(), + delta: ev.amount_in.to_signed_bytes_be(), + component_id: component_id.clone(), + }, + BalanceDelta { + ord: vault_log.ordinal(), + tx: Some(tycho::Transaction { + hash: vault_log.receipt.transaction.hash.clone(), + from: vault_log.receipt.transaction.from.clone(), + to: vault_log.receipt.transaction.to.clone(), + index: vault_log.receipt.transaction.index.into(), + }), + token: ev.token_out.to_vec(), + delta: ev.amount_out.neg().to_signed_bytes_be(), component_id, - }) - }) - .collect::>() - }) - .collect::>(), - }) + }, + ]); + } + } + + deltas + }) + .collect::>(); + + Ok(BalanceDeltas { balance_deltas }) } /// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the