refactor(substreams): Update ambient Substreams

This commit is contained in:
zizou
2024-10-14 18:09:17 +02:00
parent aff76f8cc7
commit 42f2f45aa7
31 changed files with 1567 additions and 724 deletions

View File

@@ -0,0 +1,127 @@
use substreams_ethereum::pb::eth::{self};
use crate::{
contracts::{
hotproxy::{
decode_direct_swap_hotproxy_call, AMBIENT_HOTPROXY_CONTRACT, USER_CMD_HOTPROXY_FN_SIG,
},
knockout::{decode_knockout_call, AMBIENT_KNOCKOUT_CONTRACT, USER_CMD_KNOCKOUT_FN_SIG},
main::{
decode_direct_swap_call, decode_pool_init, AMBIENT_CONTRACT, SWAP_FN_SIG,
USER_CMD_FN_SIG,
},
micropaths::{
decode_burn_ambient_call, decode_burn_range_call, decode_mint_ambient_call,
decode_mint_range_call, decode_sweep_swap_call, AMBIENT_MICROPATHS_CONTRACT,
BURN_AMBIENT_FN_SIG, BURN_RANGE_FN_SIG, MINT_AMBIENT_FN_SIG, MINT_RANGE_FN_SIG,
SWEEP_SWAP_FN_SIG,
},
warmpath::{
decode_warm_path_user_cmd_call, AMBIENT_WARMPATH_CONTRACT, USER_CMD_WARMPATH_FN_SIG,
},
},
utils::from_u256_to_vec,
};
use tycho_substreams::{
models::{AmbientBalanceDelta, BlockPoolChanges},
prelude::Transaction,
};
#[substreams::handlers::map]
fn map_pool_changes(block: eth::v2::Block) -> Result<BlockPoolChanges, substreams::errors::Error> {
let mut protocol_components = Vec::new();
let mut balance_deltas = Vec::new();
for block_tx in block.transactions() {
let tx = Transaction {
hash: block_tx.hash.clone(),
from: block_tx.from.clone(),
to: block_tx.to.clone(),
index: block_tx.index as u64,
};
// extract storage changes
let mut storage_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.storage_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
storage_changes.sort_unstable_by_key(|change| change.ordinal);
let block_calls = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.collect::<Vec<_>>();
for call in block_calls {
if call.input.len() < 4 {
continue;
}
let selector: [u8; 4] = call.input[0..4].try_into().unwrap();
let address: [u8; 20] = call.address.clone().try_into().unwrap();
if call.address == AMBIENT_CONTRACT && selector == USER_CMD_FN_SIG {
// Extract pool creations
if let Some(protocol_component) = decode_pool_init(call, tx.clone())? {
protocol_components.push(protocol_component);
}
}
// Extract TVL changes
let result = match (address, selector) {
(AMBIENT_CONTRACT, SWAP_FN_SIG) => Some(decode_direct_swap_call(call)?),
(AMBIENT_HOTPROXY_CONTRACT, USER_CMD_HOTPROXY_FN_SIG) => {
Some(decode_direct_swap_hotproxy_call(call)?)
}
(AMBIENT_MICROPATHS_CONTRACT, SWEEP_SWAP_FN_SIG) => {
Some(decode_sweep_swap_call(call)?)
}
(AMBIENT_WARMPATH_CONTRACT, USER_CMD_WARMPATH_FN_SIG) => {
decode_warm_path_user_cmd_call(call)?
}
(AMBIENT_MICROPATHS_CONTRACT, MINT_RANGE_FN_SIG) => {
Some(decode_mint_range_call(call)?)
}
(AMBIENT_MICROPATHS_CONTRACT, MINT_AMBIENT_FN_SIG) => {
Some(decode_mint_ambient_call(call)?)
}
(AMBIENT_MICROPATHS_CONTRACT, BURN_RANGE_FN_SIG) => {
Some(decode_burn_range_call(call)?)
}
(AMBIENT_MICROPATHS_CONTRACT, BURN_AMBIENT_FN_SIG) => {
Some(decode_burn_ambient_call(call)?)
}
(AMBIENT_KNOCKOUT_CONTRACT, USER_CMD_KNOCKOUT_FN_SIG) => {
Some(decode_knockout_call(call)?)
}
_ => None,
};
let (pool_hash, base_flow, quote_flow) = match result {
Some((pool_hash, base_flow, quote_flow)) => (pool_hash, base_flow, quote_flow),
None => continue,
};
let base_balance_delta = AmbientBalanceDelta {
pool_hash: Vec::from(pool_hash),
token_type: "base".to_string(),
token_delta: from_u256_to_vec(base_flow),
ordinal: call.index as u64,
tx: Some(tx.clone()),
};
let quote_balance_delta = AmbientBalanceDelta {
pool_hash: Vec::from(pool_hash),
token_type: "quote".to_string(),
token_delta: from_u256_to_vec(quote_flow),
ordinal: call.index as u64,
tx: Some(tx.clone()),
};
balance_deltas.extend([base_balance_delta.clone(), quote_balance_delta.clone()]);
}
}
balance_deltas.sort_by_key(|delta| (delta.ordinal, delta.token_type.clone()));
let pool_changes = BlockPoolChanges { protocol_components, balance_deltas };
Ok(pool_changes)
}

View File

@@ -0,0 +1,18 @@
use substreams::{
scalar::BigInt,
store::{StoreAdd, StoreAddBigInt, StoreNew},
};
use tycho_substreams::models::BlockPoolChanges;
#[substreams::handlers::store]
pub fn store_pool_balances(changes: BlockPoolChanges, balance_store: StoreAddBigInt) {
let deltas = changes.balance_deltas.clone();
for balance_delta in deltas {
let pool_hash_hex = hex::encode(&balance_delta.pool_hash);
balance_store.add(
balance_delta.ordinal,
format!("{}:{}", pool_hash_hex, balance_delta.token_type),
BigInt::from_signed_bytes_be(&balance_delta.token_delta),
);
}
}

View File

@@ -0,0 +1,9 @@
use substreams::store::{StoreNew, StoreSet, StoreSetProto};
use tycho_substreams::models::{BlockPoolChanges, ProtocolComponent};
#[substreams::handlers::store]
pub fn store_pools(changes: BlockPoolChanges, component_store: StoreSetProto<ProtocolComponent>) {
for component in changes.protocol_components {
component_store.set(0, component.id.clone(), &component);
}
}

View File

@@ -0,0 +1,367 @@
use num_bigint::BigInt;
use std::{
collections::{hash_map::Entry, HashMap},
str::FromStr,
};
use substreams::pb::substreams::StoreDeltas;
use substreams_ethereum::pb::eth::{self};
use crate::contracts::main::AMBIENT_CONTRACT;
use substreams::store::{StoreGet, StoreGetProto};
use tycho_substreams::prelude::*;
struct SlotValue {
new_value: Vec<u8>,
start_value: Vec<u8>,
}
impl SlotValue {
fn has_changed(&self) -> bool {
self.start_value != self.new_value
}
}
// uses a map for slots, protobuf does not
// allow bytes in hashmap keys
struct InterimContractChange {
address: Vec<u8>,
balance: Vec<u8>,
code: Vec<u8>,
slots: HashMap<Vec<u8>, SlotValue>,
change: ChangeType,
}
impl From<InterimContractChange> for ContractChange {
fn from(value: InterimContractChange) -> Self {
ContractChange {
address: value.address,
balance: value.balance,
code: value.code,
slots: value
.slots
.into_iter()
.filter(|(_, value)| value.has_changed())
.map(|(slot, value)| ContractSlot { slot, value: value.new_value })
.collect(),
change: value.change.into(),
}
}
}
/// Extracts all contract changes relevant to vm simulations
///
/// This implementation has currently two major limitations:
/// 1. It is hardwired to only care about changes to the ambient main contract, this is ok for this
/// particular use case but for a more general purpose implementation this is not ideal
/// 2. Changes are processed separately, this means that if there are any side effects between each
/// other (e.g. if account is deleted and then created again in ethereum all the storage is set
/// to 0. So there is a side effect between account creation and contract storage.) these might
/// not be properly accounted for. Most of the time this should not be a major issue but may lead
/// to wrong results so consume this implementation with care. See example below for a concrete
/// case where this is problematic.
///
/// ## A very contrived example:
/// 1. Some existing contract receives a transaction that changes it state, the state is updated
/// 2. Next, this contract has self destruct called on itself
/// 3. The contract is created again using CREATE2 at the same address
/// 4. The contract receives a transaction that changes it state
/// 5. We would emit this as as contract creation with slots set from 1 and from 4, although we
/// should only emit the slots changed from 4.
#[substreams::handlers::map]
fn map_changes(
block: eth::v2::Block,
block_pool_changes: BlockPoolChanges,
balance_store: StoreDeltas,
pool_store: StoreGetProto<ProtocolComponent>,
) -> Result<BlockContractChanges, substreams::errors::Error> {
let mut block_changes = BlockContractChanges::default();
let mut tx_change = TransactionContractChanges::default();
let mut changed_contracts: HashMap<Vec<u8>, InterimContractChange> = HashMap::new();
let created_accounts: HashMap<_, _> = block
.transactions()
.flat_map(|tx| {
tx.calls.iter().flat_map(|call| {
call.account_creations
.iter()
.map(|ac| (&ac.account, ac.ordinal))
})
})
.collect();
for block_tx in block.transactions() {
// extract storage changes
let mut storage_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.storage_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
storage_changes.sort_unstable_by_key(|change| change.ordinal);
// Note: some contracts change slot values and change them back to their
// original value before the transactions ends we remember the initial
// value before the first change and in the end filter found deltas
// that ended up not actually changing anything.
for storage_change in storage_changes.iter() {
match changed_contracts.entry(storage_change.address.clone()) {
// We have already an entry recording a change about this contract
// only append the change about this storage slot
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
match contract_change
.slots
.entry(storage_change.key.clone())
{
// The storage slot was already changed before, simply
// update new_value
Entry::Occupied(mut v) => {
let slot_value = v.get_mut();
slot_value
.new_value
.copy_from_slice(&storage_change.new_value);
}
// The storage slots is being initialised for the first time
Entry::Vacant(v) => {
v.insert(SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
});
}
}
}
// Intialise a new contract change after observing a storage change
Entry::Vacant(e) => {
let mut slots = HashMap::new();
slots.insert(
storage_change.key.clone(),
SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
},
);
e.insert(InterimContractChange {
address: storage_change.address.clone(),
balance: Vec::new(),
code: Vec::new(),
slots,
change: if created_accounts.contains_key(&storage_change.address) {
ChangeType::Creation
} else {
ChangeType::Update
},
});
}
}
}
// extract balance changes
let mut balance_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.balance_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
balance_changes.sort_unstable_by_key(|change| change.ordinal);
for balance_change in balance_changes.iter() {
match changed_contracts.entry(balance_change.address.clone()) {
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
if let Some(new_balance) = &balance_change.new_value {
contract_change.balance.clear();
contract_change
.balance
.extend_from_slice(&new_balance.bytes);
}
}
Entry::Vacant(e) => {
if let Some(new_balance) = &balance_change.new_value {
e.insert(InterimContractChange {
address: balance_change.address.clone(),
balance: new_balance.bytes.clone(),
code: Vec::new(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&balance_change.address) {
ChangeType::Creation
} else {
ChangeType::Update
},
});
}
}
}
}
// extract code changes
let mut code_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.code_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
code_changes.sort_unstable_by_key(|change| change.ordinal);
for code_change in code_changes.iter() {
match changed_contracts.entry(code_change.address.clone()) {
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
contract_change.code.clear();
contract_change
.code
.extend_from_slice(&code_change.new_code);
}
Entry::Vacant(e) => {
e.insert(InterimContractChange {
address: code_change.address.clone(),
balance: Vec::new(),
code: code_change.new_code.clone(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&code_change.address) {
ChangeType::Creation
} else {
ChangeType::Update
},
});
}
}
}
// if there were any changes, add transaction and push the changes
if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() {
tx_change.tx = Some(Transaction {
hash: block_tx.hash.clone(),
from: block_tx.from.clone(),
to: block_tx.to.clone(),
index: block_tx.index as u64,
});
// reuse changed_contracts hash map by draining it, next iteration
// will start empty. This avoids a costly reallocation
for (_, change) in changed_contracts.drain() {
tx_change
.contract_changes
.push(change.into())
}
block_changes
.changes
.push(tx_change.clone());
// clear out the interim contract changes after we pushed those.
tx_change.tx = None;
tx_change.contract_changes.clear();
}
}
let mut grouped_components = HashMap::new();
for component in &block_pool_changes.protocol_components {
let tx_hash = component
.tx
.clone()
.expect("Transaction is missing")
.hash;
grouped_components
.entry(tx_hash)
.or_insert_with(Vec::new)
.push(component.clone());
}
for (tx_hash, components) in grouped_components {
if let Some(tx_change) = block_changes
.changes
.iter_mut()
// TODO: be better than this (quadratic complexity)
.find(|tx_change| {
tx_change
.tx
.as_ref()
.map_or(false, |tx| tx.hash == tx_hash)
})
{
tx_change
.component_changes
.extend(components);
}
}
let mut balance_changes = HashMap::new();
balance_store
.deltas
.into_iter()
.zip(block_pool_changes.balance_deltas)
.for_each(|(store_delta, balance_delta)| {
let pool_hash_hex = hex::encode(balance_delta.pool_hash);
let pool = match pool_store.get_last(pool_hash_hex.clone()) {
Some(pool) => pool,
None => panic!("Pool not found in store for given hash: {}", pool_hash_hex),
};
let token_type = substreams::key::segment_at(&store_delta.key, 1);
let token_index = if token_type == "quote" { 1 } else { 0 };
// store_delta.new_value is an ASCII string representing an integer
let ascii_string =
String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence");
let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer");
let big_endian_bytes_balance = balance.to_bytes_be().1;
let balance_change = BalanceChange {
component_id: pool_hash_hex.as_bytes().to_vec(),
token: pool.tokens[token_index].clone(),
balance: big_endian_bytes_balance.to_vec(),
};
let tx_hash = balance_delta
.tx
.expect("Transaction is missing")
.hash;
balance_changes
.entry(tx_hash)
.or_insert_with(Vec::new)
.push(balance_change);
});
for (tx_hash, grouped_balance_changes) in balance_changes {
if let Some(tx_change) = block_changes
.changes
.iter_mut()
// TODO: be better than this (quadratic complexity)
.find(|tx_change| {
tx_change
.tx
.as_ref()
.map_or(false, |tx| tx.hash == tx_hash)
})
{
tx_change
.balance_changes
.extend(grouped_balance_changes);
}
}
block_changes.block = Some(Block {
number: block.number,
hash: block.hash.clone(),
parent_hash: block
.header
.as_ref()
.expect("Block header not present")
.parent_hash
.clone(),
ts: block.timestamp_seconds(),
});
Ok(block_changes)
}

View File

@@ -0,0 +1,16 @@
pub use map_changes::map_changes;
pub use map_pool_changes::map_pool_changes;
pub use store_pool_balances::store_pool_balances;
pub use store_pools::store_pools;
#[path = "1_map_pool_changes.rs"]
mod map_pool_changes;
#[path = "2_store_pools.rs"]
mod store_pools;
#[path = "2_store_pool_balances.rs"]
mod store_pool_balances;
#[path = "3_map_changes.rs"]
mod map_changes;