Implement helpers for relative balances.
Adds some helpers that will users help to convert relative balances changes to absolute ones. The goal is to make this as reduce the effort of the user to implementing a simple map handler that extracts relative balances changes.
This commit is contained in:
17
crates/tycho-substreams/Cargo.lock
generated
17
crates/tycho-substreams/Cargo.lock
generated
@@ -333,6 +333,15 @@ dependencies = [
|
|||||||
"either",
|
"either",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.12.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
|
||||||
|
dependencies = [
|
||||||
|
"either",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.10"
|
version = "1.0.10"
|
||||||
@@ -530,7 +539,7 @@ checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"heck",
|
"heck",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"log",
|
"log",
|
||||||
"multimap",
|
"multimap",
|
||||||
@@ -551,7 +560,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||||||
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
|
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"itertools",
|
"itertools 0.10.5",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
@@ -908,6 +917,10 @@ dependencies = [
|
|||||||
name = "tycho-substreams"
|
name = "tycho-substreams"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"hex",
|
||||||
|
"itertools 0.12.1",
|
||||||
|
"prost",
|
||||||
|
"substreams",
|
||||||
"substreams-ethereum",
|
"substreams-ethereum",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -4,4 +4,8 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
substreams-ethereum = "0.9.9"
|
substreams-ethereum = "0.9.9"
|
||||||
|
substreams = "0.5"
|
||||||
|
prost = "0.11"
|
||||||
|
hex = "0.4.3"
|
||||||
|
itertools = "0.12.0"
|
||||||
13
crates/tycho-substreams/rustfmt.toml
Normal file
13
crates/tycho-substreams/rustfmt.toml
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
reorder_imports = true
|
||||||
|
imports_granularity = "Crate"
|
||||||
|
use_small_heuristics = "Max"
|
||||||
|
comment_width = 100
|
||||||
|
wrap_comments = true
|
||||||
|
binop_separator = "Back"
|
||||||
|
trailing_comma = "Vertical"
|
||||||
|
trailing_semicolon = false
|
||||||
|
use_field_init_shorthand = true
|
||||||
|
chain_width = 40
|
||||||
|
ignore = [
|
||||||
|
"src/pb",
|
||||||
|
]
|
||||||
306
crates/tycho-substreams/src/balances.rs
Normal file
306
crates/tycho-substreams/src/balances.rs
Normal file
@@ -0,0 +1,306 @@
|
|||||||
|
//! Utilities to handle relative balances.
|
||||||
|
//!
|
||||||
|
//!
|
||||||
|
//! To aggregate relative balances changes to absolute balances the general approach is:
|
||||||
|
//!
|
||||||
|
//! 1. Use a map function that will extract a `BlockBalanceDeltas` message. BalanceDeltas
|
||||||
|
//! within this message are required to have increasing ordinals so that
|
||||||
|
//! the order of relative balance changes is unambiguous.
|
||||||
|
//! 2. Store the balances changes with a store handler. You can use the
|
||||||
|
//! `store_balance_changes` library method directly for this.
|
||||||
|
//! 3. In the output module, use aggregate_balance_changes to receive an
|
||||||
|
//! aggregated map of absolute balances.
|
||||||
|
//!
|
||||||
|
use crate::pb::tycho::evm::v1::{BalanceChange, BlockBalanceDeltas};
|
||||||
|
use itertools::Itertools;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use substreams::key;
|
||||||
|
use substreams::pb::substreams::StoreDeltas;
|
||||||
|
use substreams::prelude::{BigInt, StoreAdd};
|
||||||
|
|
||||||
|
pub fn store_balance_changes(deltas: BlockBalanceDeltas, store: impl StoreAdd<BigInt>) {
|
||||||
|
let mut previous_ordinal = HashMap::<String, u64>::new();
|
||||||
|
deltas
|
||||||
|
.balance_deltas
|
||||||
|
.iter()
|
||||||
|
.for_each(|delta| {
|
||||||
|
let balance_key = format!(
|
||||||
|
"{0}:{1}",
|
||||||
|
String::from_utf8(delta.component_id.clone())
|
||||||
|
.expect("delta.component_id is not valid utf-8!"),
|
||||||
|
hex::encode(&delta.token)
|
||||||
|
);
|
||||||
|
let current_ord = delta.ord;
|
||||||
|
previous_ordinal
|
||||||
|
.entry(balance_key.clone())
|
||||||
|
.and_modify(|ord| {
|
||||||
|
// ordinals must arrive in increasing order
|
||||||
|
if *ord >= current_ord {
|
||||||
|
panic!(
|
||||||
|
"Invalid ordinal sequence for {}: {} >= {}",
|
||||||
|
balance_key, *ord, current_ord
|
||||||
|
);
|
||||||
|
}
|
||||||
|
*ord = current_ord;
|
||||||
|
})
|
||||||
|
.or_insert(delta.ord);
|
||||||
|
store.add(delta.ord, balance_key, BigInt::from_signed_bytes_be(&delta.delta));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn aggregate_balances_changes(
|
||||||
|
balance_store: StoreDeltas,
|
||||||
|
deltas: BlockBalanceDeltas,
|
||||||
|
) -> HashMap<Vec<u8>, HashMap<Vec<u8>, BalanceChange>> {
|
||||||
|
balance_store
|
||||||
|
.deltas
|
||||||
|
.into_iter()
|
||||||
|
.zip(deltas.balance_deltas)
|
||||||
|
.map(|(store_delta, balance_delta)| {
|
||||||
|
let component_id = key::segment_at(&store_delta.key, 0);
|
||||||
|
let token_id = key::segment_at(&store_delta.key, 1);
|
||||||
|
// store_delta.new_value is an ASCII string representing an integer
|
||||||
|
let ascii_string =
|
||||||
|
String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence");
|
||||||
|
let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer");
|
||||||
|
let big_endian_bytes_balance = balance.to_bytes_be().1;
|
||||||
|
|
||||||
|
(
|
||||||
|
balance_delta.tx.unwrap(),
|
||||||
|
BalanceChange {
|
||||||
|
token: hex::decode(token_id).expect("Token ID not valid hex"),
|
||||||
|
balance: big_endian_bytes_balance,
|
||||||
|
component_id: component_id.as_bytes().to_vec(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
// We need to group the balance changes by tx hash for the `TransactionContractChanges` agg
|
||||||
|
.group_by(|(tx, _)| tx.hash.clone())
|
||||||
|
.into_iter()
|
||||||
|
.map(|(txh, group)| {
|
||||||
|
let balances = group
|
||||||
|
.into_iter()
|
||||||
|
.map(|(_, delta)| (delta.token.clone(), delta))
|
||||||
|
.collect();
|
||||||
|
(txh, balances)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::mock_store::MockStore;
|
||||||
|
use crate::pb::tycho::evm::v1::{BalanceDelta, Transaction};
|
||||||
|
use substreams::pb::substreams::StoreDelta;
|
||||||
|
use substreams::prelude::{StoreGet, StoreNew};
|
||||||
|
|
||||||
|
fn block_balance_deltas() -> BlockBalanceDeltas {
|
||||||
|
let comp_id = "0x42c0ffee"
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec();
|
||||||
|
let token_0 = hex::decode("bad999").unwrap();
|
||||||
|
let token_1 = hex::decode("babe00").unwrap();
|
||||||
|
BlockBalanceDeltas {
|
||||||
|
balance_deltas: vec![
|
||||||
|
BalanceDelta {
|
||||||
|
ord: 0,
|
||||||
|
tx: Some(Transaction {
|
||||||
|
hash: vec![0, 1],
|
||||||
|
from: vec![9, 9],
|
||||||
|
to: vec![8, 8],
|
||||||
|
index: 0,
|
||||||
|
}),
|
||||||
|
token: token_0.clone(),
|
||||||
|
delta: BigInt::from_str("+1000")
|
||||||
|
.unwrap()
|
||||||
|
.to_signed_bytes_be(),
|
||||||
|
component_id: comp_id.clone(),
|
||||||
|
},
|
||||||
|
BalanceDelta {
|
||||||
|
ord: 2,
|
||||||
|
tx: Some(Transaction {
|
||||||
|
hash: vec![0, 1],
|
||||||
|
from: vec![9, 9],
|
||||||
|
to: vec![8, 8],
|
||||||
|
index: 0,
|
||||||
|
}),
|
||||||
|
token: token_1.clone(),
|
||||||
|
delta: BigInt::from_str("+100")
|
||||||
|
.unwrap()
|
||||||
|
.to_signed_bytes_be(),
|
||||||
|
component_id: comp_id.clone(),
|
||||||
|
},
|
||||||
|
BalanceDelta {
|
||||||
|
ord: 3,
|
||||||
|
tx: Some(Transaction {
|
||||||
|
hash: vec![0, 1],
|
||||||
|
from: vec![9, 9],
|
||||||
|
to: vec![8, 8],
|
||||||
|
index: 0,
|
||||||
|
}),
|
||||||
|
token: token_1.clone(),
|
||||||
|
delta: BigInt::from_str("50")
|
||||||
|
.unwrap()
|
||||||
|
.to_signed_bytes_be(),
|
||||||
|
component_id: comp_id.clone(),
|
||||||
|
},
|
||||||
|
BalanceDelta {
|
||||||
|
ord: 10,
|
||||||
|
tx: Some(Transaction {
|
||||||
|
hash: vec![0, 1],
|
||||||
|
from: vec![9, 9],
|
||||||
|
to: vec![8, 8],
|
||||||
|
index: 0,
|
||||||
|
}),
|
||||||
|
token: token_0.clone(),
|
||||||
|
delta: BigInt::from_str("-1")
|
||||||
|
.unwrap()
|
||||||
|
.to_signed_bytes_be(),
|
||||||
|
component_id: comp_id.clone(),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn store_deltas() -> StoreDeltas {
|
||||||
|
let comp_id = "0x42c0ffee"
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec();
|
||||||
|
let token_0 = hex::decode("bad999").unwrap();
|
||||||
|
let token_1 = hex::decode("babe00").unwrap();
|
||||||
|
|
||||||
|
let t0_key =
|
||||||
|
format!("{}:{}", String::from_utf8(comp_id.clone()).unwrap(), hex::encode(&token_0));
|
||||||
|
let t1_key =
|
||||||
|
format!("{}:{}", String::from_utf8(comp_id.clone()).unwrap(), hex::encode(&token_1));
|
||||||
|
StoreDeltas {
|
||||||
|
deltas: vec![
|
||||||
|
StoreDelta {
|
||||||
|
operation: 0,
|
||||||
|
ordinal: 0,
|
||||||
|
key: t0_key.clone(),
|
||||||
|
old_value: BigInt::from(0)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
new_value: BigInt::from(1000)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
},
|
||||||
|
StoreDelta {
|
||||||
|
operation: 0,
|
||||||
|
ordinal: 2,
|
||||||
|
key: t1_key.clone(),
|
||||||
|
old_value: BigInt::from(0)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
new_value: BigInt::from(100)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
},
|
||||||
|
StoreDelta {
|
||||||
|
operation: 0,
|
||||||
|
ordinal: 3,
|
||||||
|
key: t1_key.clone(),
|
||||||
|
old_value: BigInt::from(100)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
new_value: BigInt::from(150)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
},
|
||||||
|
StoreDelta {
|
||||||
|
operation: 0,
|
||||||
|
ordinal: 10,
|
||||||
|
key: t0_key.clone(),
|
||||||
|
old_value: BigInt::from(1000)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
new_value: BigInt::from(999)
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_store_balances() {
|
||||||
|
let comp_id = "0x42c0ffee"
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec();
|
||||||
|
let token_0 = hex::decode("bad999").unwrap();
|
||||||
|
let token_1 = hex::decode("babe00").unwrap();
|
||||||
|
let deltas = block_balance_deltas();
|
||||||
|
let store = <MockStore as StoreNew>::new();
|
||||||
|
|
||||||
|
store_balance_changes(deltas, store.clone());
|
||||||
|
let res_0 = store.get_last(format!(
|
||||||
|
"{}:{}",
|
||||||
|
String::from_utf8(comp_id.clone()).unwrap(),
|
||||||
|
hex::encode(&token_0)
|
||||||
|
));
|
||||||
|
let res_1 = store.get_last(format!(
|
||||||
|
"{}:{}",
|
||||||
|
String::from_utf8(comp_id.clone()).unwrap(),
|
||||||
|
hex::encode(&token_1)
|
||||||
|
));
|
||||||
|
|
||||||
|
assert_eq!(res_0, Some(BigInt::from_str("+999").unwrap()));
|
||||||
|
assert_eq!(res_1, Some(BigInt::from_str("+150").unwrap()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_aggregate_balances_changes() {
|
||||||
|
let store_deltas = store_deltas();
|
||||||
|
let balance_deltas = block_balance_deltas();
|
||||||
|
let comp_id = "0x42c0ffee"
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec();
|
||||||
|
let token_0 = hex::decode("bad999").unwrap();
|
||||||
|
let token_1 = hex::decode("babe00").unwrap();
|
||||||
|
let exp = [(
|
||||||
|
vec![0, 1],
|
||||||
|
[
|
||||||
|
(
|
||||||
|
token_0.clone(),
|
||||||
|
BalanceChange {
|
||||||
|
token: token_0,
|
||||||
|
balance: BigInt::from(999)
|
||||||
|
.to_signed_bytes_be()
|
||||||
|
.to_vec(),
|
||||||
|
component_id: comp_id.clone(),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
(
|
||||||
|
token_1.clone(),
|
||||||
|
BalanceChange {
|
||||||
|
token: token_1,
|
||||||
|
balance: vec![150],
|
||||||
|
component_id: comp_id.clone(),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
.collect(),
|
||||||
|
)]
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let res = aggregate_balances_changes(store_deltas, balance_deltas);
|
||||||
|
dbg!(&res);
|
||||||
|
|
||||||
|
assert_eq!(res, exp);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,7 +22,7 @@ impl SlotValue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Uses a map for slots, protobuf does not allow bytes in hashmap keys
|
// Uses a map for slots, protobuf does not allow bytes in hashmap keys
|
||||||
pub struct InterimContractChange {
|
struct InterimContractChange {
|
||||||
address: Vec<u8>,
|
address: Vec<u8>,
|
||||||
balance: Vec<u8>,
|
balance: Vec<u8>,
|
||||||
code: Vec<u8>,
|
code: Vec<u8>,
|
||||||
@@ -40,17 +40,14 @@ impl From<InterimContractChange> for tycho::ContractChange {
|
|||||||
.slots
|
.slots
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|(_, value)| value.has_changed())
|
.filter(|(_, value)| value.has_changed())
|
||||||
.map(|(slot, value)| tycho::ContractSlot {
|
.map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value })
|
||||||
slot,
|
|
||||||
value: value.new_value,
|
|
||||||
})
|
|
||||||
.collect(),
|
.collect(),
|
||||||
change: value.change.into(),
|
change: value.change.into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn extract_contract_changes<F: Fn(&[u8]) -> bool >(
|
pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
|
||||||
block: ð::v2::Block,
|
block: ð::v2::Block,
|
||||||
inclusion_predicate: F,
|
inclusion_predicate: F,
|
||||||
transaction_contract_changes: &mut HashMap<u64, tycho::TransactionContractChanges>,
|
transaction_contract_changes: &mut HashMap<u64, tycho::TransactionContractChanges>,
|
||||||
@@ -69,131 +66,131 @@ pub fn extract_contract_changes<F: Fn(&[u8]) -> bool >(
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
block.transactions().for_each(|block_tx| {
|
block
|
||||||
let mut storage_changes = Vec::new();
|
.transactions()
|
||||||
let mut balance_changes = Vec::new();
|
.for_each(|block_tx| {
|
||||||
let mut code_changes = Vec::new();
|
let mut storage_changes = Vec::new();
|
||||||
|
let mut balance_changes = Vec::new();
|
||||||
|
let mut code_changes = Vec::new();
|
||||||
|
|
||||||
block_tx
|
block_tx
|
||||||
.calls
|
.calls
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|call| {
|
.filter(|call| !call.state_reverted && inclusion_predicate(&call.address))
|
||||||
!call.state_reverted
|
.for_each(|call| {
|
||||||
&& inclusion_predicate(&call.address)
|
storage_changes.extend(call.storage_changes.iter());
|
||||||
})
|
balance_changes.extend(call.balance_changes.iter());
|
||||||
.for_each(|call| {
|
code_changes.extend(call.code_changes.iter());
|
||||||
storage_changes.extend(call.storage_changes.iter());
|
});
|
||||||
balance_changes.extend(call.balance_changes.iter());
|
|
||||||
code_changes.extend(call.code_changes.iter());
|
|
||||||
});
|
|
||||||
|
|
||||||
storage_changes.sort_unstable_by_key(|change| change.ordinal);
|
storage_changes.sort_unstable_by_key(|change| change.ordinal);
|
||||||
balance_changes.sort_unstable_by_key(|change| change.ordinal);
|
balance_changes.sort_unstable_by_key(|change| change.ordinal);
|
||||||
code_changes.sort_unstable_by_key(|change| change.ordinal);
|
code_changes.sort_unstable_by_key(|change| change.ordinal);
|
||||||
|
|
||||||
storage_changes
|
storage_changes
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|changes| {
|
.filter(|changes| inclusion_predicate(&changes.address))
|
||||||
inclusion_predicate(&changes.address)
|
.for_each(|storage_change| {
|
||||||
})
|
let contract_change = changed_contracts
|
||||||
.for_each(|storage_change| {
|
.entry(storage_change.address.clone())
|
||||||
let contract_change = changed_contracts
|
.or_insert_with(|| InterimContractChange {
|
||||||
.entry(storage_change.address.clone())
|
address: storage_change.address.clone(),
|
||||||
.or_insert_with(|| InterimContractChange {
|
balance: Vec::new(),
|
||||||
address: storage_change.address.clone(),
|
code: Vec::new(),
|
||||||
balance: Vec::new(),
|
slots: HashMap::new(),
|
||||||
code: Vec::new(),
|
change: if created_accounts.contains_key(&storage_change.address) {
|
||||||
slots: HashMap::new(),
|
tycho::ChangeType::Creation
|
||||||
change: if created_accounts.contains_key(&storage_change.address) {
|
} else {
|
||||||
tycho::ChangeType::Creation
|
tycho::ChangeType::Update
|
||||||
} else {
|
},
|
||||||
tycho::ChangeType::Update
|
});
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
let slot_value = contract_change
|
let slot_value = contract_change
|
||||||
.slots
|
.slots
|
||||||
.entry(storage_change.key.clone())
|
.entry(storage_change.key.clone())
|
||||||
.or_insert_with(|| SlotValue {
|
.or_insert_with(|| SlotValue {
|
||||||
new_value: storage_change.new_value.clone(),
|
new_value: storage_change.new_value.clone(),
|
||||||
start_value: storage_change.old_value.clone(),
|
start_value: storage_change.old_value.clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
slot_value
|
slot_value
|
||||||
.new_value
|
.new_value
|
||||||
.copy_from_slice(&storage_change.new_value);
|
.copy_from_slice(&storage_change.new_value);
|
||||||
});
|
});
|
||||||
|
|
||||||
balance_changes
|
balance_changes
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|changes| {
|
.filter(|changes| inclusion_predicate(&changes.address))
|
||||||
inclusion_predicate(&changes.address)
|
.for_each(|balance_change| {
|
||||||
})
|
let contract_change = changed_contracts
|
||||||
.for_each(|balance_change| {
|
.entry(balance_change.address.clone())
|
||||||
let contract_change = changed_contracts
|
.or_insert_with(|| InterimContractChange {
|
||||||
.entry(balance_change.address.clone())
|
address: balance_change.address.clone(),
|
||||||
.or_insert_with(|| InterimContractChange {
|
balance: Vec::new(),
|
||||||
address: balance_change.address.clone(),
|
code: Vec::new(),
|
||||||
balance: Vec::new(),
|
slots: HashMap::new(),
|
||||||
code: Vec::new(),
|
change: if created_accounts.contains_key(&balance_change.address) {
|
||||||
slots: HashMap::new(),
|
tycho::ChangeType::Creation
|
||||||
change: if created_accounts.contains_key(&balance_change.address) {
|
} else {
|
||||||
tycho::ChangeType::Creation
|
tycho::ChangeType::Update
|
||||||
} else {
|
},
|
||||||
tycho::ChangeType::Update
|
});
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(new_balance) = &balance_change.new_value {
|
if let Some(new_balance) = &balance_change.new_value {
|
||||||
contract_change.balance.clear();
|
contract_change.balance.clear();
|
||||||
|
contract_change
|
||||||
|
.balance
|
||||||
|
.extend_from_slice(&new_balance.bytes);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
code_changes
|
||||||
|
.iter()
|
||||||
|
.filter(|changes| inclusion_predicate(&changes.address))
|
||||||
|
.for_each(|code_change| {
|
||||||
|
let contract_change = changed_contracts
|
||||||
|
.entry(code_change.address.clone())
|
||||||
|
.or_insert_with(|| InterimContractChange {
|
||||||
|
address: code_change.address.clone(),
|
||||||
|
balance: Vec::new(),
|
||||||
|
code: Vec::new(),
|
||||||
|
slots: HashMap::new(),
|
||||||
|
change: if created_accounts.contains_key(&code_change.address) {
|
||||||
|
tycho::ChangeType::Creation
|
||||||
|
} else {
|
||||||
|
tycho::ChangeType::Update
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
contract_change.code.clear();
|
||||||
contract_change
|
contract_change
|
||||||
.balance
|
.code
|
||||||
.extend_from_slice(&new_balance.bytes);
|
.extend_from_slice(&code_change.new_code);
|
||||||
}
|
});
|
||||||
});
|
|
||||||
|
|
||||||
code_changes
|
if !storage_changes.is_empty()
|
||||||
.iter()
|
|| !balance_changes.is_empty()
|
||||||
.filter(|changes| {
|
|| !code_changes.is_empty()
|
||||||
inclusion_predicate(&changes.address)
|
{
|
||||||
})
|
transaction_contract_changes
|
||||||
.for_each(|code_change| {
|
.entry(block_tx.index.into())
|
||||||
let contract_change = changed_contracts
|
.or_insert_with(|| tycho::TransactionContractChanges {
|
||||||
.entry(code_change.address.clone())
|
tx: Some(tycho::Transaction {
|
||||||
.or_insert_with(|| InterimContractChange {
|
hash: block_tx.hash.clone(),
|
||||||
address: code_change.address.clone(),
|
from: block_tx.from.clone(),
|
||||||
balance: Vec::new(),
|
to: block_tx.to.clone(),
|
||||||
code: Vec::new(),
|
index: block_tx.index as u64,
|
||||||
slots: HashMap::new(),
|
}),
|
||||||
change: if created_accounts.contains_key(&code_change.address) {
|
contract_changes: vec![],
|
||||||
tycho::ChangeType::Creation
|
component_changes: vec![],
|
||||||
} else {
|
balance_changes: vec![],
|
||||||
tycho::ChangeType::Update
|
})
|
||||||
},
|
.contract_changes
|
||||||
});
|
.extend(
|
||||||
|
changed_contracts
|
||||||
contract_change.code.clear();
|
.drain()
|
||||||
contract_change
|
.map(|(_, change)| change.into()),
|
||||||
.code
|
);
|
||||||
.extend_from_slice(&code_change.new_code);
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() {
|
|
||||||
transaction_contract_changes
|
|
||||||
.entry(block_tx.index.into())
|
|
||||||
.or_insert_with(|| tycho::TransactionContractChanges {
|
|
||||||
tx: Some(tycho::Transaction {
|
|
||||||
hash: block_tx.hash.clone(),
|
|
||||||
from: block_tx.from.clone(),
|
|
||||||
to: block_tx.to.clone(),
|
|
||||||
index: block_tx.index as u64,
|
|
||||||
}),
|
|
||||||
contract_changes: vec![],
|
|
||||||
component_changes: vec![],
|
|
||||||
balance_changes: vec![],
|
|
||||||
})
|
|
||||||
.contract_changes
|
|
||||||
.extend(changed_contracts.drain().map(|(_, change)| change.into()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,2 +1,6 @@
|
|||||||
|
pub mod balances;
|
||||||
pub mod contract;
|
pub mod contract;
|
||||||
pub mod pb;
|
mod mock_store;
|
||||||
|
pub mod pb;
|
||||||
|
// TODO: consider removing this module, after integrating with balancer
|
||||||
|
pub mod utils;
|
||||||
|
|||||||
93
crates/tycho-substreams/src/mock_store.rs
Normal file
93
crates/tycho-substreams/src/mock_store.rs
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
//! Contains a mock store for internal testing.
|
||||||
|
//!
|
||||||
|
//! Might make this public alter to users can test their store handlers.
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use substreams::prelude::{BigInt, StoreDelete, StoreGet, StoreNew};
|
||||||
|
use substreams::store::StoreAdd;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct MockStore {
|
||||||
|
data: Rc<RefCell<HashMap<String, Vec<(u64, BigInt)>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreDelete for MockStore {
|
||||||
|
fn delete_prefix(&self, _ord: i64, prefix: &String) {
|
||||||
|
self.data
|
||||||
|
.borrow_mut()
|
||||||
|
.retain(|k, _| !k.starts_with(prefix));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreNew for MockStore {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self { data: Rc::new(RefCell::new(HashMap::new())) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreAdd<BigInt> for MockStore {
|
||||||
|
fn add<K: AsRef<str>>(&self, ord: u64, key: K, value: BigInt) {
|
||||||
|
let mut guard = self.data.borrow_mut();
|
||||||
|
guard
|
||||||
|
.entry(key.as_ref().to_string())
|
||||||
|
.and_modify(|v| {
|
||||||
|
let prev_value = v.last().unwrap().1.clone();
|
||||||
|
v.push((ord, prev_value + value.clone()));
|
||||||
|
})
|
||||||
|
.or_insert(vec![(ord, value)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_many<K: AsRef<str>>(&self, _ord: u64, _keys: &Vec<K>, _value: BigInt) {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreGet<BigInt> for MockStore {
|
||||||
|
fn new(_idx: u32) -> Self {
|
||||||
|
Self { data: Rc::new(RefCell::new(HashMap::new())) }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_at<K: AsRef<str>>(&self, ord: u64, key: K) -> Option<BigInt> {
|
||||||
|
self.data
|
||||||
|
.borrow()
|
||||||
|
.get(&key.as_ref().to_string())
|
||||||
|
.map(|v| {
|
||||||
|
v.iter()
|
||||||
|
.find(|(current_ord, _)| *current_ord == ord)
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.clone()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_last<K: AsRef<str>>(&self, key: K) -> Option<BigInt> {
|
||||||
|
self.data
|
||||||
|
.borrow()
|
||||||
|
.get(&key.as_ref().to_string())
|
||||||
|
.map(|v| v.last().unwrap().1.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_first<K: AsRef<str>>(&self, key: K) -> Option<BigInt> {
|
||||||
|
self.data
|
||||||
|
.borrow()
|
||||||
|
.get(&key.as_ref().to_string())
|
||||||
|
.map(|v| v.first().unwrap().1.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_at<K: AsRef<str>>(&self, ord: u64, key: K) -> bool {
|
||||||
|
self.data
|
||||||
|
.borrow()
|
||||||
|
.get(&key.as_ref().to_string())
|
||||||
|
.map(|v| v.iter().any(|(v, _)| *v == ord))
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_last<K: AsRef<str>>(&self, _key: K) -> bool {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_first<K: AsRef<str>>(&self, _key: K) -> bool {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -107,7 +107,7 @@ pub struct BalanceChange {
|
|||||||
/// The address of the ERC20 token whose balance changed.
|
/// The address of the ERC20 token whose balance changed.
|
||||||
#[prost(bytes="vec", tag="1")]
|
#[prost(bytes="vec", tag="1")]
|
||||||
pub token: ::prost::alloc::vec::Vec<u8>,
|
pub token: ::prost::alloc::vec::Vec<u8>,
|
||||||
/// The new balance of the token.
|
/// The new balance of the token. Note: it must be a big endian encoded int.
|
||||||
#[prost(bytes="vec", tag="2")]
|
#[prost(bytes="vec", tag="2")]
|
||||||
pub balance: ::prost::alloc::vec::Vec<u8>,
|
pub balance: ::prost::alloc::vec::Vec<u8>,
|
||||||
/// The id of the component whose TVL is tracked. Note: This MUST be utf8 encoded.
|
/// The id of the component whose TVL is tracked. Note: This MUST be utf8 encoded.
|
||||||
@@ -244,6 +244,55 @@ pub struct BlockEntityChanges {
|
|||||||
#[prost(message, repeated, tag="2")]
|
#[prost(message, repeated, tag="2")]
|
||||||
pub changes: ::prost::alloc::vec::Vec<TransactionEntityChanges>,
|
pub changes: ::prost::alloc::vec::Vec<TransactionEntityChanges>,
|
||||||
}
|
}
|
||||||
|
/// A message containing relative balance changes.
|
||||||
|
///
|
||||||
|
/// Used to track token balances of protocol components in case they are only
|
||||||
|
/// available as relative values within a block.
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct BalanceDelta {
|
||||||
|
/// The ordinal of the balance change. Must be unique & deterministic over all balances
|
||||||
|
/// changes within a block.
|
||||||
|
#[prost(uint64, tag="1")]
|
||||||
|
pub ord: u64,
|
||||||
|
/// The tx hash of the transaction that caused the balance change.
|
||||||
|
#[prost(message, optional, tag="2")]
|
||||||
|
pub tx: ::core::option::Option<Transaction>,
|
||||||
|
/// The address of the ERC20 token whose balance changed.
|
||||||
|
#[prost(bytes="vec", tag="3")]
|
||||||
|
pub token: ::prost::alloc::vec::Vec<u8>,
|
||||||
|
/// The delta balance of the token.
|
||||||
|
#[prost(bytes="vec", tag="4")]
|
||||||
|
pub delta: ::prost::alloc::vec::Vec<u8>,
|
||||||
|
/// The id of the component whose TVL is tracked.
|
||||||
|
/// If the protocol component includes multiple contracts, the balance change must be
|
||||||
|
/// aggregated to reflect how much tokens can be traded.
|
||||||
|
#[prost(bytes="vec", tag="5")]
|
||||||
|
pub component_id: ::prost::alloc::vec::Vec<u8>,
|
||||||
|
}
|
||||||
|
/// A set of balances deltas, usually a group of changes within a single block.
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct BlockBalanceDeltas {
|
||||||
|
#[prost(message, repeated, tag="1")]
|
||||||
|
pub balance_deltas: ::prost::alloc::vec::Vec<BalanceDelta>,
|
||||||
|
}
|
||||||
|
/// A message containing protocol components that were created by a single tx.
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct TransactionProtocolComponents {
|
||||||
|
#[prost(message, optional, tag="1")]
|
||||||
|
pub tx: ::core::option::Option<Transaction>,
|
||||||
|
#[prost(message, repeated, tag="2")]
|
||||||
|
pub components: ::prost::alloc::vec::Vec<ProtocolComponent>,
|
||||||
|
}
|
||||||
|
/// All protocol components that were created within a block with their corresponding tx.
|
||||||
|
#[allow(clippy::derive_partial_eq_without_eq)]
|
||||||
|
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||||
|
pub struct BlockTransactionProtocolComponents {
|
||||||
|
#[prost(message, repeated, tag="1")]
|
||||||
|
pub tx_components: ::prost::alloc::vec::Vec<TransactionProtocolComponents>,
|
||||||
|
}
|
||||||
// This file contains proto definitions specific to the VM integration.
|
// This file contains proto definitions specific to the VM integration.
|
||||||
|
|
||||||
/// A key value entry into contract storage.
|
/// A key value entry into contract storage.
|
||||||
|
|||||||
24
crates/tycho-substreams/src/utils.rs
Normal file
24
crates/tycho-substreams/src/utils.rs
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
use crate::pb::tycho::evm::v1::Transaction;
|
||||||
|
|
||||||
|
/// This struct purely exists to spoof the `PartialEq` trait for `Transaction` so we can use it in
|
||||||
|
/// a later groupby operation.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TransactionWrapper(Transaction);
|
||||||
|
|
||||||
|
impl TransactionWrapper {
|
||||||
|
pub fn new(tx: Transaction) -> Self {
|
||||||
|
Self(tx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for TransactionWrapper {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.0.hash == other.0.hash
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<TransactionWrapper> for Transaction {
|
||||||
|
fn from(value: TransactionWrapper) -> Self {
|
||||||
|
value.0
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user