feat: add Pancakeswap v3 Substreams module (#178)

* feat: Add Pancakeswap V3 Substreams module

At this point it's just hard copy of Uniswap V3. It will be adapted in the following commits to make reviewing easier.

* refactor: adapt uniswapv3 module logic for pancakeswap v3

The main change is how they handle protocol fees. Protocol fees are set by default depending on the fee of the pool.

* refactor: use new protobuf structs

The "EntityChanges" got deprecated in favor of the hybrid messages. This commit makes PancakeswapV3 module use the new structs.

* fix: set correct factory address

---------

Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
Co-authored-by: Louise Poole <louise@datarevenue.com>
This commit is contained in:
Zizou
2025-03-17 13:39:05 +01:00
committed by GitHub
parent 503a83595e
commit 4c1e773b1b
26 changed files with 10156 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,4 @@
#![allow(clippy::all, clippy::pedantic, clippy::nursery)]
pub mod factory;
pub mod pool;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,7 @@
#![allow(clippy::not_unsafe_ptr_arg_deref)]
mod abi;
mod modules;
mod pb;
pub use modules::*;

View File

@@ -0,0 +1,115 @@
use std::str::FromStr;
use ethabi::ethereum_types::Address;
use substreams::scalar::BigInt;
use substreams_ethereum::pb::eth::v2::{self as eth};
use substreams_helper::{event_handler::EventHandler, hex::Hexable};
use crate::abi::factory::events::PoolCreated;
use tycho_substreams::prelude::*;
#[substreams::handlers::map]
pub fn map_pools_created(
params: String,
block: eth::Block,
) -> Result<BlockChanges, substreams::errors::Error> {
let mut new_pools: Vec<TransactionChanges> = vec![];
let factory_address = params.as_str();
get_new_pools(&block, &mut new_pools, factory_address);
Ok(BlockChanges { block: None, changes: new_pools })
}
// Extract new pools from PoolCreated events
fn get_new_pools(
block: &eth::Block,
new_pools: &mut Vec<TransactionChanges>,
factory_address: &str,
) {
// Extract new pools from PoolCreated events
let mut on_pool_created = |event: PoolCreated, _tx: &eth::TransactionTrace, _log: &eth::Log| {
let tycho_tx: Transaction = _tx.into();
new_pools.push(TransactionChanges {
tx: Some(tycho_tx.clone()),
entity_changes: vec![EntityChanges {
component_id: event.pool.clone().to_hex(),
attributes: vec![
Attribute {
name: "liquidity".to_string(),
value: BigInt::from(0).to_signed_bytes_be(),
change: ChangeType::Creation.into(),
},
Attribute {
name: "tick".to_string(),
value: BigInt::from(0).to_signed_bytes_be(),
change: ChangeType::Creation.into(),
},
Attribute {
name: "sqrt_price_x96".to_string(),
value: BigInt::from(0).to_signed_bytes_be(),
change: ChangeType::Creation.into(),
},
],
}],
component_changes: vec![ProtocolComponent {
id: event.pool.to_hex(),
tokens: vec![event.token0.clone(), event.token1.clone()],
contracts: vec![],
static_att: vec![
Attribute {
name: "fee".to_string(),
value: event.fee.to_signed_bytes_be(),
change: ChangeType::Creation.into(),
},
Attribute {
name: "tick_spacing".to_string(),
value: event.tick_spacing.to_signed_bytes_be(),
change: ChangeType::Creation.into(),
},
Attribute {
name: "pool_address".to_string(),
value: event.pool.clone(),
change: ChangeType::Creation.into(),
},
],
change: i32::from(ChangeType::Creation),
protocol_type: Option::from(ProtocolType {
name: "pancakeswap_v3_pool".to_string(),
financial_type: FinancialType::Swap.into(),
attribute_schema: vec![],
implementation_type: ImplementationType::Custom.into(),
}),
tx: Some(tycho_tx),
}],
balance_changes: vec![
BalanceChange {
token: event.token0,
balance: BigInt::from(0).to_signed_bytes_be(),
component_id: event
.pool
.clone()
.to_hex()
.as_bytes()
.to_vec(),
},
BalanceChange {
token: event.token1,
balance: BigInt::from(0).to_signed_bytes_be(),
component_id: event.pool.to_hex().as_bytes().to_vec(),
},
],
contract_changes: vec![],
})
};
let mut eh = EventHandler::new(block);
eh.filter_by_address(vec![Address::from_str(factory_address).unwrap()]);
eh.on::<PoolCreated, _>(&mut on_pool_created);
eh.handle_events();
}

View File

@@ -0,0 +1,36 @@
use std::str;
use substreams::{
scalar::BigInt,
store::{StoreNew, StoreSetIfNotExists, StoreSetIfNotExistsProto},
};
use tycho_substreams::models::BlockChanges;
use crate::pb::pancakeswap::v3::Pool;
#[substreams::handlers::store]
pub fn store_pools(pools_created: BlockChanges, store: StoreSetIfNotExistsProto<Pool>) {
// Store pools. Required so the next maps can match any event to a known pool by their address
for change in pools_created.changes {
for component_change in &change.component_changes {
let pool_address: &str = &component_change.id;
let pool: Pool = Pool {
address: hex::decode(pool_address.trim_start_matches("0x")).unwrap(),
token0: component_change.tokens[0].clone(),
token1: component_change.tokens[1].clone(),
created_tx_hash: change.tx.as_ref().unwrap().hash.clone(),
fee: BigInt::from_signed_bytes_be(
&component_change
.static_att
.iter()
.find(|attr| attr.name == "fee")
.expect("every pool should have fee as static attribute")
.value,
)
.to_u64(),
};
store.set_if_not_exists(0, format!("{}:{}", "Pool", pool_address), &pool);
}
}
}

View File

@@ -0,0 +1,195 @@
use anyhow::Ok;
use substreams::{
store::{StoreGet, StoreGetProto},
Hex,
};
use substreams_ethereum::{
pb::eth::v2::{self as eth, Log, TransactionTrace},
Event,
};
use substreams_helper::hex::Hexable;
use crate::{
abi::pool::events::{
Burn, Collect, CollectProtocol, Flash, Initialize, Mint, SetFeeProtocol, Swap,
},
pb::pancakeswap::v3::{
events::{
pool_event::{self, Type},
PoolEvent,
},
Events, Pool,
},
};
#[substreams::handlers::map]
pub fn map_events(
block: eth::Block,
pools_store: StoreGetProto<Pool>,
) -> Result<Events, anyhow::Error> {
let mut pool_events = block
.transaction_traces
.into_iter()
.filter(|tx| tx.status == 1)
.flat_map(|tx| {
let receipt = tx
.receipt
.as_ref()
.expect("all transaction traces have a receipt");
receipt
.logs
.iter()
.filter_map(|log| {
let key = format!("{}:{}", "Pool", log.address.to_hex());
// Skip if the log is not from a known pool.
if let Some(pool) = pools_store.get_last(key) {
log_to_event(log, pool, &tx)
} else {
None
}
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
pool_events.sort_unstable_by_key(|e| e.log_ordinal);
Ok(Events { pool_events })
}
fn log_to_event(event: &Log, pool: Pool, tx: &TransactionTrace) -> Option<PoolEvent> {
if let Some(init) = Initialize::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::Initialize(pool_event::Initialize {
sqrt_price: init.sqrt_price_x96.to_string(),
tick: init.tick.into(),
})),
})
} else if let Some(swap) = Swap::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::Swap(pool_event::Swap {
sender: Hex(swap.sender).to_string(),
recipient: Hex(swap.recipient).to_string(),
amount_0: swap.amount0.to_string(),
amount_1: swap.amount1.to_string(),
sqrt_price: swap.sqrt_price_x96.to_string(),
liquidity: swap.liquidity.to_string(),
tick: swap.tick.into(),
})),
})
} else if let Some(flash) = Flash::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::Flash(pool_event::Flash {
sender: Hex(flash.sender).to_string(),
recipient: Hex(flash.recipient).to_string(),
amount_0: flash.amount0.to_string(),
amount_1: flash.amount1.to_string(),
paid_0: flash.paid0.to_string(),
paid_1: flash.paid1.to_string(),
})),
})
} else if let Some(mint) = Mint::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::Mint(pool_event::Mint {
sender: Hex(mint.sender).to_string(),
owner: Hex(mint.owner).to_string(),
tick_lower: mint.tick_lower.into(),
tick_upper: mint.tick_upper.into(),
amount: mint.amount.to_string(),
amount_0: mint.amount0.to_string(),
amount_1: mint.amount1.to_string(),
})),
})
} else if let Some(burn) = Burn::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::Burn(pool_event::Burn {
owner: Hex(burn.owner).to_string(),
tick_lower: burn.tick_lower.into(),
tick_upper: burn.tick_upper.into(),
amount: burn.amount.to_string(),
amount_0: burn.amount0.to_string(),
amount_1: burn.amount1.to_string(),
})),
})
} else if let Some(collect) = Collect::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::Collect(pool_event::Collect {
owner: Hex(collect.owner).to_string(),
recipient: Hex(collect.recipient).to_string(),
tick_lower: collect.tick_lower.into(),
tick_upper: collect.tick_upper.into(),
amount_0: collect.amount0.to_string(),
amount_1: collect.amount1.to_string(),
})),
})
} else if let Some(set_fp) = SetFeeProtocol::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::SetFeeProtocol(pool_event::SetFeeProtocol {
fee_protocol_0_old: set_fp.fee_protocol0_old.to_u64(),
fee_protocol_1_old: set_fp.fee_protocol1_old.to_u64(),
fee_protocol_0_new: set_fp.fee_protocol0_new.to_u64(),
fee_protocol_1_new: set_fp.fee_protocol1_new.to_u64(),
})),
})
} else if let Some(cp) = CollectProtocol::match_and_decode(event) {
Some(PoolEvent {
log_ordinal: event.ordinal,
pool_address: Hex(pool.address).to_string(),
token0: Hex(pool.token0).to_string(),
token1: Hex(pool.token1).to_string(),
fee: pool.fee,
transaction: Some(tx.into()),
r#type: Some(Type::CollectProtocol(pool_event::CollectProtocol {
sender: Hex(cp.sender).to_string(),
recipient: Hex(cp.recipient).to_string(),
amount_0: cp.amount0.to_string(),
amount_1: cp.amount1.to_string(),
})),
})
} else {
None
}
}

View File

@@ -0,0 +1,163 @@
use std::str::FromStr;
use anyhow::Ok;
use tycho_substreams::models::{BalanceDelta, BlockBalanceDeltas};
use crate::pb::pancakeswap::v3::{
events::{pool_event, PoolEvent},
Events,
};
use substreams::{
scalar::BigInt,
store::{StoreAddBigInt, StoreNew},
};
#[substreams::handlers::map]
pub fn map_balance_changes(events: Events) -> Result<BlockBalanceDeltas, anyhow::Error> {
let balance_deltas = events
.pool_events
.into_iter()
.flat_map(event_to_balance_deltas)
.collect();
Ok(BlockBalanceDeltas { balance_deltas })
}
#[substreams::handlers::store]
pub fn store_pools_balances(balances_deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
tycho_substreams::balances::store_balance_changes(balances_deltas, store);
}
fn event_to_balance_deltas(event: PoolEvent) -> Vec<BalanceDelta> {
let address = format!("0x{}", event.pool_address)
.as_bytes()
.to_vec();
match event.r#type.unwrap() {
pool_event::Type::Mint(e) => vec![
BalanceDelta {
token: hex::decode(event.token0).unwrap(),
delta: BigInt::from_str(&e.amount_0)
.unwrap()
.to_signed_bytes_be(),
component_id: address.clone(),
ord: event.log_ordinal,
tx: event
.transaction
.as_ref()
.map(Into::into),
},
BalanceDelta {
token: hex::decode(event.token1).unwrap(),
delta: BigInt::from_str(&e.amount_1)
.unwrap()
.to_signed_bytes_be(),
component_id: address,
ord: event.log_ordinal,
tx: event.transaction.map(Into::into),
},
],
pool_event::Type::Collect(e) => vec![
BalanceDelta {
token: hex::decode(event.token0).unwrap(),
delta: BigInt::from_str(&e.amount_0)
.unwrap()
.neg()
.to_signed_bytes_be(),
component_id: address.clone(),
ord: event.log_ordinal,
tx: event
.transaction
.as_ref()
.map(Into::into),
},
BalanceDelta {
token: hex::decode(event.token1).unwrap(),
delta: BigInt::from_str(&e.amount_1)
.unwrap()
.neg()
.to_signed_bytes_be(),
component_id: address,
ord: event.log_ordinal,
tx: event.transaction.map(Into::into),
},
],
//Burn balance changes are accounted for in the Collect event.
pool_event::Type::Burn(_) => vec![],
pool_event::Type::Swap(e) => {
vec![
BalanceDelta {
token: hex::decode(event.token0).unwrap(),
delta: BigInt::from_str(&e.amount_0)
.unwrap()
.to_signed_bytes_be(),
component_id: address.clone(),
ord: event.log_ordinal,
tx: event
.transaction
.as_ref()
.map(Into::into),
},
BalanceDelta {
token: hex::decode(event.token1).unwrap(),
delta: BigInt::from_str(&e.amount_1)
.unwrap()
.to_signed_bytes_be(),
component_id: address,
ord: event.log_ordinal,
tx: event.transaction.map(Into::into),
},
]
}
pool_event::Type::Flash(e) => vec![
BalanceDelta {
token: hex::decode(event.token0).unwrap(),
delta: BigInt::from_str(&e.paid_0)
.unwrap()
.to_signed_bytes_be(),
component_id: address.clone(),
ord: event.log_ordinal,
tx: event
.transaction
.as_ref()
.map(Into::into),
},
BalanceDelta {
token: hex::decode(event.token1).unwrap(),
delta: BigInt::from_str(&e.paid_1)
.unwrap()
.to_signed_bytes_be(),
component_id: address,
ord: event.log_ordinal,
tx: event.transaction.map(Into::into),
},
],
pool_event::Type::CollectProtocol(e) => {
vec![
BalanceDelta {
token: hex::decode(event.token0).unwrap(),
delta: BigInt::from_str(&e.amount_0)
.unwrap()
.neg()
.to_signed_bytes_be(),
component_id: address.clone(),
ord: event.log_ordinal,
tx: event
.transaction
.as_ref()
.map(Into::into),
},
BalanceDelta {
token: hex::decode(event.token1).unwrap(),
delta: BigInt::from_str(&e.amount_1)
.unwrap()
.neg()
.to_signed_bytes_be(),
component_id: address,
ord: event.log_ordinal,
tx: event.transaction.map(Into::into),
},
]
}
_ => vec![],
}
}

View File

@@ -0,0 +1,137 @@
use std::str::FromStr;
use substreams::store::{
StoreGet, StoreGetInt64, StoreSet, StoreSetInt64, StoreSetSum, StoreSetSumBigInt,
};
use crate::pb::pancakeswap::v3::{
events::{pool_event, PoolEvent},
Events, LiquidityChange, LiquidityChangeType, LiquidityChanges,
};
use substreams::{scalar::BigInt, store::StoreNew};
use anyhow::Ok;
#[substreams::handlers::store]
pub fn store_pool_current_tick(events: Events, store: StoreSetInt64) {
events
.pool_events
.into_iter()
.filter_map(event_to_current_tick)
.for_each(|(pool, ordinal, new_tick_index)| {
store.set(ordinal, format!("pool:{0}", pool), &new_tick_index.into())
});
}
#[substreams::handlers::map]
pub fn map_liquidity_changes(
events: Events,
pools_current_tick_store: StoreGetInt64,
) -> Result<LiquidityChanges, anyhow::Error> {
let mut changes = events
.pool_events
.into_iter()
.filter(PoolEvent::can_introduce_liquidity_changes)
.map(|e| {
(
pools_current_tick_store
.get_at(e.log_ordinal, format!("pool:{0}", &e.pool_address))
.unwrap_or(0),
e,
)
})
.filter_map(|(current_tick, event)| event_to_liquidity_deltas(current_tick, event))
.collect::<Vec<_>>();
changes.sort_unstable_by_key(|l| l.ordinal);
Ok(LiquidityChanges { changes })
}
#[substreams::handlers::store]
pub fn store_liquidity(ticks_deltas: LiquidityChanges, store: StoreSetSumBigInt) {
ticks_deltas
.changes
.iter()
.for_each(|changes| match changes.change_type() {
LiquidityChangeType::Delta => {
store.sum(
changes.ordinal,
format!("pool:{0}", hex::encode(&changes.pool_address)),
BigInt::from_signed_bytes_be(&changes.value),
);
}
LiquidityChangeType::Absolute => {
store.set(
changes.ordinal,
format!("pool:{0}", hex::encode(&changes.pool_address)),
BigInt::from_signed_bytes_be(&changes.value),
);
}
});
}
fn event_to_liquidity_deltas(current_tick: i64, event: PoolEvent) -> Option<LiquidityChange> {
match event.r#type.as_ref().unwrap() {
pool_event::Type::Mint(mint) => {
if current_tick >= mint.tick_lower.into() && current_tick < mint.tick_upper.into() {
Some(LiquidityChange {
pool_address: hex::decode(event.pool_address).unwrap(),
value: BigInt::from_str(&mint.amount)
.unwrap()
.to_signed_bytes_be(),
change_type: LiquidityChangeType::Delta.into(),
ordinal: event.log_ordinal,
transaction: Some(event.transaction.unwrap()),
})
} else {
None
}
}
pool_event::Type::Burn(burn) => {
if current_tick >= burn.tick_lower.into() && current_tick < burn.tick_upper.into() {
Some(LiquidityChange {
pool_address: hex::decode(event.pool_address).unwrap(),
value: BigInt::from_str(&burn.amount)
.unwrap()
.neg()
.to_signed_bytes_be(),
change_type: LiquidityChangeType::Delta.into(),
ordinal: event.log_ordinal,
transaction: Some(event.transaction.unwrap()),
})
} else {
None
}
}
pool_event::Type::Swap(swap) => Some(LiquidityChange {
pool_address: hex::decode(event.pool_address).unwrap(),
value: BigInt::from_str(&swap.liquidity)
.unwrap()
.to_signed_bytes_be(),
change_type: LiquidityChangeType::Absolute.into(),
ordinal: event.log_ordinal,
transaction: Some(event.transaction.unwrap()),
}),
_ => None,
}
}
impl PoolEvent {
fn can_introduce_liquidity_changes(&self) -> bool {
matches!(
self.r#type.as_ref().unwrap(),
pool_event::Type::Mint(_) | pool_event::Type::Burn(_) | pool_event::Type::Swap(_)
)
}
}
fn event_to_current_tick(event: PoolEvent) -> Option<(String, u64, i32)> {
match event.r#type.as_ref().unwrap() {
pool_event::Type::Initialize(initialize) => {
Some((event.pool_address, event.log_ordinal, initialize.tick))
}
pool_event::Type::Swap(swap) => Some((event.pool_address, event.log_ordinal, swap.tick)),
_ => None,
}
}

View File

@@ -0,0 +1,91 @@
use std::str::FromStr;
use substreams::store::StoreAddBigInt;
use crate::pb::pancakeswap::v3::{
events::{pool_event, PoolEvent},
Events, TickDelta, TickDeltas,
};
use substreams::{
scalar::BigInt,
store::{StoreAdd, StoreNew},
};
use anyhow::Ok;
#[substreams::handlers::map]
pub fn map_ticks_changes(events: Events) -> Result<TickDeltas, anyhow::Error> {
let ticks_deltas = events
.pool_events
.into_iter()
.flat_map(event_to_ticks_deltas)
.collect();
Ok(TickDeltas { deltas: ticks_deltas })
}
#[substreams::handlers::store]
pub fn store_ticks_liquidity(ticks_deltas: TickDeltas, store: StoreAddBigInt) {
let mut deltas = ticks_deltas.deltas;
deltas.sort_unstable_by_key(|delta| delta.ordinal);
deltas.iter().for_each(|delta| {
store.add(
delta.ordinal,
format!("pool:{0}:tick:{1}", hex::encode(&delta.pool_address), delta.tick_index,),
BigInt::from_signed_bytes_be(&delta.liquidity_net_delta),
);
});
}
fn event_to_ticks_deltas(event: PoolEvent) -> Vec<TickDelta> {
match event.r#type.as_ref().unwrap() {
pool_event::Type::Mint(mint) => {
vec![
TickDelta {
pool_address: hex::decode(&event.pool_address).unwrap(),
tick_index: mint.tick_lower,
liquidity_net_delta: BigInt::from_str(&mint.amount)
.unwrap()
.to_signed_bytes_be(),
ordinal: event.log_ordinal,
transaction: event.transaction.clone(),
},
TickDelta {
pool_address: hex::decode(&event.pool_address).unwrap(),
tick_index: mint.tick_upper,
liquidity_net_delta: BigInt::from_str(&mint.amount)
.unwrap()
.neg()
.to_signed_bytes_be(),
ordinal: event.log_ordinal,
transaction: event.transaction,
},
]
}
pool_event::Type::Burn(burn) => vec![
TickDelta {
pool_address: hex::decode(&event.pool_address).unwrap(),
tick_index: burn.tick_lower,
liquidity_net_delta: BigInt::from_str(&burn.amount)
.unwrap()
.neg()
.to_signed_bytes_be(),
ordinal: event.log_ordinal,
transaction: event.transaction.clone(),
},
TickDelta {
pool_address: hex::decode(&event.pool_address).unwrap(),
tick_index: burn.tick_upper,
liquidity_net_delta: BigInt::from_str(&burn.amount)
.unwrap()
.to_signed_bytes_be(),
ordinal: event.log_ordinal,
transaction: event.transaction,
},
],
_ => vec![],
}
}

View File

@@ -0,0 +1,287 @@
use crate::pb::pancakeswap::v3::{
events::{pool_event, PoolEvent},
Events, LiquidityChanges, TickDeltas,
};
use itertools::Itertools;
use std::{collections::HashMap, str::FromStr, vec};
use substreams::{pb::substreams::StoreDeltas, scalar::BigInt};
use substreams_ethereum::pb::eth::v2::{self as eth};
use substreams_helper::hex::Hexable;
use tycho_substreams::{balances::aggregate_balances_changes, prelude::*};
type PoolAddress = Vec<u8>;
#[substreams::handlers::map]
pub fn map_protocol_changes(
block: eth::Block,
created_pools: BlockChanges,
events: Events,
balances_map_deltas: BlockBalanceDeltas,
balances_store_deltas: StoreDeltas,
ticks_map_deltas: TickDeltas,
ticks_store_deltas: StoreDeltas,
pool_liquidity_changes: LiquidityChanges,
pool_liquidity_store_deltas: StoreDeltas,
) -> Result<BlockChanges, substreams::errors::Error> {
// We merge contract changes by transaction (identified by transaction index) making it easy to
// sort them at the very end.
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
// Add created pools to the tx_changes_map
for change in created_pools.changes.into_iter() {
let tx = change.tx.as_ref().unwrap();
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(tx));
change
.component_changes
.iter()
.for_each(|c| {
builder.add_protocol_component(c);
});
change
.entity_changes
.iter()
.for_each(|ec| {
builder.add_entity_change(ec);
});
change
.balance_changes
.iter()
.for_each(|bc| {
builder.add_balance_change(bc);
});
}
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
// this block. Then, these balance changes are merged onto the existing map of tx contract
// changes, inserting a new one if it doesn't exist.
aggregate_balances_changes(balances_store_deltas, balances_map_deltas)
.into_iter()
.for_each(|(_, (tx, balances))| {
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
balances
.values()
.for_each(|token_bc_map| {
token_bc_map
.values()
.for_each(|bc| builder.add_balance_change(bc))
});
});
// Insert ticks net-liquidity changes
ticks_store_deltas
.deltas
.into_iter()
.zip(ticks_map_deltas.deltas)
.for_each(|(store_delta, tick_delta)| {
let new_value_bigint =
BigInt::from_str(&String::from_utf8(store_delta.new_value).unwrap()).unwrap();
// If old value is empty or the int value is 0, it's considered as a creation.
let is_creation = store_delta.old_value.is_empty() ||
BigInt::from_str(&String::from_utf8(store_delta.old_value).unwrap())
.unwrap()
.is_zero();
let attribute_name = format!("ticks/{}/net-liquidity", tick_delta.tick_index);
let attribute = Attribute {
name: attribute_name,
value: new_value_bigint.to_signed_bytes_be(),
change: if is_creation {
ChangeType::Creation.into()
} else if new_value_bigint.is_zero() {
ChangeType::Deletion.into()
} else {
ChangeType::Update.into()
},
};
let tx = tick_delta.transaction.unwrap();
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
builder.add_entity_change(&EntityChanges {
component_id: tick_delta.pool_address.to_hex(),
attributes: vec![attribute],
});
});
// Insert liquidity changes
pool_liquidity_store_deltas
.deltas
.into_iter()
.zip(pool_liquidity_changes.changes)
.for_each(|(store_delta, change)| {
let new_value_bigint = BigInt::from_str(
String::from_utf8(store_delta.new_value)
.unwrap()
.split(':')
.nth(1)
.unwrap(),
)
.unwrap();
let tx = change.transaction.unwrap();
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
builder.add_entity_change(&EntityChanges {
component_id: change.pool_address.to_hex(),
attributes: vec![Attribute {
name: "liquidity".to_string(),
value: new_value_bigint.to_signed_bytes_be(),
change: ChangeType::Update.into(),
}],
});
});
// Insert others changes
events
.pool_events
.into_iter()
.flat_map(event_to_attributes_updates)
.for_each(|(tx, pool_address, attr)| {
let builder = transaction_changes
.entry(tx.index)
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
builder.add_entity_change(&EntityChanges {
component_id: pool_address.to_hex(),
attributes: vec![attr],
});
});
Ok(BlockChanges {
block: Some((&block).into()),
changes: transaction_changes
.drain()
.sorted_unstable_by_key(|(index, _)| *index)
.filter_map(|(_, builder)| builder.build())
.collect::<Vec<_>>(),
})
}
fn event_to_attributes_updates(event: PoolEvent) -> Vec<(Transaction, PoolAddress, Attribute)> {
match event.r#type.as_ref().unwrap() {
pool_event::Type::Initialize(initalize) => {
let (zero_to_one, one_to_zero) = fee_to_default_protocol_fees(event.fee);
vec![
(
event
.transaction
.as_ref()
.unwrap()
.into(),
hex::decode(&event.pool_address).unwrap(),
Attribute {
name: "sqrt_price_x96".to_string(),
value: BigInt::from_str(&initalize.sqrt_price)
.unwrap()
.to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
(
event
.transaction
.as_ref()
.unwrap()
.into(),
hex::decode(&event.pool_address).unwrap(),
Attribute {
name: "tick".to_string(),
value: BigInt::from(initalize.tick).to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
(
event
.transaction
.as_ref()
.unwrap()
.into(),
hex::decode(&event.pool_address).unwrap(),
Attribute {
name: "protocol_fees/zeroForOne".to_string(),
value: BigInt::from(zero_to_one).to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
(
event.transaction.unwrap().into(),
hex::decode(event.pool_address).unwrap(),
Attribute {
name: "protocol_fees/oneForZero".to_string(),
value: BigInt::from(one_to_zero).to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
]
}
pool_event::Type::Swap(swap) => vec![
(
event
.transaction
.as_ref()
.unwrap()
.into(),
hex::decode(&event.pool_address).unwrap(),
Attribute {
name: "sqrt_price_x96".to_string(),
value: BigInt::from_str(&swap.sqrt_price)
.unwrap()
.to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
(
event.transaction.unwrap().into(),
hex::decode(event.pool_address).unwrap(),
Attribute {
name: "tick".to_string(),
value: BigInt::from(swap.tick).to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
],
pool_event::Type::SetFeeProtocol(sfp) => vec![
(
event
.transaction
.as_ref()
.unwrap()
.into(),
hex::decode(&event.pool_address).unwrap(),
Attribute {
name: "protocol_fees/zeroForOne".to_string(),
value: BigInt::from(sfp.fee_protocol_0_new).to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
(
event.transaction.unwrap().into(),
hex::decode(event.pool_address).unwrap(),
Attribute {
name: "protocol_fees/oneForZero".to_string(),
value: BigInt::from(sfp.fee_protocol_1_new).to_signed_bytes_be(),
change: ChangeType::Update.into(),
},
),
],
_ => vec![],
}
}
// Map the pool fee to the default protocol fees.
// For the reference implementation see https://github.com/pancakeswap/pancake-v3-contracts/blob/5cc479f0c5a98966c74d94700057b8c3ca629afd/projects/v3-core/contracts/PancakeV3Pool.sol#L298-L306
fn fee_to_default_protocol_fees(fee: u64) -> (u64, u64) {
match fee {
100 => (3300, 3300),
500 => (3400, 3400),
2500 => (3200, 3200),
10000 => (3200, 3200),
_ => panic!("Unexpected fee value"),
}
}

View File

@@ -0,0 +1,61 @@
pub use map_pool_created::map_pools_created;
pub use map_protocol_changes::map_protocol_changes;
pub use store_pools::store_pools;
use substreams_ethereum::pb::eth::v2::TransactionTrace;
use crate::pb::pancakeswap::v3::Transaction;
#[path = "1_map_pool_created.rs"]
mod map_pool_created;
#[path = "2_store_pools.rs"]
mod store_pools;
#[path = "3_map_events.rs"]
mod map_events;
#[path = "4_map_and_store_balance_changes.rs"]
mod map_store_balance_changes;
#[path = "4_map_and_store_ticks.rs"]
mod map_store_ticks;
#[path = "4_map_and_store_liquidity.rs"]
mod map_store_liquidity;
#[path = "5_map_protocol_changes.rs"]
mod map_protocol_changes;
impl From<TransactionTrace> for Transaction {
fn from(value: TransactionTrace) -> Self {
Self { hash: value.hash, from: value.from, to: value.to, index: value.index.into() }
}
}
impl From<&TransactionTrace> for Transaction {
fn from(value: &TransactionTrace) -> Self {
Self {
hash: value.hash.clone(),
from: value.from.clone(),
to: value.to.clone(),
index: value.index.into(),
}
}
}
impl From<&Transaction> for tycho_substreams::prelude::Transaction {
fn from(value: &Transaction) -> Self {
Self {
hash: value.hash.clone(),
from: value.from.clone(),
to: value.to.clone(),
index: value.index,
}
}
}
impl From<Transaction> for tycho_substreams::prelude::Transaction {
fn from(value: Transaction) -> Self {
Self { hash: value.hash, from: value.from, to: value.to, index: value.index }
}
}

View File

@@ -0,0 +1,8 @@
// @generated
pub mod pancakeswap {
// @@protoc_insertion_point(attribute:pancakeswap.v3)
pub mod v3 {
include!("pancakeswap.v3.rs");
// @@protoc_insertion_point(pancakeswap.v3)
}
}

View File

@@ -0,0 +1,302 @@
// @generated
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Pool {
#[prost(bytes="vec", tag="1")]
pub address: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="2")]
pub token0: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="3")]
pub token1: ::prost::alloc::vec::Vec<u8>,
#[prost(uint64, tag="4")]
pub fee: u64,
#[prost(bytes="vec", tag="5")]
pub created_tx_hash: ::prost::alloc::vec::Vec<u8>,
}
/// A struct describing a transaction.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transaction {
/// The transaction hash.
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
/// The sender of the transaction.
#[prost(bytes="vec", tag="2")]
pub from: ::prost::alloc::vec::Vec<u8>,
/// The receiver of the transaction.
#[prost(bytes="vec", tag="3")]
pub to: ::prost::alloc::vec::Vec<u8>,
/// The transactions index within the block.
#[prost(uint64, tag="4")]
pub index: u64,
}
/// A change to a pool's tick.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TickDelta {
/// The address of the pool.
#[prost(bytes="vec", tag="1")]
pub pool_address: ::prost::alloc::vec::Vec<u8>,
/// The index of the tick.
#[prost(int32, tag="2")]
pub tick_index: i32,
/// The liquidity net delta of this tick. Bigint encoded as signed little endian bytes.
#[prost(bytes="vec", tag="3")]
pub liquidity_net_delta: ::prost::alloc::vec::Vec<u8>,
/// Used to determine the order of the balance changes. Necessary for the balance store.
#[prost(uint64, tag="4")]
pub ordinal: u64,
#[prost(message, optional, tag="5")]
pub transaction: ::core::option::Option<Transaction>,
}
/// A group of TickDelta
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TickDeltas {
#[prost(message, repeated, tag="1")]
pub deltas: ::prost::alloc::vec::Vec<TickDelta>,
}
/// A change to a pool's liquidity.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LiquidityChange {
/// The address of the pool.
#[prost(bytes="vec", tag="1")]
pub pool_address: ::prost::alloc::vec::Vec<u8>,
/// The liquidity changed amount. Bigint encoded as signed little endian bytes.
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
/// The type of update, can be absolute or delta.
#[prost(enumeration="LiquidityChangeType", tag="3")]
pub change_type: i32,
/// Used to determine the order of the balance changes. Necessary for the balance store.
#[prost(uint64, tag="4")]
pub ordinal: u64,
#[prost(message, optional, tag="5")]
pub transaction: ::core::option::Option<Transaction>,
}
/// A group of LiquidityChange
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct LiquidityChanges {
#[prost(message, repeated, tag="1")]
pub changes: ::prost::alloc::vec::Vec<LiquidityChange>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Events {
#[prost(message, repeated, tag="3")]
pub pool_events: ::prost::alloc::vec::Vec<events::PoolEvent>,
}
/// Nested message and enum types in `Events`.
pub mod events {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PoolEvent {
#[prost(uint64, tag="100")]
pub log_ordinal: u64,
#[prost(string, tag="102")]
pub pool_address: ::prost::alloc::string::String,
#[prost(string, tag="103")]
pub token0: ::prost::alloc::string::String,
#[prost(string, tag="104")]
pub token1: ::prost::alloc::string::String,
#[prost(uint64, tag="105")]
pub fee: u64,
#[prost(message, optional, tag="106")]
pub transaction: ::core::option::Option<super::Transaction>,
#[prost(oneof="pool_event::Type", tags="1, 2, 3, 4, 5, 6, 7, 8")]
pub r#type: ::core::option::Option<pool_event::Type>,
}
/// Nested message and enum types in `PoolEvent`.
pub mod pool_event {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Initialize {
/// Unsigned
#[prost(string, tag="1")]
pub sqrt_price: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
pub tick: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Mint {
#[prost(string, tag="1")]
pub sender: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub owner: ::prost::alloc::string::String,
/// Signed
#[prost(int32, tag="3")]
pub tick_lower: i32,
/// Signed
#[prost(int32, tag="4")]
pub tick_upper: i32,
/// Unsigned
#[prost(string, tag="5")]
pub amount: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="6")]
pub amount_0: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="7")]
pub amount_1: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Collect {
#[prost(string, tag="1")]
pub owner: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub recipient: ::prost::alloc::string::String,
#[prost(int32, tag="3")]
pub tick_lower: i32,
#[prost(int32, tag="4")]
pub tick_upper: i32,
/// Unsigned
#[prost(string, tag="5")]
pub amount_0: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="6")]
pub amount_1: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Burn {
#[prost(string, tag="1")]
pub owner: ::prost::alloc::string::String,
#[prost(int32, tag="2")]
pub tick_lower: i32,
#[prost(int32, tag="3")]
pub tick_upper: i32,
/// Unsigned
#[prost(string, tag="4")]
pub amount: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="5")]
pub amount_0: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="6")]
pub amount_1: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Swap {
#[prost(string, tag="1")]
pub sender: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub recipient: ::prost::alloc::string::String,
/// Signed
#[prost(string, tag="3")]
pub amount_0: ::prost::alloc::string::String,
/// Signed
#[prost(string, tag="4")]
pub amount_1: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="6")]
pub sqrt_price: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="7")]
pub liquidity: ::prost::alloc::string::String,
#[prost(int32, tag="8")]
pub tick: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Flash {
#[prost(string, tag="1")]
pub sender: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub recipient: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="3")]
pub amount_0: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="4")]
pub amount_1: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="5")]
pub paid_0: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="6")]
pub paid_1: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetFeeProtocol {
/// Unsigned
#[prost(uint64, tag="1")]
pub fee_protocol_0_old: u64,
/// Unsigned
#[prost(uint64, tag="2")]
pub fee_protocol_1_old: u64,
/// Unsigned
#[prost(uint64, tag="3")]
pub fee_protocol_0_new: u64,
/// Unsigned
#[prost(uint64, tag="4")]
pub fee_protocol_1_new: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CollectProtocol {
#[prost(string, tag="1")]
pub sender: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub recipient: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="3")]
pub amount_0: ::prost::alloc::string::String,
/// Unsigned
#[prost(string, tag="4")]
pub amount_1: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Type {
#[prost(message, tag="1")]
Initialize(Initialize),
#[prost(message, tag="2")]
Mint(Mint),
#[prost(message, tag="3")]
Collect(Collect),
#[prost(message, tag="4")]
Burn(Burn),
#[prost(message, tag="5")]
Swap(Swap),
#[prost(message, tag="6")]
Flash(Flash),
#[prost(message, tag="7")]
SetFeeProtocol(SetFeeProtocol),
#[prost(message, tag="8")]
CollectProtocol(CollectProtocol),
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum LiquidityChangeType {
Delta = 0,
Absolute = 1,
}
impl LiquidityChangeType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
LiquidityChangeType::Delta => "DELTA",
LiquidityChangeType::Absolute => "ABSOLUTE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"DELTA" => Some(Self::Delta),
"ABSOLUTE" => Some(Self::Absolute),
_ => None,
}
}
}
// @@protoc_insertion_point(module)