Merge pull request #100 from propeller-heads/zz/substreams/fix-balance-aggregation-bug

fix(substreams-sdk): fix a bug in `aggregate_balances_changes`
This commit is contained in:
Zizou
2024-10-25 16:20:23 +02:00
committed by GitHub
4 changed files with 4733 additions and 4085 deletions

View File

@@ -88,7 +88,8 @@ pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd<Bi
}); });
} }
type TxAggregatedBalances = HashMap<Vec<u8>, (Transaction, HashMap<Vec<u8>, BalanceChange>)>; type TxAggregatedBalances =
HashMap<Vec<u8>, (Transaction, HashMap<Vec<u8>, HashMap<Vec<u8>, BalanceChange>>)>;
/// Aggregates absolute balances per transaction and token. /// Aggregates absolute balances per transaction and token.
/// ///
@@ -98,13 +99,14 @@ type TxAggregatedBalances = HashMap<Vec<u8>, (Transaction, HashMap<Vec<u8>, Bala
/// ///
/// This function reads absolute balance values from an additive store (see `store_balance_changes` /// This function reads absolute balance values from an additive store (see `store_balance_changes`
/// for how to create such a store). It zips these values with the relative balance deltas to /// for how to create such a store). It zips these values with the relative balance deltas to
/// associate balance values with tokens and components, ensuring the last balance change per token /// associate balance values with tokens and components, ensuring the last balance change for each
/// per transaction is kept if there are multiple changes. Negative balances are set to 0, adhering /// unique combination of component, token, and transaction is kept if there are multiple changes.
/// to the expectation that absolute balances must be non-negative. /// Negative balances are set to 0, adhering to the expectation that absolute balances must be
/// non-negative.
/// ///
/// Will keep the last balance change per token per transaction if there are multiple /// Will keep the last balance change for each unique combination of component, token, and
/// changes. In case a balance ends up being negative, it will be clipped to 0 since /// transaction if there are multiple changes. In case a balance ends up being negative, it will be
/// absolute balances are expected to be either zero or positive. /// clipped to 0 since absolute balances are expected to be either zero or positive.
/// ///
/// ## Panics /// ## Panics
/// May panic if the store deltas values are not in the correct format. Values are /// May panic if the store deltas values are not in the correct format. Values are
@@ -153,11 +155,16 @@ pub fn aggregate_balances_changes(
.into_iter() .into_iter()
.map(|(txh, group)| { .map(|(txh, group)| {
let (mut transactions, balance_changes): (Vec<_>, Vec<_>) = group.into_iter().unzip(); let (mut transactions, balance_changes): (Vec<_>, Vec<_>) = group.into_iter().unzip();
let mut balances: HashMap<Vec<u8>, HashMap<Vec<u8>, BalanceChange>> = HashMap::new();
let balances = balance_changes for balance_change in balance_changes.into_iter() {
.into_iter() let component_entry = balances
.map(|balance_change| (balance_change.token.clone(), balance_change)) .entry(balance_change.component_id.clone())
.collect(); .or_default();
// Insert or overwrite the balance change for the specific token
component_entry.insert(balance_change.token.clone(), balance_change);
}
(txh, (transactions.pop().unwrap(), balances)) (txh, (transactions.pop().unwrap(), balances))
}) })
.collect() .collect()
@@ -420,26 +427,31 @@ mod tests {
vec![0, 1], vec![0, 1],
( (
Transaction { hash: vec![0, 1], from: vec![9, 9], to: vec![8, 8], index: 0 }, Transaction { hash: vec![0, 1], from: vec![9, 9], to: vec![8, 8], index: 0 },
[ [(
( comp_id.clone(),
token_0.clone(), [
BalanceChange { (
token: token_0, token_0.clone(),
balance: BigInt::from(999) BalanceChange {
.to_signed_bytes_be() token: token_0,
.to_vec(), balance: BigInt::from(999)
component_id: comp_id.clone(), .to_signed_bytes_be()
}, .to_vec(),
), component_id: comp_id.clone(),
( },
token_1.clone(), ),
BalanceChange { (
token: token_1, token_1.clone(),
balance: vec![150], BalanceChange {
component_id: comp_id.clone(), token: token_1,
}, balance: vec![150],
), component_id: comp_id.clone(),
] },
),
]
.into_iter()
.collect::<HashMap<_, _>>(),
)]
.into_iter() .into_iter()
.collect::<HashMap<_, _>>(), .collect::<HashMap<_, _>>(),
), ),

View File

@@ -208,7 +208,11 @@ pub fn map_protocol_changes(
.or_insert_with(|| TransactionChangesBuilder::new(&tx)); .or_insert_with(|| TransactionChangesBuilder::new(&tx));
balances balances
.values() .values()
.for_each(|bc| builder.add_balance_change(bc)); .for_each(|token_bc_map| {
token_bc_map
.values()
.for_each(|bc| builder.add_balance_change(bc))
});
}); });
// Extract and insert any storage changes that happened for any of the components. // Extract and insert any storage changes that happened for any of the components.

File diff suppressed because it is too large Load Diff

View File

@@ -305,9 +305,13 @@ pub fn map_protocol_changes(
let builder = transaction_changes let builder = transaction_changes
.entry(tx.index) .entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(&tx)); .or_insert_with(|| TransactionChangesBuilder::new(&tx));
balances.values().for_each(|bc| { balances
builder.add_balance_change(bc); .values()
}); .for_each(|token_bc_map| {
token_bc_map
.values()
.for_each(|bc| builder.add_balance_change(bc))
});
}); });
// Extract and insert any storage changes that happened for any of the components. // Extract and insert any storage changes that happened for any of the components.