sdk: update to latest sdk

This commit is contained in:
0xMochan
2024-03-29 14:55:31 -05:00
parent 448a153c6e
commit 2feee8add6
13 changed files with 228 additions and 805 deletions

View File

@@ -1,31 +1,30 @@
use std::collections::HashMap;
use anyhow::Result;
use substreams::pb::substreams::StoreDeltas;
use substreams::store::{
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreGetString, StoreNew,
StoreSet, StoreSetString,
use itertools::Itertools;
use substreams::{
pb::substreams::StoreDeltas,
store::{
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreGetString, StoreNew,
StoreSet, StoreSetString,
},
};
use substreams::key;
use substreams::scalar::BigInt;
use substreams::{key, scalar::BigInt};
use substreams_ethereum::block_view::LogView;
use substreams_ethereum::pb::eth;
use substreams_ethereum::{block_view::LogView, pb::eth};
use itertools::Itertools;
use pb::tycho::evm::v1::{self as tycho};
use contract_changes::extract_contract_changes;
use substreams_ethereum::Event;
use crate::{abi, contract_changes, pb, pool_factories};
use std::convert::TryInto;
use crate::{abi, pool_factories};
use tycho_substreams::{
balances::store_balance_changes, contract::extract_contract_changes, prelude::*,
};
/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in
/// a later groupby operation.
#[derive(Debug)]
struct TransactionWrapper(tycho::Transaction);
struct TransactionWrapper(Transaction);
impl PartialEq for TransactionWrapper {
fn eq(&self, other: &Self) -> bool {
@@ -33,8 +32,8 @@ impl PartialEq for TransactionWrapper {
}
}
fn tx_from_log(log: &LogView) -> tycho::Transaction {
tycho::Transaction {
fn tx_from_log(log: &LogView) -> Transaction {
Transaction {
hash: log.receipt.transaction.hash.clone(),
from: log.receipt.transaction.from.clone(),
to: log.receipt.transaction.to.clone(),
@@ -43,12 +42,10 @@ fn tx_from_log(log: &LogView) -> tycho::Transaction {
}
#[substreams::handlers::map]
pub fn map_pools_created(
block: eth::v2::Block,
) -> Result<tycho::GroupedTransactionProtocolComponents> {
pub fn map_components(block: eth::v2::Block) -> Result<BlockTransactionProtocolComponents> {
// Gather contract changes by indexing `PoolCreated` events and analysing the `Create` call
// We store these as a hashmap by tx hash since we need to agg by tx hash later
Ok(tycho::GroupedTransactionProtocolComponents {
Ok(BlockTransactionProtocolComponents {
tx_components: block
.transactions()
.filter_map(|tx| {
@@ -57,16 +54,21 @@ pub fn map_pools_created(
.filter(|(_, call)| !call.call.state_reverted)
.filter_map(|(log, call)| {
Some(pool_factories::address_map(
call.call.address.as_slice().try_into().ok()?, // this shouldn't fail
call.call
.address
.as_slice()
.try_into()
.ok()?, // this shouldn't fail
log,
call.call,
tx,
)?)
})
.collect::<Vec<_>>();
if !components.is_empty() {
Some(tycho::TransactionProtocolComponents {
tx: Some(tycho::Transaction {
Some(TransactionProtocolComponents {
tx: Some(Transaction {
hash: tx.hash.clone(),
from: tx.from.clone(),
to: tx.to.clone(),
@@ -84,7 +86,7 @@ pub fn map_pools_created(
/// Simply stores the `ProtocolComponent`s with the pool id as the key
#[substreams::handlers::store]
pub fn store_pools_created(map: tycho::GroupedTransactionProtocolComponents, store: StoreAddInt64) {
pub fn store_components(map: BlockTransactionProtocolComponents, store: StoreAddInt64) {
store.add_many(
0,
&map.tx_components
@@ -98,7 +100,7 @@ pub fn store_pools_created(map: tycho::GroupedTransactionProtocolComponents, sto
/// Simply stores the `ProtocolComponent`s with the pool id as the key
#[substreams::handlers::store]
pub fn store_pools_tokens(map: tycho::GroupedTransactionProtocolComponents, store: StoreSetString) {
pub fn store_component_tokens(map: BlockTransactionProtocolComponents, store: StoreSetString) {
map.tx_components
.iter()
.flat_map(|tx_components| &tx_components.components)
@@ -118,11 +120,11 @@ pub fn store_pools_tokens(map: tycho::GroupedTransactionProtocolComponents, stor
/// Since the `PoolBalanceChanged` events administer only deltas, we need to leverage a map and a
/// store to be able to tally up final balances for tokens in a pool.
#[substreams::handlers::map]
pub fn map_balance_deltas(
pub fn map_relative_balances(
block: eth::v2::Block,
pools_store: StoreGetInt64,
tokens_store: StoreGetString,
) -> Result<tycho::BalanceDeltas, anyhow::Error> {
) -> Result<BlockBalanceDeltas, anyhow::Error> {
let mut deltas = block
.logs()
.filter_map(|log| {
@@ -137,14 +139,14 @@ pub fn map_balance_deltas(
.flat_map(|(log, event)| {
let tokens_bought_delta: BigInt = event.tokens_bought * -1;
vec![
tycho::BalanceDelta {
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: event.sold_id.to_signed_bytes_be(),
delta: event.tokens_sold.to_signed_bytes_be(),
component_id: log.address().into(),
},
tycho::BalanceDelta {
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: event.bought_id.to_signed_bytes_be(),
@@ -178,7 +180,7 @@ pub fn map_balance_deltas(
.token_amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| tycho::BalanceDelta {
.map(move |(token_amount, token_id)| BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: token_id.into(),
@@ -216,7 +218,7 @@ pub fn map_balance_deltas(
.zip(tokens)
.map(move |(token_amount, token_id)| {
let negative_token_amount: BigInt = token_amount * BigInt::from(-1);
tycho::BalanceDelta {
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: token_id.into(),
@@ -229,46 +231,33 @@ pub fn map_balance_deltas(
.collect::<Vec<_>>(),
);
Ok(tycho::BalanceDeltas {
balance_deltas: deltas,
})
Ok(BlockBalanceDeltas { balance_deltas: deltas })
}
/// It's significant to include both the `pool_id` and the `token_id` for each balance delta as the
/// store key to ensure that there's a unique balance being tallied for each.
#[substreams::handlers::store]
pub fn store_balance_changes(deltas: tycho::BalanceDeltas, store: StoreAddBigInt) {
deltas.balance_deltas.iter().for_each(|delta| {
store.add(
delta.ord,
format!(
"pool:{0}:token:{1}",
hex::encode(&delta.component_id),
hex::encode(&delta.token)
),
BigInt::from_signed_bytes_be(&delta.delta),
);
});
pub fn store_balance(deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
store_balance_changes(deltas, store)
}
/// This is the main map that handles most of the indexing of this substream.
/// Every contract change is grouped by transaction index via the `transaction_contract_changes`
/// map. Each block of code will extend the `TransactionContractChanges` struct with the
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
/// At the very end, the map can easily be sorted by index to ensure the final `BlockContractChanges`
/// is ordered by transactions properly.
/// At the very end, the map can easily be sorted by index to ensure the final
/// `BlockContractChanges` is ordered by transactions properly.
#[substreams::handlers::map]
pub fn map_changes(
pub fn map_protocol_changes(
block: eth::v2::Block,
grouped_components: tycho::GroupedTransactionProtocolComponents,
deltas: tycho::BalanceDeltas,
grouped_components: BlockTransactionProtocolComponents,
deltas: BlockBalanceDeltas,
components_store: StoreGetInt64,
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
) -> Result<tycho::BlockContractChanges> {
) -> Result<BlockContractChanges> {
// We merge contract changes by transaction (identified by transaction index) making it easy to
// sort them at the very end.
let mut transaction_contract_changes: HashMap<_, tycho::TransactionContractChanges> =
HashMap::new();
let mut transaction_contract_changes: HashMap<_, TransactionContractChanges> = HashMap::new();
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
// convert into `TransactionContractChanges`
@@ -280,7 +269,7 @@ pub fn map_changes(
transaction_contract_changes
.entry(tx.index)
.or_insert_with(|| tycho::TransactionContractChanges {
.or_insert_with(|| TransactionContractChanges {
tx: Some(tx.clone()),
contract_changes: vec![],
component_changes: vec![],
@@ -303,7 +292,7 @@ pub fn map_changes(
let token_id = key::segment_at(&store_delta.key, 3);
(
balance_delta.tx.unwrap(),
tycho::BalanceChange {
BalanceChange {
token: hex::decode(token_id).expect("Token ID not valid hex"),
balance: store_delta.new_value,
component_id: hex::decode(pool_id).expect("Token ID not valid hex"),
@@ -318,7 +307,7 @@ pub fn map_changes(
transaction_contract_changes
.entry(tx.index)
.or_insert_with(|| tycho::TransactionContractChanges {
.or_insert_with(|| TransactionContractChanges {
tx: Some(tx.clone()),
contract_changes: vec![],
component_changes: vec![],
@@ -331,12 +320,20 @@ pub fn map_changes(
// General helper for extracting contract changes. Uses block, our component store which holds
// all of our tracked deployed pool addresses, and the map of tx contract changes which we
// output into for final processing later.
extract_contract_changes(&block, components_store, &mut transaction_contract_changes);
extract_contract_changes(
&block,
|addr| {
components_store
.get_last(format!("pool:0x{0}", hex::encode(addr)))
.is_some()
},
&mut transaction_contract_changes,
);
// Process all `transaction_contract_changes` for final output in the `BlockContractChanges`,
// sorted by transaction index (the key).
Ok(tycho::BlockContractChanges {
block: Some(tycho::Block {
Ok(BlockContractChanges {
block: Some(Block {
number: block.number,
hash: block.hash.clone(),
parent_hash: block
@@ -351,9 +348,9 @@ pub fn map_changes(
.drain()
.sorted_unstable_by_key(|(index, _)| index.clone())
.filter_map(|(_, change)| {
if change.contract_changes.is_empty()
&& change.component_changes.is_empty()
&& change.balance_changes.is_empty()
if change.contract_changes.is_empty() &&
change.component_changes.is_empty() &&
change.balance_changes.is_empty()
{
None
} else {