use crate::pb::pancakeswap::v3::{ events::{pool_event, PoolEvent}, Events, LiquidityChanges, TickDeltas, }; use itertools::Itertools; use std::{collections::HashMap, str::FromStr, vec}; use substreams::{pb::substreams::StoreDeltas, scalar::BigInt}; use substreams_ethereum::pb::eth::v2::{self as eth}; use substreams_helper::hex::Hexable; use tycho_substreams::{balances::aggregate_balances_changes, prelude::*}; type PoolAddress = Vec; #[substreams::handlers::map] pub fn map_protocol_changes( block: eth::Block, created_pools: BlockChanges, events: Events, balances_map_deltas: BlockBalanceDeltas, balances_store_deltas: StoreDeltas, ticks_map_deltas: TickDeltas, ticks_store_deltas: StoreDeltas, pool_liquidity_changes: LiquidityChanges, pool_liquidity_store_deltas: StoreDeltas, ) -> Result { // We merge contract changes by transaction (identified by transaction index) making it easy to // sort them at the very end. let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new(); // Add created pools to the tx_changes_map for change in created_pools.changes.into_iter() { let tx = change.tx.as_ref().unwrap(); let builder = transaction_changes .entry(tx.index) .or_insert_with(|| TransactionChangesBuilder::new(tx)); change .component_changes .iter() .for_each(|c| { builder.add_protocol_component(c); }); change .entity_changes .iter() .for_each(|ec| { builder.add_entity_change(ec); }); change .balance_changes .iter() .for_each(|bc| { builder.add_balance_change(bc); }); } // Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating // `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store` // this block. Then, these balance changes are merged onto the existing map of tx contract // changes, inserting a new one if it doesn't exist. aggregate_balances_changes(balances_store_deltas, balances_map_deltas) .into_iter() .for_each(|(_, (tx, balances))| { let builder = transaction_changes .entry(tx.index) .or_insert_with(|| TransactionChangesBuilder::new(&tx)); balances .values() .for_each(|token_bc_map| { token_bc_map .values() .for_each(|bc| builder.add_balance_change(bc)) }); }); // Insert ticks net-liquidity changes ticks_store_deltas .deltas .into_iter() .zip(ticks_map_deltas.deltas) .for_each(|(store_delta, tick_delta)| { let new_value_bigint = BigInt::from_str(&String::from_utf8(store_delta.new_value).unwrap()).unwrap(); // If old value is empty or the int value is 0, it's considered as a creation. let is_creation = store_delta.old_value.is_empty() || BigInt::from_str(&String::from_utf8(store_delta.old_value).unwrap()) .unwrap() .is_zero(); let attribute_name = format!("ticks/{}/net-liquidity", tick_delta.tick_index); let attribute = Attribute { name: attribute_name, value: new_value_bigint.to_signed_bytes_be(), change: if is_creation { ChangeType::Creation.into() } else if new_value_bigint.is_zero() { ChangeType::Deletion.into() } else { ChangeType::Update.into() }, }; let tx = tick_delta.transaction.unwrap(); let builder = transaction_changes .entry(tx.index) .or_insert_with(|| TransactionChangesBuilder::new(&tx.into())); builder.add_entity_change(&EntityChanges { component_id: tick_delta.pool_address.to_hex(), attributes: vec![attribute], }); }); // Insert liquidity changes pool_liquidity_store_deltas .deltas .into_iter() .zip(pool_liquidity_changes.changes) .for_each(|(store_delta, change)| { let new_value_bigint = BigInt::from_str( String::from_utf8(store_delta.new_value) .unwrap() .split(':') .nth(1) .unwrap(), ) .unwrap(); let tx = change.transaction.unwrap(); let builder = transaction_changes .entry(tx.index) .or_insert_with(|| TransactionChangesBuilder::new(&tx.into())); builder.add_entity_change(&EntityChanges { component_id: change.pool_address.to_hex(), attributes: vec![Attribute { name: "liquidity".to_string(), value: new_value_bigint.to_signed_bytes_be(), change: ChangeType::Update.into(), }], }); }); // Insert others changes events .pool_events .into_iter() .flat_map(event_to_attributes_updates) .for_each(|(tx, pool_address, attr)| { let builder = transaction_changes .entry(tx.index) .or_insert_with(|| TransactionChangesBuilder::new(&tx)); builder.add_entity_change(&EntityChanges { component_id: pool_address.to_hex(), attributes: vec![attr], }); }); Ok(BlockChanges { block: Some((&block).into()), changes: transaction_changes .drain() .sorted_unstable_by_key(|(index, _)| *index) .filter_map(|(_, builder)| builder.build()) .collect::>(), }) } fn event_to_attributes_updates(event: PoolEvent) -> Vec<(Transaction, PoolAddress, Attribute)> { match event.r#type.as_ref().unwrap() { pool_event::Type::Initialize(initalize) => { let (zero_to_one, one_to_zero) = fee_to_default_protocol_fees(event.fee); vec![ ( event .transaction .as_ref() .unwrap() .into(), hex::decode(&event.pool_address).unwrap(), Attribute { name: "sqrt_price_x96".to_string(), value: BigInt::from_str(&initalize.sqrt_price) .unwrap() .to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ( event .transaction .as_ref() .unwrap() .into(), hex::decode(&event.pool_address).unwrap(), Attribute { name: "tick".to_string(), value: BigInt::from(initalize.tick).to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ( event .transaction .as_ref() .unwrap() .into(), hex::decode(&event.pool_address).unwrap(), Attribute { name: "protocol_fees/zero2one".to_string(), value: BigInt::from(zero_to_one).to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ( event.transaction.unwrap().into(), hex::decode(event.pool_address).unwrap(), Attribute { name: "protocol_fees/one2zero".to_string(), value: BigInt::from(one_to_zero).to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ] } pool_event::Type::Swap(swap) => vec![ ( event .transaction .as_ref() .unwrap() .into(), hex::decode(&event.pool_address).unwrap(), Attribute { name: "sqrt_price_x96".to_string(), value: BigInt::from_str(&swap.sqrt_price) .unwrap() .to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ( event.transaction.unwrap().into(), hex::decode(event.pool_address).unwrap(), Attribute { name: "tick".to_string(), value: BigInt::from(swap.tick).to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ], pool_event::Type::SetFeeProtocol(sfp) => vec![ ( event .transaction .as_ref() .unwrap() .into(), hex::decode(&event.pool_address).unwrap(), Attribute { name: "protocol_fees/zero2one".to_string(), value: BigInt::from(sfp.fee_protocol_0_new).to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ( event.transaction.unwrap().into(), hex::decode(event.pool_address).unwrap(), Attribute { name: "protocol_fees/one2zero".to_string(), value: BigInt::from(sfp.fee_protocol_1_new).to_signed_bytes_be(), change: ChangeType::Update.into(), }, ), ], _ => vec![], } } // Map the pool fee to the default protocol fees. // For the reference implementation see https://github.com/pancakeswap/pancake-v3-contracts/blob/5cc479f0c5a98966c74d94700057b8c3ca629afd/projects/v3-core/contracts/PancakeV3Pool.sol#L298-L306 fn fee_to_default_protocol_fees(fee: u64) -> (u64, u64) { match fee { 100 => (3300, 3300), 500 => (3400, 3400), 2500 => (3200, 3200), 10000 => (3200, 3200), _ => panic!("Unexpected fee value"), } }