Integrate balancer substream with sdk.
We defer the contract storage extraction as well as the balance handling (apart from actually extracing relative deltas) to the SDK.
This commit is contained in:
@@ -1,27 +1,20 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::{abi, pool_factories};
|
||||
use anyhow::Result;
|
||||
use itertools::Itertools;
|
||||
use std::collections::HashMap;
|
||||
use substreams::hex;
|
||||
use substreams::pb::substreams::StoreDeltas;
|
||||
use substreams::store::{
|
||||
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew,
|
||||
};
|
||||
|
||||
use substreams::key;
|
||||
use substreams::scalar::BigInt;
|
||||
|
||||
use substreams_ethereum::pb::eth;
|
||||
|
||||
use itertools::Itertools;
|
||||
|
||||
|
||||
use contract_changes::extract_contract_changes;
|
||||
use substreams_ethereum::Event;
|
||||
|
||||
use crate::pb::tycho::evm::v1::{self as tycho};
|
||||
use crate::pb::tycho::evm::v1::{BalanceDelta, BlockBalanceDeltas, BlockTransactionProtocolComponents, TransactionProtocolComponents};
|
||||
use crate::{abi, contract_changes, pool_factories};
|
||||
use tycho_substreams::balances;
|
||||
use tycho_substreams::contract::extract_contract_changes;
|
||||
use tycho_substreams::pb::tycho::evm::v1::{
|
||||
self as tycho, BalanceDelta, BlockBalanceDeltas, BlockTransactionProtocolComponents,
|
||||
TransactionProtocolComponents,
|
||||
};
|
||||
|
||||
const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8");
|
||||
|
||||
@@ -110,7 +103,13 @@ pub fn map_balance_deltas(
|
||||
if let Some(ev) =
|
||||
abi::vault::events::PoolBalanceChanged::match_and_decode(vault_log.log)
|
||||
{
|
||||
let component_id = ev.pool_id[..20].to_vec();
|
||||
let component_id = format!(
|
||||
"0x{}",
|
||||
String::from_utf8(ev.pool_id[..20].to_vec()).unwrap()
|
||||
)
|
||||
.as_bytes()
|
||||
.to_vec();
|
||||
|
||||
if store
|
||||
.get_last(format!("pool:{}", hex::encode(&component_id)))
|
||||
.is_some()
|
||||
@@ -131,7 +130,13 @@ pub fn map_balance_deltas(
|
||||
}
|
||||
}
|
||||
} else if let Some(ev) = abi::vault::events::Swap::match_and_decode(vault_log.log) {
|
||||
let component_id = ev.pool_id[..20].to_vec();
|
||||
let component_id = format!(
|
||||
"0x{}",
|
||||
String::from_utf8(ev.pool_id[..20].to_vec()).unwrap()
|
||||
)
|
||||
.as_bytes()
|
||||
.to_vec();
|
||||
|
||||
if store
|
||||
.get_last(format!("pool:{}", hex::encode(&component_id)))
|
||||
.is_some()
|
||||
@@ -176,17 +181,7 @@ pub fn map_balance_deltas(
|
||||
/// store key to ensure that there's a unique balance being tallied for each.
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
|
||||
deltas.balance_deltas.iter().for_each(|delta| {
|
||||
store.add(
|
||||
delta.ord,
|
||||
format!(
|
||||
"pool:{0}:token:{1}",
|
||||
hex::encode(&delta.component_id),
|
||||
hex::encode(&delta.token)
|
||||
),
|
||||
BigInt::from_signed_bytes_be(&delta.delta),
|
||||
);
|
||||
});
|
||||
balances::store_balance_changes(deltas, store);
|
||||
}
|
||||
|
||||
/// This is the main map that handles most of the indexing of this substream.
|
||||
@@ -215,67 +210,37 @@ pub fn map_changes(
|
||||
.iter()
|
||||
.for_each(|tx_component| {
|
||||
let tx = tx_component.tx.as_ref().unwrap();
|
||||
|
||||
transaction_contract_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| tycho::TransactionContractChanges {
|
||||
tx: Some(tx.clone()),
|
||||
contract_changes: vec![],
|
||||
component_changes: vec![],
|
||||
balance_changes: vec![],
|
||||
})
|
||||
.or_insert_with(|| tycho::TransactionContractChanges::new(&tx))
|
||||
.component_changes
|
||||
.extend_from_slice(&tx_component.components);
|
||||
});
|
||||
|
||||
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
||||
// `BlockBalanceDeltas`. We essentially just process the changes that occured to the `store` this
|
||||
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store` this
|
||||
// block. Then, these balance changes are merged onto the existing map of tx contract changes,
|
||||
// inserting a new one if it doesn't exist.
|
||||
balance_store
|
||||
.deltas
|
||||
balances::aggregate_balances_changes(balance_store, deltas)
|
||||
.into_iter()
|
||||
.zip(deltas.balance_deltas)
|
||||
.map(|(store_delta, balance_delta)| {
|
||||
let pool_id = key::segment_at(&store_delta.key, 1);
|
||||
let token_id = key::segment_at(&store_delta.key, 3);
|
||||
// store_delta.new_value is an ASCII string representing an integer
|
||||
let ascii_string =
|
||||
String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence");
|
||||
let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer");
|
||||
let big_endian_bytes_balance = balance.to_bytes_be().1;
|
||||
|
||||
(
|
||||
balance_delta.tx.unwrap(),
|
||||
tycho::BalanceChange {
|
||||
token: hex::decode(token_id).expect("Token ID not valid hex"),
|
||||
balance: big_endian_bytes_balance,
|
||||
component_id: pool_id.as_bytes().to_vec(),
|
||||
},
|
||||
)
|
||||
})
|
||||
// We need to group the balance changes by tx hash for the `TransactionContractChanges` agg
|
||||
.group_by(|(tx, _)| TransactionWrapper(tx.clone()))
|
||||
.into_iter()
|
||||
.for_each(|(tx_wrapped, group)| {
|
||||
let tx = tx_wrapped.0;
|
||||
|
||||
.for_each(|(_, (tx, balances))| {
|
||||
transaction_contract_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| tycho::TransactionContractChanges {
|
||||
tx: Some(tx.clone()),
|
||||
contract_changes: vec![],
|
||||
component_changes: vec![],
|
||||
balance_changes: vec![],
|
||||
})
|
||||
.or_insert_with(|| tycho::TransactionContractChanges::new(&tx))
|
||||
.balance_changes
|
||||
.extend(group.map(|(_, change)| change));
|
||||
.extend(balances.into_iter().map(|(_, change)| change));
|
||||
});
|
||||
|
||||
// General helper for extracting contract changes. Uses block, our component store which holds
|
||||
// all of our tracked deployed pool addresses, and the map of tx contract changes which we
|
||||
// output into for final processing later.
|
||||
extract_contract_changes(&block, components_store, &mut transaction_contract_changes);
|
||||
// Extract and insert any storage changes that happened for any of the components.
|
||||
extract_contract_changes(
|
||||
&block,
|
||||
|addr| {
|
||||
components_store
|
||||
.get_last(format!("pool:{0}", hex::encode(&addr)))
|
||||
.is_some()
|
||||
},
|
||||
&mut transaction_contract_changes,
|
||||
);
|
||||
|
||||
// Process all `transaction_contract_changes` for final output in the `BlockContractChanges`,
|
||||
// sorted by transaction index (the key).
|
||||
|
||||
Reference in New Issue
Block a user