Clip balances at 0.
In case we get negative balances, which happens sometimes e.g. in balancer and seems to be due to some rounding issues within the protocol, we simply clip the balance at 0 so we never emit negative balances as tycho-indexer the upstream system will interpret the balance bytes as unsigned integers.
This commit is contained in:
@@ -1,15 +1,26 @@
|
|||||||
//! Utilities to handle relative balances.
|
//! Module for Handling Relative Balance Changes.
|
||||||
//!
|
//!
|
||||||
|
//! This module facilitates the conversion of relative balance changes into absolute balances,
|
||||||
|
//! employing a structured approach to ensure the accurate representation of balance data.
|
||||||
//!
|
//!
|
||||||
//! To aggregate relative balances changes to absolute balances the general approach is:
|
//! Process Overview:
|
||||||
//!
|
//!
|
||||||
//! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas within
|
//! 1. **Mapping (User-Implemented)**: The initial step requires the user to implement a mapping
|
||||||
//! this message are required to have increasing ordinals so that the order of relative balance
|
//! function that extracts `BlockBalanceDeltas` messages. It's crucial that `BalanceDelta`
|
||||||
//! changes is unambiguous.
|
//! messages within these messages have strictly increasing ordinals, which guarantees the order
|
||||||
//! 2. Store the balances changes with a store handler. You can use the `store_balance_changes`
|
//! of balance changes is preserved and unambiguous. This step is not provided by the SDK and
|
||||||
//! library method directly for this.
|
//! must be custom-implemented to suit the specific protocol.
|
||||||
//! 3. In the output module, use aggregate_balance_changes to receive an aggregated map of absolute
|
//!
|
||||||
//! balances.
|
//! 2. **Storing Changes**: Utilize the `store_balance_changes` function to apply relative balance
|
||||||
|
//! changes. This function handles changes additively, preparing them for final aggregation.
|
||||||
|
//!
|
||||||
|
//! 3. **Aggregation**: Use the `aggregate_balance_changes` function to compile the processed
|
||||||
|
//! changes into a detailed map of absolute balances. This final step produces the comprehensive
|
||||||
|
//! balance data ready for output modules or further analysis.
|
||||||
|
//!
|
||||||
|
//! Through this sequence, the module ensures the transformation from relative to absolute
|
||||||
|
//! balances is conducted with high fidelity, upholding the integrity of transactional data.
|
||||||
|
|
||||||
use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas, Transaction};
|
use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas, Transaction};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use std::{collections::HashMap, str::FromStr};
|
use std::{collections::HashMap, str::FromStr};
|
||||||
@@ -19,20 +30,29 @@ use substreams::{
|
|||||||
prelude::{BigInt, StoreAdd},
|
prelude::{BigInt, StoreAdd},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Store relative balances changes in a additive manner.
|
/// Stores relative balance changes in an additive manner.
|
||||||
///
|
///
|
||||||
/// Effectively aggregates the relative balances changes into an absolute balances.
|
/// Aggregates the relative balance changes from a `BlockBalanceDeltas` message into the store
|
||||||
|
/// in an additive way. This function ensures that balance changes are applied correctly
|
||||||
|
/// according to the order specified by their ordinal values. Each token's balance changes
|
||||||
|
/// must have strictly increasing ordinals; otherwise, the function will panic.
|
||||||
|
///
|
||||||
|
/// This method is designed to work in conjunction with `aggregate_balances_changes`,
|
||||||
|
/// which consumes the data stored by this function. The stored data is intended for use
|
||||||
|
/// in a "deltas mode" processing pattern, as described in the
|
||||||
|
/// [Substreams documentation](https://substreams.streamingfast.io/documentation/develop/manifest-modules/types#deltas-mode).
|
||||||
///
|
///
|
||||||
/// ## Arguments
|
/// ## Arguments
|
||||||
|
/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balance changes. It is
|
||||||
|
/// crucial that the relative balance deltas for each token address have strictly increasing
|
||||||
|
/// ordinals; the function will panic otherwise.
|
||||||
|
/// * `store` - An implementation of the `StoreAdd` trait that will be used to add relative balance
|
||||||
|
/// changes. This store should support the addition of `BigInt` values.
|
||||||
///
|
///
|
||||||
/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes. Note:
|
/// ## Panics
|
||||||
/// relative balance deltas must have strictly increasing ordinals per token address, will panic
|
/// This function will panic if:
|
||||||
/// otherwise.
|
/// - The `component_id` of any delta is not valid UTF-8.
|
||||||
/// * `store` - An AddStore that will add relative balance changes.
|
/// - The ordinals for any given token address are not strictly increasing.
|
||||||
///
|
|
||||||
/// This method is meant to be used in combination with `aggregate_balances_changes`
|
|
||||||
/// which consumes the store filled with this methods in
|
|
||||||
/// [deltas mode](https://substreams.streamingfast.io/documentation/develop/manifest-modules/types#deltas-mode).
|
|
||||||
pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd<BigInt>) {
|
pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd<BigInt>) {
|
||||||
let mut previous_ordinal = HashMap::<String, u64>::new();
|
let mut previous_ordinal = HashMap::<String, u64>::new();
|
||||||
deltas
|
deltas
|
||||||
@@ -71,14 +91,23 @@ type TxAggregatedBalances = HashMap<Vec<u8>, (Transaction, HashMap<Vec<u8>, Bala
|
|||||||
/// * `balance_store` - A `StoreDeltas` with all changes that occured in the source store module.
|
/// * `balance_store` - A `StoreDeltas` with all changes that occured in the source store module.
|
||||||
/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes.
|
/// * `deltas` - A `BlockBalanceDeltas` message containing the relative balances changes.
|
||||||
///
|
///
|
||||||
/// Reads absolute balance values from the additive store (see `store_balance_changes`
|
/// This function reads absolute balance values from an additive store (see `store_balance_changes`
|
||||||
/// on how to create such a store), proceeds to zip them with the relative balance
|
/// for how to create such a store). It zips these values with the relative balance deltas to
|
||||||
/// deltas to associate balance values to token and component.
|
/// associate balance values with tokens and components, ensuring the last balance change per token
|
||||||
|
/// per transaction is kept if there are multiple changes. Negative balances are set to 0, adhering
|
||||||
|
/// to the expectation that absolute balances must be non-negative.
|
||||||
///
|
///
|
||||||
/// Will keep the last balance change per token per transaction if there are multiple
|
/// Will keep the last balance change per token per transaction if there are multiple
|
||||||
/// changes.
|
/// changes. In case a balance ends up being negative, it will be clipped to 0 since
|
||||||
|
/// absolute balances are expected to be either zero or positive.
|
||||||
///
|
///
|
||||||
/// Returns a map of transactions hashes to the full transaction and aggregated
|
/// ## Panics
|
||||||
|
/// May panic if the store deltas values are not in the correct format. Values are
|
||||||
|
/// expected to be utf-8 encoded string integers, which is the default behaviour
|
||||||
|
/// for substreams stores.
|
||||||
|
///
|
||||||
|
/// ## Returns
|
||||||
|
/// A map of transactions hashes to a tuple of `Transaction` and aggregated
|
||||||
/// absolute balance changes.
|
/// absolute balance changes.
|
||||||
pub fn aggregate_balances_changes(
|
pub fn aggregate_balances_changes(
|
||||||
balance_store: StoreDeltas,
|
balance_store: StoreDeltas,
|
||||||
@@ -95,7 +124,13 @@ pub fn aggregate_balances_changes(
|
|||||||
let ascii_string =
|
let ascii_string =
|
||||||
String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence");
|
String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence");
|
||||||
let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer");
|
let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer");
|
||||||
let big_endian_bytes_balance = balance.to_bytes_be().1;
|
|
||||||
|
// If the absolute balance is negative, we set it to zero.
|
||||||
|
let big_endian_bytes_balance = if balance < BigInt::zero() {
|
||||||
|
BigInt::zero().to_bytes_be().1
|
||||||
|
} else {
|
||||||
|
balance.to_bytes_be().1
|
||||||
|
};
|
||||||
|
|
||||||
(
|
(
|
||||||
balance_delta
|
balance_delta
|
||||||
|
|||||||
@@ -72,19 +72,33 @@ impl From<InterimContractChange> for tycho::ContractChange {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extracts relevant contract changes from the block.
|
/// Extracts and aggregates contract changes from a block.
|
||||||
///
|
///
|
||||||
/// Contract changes include changes in storage, code and native balance.
|
/// This function identifies and collects changes to contract storage, code, and native balance for
|
||||||
|
/// contracts of interest within a given block. It filters contracts based on a user-defined
|
||||||
|
/// predicate and aggregates changes into a provided mutable map.
|
||||||
///
|
///
|
||||||
/// ## Arguments
|
/// ## Arguments
|
||||||
///
|
/// * `block` - The block from which to extract contract changes, expected to be an extended block
|
||||||
/// * `block` - The block to extract changes from. Must be the extended block model.
|
/// model.
|
||||||
/// * `inclusion_predicate` - A predicate function that determines if a contract address is
|
/// * `inclusion_predicate` - A closure that determines if a contract's address is of interest for
|
||||||
/// relevant.
|
/// the collection of changes. Only contracts satisfying this predicate are included.
|
||||||
/// * `transaction_contract_changes` - A mutable map to store the contract changes in.
|
/// * `transaction_contract_changes` - A mutable reference to a map where extracted contract changes
|
||||||
|
/// are stored. Keyed by transaction index, it aggregates changes into
|
||||||
|
/// `tycho::TransactionContractChanges`.
|
||||||
///
|
///
|
||||||
/// ## Panics
|
/// ## Panics
|
||||||
/// Will panic in case the detail level of the block is not extended.
|
/// Panics if the provided block is not an extended block model, as indicated by its detail level.
|
||||||
|
///
|
||||||
|
/// ## Operation
|
||||||
|
/// The function iterates over transactions and their calls within the block, collecting contract
|
||||||
|
/// changes (storage, balance, code) that pass the inclusion predicate. Changes are then sorted by
|
||||||
|
/// their ordinals to maintain the correct sequence of events. Aggregated changes for each contract
|
||||||
|
/// are stored in `transaction_contract_changes`, categorized by transaction index.
|
||||||
|
///
|
||||||
|
/// Contracts created within the block are tracked to differentiate between new and existing
|
||||||
|
/// contracts. The aggregation process respects transaction boundaries, ensuring that changes are
|
||||||
|
/// mapped accurately to their originating transactions.
|
||||||
pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
|
pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
|
||||||
block: ð::v2::Block,
|
block: ð::v2::Block,
|
||||||
inclusion_predicate: F,
|
inclusion_predicate: F,
|
||||||
|
|||||||
@@ -43,38 +43,50 @@ impl From<&sf::Block> for Block {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ProtocolComponent {
|
impl ProtocolComponent {
|
||||||
/// Creates a new empty `ProtocolComponent` instance.
|
/// Constructs a new, empty `ProtocolComponent`.
|
||||||
///
|
///
|
||||||
/// You can use the `with_*` methods to set the fields in a convience way.
|
/// Initializes an instance with default values. Use `with_*` methods to populate fields
|
||||||
|
/// conveniently.
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
/// - `id`: Identifier for the component.
|
||||||
|
/// - `tx`: Reference to the associated transaction.
|
||||||
pub fn new(id: &str, tx: &Transaction) -> Self {
|
pub fn new(id: &str, tx: &Transaction) -> Self {
|
||||||
Self {
|
Self {
|
||||||
id: id.to_string(),
|
id: id.to_string(),
|
||||||
tokens: vec![],
|
tokens: Vec::new(),
|
||||||
contracts: vec![],
|
contracts: Vec::new(),
|
||||||
static_att: vec![],
|
static_att: Vec::new(),
|
||||||
change: ChangeType::Creation.into(),
|
change: ChangeType::Creation.into(),
|
||||||
protocol_type: None,
|
protocol_type: None,
|
||||||
tx: Some(tx.clone()),
|
tx: Some(tx.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shorthand to create a component with a 1-1 relationship to a contract.
|
/// Initializes a `ProtocolComponent` with a direct association to a contract.
|
||||||
///
|
///
|
||||||
/// Will set the component id to a hex encoded address with a 0x prefix
|
/// Sets the component's ID to the hex-encoded address with a `0x` prefix and includes the
|
||||||
/// and add the contract to contracts attributes.
|
/// contract in the contracts list.
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
/// - `id`: Contract address to be encoded and set as the component's ID.
|
||||||
|
/// - `tx`: Reference to the associated transaction.
|
||||||
pub fn at_contract(id: &[u8], tx: &Transaction) -> Self {
|
pub fn at_contract(id: &[u8], tx: &Transaction) -> Self {
|
||||||
Self {
|
Self {
|
||||||
id: format!("0x{}", hex::encode(id)),
|
id: format!("0x{}", hex::encode(id)),
|
||||||
tokens: vec![],
|
tokens: Vec::new(),
|
||||||
contracts: vec![id.to_vec()],
|
contracts: vec![id.to_vec()],
|
||||||
static_att: vec![],
|
static_att: Vec::new(),
|
||||||
change: ChangeType::Creation.into(),
|
change: ChangeType::Creation.into(),
|
||||||
protocol_type: None,
|
protocol_type: None,
|
||||||
tx: Some(tx.clone()),
|
tx: Some(tx.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replaces the tokens on this component.
|
/// Updates the tokens associated with this component.
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
/// - `tokens`: Slice of byte slices representing the tokens to associate.
|
||||||
pub fn with_tokens<B: AsRef<[u8]>>(mut self, tokens: &[B]) -> Self {
|
pub fn with_tokens<B: AsRef<[u8]>>(mut self, tokens: &[B]) -> Self {
|
||||||
self.tokens = tokens
|
self.tokens = tokens
|
||||||
.iter()
|
.iter()
|
||||||
@@ -83,7 +95,10 @@ impl ProtocolComponent {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replaces the contracts associated with this component.
|
/// Updates the contracts associated with this component.
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
/// - `contracts`: Slice of byte slices representing the contracts to associate.
|
||||||
pub fn with_contracts<B: AsRef<[u8]>>(mut self, contracts: &[B]) -> Self {
|
pub fn with_contracts<B: AsRef<[u8]>>(mut self, contracts: &[B]) -> Self {
|
||||||
self.contracts = contracts
|
self.contracts = contracts
|
||||||
.iter()
|
.iter()
|
||||||
@@ -92,9 +107,12 @@ impl ProtocolComponent {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replaces the static attributes on this component.
|
/// Updates the static attributes of this component.
|
||||||
///
|
///
|
||||||
/// The change type will be set to Creation.
|
/// Sets the change type to `Creation` for all attributes.
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
/// - `attributes`: Slice of key-value pairs representing the attributes to set.
|
||||||
pub fn with_attributes<K: AsRef<str>, V: AsRef<[u8]>>(mut self, attributes: &[(K, V)]) -> Self {
|
pub fn with_attributes<K: AsRef<str>, V: AsRef<[u8]>>(mut self, attributes: &[(K, V)]) -> Self {
|
||||||
self.static_att = attributes
|
self.static_att = attributes
|
||||||
.iter()
|
.iter()
|
||||||
@@ -107,15 +125,19 @@ impl ProtocolComponent {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets the protocol_type on this component.
|
/// Designates this component as a swap type within the protocol.
|
||||||
///
|
///
|
||||||
/// Will set the `financial_type` to FinancialType::Swap and the
|
/// Sets the `protocol_type` accordingly, including `financial_type` as `Swap` and leaving
|
||||||
/// `attribute_schema` to an empty list.
|
/// `attribute_schema` empty.
|
||||||
|
///
|
||||||
|
/// ## Parameters
|
||||||
|
/// - `name`: The name of the swap protocol.
|
||||||
|
/// - `implementation_type`: The implementation type of the protocol.
|
||||||
pub fn as_swap_type(mut self, name: &str, implementation_type: ImplementationType) -> Self {
|
pub fn as_swap_type(mut self, name: &str, implementation_type: ImplementationType) -> Self {
|
||||||
self.protocol_type = Some(ProtocolType {
|
self.protocol_type = Some(ProtocolType {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
financial_type: FinancialType::Swap.into(),
|
financial_type: FinancialType::Swap.into(),
|
||||||
attribute_schema: vec![],
|
attribute_schema: Vec::new(),
|
||||||
implementation_type: implementation_type.into(),
|
implementation_type: implementation_type.into(),
|
||||||
});
|
});
|
||||||
self
|
self
|
||||||
|
|||||||
Reference in New Issue
Block a user