Also fixes an issue with inconsistent version tag between cargo and substreams files Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
288 lines
11 KiB
Rust
288 lines
11 KiB
Rust
use crate::pb::pancakeswap::v3::{
|
|
events::{pool_event, PoolEvent},
|
|
Events, LiquidityChanges, TickDeltas,
|
|
};
|
|
use itertools::Itertools;
|
|
use std::{collections::HashMap, str::FromStr, vec};
|
|
use substreams::{pb::substreams::StoreDeltas, scalar::BigInt};
|
|
use substreams_ethereum::pb::eth::v2::{self as eth};
|
|
use substreams_helper::hex::Hexable;
|
|
use tycho_substreams::{balances::aggregate_balances_changes, prelude::*};
|
|
|
|
type PoolAddress = Vec<u8>;
|
|
|
|
#[substreams::handlers::map]
|
|
pub fn map_protocol_changes(
|
|
block: eth::Block,
|
|
created_pools: BlockChanges,
|
|
events: Events,
|
|
balances_map_deltas: BlockBalanceDeltas,
|
|
balances_store_deltas: StoreDeltas,
|
|
ticks_map_deltas: TickDeltas,
|
|
ticks_store_deltas: StoreDeltas,
|
|
pool_liquidity_changes: LiquidityChanges,
|
|
pool_liquidity_store_deltas: StoreDeltas,
|
|
) -> Result<BlockChanges, substreams::errors::Error> {
|
|
// We merge contract changes by transaction (identified by transaction index) making it easy to
|
|
// sort them at the very end.
|
|
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
|
|
|
|
// Add created pools to the tx_changes_map
|
|
for change in created_pools.changes.into_iter() {
|
|
let tx = change.tx.as_ref().unwrap();
|
|
let builder = transaction_changes
|
|
.entry(tx.index)
|
|
.or_insert_with(|| TransactionChangesBuilder::new(tx));
|
|
change
|
|
.component_changes
|
|
.iter()
|
|
.for_each(|c| {
|
|
builder.add_protocol_component(c);
|
|
});
|
|
change
|
|
.entity_changes
|
|
.iter()
|
|
.for_each(|ec| {
|
|
builder.add_entity_change(ec);
|
|
});
|
|
change
|
|
.balance_changes
|
|
.iter()
|
|
.for_each(|bc| {
|
|
builder.add_balance_change(bc);
|
|
});
|
|
}
|
|
|
|
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
|
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
|
|
// this block. Then, these balance changes are merged onto the existing map of tx contract
|
|
// changes, inserting a new one if it doesn't exist.
|
|
aggregate_balances_changes(balances_store_deltas, balances_map_deltas)
|
|
.into_iter()
|
|
.for_each(|(_, (tx, balances))| {
|
|
let builder = transaction_changes
|
|
.entry(tx.index)
|
|
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
|
|
balances
|
|
.values()
|
|
.for_each(|token_bc_map| {
|
|
token_bc_map
|
|
.values()
|
|
.for_each(|bc| builder.add_balance_change(bc))
|
|
});
|
|
});
|
|
|
|
// Insert ticks net-liquidity changes
|
|
ticks_store_deltas
|
|
.deltas
|
|
.into_iter()
|
|
.zip(ticks_map_deltas.deltas)
|
|
.for_each(|(store_delta, tick_delta)| {
|
|
let new_value_bigint =
|
|
BigInt::from_str(&String::from_utf8(store_delta.new_value).unwrap()).unwrap();
|
|
|
|
// If old value is empty or the int value is 0, it's considered as a creation.
|
|
let is_creation = store_delta.old_value.is_empty() ||
|
|
BigInt::from_str(&String::from_utf8(store_delta.old_value).unwrap())
|
|
.unwrap()
|
|
.is_zero();
|
|
let attribute_name = format!("ticks/{}/net-liquidity", tick_delta.tick_index);
|
|
let attribute = Attribute {
|
|
name: attribute_name,
|
|
value: new_value_bigint.to_signed_bytes_be(),
|
|
change: if is_creation {
|
|
ChangeType::Creation.into()
|
|
} else if new_value_bigint.is_zero() {
|
|
ChangeType::Deletion.into()
|
|
} else {
|
|
ChangeType::Update.into()
|
|
},
|
|
};
|
|
let tx = tick_delta.transaction.unwrap();
|
|
let builder = transaction_changes
|
|
.entry(tx.index)
|
|
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
|
|
|
builder.add_entity_change(&EntityChanges {
|
|
component_id: tick_delta.pool_address.to_hex(),
|
|
attributes: vec![attribute],
|
|
});
|
|
});
|
|
|
|
// Insert liquidity changes
|
|
pool_liquidity_store_deltas
|
|
.deltas
|
|
.into_iter()
|
|
.zip(pool_liquidity_changes.changes)
|
|
.for_each(|(store_delta, change)| {
|
|
let new_value_bigint = BigInt::from_str(
|
|
String::from_utf8(store_delta.new_value)
|
|
.unwrap()
|
|
.split(':')
|
|
.nth(1)
|
|
.unwrap(),
|
|
)
|
|
.unwrap();
|
|
let tx = change.transaction.unwrap();
|
|
let builder = transaction_changes
|
|
.entry(tx.index)
|
|
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
|
|
|
builder.add_entity_change(&EntityChanges {
|
|
component_id: change.pool_address.to_hex(),
|
|
attributes: vec![Attribute {
|
|
name: "liquidity".to_string(),
|
|
value: new_value_bigint.to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
}],
|
|
});
|
|
});
|
|
|
|
// Insert others changes
|
|
events
|
|
.pool_events
|
|
.into_iter()
|
|
.flat_map(event_to_attributes_updates)
|
|
.for_each(|(tx, pool_address, attr)| {
|
|
let builder = transaction_changes
|
|
.entry(tx.index)
|
|
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
|
|
builder.add_entity_change(&EntityChanges {
|
|
component_id: pool_address.to_hex(),
|
|
attributes: vec![attr],
|
|
});
|
|
});
|
|
|
|
Ok(BlockChanges {
|
|
block: Some((&block).into()),
|
|
changes: transaction_changes
|
|
.drain()
|
|
.sorted_unstable_by_key(|(index, _)| *index)
|
|
.filter_map(|(_, builder)| builder.build())
|
|
.collect::<Vec<_>>(),
|
|
})
|
|
}
|
|
|
|
fn event_to_attributes_updates(event: PoolEvent) -> Vec<(Transaction, PoolAddress, Attribute)> {
|
|
match event.r#type.as_ref().unwrap() {
|
|
pool_event::Type::Initialize(initalize) => {
|
|
let (zero_to_one, one_to_zero) = fee_to_default_protocol_fees(event.fee);
|
|
vec![
|
|
(
|
|
event
|
|
.transaction
|
|
.as_ref()
|
|
.unwrap()
|
|
.into(),
|
|
hex::decode(&event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "sqrt_price_x96".to_string(),
|
|
value: BigInt::from_str(&initalize.sqrt_price)
|
|
.unwrap()
|
|
.to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
(
|
|
event
|
|
.transaction
|
|
.as_ref()
|
|
.unwrap()
|
|
.into(),
|
|
hex::decode(&event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "tick".to_string(),
|
|
value: BigInt::from(initalize.tick).to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
(
|
|
event
|
|
.transaction
|
|
.as_ref()
|
|
.unwrap()
|
|
.into(),
|
|
hex::decode(&event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "protocol_fees/zero2one".to_string(),
|
|
value: BigInt::from(zero_to_one).to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
(
|
|
event.transaction.unwrap().into(),
|
|
hex::decode(event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "protocol_fees/one2zero".to_string(),
|
|
value: BigInt::from(one_to_zero).to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
]
|
|
}
|
|
pool_event::Type::Swap(swap) => vec![
|
|
(
|
|
event
|
|
.transaction
|
|
.as_ref()
|
|
.unwrap()
|
|
.into(),
|
|
hex::decode(&event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "sqrt_price_x96".to_string(),
|
|
value: BigInt::from_str(&swap.sqrt_price)
|
|
.unwrap()
|
|
.to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
(
|
|
event.transaction.unwrap().into(),
|
|
hex::decode(event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "tick".to_string(),
|
|
value: BigInt::from(swap.tick).to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
],
|
|
pool_event::Type::SetFeeProtocol(sfp) => vec![
|
|
(
|
|
event
|
|
.transaction
|
|
.as_ref()
|
|
.unwrap()
|
|
.into(),
|
|
hex::decode(&event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "protocol_fees/zero2one".to_string(),
|
|
value: BigInt::from(sfp.fee_protocol_0_new).to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
(
|
|
event.transaction.unwrap().into(),
|
|
hex::decode(event.pool_address).unwrap(),
|
|
Attribute {
|
|
name: "protocol_fees/one2zero".to_string(),
|
|
value: BigInt::from(sfp.fee_protocol_1_new).to_signed_bytes_be(),
|
|
change: ChangeType::Update.into(),
|
|
},
|
|
),
|
|
],
|
|
_ => vec![],
|
|
}
|
|
}
|
|
|
|
// Map the pool fee to the default protocol fees.
|
|
// For the reference implementation see https://github.com/pancakeswap/pancake-v3-contracts/blob/5cc479f0c5a98966c74d94700057b8c3ca629afd/projects/v3-core/contracts/PancakeV3Pool.sol#L298-L306
|
|
fn fee_to_default_protocol_fees(fee: u64) -> (u64, u64) {
|
|
match fee {
|
|
100 => (3300, 3300),
|
|
500 => (3400, 3400),
|
|
2500 => (3200, 3200),
|
|
10000 => (3200, 3200),
|
|
_ => panic!("Unexpected fee value"),
|
|
}
|
|
}
|