From e70d5f0e86320abf9def12ffa5da14ed31aab884 Mon Sep 17 00:00:00 2001 From: kayibal Date: Thu, 14 Mar 2024 11:04:41 +0000 Subject: [PATCH] Clip balances at 0. In case we get negative balances, which happens sometimes e.g. in balancer and seems to be due to some rounding issues within the protocol, we simply clip the balance at 0 so we never emit negative balances as tycho-indexer the upstream system will interpret the balance bytes as unsigned integers. --- .../crates/tycho-substreams/src/balances.rs | 85 +++++++++++++------ .../crates/tycho-substreams/src/contract.rs | 30 +++++-- .../crates/tycho-substreams/src/models.rs | 58 +++++++++---- 3 files changed, 122 insertions(+), 51 deletions(-) diff --git a/substreams/crates/tycho-substreams/src/balances.rs b/substreams/crates/tycho-substreams/src/balances.rs index 862e1a2..707f60d 100644 --- a/substreams/crates/tycho-substreams/src/balances.rs +++ b/substreams/crates/tycho-substreams/src/balances.rs @@ -1,15 +1,26 @@ -//! Utilities to handle relative balances. +//! Module for Handling Relative Balance Changes. //! +//! This module facilitates the conversion of relative balance changes into absolute balances, +//! employing a structured approach to ensure the accurate representation of balance data. //! -//! To aggregate relative balances changes to absolute balances the general approach is: +//! Process Overview: //! -//! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas within -//! this message are required to have increasing ordinals so that the order of relative balance -//! changes is unambiguous. -//! 2. Store the balances changes with a store handler. You can use the `store_balance_changes` -//! library method directly for this. -//! 3. In the output module, use aggregate_balance_changes to receive an aggregated map of absolute -//! balances. +//! 1. **Mapping (User-Implemented)**: The initial step requires the user to implement a mapping +//! function that extracts `BlockBalanceDeltas` messages. It's crucial that `BalanceDelta` +//! messages within these messages have strictly increasing ordinals, which guarantees the order +//! of balance changes is preserved and unambiguous. This step is not provided by the SDK and +//! must be custom-implemented to suit the specific protocol. +//! +//! 2. **Storing Changes**: Utilize the `store_balance_changes` function to apply relative balance +//! changes. This function handles changes additively, preparing them for final aggregation. +//! +//! 3. **Aggregation**: Use the `aggregate_balance_changes` function to compile the processed +//! changes into a detailed map of absolute balances. This final step produces the comprehensive +//! balance data ready for output modules or further analysis. +//! +//! Through this sequence, the module ensures the transformation from relative to absolute +//! balances is conducted with high fidelity, upholding the integrity of transactional data. + use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas, Transaction}; use itertools::Itertools; use std::{collections::HashMap, str::FromStr}; @@ -19,20 +30,29 @@ use substreams::{ prelude::{BigInt, StoreAdd}, }; -/// Store relative balances changes in a additive manner. +/// Stores relative balance changes in an additive manner. /// -/// Effectively aggregates the relative balances changes into an absolute balances. +/// Aggregates the relative balance changes from a `BlockBalanceDeltas` message into the store +/// in an additive way. This function ensures that balance changes are applied correctly +/// according to the order specified by their ordinal values. Each token's balance changes +/// must have strictly increasing ordinals; otherwise, the function will panic. +/// +/// This method is designed to work in conjunction with `aggregate_balances_changes`, +/// which consumes the data stored by this function. The stored data is intended for use +/// in a "deltas mode" processing pattern, as described in the +/// [Substreams documentation](https://substreams.streamingfast.io/documentation/develop/manifest-modules/types#deltas-mode). /// /// ## Arguments +/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balance changes. It is +/// crucial that the relative balance deltas for each token address have strictly increasing +/// ordinals; the function will panic otherwise. +/// * `store` - An implementation of the `StoreAdd` trait that will be used to add relative balance +/// changes. This store should support the addition of `BigInt` values. /// -/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes. Note: -/// relative balance deltas must have strictly increasing ordinals per token address, will panic -/// otherwise. -/// * `store` - An AddStore that will add relative balance changes. -/// -/// This method is meant to be used in combination with `aggregate_balances_changes` -/// which consumes the store filled with this methods in -/// [deltas mode](https://substreams.streamingfast.io/documentation/develop/manifest-modules/types#deltas-mode). +/// ## Panics +/// This function will panic if: +/// - The `component_id` of any delta is not valid UTF-8. +/// - The ordinals for any given token address are not strictly increasing. pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd) { let mut previous_ordinal = HashMap::::new(); deltas @@ -71,14 +91,23 @@ type TxAggregatedBalances = HashMap, (Transaction, HashMap, Bala /// * `balance_store` - A `StoreDeltas` with all changes that occured in the source store module. /// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes. /// -/// Reads absolute balance values from the additive store (see `store_balance_changes` -/// on how to create such a store), proceeds to zip them with the relative balance -/// deltas to associate balance values to token and component. +/// This function reads absolute balance values from an additive store (see `store_balance_changes` +/// for how to create such a store). It zips these values with the relative balance deltas to +/// associate balance values with tokens and components, ensuring the last balance change per token +/// per transaction is kept if there are multiple changes. Negative balances are set to 0, adhering +/// to the expectation that absolute balances must be non-negative. /// /// Will keep the last balance change per token per transaction if there are multiple -/// changes. +/// changes. In case a balance ends up being negative, it will be clipped to 0 since +/// absolute balances are expected to be either zero or positive. /// -/// Returns a map of transactions hashes to the full transaction and aggregated +/// ## Panics +/// May panic if the store deltas values are not in the correct format. Values are +/// expected to be utf-8 encoded string integers, which is the default behaviour +/// for substreams stores. +/// +/// ## Returns +/// A map of transactions hashes to a tuple of `Transaction` and aggregated /// absolute balance changes. pub fn aggregate_balances_changes( balance_store: StoreDeltas, @@ -95,7 +124,13 @@ pub fn aggregate_balances_changes( let ascii_string = String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence"); let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer"); - let big_endian_bytes_balance = balance.to_bytes_be().1; + + // If the absolute balance is negative, we set it to zero. + let big_endian_bytes_balance = if balance < BigInt::zero() { + BigInt::zero().to_bytes_be().1 + } else { + balance.to_bytes_be().1 + }; ( balance_delta diff --git a/substreams/crates/tycho-substreams/src/contract.rs b/substreams/crates/tycho-substreams/src/contract.rs index a99fd60..250da36 100644 --- a/substreams/crates/tycho-substreams/src/contract.rs +++ b/substreams/crates/tycho-substreams/src/contract.rs @@ -72,19 +72,33 @@ impl From for tycho::ContractChange { } } -/// Extracts relevant contract changes from the block. +/// Extracts and aggregates contract changes from a block. /// -/// Contract changes include changes in storage, code and native balance. +/// This function identifies and collects changes to contract storage, code, and native balance for +/// contracts of interest within a given block. It filters contracts based on a user-defined +/// predicate and aggregates changes into a provided mutable map. /// /// ## Arguments -/// -/// * `block` - The block to extract changes from. Must be the extended block model. -/// * `inclusion_predicate` - A predicate function that determines if a contract address is -/// relevant. -/// * `transaction_contract_changes` - A mutable map to store the contract changes in. +/// * `block` - The block from which to extract contract changes, expected to be an extended block +/// model. +/// * `inclusion_predicate` - A closure that determines if a contract's address is of interest for +/// the collection of changes. Only contracts satisfying this predicate are included. +/// * `transaction_contract_changes` - A mutable reference to a map where extracted contract changes +/// are stored. Keyed by transaction index, it aggregates changes into +/// `tycho::TransactionContractChanges`. /// /// ## Panics -/// Will panic in case the detail level of the block is not extended. +/// Panics if the provided block is not an extended block model, as indicated by its detail level. +/// +/// ## Operation +/// The function iterates over transactions and their calls within the block, collecting contract +/// changes (storage, balance, code) that pass the inclusion predicate. Changes are then sorted by +/// their ordinals to maintain the correct sequence of events. Aggregated changes for each contract +/// are stored in `transaction_contract_changes`, categorized by transaction index. +/// +/// Contracts created within the block are tracked to differentiate between new and existing +/// contracts. The aggregation process respects transaction boundaries, ensuring that changes are +/// mapped accurately to their originating transactions. pub fn extract_contract_changes bool>( block: ð::v2::Block, inclusion_predicate: F, diff --git a/substreams/crates/tycho-substreams/src/models.rs b/substreams/crates/tycho-substreams/src/models.rs index 14ab6f5..fa0fa07 100644 --- a/substreams/crates/tycho-substreams/src/models.rs +++ b/substreams/crates/tycho-substreams/src/models.rs @@ -43,38 +43,50 @@ impl From<&sf::Block> for Block { } impl ProtocolComponent { - /// Creates a new empty `ProtocolComponent` instance. + /// Constructs a new, empty `ProtocolComponent`. /// - /// You can use the `with_*` methods to set the fields in a convience way. + /// Initializes an instance with default values. Use `with_*` methods to populate fields + /// conveniently. + /// + /// ## Parameters + /// - `id`: Identifier for the component. + /// - `tx`: Reference to the associated transaction. pub fn new(id: &str, tx: &Transaction) -> Self { Self { id: id.to_string(), - tokens: vec![], - contracts: vec![], - static_att: vec![], + tokens: Vec::new(), + contracts: Vec::new(), + static_att: Vec::new(), change: ChangeType::Creation.into(), protocol_type: None, tx: Some(tx.clone()), } } - /// Shorthand to create a component with a 1-1 relationship to a contract. + /// Initializes a `ProtocolComponent` with a direct association to a contract. /// - /// Will set the component id to a hex encoded address with a 0x prefix - /// and add the contract to contracts attributes. + /// Sets the component's ID to the hex-encoded address with a `0x` prefix and includes the + /// contract in the contracts list. + /// + /// ## Parameters + /// - `id`: Contract address to be encoded and set as the component's ID. + /// - `tx`: Reference to the associated transaction. pub fn at_contract(id: &[u8], tx: &Transaction) -> Self { Self { id: format!("0x{}", hex::encode(id)), - tokens: vec![], + tokens: Vec::new(), contracts: vec![id.to_vec()], - static_att: vec![], + static_att: Vec::new(), change: ChangeType::Creation.into(), protocol_type: None, tx: Some(tx.clone()), } } - /// Replaces the tokens on this component. + /// Updates the tokens associated with this component. + /// + /// ## Parameters + /// - `tokens`: Slice of byte slices representing the tokens to associate. pub fn with_tokens>(mut self, tokens: &[B]) -> Self { self.tokens = tokens .iter() @@ -83,7 +95,10 @@ impl ProtocolComponent { self } - /// Replaces the contracts associated with this component. + /// Updates the contracts associated with this component. + /// + /// ## Parameters + /// - `contracts`: Slice of byte slices representing the contracts to associate. pub fn with_contracts>(mut self, contracts: &[B]) -> Self { self.contracts = contracts .iter() @@ -92,9 +107,12 @@ impl ProtocolComponent { self } - /// Replaces the static attributes on this component. + /// Updates the static attributes of this component. /// - /// The change type will be set to Creation. + /// Sets the change type to `Creation` for all attributes. + /// + /// ## Parameters + /// - `attributes`: Slice of key-value pairs representing the attributes to set. pub fn with_attributes, V: AsRef<[u8]>>(mut self, attributes: &[(K, V)]) -> Self { self.static_att = attributes .iter() @@ -107,15 +125,19 @@ impl ProtocolComponent { self } - /// Sets the protocol_type on this component. + /// Designates this component as a swap type within the protocol. /// - /// Will set the `financial_type` to FinancialType::Swap and the - /// `attribute_schema` to an empty list. + /// Sets the `protocol_type` accordingly, including `financial_type` as `Swap` and leaving + /// `attribute_schema` empty. + /// + /// ## Parameters + /// - `name`: The name of the swap protocol. + /// - `implementation_type`: The implementation type of the protocol. pub fn as_swap_type(mut self, name: &str, implementation_type: ImplementationType) -> Self { self.protocol_type = Some(ProtocolType { name: name.to_string(), financial_type: FinancialType::Swap.into(), - attribute_schema: vec![], + attribute_schema: Vec::new(), implementation_type: implementation_type.into(), }); self