refactor(substreams): improve logic to ignore updates (#96)
* refactor(substreams): ignore transaction if contracts update are ignored. There are some cases where we ignore contracts updates (for example if the old and new values are the same). In that case if the transaction only contains ignored updates we don't emit it. * refactor(substreams): ignore deletions for freshly created attributes. There are cases where an attribute can be created and deleted during the same transaction. To avoid sending a confusing deletion for something that was never created before, we just ignore the deletion in that particular case. * feat(substreams): Add uniswap V3 logs only module (#98) * feat(substreams): add uniswapV3 logs only Substreams module * refactor(substreams): encode everything as big endian * refactor(substreams): mark changes as creation when a tick liq is updated from 0 This will allow the SDK to detect cases where a tick is created and deleted in the same transaction and ignore it. * ci(substreams): ignore built files for uniswapv3 logs only module and clean code * refactor(substreams): update uniswapv3 substreams with new SDK interface * feat(subtreams): emit default token balances value for uniswapv3 --------- Co-authored-by: zizou <111426680+flopell@users.noreply.github.com> --------- Co-authored-by: zizou <111426680+flopell@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,114 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use ethabi::ethereum_types::Address;
|
||||
use substreams::scalar::BigInt;
|
||||
use substreams_ethereum::pb::eth::v2::{self as eth};
|
||||
|
||||
use substreams_helper::{event_handler::EventHandler, hex::Hexable};
|
||||
|
||||
use crate::abi::factory::events::PoolCreated;
|
||||
|
||||
use tycho_substreams::prelude::*;
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_pools_created(
|
||||
params: String,
|
||||
block: eth::Block,
|
||||
) -> Result<BlockEntityChanges, substreams::errors::Error> {
|
||||
let mut new_pools: Vec<TransactionEntityChanges> = vec![];
|
||||
let factory_address = params.as_str();
|
||||
|
||||
get_new_pools(&block, &mut new_pools, factory_address);
|
||||
|
||||
Ok(BlockEntityChanges { block: None, changes: new_pools })
|
||||
}
|
||||
|
||||
// Extract new pools from PoolCreated events
|
||||
fn get_new_pools(
|
||||
block: ð::Block,
|
||||
new_pools: &mut Vec<TransactionEntityChanges>,
|
||||
factory_address: &str,
|
||||
) {
|
||||
// Extract new pools from PoolCreated events
|
||||
let mut on_pool_created = |event: PoolCreated, _tx: ð::TransactionTrace, _log: ð::Log| {
|
||||
let tycho_tx: Transaction = _tx.into();
|
||||
|
||||
new_pools.push(TransactionEntityChanges {
|
||||
tx: Some(tycho_tx.clone()),
|
||||
entity_changes: vec![EntityChanges {
|
||||
component_id: event.pool.clone().to_hex(),
|
||||
attributes: vec![
|
||||
Attribute {
|
||||
name: "liquidity".to_string(),
|
||||
value: BigInt::from(0).to_signed_bytes_be(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "tick".to_string(),
|
||||
value: BigInt::from(0).to_signed_bytes_be(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "sqrt_price_x96".to_string(),
|
||||
value: BigInt::from(0).to_signed_bytes_be(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
],
|
||||
}],
|
||||
component_changes: vec![ProtocolComponent {
|
||||
id: event.pool.to_hex(),
|
||||
tokens: vec![event.token0.clone(), event.token1.clone()],
|
||||
contracts: vec![],
|
||||
static_att: vec![
|
||||
Attribute {
|
||||
name: "fee".to_string(),
|
||||
value: event.fee.to_signed_bytes_be(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "tick_spacing".to_string(),
|
||||
value: event.tick_spacing.to_signed_bytes_be(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "pool_address".to_string(),
|
||||
value: event.pool.clone(),
|
||||
change: ChangeType::Creation.into(),
|
||||
},
|
||||
],
|
||||
change: i32::from(ChangeType::Creation),
|
||||
protocol_type: Option::from(ProtocolType {
|
||||
name: "uniswap_v3_pool".to_string(),
|
||||
financial_type: FinancialType::Swap.into(),
|
||||
attribute_schema: vec![],
|
||||
implementation_type: ImplementationType::Custom.into(),
|
||||
}),
|
||||
tx: Some(tycho_tx),
|
||||
}],
|
||||
balance_changes: vec![
|
||||
BalanceChange {
|
||||
token: event.token0,
|
||||
balance: BigInt::from(0).to_signed_bytes_be(),
|
||||
component_id: event
|
||||
.pool
|
||||
.clone()
|
||||
.to_hex()
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
},
|
||||
BalanceChange {
|
||||
token: event.token1,
|
||||
balance: BigInt::from(0).to_signed_bytes_be(),
|
||||
component_id: event.pool.to_hex().as_bytes().to_vec(),
|
||||
},
|
||||
],
|
||||
})
|
||||
};
|
||||
|
||||
let mut eh = EventHandler::new(block);
|
||||
|
||||
eh.filter_by_address(vec![Address::from_str(factory_address).unwrap()]);
|
||||
|
||||
eh.on::<PoolCreated, _>(&mut on_pool_created);
|
||||
eh.handle_events();
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
use std::str;
|
||||
|
||||
use substreams::store::{StoreNew, StoreSetIfNotExists, StoreSetIfNotExistsProto};
|
||||
use tycho_substreams::models::BlockEntityChanges;
|
||||
|
||||
use crate::pb::uniswap::v3::Pool;
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_pools(pools_created: BlockEntityChanges, store: StoreSetIfNotExistsProto<Pool>) {
|
||||
// Store pools. Required so the next maps can match any event to a known pool by their address
|
||||
|
||||
for change in pools_created.changes {
|
||||
for component_change in &change.component_changes {
|
||||
let pool_address: &str = &component_change.id;
|
||||
let pool: Pool = Pool {
|
||||
address: hex::decode(pool_address.trim_start_matches("0x")).unwrap(),
|
||||
token0: component_change.tokens[0].clone(),
|
||||
token1: component_change.tokens[1].clone(),
|
||||
created_tx_hash: change.tx.as_ref().unwrap().hash.clone(),
|
||||
};
|
||||
store.set_if_not_exists(0, format!("{}:{}", "Pool", pool_address), &pool);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
use anyhow::Ok;
|
||||
use substreams::{
|
||||
store::{StoreGet, StoreGetProto},
|
||||
Hex,
|
||||
};
|
||||
use substreams_ethereum::{
|
||||
pb::eth::v2::{self as eth, Log, TransactionTrace},
|
||||
Event,
|
||||
};
|
||||
use substreams_helper::hex::Hexable;
|
||||
|
||||
use crate::{
|
||||
abi::pool::events::{
|
||||
Burn, Collect, CollectProtocol, Flash, Initialize, Mint, SetFeeProtocol, Swap,
|
||||
},
|
||||
pb::uniswap::v3::{
|
||||
events::{
|
||||
pool_event::{self, Type},
|
||||
PoolEvent,
|
||||
},
|
||||
Events, Pool,
|
||||
},
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_events(
|
||||
block: eth::Block,
|
||||
pools_store: StoreGetProto<Pool>,
|
||||
) -> Result<Events, anyhow::Error> {
|
||||
let mut pool_events = block
|
||||
.transaction_traces
|
||||
.into_iter()
|
||||
.filter(|tx| tx.status == 1)
|
||||
.flat_map(|tx| {
|
||||
tx.clone()
|
||||
.receipt
|
||||
.into_iter()
|
||||
.flat_map(|receipt| receipt.logs)
|
||||
.filter_map(|log| {
|
||||
let key = format!("{}:{}", "Pool", log.address.to_hex());
|
||||
// Skip if the log is not from a known uniswapV3 pool.
|
||||
if let Some(pool) = pools_store.get_last(key) {
|
||||
log_to_event(&log, pool, tx.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
pool_events.sort_unstable_by_key(|e| e.log_ordinal);
|
||||
|
||||
Ok(Events { pool_events })
|
||||
}
|
||||
|
||||
fn log_to_event(event: &Log, pool: Pool, tx: TransactionTrace) -> Option<PoolEvent> {
|
||||
if let Some(init) = Initialize::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::Initialize(pool_event::Initialize {
|
||||
sqrt_price: init.sqrt_price_x96.to_string(),
|
||||
tick: init.tick.into(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(swap) = Swap::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::Swap(pool_event::Swap {
|
||||
sender: Hex(swap.sender).to_string(),
|
||||
recipient: Hex(swap.recipient).to_string(),
|
||||
amount_0: swap.amount0.to_string(),
|
||||
amount_1: swap.amount1.to_string(),
|
||||
sqrt_price: swap.sqrt_price_x96.to_string(),
|
||||
liquidity: swap.liquidity.to_string(),
|
||||
tick: swap.tick.into(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(flash) = Flash::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::Flash(pool_event::Flash {
|
||||
sender: Hex(flash.sender).to_string(),
|
||||
recipient: Hex(flash.recipient).to_string(),
|
||||
amount_0: flash.amount0.to_string(),
|
||||
amount_1: flash.amount1.to_string(),
|
||||
paid_0: flash.paid0.to_string(),
|
||||
paid_1: flash.paid1.to_string(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(mint) = Mint::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::Mint(pool_event::Mint {
|
||||
sender: Hex(mint.sender).to_string(),
|
||||
owner: Hex(mint.owner).to_string(),
|
||||
tick_lower: mint.tick_lower.into(),
|
||||
tick_upper: mint.tick_upper.into(),
|
||||
amount: mint.amount.to_string(),
|
||||
amount_0: mint.amount0.to_string(),
|
||||
amount_1: mint.amount1.to_string(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(burn) = Burn::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::Burn(pool_event::Burn {
|
||||
owner: Hex(burn.owner).to_string(),
|
||||
tick_lower: burn.tick_lower.into(),
|
||||
tick_upper: burn.tick_upper.into(),
|
||||
amount: burn.amount.to_string(),
|
||||
amount_0: burn.amount0.to_string(),
|
||||
amount_1: burn.amount1.to_string(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(collect) = Collect::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::Collect(pool_event::Collect {
|
||||
owner: Hex(collect.owner).to_string(),
|
||||
recipient: Hex(collect.recipient).to_string(),
|
||||
tick_lower: collect.tick_lower.into(),
|
||||
tick_upper: collect.tick_upper.into(),
|
||||
amount_0: collect.amount0.to_string(),
|
||||
amount_1: collect.amount1.to_string(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(set_fp) = SetFeeProtocol::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::SetFeeProtocol(pool_event::SetFeeProtocol {
|
||||
fee_protocol_0_old: set_fp.fee_protocol0_old.to_u64(),
|
||||
fee_protocol_1_old: set_fp.fee_protocol1_old.to_u64(),
|
||||
fee_protocol_0_new: set_fp.fee_protocol0_new.to_u64(),
|
||||
fee_protocol_1_new: set_fp.fee_protocol1_new.to_u64(),
|
||||
})),
|
||||
})
|
||||
} else if let Some(cp) = CollectProtocol::match_and_decode(event) {
|
||||
Some(PoolEvent {
|
||||
log_ordinal: event.ordinal,
|
||||
pool_address: Hex(pool.address).to_string(),
|
||||
token0: Hex(pool.token0).to_string(),
|
||||
token1: Hex(pool.token1).to_string(),
|
||||
transaction: Some(tx.into()),
|
||||
r#type: Some(Type::CollectProtocol(pool_event::CollectProtocol {
|
||||
sender: Hex(cp.sender).to_string(),
|
||||
recipient: Hex(cp.recipient).to_string(),
|
||||
amount_0: cp.amount0.to_string(),
|
||||
amount_1: cp.amount1.to_string(),
|
||||
})),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Ok;
|
||||
use tycho_substreams::models::{BalanceDelta, BlockBalanceDeltas};
|
||||
|
||||
use crate::pb::uniswap::v3::{
|
||||
events::{pool_event, PoolEvent},
|
||||
Events,
|
||||
};
|
||||
use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreAddBigInt, StoreNew},
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_balance_changes(events: Events) -> Result<BlockBalanceDeltas, anyhow::Error> {
|
||||
let balance_deltas = events
|
||||
.pool_events
|
||||
.into_iter()
|
||||
.flat_map(event_to_balance_deltas)
|
||||
.collect();
|
||||
|
||||
Ok(BlockBalanceDeltas { balance_deltas })
|
||||
}
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_pools_balances(balances_deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
|
||||
tycho_substreams::balances::store_balance_changes(balances_deltas, store);
|
||||
}
|
||||
|
||||
fn event_to_balance_deltas(event: PoolEvent) -> Vec<BalanceDelta> {
|
||||
let address = format!("0x{}", event.pool_address)
|
||||
.as_bytes()
|
||||
.to_vec();
|
||||
match event.r#type.unwrap() {
|
||||
pool_event::Type::Mint(e) => vec![
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token0.clone()).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_0)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address.clone(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event
|
||||
.transaction
|
||||
.clone()
|
||||
.map(Into::into),
|
||||
},
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token1.clone()).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_1)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address,
|
||||
ord: event.log_ordinal,
|
||||
tx: event.transaction.map(Into::into),
|
||||
},
|
||||
],
|
||||
pool_event::Type::Collect(e) => vec![
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token0.clone()).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_0)
|
||||
.unwrap()
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address.clone(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event
|
||||
.transaction
|
||||
.clone()
|
||||
.map(Into::into),
|
||||
},
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token1.clone()).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_1)
|
||||
.unwrap()
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address,
|
||||
ord: event.log_ordinal,
|
||||
tx: event.transaction.map(Into::into),
|
||||
},
|
||||
],
|
||||
//Burn balance changes are accounted for in the Collect event.
|
||||
pool_event::Type::Burn(_) => vec![],
|
||||
pool_event::Type::Swap(e) => {
|
||||
vec![
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token0.clone()).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_0)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address.clone(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event
|
||||
.transaction
|
||||
.clone()
|
||||
.map(Into::into),
|
||||
},
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token1.clone()).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_1)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address.clone(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event.transaction.map(Into::into),
|
||||
},
|
||||
]
|
||||
}
|
||||
pool_event::Type::Flash(e) => vec![
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token0).unwrap(),
|
||||
delta: BigInt::from_str(&e.paid_0)
|
||||
.unwrap()
|
||||
.clone()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address.clone(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event
|
||||
.transaction
|
||||
.clone()
|
||||
.map(Into::into),
|
||||
},
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token1).unwrap(),
|
||||
delta: BigInt::from_str(&e.paid_1)
|
||||
.unwrap()
|
||||
.clone()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: address,
|
||||
ord: event.log_ordinal,
|
||||
tx: event.transaction.map(Into::into),
|
||||
},
|
||||
],
|
||||
pool_event::Type::CollectProtocol(e) => {
|
||||
vec![
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token0).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_0)
|
||||
.unwrap()
|
||||
.neg()
|
||||
.clone()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: event
|
||||
.pool_address
|
||||
.clone()
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event
|
||||
.transaction
|
||||
.clone()
|
||||
.map(Into::into),
|
||||
},
|
||||
BalanceDelta {
|
||||
token: hex::decode(event.token1).unwrap(),
|
||||
delta: BigInt::from_str(&e.amount_1)
|
||||
.unwrap()
|
||||
.clone()
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
component_id: event
|
||||
.pool_address
|
||||
.clone()
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
ord: event.log_ordinal,
|
||||
tx: event.transaction.map(Into::into),
|
||||
},
|
||||
]
|
||||
}
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,137 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use substreams::store::{
|
||||
StoreGet, StoreGetInt64, StoreSet, StoreSetInt64, StoreSetSum, StoreSetSumBigInt,
|
||||
};
|
||||
|
||||
use crate::pb::uniswap::v3::{
|
||||
events::{pool_event, PoolEvent},
|
||||
Events, LiquidityChange, LiquidityChangeType, LiquidityChanges,
|
||||
};
|
||||
|
||||
use substreams::{scalar::BigInt, store::StoreNew};
|
||||
|
||||
use anyhow::Ok;
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_pool_current_tick(events: Events, store: StoreSetInt64) {
|
||||
events
|
||||
.pool_events
|
||||
.into_iter()
|
||||
.filter_map(event_to_current_tick)
|
||||
.for_each(|(pool, ordinal, new_tick_index)| {
|
||||
store.set(ordinal, format!("pool:{0}", pool), &new_tick_index.into())
|
||||
});
|
||||
}
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_liquidity_changes(
|
||||
events: Events,
|
||||
pools_current_tick_store: StoreGetInt64,
|
||||
) -> Result<LiquidityChanges, anyhow::Error> {
|
||||
let mut changes = events
|
||||
.pool_events
|
||||
.into_iter()
|
||||
.filter(PoolEvent::can_introduce_liquidity_changes)
|
||||
.map(|e| {
|
||||
(
|
||||
pools_current_tick_store
|
||||
.get_at(e.log_ordinal, format!("pool:{0}", &e.pool_address))
|
||||
.unwrap_or(0),
|
||||
e,
|
||||
)
|
||||
})
|
||||
.filter_map(|(current_tick, event)| event_to_liquidity_deltas(current_tick, event))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
changes.sort_unstable_by_key(|l| l.ordinal);
|
||||
Ok(LiquidityChanges { changes })
|
||||
}
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_liquidity(ticks_deltas: LiquidityChanges, store: StoreSetSumBigInt) {
|
||||
ticks_deltas
|
||||
.changes
|
||||
.iter()
|
||||
.for_each(|changes| match changes.change_type() {
|
||||
LiquidityChangeType::Delta => {
|
||||
store.sum(
|
||||
changes.ordinal,
|
||||
format!("pool:{0}", hex::encode(&changes.pool_address)),
|
||||
BigInt::from_signed_bytes_be(&changes.value),
|
||||
);
|
||||
}
|
||||
LiquidityChangeType::Absolute => {
|
||||
store.set(
|
||||
changes.ordinal,
|
||||
format!("pool:{0}", hex::encode(&changes.pool_address)),
|
||||
BigInt::from_signed_bytes_be(&changes.value),
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn event_to_liquidity_deltas(current_tick: i64, event: PoolEvent) -> Option<LiquidityChange> {
|
||||
match event.r#type.as_ref().unwrap() {
|
||||
pool_event::Type::Mint(mint) => {
|
||||
if current_tick >= mint.tick_lower.into() && current_tick < mint.tick_upper.into() {
|
||||
Some(LiquidityChange {
|
||||
pool_address: hex::decode(event.pool_address).unwrap(),
|
||||
value: BigInt::from_str(&mint.amount)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
change_type: LiquidityChangeType::Delta.into(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: Some(event.transaction.unwrap()),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
pool_event::Type::Burn(burn) => {
|
||||
if current_tick >= burn.tick_lower.into() && current_tick < burn.tick_upper.into() {
|
||||
Some(LiquidityChange {
|
||||
pool_address: hex::decode(event.pool_address).unwrap(),
|
||||
value: BigInt::from_str(&burn.amount)
|
||||
.unwrap()
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
change_type: LiquidityChangeType::Delta.into(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: Some(event.transaction.unwrap()),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
pool_event::Type::Swap(swap) => Some(LiquidityChange {
|
||||
pool_address: hex::decode(event.pool_address).unwrap(),
|
||||
value: BigInt::from_str(&swap.liquidity)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
change_type: LiquidityChangeType::Absolute.into(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: Some(event.transaction.unwrap()),
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
impl PoolEvent {
|
||||
fn can_introduce_liquidity_changes(&self) -> bool {
|
||||
matches!(
|
||||
self.r#type.as_ref().unwrap(),
|
||||
pool_event::Type::Mint(_) | pool_event::Type::Burn(_) | pool_event::Type::Swap(_)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn event_to_current_tick(event: PoolEvent) -> Option<(String, u64, i32)> {
|
||||
match event.r#type.as_ref().unwrap() {
|
||||
pool_event::Type::Initialize(initialize) => {
|
||||
Some((event.pool_address, event.log_ordinal, initialize.tick))
|
||||
}
|
||||
pool_event::Type::Swap(swap) => Some((event.pool_address, event.log_ordinal, swap.tick)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,91 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use substreams::store::StoreAddBigInt;
|
||||
|
||||
use crate::pb::uniswap::v3::{
|
||||
events::{pool_event, PoolEvent},
|
||||
Events, TickDelta, TickDeltas,
|
||||
};
|
||||
|
||||
use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreAdd, StoreNew},
|
||||
};
|
||||
|
||||
use anyhow::Ok;
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_ticks_changes(events: Events) -> Result<TickDeltas, anyhow::Error> {
|
||||
let ticks_deltas = events
|
||||
.pool_events
|
||||
.into_iter()
|
||||
.flat_map(event_to_ticks_deltas)
|
||||
.collect();
|
||||
|
||||
Ok(TickDeltas { deltas: ticks_deltas })
|
||||
}
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_ticks_liquidity(ticks_deltas: TickDeltas, store: StoreAddBigInt) {
|
||||
let mut deltas = ticks_deltas.deltas.clone();
|
||||
|
||||
deltas.sort_unstable_by_key(|delta| delta.ordinal);
|
||||
|
||||
deltas.iter().for_each(|delta| {
|
||||
store.add(
|
||||
delta.ordinal,
|
||||
format!("pool:{0}:tick:{1}", hex::encode(&delta.pool_address), delta.tick_index,),
|
||||
BigInt::from_signed_bytes_be(&delta.liquidity_net_delta),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn event_to_ticks_deltas(event: PoolEvent) -> Vec<TickDelta> {
|
||||
match event.r#type.as_ref().unwrap() {
|
||||
pool_event::Type::Mint(mint) => {
|
||||
vec![
|
||||
TickDelta {
|
||||
pool_address: hex::decode(&event.pool_address).unwrap(),
|
||||
tick_index: mint.tick_lower,
|
||||
liquidity_net_delta: BigInt::from_str(&mint.amount)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: event.transaction.clone(),
|
||||
},
|
||||
TickDelta {
|
||||
pool_address: hex::decode(&event.pool_address).unwrap(),
|
||||
tick_index: mint.tick_upper,
|
||||
liquidity_net_delta: BigInt::from_str(&mint.amount)
|
||||
.unwrap()
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: event.transaction,
|
||||
},
|
||||
]
|
||||
}
|
||||
pool_event::Type::Burn(burn) => vec![
|
||||
TickDelta {
|
||||
pool_address: hex::decode(&event.pool_address).unwrap(),
|
||||
tick_index: burn.tick_lower,
|
||||
liquidity_net_delta: BigInt::from_str(&burn.amount)
|
||||
.unwrap()
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: event.transaction.clone(),
|
||||
},
|
||||
TickDelta {
|
||||
pool_address: hex::decode(&event.pool_address).unwrap(),
|
||||
tick_index: burn.tick_upper,
|
||||
liquidity_net_delta: BigInt::from_str(&burn.amount)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
ordinal: event.log_ordinal,
|
||||
transaction: event.transaction,
|
||||
},
|
||||
],
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,248 @@
|
||||
use crate::pb::uniswap::v3::{
|
||||
events::{pool_event, PoolEvent},
|
||||
Events, LiquidityChanges, TickDeltas,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use std::{collections::HashMap, str::FromStr, vec};
|
||||
use substreams::{pb::substreams::StoreDeltas, scalar::BigInt};
|
||||
use substreams_ethereum::pb::eth::v2::{self as eth};
|
||||
use substreams_helper::hex::Hexable;
|
||||
use tycho_substreams::{balances::aggregate_balances_changes, prelude::*};
|
||||
|
||||
type PoolAddress = Vec<u8>;
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_protocol_changes(
|
||||
block: eth::Block,
|
||||
created_pools: BlockEntityChanges,
|
||||
events: Events,
|
||||
balances_map_deltas: BlockBalanceDeltas,
|
||||
balances_store_deltas: StoreDeltas,
|
||||
ticks_map_deltas: TickDeltas,
|
||||
ticks_store_deltas: StoreDeltas,
|
||||
pool_liquidity_changes: LiquidityChanges,
|
||||
pool_liquidity_store_deltas: StoreDeltas,
|
||||
) -> Result<BlockChanges, substreams::errors::Error> {
|
||||
// We merge contract changes by transaction (identified by transaction index) making it easy to
|
||||
// sort them at the very end.
|
||||
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
|
||||
|
||||
// Add created pools to the tx_changes_map
|
||||
for change in created_pools.changes.into_iter() {
|
||||
let tx = change.tx.as_ref().unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(tx));
|
||||
change
|
||||
.component_changes
|
||||
.iter()
|
||||
.for_each(|c| {
|
||||
builder.add_protocol_component(c);
|
||||
});
|
||||
change
|
||||
.entity_changes
|
||||
.iter()
|
||||
.for_each(|ec| {
|
||||
builder.add_entity_change(ec);
|
||||
});
|
||||
change
|
||||
.balance_changes
|
||||
.iter()
|
||||
.for_each(|bc| {
|
||||
builder.add_balance_change(bc);
|
||||
});
|
||||
}
|
||||
|
||||
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
||||
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
|
||||
// this block. Then, these balance changes are merged onto the existing map of tx contract
|
||||
// changes, inserting a new one if it doesn't exist.
|
||||
aggregate_balances_changes(balances_store_deltas, balances_map_deltas)
|
||||
.into_iter()
|
||||
.for_each(|(_, (tx, balances))| {
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
|
||||
balances
|
||||
.values()
|
||||
.for_each(|token_bc_map| {
|
||||
token_bc_map
|
||||
.values()
|
||||
.for_each(|bc| builder.add_balance_change(bc))
|
||||
});
|
||||
});
|
||||
|
||||
// Insert ticks net-liquidity changes
|
||||
ticks_store_deltas
|
||||
.deltas
|
||||
.into_iter()
|
||||
.zip(ticks_map_deltas.deltas)
|
||||
.for_each(|(store_delta, tick_delta)| {
|
||||
let new_value_bigint =
|
||||
BigInt::from_str(&String::from_utf8(store_delta.new_value).unwrap()).unwrap();
|
||||
|
||||
// If old value is empty or the int value is 0, it's considered as a creation.
|
||||
let is_creation = store_delta.old_value.is_empty() ||
|
||||
BigInt::from_str(&String::from_utf8(store_delta.old_value).unwrap())
|
||||
.unwrap()
|
||||
.is_zero();
|
||||
let attribute_name = format!("ticks/{}/net-liquidity", tick_delta.tick_index);
|
||||
let attribute = Attribute {
|
||||
name: attribute_name,
|
||||
value: new_value_bigint.to_signed_bytes_be(),
|
||||
change: if is_creation {
|
||||
ChangeType::Creation.into()
|
||||
} else if new_value_bigint.is_zero() {
|
||||
ChangeType::Deletion.into()
|
||||
} else {
|
||||
ChangeType::Update.into()
|
||||
},
|
||||
};
|
||||
let tx = tick_delta.transaction.unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
||||
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: tick_delta.pool_address.to_hex(),
|
||||
attributes: vec![attribute],
|
||||
});
|
||||
});
|
||||
|
||||
// Insert liquidity changes
|
||||
pool_liquidity_store_deltas
|
||||
.deltas
|
||||
.into_iter()
|
||||
.zip(pool_liquidity_changes.changes)
|
||||
.for_each(|(store_delta, change)| {
|
||||
let new_value_bigint = BigInt::from_str(
|
||||
String::from_utf8(store_delta.new_value)
|
||||
.unwrap()
|
||||
.split(':')
|
||||
.nth(1)
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
let tx = change.transaction.unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
||||
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: change.pool_address.to_hex(),
|
||||
attributes: vec![Attribute {
|
||||
name: "liquidity".to_string(),
|
||||
value: new_value_bigint.to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
}],
|
||||
});
|
||||
});
|
||||
|
||||
// Insert others changes
|
||||
events
|
||||
.pool_events
|
||||
.into_iter()
|
||||
.flat_map(event_to_attributes_updates)
|
||||
.for_each(|(tx, pool_address, attr)| {
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx));
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: pool_address.to_hex(),
|
||||
attributes: vec![attr],
|
||||
});
|
||||
});
|
||||
|
||||
Ok(BlockChanges {
|
||||
block: Some((&block).into()),
|
||||
changes: transaction_changes
|
||||
.drain()
|
||||
.sorted_unstable_by_key(|(index, _)| *index)
|
||||
.filter_map(|(_, builder)| builder.build())
|
||||
.collect::<Vec<_>>(),
|
||||
})
|
||||
}
|
||||
|
||||
fn event_to_attributes_updates(event: PoolEvent) -> Vec<(Transaction, PoolAddress, Attribute)> {
|
||||
match event.r#type.as_ref().unwrap() {
|
||||
pool_event::Type::Initialize(initalize) => {
|
||||
vec![
|
||||
(
|
||||
event
|
||||
.transaction
|
||||
.clone()
|
||||
.unwrap()
|
||||
.into(),
|
||||
hex::decode(event.pool_address.clone()).unwrap(),
|
||||
Attribute {
|
||||
name: "sqrt_price_x96".to_string(),
|
||||
value: BigInt::from_str(&initalize.sqrt_price)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
event.transaction.unwrap().into(),
|
||||
hex::decode(event.pool_address).unwrap(),
|
||||
Attribute {
|
||||
name: "tick".to_string(),
|
||||
value: BigInt::from(initalize.tick).to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
),
|
||||
]
|
||||
}
|
||||
pool_event::Type::Swap(swap) => vec![
|
||||
(
|
||||
event
|
||||
.transaction
|
||||
.clone()
|
||||
.unwrap()
|
||||
.into(),
|
||||
hex::decode(event.pool_address.clone()).unwrap(),
|
||||
Attribute {
|
||||
name: "sqrt_price_x96".to_string(),
|
||||
value: BigInt::from_str(&swap.sqrt_price)
|
||||
.unwrap()
|
||||
.to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
event.transaction.unwrap().into(),
|
||||
hex::decode(event.pool_address).unwrap(),
|
||||
Attribute {
|
||||
name: "tick".to_string(),
|
||||
value: BigInt::from(swap.tick).to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
),
|
||||
],
|
||||
pool_event::Type::SetFeeProtocol(sfp) => vec![
|
||||
(
|
||||
event
|
||||
.transaction
|
||||
.clone()
|
||||
.unwrap()
|
||||
.into(),
|
||||
hex::decode(event.pool_address.clone()).unwrap(),
|
||||
Attribute {
|
||||
name: "protocol_fees/token0".to_string(),
|
||||
value: BigInt::from(sfp.fee_protocol_0_new).to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
),
|
||||
(
|
||||
event.transaction.unwrap().into(),
|
||||
hex::decode(event.pool_address).unwrap(),
|
||||
Attribute {
|
||||
name: "protocol_fees/token1".to_string(),
|
||||
value: BigInt::from(sfp.fee_protocol_1_new).to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
),
|
||||
],
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
39
substreams/ethereum-uniswap-v3-logs-only/src/modules/mod.rs
Normal file
39
substreams/ethereum-uniswap-v3-logs-only/src/modules/mod.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
pub use map_pool_created::map_pools_created;
|
||||
pub use map_protocol_changes::map_protocol_changes;
|
||||
pub use store_pools::store_pools;
|
||||
use substreams_ethereum::pb::eth::v2::TransactionTrace;
|
||||
|
||||
use crate::pb::uniswap::v3::Transaction;
|
||||
|
||||
#[path = "1_map_pool_created.rs"]
|
||||
mod map_pool_created;
|
||||
|
||||
#[path = "2_store_pools.rs"]
|
||||
mod store_pools;
|
||||
|
||||
#[path = "3_map_events.rs"]
|
||||
mod map_events;
|
||||
|
||||
#[path = "4_map_and_store_balance_changes.rs"]
|
||||
mod map_store_balance_changes;
|
||||
|
||||
#[path = "4_map_and_store_ticks.rs"]
|
||||
mod map_store_ticks;
|
||||
|
||||
#[path = "4_map_and_store_liquidity.rs"]
|
||||
mod map_store_liquidity;
|
||||
|
||||
#[path = "5_map_protocol_changes.rs"]
|
||||
mod map_protocol_changes;
|
||||
|
||||
impl From<TransactionTrace> for Transaction {
|
||||
fn from(value: TransactionTrace) -> Self {
|
||||
Self { hash: value.hash, from: value.from, to: value.to, index: value.index.into() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Transaction> for tycho_substreams::prelude::Transaction {
|
||||
fn from(value: Transaction) -> Self {
|
||||
Self { hash: value.hash, from: value.from, to: value.to, index: value.index }
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user