feat(tycho-substreams): Add builder for TransactionChanges.
This builder allows easier access to already changed contract addresses, and entity attributes while avoiding duplicated entries. This is needed to implement the update markers on balancer substreams.
This commit is contained in:
@@ -9,7 +9,7 @@ use substreams::{
|
||||
};
|
||||
use substreams_ethereum::{pb::eth, Event};
|
||||
use tycho_substreams::{
|
||||
balances::aggregate_balances_changes, contract::extract_contract_changes, prelude::*,
|
||||
balances::aggregate_balances_changes, contract::extract_contract_changes_builder, prelude::*,
|
||||
};
|
||||
|
||||
pub const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8");
|
||||
@@ -146,40 +146,44 @@ pub fn map_protocol_changes(
|
||||
) -> Result<BlockChanges> {
|
||||
// We merge contract changes by transaction (identified by transaction index) making it easy to
|
||||
// sort them at the very end.
|
||||
let mut transaction_changes: HashMap<_, TransactionChanges> = HashMap::new();
|
||||
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
|
||||
|
||||
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
|
||||
// convert into `TransactionChanges`
|
||||
let default_attributes = vec![
|
||||
Attribute {
|
||||
name: "balance_owner".to_string(),
|
||||
value: "0xBA12222222228d8Ba445958a75a0704d566BF2C8"
|
||||
.to_string()
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "update_marker".to_string(),
|
||||
value: vec![1u8],
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
];
|
||||
grouped_components
|
||||
.tx_components
|
||||
.iter()
|
||||
.for_each(|tx_component| {
|
||||
// initialise builder if not yet present for this tx
|
||||
let tx = tx_component.tx.as_ref().unwrap();
|
||||
transaction_changes
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChanges::new(tx))
|
||||
.component_changes
|
||||
.extend_from_slice(&tx_component.components);
|
||||
tx_component
|
||||
.components
|
||||
.iter()
|
||||
.for_each(|component| {
|
||||
transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChanges::new(tx))
|
||||
.entity_changes
|
||||
.push(EntityChanges {
|
||||
component_id: component.id.clone(),
|
||||
attributes: vec![Attribute {
|
||||
name: "balance_owner".to_string(),
|
||||
value: "0xBA12222222228d8Ba445958a75a0704d566BF2C8"
|
||||
.to_string()
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
change: ChangeType::Creation.into(),
|
||||
}],
|
||||
});
|
||||
});
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(tx));
|
||||
|
||||
// iterate over individual components created within this tx
|
||||
tx_component.components.iter().for_each(|component| {
|
||||
builder.add_protocol_component(component);
|
||||
let entity_change = EntityChanges {
|
||||
component_id: component.id.clone(),
|
||||
attributes: default_attributes.clone(),
|
||||
};
|
||||
builder.add_entity_change(&entity_change)
|
||||
});
|
||||
});
|
||||
|
||||
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
||||
@@ -189,15 +193,14 @@ pub fn map_protocol_changes(
|
||||
aggregate_balances_changes(balance_store, deltas)
|
||||
.into_iter()
|
||||
.for_each(|(_, (tx, balances))| {
|
||||
transaction_changes
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChanges::new(&tx))
|
||||
.balance_changes
|
||||
.extend(balances.into_values());
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
|
||||
balances.values().for_each(|bc| builder.add_balance_change(bc));
|
||||
});
|
||||
|
||||
// Extract and insert any storage changes that happened for any of the components.
|
||||
extract_contract_changes(
|
||||
extract_contract_changes_builder(
|
||||
&block,
|
||||
|addr| {
|
||||
components_store
|
||||
@@ -208,6 +211,17 @@ pub fn map_protocol_changes(
|
||||
&mut transaction_changes,
|
||||
);
|
||||
|
||||
transaction_changes.iter_mut().for_each(|(_, change)| {
|
||||
// this indirection is necessary due to borrowing rules.
|
||||
let addresses = change.changed_contracts().map(|e| e.to_vec()).collect::<Vec<_>>();
|
||||
addresses.into_iter().for_each(|address| {
|
||||
if address != VAULT_ADDRESS {
|
||||
// We reconstruct the component_id from the address here
|
||||
change.mark_component_as_updated(&format!("0x{}", hex::encode(address)))
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
// Process all `transaction_changes` for final output in the `BlockChanges`,
|
||||
// sorted by transaction index (the key).
|
||||
Ok(BlockChanges {
|
||||
@@ -215,16 +229,8 @@ pub fn map_protocol_changes(
|
||||
changes: transaction_changes
|
||||
.drain()
|
||||
.sorted_unstable_by_key(|(index, _)| *index)
|
||||
.filter_map(|(_, change)| {
|
||||
if change.contract_changes.is_empty() &&
|
||||
change.component_changes.is_empty() &&
|
||||
change.balance_changes.is_empty() &&
|
||||
change.entity_changes.is_empty()
|
||||
{
|
||||
None
|
||||
} else {
|
||||
Some(change)
|
||||
}
|
||||
.filter_map(|(_, builder)| {
|
||||
builder.build()
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user