fix: index balance changes from Swap events

This commit is contained in:
Florian Pellissier
2024-03-12 22:53:30 +01:00
parent 38dcd9d843
commit 2a1b83f6c5

View File

@@ -17,6 +17,7 @@ use itertools::Itertools;
use pb::tycho::evm::v1::{self as tycho};
use contract_changes::extract_contract_changes;
use substreams_ethereum::Event;
use crate::pb::balancer::{
BalanceDelta, BalanceDeltas, GroupedTransactionProtocolComponents,
@@ -95,43 +96,82 @@ pub fn store_pools_created(map: GroupedTransactionProtocolComponents, store: Sto
);
}
/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a
/// Since the `PoolBalanceChanged` and `Swap` events administer only deltas, we need to leverage a map and a
/// store to be able to tally up final balances for tokens in a pool.
#[substreams::handlers::map]
pub fn map_balance_deltas(
block: eth::v2::Block,
store: StoreGetInt64,
) -> Result<BalanceDeltas, anyhow::Error> {
Ok(BalanceDeltas {
balance_deltas: block
.events::<abi::vault::events::PoolBalanceChanged>(&[VAULT_ADDRESS])
.flat_map(|(event, log)| {
event
.tokens
.iter()
.zip(event.deltas.iter())
.filter_map(|(token, delta)| {
let component_id = event.pool_id[..20].to_vec();
let balance_deltas = block
.logs()
.filter(|log| log.address() == VAULT_ADDRESS)
.flat_map(|vault_log| {
let mut deltas = Vec::new();
store.get_last(format!("pool:{0}", hex::encode(&component_id)))?;
Some(BalanceDelta {
ord: log.log.ordinal,
if let Some(ev) =
abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log)
{
let component_id = ev.pool_id[..20].to_vec();
if store
.get_last(format!("pool:{}", hex::encode(&component_id)))
.is_some()
{
for (token, delta) in ev.tokens.iter().zip(ev.deltas.iter()) {
deltas.push(BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(tycho::Transaction {
hash: log.receipt.transaction.hash.clone(),
from: log.receipt.transaction.from.clone(),
to: log.receipt.transaction.to.clone(),
index: log.receipt.transaction.index.into(),
hash: vault_log.receipt.transaction.hash.clone(),
from: vault_log.receipt.transaction.from.clone(),
to: vault_log.receipt.transaction.to.clone(),
index: vault_log.receipt.transaction.index.into(),
}),
token: token.to_vec(),
delta: delta.to_signed_bytes_be(),
component_id: component_id.clone(),
});
}
}
} else if let Some(ev) = abi::vault::events::Swap::match_and_decode(vault_log.log) {
let component_id = ev.pool_id[..20].to_vec();
if store
.get_last(format!("pool:{}", hex::encode(&component_id)))
.is_some()
{
deltas.extend_from_slice(&[
BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(tycho::Transaction {
hash: vault_log.receipt.transaction.hash.clone(),
from: vault_log.receipt.transaction.from.clone(),
to: vault_log.receipt.transaction.to.clone(),
index: vault_log.receipt.transaction.index.into(),
}),
token: ev.token_in.to_vec(),
delta: ev.amount_in.to_signed_bytes_be(),
component_id: component_id.clone(),
},
BalanceDelta {
ord: vault_log.ordinal(),
tx: Some(tycho::Transaction {
hash: vault_log.receipt.transaction.hash.clone(),
from: vault_log.receipt.transaction.from.clone(),
to: vault_log.receipt.transaction.to.clone(),
index: vault_log.receipt.transaction.index.into(),
}),
token: ev.token_out.to_vec(),
delta: ev.amount_out.neg().to_signed_bytes_be(),
component_id,
})
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>(),
})
},
]);
}
}
deltas
})
.collect::<Vec<_>>();
Ok(BalanceDeltas { balance_deltas })
}
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the