fix(balancer): miscellaneous improvements before resync (#104)
* fix(balancer): ignore self balance change Euler pool emit a balance change for the pool itself. We don't want to have it because it's an unknown token from Tycho's perspective. example: https://etherscan.io/tx/0x4a9ea683052afefdae3d189862868c3a7dc8f431d1d9828b6bfd9451a8816426#eventlog#338 * refactor(balancer): rename balancer module to balancer-v2 * ci: make clippy happy --------- Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
This commit is contained in:
267
substreams/ethereum-balancer-v2/src/modules.rs
Normal file
267
substreams/ethereum-balancer-v2/src/modules.rs
Normal file
@@ -0,0 +1,267 @@
|
||||
use crate::{abi, pool_factories};
|
||||
use anyhow::Result;
|
||||
use itertools::Itertools;
|
||||
use std::collections::HashMap;
|
||||
use substreams::{
|
||||
hex,
|
||||
pb::substreams::StoreDeltas,
|
||||
store::{StoreAddBigInt, StoreGet, StoreGetString, StoreNew, StoreSet, StoreSetString},
|
||||
};
|
||||
use substreams_ethereum::{pb::eth, Event};
|
||||
use tycho_substreams::{
|
||||
balances::aggregate_balances_changes, contract::extract_contract_changes_builder, prelude::*,
|
||||
};
|
||||
|
||||
pub const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8");
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_components(block: eth::v2::Block) -> Result<BlockTransactionProtocolComponents> {
|
||||
// Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call
|
||||
// We store these as a hashmap by tx hash since we need to agg by tx hash later
|
||||
Ok(BlockTransactionProtocolComponents {
|
||||
tx_components: block
|
||||
.transactions()
|
||||
.filter_map(|tx| {
|
||||
let components = tx
|
||||
.logs_with_calls()
|
||||
.filter_map(|(log, call)| {
|
||||
pool_factories::address_map(
|
||||
call.call.address.as_slice(),
|
||||
log,
|
||||
call.call,
|
||||
tx,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !components.is_empty() {
|
||||
Some(TransactionProtocolComponents { tx: Some(tx.into()), components })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Simply stores the `ProtocolComponent`s with the pool address as the key and the pool id as value
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_components(map: BlockTransactionProtocolComponents, store: StoreSetString) {
|
||||
map.tx_components
|
||||
.into_iter()
|
||||
.for_each(|tx_pc| {
|
||||
tx_pc
|
||||
.components
|
||||
.into_iter()
|
||||
.for_each(|pc| store.set(0, format!("pool:{0}", &pc.id[..42]), &pc.id))
|
||||
});
|
||||
}
|
||||
|
||||
/// Since the `PoolBalanceChanged` and `Swap` events administer only deltas, we need to leverage a
|
||||
/// map and a store to be able to tally up final balances for tokens in a pool.
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_relative_balances(
|
||||
block: eth::v2::Block,
|
||||
store: StoreGetString,
|
||||
) -> Result<BlockBalanceDeltas, anyhow::Error> {
|
||||
let balance_deltas = block
|
||||
.logs()
|
||||
.filter(|log| log.address() == VAULT_ADDRESS)
|
||||
.flat_map(|vault_log| {
|
||||
let mut deltas = Vec::new();
|
||||
|
||||
if let Some(ev) =
|
||||
abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log)
|
||||
{
|
||||
let component_id = format!("0x{}", hex::encode(ev.pool_id));
|
||||
|
||||
if store
|
||||
.get_last(format!("pool:{}", &component_id[..42]))
|
||||
.is_some()
|
||||
{
|
||||
for (token, delta) in ev
|
||||
.tokens
|
||||
.iter()
|
||||
.zip(ev.deltas.iter())
|
||||
.filter(|(token, _)| **token != hex::decode(&component_id[2..42]).unwrap())
|
||||
{
|
||||
deltas.push(BalanceDelta {
|
||||
ord: vault_log.ordinal(),
|
||||
tx: Some(vault_log.receipt.transaction.into()),
|
||||
token: token.to_vec(),
|
||||
delta: delta.to_signed_bytes_be(),
|
||||
component_id: component_id.as_bytes().to_vec(),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if let Some(ev) = abi::vault::events::Swap::match_and_decode(vault_log.log) {
|
||||
let component_id = format!("0x{}", hex::encode(ev.pool_id));
|
||||
|
||||
if store
|
||||
.get_last(format!("pool:{}", &component_id[..42]))
|
||||
.is_some()
|
||||
{
|
||||
deltas.extend_from_slice(&[
|
||||
BalanceDelta {
|
||||
ord: vault_log.ordinal(),
|
||||
tx: Some(vault_log.receipt.transaction.into()),
|
||||
token: ev.token_in.to_vec(),
|
||||
delta: ev.amount_in.to_signed_bytes_be(),
|
||||
component_id: component_id.as_bytes().to_vec(),
|
||||
},
|
||||
BalanceDelta {
|
||||
ord: vault_log.ordinal(),
|
||||
tx: Some(vault_log.receipt.transaction.into()),
|
||||
token: ev.token_out.to_vec(),
|
||||
delta: ev.amount_out.neg().to_signed_bytes_be(),
|
||||
component_id: component_id.as_bytes().to_vec(),
|
||||
},
|
||||
]);
|
||||
}
|
||||
} else if let Some(ev) =
|
||||
abi::vault::events::PoolBalanceManaged::match_and_decode(vault_log.log)
|
||||
{
|
||||
let component_id = format!("0x{}", hex::encode(ev.pool_id));
|
||||
deltas.extend_from_slice(&[BalanceDelta {
|
||||
ord: vault_log.ordinal(),
|
||||
tx: Some(vault_log.receipt.transaction.into()),
|
||||
token: ev.token.to_vec(),
|
||||
delta: ev.cash_delta.to_signed_bytes_be(),
|
||||
component_id: component_id.as_bytes().to_vec(),
|
||||
}]);
|
||||
}
|
||||
|
||||
deltas
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(BlockBalanceDeltas { balance_deltas })
|
||||
}
|
||||
|
||||
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the
|
||||
/// store key to ensure that there's a unique balance being tallied for each.
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_balances(deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
|
||||
tycho_substreams::balances::store_balance_changes(deltas, store);
|
||||
}
|
||||
|
||||
/// This is the main map that handles most of the indexing of this substream.
|
||||
/// Every contract change is grouped by transaction index via the `transaction_changes`
|
||||
/// map. Each block of code will extend the `TransactionChanges` struct with the
|
||||
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
|
||||
/// At the very end, the map can easily be sorted by index to ensure the final
|
||||
/// `BlockChanges` is ordered by transactions properly.
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_protocol_changes(
|
||||
block: eth::v2::Block,
|
||||
grouped_components: BlockTransactionProtocolComponents,
|
||||
deltas: BlockBalanceDeltas,
|
||||
components_store: StoreGetString,
|
||||
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
|
||||
) -> Result<BlockChanges> {
|
||||
// We merge contract changes by transaction (identified by transaction index) making it easy to
|
||||
// sort them at the very end.
|
||||
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
|
||||
|
||||
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
|
||||
// convert into `TransactionChanges`
|
||||
let default_attributes = vec![
|
||||
Attribute {
|
||||
name: "balance_owner".to_string(),
|
||||
value: VAULT_ADDRESS.to_vec(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "update_marker".to_string(),
|
||||
value: vec![1u8],
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
];
|
||||
grouped_components
|
||||
.tx_components
|
||||
.iter()
|
||||
.for_each(|tx_component| {
|
||||
// initialise builder if not yet present for this tx
|
||||
let tx = tx_component.tx.as_ref().unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(tx));
|
||||
|
||||
// iterate over individual components created within this tx
|
||||
tx_component
|
||||
.components
|
||||
.iter()
|
||||
.for_each(|component| {
|
||||
builder.add_protocol_component(component);
|
||||
let entity_change = EntityChanges {
|
||||
component_id: component.id.clone(),
|
||||
attributes: default_attributes.clone(),
|
||||
};
|
||||
builder.add_entity_change(&entity_change)
|
||||
});
|
||||
});
|
||||
|
||||
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
||||
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
|
||||
// this block. Then, these balance changes are merged onto the existing map of tx contract
|
||||
// changes, inserting a new one if it doesn't exist.
|
||||
aggregate_balances_changes(balance_store, deltas)
|
||||
.into_iter()
|
||||
.for_each(|(_, (tx, balances))| {
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
|
||||
balances
|
||||
.values()
|
||||
.for_each(|token_bc_map| {
|
||||
token_bc_map
|
||||
.values()
|
||||
.for_each(|bc| builder.add_balance_change(bc))
|
||||
});
|
||||
});
|
||||
|
||||
// Extract and insert any storage changes that happened for any of the components.
|
||||
extract_contract_changes_builder(
|
||||
&block,
|
||||
|addr| {
|
||||
components_store
|
||||
.get_last(format!("pool:0x{0}", hex::encode(addr)))
|
||||
.is_some() ||
|
||||
addr.eq(VAULT_ADDRESS)
|
||||
},
|
||||
&mut transaction_changes,
|
||||
);
|
||||
|
||||
transaction_changes
|
||||
.iter_mut()
|
||||
.for_each(|(_, change)| {
|
||||
// this indirection is necessary due to borrowing rules.
|
||||
let addresses = change
|
||||
.changed_contracts()
|
||||
.map(|e| e.to_vec())
|
||||
.collect::<Vec<_>>();
|
||||
addresses
|
||||
.into_iter()
|
||||
.for_each(|address| {
|
||||
if address != VAULT_ADDRESS {
|
||||
// We reconstruct the component_id from the address here
|
||||
let id = components_store
|
||||
.get_last(format!("pool:0x{}", hex::encode(address)))
|
||||
.unwrap(); // Shouldn't happen because we filter by known components in
|
||||
// `extract_contract_changes_builder`
|
||||
change.mark_component_as_updated(&id);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
// Process all `transaction_changes` for final output in the `BlockChanges`,
|
||||
// sorted by transaction index (the key).
|
||||
Ok(BlockChanges {
|
||||
block: Some((&block).into()),
|
||||
changes: transaction_changes
|
||||
.drain()
|
||||
.sorted_unstable_by_key(|(index, _)| *index)
|
||||
.filter_map(|(_, builder)| builder.build())
|
||||
.collect::<Vec<_>>(),
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user