integrate remaining abis and clean up / comment some code

This commit is contained in:
0xMochan
2024-05-02 13:34:11 -05:00
parent 692579e930
commit 126bd003a8
2 changed files with 151 additions and 145 deletions

View File

@@ -123,7 +123,15 @@ pub fn map_relative_balances(
pools_store: StoreGetInt64, pools_store: StoreGetInt64,
tokens_store: StoreGetString, tokens_store: StoreGetString,
) -> Result<BlockBalanceDeltas, anyhow::Error> { ) -> Result<BlockBalanceDeltas, anyhow::Error> {
Ok(BlockBalanceDeltas { balance_deltas: emit_deltas(block, pools_store, tokens_store) }) Ok(BlockBalanceDeltas {
balance_deltas: {
block
.logs()
.filter_map(|log| emit_deltas(log, &pools_store, &tokens_store))
.flat_map(|deltas| deltas.into_iter())
.collect::<Vec<_>>()
},
})
} }
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the /// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the

View File

@@ -2,11 +2,19 @@ use substreams::{
scalar::BigInt, scalar::BigInt,
store::{StoreGet, StoreGetInt64, StoreGetString}, store::{StoreGet, StoreGetInt64, StoreGetString},
}; };
use substreams_ethereum::{block_view::LogView, pb::eth, Event}; use substreams_ethereum::{block_view::LogView, Event};
use tycho_substreams::prelude::*; use tycho_substreams::prelude::*;
use crate::abi; use crate::abi;
struct GenericAddLiquidity {
token_amounts: Vec<BigInt>,
provider: String,
fees: BigInt,
invariant: BigInt,
token_supply: BigInt,
}
fn tx_from_log(log: &LogView) -> Transaction { fn tx_from_log(log: &LogView) -> Transaction {
Transaction { Transaction {
hash: log.receipt.transaction.hash.clone(), hash: log.receipt.transaction.hash.clone(),
@@ -16,152 +24,142 @@ fn tx_from_log(log: &LogView) -> Transaction {
} }
} }
/// This function emits balance deltas for mints, burns, and exchanges in Curve pools. Since some
/// pools contain differing ABIs, we load in several examples of abis in order to best match the
/// topic ID to the correct event. The repetition in this function is dervived from the fact that
/// most of these events have similar structures, but the specific topic id differs.
pub fn emit_deltas( pub fn emit_deltas(
log: LogView, log: LogView,
pools_store: StoreGetInt64, pools_store: &StoreGetInt64,
tokens_store: StoreGetString, tokens_store: &StoreGetString,
) -> Option<impl Iterator<Item = BalanceDelta>> { ) -> Option<Vec<BalanceDelta>> {
if let Some(event) = abi::pool::events::TokenExchange::match_and_decode(log) { let pool_key = format!("pool:{}", hex::encode(&log.address()));
if pools_store if pools_store.get_last(pool_key).is_none() {
.get_last(format!("pool:{0}", hex::encode(&log.address()))) return None;
.is_none()
{
return None;
}
let tokens_bought_delta: BigInt = event.tokens_bought * -1;
return Some(
vec![
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: event.sold_id.to_signed_bytes_be(),
delta: event.tokens_sold.to_signed_bytes_be(),
component_id: log.address().into(),
},
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: event.bought_id.to_signed_bytes_be(),
delta: tokens_bought_delta.to_signed_bytes_be(),
component_id: log.address().into(),
},
]
.into_iter(),
)
} else if let Some(event) = abi::pool::events::AddLiquidity::match_and_decode(log) {
let tokens = tokens_store
.get_last(format!("pool:{0}", hex::encode(log.address())))?
.split(":")
.map(|token| token.to_owned()) // Clone the tokens
.collect::<Vec<_>>();
let deltas: Vec<_> = event
.token_amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: token_id.into(),
delta: token_amount.to_signed_bytes_be(),
component_id: log.address().into(),
})
.collect();
return Some(deltas.into_iter());
} else if let Some(event) = abi::pool::events::RemoveLiquidity::match_and_decode(log) {
let tokens = tokens_store
.get_last(format!("pool:{0}", hex::encode(log.address())))?
.split(":")
.map(|token| token.to_owned()) // Clone the tokens
.collect::<Vec<_>>();
let deltas: Vec<_> = event
.token_amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: token_id.into(),
delta: token_amount.to_signed_bytes_be(),
component_id: log.address().into(),
})
.collect();
return Some(deltas.into_iter());
} }
deltas.extend( let tokens = tokens_store
block .get_last(format!("pool:{}", hex::encode(log.address())))?
.logs() .split(":")
.filter_map(|log| { .map(|token| token.to_owned())
let event = abi::pool::events::AddLiquidity::match_and_decode(log)?; .collect::<Vec<_>>();
Some((log, event))
})
.filter_map(|(log, event)| {
let tokens = tokens_store
.get_last(format!("pool:{0}", hex::encode(log.address())))?
.split(":")
.map(|token| token.to_owned()) // Clone the tokens
.collect::<Vec<_>>();
Some((tokens, log, event)) if let Some(event) = abi::pool::events::TokenExchange::match_and_decode(log) {
}) return token_change_deltas(event, log);
.flat_map(|(tokens, log, event)| { } else if let Some(event) = abi::pool_3pool::events::TokenExchange::match_and_decode(log) {
event return token_change_deltas(
.token_amounts abi::pool::events::TokenExchange {
.iter() sold_id: event.sold_id,
.zip(tokens) bought_id: event.bought_id,
.map(move |(token_amount, token_id)| BalanceDelta { tokens_sold: event.tokens_sold,
ord: log.log.ordinal, tokens_bought: event.tokens_bought,
tx: Some(tx_from_log(&log)), buyer: event.buyer,
token: token_id.into(), },
delta: token_amount.to_signed_bytes_be(), log,
component_id: log.address().into(), );
}) } else if let Some(event) = abi::pool_steth::events::TokenExchange::match_and_decode(log) {
.collect::<Vec<_>>() return token_change_deltas(
}) abi::pool::events::TokenExchange {
.collect::<Vec<_>>(), sold_id: event.sold_id,
); bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
deltas.extend( tokens_bought: event.tokens_bought,
block buyer: event.buyer,
.logs() },
.filter_map(|log| { log,
let event = abi::pool::events::RemoveLiquidity::match_and_decode(log)?; );
Some((log, event)) } else if let Some(event) = abi::pool_tricrypto::events::TokenExchange::match_and_decode(log) {
}) return token_change_deltas(
.filter(|(log, _)| { abi::pool::events::TokenExchange {
pools_store sold_id: event.sold_id,
.get_last(format!("pool:{0}", hex::encode(&log.address()))) bought_id: event.bought_id,
.is_none() tokens_sold: event.tokens_sold,
}) tokens_bought: event.tokens_bought,
.flat_map(|(log, event)| { buyer: event.buyer,
let tokens = tokens_store },
.get_last(format!("pool:{}", hex::encode(log.address()))) log,
.unwrap() );
.split(":") } else if let Some(event) = abi::pool::events::AddLiquidity::match_and_decode(log) {
.map(|token| token.to_owned()) // Clone the tokens return add_liquidity_deltas(event.token_amounts.into(), &tokens, log);
.collect::<Vec<_>>(); } else if let Some(event) = abi::pool_3pool::events::AddLiquidity::match_and_decode(log) {
return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
event } else if let Some(event) = abi::pool_steth::events::AddLiquidity::match_and_decode(log) {
.token_amounts return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
.iter() } else if let Some(event) = abi::pool_tricrypto::events::AddLiquidity::match_and_decode(log) {
.zip(tokens) return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
.map(move |(token_amount, token_id)| { } else if let Some(event) = abi::pool::events::RemoveLiquidity::match_and_decode(log) {
let negative_token_amount: BigInt = token_amount * BigInt::from(-1); return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
BalanceDelta { } else if let Some(event) = abi::pool_3pool::events::RemoveLiquidity::match_and_decode(log) {
ord: log.log.ordinal, return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
tx: Some(tx_from_log(&log)), } else if let Some(event) = abi::pool_steth::events::RemoveLiquidity::match_and_decode(log) {
token: token_id.into(), return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
delta: negative_token_amount.to_signed_bytes_be(), } else if let Some(event) = abi::pool_tricrypto::events::RemoveLiquidity::match_and_decode(log)
component_id: log.address().into(), {
} return add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
}) } else {
.collect::<Vec<_>>() None
}) }
.collect::<Vec<_>>(), }
);
deltas fn token_change_deltas(
event: abi::pool::events::TokenExchange,
log: LogView<'_>,
) -> Option<Vec<BalanceDelta>> {
let tokens_bought_delta: BigInt = event.tokens_bought * -1;
Some(vec![
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: event.sold_id.to_signed_bytes_be(),
delta: event.tokens_sold.to_signed_bytes_be(),
component_id: log.address().into(),
},
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: event.bought_id.to_signed_bytes_be(),
delta: tokens_bought_delta.to_signed_bytes_be(),
component_id: log.address().into(),
},
])
}
fn add_liquidity_deltas(
amounts: Vec<BigInt>,
tokens: &Vec<String>,
log: LogView<'_>,
) -> Option<Vec<BalanceDelta>> {
Some(
amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: token_id.as_str().into(),
delta: token_amount.to_signed_bytes_be(),
component_id: log.address().into(),
})
.collect::<Vec<_>>(),
)
}
fn remove_liquidity_deltas(
amounts: Vec<BigInt>,
tokens: &Vec<String>,
log: LogView<'_>,
) -> Option<Vec<BalanceDelta>> {
Some(
amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: token_id.as_str().into(),
delta: token_amount.to_signed_bytes_be(),
component_id: log.address().into(),
})
.collect::<Vec<_>>(),
)
} }