fix: liquidity
This commit is contained in:
@@ -3,9 +3,9 @@ use std::collections::HashMap;
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use substreams::pb::substreams::StoreDeltas;
|
use substreams::pb::substreams::StoreDeltas;
|
||||||
use substreams::store::{
|
use substreams::store::{
|
||||||
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreNew,
|
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreGetString, StoreNew,
|
||||||
|
StoreSet, StoreSetString,
|
||||||
};
|
};
|
||||||
use substreams::{hex, log};
|
|
||||||
|
|
||||||
use substreams::key;
|
use substreams::key;
|
||||||
use substreams::scalar::BigInt;
|
use substreams::scalar::BigInt;
|
||||||
@@ -96,45 +96,141 @@ pub fn store_pools_created(map: tycho::GroupedTransactionProtocolComponents, sto
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Simply stores the `ProtocolComponent`s with the pool id as the key
|
||||||
|
#[substreams::handlers::store]
|
||||||
|
pub fn store_pool_tokens(map: tycho::GroupedTransactionProtocolComponents, store: StoreSetString) {
|
||||||
|
&map.tx_components
|
||||||
|
.iter()
|
||||||
|
.flat_map(|tx_components| &tx_components.components)
|
||||||
|
.for_each(|component| {
|
||||||
|
store.set(
|
||||||
|
0,
|
||||||
|
format!("pool:{0}", component.id),
|
||||||
|
&component
|
||||||
|
.tokens
|
||||||
|
.iter()
|
||||||
|
.map(|token| hex::encode(token))
|
||||||
|
.join(":".into()),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a
|
/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a
|
||||||
/// store to be able to tally up final balances for tokens in a pool.
|
/// store to be able to tally up final balances for tokens in a pool.
|
||||||
#[substreams::handlers::map]
|
#[substreams::handlers::map]
|
||||||
pub fn map_balance_deltas(
|
pub fn map_balance_deltas(
|
||||||
block: eth::v2::Block,
|
block: eth::v2::Block,
|
||||||
store: StoreGetInt64,
|
store: StoreGetInt64,
|
||||||
|
tokens_store: StoreGetString,
|
||||||
) -> Result<tycho::BalanceDeltas, anyhow::Error> {
|
) -> Result<tycho::BalanceDeltas, anyhow::Error> {
|
||||||
Ok(tycho::BalanceDeltas {
|
let mut deltas = block
|
||||||
balance_deltas: block
|
.logs()
|
||||||
|
.filter_map(|log| {
|
||||||
|
let event = abi::pool::events::TokenExchange::match_and_decode(log)?;
|
||||||
|
Some((log, event))
|
||||||
|
})
|
||||||
|
.filter(|(log, _)| {
|
||||||
|
store
|
||||||
|
.get_last(format!("pool:{0}", hex::encode(&log.address())))
|
||||||
|
.is_some()
|
||||||
|
})
|
||||||
|
.flat_map(|(log, event)| {
|
||||||
|
let tokens_bought_delta: BigInt = event.tokens_bought * -1;
|
||||||
|
vec![
|
||||||
|
tycho::BalanceDelta {
|
||||||
|
ord: log.log.ordinal,
|
||||||
|
tx: Some(tx_from_log(&log)),
|
||||||
|
token: event.sold_id.to_signed_bytes_be(),
|
||||||
|
delta: event.tokens_sold.to_signed_bytes_be(),
|
||||||
|
component_id: log.address().into(),
|
||||||
|
},
|
||||||
|
tycho::BalanceDelta {
|
||||||
|
ord: log.log.ordinal,
|
||||||
|
tx: Some(tx_from_log(&log)),
|
||||||
|
token: event.bought_id.to_signed_bytes_be(),
|
||||||
|
delta: tokens_bought_delta.to_signed_bytes_be(),
|
||||||
|
component_id: log.address().into(),
|
||||||
|
},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
deltas.extend(
|
||||||
|
block
|
||||||
.logs()
|
.logs()
|
||||||
.filter_map(|log| {
|
.filter_map(|log| {
|
||||||
let event = abi::pool::events::TokenExchange::match_and_decode(log)?;
|
let event = abi::pool::events::AddLiquidity::match_and_decode(log)?;
|
||||||
Some((log, event))
|
Some((log, event))
|
||||||
})
|
})
|
||||||
.filter(|(log, _)| {
|
.filter(|(log, _)| {
|
||||||
store
|
store
|
||||||
.get_last(format!("pool:{0}", hex::encode(&log.address())))
|
.get_last(format!("pool:{0}", hex::encode(&log.address())))
|
||||||
.is_some()
|
.is_none()
|
||||||
})
|
})
|
||||||
.flat_map(|(log, event)| {
|
.flat_map(|(log, event)| {
|
||||||
let tokens_bought_delta: BigInt = event.tokens_bought * -1;
|
let tokens = tokens_store
|
||||||
vec![
|
.get_last(format!("pool:{}", hex::encode(log.address())))
|
||||||
tycho::BalanceDelta {
|
.unwrap()
|
||||||
|
.split(":")
|
||||||
|
.map(|token| token.to_owned()) // Clone the tokens
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
event
|
||||||
|
.token_amounts
|
||||||
|
.iter()
|
||||||
|
.zip(tokens)
|
||||||
|
.map(move |(token_amount, token_id)| tycho::BalanceDelta {
|
||||||
ord: log.log.ordinal,
|
ord: log.log.ordinal,
|
||||||
tx: Some(tx_from_log(&log)),
|
tx: Some(tx_from_log(&log)),
|
||||||
token: event.sold_id.to_signed_bytes_be(),
|
token: token_id.into(),
|
||||||
delta: event.tokens_sold.to_signed_bytes_be(),
|
delta: token_amount.to_signed_bytes_be(),
|
||||||
component_id: log.address().into(),
|
component_id: log.address().into(),
|
||||||
},
|
})
|
||||||
tycho::BalanceDelta {
|
.collect::<Vec<_>>()
|
||||||
ord: log.log.ordinal,
|
|
||||||
tx: Some(tx_from_log(&log)),
|
|
||||||
token: event.bought_id.to_signed_bytes_be(),
|
|
||||||
delta: tokens_bought_delta.to_signed_bytes_be(),
|
|
||||||
component_id: log.address().into(),
|
|
||||||
},
|
|
||||||
]
|
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>(),
|
.collect::<Vec<_>>(),
|
||||||
|
);
|
||||||
|
|
||||||
|
deltas.extend(
|
||||||
|
block
|
||||||
|
.logs()
|
||||||
|
.filter_map(|log| {
|
||||||
|
let event = abi::pool::events::RemoveLiquidity::match_and_decode(log)?;
|
||||||
|
Some((log, event))
|
||||||
|
})
|
||||||
|
.filter(|(log, _)| {
|
||||||
|
store
|
||||||
|
.get_last(format!("pool:{0}", hex::encode(&log.address())))
|
||||||
|
.is_none()
|
||||||
|
})
|
||||||
|
.flat_map(|(log, event)| {
|
||||||
|
let tokens = tokens_store
|
||||||
|
.get_last(format!("pool:{}", hex::encode(log.address())))
|
||||||
|
.unwrap()
|
||||||
|
.split(":")
|
||||||
|
.map(|token| token.to_owned()) // Clone the tokens
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
event
|
||||||
|
.token_amounts
|
||||||
|
.iter()
|
||||||
|
.zip(tokens)
|
||||||
|
.map(move |(token_amount, token_id)| {
|
||||||
|
let negative_token_amount: BigInt = token_amount * BigInt::from(-1);
|
||||||
|
tycho::BalanceDelta {
|
||||||
|
ord: log.log.ordinal,
|
||||||
|
tx: Some(tx_from_log(&log)),
|
||||||
|
token: token_id.into(),
|
||||||
|
delta: negative_token_amount.to_signed_bytes_be(),
|
||||||
|
component_id: log.address().into(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(tycho::BalanceDeltas {
|
||||||
|
balance_deltas: deltas,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,6 +33,14 @@ modules:
|
|||||||
inputs:
|
inputs:
|
||||||
- map: map_pools_created
|
- map: map_pools_created
|
||||||
|
|
||||||
|
- name: store_pools_tokens
|
||||||
|
kind: store
|
||||||
|
initialBlock: 19128828
|
||||||
|
updatePolicy: add
|
||||||
|
valueType: string
|
||||||
|
inputs:
|
||||||
|
- map: map_pools_created
|
||||||
|
|
||||||
- name: map_balance_deltas
|
- name: map_balance_deltas
|
||||||
kind: map
|
kind: map
|
||||||
initialBlock: 19128828 # An arbitrary block that should change based on your requirements
|
initialBlock: 19128828 # An arbitrary block that should change based on your requirements
|
||||||
|
|||||||
Reference in New Issue
Block a user