Ekubo TWAMM & MEV-resist integration (#192)
* Add Ekubo TWAMM support * Change order of words * Account TWAMM order balances * Fix tracking wrong component balance deltas Swapped and PositionUpdated are the only events affecting pool TVL * Fix fee addition Fees are a .64 instead of a .128 since v2 & the result is rounded * Consistent naming * cargo fmt * Add method for selecting store method from change type * Only store the affected sale rate delta on OrderUpdated events * Remove unnecessary parameterization * Index Ekubo MEV-resist pools * cargo clippy
This commit is contained in:
@@ -7,14 +7,14 @@ use substreams_ethereum::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
abi::core::events as abi_events,
|
||||
abi::{core::events as core_events, twamm::events as twamm_events},
|
||||
deployment_config::DeploymentConfig,
|
||||
pb::ekubo::{
|
||||
block_transaction_events::{
|
||||
transaction_events::{
|
||||
pool_log::{
|
||||
pool_initialized::Extension, Event, FeesAccumulated, PoolInitialized,
|
||||
PositionFeesCollected, PositionUpdated, Swapped,
|
||||
order_updated::OrderKey, pool_initialized::Extension, Event, OrderUpdated,
|
||||
PoolInitialized, PositionUpdated, Swapped, VirtualOrdersExecuted,
|
||||
},
|
||||
PoolLog,
|
||||
},
|
||||
@@ -22,7 +22,7 @@ use crate::{
|
||||
},
|
||||
BlockTransactionEvents,
|
||||
},
|
||||
pool_config::PoolConfig,
|
||||
pool_key::{PoolConfig, PoolKey},
|
||||
sqrt_ratio::float_sqrt_ratio_to_fixed,
|
||||
};
|
||||
|
||||
@@ -43,84 +43,114 @@ fn map_events(params: String, block: eth::v2::Block) -> BlockTransactionEvents {
|
||||
.then(|| TransactionEvents { transaction: Some(trace.into()), pool_logs })
|
||||
})
|
||||
.collect(),
|
||||
timestamp: block
|
||||
.header
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.timestamp
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.seconds
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_pool_log(log: &Log, config: &DeploymentConfig) -> Option<PoolLog> {
|
||||
if log.address != config.core {
|
||||
return None;
|
||||
}
|
||||
let (pool_id, ev) = if log.address == config.core {
|
||||
if log.topics.is_empty() {
|
||||
let data = &log.data;
|
||||
|
||||
let (pool_id, ev) = if log.topics.is_empty() {
|
||||
let data = &log.data;
|
||||
assert!(data.len() == 116, "swap event data length mismatch");
|
||||
|
||||
assert!(data.len() == 116, "swap event data length mismatch");
|
||||
(
|
||||
data[20..52].to_vec(),
|
||||
Event::Swapped(Swapped {
|
||||
delta0: data[52..68].to_vec(),
|
||||
delta1: data[68..84].to_vec(),
|
||||
liquidity_after: data[84..100].to_vec(),
|
||||
sqrt_ratio_after: float_sqrt_ratio_to_fixed(BigInt::from_unsigned_bytes_be(
|
||||
&data[100..112],
|
||||
)),
|
||||
tick_after: i32::from_be_bytes(data[112..116].try_into().unwrap()),
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = core_events::PositionUpdated::match_and_decode(log) {
|
||||
(
|
||||
ev.pool_id.to_vec(),
|
||||
Event::PositionUpdated(PositionUpdated {
|
||||
lower: ev.params.1 .0.to_i32(),
|
||||
upper: ev.params.1 .1.to_i32(),
|
||||
liquidity_delta: ev.params.2.to_signed_bytes_be(),
|
||||
delta0: ev.delta0.to_signed_bytes_be(),
|
||||
delta1: ev.delta1.to_signed_bytes_be(),
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = core_events::PoolInitialized::match_and_decode(log) {
|
||||
let pool_config = PoolConfig::from(ev.pool_key.2);
|
||||
|
||||
(
|
||||
data[20..52].to_vec(),
|
||||
Event::Swapped(Swapped {
|
||||
delta0: data[52..68].to_vec(),
|
||||
delta1: data[68..84].to_vec(),
|
||||
liquidity_after: data[84..100].to_vec(),
|
||||
sqrt_ratio_after: float_sqrt_ratio_to_fixed(BigInt::from_unsigned_bytes_be(
|
||||
&data[100..112],
|
||||
)),
|
||||
tick_after: i32::from_be_bytes(data[112..116].try_into().unwrap()),
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = abi_events::PositionUpdated::match_and_decode(log) {
|
||||
(
|
||||
ev.pool_id.to_vec(),
|
||||
Event::PositionUpdated(PositionUpdated {
|
||||
lower: ev.params.1 .0.to_i32(),
|
||||
upper: ev.params.1 .1.to_i32(),
|
||||
liquidity_delta: ev.params.2.to_signed_bytes_be(),
|
||||
delta0: ev.delta0.to_signed_bytes_be(),
|
||||
delta1: ev.delta1.to_signed_bytes_be(),
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = abi_events::PositionFeesCollected::match_and_decode(log) {
|
||||
(
|
||||
ev.pool_id.to_vec(),
|
||||
Event::PositionFeesCollected(PositionFeesCollected {
|
||||
amount0: ev.amount0.to_bytes_be().1,
|
||||
amount1: ev.amount1.to_bytes_be().1,
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = abi_events::PoolInitialized::match_and_decode(log) {
|
||||
let pool_config = PoolConfig::from(ev.pool_key.2);
|
||||
let extension = {
|
||||
let extension = pool_config.extension.as_bytes();
|
||||
|
||||
let extension = {
|
||||
let extension = pool_config.extension;
|
||||
if extension == Address::zero().as_bytes() {
|
||||
Extension::Base
|
||||
} else if extension == config.oracle {
|
||||
Extension::Oracle
|
||||
} else if extension == config.twamm {
|
||||
Extension::Twamm
|
||||
} else if extension == config.mev_resist {
|
||||
Extension::MevResist
|
||||
} else {
|
||||
Extension::Unknown
|
||||
}
|
||||
};
|
||||
|
||||
if extension == Address::zero().as_bytes() {
|
||||
Extension::Base
|
||||
} else if extension == config.oracle {
|
||||
Extension::Oracle
|
||||
} else {
|
||||
Extension::Unknown
|
||||
}
|
||||
};
|
||||
(
|
||||
ev.pool_id.to_vec(),
|
||||
Event::PoolInitialized(PoolInitialized {
|
||||
token0: ev.pool_key.0,
|
||||
token1: ev.pool_key.1,
|
||||
config: ev.pool_key.2.to_vec(),
|
||||
tick: ev.tick.to_i32(),
|
||||
sqrt_ratio: float_sqrt_ratio_to_fixed(ev.sqrt_ratio),
|
||||
extension: extension.into(),
|
||||
}),
|
||||
)
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
} else if log.address == config.twamm {
|
||||
if log.topics.is_empty() {
|
||||
let data = &log.data;
|
||||
|
||||
(
|
||||
ev.pool_id.to_vec(),
|
||||
Event::PoolInitialized(PoolInitialized {
|
||||
token0: ev.pool_key.0,
|
||||
token1: ev.pool_key.1,
|
||||
config: ev.pool_key.2.to_vec(),
|
||||
tick: ev.tick.to_i32(),
|
||||
sqrt_ratio: float_sqrt_ratio_to_fixed(ev.sqrt_ratio),
|
||||
extension: extension.into(),
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = abi_events::FeesAccumulated::match_and_decode(log) {
|
||||
(
|
||||
ev.pool_id.to_vec(),
|
||||
Event::FeesAccumulated(FeesAccumulated {
|
||||
amount0: ev.amount0.to_bytes_be().1,
|
||||
amount1: ev.amount1.to_bytes_be().1,
|
||||
}),
|
||||
)
|
||||
assert!(data.len() == 60, "virtual orders executed event data length mismatch");
|
||||
|
||||
(
|
||||
data[0..32].to_vec(),
|
||||
Event::VirtualOrdersExecuted(VirtualOrdersExecuted {
|
||||
token0_sale_rate: data[32..46].to_vec(),
|
||||
token1_sale_rate: data[46..60].to_vec(),
|
||||
}),
|
||||
)
|
||||
} else if let Some(ev) = twamm_events::OrderUpdated::match_and_decode(log) {
|
||||
let key = ev.order_key;
|
||||
|
||||
(
|
||||
PoolKey::from_order_key(&key, &log.address).into_pool_id(),
|
||||
Event::OrderUpdated(OrderUpdated {
|
||||
order_key: Some(OrderKey {
|
||||
sell_token: key.0,
|
||||
buy_token: key.1,
|
||||
fee: key.2.to_u64(),
|
||||
start_time: key.3.to_u64(),
|
||||
end_time: key.4.to_u64(),
|
||||
}),
|
||||
sale_rate_delta: ev.sale_rate_delta.to_signed_bytes_be(),
|
||||
}),
|
||||
)
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
|
||||
@@ -8,10 +8,13 @@ use tycho_substreams::models::{
|
||||
|
||||
use crate::{
|
||||
pb::ekubo::{
|
||||
block_transaction_events::transaction_events::{pool_log::Event, PoolLog},
|
||||
block_transaction_events::transaction_events::{
|
||||
pool_log::{pool_initialized::Extension, Event},
|
||||
PoolLog,
|
||||
},
|
||||
BlockTransactionEvents,
|
||||
},
|
||||
pool_config::PoolConfig,
|
||||
pool_key::PoolConfig,
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
@@ -25,7 +28,7 @@ fn map_components(block_tx_events: BlockTransactionEvents) -> BlockChanges {
|
||||
let (components, entities, balance_changes): (Vec<_>, Vec<_>, Vec<_>) = tx_events
|
||||
.pool_logs
|
||||
.into_iter()
|
||||
.filter_map(maybe_create_component)
|
||||
.filter_map(|log| maybe_create_component(log, block_tx_events.timestamp))
|
||||
.multiunzip();
|
||||
|
||||
(!components.is_empty()).then(|| TransactionChanges {
|
||||
@@ -45,8 +48,56 @@ fn map_components(block_tx_events: BlockTransactionEvents) -> BlockChanges {
|
||||
|
||||
fn maybe_create_component(
|
||||
log: PoolLog,
|
||||
timestamp: u64,
|
||||
) -> Option<(ProtocolComponent, EntityChanges, Vec<BalanceChange>)> {
|
||||
if let Event::PoolInitialized(pi) = log.event.unwrap() {
|
||||
let entity_attributes = (pi.extension() == Extension::Twamm)
|
||||
.then(|| {
|
||||
[
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "token0_sale_rate".to_string(),
|
||||
value: vec![],
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "token1_sale_rate".to_string(),
|
||||
value: vec![],
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "last_execution_time".to_string(),
|
||||
value: timestamp.to_be_bytes().to_vec(),
|
||||
},
|
||||
]
|
||||
})
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.chain([
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "liquidity".to_string(),
|
||||
value: 0_u128.to_be_bytes().to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "tick".to_string(),
|
||||
value: pi.tick.to_be_bytes().to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "sqrt_ratio".to_string(),
|
||||
value: pi.sqrt_ratio,
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "balance_owner".to_string(), /* TODO: We should use AccountBalances
|
||||
* instead */
|
||||
value: hex!("e0e0e08A6A4b9Dc7bD67BCB7aadE5cF48157d444").to_vec(),
|
||||
},
|
||||
])
|
||||
.collect();
|
||||
|
||||
let config = PoolConfig::from(<[u8; 32]>::try_from(pi.config).unwrap());
|
||||
let component_id = log.pool_id.to_hex();
|
||||
|
||||
@@ -77,17 +128,23 @@ fn maybe_create_component(
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "fee".to_string(),
|
||||
value: config.fee,
|
||||
value: config.fee.to_be_bytes().to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "tick_spacing".to_string(),
|
||||
value: config.tick_spacing,
|
||||
value: config
|
||||
.tick_spacing
|
||||
.to_be_bytes()
|
||||
.to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "extension".to_string(),
|
||||
value: config.extension,
|
||||
value: config
|
||||
.extension
|
||||
.to_fixed_bytes()
|
||||
.to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
@@ -96,32 +153,7 @@ fn maybe_create_component(
|
||||
},
|
||||
],
|
||||
},
|
||||
EntityChanges {
|
||||
component_id: component_id.clone(),
|
||||
attributes: vec![
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "liquidity".to_string(),
|
||||
value: 0_u128.to_be_bytes().to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "tick".to_string(),
|
||||
value: pi.tick.to_be_bytes().to_vec(),
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "sqrt_ratio".to_string(),
|
||||
value: pi.sqrt_ratio,
|
||||
},
|
||||
Attribute {
|
||||
change: ChangeType::Creation.into(),
|
||||
name: "balance_owner".to_string(), /* TODO: We should use AccountBalances
|
||||
* instead */
|
||||
value: hex!("e0e0e08A6A4b9Dc7bD67BCB7aadE5cF48157d444").to_vec(),
|
||||
},
|
||||
],
|
||||
},
|
||||
EntityChanges { component_id: component_id.clone(), attributes: entity_attributes },
|
||||
vec![
|
||||
BalanceChange {
|
||||
component_id: component_id.clone().into_bytes(),
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
use substreams::scalar::BigInt;
|
||||
|
||||
use crate::pb::ekubo::{
|
||||
block_transaction_events::transaction_events::pool_log::Event, BlockTransactionEvents,
|
||||
OrderSaleRateDelta, OrderSaleRateDeltas,
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_order_sale_rate_deltas(block_tx_events: BlockTransactionEvents) -> OrderSaleRateDeltas {
|
||||
OrderSaleRateDeltas {
|
||||
deltas: block_tx_events
|
||||
.block_transaction_events
|
||||
.into_iter()
|
||||
.flat_map(|tx_events| {
|
||||
let tx = tx_events.transaction;
|
||||
|
||||
tx_events
|
||||
.pool_logs
|
||||
.into_iter()
|
||||
.flat_map(move |log| {
|
||||
let tx = tx.clone();
|
||||
|
||||
order_sale_rate_deltas(log.event.unwrap())
|
||||
.into_iter()
|
||||
.map(move |partial| OrderSaleRateDelta {
|
||||
pool_id: log.pool_id.clone(),
|
||||
time: partial.time,
|
||||
sale_rate_delta: partial.sale_rate_delta,
|
||||
is_token1: partial.is_token1,
|
||||
ordinal: log.ordinal,
|
||||
transaction: tx.clone(),
|
||||
})
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
struct PartialOrderSaleRateDelta {
|
||||
time: u64,
|
||||
sale_rate_delta: Vec<u8>,
|
||||
is_token1: bool,
|
||||
}
|
||||
|
||||
fn order_sale_rate_deltas(ev: Event) -> Vec<PartialOrderSaleRateDelta> {
|
||||
match ev {
|
||||
Event::OrderUpdated(ev) => {
|
||||
let key = ev.order_key.unwrap();
|
||||
|
||||
let is_token1 = key.sell_token > key.buy_token;
|
||||
let sale_rate_delta = ev.sale_rate_delta;
|
||||
|
||||
vec![
|
||||
PartialOrderSaleRateDelta {
|
||||
time: key.start_time,
|
||||
sale_rate_delta: sale_rate_delta.clone(),
|
||||
is_token1,
|
||||
},
|
||||
PartialOrderSaleRateDelta {
|
||||
time: key.end_time,
|
||||
sale_rate_delta: BigInt::from_signed_bytes_be(&sale_rate_delta)
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
is_token1,
|
||||
},
|
||||
]
|
||||
}
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
use crate::pb::ekubo::{
|
||||
block_transaction_events::transaction_events::{pool_log::Event, PoolLog},
|
||||
BlockTransactionEvents, ChangeType, SaleRateChange, SaleRateChanges,
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_sale_rate_changes(block_tx_events: BlockTransactionEvents) -> SaleRateChanges {
|
||||
SaleRateChanges {
|
||||
changes: block_tx_events
|
||||
.block_transaction_events
|
||||
.into_iter()
|
||||
.flat_map(|tx_events| {
|
||||
tx_events
|
||||
.pool_logs
|
||||
.into_iter()
|
||||
.filter_map(move |log| {
|
||||
maybe_sale_rate_change(&log, block_tx_events.timestamp).map(|partial| {
|
||||
SaleRateChange {
|
||||
change_type: partial.change_type.into(),
|
||||
pool_id: log.pool_id,
|
||||
token0_value: partial.token0_value,
|
||||
token1_value: partial.token1_value,
|
||||
ordinal: log.ordinal,
|
||||
transaction: tx_events.transaction.clone(),
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
struct PartialSaleRateChange {
|
||||
token0_value: Vec<u8>,
|
||||
token1_value: Vec<u8>,
|
||||
change_type: ChangeType,
|
||||
}
|
||||
|
||||
fn maybe_sale_rate_change(log: &PoolLog, timestamp: u64) -> Option<PartialSaleRateChange> {
|
||||
match log.event.as_ref().unwrap() {
|
||||
Event::VirtualOrdersExecuted(ev) => Some(PartialSaleRateChange {
|
||||
change_type: ChangeType::Absolute,
|
||||
token0_value: ev.token0_sale_rate.clone(),
|
||||
token1_value: ev.token1_sale_rate.clone(),
|
||||
}),
|
||||
Event::OrderUpdated(ev) => {
|
||||
// A virtual order execution always happens before an order update
|
||||
let last_execution_time = timestamp;
|
||||
|
||||
let key = ev.order_key.as_ref().unwrap();
|
||||
|
||||
(last_execution_time >= key.start_time && last_execution_time < key.end_time).then(
|
||||
|| {
|
||||
let (token0_sale_rate_delta, token1_sale_rate_delta) =
|
||||
if key.sell_token > key.buy_token {
|
||||
(vec![], ev.sale_rate_delta.clone())
|
||||
} else {
|
||||
(ev.sale_rate_delta.clone(), vec![])
|
||||
};
|
||||
|
||||
PartialSaleRateChange {
|
||||
change_type: ChangeType::Delta,
|
||||
token0_value: token0_sale_rate_delta,
|
||||
token1_value: token1_sale_rate_delta,
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,7 @@ use crate::pb::ekubo::{
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
pub fn map_tick_changes(block_tx_events: BlockTransactionEvents) -> TickDeltas {
|
||||
pub fn map_tick_deltas(block_tx_events: BlockTransactionEvents) -> TickDeltas {
|
||||
TickDeltas {
|
||||
deltas: block_tx_events
|
||||
.block_transaction_events
|
||||
@@ -4,7 +4,7 @@ use substreams_helper::hex::Hexable;
|
||||
|
||||
use crate::pb::ekubo::{
|
||||
block_transaction_events::transaction_events::{pool_log::Event, PoolLog},
|
||||
BlockTransactionEvents, LiquidityChange, LiquidityChangeType, LiquidityChanges,
|
||||
BlockTransactionEvents, ChangeType, LiquidityChange, LiquidityChanges,
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
@@ -40,7 +40,7 @@ pub fn map_liquidity_changes(
|
||||
|
||||
struct PartialLiquidityChange {
|
||||
value: Vec<u8>,
|
||||
change_type: LiquidityChangeType,
|
||||
change_type: ChangeType,
|
||||
}
|
||||
|
||||
fn maybe_liquidity_change(
|
||||
@@ -50,7 +50,7 @@ fn maybe_liquidity_change(
|
||||
match log.event.as_ref().unwrap() {
|
||||
Event::Swapped(swapped) => Some(PartialLiquidityChange {
|
||||
value: swapped.liquidity_after.clone(),
|
||||
change_type: LiquidityChangeType::Absolute,
|
||||
change_type: ChangeType::Absolute,
|
||||
}),
|
||||
Event::PositionUpdated(position_updated) => {
|
||||
let current_tick = current_tick_store
|
||||
@@ -61,7 +61,7 @@ fn maybe_liquidity_change(
|
||||
current_tick < position_updated.upper.into())
|
||||
.then(|| PartialLiquidityChange {
|
||||
value: position_updated.liquidity_delta.clone(),
|
||||
change_type: LiquidityChangeType::Delta,
|
||||
change_type: ChangeType::Delta,
|
||||
})
|
||||
}
|
||||
_ => None,
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreSetSum, StoreSetSumBigInt},
|
||||
};
|
||||
use substreams_helper::hex::Hexable;
|
||||
|
||||
use crate::{pb::ekubo::SaleRateChanges, store::store_method_from_change_type};
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_active_sale_rates(sale_rate_changes: SaleRateChanges, store: StoreSetSumBigInt) {
|
||||
sale_rate_changes
|
||||
.changes
|
||||
.into_iter()
|
||||
.for_each(|changes| {
|
||||
let pool_id = changes.pool_id.to_hex();
|
||||
|
||||
let store_method = store_method_from_change_type(changes.change_type());
|
||||
|
||||
store_method(
|
||||
&store,
|
||||
changes.ordinal,
|
||||
format!("pool:{pool_id}:token0"),
|
||||
BigInt::from_signed_bytes_be(&changes.token0_value),
|
||||
);
|
||||
store_method(
|
||||
&store,
|
||||
changes.ordinal,
|
||||
format!("pool:{pool_id}:token1"),
|
||||
BigInt::from_signed_bytes_be(&changes.token1_value),
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreAdd, StoreAddBigInt, StoreNew},
|
||||
};
|
||||
use substreams_helper::hex::Hexable;
|
||||
|
||||
use crate::pb::ekubo::OrderSaleRateDeltas;
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_order_sale_rates(order_sale_rate_deltas: OrderSaleRateDeltas, store: StoreAddBigInt) {
|
||||
order_sale_rate_deltas
|
||||
.deltas
|
||||
.into_iter()
|
||||
.for_each(|delta| {
|
||||
let pool_id = delta.pool_id.to_hex();
|
||||
let time = delta.time;
|
||||
let token = if delta.is_token1 { "token1" } else { "token0" };
|
||||
|
||||
store.add(
|
||||
delta.ordinal,
|
||||
format!("pool:{pool_id}:{token}:time:{time}:"),
|
||||
BigInt::from_signed_bytes_be(&delta.sale_rate_delta),
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -57,54 +57,28 @@ struct ReducedBalanceDelta {
|
||||
|
||||
fn balance_deltas(ev: Event, pool_details: PoolDetails) -> Vec<ReducedBalanceDelta> {
|
||||
match ev {
|
||||
Event::Swapped(swapped) => {
|
||||
vec![
|
||||
ReducedBalanceDelta { token: pool_details.token0, delta: swapped.delta0 },
|
||||
ReducedBalanceDelta { token: pool_details.token1, delta: swapped.delta1 },
|
||||
]
|
||||
}
|
||||
Event::PositionUpdated(position_updated) => {
|
||||
vec![
|
||||
ReducedBalanceDelta {
|
||||
token: pool_details.token0,
|
||||
delta: adjust_delta_by_fee(
|
||||
BigInt::from_signed_bytes_be(&position_updated.delta0),
|
||||
pool_details.fee,
|
||||
)
|
||||
.to_signed_bytes_be(),
|
||||
},
|
||||
ReducedBalanceDelta {
|
||||
token: pool_details.token1,
|
||||
delta: adjust_delta_by_fee(
|
||||
BigInt::from_signed_bytes_be(&position_updated.delta1),
|
||||
pool_details.fee,
|
||||
)
|
||||
.to_signed_bytes_be(),
|
||||
},
|
||||
]
|
||||
}
|
||||
Event::PositionFeesCollected(position_fees_collected) => {
|
||||
vec![
|
||||
ReducedBalanceDelta {
|
||||
token: pool_details.token0,
|
||||
delta: BigInt::from_unsigned_bytes_be(&position_fees_collected.amount0)
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
},
|
||||
ReducedBalanceDelta {
|
||||
token: pool_details.token1,
|
||||
delta: BigInt::from_unsigned_bytes_be(&position_fees_collected.amount1)
|
||||
.neg()
|
||||
.to_signed_bytes_be(),
|
||||
},
|
||||
]
|
||||
}
|
||||
Event::FeesAccumulated(fees_accumulated) => {
|
||||
vec![
|
||||
ReducedBalanceDelta { token: pool_details.token0, delta: fees_accumulated.amount0 },
|
||||
ReducedBalanceDelta { token: pool_details.token1, delta: fees_accumulated.amount1 },
|
||||
]
|
||||
}
|
||||
Event::Swapped(ev) => vec![
|
||||
ReducedBalanceDelta { token: pool_details.token0, delta: ev.delta0 },
|
||||
ReducedBalanceDelta { token: pool_details.token1, delta: ev.delta1 },
|
||||
],
|
||||
Event::PositionUpdated(ev) => vec![
|
||||
ReducedBalanceDelta {
|
||||
token: pool_details.token0,
|
||||
delta: adjust_delta_by_fee(
|
||||
BigInt::from_signed_bytes_be(&ev.delta0),
|
||||
pool_details.fee,
|
||||
)
|
||||
.to_signed_bytes_be(),
|
||||
},
|
||||
ReducedBalanceDelta {
|
||||
token: pool_details.token1,
|
||||
delta: adjust_delta_by_fee(
|
||||
BigInt::from_signed_bytes_be(&ev.delta1),
|
||||
pool_details.fee,
|
||||
)
|
||||
.to_signed_bytes_be(),
|
||||
},
|
||||
],
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
@@ -113,8 +87,10 @@ fn balance_deltas(ev: Event, pool_details: PoolDetails) -> Vec<ReducedBalanceDel
|
||||
// here (i.e. subtract from the component's balance)
|
||||
fn adjust_delta_by_fee(delta: BigInt, fee: u64) -> BigInt {
|
||||
if delta < BigInt::zero() {
|
||||
let denom = BigInt::from_signed_bytes_be(&hex!("0100000000000000000000000000000000"));
|
||||
(delta * denom.clone()) / (denom - fee)
|
||||
let denom = BigInt::from_signed_bytes_be(&hex!("010000000000000000"));
|
||||
let (quotient, remainder) = (delta * denom.clone()).div_rem(&(denom - fee));
|
||||
|
||||
quotient - (!remainder.is_zero()) as u8
|
||||
} else {
|
||||
delta
|
||||
}
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreSetSum, StoreSetSumBigInt},
|
||||
};
|
||||
use substreams_helper::hex::Hexable;
|
||||
|
||||
use crate::{pb::ekubo::LiquidityChanges, store::store_method_from_change_type};
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_active_liquidities(liquidity_changes: LiquidityChanges, store: StoreSetSumBigInt) {
|
||||
liquidity_changes
|
||||
.changes
|
||||
.into_iter()
|
||||
.for_each(|changes| {
|
||||
store_method_from_change_type(changes.change_type())(
|
||||
&store,
|
||||
changes.ordinal,
|
||||
format!("pool:{}", changes.pool_id.to_hex()),
|
||||
BigInt::from_signed_bytes_be(&changes.value),
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreSetSum, StoreSetSumBigInt},
|
||||
};
|
||||
use substreams_helper::hex::Hexable;
|
||||
|
||||
use crate::pb::ekubo::{LiquidityChangeType, LiquidityChanges};
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_liquidities(liquidity_changes: LiquidityChanges, store: StoreSetSumBigInt) {
|
||||
liquidity_changes
|
||||
.changes
|
||||
.into_iter()
|
||||
.for_each(|changes| match changes.change_type() {
|
||||
LiquidityChangeType::Delta => {
|
||||
store.sum(
|
||||
changes.ordinal,
|
||||
format!("pool:{0}", changes.pool_id.to_hex()),
|
||||
BigInt::from_signed_bytes_be(&changes.value),
|
||||
);
|
||||
}
|
||||
LiquidityChangeType::Absolute => {
|
||||
store.set(
|
||||
changes.ordinal,
|
||||
format!("pool:{0}", changes.pool_id.to_hex()),
|
||||
BigInt::from_signed_bytes_be(&changes.value),
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -14,7 +14,7 @@ use tycho_substreams::{
|
||||
|
||||
use crate::pb::ekubo::{
|
||||
block_transaction_events::transaction_events::pool_log::Event, BlockTransactionEvents,
|
||||
LiquidityChanges, TickDeltas,
|
||||
LiquidityChanges, OrderSaleRateDeltas, SaleRateChanges, TickDeltas,
|
||||
};
|
||||
|
||||
/// Aggregates protocol components and balance changes by transaction.
|
||||
@@ -30,8 +30,12 @@ fn map_protocol_changes(
|
||||
balances_store_deltas: StoreDeltas,
|
||||
ticks_map_deltas: TickDeltas,
|
||||
ticks_store_deltas: StoreDeltas,
|
||||
order_sale_rate_map_deltas: OrderSaleRateDeltas,
|
||||
order_sale_rate_store_deltas: StoreDeltas,
|
||||
liquidity_changes: LiquidityChanges,
|
||||
liquidity_store_deltas: StoreDeltas,
|
||||
sale_rate_changes: SaleRateChanges,
|
||||
sale_rate_store_deltas: StoreDeltas,
|
||||
) -> Result<BlockChanges, substreams::errors::Error> {
|
||||
let mut transaction_changes: HashMap<_, TransactionChangesBuilder> = HashMap::new();
|
||||
|
||||
@@ -90,21 +94,11 @@ fn map_protocol_changes(
|
||||
.into_iter()
|
||||
.zip(ticks_map_deltas.deltas)
|
||||
.for_each(|(store_delta, tick_delta)| {
|
||||
let new_value_bigint = BigInt::from_store_bytes(&store_delta.new_value);
|
||||
let (old_value, new_value) = (
|
||||
BigInt::from_store_bytes(&store_delta.old_value),
|
||||
BigInt::from_store_bytes(&store_delta.new_value),
|
||||
);
|
||||
|
||||
let is_creation = BigInt::from_store_bytes(&store_delta.old_value).is_zero();
|
||||
|
||||
let attribute = Attribute {
|
||||
name: format!("ticks/{}", tick_delta.tick_index),
|
||||
value: new_value_bigint.to_signed_bytes_be(),
|
||||
change: if is_creation {
|
||||
ChangeType::Creation.into()
|
||||
} else if new_value_bigint.is_zero() {
|
||||
ChangeType::Deletion.into()
|
||||
} else {
|
||||
ChangeType::Update.into()
|
||||
},
|
||||
};
|
||||
let tx = tick_delta.transaction.unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
@@ -112,7 +106,39 @@ fn map_protocol_changes(
|
||||
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: tick_delta.pool_id.to_hex(),
|
||||
attributes: vec![attribute],
|
||||
attributes: vec![Attribute {
|
||||
name: format!("ticks/{}", tick_delta.tick_index),
|
||||
value: new_value.to_signed_bytes_be(),
|
||||
change: change_type_from_delta(&old_value, &new_value).into(),
|
||||
}],
|
||||
});
|
||||
});
|
||||
|
||||
// TWAMM order sale rate deltas
|
||||
order_sale_rate_store_deltas
|
||||
.deltas
|
||||
.into_iter()
|
||||
.zip(order_sale_rate_map_deltas.deltas)
|
||||
.for_each(|(store_delta, sale_rate_delta)| {
|
||||
let tx = sale_rate_delta.transaction.unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
||||
|
||||
let (old_value, new_value) = (
|
||||
BigInt::from_store_bytes(&store_delta.old_value),
|
||||
BigInt::from_store_bytes(&store_delta.new_value),
|
||||
);
|
||||
|
||||
let token = if sale_rate_delta.is_token1 { "token1" } else { "token0" };
|
||||
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: sale_rate_delta.pool_id.to_hex(),
|
||||
attributes: vec![Attribute {
|
||||
name: format!("orders/{}/{}", token, sale_rate_delta.time),
|
||||
value: new_value.to_signed_bytes_be(),
|
||||
change: change_type_from_delta(&old_value, &new_value).into(),
|
||||
}],
|
||||
});
|
||||
});
|
||||
|
||||
@@ -127,22 +153,50 @@ fn map_protocol_changes(
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
||||
|
||||
let new_value_bigint = BigInt::from_str(key::segment_at(
|
||||
&String::from_utf8(store_delta.new_value).unwrap(),
|
||||
1,
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: change.pool_id.to_hex(),
|
||||
attributes: vec![Attribute {
|
||||
name: "liquidity".to_string(),
|
||||
value: new_value_bigint.to_signed_bytes_be(),
|
||||
value: bigint_from_set_sum_store_delta_value(store_delta.new_value)
|
||||
.to_signed_bytes_be(),
|
||||
change: ChangeType::Update.into(),
|
||||
}],
|
||||
});
|
||||
});
|
||||
|
||||
// TWAMM active sale rates
|
||||
sale_rate_store_deltas
|
||||
.deltas
|
||||
.chunks(2)
|
||||
.zip(sale_rate_changes.changes)
|
||||
.for_each(|(store_deltas, change)| {
|
||||
let tx = change.transaction.unwrap();
|
||||
let builder = transaction_changes
|
||||
.entry(tx.index)
|
||||
.or_insert_with(|| TransactionChangesBuilder::new(&tx.into()));
|
||||
|
||||
let (token0_sale_rate, token1_sale_rate) = (
|
||||
bigint_from_set_sum_store_delta_value(store_deltas[0].new_value.clone()),
|
||||
bigint_from_set_sum_store_delta_value(store_deltas[1].new_value.clone()),
|
||||
);
|
||||
|
||||
builder.add_entity_change(&EntityChanges {
|
||||
component_id: change.pool_id.to_hex(),
|
||||
attributes: vec![
|
||||
Attribute {
|
||||
name: "token0_sale_rate".to_string(),
|
||||
value: token0_sale_rate.to_bytes_be().1,
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "token1_sale_rate".to_string(),
|
||||
value: token1_sale_rate.to_bytes_be().1,
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
],
|
||||
});
|
||||
});
|
||||
|
||||
// Remaining event changes not subject to special treatment
|
||||
block_tx_events
|
||||
.block_transaction_events
|
||||
@@ -156,12 +210,17 @@ fn map_protocol_changes(
|
||||
.flat_map(move |log| {
|
||||
let tx = tx.clone();
|
||||
|
||||
maybe_attribute_updates(log.event.unwrap()).map(|attrs| {
|
||||
(
|
||||
tx,
|
||||
EntityChanges { component_id: log.pool_id.to_hex(), attributes: attrs },
|
||||
)
|
||||
})
|
||||
maybe_attribute_updates(log.event.unwrap(), block_tx_events.timestamp).map(
|
||||
|attrs| {
|
||||
(
|
||||
tx,
|
||||
EntityChanges {
|
||||
component_id: log.pool_id.to_hex(),
|
||||
attributes: attrs,
|
||||
},
|
||||
)
|
||||
},
|
||||
)
|
||||
})
|
||||
})
|
||||
.for_each(|(tx, entity_changes)| {
|
||||
@@ -181,23 +240,39 @@ fn map_protocol_changes(
|
||||
})
|
||||
}
|
||||
|
||||
fn maybe_attribute_updates(ev: Event) -> Option<Vec<Attribute>> {
|
||||
fn maybe_attribute_updates(ev: Event, timestamp: u64) -> Option<Vec<Attribute>> {
|
||||
match ev {
|
||||
Event::Swapped(swapped) => Some(vec![
|
||||
Event::Swapped(ev) => Some(vec![
|
||||
Attribute {
|
||||
name: "tick".into(),
|
||||
value: swapped
|
||||
.tick_after
|
||||
.to_be_bytes()
|
||||
.to_vec(),
|
||||
value: ev.tick_after.to_be_bytes().to_vec(),
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
Attribute {
|
||||
name: "sqrt_ratio".into(),
|
||||
value: swapped.sqrt_ratio_after,
|
||||
value: ev.sqrt_ratio_after,
|
||||
change: ChangeType::Update.into(),
|
||||
},
|
||||
]),
|
||||
Event::VirtualOrdersExecuted(_) => Some(vec![Attribute {
|
||||
name: "last_execution_time".to_string(),
|
||||
value: timestamp.to_be_bytes().to_vec(),
|
||||
change: ChangeType::Update.into(),
|
||||
}]),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn change_type_from_delta(old_value: &BigInt, new_value: &BigInt) -> ChangeType {
|
||||
if old_value.is_zero() {
|
||||
ChangeType::Creation
|
||||
} else if new_value.is_zero() {
|
||||
ChangeType::Deletion
|
||||
} else {
|
||||
ChangeType::Update
|
||||
}
|
||||
}
|
||||
|
||||
fn bigint_from_set_sum_store_delta_value(value: Vec<u8>) -> BigInt {
|
||||
BigInt::from_str(key::segment_at(&String::from_utf8(value).unwrap(), 1)).unwrap()
|
||||
}
|
||||
|
||||
@@ -7,13 +7,21 @@ mod map_events;
|
||||
|
||||
#[path = "2_map_components.rs"]
|
||||
mod map_components;
|
||||
#[path = "2_map_tick_changes.rs"]
|
||||
mod map_tick_changes;
|
||||
#[path = "2_map_order_sale_rate_deltas.rs"]
|
||||
mod map_order_sale_rate_deltas;
|
||||
#[path = "2_map_sale_rate_changes.rs"]
|
||||
mod map_sale_rate_changes;
|
||||
#[path = "2_map_tick_deltas.rs"]
|
||||
mod map_tick_deltas;
|
||||
#[path = "2_store_active_ticks.rs"]
|
||||
mod store_active_ticks;
|
||||
|
||||
#[path = "3_map_liquidity_changes.rs"]
|
||||
mod map_liquidity_changes;
|
||||
#[path = "3_store_active_sale_rates.rs"]
|
||||
mod store_active_sale_rates;
|
||||
#[path = "3_store_order_sale_rates.rs"]
|
||||
mod store_order_sale_rates;
|
||||
#[path = "3_store_pool_details.rs"]
|
||||
mod store_pool_details;
|
||||
#[path = "3_store_tick_liquidities.rs"]
|
||||
@@ -21,8 +29,8 @@ mod store_tick_liquidities;
|
||||
|
||||
#[path = "4_map_balance_changes.rs"]
|
||||
mod map_balance_changes;
|
||||
#[path = "4_store_liquidities.rs"]
|
||||
mod store_liquidities;
|
||||
#[path = "4_store_active_liquidities.rs"]
|
||||
mod store_active_liquidities;
|
||||
|
||||
#[path = "5_store_balance_changes.rs"]
|
||||
mod store_balance_changes;
|
||||
|
||||
Reference in New Issue
Block a user