refactor(balancer-substreams): remove pool_id static attr and use ProtocolComponent.id instead

This commit is contained in:
Florian Pellissier
2024-08-22 10:04:15 +02:00
parent 9c140a71af
commit 27768ce114
4 changed files with 193 additions and 210 deletions

View File

@@ -5,7 +5,7 @@ use std::collections::HashMap;
use substreams::{
hex,
pb::substreams::StoreDeltas,
store::{StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew},
store::{StoreAddBigInt, StoreGet, StoreGetString, StoreNew, StoreSet, StoreSetString},
};
use substreams_ethereum::{pb::eth, Event};
use tycho_substreams::{
@@ -44,18 +44,17 @@ pub fn map_components(block: eth::v2::Block) -> Result<BlockTransactionProtocolC
})
}
/// Simply stores the `ProtocolComponent`s with the pool id as the key
/// Simply stores the `ProtocolComponent`s with the pool address as the key and the pool id as value
#[substreams::handlers::store]
pub fn store_components(map: BlockTransactionProtocolComponents, store: StoreAddInt64) {
store.add_many(
0,
&map.tx_components
.iter()
.flat_map(|tx_components| &tx_components.components)
.map(|component| format!("pool:{0}", component.id))
.collect::<Vec<_>>(),
1,
);
pub fn store_components(map: BlockTransactionProtocolComponents, store: StoreSetString) {
map.tx_components
.into_iter()
.for_each(|tx_pc| {
tx_pc
.components
.into_iter()
.for_each(|pc| store.set(0, format!("pool:{0}", &pc.id[..42]), &pc.id))
});
}
/// Since the `PoolBalanceChanged` and `Swap` events administer only deltas, we need to leverage a
@@ -63,7 +62,7 @@ pub fn store_components(map: BlockTransactionProtocolComponents, store: StoreAdd
#[substreams::handlers::map]
pub fn map_relative_balances(
block: eth::v2::Block,
store: StoreGetInt64,
store: StoreGetString,
) -> Result<BlockBalanceDeltas, anyhow::Error> {
let balance_deltas = block
.logs()
@@ -74,10 +73,10 @@ pub fn map_relative_balances(
if let Some(ev) =
abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log)
{
let component_id = format!("0x{}", hex::encode(&ev.pool_id[..20]));
let component_id = format!("0x{}", hex::encode(ev.pool_id));
if store
.get_last(format!("pool:{}", component_id))
.get_last(format!("pool:{}", &component_id[..42]))
.is_some()
{
for (token, delta) in ev.tokens.iter().zip(ev.deltas.iter()) {
@@ -141,7 +140,7 @@ pub fn map_protocol_changes(
block: eth::v2::Block,
grouped_components: BlockTransactionProtocolComponents,
deltas: BlockBalanceDeltas,
components_store: StoreGetInt64,
components_store: StoreGetString,
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
) -> Result<BlockChanges> {
// We merge contract changes by transaction (identified by transaction index) making it easy to
@@ -226,7 +225,11 @@ pub fn map_protocol_changes(
.for_each(|address| {
if address != VAULT_ADDRESS {
// We reconstruct the component_id from the address here
change.mark_component_as_updated(&format!("0x{}", hex::encode(address)))
let id = components_store
.get_last(format!("pool:0x{}", hex::encode(address)))
.unwrap(); // Shouldn't happen because we filter by known components in
// `extract_contract_changes_builder`
change.mark_component_as_updated(&id);
}
})
});