feat: Update protobuf messages for Balancer, add pool_id and balance_owner as entity changes
This commit is contained in:
committed by
Thales Lima
parent
7db72c284c
commit
c48532a5c4
@@ -83,9 +83,8 @@ impl From<InterimContractChange> for tycho::ContractChange {
|
|||||||
/// model.
|
/// model.
|
||||||
/// * `inclusion_predicate` - A closure that determines if a contract's address is of interest for
|
/// * `inclusion_predicate` - A closure that determines if a contract's address is of interest for
|
||||||
/// the collection of changes. Only contracts satisfying this predicate are included.
|
/// the collection of changes. Only contracts satisfying this predicate are included.
|
||||||
/// * `transaction_contract_changes` - A mutable reference to a map where extracted contract changes
|
/// * `transaction_changes` - A mutable reference to a map where extracted contract changes are
|
||||||
/// are stored. Keyed by transaction index, it aggregates changes into
|
/// stored. Keyed by transaction index, it aggregates changes into `tycho::TransactionChanges`.
|
||||||
/// `tycho::TransactionContractChanges`.
|
|
||||||
///
|
///
|
||||||
/// ## Panics
|
/// ## Panics
|
||||||
/// Panics if the provided block is not an extended block model, as indicated by its detail level.
|
/// Panics if the provided block is not an extended block model, as indicated by its detail level.
|
||||||
@@ -94,7 +93,7 @@ impl From<InterimContractChange> for tycho::ContractChange {
|
|||||||
/// The function iterates over transactions and their calls within the block, collecting contract
|
/// The function iterates over transactions and their calls within the block, collecting contract
|
||||||
/// changes (storage, balance, code) that pass the inclusion predicate. Changes are then sorted by
|
/// changes (storage, balance, code) that pass the inclusion predicate. Changes are then sorted by
|
||||||
/// their ordinals to maintain the correct sequence of events. Aggregated changes for each contract
|
/// their ordinals to maintain the correct sequence of events. Aggregated changes for each contract
|
||||||
/// are stored in `transaction_contract_changes`, categorized by transaction index.
|
/// are stored in `transaction_changes`, categorized by transaction index.
|
||||||
///
|
///
|
||||||
/// Contracts created within the block are tracked to differentiate between new and existing
|
/// Contracts created within the block are tracked to differentiate between new and existing
|
||||||
/// contracts. The aggregation process respects transaction boundaries, ensuring that changes are
|
/// contracts. The aggregation process respects transaction boundaries, ensuring that changes are
|
||||||
@@ -102,7 +101,7 @@ impl From<InterimContractChange> for tycho::ContractChange {
|
|||||||
pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
|
pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
|
||||||
block: ð::v2::Block,
|
block: ð::v2::Block,
|
||||||
inclusion_predicate: F,
|
inclusion_predicate: F,
|
||||||
transaction_contract_changes: &mut HashMap<u64, tycho::TransactionContractChanges>,
|
transaction_changes: &mut HashMap<u64, tycho::TransactionChanges>,
|
||||||
) {
|
) {
|
||||||
if block.detail_level != Into::<i32>::into(DetailLevel::DetaillevelExtended) {
|
if block.detail_level != Into::<i32>::into(DetailLevel::DetaillevelExtended) {
|
||||||
panic!("Only extended blocks are supported");
|
panic!("Only extended blocks are supported");
|
||||||
@@ -209,9 +208,9 @@ pub fn extract_contract_changes<F: Fn(&[u8]) -> bool>(
|
|||||||
!balance_changes.is_empty() ||
|
!balance_changes.is_empty() ||
|
||||||
!code_changes.is_empty()
|
!code_changes.is_empty()
|
||||||
{
|
{
|
||||||
transaction_contract_changes
|
transaction_changes
|
||||||
.entry(block_tx.index.into())
|
.entry(block_tx.index.into())
|
||||||
.or_insert_with(|| tycho::TransactionContractChanges::new(&(block_tx.into())))
|
.or_insert_with(|| tycho::TransactionChanges::new(&(block_tx.into())))
|
||||||
.contract_changes
|
.contract_changes
|
||||||
.extend(
|
.extend(
|
||||||
changed_contracts
|
changed_contracts
|
||||||
|
|||||||
@@ -6,12 +6,14 @@ pub use crate::pb::tycho::evm::v1::*;
|
|||||||
impl TransactionContractChanges {
|
impl TransactionContractChanges {
|
||||||
/// Creates a new empty `TransactionContractChanges` instance.
|
/// Creates a new empty `TransactionContractChanges` instance.
|
||||||
pub fn new(tx: &Transaction) -> Self {
|
pub fn new(tx: &Transaction) -> Self {
|
||||||
Self {
|
Self { tx: Some(tx.clone()), ..Default::default() }
|
||||||
tx: Some(tx.clone()),
|
}
|
||||||
contract_changes: vec![],
|
}
|
||||||
component_changes: vec![],
|
|
||||||
balance_changes: vec![],
|
impl TransactionChanges {
|
||||||
}
|
/// Creates a new empty `TransactionChanges` instance.
|
||||||
|
pub fn new(tx: &Transaction) -> Self {
|
||||||
|
Self { tx: Some(tx.clone()), ..Default::default() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ pub fn map_components(block: eth::v2::Block) -> Result<BlockTransactionProtocolC
|
|||||||
.filter_map(|tx| {
|
.filter_map(|tx| {
|
||||||
let components = tx
|
let components = tx
|
||||||
.logs_with_calls()
|
.logs_with_calls()
|
||||||
.filter(|(_, call)| !call.call.state_reverted)
|
|
||||||
.filter_map(|(log, call)| {
|
.filter_map(|(log, call)| {
|
||||||
pool_factories::address_map(
|
pool_factories::address_map(
|
||||||
call.call.address.as_slice(),
|
call.call.address.as_slice(),
|
||||||
@@ -132,11 +131,11 @@ pub fn store_balances(deltas: BlockBalanceDeltas, store: StoreAddBigInt) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// This is the main map that handles most of the indexing of this substream.
|
/// This is the main map that handles most of the indexing of this substream.
|
||||||
/// Every contract change is grouped by transaction index via the `transaction_contract_changes`
|
/// Every contract change is grouped by transaction index via the `transaction_changes`
|
||||||
/// map. Each block of code will extend the `TransactionContractChanges` struct with the
|
/// map. Each block of code will extend the `TransactionChanges` struct with the
|
||||||
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
|
/// cooresponding changes (balance, component, contract), inserting a new one if it doesn't exist.
|
||||||
/// At the very end, the map can easily be sorted by index to ensure the final
|
/// At the very end, the map can easily be sorted by index to ensure the final
|
||||||
/// `BlockContractChanges` is ordered by transactions properly.
|
/// `BlockChanges` is ordered by transactions properly.
|
||||||
#[substreams::handlers::map]
|
#[substreams::handlers::map]
|
||||||
pub fn map_protocol_changes(
|
pub fn map_protocol_changes(
|
||||||
block: eth::v2::Block,
|
block: eth::v2::Block,
|
||||||
@@ -144,25 +143,68 @@ pub fn map_protocol_changes(
|
|||||||
deltas: BlockBalanceDeltas,
|
deltas: BlockBalanceDeltas,
|
||||||
components_store: StoreGetInt64,
|
components_store: StoreGetInt64,
|
||||||
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
|
balance_store: StoreDeltas, // Note, this map module is using the `deltas` mode for the store.
|
||||||
) -> Result<BlockContractChanges> {
|
) -> Result<BlockChanges> {
|
||||||
// We merge contract changes by transaction (identified by transaction index) making it easy to
|
// We merge contract changes by transaction (identified by transaction index) making it easy to
|
||||||
// sort them at the very end.
|
// sort them at the very end.
|
||||||
let mut transaction_contract_changes: HashMap<_, TransactionContractChanges> = HashMap::new();
|
let mut transaction_changes: HashMap<_, TransactionChanges> = HashMap::new();
|
||||||
|
|
||||||
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
|
// `ProtocolComponents` are gathered from `map_pools_created` which just need a bit of work to
|
||||||
// convert into `TransactionContractChanges`
|
// convert into `TransactionChanges`
|
||||||
grouped_components
|
grouped_components
|
||||||
.tx_components
|
.tx_components
|
||||||
.iter()
|
.iter()
|
||||||
.for_each(|tx_component| {
|
.for_each(|tx_component| {
|
||||||
let tx = tx_component.tx.as_ref().unwrap();
|
let tx = tx_component.tx.as_ref().unwrap();
|
||||||
transaction_contract_changes
|
transaction_changes
|
||||||
.entry(tx.index)
|
.entry(tx.index)
|
||||||
.or_insert_with(|| TransactionContractChanges::new(tx))
|
.or_insert_with(|| TransactionChanges::new(tx))
|
||||||
.component_changes
|
.component_changes
|
||||||
.extend_from_slice(&tx_component.components);
|
.extend_from_slice(&tx_component.components);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
block
|
||||||
|
.transactions()
|
||||||
|
.flat_map(|tx| {
|
||||||
|
let components = tx
|
||||||
|
.logs_with_calls()
|
||||||
|
.filter(|(log, _)| log.address == VAULT_ADDRESS)
|
||||||
|
.filter_map(|(log, _)| {
|
||||||
|
let registered = abi::vault::events::PoolRegistered::match_and_decode(log)?;
|
||||||
|
substreams::log::info!("{:?}", log);
|
||||||
|
Some((
|
||||||
|
tx.clone(),
|
||||||
|
EntityChanges {
|
||||||
|
component_id: hex::encode(registered.pool_address),
|
||||||
|
attributes: vec![
|
||||||
|
Attribute {
|
||||||
|
name: "pool_id".to_string(),
|
||||||
|
value: format!("0x{}", hex::encode(registered.pool_id))
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
change: ChangeType::Update.into(),
|
||||||
|
},
|
||||||
|
Attribute {
|
||||||
|
name: "balance_owner".to_string(),
|
||||||
|
value: "0xBA12222222228d8Ba445958a75a0704d566BF2C8"
|
||||||
|
.to_string()
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
change: ChangeType::Creation.into(),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
))
|
||||||
|
});
|
||||||
|
components
|
||||||
|
})
|
||||||
|
.for_each(|(tx, state_change)| {
|
||||||
|
transaction_changes
|
||||||
|
.entry(tx.index.into())
|
||||||
|
.or_insert_with(|| TransactionChanges::new(&(&tx).into()))
|
||||||
|
.entity_changes
|
||||||
|
.push(state_change);
|
||||||
|
});
|
||||||
|
|
||||||
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
// Balance changes are gathered by the `StoreDelta` based on `PoolBalanceChanged` creating
|
||||||
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
|
// `BlockBalanceDeltas`. We essentially just process the changes that occurred to the `store`
|
||||||
// this block. Then, these balance changes are merged onto the existing map of tx contract
|
// this block. Then, these balance changes are merged onto the existing map of tx contract
|
||||||
@@ -170,9 +212,9 @@ pub fn map_protocol_changes(
|
|||||||
aggregate_balances_changes(balance_store, deltas)
|
aggregate_balances_changes(balance_store, deltas)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.for_each(|(_, (tx, balances))| {
|
.for_each(|(_, (tx, balances))| {
|
||||||
transaction_contract_changes
|
transaction_changes
|
||||||
.entry(tx.index)
|
.entry(tx.index)
|
||||||
.or_insert_with(|| TransactionContractChanges::new(&tx))
|
.or_insert_with(|| TransactionChanges::new(&tx))
|
||||||
.balance_changes
|
.balance_changes
|
||||||
.extend(balances.into_values());
|
.extend(balances.into_values());
|
||||||
});
|
});
|
||||||
@@ -185,20 +227,21 @@ pub fn map_protocol_changes(
|
|||||||
.get_last(format!("pool:0x{0}", hex::encode(addr)))
|
.get_last(format!("pool:0x{0}", hex::encode(addr)))
|
||||||
.is_some()
|
.is_some()
|
||||||
},
|
},
|
||||||
&mut transaction_contract_changes,
|
&mut transaction_changes,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Process all `transaction_contract_changes` for final output in the `BlockContractChanges`,
|
// Process all `transaction_changes` for final output in the `BlockChanges`,
|
||||||
// sorted by transaction index (the key).
|
// sorted by transaction index (the key).
|
||||||
Ok(BlockContractChanges {
|
Ok(BlockChanges {
|
||||||
block: Some((&block).into()),
|
block: Some((&block).into()),
|
||||||
changes: transaction_contract_changes
|
changes: transaction_changes
|
||||||
.drain()
|
.drain()
|
||||||
.sorted_unstable_by_key(|(index, _)| *index)
|
.sorted_unstable_by_key(|(index, _)| *index)
|
||||||
.filter_map(|(_, change)| {
|
.filter_map(|(_, change)| {
|
||||||
if change.contract_changes.is_empty() &&
|
if change.contract_changes.is_empty() &&
|
||||||
change.component_changes.is_empty() &&
|
change.component_changes.is_empty() &&
|
||||||
change.balance_changes.is_empty()
|
change.balance_changes.is_empty() &&
|
||||||
|
change.entity_changes.is_empty()
|
||||||
{
|
{
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ modules:
|
|||||||
inputs:
|
inputs:
|
||||||
- source: sf.ethereum.type.v2.Block
|
- source: sf.ethereum.type.v2.Block
|
||||||
output:
|
output:
|
||||||
type: proto:tycho.evm.v1.GroupedTransactionProtocolComponents
|
type: proto:tycho.evm.v1.BlockTransactionProtocolComponents
|
||||||
|
|
||||||
- name: store_components
|
- name: store_components
|
||||||
kind: store
|
kind: store
|
||||||
@@ -40,7 +40,7 @@ modules:
|
|||||||
- source: sf.ethereum.type.v2.Block
|
- source: sf.ethereum.type.v2.Block
|
||||||
- store: store_components
|
- store: store_components
|
||||||
output:
|
output:
|
||||||
type: proto:tycho.evm.v1.BalanceDeltas
|
type: proto:tycho.evm.v1.BlockBalanceDeltas
|
||||||
|
|
||||||
- name: store_balances
|
- name: store_balances
|
||||||
kind: store
|
kind: store
|
||||||
@@ -61,4 +61,4 @@ modules:
|
|||||||
- store: store_balances
|
- store: store_balances
|
||||||
mode: deltas # This is the key property that simplifies `BalanceChange` handling
|
mode: deltas # This is the key property that simplifies `BalanceChange` handling
|
||||||
output:
|
output:
|
||||||
type: proto:tycho.evm.v1.BlockContractChanges
|
type: proto:tycho.evm.v1.BlockChanges
|
||||||
|
|||||||
Reference in New Issue
Block a user