abis + params
@@ -1,5 +1,8 @@
 pub mod crypto_pool_factory;
+pub mod pool_tricrypto;
+pub mod pool_steth;
 pub mod pool;
 pub mod main_registry;
+pub mod pool_3pool;
 pub mod meta_pool_factory;
 pub mod crypto_swap_registry;
substreams/ethereum-curve/src/abi/pool_3pool.rs (new file, 8979 lines): file diff suppressed because it is too large
substreams/ethereum-curve/src/abi/pool_steth.rs (new file, 5393 lines): file diff suppressed because it is too large
substreams/ethereum-curve/src/abi/pool_tricrypto.rs (new file, 11448 lines): file diff suppressed because it is too large
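
Note: these generated modules are typically produced at build time with `substreams_ethereum::Abigen` from the pools' ABI JSON files. A minimal build.rs sketch of that step follows; the JSON paths and the loop are assumptions for illustration, since the build script itself is not part of this commit.

    // build.rs sketch (illustrative; ABI JSON paths are assumed, not taken from the commit).
    use anyhow::Result;
    use substreams_ethereum::Abigen;

    fn main() -> Result<()> {
        // Generate Rust bindings for each new pool ABI into src/abi/.
        for name in ["pool_3pool", "pool_steth", "pool_tricrypto"] {
            Abigen::new(name, &format!("abi/{name}.json"))?
                .generate()?
                .write_to_file(&format!("src/abi/{name}.rs"))?;
        }
        Ok(())
    }
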
@@ -1,4 +1,5 @@
 mod abi;
 pub mod modules;
+mod pool_changes;
 mod pool_factories;
 mod pools;
@@ -16,7 +16,7 @@ use substreams_ethereum::{block_view::LogView, pb::eth};

 use substreams_ethereum::Event;

-use crate::{abi, pool_factories, pools::emit_specific_pools};
+use crate::{abi, pool_changes::emit_deltas, pool_factories, pools::emit_specific_pools};
 use tycho_substreams::{
     balances::store_balance_changes, contract::extract_contract_changes, prelude::*,
 };
@@ -32,15 +32,6 @@ impl PartialEq for TransactionWrapper {
     }
 }

-fn tx_from_log(log: &LogView) -> Transaction {
-    Transaction {
-        hash: log.receipt.transaction.hash.clone(),
-        from: log.receipt.transaction.from.clone(),
-        to: log.receipt.transaction.to.clone(),
-        index: Into::<u64>::into(log.receipt.transaction.index),
-    }
-}
-
 #[substreams::handlers::map]
 pub fn map_components(
     params: String,
@@ -132,111 +123,7 @@ pub fn map_relative_balances(
     pools_store: StoreGetInt64,
     tokens_store: StoreGetString,
 ) -> Result<BlockBalanceDeltas, anyhow::Error> {
-    let mut deltas = block
-        .logs()
-        .filter_map(|log| {
-            let event = abi::pool::events::TokenExchange::match_and_decode(log)?;
-            Some((log, event))
-        })
-        .filter(|(log, _)| {
-            pools_store
-                .get_last(format!("pool:{0}", hex::encode(&log.address())))
-                .is_some()
-        })
-        .flat_map(|(log, event)| {
-            let tokens_bought_delta: BigInt = event.tokens_bought * -1;
-            vec![
-                BalanceDelta {
-                    ord: log.log.ordinal,
-                    tx: Some(tx_from_log(&log)),
-                    token: event.sold_id.to_signed_bytes_be(),
-                    delta: event.tokens_sold.to_signed_bytes_be(),
-                    component_id: log.address().into(),
-                },
-                BalanceDelta {
-                    ord: log.log.ordinal,
-                    tx: Some(tx_from_log(&log)),
-                    token: event.bought_id.to_signed_bytes_be(),
-                    delta: tokens_bought_delta.to_signed_bytes_be(),
-                    component_id: log.address().into(),
-                },
-            ]
-        })
-        .collect::<Vec<_>>();
-
-    deltas.extend(
-        block
-            .logs()
-            .filter_map(|log| {
-                let event = abi::pool::events::AddLiquidity::match_and_decode(log)?;
-                Some((log, event))
-            })
-            .filter_map(|(log, event)| {
-                let tokens = tokens_store
-                    .get_last(format!("pool:{0}", hex::encode(log.address())))?
-                    .split(":")
-                    .map(|token| token.to_owned()) // Clone the tokens
-                    .collect::<Vec<_>>();
-
-                Some((tokens, log, event))
-            })
-            .flat_map(|(tokens, log, event)| {
-                event
-                    .token_amounts
-                    .iter()
-                    .zip(tokens)
-                    .map(move |(token_amount, token_id)| BalanceDelta {
-                        ord: log.log.ordinal,
-                        tx: Some(tx_from_log(&log)),
-                        token: token_id.into(),
-                        delta: token_amount.to_signed_bytes_be(),
-                        component_id: log.address().into(),
-                    })
-                    .collect::<Vec<_>>()
-            })
-            .collect::<Vec<_>>(),
-    );
-
-    deltas.extend(
-        block
-            .logs()
-            .filter_map(|log| {
-                let event = abi::pool::events::RemoveLiquidity::match_and_decode(log)?;
-                Some((log, event))
-            })
-            .filter(|(log, _)| {
-                pools_store
-                    .get_last(format!("pool:{0}", hex::encode(&log.address())))
-                    .is_none()
-            })
-            .flat_map(|(log, event)| {
-                let tokens = tokens_store
-                    .get_last(format!("pool:{}", hex::encode(log.address())))
-                    .unwrap()
-                    .split(":")
-                    .map(|token| token.to_owned()) // Clone the tokens
-                    .collect::<Vec<_>>();
-
-                event
-                    .token_amounts
-                    .iter()
-                    .zip(tokens)
-                    .map(move |(token_amount, token_id)| {
-                        let negative_token_amount: BigInt = token_amount * BigInt::from(-1);
-                        BalanceDelta {
-                            ord: log.log.ordinal,
-                            tx: Some(tx_from_log(&log)),
-                            token: token_id.into(),
-                            delta: negative_token_amount.to_signed_bytes_be(),
-                            component_id: log.address().into(),
-                        }
-                    })
-                    .collect::<Vec<_>>()
-            })
-            .collect::<Vec<_>>(),
-    );
-
-    Ok(BlockBalanceDeltas { balance_deltas: deltas })
+    Ok(BlockBalanceDeltas { balance_deltas: emit_deltas(block, pools_store, tokens_store) })
 }

 /// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the
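
Note on the sign convention above (not text from the commit): the sold amount stays positive because the sold token flows into the pool, while the bought amount is negated because it flows out; both are then encoded as signed big-endian bytes. A minimal sketch, where `event` stands for a decoded TokenExchange and the variable names are illustrative:

    // Illustrative only; mirrors the TokenExchange handling above and in pool_changes.rs below.
    let pool_gains = event.tokens_sold.to_signed_bytes_be();          // sold token enters the pool
    let pool_loses = (event.tokens_bought * -1).to_signed_bytes_be(); // bought token leaves the pool
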
substreams/ethereum-curve/src/pool_changes.rs (new file, 167 lines)
@@ -0,0 +1,167 @@
+use substreams::{
+    scalar::BigInt,
+    store::{StoreGet, StoreGetInt64, StoreGetString},
+};
+use substreams_ethereum::{block_view::LogView, pb::eth, Event};
+use tycho_substreams::prelude::*;
+
+use crate::abi;
+
+fn tx_from_log(log: &LogView) -> Transaction {
+    Transaction {
+        hash: log.receipt.transaction.hash.clone(),
+        from: log.receipt.transaction.from.clone(),
+        to: log.receipt.transaction.to.clone(),
+        index: Into::<u64>::into(log.receipt.transaction.index),
+    }
+}
+
+pub fn emit_deltas(
+    log: LogView,
+    pools_store: StoreGetInt64,
+    tokens_store: StoreGetString,
+) -> Option<impl Iterator<Item = BalanceDelta>> {
+    if let Some(event) = abi::pool::events::TokenExchange::match_and_decode(log) {
+        if pools_store
+            .get_last(format!("pool:{0}", hex::encode(&log.address())))
+            .is_none()
+        {
+            return None;
+        }
+        let tokens_bought_delta: BigInt = event.tokens_bought * -1;
+        return Some(
+            vec![
+                BalanceDelta {
+                    ord: log.log.ordinal,
+                    tx: Some(tx_from_log(&log)),
+                    token: event.sold_id.to_signed_bytes_be(),
+                    delta: event.tokens_sold.to_signed_bytes_be(),
+                    component_id: log.address().into(),
+                },
+                BalanceDelta {
+                    ord: log.log.ordinal,
+                    tx: Some(tx_from_log(&log)),
+                    token: event.bought_id.to_signed_bytes_be(),
+                    delta: tokens_bought_delta.to_signed_bytes_be(),
+                    component_id: log.address().into(),
+                },
+            ]
+            .into_iter(),
+        )
+    } else if let Some(event) = abi::pool::events::AddLiquidity::match_and_decode(log) {
+        let tokens = tokens_store
+            .get_last(format!("pool:{0}", hex::encode(log.address())))?
+            .split(":")
+            .map(|token| token.to_owned()) // Clone the tokens
+            .collect::<Vec<_>>();
+
+        let deltas: Vec<_> = event
+            .token_amounts
+            .iter()
+            .zip(tokens)
+            .map(move |(token_amount, token_id)| BalanceDelta {
+                ord: log.log.ordinal,
+                tx: Some(tx_from_log(&log)),
+                token: token_id.into(),
+                delta: token_amount.to_signed_bytes_be(),
+                component_id: log.address().into(),
+            })
+            .collect();
+
+        return Some(deltas.into_iter());
+    } else if let Some(event) = abi::pool::events::RemoveLiquidity::match_and_decode(log) {
+        let tokens = tokens_store
+            .get_last(format!("pool:{0}", hex::encode(log.address())))?
+            .split(":")
+            .map(|token| token.to_owned()) // Clone the tokens
+            .collect::<Vec<_>>();
+
+        let deltas: Vec<_> = event
+            .token_amounts
+            .iter()
+            .zip(tokens)
+            .map(move |(token_amount, token_id)| BalanceDelta {
+                ord: log.log.ordinal,
+                tx: Some(tx_from_log(&log)),
+                token: token_id.into(),
+                delta: token_amount.to_signed_bytes_be(),
+                component_id: log.address().into(),
+            })
+            .collect();
+
+        return Some(deltas.into_iter());
+    }
+
+    deltas.extend(
+        block
+            .logs()
+            .filter_map(|log| {
+                let event = abi::pool::events::AddLiquidity::match_and_decode(log)?;
+                Some((log, event))
+            })
+            .filter_map(|(log, event)| {
+                let tokens = tokens_store
+                    .get_last(format!("pool:{0}", hex::encode(log.address())))?
+                    .split(":")
+                    .map(|token| token.to_owned()) // Clone the tokens
+                    .collect::<Vec<_>>();
+
+                Some((tokens, log, event))
+            })
+            .flat_map(|(tokens, log, event)| {
+                event
+                    .token_amounts
+                    .iter()
+                    .zip(tokens)
+                    .map(move |(token_amount, token_id)| BalanceDelta {
+                        ord: log.log.ordinal,
+                        tx: Some(tx_from_log(&log)),
+                        token: token_id.into(),
+                        delta: token_amount.to_signed_bytes_be(),
+                        component_id: log.address().into(),
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>(),
+    );
+
+    deltas.extend(
+        block
+            .logs()
+            .filter_map(|log| {
+                let event = abi::pool::events::RemoveLiquidity::match_and_decode(log)?;
+                Some((log, event))
+            })
+            .filter(|(log, _)| {
+                pools_store
+                    .get_last(format!("pool:{0}", hex::encode(&log.address())))
+                    .is_none()
+            })
+            .flat_map(|(log, event)| {
+                let tokens = tokens_store
+                    .get_last(format!("pool:{}", hex::encode(log.address())))
+                    .unwrap()
+                    .split(":")
+                    .map(|token| token.to_owned()) // Clone the tokens
+                    .collect::<Vec<_>>();
+
+                event
+                    .token_amounts
+                    .iter()
+                    .zip(tokens)
+                    .map(move |(token_amount, token_id)| {
+                        let negative_token_amount: BigInt = token_amount * BigInt::from(-1);
+                        BalanceDelta {
+                            ord: log.log.ordinal,
+                            tx: Some(tx_from_log(&log)),
+                            token: token_id.into(),
+                            delta: negative_token_amount.to_signed_bytes_be(),
+                            component_id: log.address().into(),
+                        }
+                    })
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>(),
+    );
+    deltas
+}
@@ -1,18 +1,18 @@
-use std::collections::HashMap;
-
 use anyhow::{Context, Result};
 use serde::Deserialize;
+use std::{collections::HashMap, iter::zip};
 use substreams_ethereum::pb::eth;
 use tycho_substreams::prelude::*;

 const PARAMS_SEPERATOR: &str = ",";

-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, PartialEq)]
 struct PoolQueryParams {
     address: String,
     tx_hash: String,
     tokens: Vec<String>,
-    attributes: Vec<(String, String)>,
+    attribute_keys: Vec<String>,
+    attribute_vals: Vec<String>,
 }

 /// This function parses the `params` string and extracts the pool query parameters. `params` are
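
Note (not part of the diff): each entry in `params` is a serde_qs-style query string, and multiple entries are joined with the "," defined by `PARAMS_SEPERATOR`. A minimal sketch of building such a value; the entry reuses the addresses from the test case added at the end of this file, and `pool_a` is an illustrative name:

    // Illustrative only: one pool entry; further entries would be joined with ",".
    let pool_a = "address=0x5F890841f657d90E081bAbdB532A05996Af79Fe6\
        &tx_hash=0xb71a66c1d93c525a2dd19a8db0da19e65be04f36e733af7f03e3c9dff41aa16a\
        &tokens[]=0x6b175474e89094c44da98b954eedeac495271d0f\
        &tokens[]=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48\
        &tokens[]=0xdac17f958d2ee523a2206206994597c13d831ec7\
        &attribute_keys[]=key1&attribute_vals[]=val1";
    let params = [pool_a].join(",");
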
@@ -32,19 +32,15 @@ pub fn emit_specific_pools(
     params: &String,
     block: &eth::v2::Block,
 ) -> Result<Vec<ProtocolComponent>> {
-    let pools: HashMap<String, PoolQueryParams> = params
-        .split(PARAMS_SEPERATOR)
-        .map(|param| {
-            // TODO UNSAFE
-            let pool: PoolQueryParams = serde_qs::from_str(&param)
-                .with_context(|| format!("Failed to parse pool query params: {0}", param))?;
-            Ok((pool.tx_hash.clone(), pool))
-        })
-        .collect::<Result<HashMap<_, _>>>()
-        .with_context(|| "Failed to parse all pool query params")?;
+    let pools = parse_params(params)?;
+    create_components(block, pools)
+}
+
+fn create_components(
+    block: &eth::v2::Block,
+    pools: HashMap<String, PoolQueryParams>,
+) -> Result<Vec<ProtocolComponent>, anyhow::Error> {
     let mut components: Vec<ProtocolComponent> = vec![];

     for tx in block.transactions() {
         let encoded_hash = hex::encode(tx.hash.clone());
         if let Some(pool) = pools.get(&encoded_hash) {
@@ -63,16 +59,18 @@ pub fn emit_specific_pools(
                     .map(|token| Result::Ok(hex::decode(token)?))
                     .collect::<Result<Vec<_>>>()
                     .with_context(|| "Token addresses were not formatted properly")?,
-                static_att: pool
-                    .attributes
-                    .clone()
-                    .into_iter()
-                    .map(|attr| Attribute {
-                        name: attr.0,
-                        value: attr.1.into(),
-                        change: ChangeType::Creation.into(),
-                    })
-                    .collect::<Vec<_>>(),
+                static_att: zip(
+                    pool.attribute_keys.clone().into_iter(),
+                    pool.attribute_vals.clone().into_iter(),
+                )
+                .clone()
+                .into_iter()
+                .map(|(key, value)| Attribute {
+                    name: key,
+                    value: value.into(),
+                    change: ChangeType::Creation.into(),
+                })
+                .collect::<Vec<_>>(),
                 contracts: vec![hex::decode(pool.address.clone())
                     .with_context(|| "Pool address was not formatted properly")?],
                 change: ChangeType::Creation.into(),
@@ -88,3 +86,46 @@ pub fn emit_specific_pools(
         }
     }
     Ok(components)
 }
+
+fn parse_params(params: &String) -> Result<HashMap<String, PoolQueryParams>, anyhow::Error> {
+    let pools: HashMap<String, PoolQueryParams> = params
+        .split(PARAMS_SEPERATOR)
+        .map(|param| {
+            let pool: PoolQueryParams = serde_qs::from_str(&param)
+                .with_context(|| format!("Failed to parse pool query params: {0}", param))?;
+            Ok((pool.tx_hash.clone(), pool))
+        })
+        .collect::<Result<HashMap<_, _>>>()
+        .with_context(|| "Failed to parse all pool query params")?;
+    Ok(pools)
+}
+
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_params() {
+        // Existing test case
+        let params = "address=0x5F890841f657d90E081bAbdB532A05996Af79Fe6&tx_hash=0xb71a66c1d93c525a2dd19a8db0da19e65be04f36e733af7f03e3c9dff41aa16a&tokens[]=0x6b175474e89094c44da98b954eedeac495271d0f&tokens[]=0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48&tokens[]=0xdac17f958d2ee523a2206206994597c13d831ec7&attribute_keys[]=key1&attribute_vals[]=val1".to_string();
+        let expected_result = {
+            let mut map = HashMap::new();
+            map.insert(
+                "0xb71a66c1d93c525a2dd19a8db0da19e65be04f36e733af7f03e3c9dff41aa16a".to_string(),
+                PoolQueryParams {
+                    address: "0x5F890841f657d90E081bAbdB532A05996Af79Fe6".to_string(),
+                    tx_hash: "0xb71a66c1d93c525a2dd19a8db0da19e65be04f36e733af7f03e3c9dff41aa16a"
+                        .to_string(),
+                    tokens: vec![
+                        "0x6b175474e89094c44da98b954eedeac495271d0f".to_string(),
+                        "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48".to_string(),
+                        "0xdac17f958d2ee523a2206206994597c13d831ec7".to_string(),
+                    ],
+                    attribute_keys: vec!["key1".to_string()],
+                    attribute_vals: vec!["val1".to_string()],
+                },
+            );
+            map
+        };
+        assert_eq!(parse_params(&params).unwrap(), expected_result);
+    }
+}