diff --git a/substreams/ethereum-curve/src/modules.rs b/substreams/ethereum-curve/src/modules.rs index 2fdd73b..2a47f46 100644 --- a/substreams/ethereum-curve/src/modules.rs +++ b/substreams/ethereum-curve/src/modules.rs @@ -3,9 +3,9 @@ use std::collections::HashMap; use anyhow::Result; use substreams::pb::substreams::StoreDeltas; use substreams::store::{ - StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew, + StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreGetString, StoreNew, + StoreSet, StoreSetString, }; -use substreams::{hex, log}; use substreams::key; use substreams::scalar::BigInt; @@ -96,45 +96,141 @@ pub fn store_pools_created(map: tycho::GroupedTransactionProtocolComponents, sto ); } +/// Simply stores the `ProtocolComponent`s with the pool id as the key +#[substreams::handlers::store] +pub fn store_pool_tokens(map: tycho::GroupedTransactionProtocolComponents, store: StoreSetString) { + &map.tx_components + .iter() + .flat_map(|tx_components| &tx_components.components) + .for_each(|component| { + store.set( + 0, + format!("pool:{0}", component.id), + &component + .tokens + .iter() + .map(|token| hex::encode(token)) + .join(":".into()), + ); + }); +} + /// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a /// store to be able to tally up final balances for tokens in a pool. #[substreams::handlers::map] pub fn map_balance_deltas( block: eth::v2::Block, store: StoreGetInt64, + tokens_store: StoreGetString, ) -> Result { - Ok(tycho::BalanceDeltas { - balance_deltas: block + let mut deltas = block + .logs() + .filter_map(|log| { + let event = abi::pool::events::TokenExchange::match_and_decode(log)?; + Some((log, event)) + }) + .filter(|(log, _)| { + store + .get_last(format!("pool:{0}", hex::encode(&log.address()))) + .is_some() + }) + .flat_map(|(log, event)| { + let tokens_bought_delta: BigInt = event.tokens_bought * -1; + vec![ + tycho::BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: event.sold_id.to_signed_bytes_be(), + delta: event.tokens_sold.to_signed_bytes_be(), + component_id: log.address().into(), + }, + tycho::BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: event.bought_id.to_signed_bytes_be(), + delta: tokens_bought_delta.to_signed_bytes_be(), + component_id: log.address().into(), + }, + ] + }) + .collect::>(); + + deltas.extend( + block .logs() .filter_map(|log| { - let event = abi::pool::events::TokenExchange::match_and_decode(log)?; + let event = abi::pool::events::AddLiquidity::match_and_decode(log)?; Some((log, event)) }) .filter(|(log, _)| { store .get_last(format!("pool:{0}", hex::encode(&log.address()))) - .is_some() + .is_none() }) .flat_map(|(log, event)| { - let tokens_bought_delta: BigInt = event.tokens_bought * -1; - vec![ - tycho::BalanceDelta { + let tokens = tokens_store + .get_last(format!("pool:{}", hex::encode(log.address()))) + .unwrap() + .split(":") + .map(|token| token.to_owned()) // Clone the tokens + .collect::>(); + event + .token_amounts + .iter() + .zip(tokens) + .map(move |(token_amount, token_id)| tycho::BalanceDelta { ord: log.log.ordinal, tx: Some(tx_from_log(&log)), - token: event.sold_id.to_signed_bytes_be(), - delta: event.tokens_sold.to_signed_bytes_be(), + token: token_id.into(), + delta: token_amount.to_signed_bytes_be(), component_id: log.address().into(), - }, - tycho::BalanceDelta { - ord: log.log.ordinal, - tx: Some(tx_from_log(&log)), - token: event.bought_id.to_signed_bytes_be(), - delta: tokens_bought_delta.to_signed_bytes_be(), - component_id: log.address().into(), - }, - ] + }) + .collect::>() }) .collect::>(), + ); + + deltas.extend( + block + .logs() + .filter_map(|log| { + let event = abi::pool::events::RemoveLiquidity::match_and_decode(log)?; + Some((log, event)) + }) + .filter(|(log, _)| { + store + .get_last(format!("pool:{0}", hex::encode(&log.address()))) + .is_none() + }) + .flat_map(|(log, event)| { + let tokens = tokens_store + .get_last(format!("pool:{}", hex::encode(log.address()))) + .unwrap() + .split(":") + .map(|token| token.to_owned()) // Clone the tokens + .collect::>(); + + event + .token_amounts + .iter() + .zip(tokens) + .map(move |(token_amount, token_id)| { + let negative_token_amount: BigInt = token_amount * BigInt::from(-1); + tycho::BalanceDelta { + ord: log.log.ordinal, + tx: Some(tx_from_log(&log)), + token: token_id.into(), + delta: negative_token_amount.to_signed_bytes_be(), + component_id: log.address().into(), + } + }) + .collect::>() + }) + .collect::>(), + ); + + Ok(tycho::BalanceDeltas { + balance_deltas: deltas, }) } diff --git a/substreams/ethereum-curve/substreams.yaml b/substreams/ethereum-curve/substreams.yaml index e44a5f9..306d089 100644 --- a/substreams/ethereum-curve/substreams.yaml +++ b/substreams/ethereum-curve/substreams.yaml @@ -33,6 +33,14 @@ modules: inputs: - map: map_pools_created + - name: store_pools_tokens + kind: store + initialBlock: 19128828 + updatePolicy: add + valueType: string + inputs: + - map: map_pools_created + - name: map_balance_deltas kind: map initialBlock: 19128828 # An arbitrary block that should change based on your requirements