From 3d248f3fa1e18a95efae60237e789d5a6ae6fba8 Mon Sep 17 00:00:00 2001 From: kayibal Date: Wed, 13 Mar 2024 18:46:10 +0000 Subject: [PATCH] Implement helpers for relative balances. Adds some helpers that will users help to convert relative balances changes to absolute ones. The goal is to make this as reduce the effort of the user to implementing a simple map handler that extracts relative balances changes. --- crates/tycho-substreams/Cargo.lock | 17 +- crates/tycho-substreams/Cargo.toml | 6 +- crates/tycho-substreams/rustfmt.toml | 13 + crates/tycho-substreams/src/balances.rs | 306 ++++++++++++++++++ crates/tycho-substreams/src/contract.rs | 245 +++++++------- crates/tycho-substreams/src/lib.rs | 6 +- crates/tycho-substreams/src/mock_store.rs | 93 ++++++ .../tycho-substreams/src/pb/tycho.evm.v1.rs | 51 ++- crates/tycho-substreams/src/utils.rs | 24 ++ 9 files changed, 632 insertions(+), 129 deletions(-) create mode 100644 crates/tycho-substreams/rustfmt.toml create mode 100644 crates/tycho-substreams/src/balances.rs create mode 100644 crates/tycho-substreams/src/mock_store.rs create mode 100644 crates/tycho-substreams/src/utils.rs diff --git a/crates/tycho-substreams/Cargo.lock b/crates/tycho-substreams/Cargo.lock index 5e61c4a..cd827d8 100644 --- a/crates/tycho-substreams/Cargo.lock +++ b/crates/tycho-substreams/Cargo.lock @@ -333,6 +333,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -530,7 +539,7 @@ checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.10.5", "lazy_static", "log", "multimap", @@ -551,7 +560,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2", "quote", "syn 1.0.109", @@ -908,6 +917,10 @@ dependencies = [ name = "tycho-substreams" version = "0.1.0" dependencies = [ + "hex", + "itertools 0.12.1", + "prost", + "substreams", "substreams-ethereum", ] diff --git a/crates/tycho-substreams/Cargo.toml b/crates/tycho-substreams/Cargo.toml index 269aea3..cc0f7bc 100644 --- a/crates/tycho-substreams/Cargo.toml +++ b/crates/tycho-substreams/Cargo.toml @@ -4,4 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -substreams-ethereum = "0.9.9" \ No newline at end of file +substreams-ethereum = "0.9.9" +substreams = "0.5" +prost = "0.11" +hex = "0.4.3" +itertools = "0.12.0" \ No newline at end of file diff --git a/crates/tycho-substreams/rustfmt.toml b/crates/tycho-substreams/rustfmt.toml new file mode 100644 index 0000000..d0c0193 --- /dev/null +++ b/crates/tycho-substreams/rustfmt.toml @@ -0,0 +1,13 @@ +reorder_imports = true +imports_granularity = "Crate" +use_small_heuristics = "Max" +comment_width = 100 +wrap_comments = true +binop_separator = "Back" +trailing_comma = "Vertical" +trailing_semicolon = false +use_field_init_shorthand = true +chain_width = 40 +ignore = [ + "src/pb", +] \ No newline at end of file diff --git a/crates/tycho-substreams/src/balances.rs b/crates/tycho-substreams/src/balances.rs new file mode 100644 index 0000000..dbc780a --- /dev/null +++ b/crates/tycho-substreams/src/balances.rs @@ -0,0 +1,306 @@ +//! Utilities to handle relative balances. +//! +//! +//! To aggregate relative balances changes to absolute balances the general approach is: +//! +//! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas +//! within this message are required to have increasing ordinals so that +//! the order of relative balance changes is unambiguous. +//! 2. Store the balances changes with a store handler. You can use the +//! `store_balance_changes` library method directly for this. +//! 3. In the output module, use aggregate_balance_changes to receive an +//! aggregated map of absolute balances. +//! +use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas}; +use itertools::Itertools; +use std::collections::HashMap; +use std::str::FromStr; +use substreams::key; +use substreams::pb::substreams::StoreDeltas; +use substreams::prelude::{BigInt, StoreAdd}; + +pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd) { + let mut previous_ordinal = HashMap::::new(); + deltas + .balance_deltas + .iter() + .for_each(|delta| { + let balance_key = format!( + "{0}:{1}", + String::from_utf8(delta.component_id.clone()) + .expect("delta.component_id is not valid utf-8!"), + hex::encode(&delta.token) + ); + let current_ord = delta.ord; + previous_ordinal + .entry(balance_key.clone()) + .and_modify(|ord| { + // ordinals must arrive in increasing order + if *ord >= current_ord { + panic!( + "Invalid ordinal sequence for {}: {} >= {}", + balance_key, *ord, current_ord + ); + } + *ord = current_ord; + }) + .or_insert(delta.ord); + store.add(delta.ord, balance_key, BigInt::from_signed_bytes_be(&delta.delta)); + }); +} + +pub fn aggregate_balances_changes( + balance_store: StoreDeltas, + deltas: BlockBalanceDeltas, +) -> HashMap, HashMap, BalanceChange>> { + balance_store + .deltas + .into_iter() + .zip(deltas.balance_deltas) + .map(|(store_delta, balance_delta)| { + let component_id = key::segment_at(&store_delta.key, 0); + let token_id = key::segment_at(&store_delta.key, 1); + // store_delta.new_value is an ASCII string representing an integer + let ascii_string = + String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence"); + let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer"); + let big_endian_bytes_balance = balance.to_bytes_be().1; + + ( + balance_delta.tx.unwrap(), + BalanceChange { + token: hex::decode(token_id).expect("Token ID not valid hex"), + balance: big_endian_bytes_balance, + component_id: component_id.as_bytes().to_vec(), + }, + ) + }) + // We need to group the balance changes by tx hash for the `TransactionContractChanges` agg + .group_by(|(tx, _)| tx.hash.clone()) + .into_iter() + .map(|(txh, group)| { + let balances = group + .into_iter() + .map(|(_, delta)| (delta.token.clone(), delta)) + .collect(); + (txh, balances) + }) + .collect() +} + +mod tests { + use super::*; + use crate::mock_store::MockStore; + use crate::pb::tycho::evm::v1::{BalanceDelta, Transaction}; + use substreams::pb::substreams::StoreDelta; + use substreams::prelude::{StoreGet, StoreNew}; + + fn block_balance_deltas() -> BlockBalanceDeltas { + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + BlockBalanceDeltas { + balance_deltas: vec![ + BalanceDelta { + ord: 0, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_0.clone(), + delta: BigInt::from_str("+1000") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + BalanceDelta { + ord: 2, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_1.clone(), + delta: BigInt::from_str("+100") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + BalanceDelta { + ord: 3, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_1.clone(), + delta: BigInt::from_str("50") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + BalanceDelta { + ord: 10, + tx: Some(Transaction { + hash: vec![0, 1], + from: vec![9, 9], + to: vec![8, 8], + index: 0, + }), + token: token_0.clone(), + delta: BigInt::from_str("-1") + .unwrap() + .to_signed_bytes_be(), + component_id: comp_id.clone(), + }, + ], + } + } + fn store_deltas() -> StoreDeltas { + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + + let t0_key = + format!("{}:{}", String::from_utf8(comp_id.clone()).unwrap(), hex::encode(&token_0)); + let t1_key = + format!("{}:{}", String::from_utf8(comp_id.clone()).unwrap(), hex::encode(&token_1)); + StoreDeltas { + deltas: vec![ + StoreDelta { + operation: 0, + ordinal: 0, + key: t0_key.clone(), + old_value: BigInt::from(0) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(1000) + .to_string() + .as_bytes() + .to_vec(), + }, + StoreDelta { + operation: 0, + ordinal: 2, + key: t1_key.clone(), + old_value: BigInt::from(0) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(100) + .to_string() + .as_bytes() + .to_vec(), + }, + StoreDelta { + operation: 0, + ordinal: 3, + key: t1_key.clone(), + old_value: BigInt::from(100) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(150) + .to_string() + .as_bytes() + .to_vec(), + }, + StoreDelta { + operation: 0, + ordinal: 10, + key: t0_key.clone(), + old_value: BigInt::from(1000) + .to_string() + .as_bytes() + .to_vec(), + new_value: BigInt::from(999) + .to_string() + .as_bytes() + .to_vec(), + }, + ], + } + } + + #[test] + fn test_store_balances() { + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + let deltas = block_balance_deltas(); + let store = ::new(); + + store_balance_changes(deltas, store.clone()); + let res_0 = store.get_last(format!( + "{}:{}", + String::from_utf8(comp_id.clone()).unwrap(), + hex::encode(&token_0) + )); + let res_1 = store.get_last(format!( + "{}:{}", + String::from_utf8(comp_id.clone()).unwrap(), + hex::encode(&token_1) + )); + + assert_eq!(res_0, Some(BigInt::from_str("+999").unwrap())); + assert_eq!(res_1, Some(BigInt::from_str("+150").unwrap())); + } + + #[test] + fn test_aggregate_balances_changes() { + let store_deltas = store_deltas(); + let balance_deltas = block_balance_deltas(); + let comp_id = "0x42c0ffee" + .to_string() + .as_bytes() + .to_vec(); + let token_0 = hex::decode("bad999").unwrap(); + let token_1 = hex::decode("babe00").unwrap(); + let exp = [( + vec![0, 1], + [ + ( + token_0.clone(), + BalanceChange { + token: token_0, + balance: BigInt::from(999) + .to_signed_bytes_be() + .to_vec(), + component_id: comp_id.clone(), + }, + ), + ( + token_1.clone(), + BalanceChange { + token: token_1, + balance: vec![150], + component_id: comp_id.clone(), + }, + ), + ] + .into_iter() + .collect(), + )] + .into_iter() + .collect(); + + let res = aggregate_balances_changes(store_deltas, balance_deltas); + dbg!(&res); + + assert_eq!(res, exp); + } +} diff --git a/crates/tycho-substreams/src/contract.rs b/crates/tycho-substreams/src/contract.rs index a0391b3..79ad454 100644 --- a/crates/tycho-substreams/src/contract.rs +++ b/crates/tycho-substreams/src/contract.rs @@ -22,7 +22,7 @@ impl SlotValue { } // Uses a map for slots, protobuf does not allow bytes in hashmap keys -pub struct InterimContractChange { +struct InterimContractChange { address: Vec, balance: Vec, code: Vec, @@ -40,17 +40,14 @@ impl From for tycho::ContractChange { .slots .into_iter() .filter(|(_, value)| value.has_changed()) - .map(|(slot, value)| tycho::ContractSlot { - slot, - value: value.new_value, - }) + .map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value }) .collect(), change: value.change.into(), } } } -pub fn extract_contract_changes bool >( +pub fn extract_contract_changes bool>( block: ð::v2::Block, inclusion_predicate: F, transaction_contract_changes: &mut HashMap, @@ -69,131 +66,131 @@ pub fn extract_contract_changes bool >( }) .collect(); - block.transactions().for_each(|block_tx| { - let mut storage_changes = Vec::new(); - let mut balance_changes = Vec::new(); - let mut code_changes = Vec::new(); + block + .transactions() + .for_each(|block_tx| { + let mut storage_changes = Vec::new(); + let mut balance_changes = Vec::new(); + let mut code_changes = Vec::new(); - block_tx - .calls - .iter() - .filter(|call| { - !call.state_reverted - && inclusion_predicate(&call.address) - }) - .for_each(|call| { - storage_changes.extend(call.storage_changes.iter()); - balance_changes.extend(call.balance_changes.iter()); - code_changes.extend(call.code_changes.iter()); - }); + block_tx + .calls + .iter() + .filter(|call| !call.state_reverted && inclusion_predicate(&call.address)) + .for_each(|call| { + storage_changes.extend(call.storage_changes.iter()); + balance_changes.extend(call.balance_changes.iter()); + code_changes.extend(call.code_changes.iter()); + }); - storage_changes.sort_unstable_by_key(|change| change.ordinal); - balance_changes.sort_unstable_by_key(|change| change.ordinal); - code_changes.sort_unstable_by_key(|change| change.ordinal); + storage_changes.sort_unstable_by_key(|change| change.ordinal); + balance_changes.sort_unstable_by_key(|change| change.ordinal); + code_changes.sort_unstable_by_key(|change| change.ordinal); - storage_changes - .iter() - .filter(|changes| { - inclusion_predicate(&changes.address) - }) - .for_each(|storage_change| { - let contract_change = changed_contracts - .entry(storage_change.address.clone()) - .or_insert_with(|| InterimContractChange { - address: storage_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&storage_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); + storage_changes + .iter() + .filter(|changes| inclusion_predicate(&changes.address)) + .for_each(|storage_change| { + let contract_change = changed_contracts + .entry(storage_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: storage_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&storage_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); - let slot_value = contract_change - .slots - .entry(storage_change.key.clone()) - .or_insert_with(|| SlotValue { - new_value: storage_change.new_value.clone(), - start_value: storage_change.old_value.clone(), - }); + let slot_value = contract_change + .slots + .entry(storage_change.key.clone()) + .or_insert_with(|| SlotValue { + new_value: storage_change.new_value.clone(), + start_value: storage_change.old_value.clone(), + }); - slot_value - .new_value - .copy_from_slice(&storage_change.new_value); - }); + slot_value + .new_value + .copy_from_slice(&storage_change.new_value); + }); - balance_changes - .iter() - .filter(|changes| { - inclusion_predicate(&changes.address) - }) - .for_each(|balance_change| { - let contract_change = changed_contracts - .entry(balance_change.address.clone()) - .or_insert_with(|| InterimContractChange { - address: balance_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&balance_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); + balance_changes + .iter() + .filter(|changes| inclusion_predicate(&changes.address)) + .for_each(|balance_change| { + let contract_change = changed_contracts + .entry(balance_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: balance_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&balance_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); - if let Some(new_balance) = &balance_change.new_value { - contract_change.balance.clear(); + if let Some(new_balance) = &balance_change.new_value { + contract_change.balance.clear(); + contract_change + .balance + .extend_from_slice(&new_balance.bytes); + } + }); + + code_changes + .iter() + .filter(|changes| inclusion_predicate(&changes.address)) + .for_each(|code_change| { + let contract_change = changed_contracts + .entry(code_change.address.clone()) + .or_insert_with(|| InterimContractChange { + address: code_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&code_change.address) { + tycho::ChangeType::Creation + } else { + tycho::ChangeType::Update + }, + }); + + contract_change.code.clear(); contract_change - .balance - .extend_from_slice(&new_balance.bytes); - } - }); + .code + .extend_from_slice(&code_change.new_code); + }); - code_changes - .iter() - .filter(|changes| { - inclusion_predicate(&changes.address) - }) - .for_each(|code_change| { - let contract_change = changed_contracts - .entry(code_change.address.clone()) - .or_insert_with(|| InterimContractChange { - address: code_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&code_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); - - contract_change.code.clear(); - contract_change - .code - .extend_from_slice(&code_change.new_code); - }); - - if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() { - transaction_contract_changes - .entry(block_tx.index.into()) - .or_insert_with(|| tycho::TransactionContractChanges { - tx: Some(tycho::Transaction { - hash: block_tx.hash.clone(), - from: block_tx.from.clone(), - to: block_tx.to.clone(), - index: block_tx.index as u64, - }), - contract_changes: vec![], - component_changes: vec![], - balance_changes: vec![], - }) - .contract_changes - .extend(changed_contracts.drain().map(|(_, change)| change.into())); - } - }); + if !storage_changes.is_empty() + || !balance_changes.is_empty() + || !code_changes.is_empty() + { + transaction_contract_changes + .entry(block_tx.index.into()) + .or_insert_with(|| tycho::TransactionContractChanges { + tx: Some(tycho::Transaction { + hash: block_tx.hash.clone(), + from: block_tx.from.clone(), + to: block_tx.to.clone(), + index: block_tx.index as u64, + }), + contract_changes: vec![], + component_changes: vec![], + balance_changes: vec![], + }) + .contract_changes + .extend( + changed_contracts + .drain() + .map(|(_, change)| change.into()), + ); + } + }); } diff --git a/crates/tycho-substreams/src/lib.rs b/crates/tycho-substreams/src/lib.rs index 2f52c0e..d5b0d0f 100644 --- a/crates/tycho-substreams/src/lib.rs +++ b/crates/tycho-substreams/src/lib.rs @@ -1,2 +1,6 @@ +pub mod balances; pub mod contract; -pub mod pb; \ No newline at end of file +mod mock_store; +pub mod pb; +// TODO: consider removing this module, after integrating with balancer +pub mod utils; diff --git a/crates/tycho-substreams/src/mock_store.rs b/crates/tycho-substreams/src/mock_store.rs new file mode 100644 index 0000000..a1c0cd2 --- /dev/null +++ b/crates/tycho-substreams/src/mock_store.rs @@ -0,0 +1,93 @@ +//! Contains a mock store for internal testing. +//! +//! Might make this public alter to users can test their store handlers. +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; +use substreams::prelude::{BigInt, StoreDelete, StoreGet, StoreNew}; +use substreams::store::StoreAdd; + +#[derive(Debug, Clone)] +pub struct MockStore { + data: Rc>>>, +} + +impl StoreDelete for MockStore { + fn delete_prefix(&self, _ord: i64, prefix: &String) { + self.data + .borrow_mut() + .retain(|k, _| !k.starts_with(prefix)); + } +} + +impl StoreNew for MockStore { + fn new() -> Self { + Self { data: Rc::new(RefCell::new(HashMap::new())) } + } +} + +impl StoreAdd for MockStore { + fn add>(&self, ord: u64, key: K, value: BigInt) { + let mut guard = self.data.borrow_mut(); + guard + .entry(key.as_ref().to_string()) + .and_modify(|v| { + let prev_value = v.last().unwrap().1.clone(); + v.push((ord, prev_value + value.clone())); + }) + .or_insert(vec![(ord, value)]); + } + + fn add_many>(&self, _ord: u64, _keys: &Vec, _value: BigInt) { + todo!() + } +} + +impl StoreGet for MockStore { + fn new(_idx: u32) -> Self { + Self { data: Rc::new(RefCell::new(HashMap::new())) } + } + + fn get_at>(&self, ord: u64, key: K) -> Option { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| { + v.iter() + .find(|(current_ord, _)| *current_ord == ord) + .unwrap() + .1 + .clone() + }) + } + + fn get_last>(&self, key: K) -> Option { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| v.last().unwrap().1.clone()) + } + + fn get_first>(&self, key: K) -> Option { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| v.first().unwrap().1.clone()) + } + + fn has_at>(&self, ord: u64, key: K) -> bool { + self.data + .borrow() + .get(&key.as_ref().to_string()) + .map(|v| v.iter().any(|(v, _)| *v == ord)) + .unwrap_or(false) + } + + fn has_last>(&self, _key: K) -> bool { + todo!() + } + + fn has_first>(&self, _key: K) -> bool { + todo!() + } +} diff --git a/crates/tycho-substreams/src/pb/tycho.evm.v1.rs b/crates/tycho-substreams/src/pb/tycho.evm.v1.rs index 7424ba2..5409eff 100644 --- a/crates/tycho-substreams/src/pb/tycho.evm.v1.rs +++ b/crates/tycho-substreams/src/pb/tycho.evm.v1.rs @@ -107,7 +107,7 @@ pub struct BalanceChange { /// The address of the ERC20 token whose balance changed. #[prost(bytes="vec", tag="1")] pub token: ::prost::alloc::vec::Vec, - /// The new balance of the token. + /// The new balance of the token. Note: it must be a big endian encoded int. #[prost(bytes="vec", tag="2")] pub balance: ::prost::alloc::vec::Vec, /// The id of the component whose TVL is tracked. Note: This MUST be utf8 encoded. @@ -244,6 +244,55 @@ pub struct BlockEntityChanges { #[prost(message, repeated, tag="2")] pub changes: ::prost::alloc::vec::Vec, } +/// A message containing relative balance changes. +/// +/// Used to track token balances of protocol components in case they are only +/// available as relative values within a block. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BalanceDelta { + /// The ordinal of the balance change. Must be unique & deterministic over all balances + /// changes within a block. + #[prost(uint64, tag="1")] + pub ord: u64, + /// The tx hash of the transaction that caused the balance change. + #[prost(message, optional, tag="2")] + pub tx: ::core::option::Option, + /// The address of the ERC20 token whose balance changed. + #[prost(bytes="vec", tag="3")] + pub token: ::prost::alloc::vec::Vec, + /// The delta balance of the token. + #[prost(bytes="vec", tag="4")] + pub delta: ::prost::alloc::vec::Vec, + /// The id of the component whose TVL is tracked. + /// If the protocol component includes multiple contracts, the balance change must be + /// aggregated to reflect how much tokens can be traded. + #[prost(bytes="vec", tag="5")] + pub component_id: ::prost::alloc::vec::Vec, +} +/// A set of balances deltas, usually a group of changes within a single block. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockBalanceDeltas { + #[prost(message, repeated, tag="1")] + pub balance_deltas: ::prost::alloc::vec::Vec, +} +/// A message containing protocol components that were created by a single tx. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TransactionProtocolComponents { + #[prost(message, optional, tag="1")] + pub tx: ::core::option::Option, + #[prost(message, repeated, tag="2")] + pub components: ::prost::alloc::vec::Vec, +} +/// All protocol components that were created within a block with their corresponding tx. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockTransactionProtocolComponents { + #[prost(message, repeated, tag="1")] + pub tx_components: ::prost::alloc::vec::Vec, +} // This file contains proto definitions specific to the VM integration. /// A key value entry into contract storage. diff --git a/crates/tycho-substreams/src/utils.rs b/crates/tycho-substreams/src/utils.rs new file mode 100644 index 0000000..e4729db --- /dev/null +++ b/crates/tycho-substreams/src/utils.rs @@ -0,0 +1,24 @@ +use crate::pb::tycho::evm::v1::Transaction; + +/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in +/// a later groupby operation. +#[derive(Debug)] +pub struct TransactionWrapper(Transaction); + +impl TransactionWrapper { + pub fn new(tx: Transaction) -> Self { + Self(tx) + } +} + +impl PartialEq for TransactionWrapper { + fn eq(&self, other: &Self) -> bool { + self.0.hash == other.0.hash + } +} + +impl From for Transaction { + fn from(value: TransactionWrapper) -> Self { + value.0 + } +}