diff --git a/crates/tycho-substreams/Cargo.lock b/crates/tycho-substreams/Cargo.lock index 5e61c4a..cd827d8 100644 --- a/crates/tycho-substreams/Cargo.lock +++ b/crates/tycho-substreams/Cargo.lock @@ -333,6 +333,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -530,7 +539,7 @@ checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.10.5", "lazy_static", "log", "multimap", @@ -551,7 +560,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -908,6 +917,10 @@ dependencies = [ name = "tycho-substreams" version = "0.1.0" dependencies = [ + "hex", + "itertools 0.12.1", + "prost", + "substreams", "substreams-ethereum", ] diff --git a/crates/tycho-substreams/Cargo.toml b/crates/tycho-substreams/Cargo.toml index 269aea3..cc0f7bc 100644 --- a/crates/tycho-substreams/Cargo.toml +++ b/crates/tycho-substreams/Cargo.toml @@ -4,4 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -substreams-ethereum = "0.9.9" \ No newline at end of file +substreams-ethereum = "0.9.9" +substreams = "0.5" +prost = "0.11" +hex = "0.4.3" +itertools = "0.12.0" \ No newline at end of file diff --git a/crates/tycho-substreams/rustfmt.toml b/crates/tycho-substreams/rustfmt.toml new file mode 100644 index 0000000..d0c0193 --- /dev/null +++ b/crates/tycho-substreams/rustfmt.toml @@ -0,0 +1,13 @@ +reorder_imports = true +imports_granularity = "Crate" +use_small_heuristics = "Max" +comment_width = 100 +wrap_comments = true +binop_separator = "Back" +trailing_comma = "Vertical" +trailing_semicolon = false +use_field_init_shorthand = true +chain_width = 40 +ignore = [ + "src/pb", +] \ No newline at end of file diff --git a/crates/tycho-substreams/src/balances.rs b/crates/tycho-substreams/src/balances.rs new file mode 100644 index 0000000..dbc780a --- /dev/null +++ b/crates/tycho-substreams/src/balances.rs @@ -0,0 +1,306 @@ +//! Utilities to handle relative balances. +//! +//! +//! To aggregate relative balances changes to absolute balances the general approach is: +//! +//! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas +//! within this message are required to have increasing ordinals so that +//! the order of relative balance changes is unambiguous. +//! 2. Store the balances changes with a store handler. You can use the +//! `store_balance_changes` library method directly for this. +//! 3. In the output module, use aggregate_balance_changes to receive an +//! aggregated map of absolute balances. +//! +use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas}; +use itertools::Itertools; +use std::collections::HashMap; +use std::str::FromStr; +use substreams::key; +use substreams::pb::substreams::StoreDeltas; +use substreams::prelude::{BigInt, StoreAdd}; + +pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd) { + let mut previous_ordinal = HashMap::::new(); + deltas + .balance_deltas + .iter() + .for_each(|delta| { + let balance_key = format!( + "{0}:{1}", + String::from_utf8(delta.component_id.clone()) + .expect("delta.component_id is not valid utf-8!"), + hex::encode(&delta.token) + ); + let current_ord = delta.ord; + previous_ordinal + .entry(balance_key.clone()) + .and_modify(|ord| { + // ordinals must arrive in increasing order + if *ord >= current_ord { + panic!( + "Invalid ordinal sequence for {}: {} >= {}", + balance_key, *ord, current_ord + ); + } + *ord = current_ord; + }) + .or_insert(delta.ord); + store.add(delta.ord, balance_key, BigInt::from_signed_bytes_be(&delta.delta)); + }); +} + +pub fn aggregate_balances_changes( + balance_store: StoreDeltas, + deltas: BlockBalanceDeltas, +) -> HashMap, HashMap, BalanceChange>> { + balance_store + .deltas + .into_iter() + .zip(deltas.balance_deltas) + .map(|(store_delta, balance_delta)| { + let component_id = key::segment_at(&store_delta.key, 0); + let token_id = key::segment_at(&store_delta.key, 1); + // store_delta.new_value is an ASCII string representing an integer + let ascii_string = + String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence"); + let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer"); + let big_endian_bytes_balance = balance.to_bytes_be().1; + + ( + balance_delta.tx.unwrap(), + BalanceChange { + token: hex::decode(token_id).expect("Token ID not valid hex"), + balance: big_endian_bytes_balance, + component_id: component_id.as_bytes().to_vec(), + }, + ) + }) + // We need to group the balance changes by tx hash for the `TransactionContractChanges` agg + .group_by(|(tx, _)| tx.hash.clone()) + .into_iter() + .map(|(txh, group)| { + let balances = group + .into_iter() + .map(|(_, delta)| (delta.token.clone(), delta)) + .collect(); + (txh, balances) + }) + .collect() +} + +mod tests { + use super::*; + use crate::mock_store::MockStore; + use crate::pb::tycho::evm::v1::{BalanceDelta, Transaction}; + use substreams::pb::substreams::StoreDelta; + use substreams::prelude::{StoreGet, StoreNew}; + + fn block_balance_deltas() -> BlockBalanceDeltas { + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + BlockBalanceDeltas { + balance_deltas: vec![ + BalanceDelta { + ord: 0, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_0.clone(), + delta: BigInt::from_str("+1000") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + BalanceDelta { + ord: 2, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_1.clone(), + delta: BigInt::from_str("+100") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + BalanceDelta { + ord: 3, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_1.clone(), + delta: BigInt::from_str("50") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + BalanceDelta { + ord: 10, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_0.clone(), + delta: BigInt::from_str("-1") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + ], + } + } + fn store_deltas() -> StoreDeltas { + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + + let t0_key = + format!("{}:{}", String::from_utf8(comp_id.clone()).unwrap(), hex::encode(&token_0)); + let t1_key = + format!("{}:{}", String::from_utf8(comp_id.clone()).unwrap(), hex::encode(&token_1)); + StoreDeltas { + deltas: vec![ + StoreDelta { + operation: 0, + ordinal: 0, + key: t0_key.clone(), + old_value: BigInt::from(0) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(1000) + .to_string() + .as_bytes() + .to_vec(), + }, + StoreDelta { + operation: 0, + ordinal: 2, + key: t1_key.clone(), + old_value: BigInt::from(0) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(100) + .to_string() + .as_bytes() + .to_vec(), + }, + StoreDelta { + operation: 0, + ordinal: 3, + key: t1_key.clone(), + old_value: BigInt::from(100) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(150) + .to_string() + .as_bytes() + .to_vec(), + }, + StoreDelta { + operation: 0, + ordinal: 10, + key: t0_key.clone(), + old_value: BigInt::from(1000) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(999) + .to_string() + .as_bytes() + .to_vec(), + }, + ], + } + } + + #[test] + fn test_store_balances() { + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + let deltas = block_balance_deltas(); + let store = ::new(); + + store_balance_changes(deltas, store.clone()); + let res_0 = store.get_last(format!( + "{}:{}", + String::from_utf8(comp_id.clone()).unwrap(), + hex::encode(&token_0) + )); + let res_1 = store.get_last(format!( + "{}:{}", + String::from_utf8(comp_id.clone()).unwrap(), + hex::encode(&token_1) + )); + + assert_eq!(res_0, Some(BigInt::from_str("+999").unwrap())); + assert_eq!(res_1, Some(BigInt::from_str("+150").unwrap())); + } + + #[test] + fn test_aggregate_balances_changes() { + let store_deltas = store_deltas(); + let balance_deltas = block_balance_deltas(); + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + let exp = [( + vec![0, 1], + [ + ( + token_0.clone(), + BalanceChange { + token: token_0, + balance: BigInt::from(999) + .to_signed_bytes_be() + .to_vec(), + component_id: comp_id.clone(), + }, + ), + ( + token_1.clone(), + BalanceChange { + token: token_1, + balance: vec![150], + component_id: comp_id.clone(), + }, + ), + ] + .into_iter() + .collect(), + )] + .into_iter() + .collect(); + + let res = aggregate_balances_changes(store_deltas, balance_deltas); + dbg!(&res); + + assert_eq!(res, exp); + } +} diff --git a/crates/tycho-substreams/src/contract.rs b/crates/tycho-substreams/src/contract.rs index a0391b3..79ad454 100644 --- a/crates/tycho-substreams/src/contract.rs +++ b/crates/tycho-substreams/src/contract.rs @@ -22,7 +22,7 @@ impl SlotValue { } // Uses a map for slots, protobuf does not allow bytes in hashmap keys -pub struct InterimContractChange { +struct InterimContractChange { address: Vec, balance: Vec, code: Vec, @@ -40,17 +40,14 @@ impl From for tycho::ContractChange { .slots .into_iter() .filter(|(_, value)| value.has_changed()) - .map(|(slot, value)| tycho::ContractSlot { - slot, - value: value.new_value, - }) + .map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value }) .collect(), change: value.change.into(), } } } -pub fn extract_contract_changes bool >( +pub fn extract_contract_changes bool>( block: ð::v2::Block, inclusion_predicate: F, transaction_contract_changes: &mut HashMap, @@ -69,131 +66,131 @@ pub fn extract_contract_changes bool >( }) .collect(); - block.transactions().for_each(|block_tx| { - let mut storage_changes = Vec::new(); - let mut balance_changes = Vec::new(); - let mut code_changes = Vec::new(); + block + .transactions() + .for_each(|block_tx| { + let mut storage_changes = Vec::new(); + let mut balance_changes = Vec::new(); + let mut code_changes = Vec::new(); - block_tx - .calls - .iter() - .filter(|call| { - !call.state_reverted - && inclusion_predicate(&call.address) - }) - .for_each(|call| { - storage_changes.extend(call.storage_changes.iter()); - balance_changes.extend(call.balance_changes.iter()); - code_changes.extend(call.code_changes.iter()); - }); + block_tx + .calls + .iter() + .filter(|call| !call.state_reverted && inclusion_predicate(&call.address)) + .for_each(|call| { + storage_changes.extend(call.storage_changes.iter()); + balance_changes.extend(call.balance_changes.iter()); + code_changes.extend(call.code_changes.iter()); + }); - storage_changes.sort_unstable_by_key(|change| change.ordinal); - balance_changes.sort_unstable_by_key(|change| change.ordinal); - code_changes.sort_unstable_by_key(|change| change.ordinal); + storage_changes.sort_unstable_by_key(|change| change.ordinal); + balance_changes.sort_unstable_by_key(|change| change.ordinal); + code_changes.sort_unstable_by_key(|change| change.ordinal); - storage_changes - .iter() - .filter(|changes| { - inclusion_predicate(&changes.address) - }) - .for_each(|storage_change| { - let contract_change = changed_contracts - .entry(storage_change.address.clone()) - .or_insert_with(|| InterimContractChange { - address: storage_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&storage_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); + storage_changes + .iter() + .filter(|changes| inclusion_predicate(&changes.address)) + .for_each(|storage_change| { + let contract_change = changed_contracts + .entry(storage_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: storage_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&storage_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); - let slot_value = contract_change - .slots - .entry(storage_change.key.clone()) - .or_insert_with(|| SlotValue { - new_value: storage_change.new_value.clone(), - start_value: storage_change.old_value.clone(), - }); + let slot_value = contract_change + .slots + .entry(storage_change.key.clone()) + .or_insert_with(|| SlotValue { + new_value: storage_change.new_value.clone(), + start_value: storage_change.old_value.clone(), + }); - slot_value - .new_value - .copy_from_slice(&storage_change.new_value); - }); + slot_value + .new_value + .copy_from_slice(&storage_change.new_value); + }); - balance_changes - .iter() - .filter(|changes| { - inclusion_predicate(&changes.address) - }) - .for_each(|balance_change| { - let contract_change = changed_contracts - .entry(balance_change.address.clone()) - .or_insert_with(|| InterimContractChange { - address: balance_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&balance_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); + balance_changes + .iter() + .filter(|changes| inclusion_predicate(&changes.address)) + .for_each(|balance_change| { + let contract_change = changed_contracts + .entry(balance_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: balance_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&balance_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); - if let Some(new_balance) = &balance_change.new_value { - contract_change.balance.clear(); + if let Some(new_balance) = &balance_change.new_value { + contract_change.balance.clear(); + contract_change + .balance + .extend_from_slice(&new_balance.bytes); + } + }); + + code_changes + .iter() + .filter(|changes| inclusion_predicate(&changes.address)) + .for_each(|code_change| { + let contract_change = changed_contracts + .entry(code_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: code_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&code_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); + + contract_change.code.clear(); contract_change - .balance - .extend_from_slice(&new_balance.bytes); - } - }); + .code + .extend_from_slice(&code_change.new_code); + }); - code_changes - .iter() - .filter(|changes| { - inclusion_predicate(&changes.address) - }) - .for_each(|code_change| { - let contract_change = changed_contracts - .entry(code_change.address.clone()) - .or_insert_with(|| InterimContractChange { - address: code_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&code_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); - - contract_change.code.clear(); - contract_change - .code - .extend_from_slice(&code_change.new_code); - }); - - if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() { - transaction_contract_changes - .entry(block_tx.index.into()) - .or_insert_with(|| tycho::TransactionContractChanges { - tx: Some(tycho::Transaction { - hash: block_tx.hash.clone(), - from: block_tx.from.clone(), - to: block_tx.to.clone(), - index: block_tx.index as u64, - }), - contract_changes: vec![], - component_changes: vec![], - balance_changes: vec![], - }) - .contract_changes - .extend(changed_contracts.drain().map(|(_, change)| change.into())); - } - }); + if !storage_changes.is_empty() + || !balance_changes.is_empty() + || !code_changes.is_empty() + { + transaction_contract_changes + .entry(block_tx.index.into()) + .or_insert_with(|| tycho::TransactionContractChanges { + tx: Some(tycho::Transaction { + hash: block_tx.hash.clone(), + from: block_tx.from.clone(), + to: block_tx.to.clone(), + index: block_tx.index as u64, + }), + contract_changes: vec![], + component_changes: vec![], + balance_changes: vec![], + }) + .contract_changes + .extend( + changed_contracts + .drain() + .map(|(_, change)| change.into()), + ); + } + }); } diff --git a/crates/tycho-substreams/src/lib.rs b/crates/tycho-substreams/src/lib.rs index 2f52c0e..d5b0d0f 100644 --- a/crates/tycho-substreams/src/lib.rs +++ b/crates/tycho-substreams/src/lib.rs @@ -1,2 +1,6 @@ +pub mod balances; pub mod contract; -pub mod pb; \ No newline at end of file +mod mock_store; +pub mod pb; +// TODO: consider removing this module, after integrating with balancer +pub mod utils; diff --git a/crates/tycho-substreams/src/mock_store.rs b/crates/tycho-substreams/src/mock_store.rs new file mode 100644 index 0000000..a1c0cd2 --- /dev/null +++ b/crates/tycho-substreams/src/mock_store.rs @@ -0,0 +1,93 @@ +//! Contains a mock store for internal testing. +//! +//! Might make this public alter to users can test their store handlers. +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; +use substreams::prelude::{BigInt, StoreDelete, StoreGet, StoreNew}; +use substreams::store::StoreAdd; + +#[derive(Debug, Clone)] +pub struct MockStore { + data: Rc>>>, +} + +impl StoreDelete for MockStore { + fn delete_prefix(&self, _ord: i64, prefix: &String) { + self.data + .borrow_mut() + .retain(|k, _| !k.starts_with(prefix)); + } +} + +impl StoreNew for MockStore { + fn new() -> Self { + Self { data: Rc::new(RefCell::new(HashMap::new())) } + } +} + +impl StoreAdd for MockStore { + fn add>(&self, ord: u64, key: K, value: BigInt) { + let mut guard = self.data.borrow_mut(); + guard + .entry(key.as_ref().to_string()) + .and_modify(|v| { + let prev_value = v.last().unwrap().1.clone(); + v.push((ord, prev_value + value.clone())); + }) + .or_insert(vec![(ord, value)]); + } + + fn add_many>(&self, _ord: u64, _keys: &Vec, _value: BigInt) { + todo!() + } +} + +impl StoreGet for MockStore { + fn new(_idx: u32) -> Self { + Self { data: Rc::new(RefCell::new(HashMap::new())) } + } + + fn get_at>(&self, ord: u64, key: K) -> Option { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| { + v.iter() + .find(|(current_ord, _)| *current_ord == ord) + .unwrap() + .1 + .clone() + }) + } + + fn get_last>(&self, key: K) -> Option { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| v.last().unwrap().1.clone()) + } + + fn get_first>(&self, key: K) -> Option { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| v.first().unwrap().1.clone()) + } + + fn has_at>(&self, ord: u64, key: K) -> bool { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| v.iter().any(|(v, _)| *v == ord)) + .unwrap_or(false) + } + + fn has_last>(&self, _key: K) -> bool { + todo!() + } + + fn has_first>(&self, _key: K) -> bool { + todo!() + } +} diff --git a/crates/tycho-substreams/src/pb/tycho.evm.v1.rs b/crates/tycho-substreams/src/pb/tycho.evm.v1.rs index 7424ba2..5409eff 100644 --- a/crates/tycho-substreams/src/pb/tycho.evm.v1.rs +++ b/crates/tycho-substreams/src/pb/tycho.evm.v1.rs @@ -107,7 +107,7 @@ pub struct BalanceChange { /// The address of the ERC20 token whose balance changed. #[prost(bytes="vec", tag="1")] pub token: ::prost::alloc::vec::Vec, - /// The new balance of the token. + /// The new balance of the token. Note: it must be a big endian encoded int. #[prost(bytes="vec", tag="2")] pub balance: ::prost::alloc::vec::Vec, /// The id of the component whose TVL is tracked. Note: This MUST be utf8 encoded. @@ -244,6 +244,55 @@ pub struct BlockEntityChanges { #[prost(message, repeated, tag="2")] pub changes: ::prost::alloc::vec::Vec, } +/// A message containing relative balance changes. +/// +/// Used to track token balances of protocol components in case they are only +/// available as relative values within a block. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BalanceDelta { + /// The ordinal of the balance change. Must be unique & deterministic over all balances + /// changes within a block. + #[prost(uint64, tag="1")] + pub ord: u64, + /// The tx hash of the transaction that caused the balance change. + #[prost(message, optional, tag="2")] + pub tx: ::core::option::Option, + /// The address of the ERC20 token whose balance changed. + #[prost(bytes="vec", tag="3")] + pub token: ::prost::alloc::vec::Vec, + /// The delta balance of the token. + #[prost(bytes="vec", tag="4")] + pub delta: ::prost::alloc::vec::Vec, + /// The id of the component whose TVL is tracked. + /// If the protocol component includes multiple contracts, the balance change must be + /// aggregated to reflect how much tokens can be traded. + #[prost(bytes="vec", tag="5")] + pub component_id: ::prost::alloc::vec::Vec, +} +/// A set of balances deltas, usually a group of changes within a single block. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockBalanceDeltas { + #[prost(message, repeated, tag="1")] + pub balance_deltas: ::prost::alloc::vec::Vec, +} +/// A message containing protocol components that were created by a single tx. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionProtocolComponents { + #[prost(message, optional, tag="1")] + pub tx: ::core::option::Option, + #[prost(message, repeated, tag="2")] + pub components: ::prost::alloc::vec::Vec, +} +/// All protocol components that were created within a block with their corresponding tx. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockTransactionProtocolComponents { + #[prost(message, repeated, tag="1")] + pub tx_components: ::prost::alloc::vec::Vec, +} // This file contains proto definitions specific to the VM integration. /// A key value entry into contract storage. diff --git a/crates/tycho-substreams/src/utils.rs b/crates/tycho-substreams/src/utils.rs new file mode 100644 index 0000000..e4729db --- /dev/null +++ b/crates/tycho-substreams/src/utils.rs @@ -0,0 +1,24 @@ +use crate::pb::tycho::evm::v1::Transaction; + +/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in +/// a later groupby operation. +#[derive(Debug)] +pub struct TransactionWrapper(Transaction); + +impl TransactionWrapper { + pub fn new(tx: Transaction) -> Self { + Self(tx) + } +} + +impl PartialEq for TransactionWrapper { + fn eq(&self, other: &Self) -> bool { + self.0.hash == other.0.hash + } +} + +impl From for Transaction { + fn from(value: TransactionWrapper) -> Self { + value.0 + } +}