diff --git a/substreams/crates/tycho-substreams/src/contract.rs b/substreams/crates/tycho-substreams/src/contract.rs index f81491b..c237d1a 100644 --- a/substreams/crates/tycho-substreams/src/contract.rs +++ b/substreams/crates/tycho-substreams/src/contract.rs @@ -10,67 +10,13 @@ /// more [here](https://streamingfastio.medium.com/new-block-model-to-accelerate-chain-integration-9f65126e5425) use std::collections::HashMap; -use substreams_ethereum::pb::eth::{ - self, - v2::{block::DetailLevel, CallType, StorageChange}, +use substreams_ethereum::pb::{ + eth, + eth::v2::block::DetailLevel, eth::v2::CallType }; - -use crate::pb::tycho::evm::v1::{self as tycho}; - -struct SlotValue { - new_value: Vec, - start_value: Vec, -} - -impl From<&StorageChange> for SlotValue { - fn from(change: &StorageChange) -> Self { - Self { new_value: change.new_value.clone(), start_value: change.old_value.clone() } - } -} - -impl SlotValue { - fn has_changed(&self) -> bool { - self.start_value != self.new_value - } -} - -// Uses a map for slots, protobuf does not allow bytes in hashmap keys -struct InterimContractChange { - address: Vec, - balance: Vec, - code: Vec, - slots: HashMap, SlotValue>, - change: tycho::ChangeType, -} - -impl InterimContractChange { - fn new(address: &[u8], creation: bool) -> Self { - Self { - address: address.to_vec(), - balance: vec![], - code: vec![], - slots: Default::default(), - change: if creation { tycho::ChangeType::Creation } else { tycho::ChangeType::Update }, - } - } -} - -impl From for tycho::ContractChange { - fn from(value: InterimContractChange) -> Self { - tycho::ContractChange { - address: value.address, - balance: value.balance, - code: value.code, - slots: value - .slots - .into_iter() - .filter(|(_, value)| value.has_changed()) - .map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value }) - .collect(), - change: value.change.into(), - } - } -} +use substreams_ethereum::pb::eth::v2::TransactionTrace; +use crate::models::{InterimContractChange, TransactionChanges}; +use crate::prelude::TransactionChangesBuilder; /// Extracts and aggregates contract changes from a block. /// @@ -101,7 +47,51 @@ impl From for tycho::ContractChange { pub fn extract_contract_changes bool>( block: ð::v2::Block, inclusion_predicate: F, - transaction_changes: &mut HashMap, + transaction_changes: &mut HashMap, +) { + extract_contract_changes_generic( + block, + inclusion_predicate, + |tx, changed_contracts| { + transaction_changes + .entry(tx.index.into()) + .or_insert_with(|| TransactionChanges::new(&(tx.into()))) + .contract_changes + .extend( + changed_contracts + .clone() + .into_values() + .map(|change| change.into()), + ); + }, + ) +} + + +pub fn extract_contract_changes_builder bool>( + block: ð::v2::Block, + inclusion_predicate: F, + transaction_changes: &mut HashMap, +) { + extract_contract_changes_generic( + block, + inclusion_predicate, + |tx, changed_contracts| { + let builder = transaction_changes + .entry(tx.index.into()) + .or_insert_with(|| TransactionChangesBuilder::new(&(tx.into()))); + changed_contracts + .clone() + .into_iter() + .for_each(|(_, change)| builder.add_contract_changes(&change)); + }, + ) +} + +fn extract_contract_changes_generic bool, G: FnMut(&TransactionTrace, &HashMap, InterimContractChange>)>( + block: ð::v2::Block, + inclusion_predicate: F, + mut store_changes: G, ) { if block.detail_level != Into::::into(DetailLevel::DetaillevelExtended) { panic!("Only extended blocks are supported"); @@ -160,14 +150,7 @@ pub fn extract_contract_changes bool>( ) }); - let slot_value = contract_change - .slots - .entry(storage_change.key.clone()) - .or_insert_with(|| storage_change.into()); - - slot_value - .new_value - .copy_from_slice(&storage_change.new_value); + contract_change.upsert_slot(storage_change); }); balance_changes @@ -184,10 +167,7 @@ pub fn extract_contract_changes bool>( }); if let Some(new_balance) = &balance_change.new_value { - contract_change.balance.clear(); - contract_change - .balance - .extend_from_slice(&new_balance.bytes); + contract_change.set_balance(&new_balance.bytes); } }); @@ -204,25 +184,15 @@ pub fn extract_contract_changes bool>( ) }); - contract_change.code.clear(); - contract_change - .code - .extend_from_slice(&code_change.new_code); + contract_change.set_code(&code_change.new_code); }); if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() { - transaction_changes - .entry(block_tx.index.into()) - .or_insert_with(|| tycho::TransactionChanges::new(&(block_tx.into()))) - .contract_changes - .extend( - changed_contracts - .drain() - .map(|(_, change)| change.into()), - ); + store_changes(block_tx, &changed_contracts) } + changed_contracts.clear() }); } diff --git a/substreams/crates/tycho-substreams/src/models.rs b/substreams/crates/tycho-substreams/src/models.rs index a176d2d..41aaa13 100644 --- a/substreams/crates/tycho-substreams/src/models.rs +++ b/substreams/crates/tycho-substreams/src/models.rs @@ -1,4 +1,5 @@ -use substreams_ethereum::pb::eth::v2::{self as sf}; +use std::collections::HashMap; +use substreams_ethereum::pb::eth::v2::{self as sf, StorageChange}; // re-export the protobuf types here. pub use crate::pb::tycho::evm::v1::*; @@ -17,6 +18,161 @@ impl TransactionChanges { } } +/// Builds `TransactionChanges` struct +/// +/// Ensures uniqueness for contract addresses and component ids. +#[derive(Default)] +pub struct TransactionChangesBuilder { + tx: Option, + contract_changes: HashMap, InterimContractChange>, + entity_changes: HashMap, + component_changes: HashMap, + balance_changes: HashMap<(Vec, Vec), BalanceChange>, +} + +impl TransactionChangesBuilder { + /// Initialize a new builder for a transaction. + pub fn new(tx: &Transaction) -> Self { + Self { tx: Some(tx.clone()), ..Default::default() } + } + + /// Register a new contract change. + /// + /// Will prioritize the new change over any already present one. + pub fn add_contract_changes(&mut self, change: &InterimContractChange) { + self.contract_changes + .entry(change.address.clone()) + .and_modify(|c| { + if !change.balance.is_empty() { + c.set_balance(&change.balance) + } + if !change.slots.is_empty() { + c.upsert_slots(&change.slots) + } + if !change.code.is_empty() { + c.set_code(&change.code) + } + }) + .or_insert_with(|| { + let mut c = InterimContractChange::new( + &change.address, + change.change == ChangeType::Creation, + ); + c.upsert_slots(&change.slots); + c.set_code(&change.code); + c.set_balance(&change.balance); + c + }); + } + + /// Unique contract/account addresses that have been changed so far. + pub fn changed_contracts(&self) -> impl Iterator { + self.contract_changes + .keys() + .map(|k| k.as_slice()) + } + + /// Marks a component as updated. + /// + /// If the protocol does not follow a 1:1 logic between components and contracts. + /// Components can be manually marked as updated using this method. + pub fn mark_component_as_updated(&mut self, component_id: &str) { + let attr = Attribute { + name: "update_marker".to_string(), + value: vec![1u8], + change: ChangeType::Update.into(), + }; + if let Some(entry) = self + .entity_changes + .get_mut(component_id) + { + entry.set_attribute(&attr); + } else { + let mut change = InterimEntityChanges::new(component_id); + change.set_attribute(&attr); + self.entity_changes + .insert(component_id.to_string(), change); + } + } + + /// Registers a new entity change. + /// + /// Will prioritize the new change over any already present one. + pub fn add_entity_change(&mut self, change: &EntityChanges) { + self.entity_changes + .entry(change.component_id.clone()) + .and_modify(|ec| { + for attr in change.attributes.iter() { + ec.set_attribute(attr); + } + }) + .or_insert_with(|| InterimEntityChanges { + component_id: change.component_id.clone(), + attributes: change + .attributes + .clone() + .into_iter() + .map(|a| (a.name.clone(), a)) + .collect(), + }); + } + + /// Adds a new protocol component. + /// + /// ## Note + /// This method is a noop, in case the component is already present. Since + /// components are assumed to be immutable. + pub fn add_protocol_component(&mut self, component: &ProtocolComponent) { + if !self + .component_changes + .contains_key(&component.id) + { + self.component_changes + .insert(component.id.clone(), component.clone()); + } + } + + /// Updates a components balances + /// + /// Overwrites any previous balance changes of the component if present. + pub fn add_balance_change(&mut self, change: &BalanceChange) { + self.balance_changes + .insert((change.component_id.clone(), change.token.clone()), change.clone()); + } + + pub fn build(self) -> Option { + if self.contract_changes.is_empty() && + self.component_changes.is_empty() && + self.balance_changes.is_empty() && + self.entity_changes.is_empty() + { + None + } else { + Some(TransactionChanges { + tx: self.tx, + contract_changes: self + .contract_changes + .into_values() + .map(|interim| interim.into()) + .collect::>(), + entity_changes: self + .entity_changes + .into_values() + .map(|interim| interim.into()) + .collect::>(), + component_changes: self + .component_changes + .into_values() + .collect::>(), + balance_changes: self + .balance_changes + .into_values() + .collect::>(), + }) + } + } +} + impl From<&sf::TransactionTrace> for Transaction { fn from(tx: &sf::TransactionTrace) -> Self { Self { @@ -145,3 +301,128 @@ impl ProtocolComponent { self } } + +/// Same as `EntityChanges` but ensures attributes are unique by name. +#[derive(Default)] +pub struct InterimEntityChanges { + component_id: String, + attributes: HashMap, +} + +impl InterimEntityChanges { + pub fn new(id: &str) -> Self { + Self { component_id: id.to_string(), ..Default::default() } + } + + pub fn set_attribute(&mut self, attr: &Attribute) { + self.attributes + .entry(attr.name.clone()) + .and_modify(|existing| *existing = attr.clone()) + .or_insert(attr.clone()); + } +} + +impl From for EntityChanges { + fn from(value: InterimEntityChanges) -> Self { + EntityChanges { + component_id: value.component_id.clone(), + attributes: value + .attributes + .into_values() + .collect::>(), + } + } +} + +#[derive(Clone)] +struct SlotValue { + new_value: Vec, + start_value: Vec, +} + +impl SlotValue { + fn has_changed(&self) -> bool { + self.start_value != self.new_value + } +} + +impl From<&StorageChange> for SlotValue { + fn from(change: &StorageChange) -> Self { + Self { new_value: change.new_value.clone(), start_value: change.old_value.clone() } + } +} + +// Uses a map for slots, protobuf does not allow bytes in hashmap keys +#[derive(Clone)] +pub struct InterimContractChange { + address: Vec, + balance: Vec, + code: Vec, + slots: HashMap, SlotValue>, + change: ChangeType, +} + +impl InterimContractChange { + pub fn new(address: &[u8], creation: bool) -> Self { + Self { + address: address.to_vec(), + balance: vec![], + code: vec![], + slots: Default::default(), + change: if creation { ChangeType::Creation } else { ChangeType::Update }, + } + } + + pub fn upsert_slot(&mut self, change: &StorageChange) { + if change.address != self.address { + panic!("Bad storage change"); + } + self.slots + .entry(change.key.clone()) + .and_modify(|sv| { + sv.new_value + .copy_from_slice(&change.new_value) + }) + .or_insert_with(|| change.into()); + } + + fn upsert_slots(&mut self, changes: &HashMap, SlotValue>) { + for (slot, value) in changes.iter() { + self.slots + .entry(slot.clone()) + .and_modify(|sv| { + sv.new_value + .copy_from_slice(&value.new_value) + }) + .or_insert(value.clone()); + } + } + + pub fn set_balance(&mut self, new_balance: &[u8]) { + self.balance.clear(); + self.balance + .extend_from_slice(new_balance); + } + + pub fn set_code(&mut self, code: &[u8]) { + self.code.clear(); + self.code.extend_from_slice(code); + } +} + +impl From for ContractChange { + fn from(value: InterimContractChange) -> Self { + ContractChange { + address: value.address, + balance: value.balance, + code: value.code, + slots: value + .slots + .into_iter() + .filter(|(_, value)| value.has_changed()) + .map(|(slot, value)| ContractSlot { slot, value: value.new_value }) + .collect(), + change: value.change.into(), + } + } +} diff --git a/substreams/crates/tycho-substreams/src/pb/mod.rs b/substreams/crates/tycho-substreams/src/pb/mod.rs index 43d8838..43002f2 100644 --- a/substreams/crates/tycho-substreams/src/pb/mod.rs +++ b/substreams/crates/tycho-substreams/src/pb/mod.rs @@ -7,4 +7,4 @@ pub mod tycho { // @@protoc_insertion_point(tycho.evm.v1) } } -} +} \ No newline at end of file diff --git a/substreams/ethereum-balancer/src/modules.rs b/substreams/ethereum-balancer/src/modules.rs index 90297a1..3804bd8 100644 --- a/substreams/ethereum-balancer/src/modules.rs +++ b/substreams/ethereum-balancer/src/modules.rs @@ -9,7 +9,7 @@ use substreams::{ }; use substreams_ethereum::{pb::eth, Event}; use tycho_substreams::{ - balances::aggregate_balances_changes, contract::extract_contract_changes, prelude::*, + balances::aggregate_balances_changes, contract::extract_contract_changes_builder, prelude::*, }; pub const VAULT_ADDRESS: &[u8] = &hex!("BA12222222228d8Ba445958a75a0704d566BF2C8"); @@ -146,40 +146,44 @@ pub fn map_protocol_changes( ) -> Result { // We merge contract changes by transaction (identified by transaction index) making it easy to // sort them at the very end. - let mut transaction_changes: HashMap<_, TransactionChanges> = HashMap::new(); + let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new(); // `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to // convert into `TransactionChanges` + let default_attributes = vec![ + Attribute { + name: "balance_owner".to_string(), + value: "0xBA12222222228d8Ba445958a75a0704d566BF2C8" + .to_string() + .as_bytes() + .to_vec(), + change: ChangeType::Creation.into(), + }, + Attribute { + name: "update_marker".to_string(), + value: vec![1u8], + change: ChangeType::Creation.into(), + }, + ]; grouped_components .tx_components .iter() .for_each(|tx_component| { + // initialise builder if not yet present for this tx let tx = tx_component.tx.as_ref().unwrap(); - transaction_changes + let builder = transaction_changes .entry(tx.index) - .or_insert_with(|| TransactionChanges::new(tx)) - .component_changes - .extend_from_slice(&tx_component.components); - tx_component - .components - .iter() - .for_each(|component| { - transaction_changes - .entry(tx.index) - .or_insert_with(|| TransactionChanges::new(tx)) - .entity_changes - .push(EntityChanges { - component_id: component.id.clone(), - attributes: vec![Attribute { - name: "balance_owner".to_string(), - value: "0xBA12222222228d8Ba445958a75a0704d566BF2C8" - .to_string() - .as_bytes() - .to_vec(), - change: ChangeType::Creation.into(), - }], - }); - }); + .or_insert_with(|| TransactionChangesBuilder::new(tx)); + + // iterate over individual components created within this tx + tx_component.components.iter().for_each(|component| { + builder.add_protocol_component(component); + let entity_change = EntityChanges { + component_id: component.id.clone(), + attributes: default_attributes.clone(), + }; + builder.add_entity_change(&entity_change) + }); }); // Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating @@ -189,15 +193,14 @@ pub fn map_protocol_changes( aggregate_balances_changes(balance_store, deltas) .into_iter() .for_each(|(_, (tx, balances))| { - transaction_changes + let builder = transaction_changes .entry(tx.index) - .or_insert_with(|| TransactionChanges::new(&tx)) - .balance_changes - .extend(balances.into_values()); + .or_insert_with(|| TransactionChangesBuilder::new(&tx)); + balances.values().for_each(|bc| builder.add_balance_change(bc)); }); // Extract and insert any storage changes that happened for any of the components. - extract_contract_changes( + extract_contract_changes_builder( &block, |addr| { components_store @@ -208,6 +211,17 @@ pub fn map_protocol_changes( &mut transaction_changes, ); + transaction_changes.iter_mut().for_each(|(_, change)| { + // this indirection is necessary due to borrowing rules. + let addresses = change.changed_contracts().map(|e| e.to_vec()).collect::>(); + addresses.into_iter().for_each(|address| { + if address != VAULT_ADDRESS { + // We reconstruct the component_id from the address here + change.mark_component_as_updated(&format!("0x{}", hex::encode(address))) + } + }) + }); + // Process all `transaction_changes` for final output in the `BlockChanges`, // sorted by transaction index (the key). Ok(BlockChanges { @@ -215,16 +229,8 @@ pub fn map_protocol_changes( changes: transaction_changes .drain() .sorted_unstable_by_key(|(index, _)| *index) - .filter_map(|(_, change)| { - if change.contract_changes.is_empty() && - change.component_changes.is_empty() && - change.balance_changes.is_empty() && - change.entity_changes.is_empty() - { - None - } else { - Some(change) - } + .filter_map(|(_, builder)| { + builder.build() }) .collect::>(), })