chore: clean up ambient (#136)
* fix: remove unnecessary tx field in ProtocolComponent * chore: move ambient protobuf files to ambient module * chore: remove dependency on common message types This allows us to isolate the ambient specific messages within the ambient module * feat: update ambient substream with new message structs * chore: update substream configs And remove use of deprecated BlockContractChanges. * feat: implement From for AmbientProtocolComponent to ProtocolComponent
This commit is contained in:
@@ -20,24 +20,17 @@ use crate::{
|
||||
decode_warm_path_user_cmd_call, AMBIENT_WARMPATH_CONTRACT, USER_CMD_WARMPATH_FN_SIG,
|
||||
},
|
||||
},
|
||||
pb::tycho::ambient::v1::{AmbientBalanceDelta, BlockPoolChanges},
|
||||
utils::from_u256_to_vec,
|
||||
};
|
||||
use tycho_substreams::{
|
||||
models::{AmbientBalanceDelta, BlockPoolChanges},
|
||||
prelude::Transaction,
|
||||
};
|
||||
|
||||
#[substreams::handlers::map]
|
||||
fn map_pool_changes(block: eth::v2::Block) -> Result<BlockPoolChanges, substreams::errors::Error> {
|
||||
let mut protocol_components = Vec::new();
|
||||
let mut balance_deltas = Vec::new();
|
||||
let mut protocol_components = Vec::new();
|
||||
|
||||
for block_tx in block.transactions() {
|
||||
let tx = Transaction {
|
||||
hash: block_tx.hash.clone(),
|
||||
from: block_tx.from.clone(),
|
||||
to: block_tx.to.clone(),
|
||||
index: block_tx.index as u64,
|
||||
};
|
||||
let tx_index = block_tx.index as u64;
|
||||
// extract storage changes
|
||||
let mut storage_changes = block_tx
|
||||
.calls
|
||||
@@ -66,7 +59,7 @@ fn map_pool_changes(block: eth::v2::Block) -> Result<BlockPoolChanges, substream
|
||||
|
||||
if call.address == AMBIENT_CONTRACT && selector == USER_CMD_FN_SIG {
|
||||
// Extract pool creations
|
||||
if let Some(protocol_component) = decode_pool_init(call, tx.clone())? {
|
||||
if let Some(protocol_component) = decode_pool_init(call, tx_index)? {
|
||||
protocol_components.push(protocol_component);
|
||||
}
|
||||
}
|
||||
@@ -109,19 +102,18 @@ fn map_pool_changes(block: eth::v2::Block) -> Result<BlockPoolChanges, substream
|
||||
token_type: "base".to_string(),
|
||||
token_delta: from_u256_to_vec(base_flow),
|
||||
ordinal: call.index as u64,
|
||||
tx: Some(tx.clone()),
|
||||
tx_index,
|
||||
};
|
||||
let quote_balance_delta = AmbientBalanceDelta {
|
||||
pool_hash: Vec::from(pool_hash),
|
||||
token_type: "quote".to_string(),
|
||||
token_delta: from_u256_to_vec(quote_flow),
|
||||
ordinal: call.index as u64,
|
||||
tx: Some(tx.clone()),
|
||||
tx_index,
|
||||
};
|
||||
balance_deltas.extend([base_balance_delta.clone(), quote_balance_delta.clone()]);
|
||||
}
|
||||
}
|
||||
balance_deltas.sort_by_key(|delta| (delta.ordinal, delta.token_type.clone()));
|
||||
let pool_changes = BlockPoolChanges { protocol_components, balance_deltas };
|
||||
Ok(pool_changes)
|
||||
Ok(BlockPoolChanges { balance_deltas, new_components: protocol_components })
|
||||
}
|
||||
|
||||
@@ -2,12 +2,12 @@ use substreams::{
|
||||
scalar::BigInt,
|
||||
store::{StoreAdd, StoreAddBigInt, StoreNew},
|
||||
};
|
||||
use tycho_substreams::models::BlockPoolChanges;
|
||||
|
||||
use crate::pb::tycho::ambient::v1::BlockPoolChanges;
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_pool_balances(changes: BlockPoolChanges, balance_store: StoreAddBigInt) {
|
||||
let deltas = changes.balance_deltas.clone();
|
||||
for balance_delta in deltas {
|
||||
for balance_delta in changes.balance_deltas {
|
||||
let pool_hash_hex = hex::encode(&balance_delta.pool_hash);
|
||||
balance_store.add(
|
||||
balance_delta.ordinal,
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
use substreams::store::{StoreNew, StoreSet, StoreSetProto};
|
||||
use tycho_substreams::models::{BlockPoolChanges, ProtocolComponent};
|
||||
use tycho_substreams::models::ProtocolComponent;
|
||||
|
||||
use crate::pb::tycho::ambient::v1::BlockPoolChanges;
|
||||
|
||||
#[substreams::handlers::store]
|
||||
pub fn store_pools(changes: BlockPoolChanges, component_store: StoreSetProto<ProtocolComponent>) {
|
||||
for component in changes.protocol_components {
|
||||
component_store.set(0, component.id.clone(), &component);
|
||||
for component in changes.new_components {
|
||||
let protocol_component: ProtocolComponent = component.into();
|
||||
component_store.set(0, protocol_component.id.clone(), &protocol_component);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,14 +3,16 @@ use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
str::FromStr,
|
||||
};
|
||||
use substreams::pb::substreams::StoreDeltas;
|
||||
|
||||
use substreams::{
|
||||
pb::substreams::StoreDeltas,
|
||||
store::{StoreGet, StoreGetProto},
|
||||
};
|
||||
use substreams_ethereum::pb::eth::{self};
|
||||
|
||||
use crate::contracts::main::AMBIENT_CONTRACT;
|
||||
use substreams::store::{StoreGet, StoreGetProto};
|
||||
use tycho_substreams::prelude::*;
|
||||
|
||||
use crate::{contracts::main::AMBIENT_CONTRACT, pb::tycho::ambient::v1::BlockPoolChanges};
|
||||
|
||||
struct SlotValue {
|
||||
new_value: Vec<u8>,
|
||||
start_value: Vec<u8>,
|
||||
@@ -74,10 +76,10 @@ fn map_changes(
|
||||
block_pool_changes: BlockPoolChanges,
|
||||
balance_store: StoreDeltas,
|
||||
pool_store: StoreGetProto<ProtocolComponent>,
|
||||
) -> Result<BlockContractChanges, substreams::errors::Error> {
|
||||
let mut block_changes = BlockContractChanges::default();
|
||||
) -> Result<BlockChanges, substreams::errors::Error> {
|
||||
let mut block_changes = BlockChanges::default();
|
||||
|
||||
let mut tx_change = TransactionContractChanges::default();
|
||||
let mut transaction_changes = TransactionChanges::default();
|
||||
|
||||
let mut changed_contracts: HashMap<Vec<u8>, InterimContractChange> = HashMap::new();
|
||||
|
||||
@@ -93,6 +95,13 @@ fn map_changes(
|
||||
.collect();
|
||||
|
||||
for block_tx in block.transactions() {
|
||||
let tx = Transaction {
|
||||
hash: block_tx.hash.clone(),
|
||||
from: block_tx.from.clone(),
|
||||
to: block_tx.to.clone(),
|
||||
index: block_tx.index as u64,
|
||||
};
|
||||
|
||||
// extract storage changes
|
||||
let mut storage_changes = block_tx
|
||||
.calls
|
||||
@@ -244,44 +253,37 @@ fn map_changes(
|
||||
|
||||
// if there were any changes, add transaction and push the changes
|
||||
if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() {
|
||||
tx_change.tx = Some(Transaction {
|
||||
hash: block_tx.hash.clone(),
|
||||
from: block_tx.from.clone(),
|
||||
to: block_tx.to.clone(),
|
||||
index: block_tx.index as u64,
|
||||
});
|
||||
transaction_changes.tx = Some(tx.clone());
|
||||
|
||||
// reuse changed_contracts hash map by draining it, next iteration
|
||||
// will start empty. This avoids a costly reallocation
|
||||
for (_, change) in changed_contracts.drain() {
|
||||
tx_change
|
||||
transaction_changes
|
||||
.contract_changes
|
||||
.push(change.into())
|
||||
}
|
||||
|
||||
block_changes
|
||||
.changes
|
||||
.push(tx_change.clone());
|
||||
.push(transaction_changes.clone());
|
||||
|
||||
// clear out the interim contract changes after we pushed those.
|
||||
tx_change.tx = None;
|
||||
tx_change.contract_changes.clear();
|
||||
transaction_changes.tx = None;
|
||||
transaction_changes
|
||||
.contract_changes
|
||||
.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// extract new protocol components
|
||||
let mut grouped_components = HashMap::new();
|
||||
for component in &block_pool_changes.protocol_components {
|
||||
let tx_hash = component
|
||||
.tx
|
||||
.clone()
|
||||
.expect("Transaction is missing")
|
||||
.hash;
|
||||
for component in &block_pool_changes.new_components {
|
||||
grouped_components
|
||||
.entry(tx_hash)
|
||||
.entry(component.tx_index)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(component.clone());
|
||||
}
|
||||
|
||||
for (tx_hash, components) in grouped_components {
|
||||
for (tx_index, components) in grouped_components {
|
||||
if let Some(tx_change) = block_changes
|
||||
.changes
|
||||
.iter_mut()
|
||||
@@ -290,14 +292,20 @@ fn map_changes(
|
||||
tx_change
|
||||
.tx
|
||||
.as_ref()
|
||||
.is_some_and(|tx| tx.hash == tx_hash)
|
||||
.is_some_and(|tx| tx.index == tx_index)
|
||||
})
|
||||
{
|
||||
let new_components = components
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect::<Vec<ProtocolComponent>>();
|
||||
tx_change
|
||||
.component_changes
|
||||
.extend(components);
|
||||
.extend(new_components);
|
||||
}
|
||||
}
|
||||
|
||||
// extract component balance changes
|
||||
let mut balance_changes = HashMap::new();
|
||||
balance_store
|
||||
.deltas
|
||||
@@ -323,17 +331,12 @@ fn map_changes(
|
||||
token: pool.tokens[token_index].clone(),
|
||||
balance: big_endian_bytes_balance.to_vec(),
|
||||
};
|
||||
let tx_hash = balance_delta
|
||||
.tx
|
||||
.expect("Transaction is missing")
|
||||
.hash;
|
||||
balance_changes
|
||||
.entry(tx_hash)
|
||||
.entry(balance_delta.tx_index)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(balance_change);
|
||||
});
|
||||
|
||||
for (tx_hash, grouped_balance_changes) in balance_changes {
|
||||
for (tx_index, grouped_balance_changes) in balance_changes {
|
||||
if let Some(tx_change) = block_changes
|
||||
.changes
|
||||
.iter_mut()
|
||||
@@ -342,7 +345,7 @@ fn map_changes(
|
||||
tx_change
|
||||
.tx
|
||||
.as_ref()
|
||||
.is_some_and(|tx| tx.hash == tx_hash)
|
||||
.is_some_and(|tx| tx.index == tx_index)
|
||||
})
|
||||
{
|
||||
tx_change
|
||||
|
||||
Reference in New Issue
Block a user