diff --git a/substreams/ethereum-curve/src/modules.rs b/substreams/ethereum-curve/src/modules.rs index ca80dcb..0476b17 100644 --- a/substreams/ethereum-curve/src/modules.rs +++ b/substreams/ethereum-curve/src/modules.rs @@ -123,7 +123,15 @@ pub fn map_relative_balances( pools_store: StoreGetInt64, tokens_store: StoreGetString, ) -> Result { - Ok(BlockBalanceDeltas { balance_deltas: emit_deltas(block, pools_store, tokens_store) }) + Ok(BlockBalanceDeltas { + balance_deltas: { + block + .logs() + .filter_map(|log| emit_deltas(log, &pools_store, &tokens_store)) + .flat_map(|deltas| deltas.into_iter()) + .collect::>() + }, + }) } /// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the diff --git a/substreams/ethereum-curve/src/pool_changes.rs b/substreams/ethereum-curve/src/pool_changes.rs index 86a8a8e..5d32ee1 100644 --- a/substreams/ethereum-curve/src/pool_changes.rs +++ b/substreams/ethereum-curve/src/pool_changes.rs @@ -2,11 +2,19 @@ use substreams::{ scalar::BigInt, store::{StoreGet, StoreGetInt64, StoreGetString}, }; -use substreams_ethereum::{block_view::LogView, pb::eth, Event}; +use substreams_ethereum::{block_view::LogView, Event}; use tycho_substreams::prelude::*; use crate::abi; +struct GenericAddLiquidity { + token_amounts: Vec, + provider: String, + fees: BigInt, + invariant: BigInt, + token_supply: BigInt, +} + fn tx_from_log(log: &LogView) -> Transaction { Transaction { hash: log.receipt.transaction.hash.clone(), @@ -16,152 +24,142 @@ fn tx_from_log(log: &LogView) -> Transaction { } } +/// This function emits balance deltas for mints, burns, and exchanges in Curve pools. Since some +/// pools contain differing ABIs, we load in several examples of abis in order to best match the +/// topic ID to the correct event. The repetition in this function is dervived from the fact that +/// most of these events have similar structures, but the specific topic id differs. pub fn emit_deltas( log: LogView, - pools_store: StoreGetInt64, - tokens_store: StoreGetString, -) -> Option> { - if let Some(event) = abi::pool::events::TokenExchange::match_and_decode(log) { - if pools_store - .get_last(format!("pool:{0}", hex::encode(&log.address()))) - .is_none() - { - return None; - } - let tokens_bought_delta: BigInt = event.tokens_bought * -1; - return Some( - vec![ - BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: event.sold_id.to_signed_bytes_be(), - delta: event.tokens_sold.to_signed_bytes_be(), - component_id: log.address().into(), - }, - BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: event.bought_id.to_signed_bytes_be(), - delta: tokens_bought_delta.to_signed_bytes_be(), - component_id: log.address().into(), - }, - ] - .into_iter(), - ) - } else if let Some(event) = abi::pool::events::AddLiquidity::match_and_decode(log) { - let tokens = tokens_store - .get_last(format!("pool:{0}", hex::encode(log.address())))? - .split(":") - .map(|token| token.to_owned()) // Clone the tokens - .collect::>(); - - let deltas: Vec<_> = event - .token_amounts - .iter() - .zip(tokens) - .map(move |(token_amount, token_id)| BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: token_id.into(), - delta: token_amount.to_signed_bytes_be(), - component_id: log.address().into(), - }) - .collect(); - - return Some(deltas.into_iter()); - } else if let Some(event) = abi::pool::events::RemoveLiquidity::match_and_decode(log) { - let tokens = tokens_store - .get_last(format!("pool:{0}", hex::encode(log.address())))? - .split(":") - .map(|token| token.to_owned()) // Clone the tokens - .collect::>(); - - let deltas: Vec<_> = event - .token_amounts - .iter() - .zip(tokens) - .map(move |(token_amount, token_id)| BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: token_id.into(), - delta: token_amount.to_signed_bytes_be(), - component_id: log.address().into(), - }) - .collect(); - - return Some(deltas.into_iter()); + pools_store: &StoreGetInt64, + tokens_store: &StoreGetString, +) -> Option> { + let pool_key = format!("pool:{}", hex::encode(&log.address())); + if pools_store.get_last(pool_key).is_none() { + return None; } - deltas.extend( - block - .logs() - .filter_map(|log| { - let event = abi::pool::events::AddLiquidity::match_and_decode(log)?; - Some((log, event)) - }) - .filter_map(|(log, event)| { - let tokens = tokens_store - .get_last(format!("pool:{0}", hex::encode(log.address())))? - .split(":") - .map(|token| token.to_owned()) // Clone the tokens - .collect::>(); + let tokens = tokens_store + .get_last(format!("pool:{}", hex::encode(log.address())))? + .split(":") + .map(|token| token.to_owned()) + .collect::>(); - Some((tokens, log, event)) - }) - .flat_map(|(tokens, log, event)| { - event - .token_amounts - .iter() - .zip(tokens) - .map(move |(token_amount, token_id)| BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: token_id.into(), - delta: token_amount.to_signed_bytes_be(), - component_id: log.address().into(), - }) - .collect::>() - }) - .collect::>(), - ); - - deltas.extend( - block - .logs() - .filter_map(|log| { - let event = abi::pool::events::RemoveLiquidity::match_and_decode(log)?; - Some((log, event)) - }) - .filter(|(log, _)| { - pools_store - .get_last(format!("pool:{0}", hex::encode(&log.address()))) - .is_none() - }) - .flat_map(|(log, event)| { - let tokens = tokens_store - .get_last(format!("pool:{}", hex::encode(log.address()))) - .unwrap() - .split(":") - .map(|token| token.to_owned()) // Clone the tokens - .collect::>(); - - event - .token_amounts - .iter() - .zip(tokens) - .map(move |(token_amount, token_id)| { - let negative_token_amount: BigInt = token_amount * BigInt::from(-1); - BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: token_id.into(), - delta: negative_token_amount.to_signed_bytes_be(), - component_id: log.address().into(), - } - }) - .collect::>() - }) - .collect::>(), - ); - deltas + if let Some(event) = abi::pool::events::TokenExchange::match_and_decode(log) { + return token_change_deltas(event, log); + } else if let Some(event) = abi::pool_3pool::events::TokenExchange::match_and_decode(log) { + return token_change_deltas( + abi::pool::events::TokenExchange { + sold_id: event.sold_id, + bought_id: event.bought_id, + tokens_sold: event.tokens_sold, + tokens_bought: event.tokens_bought, + buyer: event.buyer, + }, + log, + ); + } else if let Some(event) = abi::pool_steth::events::TokenExchange::match_and_decode(log) { + return token_change_deltas( + abi::pool::events::TokenExchange { + sold_id: event.sold_id, + bought_id: event.bought_id, + tokens_sold: event.tokens_sold, + tokens_bought: event.tokens_bought, + buyer: event.buyer, + }, + log, + ); + } else if let Some(event) = abi::pool_tricrypto::events::TokenExchange::match_and_decode(log) { + return token_change_deltas( + abi::pool::events::TokenExchange { + sold_id: event.sold_id, + bought_id: event.bought_id, + tokens_sold: event.tokens_sold, + tokens_bought: event.tokens_bought, + buyer: event.buyer, + }, + log, + ); + } else if let Some(event) = abi::pool::events::AddLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log); + } else if let Some(event) = abi::pool_3pool::events::AddLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else if let Some(event) = abi::pool_steth::events::AddLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else if let Some(event) = abi::pool_tricrypto::events::AddLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else if let Some(event) = abi::pool::events::RemoveLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else if let Some(event) = abi::pool_3pool::events::RemoveLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else if let Some(event) = abi::pool_steth::events::RemoveLiquidity::match_and_decode(log) { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else if let Some(event) = abi::pool_tricrypto::events::RemoveLiquidity::match_and_decode(log) + { + return add_liquidity_deltas(event.token_amounts.into(), &tokens, log) + } else { + None + } +} + +fn token_change_deltas( + event: abi::pool::events::TokenExchange, + log: LogView<'_>, +) -> Option> { + let tokens_bought_delta: BigInt = event.tokens_bought * -1; + Some(vec![ + BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: event.sold_id.to_signed_bytes_be(), + delta: event.tokens_sold.to_signed_bytes_be(), + component_id: log.address().into(), + }, + BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: event.bought_id.to_signed_bytes_be(), + delta: tokens_bought_delta.to_signed_bytes_be(), + component_id: log.address().into(), + }, + ]) +} + +fn add_liquidity_deltas( + amounts: Vec, + tokens: &Vec, + log: LogView<'_>, +) -> Option> { + Some( + amounts + .iter() + .zip(tokens) + .map(move |(token_amount, token_id)| BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: token_id.as_str().into(), + delta: token_amount.to_signed_bytes_be(), + component_id: log.address().into(), + }) + .collect::>(), + ) +} + +fn remove_liquidity_deltas( + amounts: Vec, + tokens: &Vec, + log: LogView<'_>, +) -> Option> { + Some( + amounts + .iter() + .zip(tokens) + .map(move |(token_amount, token_id)| BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: token_id.as_str().into(), + delta: token_amount.to_signed_bytes_be(), + component_id: log.address().into(), + }) + .collect::>(), + ) }