From 42f2f45aa7187d5dee03f93e11cd2d8bdd314ec0 Mon Sep 17 00:00:00 2001 From: zizou <111426680+flopell@users.noreply.github.com> Date: Mon, 14 Oct 2024 18:09:17 +0200 Subject: [PATCH] refactor(substreams): Update ambient Substreams --- proto/buf.lock => buf.lock | 2 +- buf.yaml | 19 + proto/buf.yaml | 7 - substreams/Cargo.lock | 6 +- substreams/crates/tycho-substreams/Readme.md | 11 +- .../crates/tycho-substreams/buf.gen.yaml | 10 +- .../crates/tycho-substreams/src/models.rs | 2 +- .../crates/tycho-substreams/src/pb/mod.rs | 7 + .../src/pb/tycho.ambient.v1.rs | 33 ++ .../tycho-substreams/src/pb/tycho.evm.v1.rs | 4 +- substreams/ethereum-ambient/Cargo.lock | 108 +---- substreams/ethereum-ambient/Cargo.toml | 20 +- substreams/ethereum-ambient/Readme.md | 35 ++ substreams/ethereum-ambient/buf.gen.yaml | 18 +- .../ethereum-ambient/proto/ambient.proto | 27 ++ .../src/contracts/hotproxy.rs | 83 ++++ .../src/contracts/knockout.rs | 106 +++++ .../ethereum-ambient/src/contracts/main.rs | 170 ++++++++ .../src/contracts/micropaths.rs | 307 ++++++++++++++ .../ethereum-ambient/src/contracts/mod.rs | 16 + .../src/contracts/warmpath.rs | 87 ++++ substreams/ethereum-ambient/src/lib.rs | 391 +----------------- .../src/modules/1_map_pool_changes.rs | 127 ++++++ .../src/modules/2_store_pool_balances.rs | 18 + .../src/modules/2_store_pools.rs | 9 + .../src/modules/3_map_changes.rs | 367 ++++++++++++++++ .../ethereum-ambient/src/modules/mod.rs | 16 + substreams/ethereum-ambient/src/pb/mod.rs | 10 - .../ethereum-ambient/src/pb/tycho.evm.v1.rs | 183 -------- substreams/ethereum-ambient/src/utils.rs | 55 +++ substreams/ethereum-ambient/substreams.yaml | 37 +- 31 files changed, 1567 insertions(+), 724 deletions(-) rename proto/buf.lock => buf.lock (73%) create mode 100644 buf.yaml delete mode 100644 proto/buf.yaml create mode 100644 substreams/crates/tycho-substreams/src/pb/tycho.ambient.v1.rs create mode 100644 substreams/ethereum-ambient/Readme.md create mode 100644 substreams/ethereum-ambient/proto/ambient.proto create mode 100644 substreams/ethereum-ambient/src/contracts/hotproxy.rs create mode 100644 substreams/ethereum-ambient/src/contracts/knockout.rs create mode 100644 substreams/ethereum-ambient/src/contracts/main.rs create mode 100644 substreams/ethereum-ambient/src/contracts/micropaths.rs create mode 100644 substreams/ethereum-ambient/src/contracts/mod.rs create mode 100644 substreams/ethereum-ambient/src/contracts/warmpath.rs create mode 100644 substreams/ethereum-ambient/src/modules/1_map_pool_changes.rs create mode 100644 substreams/ethereum-ambient/src/modules/2_store_pool_balances.rs create mode 100644 substreams/ethereum-ambient/src/modules/2_store_pools.rs create mode 100644 substreams/ethereum-ambient/src/modules/3_map_changes.rs create mode 100644 substreams/ethereum-ambient/src/modules/mod.rs delete mode 100644 substreams/ethereum-ambient/src/pb/mod.rs delete mode 100644 substreams/ethereum-ambient/src/pb/tycho.evm.v1.rs create mode 100644 substreams/ethereum-ambient/src/utils.rs diff --git a/proto/buf.lock b/buf.lock similarity index 73% rename from proto/buf.lock rename to buf.lock index c91b581..4f98143 100644 --- a/proto/buf.lock +++ b/buf.lock @@ -1,2 +1,2 @@ # Generated by buf. DO NOT EDIT. -version: v1 +version: v2 diff --git a/buf.yaml b/buf.yaml new file mode 100644 index 0000000..6ee1fb4 --- /dev/null +++ b/buf.yaml @@ -0,0 +1,19 @@ +version: v2 +modules: + - path: proto + excludes: + - proto/sf + - path: substreams/ethereum-ambient/proto +lint: + use: + - BASIC + except: + - FIELD_NOT_REQUIRED + - PACKAGE_NO_IMPORT_CYCLE + disallow_comment_ignores: true +breaking: + use: + - FILE + except: + - EXTENSION_NO_DELETE + - FIELD_SAME_DEFAULT diff --git a/proto/buf.yaml b/proto/buf.yaml deleted file mode 100644 index d4ff52d..0000000 --- a/proto/buf.yaml +++ /dev/null @@ -1,7 +0,0 @@ -version: v1 -breaking: - use: - - FILE -lint: - use: - - BASIC diff --git a/substreams/Cargo.lock b/substreams/Cargo.lock index fc205c4..b7055cb 100644 --- a/substreams/Cargo.lock +++ b/substreams/Cargo.lock @@ -991,16 +991,20 @@ dependencies = [ [[package]] name = "substreams-ethereum-ambient" -version = "0.3.0" +version = "0.5.0" dependencies = [ "anyhow", "bytes", "ethabi 18.0.0", "hex", "hex-literal 0.4.1", + "num-bigint", "prost 0.11.9", + "quote", "substreams", "substreams-ethereum", + "tiny-keccak", + "tycho-substreams", ] [[package]] diff --git a/substreams/crates/tycho-substreams/Readme.md b/substreams/crates/tycho-substreams/Readme.md index 20f868b..44394fc 100644 --- a/substreams/crates/tycho-substreams/Readme.md +++ b/substreams/crates/tycho-substreams/Readme.md @@ -4,15 +4,12 @@ Some shared functionality that is used to create tycho substream packages. ## Protobuf Models -Protobuf models are manually synced from the `tycho-indexer` repository whenever they +Protobuf models are manually synced from the `tycho-indexer` repository whenever they changed. -To generate the rust structs run the following command from within the `./proto` +To generate the rust structs run the following command from within the root directory: ```bash -buf generate \ - --path tycho \ - --template ../substreams/crates/tycho-substreams/buf.gen.yaml \ - --output ../substreams/crates/tycho-substreams/ -``` \ No newline at end of file +buf generate --template substreams/crates/tycho-substreams/buf.gen.yaml --output substreams/crates/tycho-substreams/ +``` diff --git a/substreams/crates/tycho-substreams/buf.gen.yaml b/substreams/crates/tycho-substreams/buf.gen.yaml index 07f4f81..bc6db15 100644 --- a/substreams/crates/tycho-substreams/buf.gen.yaml +++ b/substreams/crates/tycho-substreams/buf.gen.yaml @@ -1,12 +1,10 @@ -version: v1 +version: v2 plugins: - - plugin: buf.build/community/neoeinstein-prost:v0.2.2 + - remote: buf.build/community/neoeinstein-prost:v0.2.2 out: src/pb opt: - file_descriptor_set=false - type_attribute=.tycho.evm.v1.Transaction=#[derive(Eq\, Hash)] - - - plugin: buf.build/community/neoeinstein-prost-crate:v0.3.1 + - remote: buf.build/community/neoeinstein-prost-crate:v0.3.1 out: src/pb - opt: - - no_features + opt: no_features diff --git a/substreams/crates/tycho-substreams/src/models.rs b/substreams/crates/tycho-substreams/src/models.rs index 333f172..359b2af 100644 --- a/substreams/crates/tycho-substreams/src/models.rs +++ b/substreams/crates/tycho-substreams/src/models.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use substreams_ethereum::pb::eth::v2::{self as sf, StorageChange}; // re-export the protobuf types here. -pub use crate::pb::tycho::evm::v1::*; +pub use crate::pb::tycho::{ambient::v1::*, evm::v1::*}; impl TransactionContractChanges { /// Creates a new empty `TransactionContractChanges` instance. diff --git a/substreams/crates/tycho-substreams/src/pb/mod.rs b/substreams/crates/tycho-substreams/src/pb/mod.rs index 43d8838..d6a992a 100644 --- a/substreams/crates/tycho-substreams/src/pb/mod.rs +++ b/substreams/crates/tycho-substreams/src/pb/mod.rs @@ -1,5 +1,12 @@ // @generated pub mod tycho { + pub mod ambient { + // @@protoc_insertion_point(attribute:tycho.ambient.v1) + pub mod v1 { + include!("tycho.ambient.v1.rs"); + // @@protoc_insertion_point(tycho.ambient.v1) + } + } pub mod evm { // @@protoc_insertion_point(attribute:tycho.evm.v1) pub mod v1 { diff --git a/substreams/crates/tycho-substreams/src/pb/tycho.ambient.v1.rs b/substreams/crates/tycho-substreams/src/pb/tycho.ambient.v1.rs new file mode 100644 index 0000000..5bfe909 --- /dev/null +++ b/substreams/crates/tycho-substreams/src/pb/tycho.ambient.v1.rs @@ -0,0 +1,33 @@ +// @generated +/// A change to a pool's balance. Ambient specific. +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AmbientBalanceDelta { + /// The address of the ERC20 token whose balance changed. + #[prost(bytes="vec", tag="1")] + pub pool_hash: ::prost::alloc::vec::Vec, + /// The token type: it can be base or quote. + #[prost(string, tag="2")] + pub token_type: ::prost::alloc::string::String, + /// The delta of the token. + #[prost(bytes="vec", tag="3")] + pub token_delta: ::prost::alloc::vec::Vec, + /// Used to determine the order of the balance changes. Necessary for the balance store. + #[prost(uint64, tag="4")] + pub ordinal: u64, + /// Transaction where the balance changed. + #[prost(message, optional, tag="5")] + pub tx: ::core::option::Option, +} +/// Ambient pool changes within a single block +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BlockPoolChanges { + /// New protocol components added in this block + #[prost(message, repeated, tag="1")] + pub protocol_components: ::prost::alloc::vec::Vec, + /// Balance changes to pools in this block + #[prost(message, repeated, tag="2")] + pub balance_deltas: ::prost::alloc::vec::Vec, +} +// @@protoc_insertion_point(module) diff --git a/substreams/crates/tycho-substreams/src/pb/tycho.evm.v1.rs b/substreams/crates/tycho-substreams/src/pb/tycho.evm.v1.rs index 39cbcb3..7f8c737 100644 --- a/substreams/crates/tycho-substreams/src/pb/tycho.evm.v1.rs +++ b/substreams/crates/tycho-substreams/src/pb/tycho.evm.v1.rs @@ -84,7 +84,8 @@ pub struct ProtocolComponent { /// Usually it is a single contract, but some protocols use multiple contracts. #[prost(bytes="vec", repeated, tag="3")] pub contracts: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, - /// Attributes of the component. Used mainly be the native integration. + /// Static attributes of the component. + /// These attributes MUST be immutable. If it can ever change, it should be given as an EntityChanges for this component id. /// The inner ChangeType of the attribute has to match the ChangeType of the ProtocolComponent. #[prost(message, repeated, tag="4")] pub static_att: ::prost::alloc::vec::Vec, @@ -186,6 +187,7 @@ pub struct TransactionChanges { pub balance_changes: ::prost::alloc::vec::Vec, } /// A set of transaction changes within a single block. +/// This message must be the output of your substreams module. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct BlockChanges { diff --git a/substreams/ethereum-ambient/Cargo.lock b/substreams/ethereum-ambient/Cargo.lock index 03629cf..6d24354 100644 --- a/substreams/ethereum-ambient/Cargo.lock +++ b/substreams/ethereum-ambient/Cargo.lock @@ -180,24 +180,7 @@ version = "17.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4966fba78396ff92db3b817ee71143eccd98acf0f876b8d600e585a670c5d1b" dependencies = [ - "ethereum-types 0.13.1", - "hex", - "once_cell", - "regex", - "serde", - "serde_json", - "sha3", - "thiserror", - "uint", -] - -[[package]] -name = "ethabi" -version = "18.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7413c5f74cc903ea37386a8965a936cbeb334bd270862fdece542c1b2dcbc898" -dependencies = [ - "ethereum-types 0.14.1", + "ethereum-types", "hex", "once_cell", "regex", @@ -215,22 +198,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11da94e443c60508eb62cf256243a64da87304c2802ac2528847f79d750007ef" dependencies = [ "crunchy", - "fixed-hash 0.7.0", + "fixed-hash", "impl-rlp", - "impl-serde 0.3.2", - "tiny-keccak", -] - -[[package]] -name = "ethbloom" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c22d4b5885b6aa2fe5e8b9329fb8d232bf739e434e6b87347c63bdd00c120f60" -dependencies = [ - "crunchy", - "fixed-hash 0.8.0", - "impl-rlp", - "impl-serde 0.4.0", + "impl-serde", "tiny-keccak", ] @@ -240,25 +210,11 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2827b94c556145446fcce834ca86b7abf0c39a805883fe20e72c5bfdb5a0dc6" dependencies = [ - "ethbloom 0.12.1", - "fixed-hash 0.7.0", + "ethbloom", + "fixed-hash", "impl-rlp", - "impl-serde 0.3.2", - "primitive-types 0.11.1", - "uint", -] - -[[package]] -name = "ethereum-types" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d215cbf040552efcbe99a38372fe80ab9d00268e20012b79fcd0f073edd8ee" -dependencies = [ - "ethbloom 0.13.0", - "fixed-hash 0.8.0", - "impl-rlp", - "impl-serde 0.4.0", - "primitive-types 0.12.2", + "impl-serde", + "primitive-types", "uint", ] @@ -280,18 +236,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "fixed-hash" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "835c052cb0c08c1acf6ffd71c022172e18723949c8282f2b9f27efbc51e64534" -dependencies = [ - "byteorder", - "rand", - "rustc-hex", - "static_assertions", -] - [[package]] name = "fixedbitset" version = "0.4.2" @@ -391,15 +335,6 @@ dependencies = [ "serde", ] -[[package]] -name = "impl-serde" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebc88fc67028ae3db0c853baa36269d398d5f45b6982f95549ff5def78c935cd" -dependencies = [ - "serde", -] - [[package]] name = "impl-trait-for-tuples" version = "0.2.2" @@ -584,23 +519,10 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e28720988bff275df1f51b171e1b2a18c30d194c4d2b61defdacecd625a5d94a" dependencies = [ - "fixed-hash 0.7.0", + "fixed-hash", "impl-codec", "impl-rlp", - "impl-serde 0.3.2", - "uint", -] - -[[package]] -name = "primitive-types" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b34d9fd68ae0b74a41b21c03c2f62847aa0ffea044eee893b4c140b37e244e2" -dependencies = [ - "fixed-hash 0.8.0", - "impl-codec", - "impl-rlp", - "impl-serde 0.4.0", + "impl-serde", "uint", ] @@ -883,7 +805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97a176f39a6e09553c17a287edacd1854e5686fd20ffea3c9655dfc44d94b35e" dependencies = [ "anyhow", - "ethabi 17.2.0", + "ethabi", "heck", "hex", "prettyplease", @@ -895,12 +817,8 @@ dependencies = [ [[package]] name = "substreams-ethereum-ambient" -version = "0.3.0" +version = "0.4.0" dependencies = [ - "anyhow", - "bytes", - "ethabi 18.0.0", - "hex", "hex-literal 0.4.1", "prost", "substreams", @@ -914,7 +832,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db4700cfe408b75634a3c6b3a0caf7bddba4879601d2085c811485ea54cbde2d" dependencies = [ "bigdecimal", - "ethabi 17.2.0", + "ethabi", "getrandom", "num-bigint", "prost", @@ -929,7 +847,7 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40d6d278d926fe3f0775d996ee2b5e1dc822c1b4bf4f7bf07c7fbb5bce6c79a9" dependencies = [ - "ethabi 17.2.0", + "ethabi", "heck", "hex", "num-bigint", diff --git a/substreams/ethereum-ambient/Cargo.toml b/substreams/ethereum-ambient/Cargo.toml index f7d246b..45da3f0 100644 --- a/substreams/ethereum-ambient/Cargo.toml +++ b/substreams/ethereum-ambient/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "substreams-ethereum-ambient" -version = "0.3.0" +version = "0.5.0" edition = "2021" [lib] @@ -8,11 +8,15 @@ name = "substreams_ethereum_ambient" crate-type = ["cdylib"] [dependencies] -substreams = "0.5" -substreams-ethereum = "0.9" -prost = "0.11" -hex-literal = "0.4.1" -ethabi = "18.0.0" -hex = "0.4.2" +tycho-substreams.workspace = true +substreams.workspace = true +substreams-ethereum.workspace = true +prost.workspace = true +hex-literal.workspace = true +ethabi.workspace = true +hex.workspace = true bytes = "1.5.0" -anyhow = "1.0.75" +anyhow.workspace = true +tiny-keccak = "2.0.2" +num-bigint = { version = "0.4.4", features = [] } +quote = "1.0.33" diff --git a/substreams/ethereum-ambient/Readme.md b/substreams/ethereum-ambient/Readme.md new file mode 100644 index 0000000..af70922 --- /dev/null +++ b/substreams/ethereum-ambient/Readme.md @@ -0,0 +1,35 @@ +Substreams Ethereum Ambient Module +================================== + +Modules Description +------------------- + +### `map_pool_changes` + +* **Type**: Map +* **Purpose**: This module detects new pools within the Ethereum blockchain and balance changes. +* **Inputs**: Ethereum block data (`sf.ethereum.type.v2.Block`). +* **Output**: Emits data of type `proto:tycho.evm.state.v1.BlockPoolChanges`. + +### `store_pools_balances` + +* **Type**: Store +* **Purpose**: Accumulates and stores the balances of pools detected by `map_pool_changes`. It uses an additive update policy, implying that new values are added to existing balances. +* **Inputs**: Data mapped by `map_pool_changes`. + +### `store_pools` + +* **Type**: Store +* **Purpose**: Maintains a store of pool information using the `ProtocolComponent` data structure. This store is updated whenever `map_pool_changes` emits new pool data. +* **Inputs**: Data mapped by `map_pool_changes`. + +### `map_changes` + +* **Type**: Map +* **Purpose**: This module integrates all the processed information to generate comprehensive `BlockContractChanges`. It considers new pools, balance changes and contract changes. +* **Inputs**: + * Ethereum block data (`sf.ethereum.type.v2.Block`). + * Data from `map_pool_changes`. + * Data from `store_pools_balances`. + * Data from `store_pools`. +* **Output**: Emits `proto:tycho.evm.state.v1.BlockContractChanges`. diff --git a/substreams/ethereum-ambient/buf.gen.yaml b/substreams/ethereum-ambient/buf.gen.yaml index d2e6544..b028921 100644 --- a/substreams/ethereum-ambient/buf.gen.yaml +++ b/substreams/ethereum-ambient/buf.gen.yaml @@ -1,12 +1,8 @@ - -version: v1 +version: v2 plugins: -- plugin: buf.build/community/neoeinstein-prost:v0.2.2 - out: src/pb - opt: - - file_descriptor_set=false - -- plugin: buf.build/community/neoeinstein-prost-crate:v0.3.1 - out: src/pb - opt: - - no_features + - remote: buf.build/community/neoeinstein-prost:v0.2.2 + out: src/pb + opt: file_descriptor_set=false + - remote: buf.build/community/neoeinstein-prost-crate:v0.3.1 + out: src/pb + opt: no_features diff --git a/substreams/ethereum-ambient/proto/ambient.proto b/substreams/ethereum-ambient/proto/ambient.proto new file mode 100644 index 0000000..22afbbf --- /dev/null +++ b/substreams/ethereum-ambient/proto/ambient.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package tycho.ambient.v1; + +import "tycho/evm/v1/common.proto"; + +// A change to a pool's balance. Ambient specific. +message AmbientBalanceDelta { + // The address of the ERC20 token whose balance changed. + bytes pool_hash = 1; + // The token type: it can be base or quote. + string token_type = 2; + // The delta of the token. + bytes token_delta = 3; + // Used to determine the order of the balance changes. Necessary for the balance store. + uint64 ordinal = 4; + // Transaction where the balance changed. + tycho.evm.v1.Transaction tx = 5; +} + +// Ambient pool changes within a single block +message BlockPoolChanges { + // New protocol components added in this block + repeated tycho.evm.v1.ProtocolComponent protocol_components = 1; + // Balance changes to pools in this block + repeated AmbientBalanceDelta balance_deltas = 2; +} \ No newline at end of file diff --git a/substreams/ethereum-ambient/src/contracts/hotproxy.rs b/substreams/ethereum-ambient/src/contracts/hotproxy.rs new file mode 100644 index 0000000..c084ea1 --- /dev/null +++ b/substreams/ethereum-ambient/src/contracts/hotproxy.rs @@ -0,0 +1,83 @@ +use anyhow::{anyhow, bail}; + +use crate::utils::{decode_flows_from_output, encode_pool_hash}; +use ethabi::{decode, ParamType}; +use hex_literal::hex; +use substreams_ethereum::pb::eth::v2::Call; + +pub const AMBIENT_HOTPROXY_CONTRACT: [u8; 20] = hex!("37e00522Ce66507239d59b541940F99eA19fF81F"); +pub const USER_CMD_HOTPROXY_FN_SIG: [u8; 4] = hex!("f96dc788"); + +pub const SWAP_ABI_HOTPROXY_INPUT: &[ParamType] = &[ + ParamType::Address, // base + ParamType::Address, // quote + ParamType::Uint(256), // pool index + // isBuy - if true the direction of the swap is for the user to send base + // tokens and receive back quote tokens. + ParamType::Bool, + ParamType::Bool, // inBaseQty + ParamType::Uint(128), //qty + ParamType::Uint(16), // poolTip + ParamType::Uint(128), // limitPrice + ParamType::Uint(128), // minOut + ParamType::Uint(8), // reserveFlags +]; +const USER_CMD_EXTERNAL_ABI: &[ParamType] = &[ + ParamType::Bytes, // userCmd +]; + +pub fn decode_direct_swap_hotproxy_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(external_cmd) = decode(USER_CMD_EXTERNAL_ABI, &call.input[4..]) { + let input_bytes = external_cmd[0] + .to_owned() + .into_bytes() // Convert Bytes32 to Vec + .ok_or_else(|| anyhow!("Failed to hotproxy userCmd input data.".to_string()))?; + + if let Ok(input_params) = decode(SWAP_ABI_HOTPROXY_INPUT, &input_bytes) { + let base_token = input_params[0] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert base token to address for direct swap hotproxy call: {:?}", + &input_params[0] + ) + })? + .to_fixed_bytes() + .to_vec(); + + let quote_token = input_params[1] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert quote token to address for direct swap hotproxy call: {:?}", + &input_params[1] + ) + })? + .to_fixed_bytes() + .to_vec(); + + let mut pool_index_buf = [0u8; 32]; + input_params[2] + .to_owned() + .into_uint() + .ok_or_else(|| { + anyhow!("Failed to convert pool index to u32 for direct swap hotproxy call" + .to_string()) + })? + .to_big_endian(&mut pool_index_buf); + let pool_index = pool_index_buf.to_vec(); + + let (base_flow, quote_flow) = decode_flows_from_output(call)?; + let pool_hash = encode_pool_hash(base_token, quote_token, pool_index); + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode hotproxy swap call internap inputs.".to_string()); + } + } else { + bail!("Failed to decode hotproxy swap call external input.".to_string()); + } +} diff --git a/substreams/ethereum-ambient/src/contracts/knockout.rs b/substreams/ethereum-ambient/src/contracts/knockout.rs new file mode 100644 index 0000000..7a5234d --- /dev/null +++ b/substreams/ethereum-ambient/src/contracts/knockout.rs @@ -0,0 +1,106 @@ +use anyhow::{anyhow, bail}; + +use crate::utils::{decode_flows_from_output, encode_pool_hash}; +use ethabi::{decode, ParamType}; +use hex_literal::hex; +use substreams_ethereum::pb::eth::v2::Call; + +pub const AMBIENT_KNOCKOUT_CONTRACT: [u8; 20] = hex!("7F5D75AdE75646919c923C98D53E9Cc7Be7ea794"); +pub const USER_CMD_KNOCKOUT_FN_SIG: [u8; 4] = hex!("f96dc788"); + +// Represents the ABI of any cmd which is not mint or burn +const KNOCKOUT_INTERNAL_OTHER_CMD_ABI: &[ParamType] = &[ + ParamType::Uint(8), + ParamType::Address, // base + ParamType::Address, // quote + ParamType::Uint(256), // poolIdx + ParamType::Int(24), + ParamType::Int(24), + ParamType::Bool, + ParamType::Uint(8), + ParamType::Uint(256), + ParamType::Uint(256), + ParamType::Uint(32), +]; +const KNOCKOUT_INTERNAL_MINT_BURN_ABI: &[ParamType] = &[ + ParamType::Uint(8), + ParamType::Address, // base + ParamType::Address, // quote + ParamType::Uint(256), // poolIdx + ParamType::Int(24), + ParamType::Int(24), + ParamType::Bool, + ParamType::Uint(8), + ParamType::Uint(256), + ParamType::Uint(256), + ParamType::Uint(128), + ParamType::Bool, +]; + +const KNOCKOUT_EXTERNAL_ABI: &[ParamType] = &[ + ParamType::Bytes, // userCmd +]; + +pub fn decode_knockout_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(external_cmd) = decode(KNOCKOUT_EXTERNAL_ABI, &call.input[4..]) { + let input_data = external_cmd[0] + .to_owned() + .into_bytes() // Convert Bytes32 to Vec + .ok_or_else(|| anyhow!("Failed to Knockout userCmd input data.".to_string()))?; + + let code = input_data[31]; + let is_mint = code == 91; + let is_burn = code == 92; + + let abi = if is_mint || is_burn { + KNOCKOUT_INTERNAL_MINT_BURN_ABI + } else { + KNOCKOUT_INTERNAL_OTHER_CMD_ABI + }; + + if let Ok(mint_burn_inputs) = decode(abi, &input_data) { + let base_token = mint_burn_inputs[1] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert base token to address for knockout call: {:?}", + &mint_burn_inputs[1] + ) + })? + .to_fixed_bytes() + .to_vec(); + let quote_token = mint_burn_inputs[2] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert quote token to address for knockout call: {:?}", + &mint_burn_inputs[2] + ) + })? + .to_fixed_bytes() + .to_vec(); + + let mut pool_index_buf = [0u8; 32]; + mint_burn_inputs[3] + .to_owned() + .into_uint() + .ok_or_else(|| { + anyhow!("Failed to convert pool index to bytes for knockout call".to_string()) + })? + .to_big_endian(&mut pool_index_buf); + let pool_index = pool_index_buf.to_vec(); + + let (base_flow, quote_flow) = decode_flows_from_output(call)?; + let pool_hash = encode_pool_hash(base_token, quote_token, pool_index); + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode knockout call outputs.".to_string()); + } + } else { + bail!("Failed to decode inputs for knockout call.".to_string()); + } +} diff --git a/substreams/ethereum-ambient/src/contracts/main.rs b/substreams/ethereum-ambient/src/contracts/main.rs new file mode 100644 index 0000000..2b273b5 --- /dev/null +++ b/substreams/ethereum-ambient/src/contracts/main.rs @@ -0,0 +1,170 @@ +use anyhow::{anyhow, bail}; +use tycho_substreams::models::{ + Attribute, ChangeType, FinancialType, ImplementationType, ProtocolComponent, ProtocolType, + Transaction, +}; + +use crate::utils::{decode_flows_from_output, encode_pool_hash}; +use ethabi::{decode, ParamType}; +use hex_literal::hex; +use substreams_ethereum::pb::eth::v2::Call; + +pub const AMBIENT_CONTRACT: [u8; 20] = hex!("aaaaaaaaa24eeeb8d57d431224f73832bc34f688"); +pub const USER_CMD_FN_SIG: [u8; 4] = hex!("a15112f9"); + +const USER_CMD_EXTERNAL_ABI: &[ParamType] = &[ + // index of the proxy sidecar the command is being called on + ParamType::Uint(16), + // call data for internal UserCmd method + ParamType::Bytes, +]; +const USER_CMD_INTERNAL_ABI: &[ParamType] = &[ + ParamType::Uint(8), // command + ParamType::Address, // base + ParamType::Address, // quote + ParamType::Uint(256), // pool index + ParamType::Uint(128), // price +]; + +pub const INIT_POOL_CODE: u8 = 71; + +pub const SWAP_ABI_INPUT: &[ParamType] = &[ + ParamType::Address, // base + ParamType::Address, // quote + ParamType::Uint(256), // pool index + // isBuy - if true the direction of the swap is for the user to send base + // tokens and receive back quote tokens. + ParamType::Bool, + ParamType::Bool, // inBaseQty + ParamType::Uint(128), //qty + ParamType::Uint(16), // poolTip + ParamType::Uint(128), // limitPrice + ParamType::Uint(128), // minOut + ParamType::Uint(8), // reserveFlags +]; + +// MicroPaths fn sigs +pub const SWAP_FN_SIG: [u8; 4] = hex!("3d719cd9"); + +pub fn decode_direct_swap_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(external_input_params) = decode(SWAP_ABI_INPUT, &call.input[4..]) { + let base_token = external_input_params[0] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert base token to address for direct swap call: {:?}", + &external_input_params[0] + ) + })? + .to_fixed_bytes() + .to_vec(); + + let quote_token = external_input_params[1] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert quote token to address for direct swap call: {:?}", + &external_input_params[1] + ) + })? + .to_fixed_bytes() + .to_vec(); + + let mut pool_index_buf = [0u8; 32]; + external_input_params[2] + .to_owned() + .into_uint() + .ok_or_else(|| { + anyhow!("Failed to convert pool index to u32 for direct swap call".to_string()) + })? + .to_big_endian(&mut pool_index_buf); + let pool_index = pool_index_buf.to_vec(); + + let (base_flow, quote_flow) = decode_flows_from_output(call)?; + let pool_hash = encode_pool_hash(base_token, quote_token, pool_index); + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode swap call inputs.".to_string()); + } +} +pub fn decode_pool_init( + call: &Call, + tx: Transaction, +) -> Result, anyhow::Error> { + // Decode external call to UserCmd + if let Ok(external_params) = decode(USER_CMD_EXTERNAL_ABI, &call.input[4..]) { + let cmd_bytes = external_params[1] + .to_owned() + .into_bytes() + .ok_or_else(|| anyhow!("Failed to convert to bytes: {:?}", &external_params[1]))?; + + // Call data is structured differently depending on the cmd code, so only + // decode if this is an init pool code. + if cmd_bytes[31] == INIT_POOL_CODE { + // Decode internal call to UserCmd + if let Ok(internal_params) = decode(USER_CMD_INTERNAL_ABI, &cmd_bytes) { + let base = internal_params[1] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!("Failed to convert to address: {:?}", &internal_params[1]) + })? + .to_fixed_bytes() + .to_vec(); + + let quote = internal_params[2] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!("Failed to convert to address: {:?}", &internal_params[2]) + })? + .to_fixed_bytes() + .to_vec(); + + let mut pool_index_buf = [0u8; 32]; + internal_params[3] + .to_owned() + .into_uint() + .ok_or_else(|| anyhow!("Failed to convert to u32".to_string()))? + .to_big_endian(&mut pool_index_buf); + let pool_index = pool_index_buf.to_vec(); + let pool_hash = encode_pool_hash(base.clone(), quote.clone(), pool_index.clone()); + + let static_attribute = Attribute { + name: String::from("pool_index"), + value: pool_index, + change: ChangeType::Creation.into(), + }; + + let mut tokens: Vec> = vec![base.clone(), quote.clone()]; + tokens.sort(); + + let new_component = ProtocolComponent { + id: hex::encode(pool_hash), + tokens, + contracts: vec![AMBIENT_CONTRACT.to_vec()], + static_att: vec![static_attribute], + change: ChangeType::Creation.into(), + protocol_type: Some(ProtocolType { + name: "ambient_pool".to_string(), + attribute_schema: vec![], + financial_type: FinancialType::Swap.into(), + implementation_type: ImplementationType::Vm.into(), + }), + tx: Some(tx.clone()), + }; + Ok(Some(new_component)) + } else { + bail!("Failed to decode ABI internal call.".to_string()); + } + } else { + Ok(None) + } + } else { + bail!("Failed to decode ABI external call.".to_string()); + } +} diff --git a/substreams/ethereum-ambient/src/contracts/micropaths.rs b/substreams/ethereum-ambient/src/contracts/micropaths.rs new file mode 100644 index 0000000..4835b94 --- /dev/null +++ b/substreams/ethereum-ambient/src/contracts/micropaths.rs @@ -0,0 +1,307 @@ +use anyhow::{anyhow, bail}; + +use ethabi::{decode, ParamType}; +use hex_literal::hex; +use substreams_ethereum::pb::eth::v2::Call; + +pub const AMBIENT_MICROPATHS_CONTRACT: [u8; 20] = hex!("f241bEf0Ea64020655C70963ef81Fea333752367"); + +pub const SWEEP_SWAP_FN_SIG: [u8; 4] = hex!("7b370fc2"); + +pub const MINT_AMBIENT_FN_SIG: [u8; 4] = hex!("2ee11587"); +pub const MINT_RANGE_FN_SIG: [u8; 4] = hex!("2370632b"); +pub const BURN_AMBIENT_FN_SIG: [u8; 4] = hex!("2a6f0864"); +pub const BURN_RANGE_FN_SIG: [u8; 4] = hex!("7c6dfe3d"); + +// ABI for the mintAmbient function with return values +pub const MINT_AMBIENT_RETURN_ABI: &[ParamType] = &[ + ParamType::Int(128), // int128 baseFlow + ParamType::Int(128), // int128 quoteFlow + ParamType::Uint(128), // uint128 seedOut +]; + +// ABI for the mintAmbient function parameters +const MINT_AMBIENT_ABI: &[ParamType] = &[ + ParamType::Uint(128), // uint128 price + ParamType::Uint(128), // uint128 seed + ParamType::Uint(128), // uint128 conc + ParamType::Uint(64), // uint64 seedGrowth + ParamType::Uint(64), // uint64 concGrowth + ParamType::Uint(128), // uint128 liq + ParamType::FixedBytes(32), // bytes32 poolHash +]; + +// ABI for the burnRange function +const BURN_RANGE_ABI: &[ParamType] = &[ + ParamType::Uint(128), // price + ParamType::Int(24), // priceTick + ParamType::Uint(128), // seed + ParamType::Uint(128), // conc + ParamType::Uint(64), // seedGrowth + ParamType::Uint(64), // concGrowth + ParamType::Int(24), // lowTick + ParamType::Int(24), // highTick + ParamType::Uint(128), // liq + ParamType::FixedBytes(32), // poolHash +]; + +const BURN_RANGE_RETURN_ABI: &[ParamType] = &[ + ParamType::Int(128), // baseFlow + ParamType::Int(128), // quoteFlow + ParamType::Uint(128), // seedOut + ParamType::Uint(128), // concOut +]; + +// ABI for the burnAmbient function with return values +const BURN_AMBIENT_RETURN_ABI: &[ParamType] = &[ + ParamType::Int(128), // int128 baseFlow + ParamType::Int(128), // int128 quoteFlow + ParamType::Uint(128), // uint128 seedOut +]; + +// ABI for the burnAmbient function +const BURN_AMBIENT_ABI: &[ParamType] = &[ + ParamType::Uint(128), // uint128 price + ParamType::Uint(128), // uint128 seed + ParamType::Uint(128), // uint128 conc + ParamType::Uint(64), // uint64 seedGrowth + ParamType::Uint(64), // uint64 concGrowth + ParamType::Uint(128), // uint128 liq + ParamType::FixedBytes(32), // bytes32 poolHash +]; + +// ABI for the mintRange function parameters +const MINT_RANGE_ABI: &[ParamType] = &[ + ParamType::Uint(128), // price + ParamType::Int(24), // priceTick + ParamType::Uint(128), // seed + ParamType::Uint(128), // conc + ParamType::Uint(64), // seedGrowth + ParamType::Uint(64), // concGrowth + ParamType::Int(24), // lowTick + ParamType::Int(24), // highTick + ParamType::Uint(128), // liq + ParamType::FixedBytes(32), // poolHash +]; + +// ABI for the mintRange function with return values +const MINT_RANGE_RETURN_ABI: &[ParamType] = &[ + ParamType::Int(128), // baseFlow + ParamType::Int(128), // quoteFlow + ParamType::Uint(128), // seedOut + ParamType::Uint(128), // concOut +]; +pub fn decode_mint_range_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(mint_range) = decode(MINT_RANGE_ABI, &call.input[4..]) { + let pool_hash: [u8; 32] = mint_range[9] + .to_owned() + .into_fixed_bytes() + .ok_or_else(|| anyhow!("Failed to convert pool hash to fixed bytes".to_string()))? + .try_into() + .unwrap(); + + if let Ok(external_outputs) = decode(MINT_RANGE_RETURN_ABI, &call.return_data) { + let base_flow = external_outputs[0] + .to_owned() + .into_int() // Needs conversion into bytes for next step + .ok_or_else(|| anyhow!("Failed to convert base flow to i128".to_string()))?; + + let quote_flow = external_outputs[1] + .to_owned() + .into_int() // Needs conversion into bytes for next step + .ok_or_else(|| anyhow!("Failed to convert quote flow to i128".to_string()))?; + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode swap call outputs.".to_string()); + } + } else { + bail!("Failed to decode inputs for WarmPath userCmd call.".to_string()); + } +} + +pub fn decode_burn_ambient_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(burn_ambient) = decode(BURN_AMBIENT_ABI, &call.input[4..]) { + let pool_hash: [u8; 32] = burn_ambient[6] + .to_owned() + .into_fixed_bytes() + .ok_or_else(|| anyhow!("Failed to convert pool hash to bytes".to_string()))? + .try_into() + .unwrap(); + + if let Ok(external_outputs) = decode(BURN_AMBIENT_RETURN_ABI, &call.return_data) { + let base_flow = external_outputs[0] + .to_owned() + .into_int() + .ok_or_else(|| anyhow!("Failed to convert base flow to i128".to_string()))?; + + let quote_flow = external_outputs[1] + .to_owned() + .into_int() + .ok_or_else(|| anyhow!("Failed to convert quote flow to i128".to_string()))?; + + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode burnAmbient call outputs.".to_string()); + } + } else { + bail!("Failed to decode inputs for burnAmbient call.".to_string()); + } +} + +pub fn decode_mint_ambient_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(mint_ambient) = decode(MINT_AMBIENT_ABI, &call.input[4..]) { + let pool_hash: [u8; 32] = mint_ambient[6] + .to_owned() + .into_fixed_bytes() + .ok_or_else(|| anyhow!("Failed to convert pool hash to bytes".to_string()))? + .try_into() + .unwrap(); + + if let Ok(external_outputs) = decode(MINT_AMBIENT_RETURN_ABI, &call.return_data) { + let base_flow = external_outputs[0] + .to_owned() + .into_int() + .ok_or_else(|| anyhow!("Failed to convert base flow to i128".to_string()))?; + + let quote_flow = external_outputs[1] + .to_owned() + .into_int() + .ok_or_else(|| anyhow!("Failed to convert quote flow to i128".to_string()))?; + + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode mintAmbient call outputs.".to_string()); + } + } else { + bail!("Failed to decode inputs for mintAmbient call.".to_string()); + } +} + +pub fn decode_burn_range_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(burn_range) = decode(BURN_RANGE_ABI, &call.input[4..]) { + let pool_hash: [u8; 32] = burn_range[9] + .to_owned() + .into_fixed_bytes() // Convert Bytes32 to Vec + .ok_or_else(|| anyhow!("Failed to convert pool hash to bytes".to_string()))? + .try_into() + .unwrap(); + + if let Ok(external_outputs) = decode(BURN_RANGE_RETURN_ABI, &call.return_data) { + let base_flow = external_outputs[0] + .to_owned() + .into_int() + .ok_or_else(|| anyhow!("Failed to convert base flow to i128".to_string()))?; + + let quote_flow = external_outputs[1] + .to_owned() + .into_int() + .ok_or_else(|| anyhow!("Failed to convert quote flow to i128".to_string()))?; + + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode burnRange call outputs.".to_string()); + } + } else { + bail!("Failed to decode inputs for burnRange call.".to_string()); + } +} + +pub fn decode_sweep_swap_call( + call: &Call, +) -> Result<([u8; 32], ethabi::Int, ethabi::Int), anyhow::Error> { + let sweep_swap_abi: &[ParamType] = &[ + ParamType::Tuple(vec![ + ParamType::Uint(128), + ParamType::Uint(128), + ParamType::Uint(128), + ParamType::Uint(64), + ParamType::Uint(64), + ]), // CurveState + ParamType::Int(24), // midTick + ParamType::Tuple(vec![ + ParamType::Bool, + ParamType::Bool, + ParamType::Uint(8), + ParamType::Uint(128), + ParamType::Uint(128), + ]), // SwapDirective + ParamType::Tuple(vec![ + ParamType::Tuple(vec![ + ParamType::Uint(8), // schema + ParamType::Uint(16), // feeRate + ParamType::Uint(8), // protocolTake + ParamType::Uint(16), // tickSize + ParamType::Uint(8), // jitThresh + ParamType::Uint(8), // knockoutBits + ParamType::Uint(8), // oracleFlags + ]), + ParamType::FixedBytes(32), // poolHash + ParamType::Address, + ]), // PoolCursor + ]; + let sweep_swap_abi_output: &[ParamType] = &[ + ParamType::Tuple(vec![ + ParamType::Int(128), // baseFlow + ParamType::Int(128), // quoteFlow + ParamType::Uint(128), + ParamType::Uint(128), + ]), // Chaining.PairFlow memory accum + ParamType::Uint(128), // priceOut + ParamType::Uint(128), // seedOut + ParamType::Uint(128), // concOut + ParamType::Uint(64), // ambientOut + ParamType::Uint(64), // concGrowthOut + ]; + if let Ok(sweep_swap_input) = decode(sweep_swap_abi, &call.input[4..]) { + let pool_cursor = sweep_swap_input[3] + .to_owned() + .into_tuple() + .ok_or_else(|| { + anyhow!("Failed to convert pool cursor to tuple for sweepSwap call".to_string()) + })?; + let pool_hash: [u8; 32] = pool_cursor[1] + .to_owned() + .into_fixed_bytes() + .ok_or_else(|| { + anyhow!("Failed to convert pool hash to fixed bytes for sweepSwap call".to_string()) + })? + .try_into() + .unwrap(); + if let Ok(sweep_swap_output) = decode(sweep_swap_abi_output, &call.return_data) { + let pair_flow = sweep_swap_output[0] + .to_owned() + .into_tuple() + .ok_or_else(|| { + anyhow!("Failed to convert pair flow to tuple for sweepSwap call".to_string()) + })?; + + let base_flow = pair_flow[0] + .to_owned() + .into_int() // Needs conversion into bytes for next step + .ok_or_else(|| { + anyhow!("Failed to convert base flow to i128 for sweepSwap call".to_string()) + })?; + + let quote_flow = pair_flow[1] + .to_owned() + .into_int() // Needs conversion into bytes for next step + .ok_or_else(|| { + anyhow!("Failed to convert quote flow to i128 for sweepSwap call".to_string()) + })?; + + Ok((pool_hash, base_flow, quote_flow)) + } else { + bail!("Failed to decode sweepSwap outputs.".to_string()); + } + } else { + bail!("Failed to decode sweepSwap inputs.".to_string()); + } +} diff --git a/substreams/ethereum-ambient/src/contracts/mod.rs b/substreams/ethereum-ambient/src/contracts/mod.rs new file mode 100644 index 0000000..ecae537 --- /dev/null +++ b/substreams/ethereum-ambient/src/contracts/mod.rs @@ -0,0 +1,16 @@ +// @generated +pub mod warmpath { + include!("warmpath.rs"); +} +pub mod micropaths { + include!("micropaths.rs"); +} +pub mod knockout { + include!("knockout.rs"); +} +pub mod hotproxy { + include!("hotproxy.rs"); +} +pub mod main { + include!("main.rs"); +} diff --git a/substreams/ethereum-ambient/src/contracts/warmpath.rs b/substreams/ethereum-ambient/src/contracts/warmpath.rs new file mode 100644 index 0000000..29a680b --- /dev/null +++ b/substreams/ethereum-ambient/src/contracts/warmpath.rs @@ -0,0 +1,87 @@ +use anyhow::{anyhow, bail}; + +use crate::utils::{decode_flows_from_output, encode_pool_hash}; +use ethabi::{decode, ParamType}; +use hex_literal::hex; +use substreams_ethereum::pb::eth::v2::Call; + +pub const AMBIENT_WARMPATH_CONTRACT: [u8; 20] = hex!("d268767BE4597151Ce2BB4a70A9E368ff26cB195"); +pub const USER_CMD_WARMPATH_FN_SIG: [u8; 4] = hex!("f96dc788"); +const USER_CMD_EXTERNAL_ABI: &[ParamType] = &[ + ParamType::Bytes, // userCmd +]; + +const LIQUIDITY_CHANGE_ABI: &[ParamType] = &[ + ParamType::Uint(8), + ParamType::Address, // base + ParamType::Address, // quote + ParamType::Uint(256), // pool index + ParamType::Int(256), + ParamType::Uint(128), + ParamType::Uint(128), + ParamType::Uint(128), + ParamType::Uint(8), + ParamType::Address, +]; +pub fn decode_warm_path_user_cmd_call( + call: &Call, +) -> Result, anyhow::Error> { + if let Ok(external_cmd) = decode(USER_CMD_EXTERNAL_ABI, &call.input[4..]) { + let input_bytes = external_cmd[0] + .to_owned() + .into_bytes() // Convert Bytes32 to Vec + .ok_or_else(|| anyhow!("Failed to hotproxy userCmd input data.".to_string()))?; + + let code = input_bytes[31]; + let is_mint = [1, 11, 12, 3, 31, 32].contains(&code); + let is_burn = [2, 21, 22, 4, 41, 42].contains(&code); + let is_harvest = code == 5; + if is_mint || is_burn || is_harvest { + if let Ok(liquidity_change_calldata) = decode(LIQUIDITY_CHANGE_ABI, &input_bytes) { + let base_token = liquidity_change_calldata[1] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert base token to address for WarmPath userCmd call: {:?}", + &liquidity_change_calldata[1] + ) + })? + .to_fixed_bytes() + .to_vec(); + let quote_token = liquidity_change_calldata[2] + .to_owned() + .into_address() + .ok_or_else(|| { + anyhow!( + "Failed to convert quote token to address for WarmPath userCmd call: {:?}", + &liquidity_change_calldata[2] + ) + })? + .to_fixed_bytes() + .to_vec(); + + let mut pool_index_buf = [0u8; 32]; + liquidity_change_calldata[3] + .to_owned() + .into_uint() + .ok_or_else(|| { + anyhow!("Failed to convert pool index to bytes for WarmPath userCmd call" + .to_string()) + })? + .to_big_endian(&mut pool_index_buf); + let pool_index = pool_index_buf.to_vec(); + + let (base_flow, quote_flow) = decode_flows_from_output(call)?; + let pool_hash = encode_pool_hash(base_token, quote_token, pool_index); + Ok(Some((pool_hash, base_flow, quote_flow))) + } else { + bail!("Failed to decode inputs for WarmPath userCmd call.".to_string()); + } + } else { + Ok(None) + } + } else { + bail!("Failed to decode WarmPath call external input.".to_string()); + } +} diff --git a/substreams/ethereum-ambient/src/lib.rs b/substreams/ethereum-ambient/src/lib.rs index 5537a72..810581c 100644 --- a/substreams/ethereum-ambient/src/lib.rs +++ b/substreams/ethereum-ambient/src/lib.rs @@ -1,388 +1,5 @@ -use std::collections::{hash_map::Entry, HashMap}; +mod contracts; -use anyhow::{anyhow, bail}; -use ethabi::{decode, ParamType}; -use hex_literal::hex; -use substreams_ethereum::pb::eth::{self}; - -use pb::tycho::evm::v1::{self as tycho}; - -mod pb; - -const AMBIENT_CONTRACT: [u8; 20] = hex!("aaaaaaaaa24eeeb8d57d431224f73832bc34f688"); -const INIT_POOL_CODE: u8 = 71; -const USER_CMD_FN_SIG: [u8; 4] = [0xA1, 0x51, 0x12, 0xF9]; - -struct SlotValue { - new_value: Vec, - start_value: Vec, -} - -impl SlotValue { - fn has_changed(&self) -> bool { - self.start_value != self.new_value - } -} - -// Uses a map for slots, protobuf does not allow bytes in hashmap keys -struct InterimContractChange { - address: Vec, - balance: Vec, - code: Vec, - slots: HashMap, SlotValue>, - change: tycho::ChangeType, -} - -impl From for tycho::ContractChange { - fn from(value: InterimContractChange) -> Self { - tycho::ContractChange { - address: value.address, - balance: value.balance, - code: value.code, - slots: value - .slots - .into_iter() - .filter(|(_, value)| value.has_changed()) - .map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value }) - .collect(), - change: value.change.into(), - } - } -} - -/// Extracts all contract changes relevant to vm simulations -/// -/// This is the main logic of the substreams integration. It takes a raw ethereum block on input and -/// extracts the BlockContractChanges stream. It includes tracking: -/// - new pool initializations -/// - all storage slot changes for the Ambient contract -/// - all ERC20 balance changes for the Ambient pools -/// - all code changes and balance updates of the Ambient contract -/// -/// Generally we detect all changes in transactions sequentially and detect if it is a CREATE or -/// UPDATE change based on already present data. -#[substreams::handlers::map] -fn map_changes( - block: eth::v2::Block, -) -> Result { - let mut block_changes = tycho::BlockContractChanges::default(); - - let mut tx_change = tycho::TransactionContractChanges::default(); - - let mut changed_contracts: HashMap, InterimContractChange> = HashMap::new(); - - // Collect all accounts created in this block - let created_accounts: HashMap<_, _> = block - .transactions() - .flat_map(|tx| { - tx.calls.iter().flat_map(|call| { - call.account_creations - .iter() - .map(|ac| (&ac.account, ac.ordinal)) - }) - }) - .collect(); - - for block_tx in block.transactions() { - // Extract storage changes for all contracts relevant to this protocol system. - // Ambient is a protocol system consisting of many ProtocolComponents (one for each pool), - // but they all share the same AMBIENT_CONTRACT contract. - let mut storage_changes = block_tx - .calls - .iter() - .filter(|call| !call.state_reverted) - .flat_map(|call| { - call.storage_changes - .iter() - .filter(|c| c.address == AMBIENT_CONTRACT) - }) - .collect::>(); - storage_changes.sort_unstable_by_key(|change| change.ordinal); - - // Detect all call to the Ambient contracts, even inner calls - let ambient_calls = block_tx - .calls - .iter() - .filter(|call| !call.state_reverted) - .filter(|call| call.address == AMBIENT_CONTRACT) - .collect::>(); - - // Detect all pool initializations - // Official documentation: https://docs.ambient.finance/developers/dex-contract-interface/pool-initialization - for call in ambient_calls { - if call.input.len() < 4 { - continue; - } - if call.input[0..4] == USER_CMD_FN_SIG { - let user_cmd_external_abi_types = &[ - // index of the proxy sidecar the command is being called on - ParamType::Uint(16), - // call data for internal UserCmd method - ParamType::Bytes, - ]; - let user_cmd_internal_abi_types = &[ - ParamType::Uint(8), // command - ParamType::Address, // base - ParamType::Address, // quote - ParamType::Uint(256), // pool index - ParamType::Uint(128), // price - ]; - - // Decode external call to UserCmd - if let Ok(external_params) = decode(user_cmd_external_abi_types, &call.input[4..]) { - let cmd_bytes = external_params[1] - .to_owned() - .into_bytes() - .ok_or_else(|| { - anyhow!("Failed to convert to bytes: {:?}", &external_params[1]) - })?; - - // Call data is structured differently depending on the cmd code, so only - // decode if this is an init pool code. - if cmd_bytes[31] == INIT_POOL_CODE { - // Decode internal call to UserCmd - if let Ok(internal_params) = decode(user_cmd_internal_abi_types, &cmd_bytes) - { - let base = internal_params[1] - .to_owned() - .into_address() - .ok_or_else(|| { - anyhow!( - "Failed to convert to address: {:?}", - &internal_params[1] - ) - })? - .to_fixed_bytes() - .to_vec(); - - let quote = internal_params[2] - .to_owned() - .into_address() - .ok_or_else(|| { - anyhow!( - "Failed to convert to address: {:?}", - &internal_params[2] - ) - })? - .to_fixed_bytes() - .to_vec(); - - let pool_index = internal_params[3] - .to_owned() - .into_uint() - .ok_or_else(|| anyhow!("Failed to convert to u32".to_string()))? - .as_u32(); - - let static_attribute = tycho::Attribute { - name: String::from("pool_index"), - value: pool_index.to_be_bytes().to_vec(), - change: tycho::ChangeType::Creation.into(), - }; - - let mut tokens: Vec> = vec![base.clone(), quote.clone()]; - tokens.sort(); - - let new_component = tycho::ProtocolComponent { - id: format!( - "{}{}{}", - hex::encode(base.clone()), - hex::encode(quote.clone()), - pool_index - ), - tokens, - contracts: vec![AMBIENT_CONTRACT.to_vec()], - static_att: vec![static_attribute], - change: tycho::ChangeType::Creation.into(), - }; - tx_change - .component_changes - .push(new_component); - } else { - bail!("Failed to decode ABI internal call.".to_string()); - } - } - } else { - bail!("Failed to decode ABI external call.".to_string()); - } - } - } - - // Extract all contract changes. - // We cache the data in a general interim contract > slot > value data structure. - // Note: some contracts change slot values and change them back to their - // original value before the transactions ends we remember the initial - // value before the first change and in the end filter found deltas - // that ended up not actually changing anything. - for storage_change in storage_changes.iter() { - match changed_contracts.entry(storage_change.address.clone()) { - // We have already an entry recording a change about this contract - // only append the change about this storage slot - Entry::Occupied(mut e) => { - let contract_change = e.get_mut(); - match contract_change - .slots - .entry(storage_change.key.clone()) - { - // The storage slot was already changed before, simply - // update new_value - Entry::Occupied(mut v) => { - let slot_value = v.get_mut(); - slot_value - .new_value - .copy_from_slice(&storage_change.new_value); - } - // The storage slots is being initialised for the first time - Entry::Vacant(v) => { - v.insert(SlotValue { - new_value: storage_change.new_value.clone(), - start_value: storage_change.old_value.clone(), - }); - } - } - } - // Intialise a new contract change after observing a storage change - Entry::Vacant(e) => { - let mut slots = HashMap::new(); - slots.insert( - storage_change.key.clone(), - SlotValue { - new_value: storage_change.new_value.clone(), - start_value: storage_change.old_value.clone(), - }, - ); - e.insert(InterimContractChange { - address: storage_change.address.clone(), - balance: Vec::new(), - code: Vec::new(), - slots, - change: if created_accounts.contains_key(&storage_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); - } - } - } - - // Extract balance changes - let mut balance_changes = block_tx - .calls - .iter() - .filter(|call| !call.state_reverted) - .flat_map(|call| { - call.balance_changes - .iter() - .filter(|c| c.address == AMBIENT_CONTRACT) - }) - .collect::>(); - balance_changes.sort_unstable_by_key(|change| change.ordinal); - - for balance_change in balance_changes.iter() { - match changed_contracts.entry(balance_change.address.clone()) { - Entry::Occupied(mut e) => { - let contract_change = e.get_mut(); - if let Some(new_balance) = &balance_change.new_value { - contract_change.balance.clear(); - contract_change - .balance - .extend_from_slice(&new_balance.bytes); - } - } - Entry::Vacant(e) => { - if let Some(new_balance) = &balance_change.new_value { - e.insert(InterimContractChange { - address: balance_change.address.clone(), - balance: new_balance.bytes.clone(), - code: Vec::new(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&balance_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); - } - } - } - } - - // Extract code changes - let mut code_changes = block_tx - .calls - .iter() - .filter(|call| !call.state_reverted) - .flat_map(|call| { - call.code_changes - .iter() - .filter(|c| c.address == AMBIENT_CONTRACT) - }) - .collect::>(); - code_changes.sort_unstable_by_key(|change| change.ordinal); - - for code_change in code_changes.iter() { - match changed_contracts.entry(code_change.address.clone()) { - Entry::Occupied(mut e) => { - let contract_change = e.get_mut(); - contract_change.code.clear(); - contract_change - .code - .extend_from_slice(&code_change.new_code); - } - Entry::Vacant(e) => { - e.insert(InterimContractChange { - address: code_change.address.clone(), - balance: Vec::new(), - code: code_change.new_code.clone(), - slots: HashMap::new(), - change: if created_accounts.contains_key(&code_change.address) { - tycho::ChangeType::Creation - } else { - tycho::ChangeType::Update - }, - }); - } - } - } - - // If there were any changes, add transaction and push the changes - if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() { - tx_change.tx = Some(tycho::Transaction { - hash: block_tx.hash.clone(), - from: block_tx.from.clone(), - to: block_tx.to.clone(), - index: block_tx.index as u64, - }); - - // reuse changed_contracts hash map by draining it, next iteration - // will start empty. This avoids a costly reallocation - for (_, change) in changed_contracts.drain() { - tx_change - .contract_changes - .push(change.into()) - } - - block_changes - .changes - .push(tx_change.clone()); - - // clear out the interim contract changes after we pushed those. - tx_change.tx = None; - tx_change.contract_changes.clear(); - } - } - - block_changes.block = Some(tycho::Block { - number: block.number, - hash: block.hash.clone(), - parent_hash: block - .header - .as_ref() - .expect("Block header not present") - .parent_hash - .clone(), - ts: block.timestamp_seconds(), - }); - - Ok(block_changes) -} +pub use modules::*; +mod modules; +mod utils; diff --git a/substreams/ethereum-ambient/src/modules/1_map_pool_changes.rs b/substreams/ethereum-ambient/src/modules/1_map_pool_changes.rs new file mode 100644 index 0000000..741a162 --- /dev/null +++ b/substreams/ethereum-ambient/src/modules/1_map_pool_changes.rs @@ -0,0 +1,127 @@ +use substreams_ethereum::pb::eth::{self}; + +use crate::{ + contracts::{ + hotproxy::{ + decode_direct_swap_hotproxy_call, AMBIENT_HOTPROXY_CONTRACT, USER_CMD_HOTPROXY_FN_SIG, + }, + knockout::{decode_knockout_call, AMBIENT_KNOCKOUT_CONTRACT, USER_CMD_KNOCKOUT_FN_SIG}, + main::{ + decode_direct_swap_call, decode_pool_init, AMBIENT_CONTRACT, SWAP_FN_SIG, + USER_CMD_FN_SIG, + }, + micropaths::{ + decode_burn_ambient_call, decode_burn_range_call, decode_mint_ambient_call, + decode_mint_range_call, decode_sweep_swap_call, AMBIENT_MICROPATHS_CONTRACT, + BURN_AMBIENT_FN_SIG, BURN_RANGE_FN_SIG, MINT_AMBIENT_FN_SIG, MINT_RANGE_FN_SIG, + SWEEP_SWAP_FN_SIG, + }, + warmpath::{ + decode_warm_path_user_cmd_call, AMBIENT_WARMPATH_CONTRACT, USER_CMD_WARMPATH_FN_SIG, + }, + }, + utils::from_u256_to_vec, +}; +use tycho_substreams::{ + models::{AmbientBalanceDelta, BlockPoolChanges}, + prelude::Transaction, +}; + +#[substreams::handlers::map] +fn map_pool_changes(block: eth::v2::Block) -> Result { + let mut protocol_components = Vec::new(); + let mut balance_deltas = Vec::new(); + for block_tx in block.transactions() { + let tx = Transaction { + hash: block_tx.hash.clone(), + from: block_tx.from.clone(), + to: block_tx.to.clone(), + index: block_tx.index as u64, + }; + // extract storage changes + let mut storage_changes = block_tx + .calls + .iter() + .filter(|call| !call.state_reverted) + .flat_map(|call| { + call.storage_changes + .iter() + .filter(|c| c.address == AMBIENT_CONTRACT) + }) + .collect::>(); + storage_changes.sort_unstable_by_key(|change| change.ordinal); + + let block_calls = block_tx + .calls + .iter() + .filter(|call| !call.state_reverted) + .collect::>(); + + for call in block_calls { + if call.input.len() < 4 { + continue; + } + let selector: [u8; 4] = call.input[0..4].try_into().unwrap(); + let address: [u8; 20] = call.address.clone().try_into().unwrap(); + + if call.address == AMBIENT_CONTRACT && selector == USER_CMD_FN_SIG { + // Extract pool creations + if let Some(protocol_component) = decode_pool_init(call, tx.clone())? { + protocol_components.push(protocol_component); + } + } + + // Extract TVL changes + let result = match (address, selector) { + (AMBIENT_CONTRACT, SWAP_FN_SIG) => Some(decode_direct_swap_call(call)?), + (AMBIENT_HOTPROXY_CONTRACT, USER_CMD_HOTPROXY_FN_SIG) => { + Some(decode_direct_swap_hotproxy_call(call)?) + } + (AMBIENT_MICROPATHS_CONTRACT, SWEEP_SWAP_FN_SIG) => { + Some(decode_sweep_swap_call(call)?) + } + (AMBIENT_WARMPATH_CONTRACT, USER_CMD_WARMPATH_FN_SIG) => { + decode_warm_path_user_cmd_call(call)? + } + (AMBIENT_MICROPATHS_CONTRACT, MINT_RANGE_FN_SIG) => { + Some(decode_mint_range_call(call)?) + } + (AMBIENT_MICROPATHS_CONTRACT, MINT_AMBIENT_FN_SIG) => { + Some(decode_mint_ambient_call(call)?) + } + (AMBIENT_MICROPATHS_CONTRACT, BURN_RANGE_FN_SIG) => { + Some(decode_burn_range_call(call)?) + } + (AMBIENT_MICROPATHS_CONTRACT, BURN_AMBIENT_FN_SIG) => { + Some(decode_burn_ambient_call(call)?) + } + (AMBIENT_KNOCKOUT_CONTRACT, USER_CMD_KNOCKOUT_FN_SIG) => { + Some(decode_knockout_call(call)?) + } + _ => None, + }; + let (pool_hash, base_flow, quote_flow) = match result { + Some((pool_hash, base_flow, quote_flow)) => (pool_hash, base_flow, quote_flow), + None => continue, + }; + let base_balance_delta = AmbientBalanceDelta { + pool_hash: Vec::from(pool_hash), + token_type: "base".to_string(), + token_delta: from_u256_to_vec(base_flow), + ordinal: call.index as u64, + tx: Some(tx.clone()), + }; + let quote_balance_delta = AmbientBalanceDelta { + pool_hash: Vec::from(pool_hash), + token_type: "quote".to_string(), + token_delta: from_u256_to_vec(quote_flow), + ordinal: call.index as u64, + tx: Some(tx.clone()), + }; + balance_deltas.extend([base_balance_delta.clone(), quote_balance_delta.clone()]); + } + } + balance_deltas.sort_by_key(|delta| (delta.ordinal, delta.token_type.clone())); + let pool_changes = BlockPoolChanges { protocol_components, balance_deltas }; + Ok(pool_changes) +} diff --git a/substreams/ethereum-ambient/src/modules/2_store_pool_balances.rs b/substreams/ethereum-ambient/src/modules/2_store_pool_balances.rs new file mode 100644 index 0000000..28fd494 --- /dev/null +++ b/substreams/ethereum-ambient/src/modules/2_store_pool_balances.rs @@ -0,0 +1,18 @@ +use substreams::{ + scalar::BigInt, + store::{StoreAdd, StoreAddBigInt, StoreNew}, +}; +use tycho_substreams::models::BlockPoolChanges; + +#[substreams::handlers::store] +pub fn store_pool_balances(changes: BlockPoolChanges, balance_store: StoreAddBigInt) { + let deltas = changes.balance_deltas.clone(); + for balance_delta in deltas { + let pool_hash_hex = hex::encode(&balance_delta.pool_hash); + balance_store.add( + balance_delta.ordinal, + format!("{}:{}", pool_hash_hex, balance_delta.token_type), + BigInt::from_signed_bytes_be(&balance_delta.token_delta), + ); + } +} diff --git a/substreams/ethereum-ambient/src/modules/2_store_pools.rs b/substreams/ethereum-ambient/src/modules/2_store_pools.rs new file mode 100644 index 0000000..8b30721 --- /dev/null +++ b/substreams/ethereum-ambient/src/modules/2_store_pools.rs @@ -0,0 +1,9 @@ +use substreams::store::{StoreNew, StoreSet, StoreSetProto}; +use tycho_substreams::models::{BlockPoolChanges, ProtocolComponent}; + +#[substreams::handlers::store] +pub fn store_pools(changes: BlockPoolChanges, component_store: StoreSetProto) { + for component in changes.protocol_components { + component_store.set(0, component.id.clone(), &component); + } +} diff --git a/substreams/ethereum-ambient/src/modules/3_map_changes.rs b/substreams/ethereum-ambient/src/modules/3_map_changes.rs new file mode 100644 index 0000000..70c62d1 --- /dev/null +++ b/substreams/ethereum-ambient/src/modules/3_map_changes.rs @@ -0,0 +1,367 @@ +use num_bigint::BigInt; +use std::{ + collections::{hash_map::Entry, HashMap}, + str::FromStr, +}; +use substreams::pb::substreams::StoreDeltas; + +use substreams_ethereum::pb::eth::{self}; + +use crate::contracts::main::AMBIENT_CONTRACT; +use substreams::store::{StoreGet, StoreGetProto}; +use tycho_substreams::prelude::*; + +struct SlotValue { + new_value: Vec, + start_value: Vec, +} + +impl SlotValue { + fn has_changed(&self) -> bool { + self.start_value != self.new_value + } +} + +// uses a map for slots, protobuf does not +// allow bytes in hashmap keys +struct InterimContractChange { + address: Vec, + balance: Vec, + code: Vec, + slots: HashMap, SlotValue>, + change: ChangeType, +} + +impl From for ContractChange { + fn from(value: InterimContractChange) -> Self { + ContractChange { + address: value.address, + balance: value.balance, + code: value.code, + slots: value + .slots + .into_iter() + .filter(|(_, value)| value.has_changed()) + .map(|(slot, value)| ContractSlot { slot, value: value.new_value }) + .collect(), + change: value.change.into(), + } + } +} + +/// Extracts all contract changes relevant to vm simulations +/// +/// This implementation has currently two major limitations: +/// 1. It is hardwired to only care about changes to the ambient main contract, this is ok for this +/// particular use case but for a more general purpose implementation this is not ideal +/// 2. Changes are processed separately, this means that if there are any side effects between each +/// other (e.g. if account is deleted and then created again in ethereum all the storage is set +/// to 0. So there is a side effect between account creation and contract storage.) these might +/// not be properly accounted for. Most of the time this should not be a major issue but may lead +/// to wrong results so consume this implementation with care. See example below for a concrete +/// case where this is problematic. +/// +/// ## A very contrived example: +/// 1. Some existing contract receives a transaction that changes it state, the state is updated +/// 2. Next, this contract has self destruct called on itself +/// 3. The contract is created again using CREATE2 at the same address +/// 4. The contract receives a transaction that changes it state +/// 5. We would emit this as as contract creation with slots set from 1 and from 4, although we +/// should only emit the slots changed from 4. +#[substreams::handlers::map] +fn map_changes( + block: eth::v2::Block, + block_pool_changes: BlockPoolChanges, + balance_store: StoreDeltas, + pool_store: StoreGetProto, +) -> Result { + let mut block_changes = BlockContractChanges::default(); + + let mut tx_change = TransactionContractChanges::default(); + + let mut changed_contracts: HashMap, InterimContractChange> = HashMap::new(); + + let created_accounts: HashMap<_, _> = block + .transactions() + .flat_map(|tx| { + tx.calls.iter().flat_map(|call| { + call.account_creations + .iter() + .map(|ac| (&ac.account, ac.ordinal)) + }) + }) + .collect(); + + for block_tx in block.transactions() { + // extract storage changes + let mut storage_changes = block_tx + .calls + .iter() + .filter(|call| !call.state_reverted) + .flat_map(|call| { + call.storage_changes + .iter() + .filter(|c| c.address == AMBIENT_CONTRACT) + }) + .collect::>(); + storage_changes.sort_unstable_by_key(|change| change.ordinal); + + // Note: some contracts change slot values and change them back to their + // original value before the transactions ends we remember the initial + // value before the first change and in the end filter found deltas + // that ended up not actually changing anything. + for storage_change in storage_changes.iter() { + match changed_contracts.entry(storage_change.address.clone()) { + // We have already an entry recording a change about this contract + // only append the change about this storage slot + Entry::Occupied(mut e) => { + let contract_change = e.get_mut(); + match contract_change + .slots + .entry(storage_change.key.clone()) + { + // The storage slot was already changed before, simply + // update new_value + Entry::Occupied(mut v) => { + let slot_value = v.get_mut(); + slot_value + .new_value + .copy_from_slice(&storage_change.new_value); + } + // The storage slots is being initialised for the first time + Entry::Vacant(v) => { + v.insert(SlotValue { + new_value: storage_change.new_value.clone(), + start_value: storage_change.old_value.clone(), + }); + } + } + } + // Intialise a new contract change after observing a storage change + Entry::Vacant(e) => { + let mut slots = HashMap::new(); + slots.insert( + storage_change.key.clone(), + SlotValue { + new_value: storage_change.new_value.clone(), + start_value: storage_change.old_value.clone(), + }, + ); + e.insert(InterimContractChange { + address: storage_change.address.clone(), + balance: Vec::new(), + code: Vec::new(), + slots, + change: if created_accounts.contains_key(&storage_change.address) { + ChangeType::Creation + } else { + ChangeType::Update + }, + }); + } + } + } + + // extract balance changes + let mut balance_changes = block_tx + .calls + .iter() + .filter(|call| !call.state_reverted) + .flat_map(|call| { + call.balance_changes + .iter() + .filter(|c| c.address == AMBIENT_CONTRACT) + }) + .collect::>(); + balance_changes.sort_unstable_by_key(|change| change.ordinal); + + for balance_change in balance_changes.iter() { + match changed_contracts.entry(balance_change.address.clone()) { + Entry::Occupied(mut e) => { + let contract_change = e.get_mut(); + if let Some(new_balance) = &balance_change.new_value { + contract_change.balance.clear(); + contract_change + .balance + .extend_from_slice(&new_balance.bytes); + } + } + Entry::Vacant(e) => { + if let Some(new_balance) = &balance_change.new_value { + e.insert(InterimContractChange { + address: balance_change.address.clone(), + balance: new_balance.bytes.clone(), + code: Vec::new(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&balance_change.address) { + ChangeType::Creation + } else { + ChangeType::Update + }, + }); + } + } + } + } + + // extract code changes + let mut code_changes = block_tx + .calls + .iter() + .filter(|call| !call.state_reverted) + .flat_map(|call| { + call.code_changes + .iter() + .filter(|c| c.address == AMBIENT_CONTRACT) + }) + .collect::>(); + code_changes.sort_unstable_by_key(|change| change.ordinal); + + for code_change in code_changes.iter() { + match changed_contracts.entry(code_change.address.clone()) { + Entry::Occupied(mut e) => { + let contract_change = e.get_mut(); + contract_change.code.clear(); + contract_change + .code + .extend_from_slice(&code_change.new_code); + } + Entry::Vacant(e) => { + e.insert(InterimContractChange { + address: code_change.address.clone(), + balance: Vec::new(), + code: code_change.new_code.clone(), + slots: HashMap::new(), + change: if created_accounts.contains_key(&code_change.address) { + ChangeType::Creation + } else { + ChangeType::Update + }, + }); + } + } + } + + // if there were any changes, add transaction and push the changes + if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() { + tx_change.tx = Some(Transaction { + hash: block_tx.hash.clone(), + from: block_tx.from.clone(), + to: block_tx.to.clone(), + index: block_tx.index as u64, + }); + + // reuse changed_contracts hash map by draining it, next iteration + // will start empty. This avoids a costly reallocation + for (_, change) in changed_contracts.drain() { + tx_change + .contract_changes + .push(change.into()) + } + + block_changes + .changes + .push(tx_change.clone()); + + // clear out the interim contract changes after we pushed those. + tx_change.tx = None; + tx_change.contract_changes.clear(); + } + } + let mut grouped_components = HashMap::new(); + for component in &block_pool_changes.protocol_components { + let tx_hash = component + .tx + .clone() + .expect("Transaction is missing") + .hash; + grouped_components + .entry(tx_hash) + .or_insert_with(Vec::new) + .push(component.clone()); + } + + for (tx_hash, components) in grouped_components { + if let Some(tx_change) = block_changes + .changes + .iter_mut() + // TODO: be better than this (quadratic complexity) + .find(|tx_change| { + tx_change + .tx + .as_ref() + .map_or(false, |tx| tx.hash == tx_hash) + }) + { + tx_change + .component_changes + .extend(components); + } + } + let mut balance_changes = HashMap::new(); + balance_store + .deltas + .into_iter() + .zip(block_pool_changes.balance_deltas) + .for_each(|(store_delta, balance_delta)| { + let pool_hash_hex = hex::encode(balance_delta.pool_hash); + let pool = match pool_store.get_last(pool_hash_hex.clone()) { + Some(pool) => pool, + None => panic!("Pool not found in store for given hash: {}", pool_hash_hex), + }; + let token_type = substreams::key::segment_at(&store_delta.key, 1); + let token_index = if token_type == "quote" { 1 } else { 0 }; + + // store_delta.new_value is an ASCII string representing an integer + let ascii_string = + String::from_utf8(store_delta.new_value.clone()).expect("Invalid UTF-8 sequence"); + let balance = BigInt::from_str(&ascii_string).expect("Failed to parse integer"); + let big_endian_bytes_balance = balance.to_bytes_be().1; + + let balance_change = BalanceChange { + component_id: pool_hash_hex.as_bytes().to_vec(), + token: pool.tokens[token_index].clone(), + balance: big_endian_bytes_balance.to_vec(), + }; + let tx_hash = balance_delta + .tx + .expect("Transaction is missing") + .hash; + balance_changes + .entry(tx_hash) + .or_insert_with(Vec::new) + .push(balance_change); + }); + + for (tx_hash, grouped_balance_changes) in balance_changes { + if let Some(tx_change) = block_changes + .changes + .iter_mut() + // TODO: be better than this (quadratic complexity) + .find(|tx_change| { + tx_change + .tx + .as_ref() + .map_or(false, |tx| tx.hash == tx_hash) + }) + { + tx_change + .balance_changes + .extend(grouped_balance_changes); + } + } + + block_changes.block = Some(Block { + number: block.number, + hash: block.hash.clone(), + parent_hash: block + .header + .as_ref() + .expect("Block header not present") + .parent_hash + .clone(), + ts: block.timestamp_seconds(), + }); + + Ok(block_changes) +} diff --git a/substreams/ethereum-ambient/src/modules/mod.rs b/substreams/ethereum-ambient/src/modules/mod.rs new file mode 100644 index 0000000..0c5d9d5 --- /dev/null +++ b/substreams/ethereum-ambient/src/modules/mod.rs @@ -0,0 +1,16 @@ +pub use map_changes::map_changes; +pub use map_pool_changes::map_pool_changes; +pub use store_pool_balances::store_pool_balances; +pub use store_pools::store_pools; + +#[path = "1_map_pool_changes.rs"] +mod map_pool_changes; + +#[path = "2_store_pools.rs"] +mod store_pools; + +#[path = "2_store_pool_balances.rs"] +mod store_pool_balances; + +#[path = "3_map_changes.rs"] +mod map_changes; diff --git a/substreams/ethereum-ambient/src/pb/mod.rs b/substreams/ethereum-ambient/src/pb/mod.rs deleted file mode 100644 index 43d8838..0000000 --- a/substreams/ethereum-ambient/src/pb/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -// @generated -pub mod tycho { - pub mod evm { - // @@protoc_insertion_point(attribute:tycho.evm.v1) - pub mod v1 { - include!("tycho.evm.v1.rs"); - // @@protoc_insertion_point(tycho.evm.v1) - } - } -} diff --git a/substreams/ethereum-ambient/src/pb/tycho.evm.v1.rs b/substreams/ethereum-ambient/src/pb/tycho.evm.v1.rs deleted file mode 100644 index b59b5f7..0000000 --- a/substreams/ethereum-ambient/src/pb/tycho.evm.v1.rs +++ /dev/null @@ -1,183 +0,0 @@ -// @generated -// This file contains the proto definitions for Substreams common to all integrations. - -/// A struct describing a block. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Block { - /// The blocks hash. - #[prost(bytes="vec", tag="1")] - pub hash: ::prost::alloc::vec::Vec, - /// The parent blocks hash. - #[prost(bytes="vec", tag="2")] - pub parent_hash: ::prost::alloc::vec::Vec, - /// The block number. - #[prost(uint64, tag="3")] - pub number: u64, - /// The block timestamp. - #[prost(uint64, tag="4")] - pub ts: u64, -} -/// A struct describing a transaction. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Transaction { - /// The transaction hash. - #[prost(bytes="vec", tag="1")] - pub hash: ::prost::alloc::vec::Vec, - /// The sender of the transaction. - #[prost(bytes="vec", tag="2")] - pub from: ::prost::alloc::vec::Vec, - /// The receiver of the transaction. - #[prost(bytes="vec", tag="3")] - pub to: ::prost::alloc::vec::Vec, - /// The transactions index within the block. - #[prost(uint64, tag="4")] - pub index: u64, -} -/// A custom struct representing an arbitrary attribute of a protocol component. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Attribute { - /// The name of the attribute. - #[prost(string, tag="1")] - pub name: ::prost::alloc::string::String, - /// The value of the attribute. - #[prost(bytes="vec", tag="2")] - pub value: ::prost::alloc::vec::Vec, - /// The type of change the attribute underwent. - #[prost(enumeration="ChangeType", tag="3")] - pub change: i32, -} -/// A struct describing a part of the protocol. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ProtocolComponent { - /// A unique identifier for the component within the protocol. - /// Can be a stringified address or a string describing the trading pair. - #[prost(string, tag="1")] - pub id: ::prost::alloc::string::String, - /// Addresses of the ERC20 tokens used by the component. - #[prost(bytes="vec", repeated, tag="2")] - pub tokens: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, - /// Addresses of the contracts used by the component. - #[prost(bytes="vec", repeated, tag="3")] - pub contracts: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, - /// Attributes of the component. - /// The inner ChangeType of the attribute has to match the ChangeType of the ProtocolComponent. - #[prost(message, repeated, tag="4")] - pub static_att: ::prost::alloc::vec::Vec, - /// Type of change the component underwent. - #[prost(enumeration="ChangeType", tag="5")] - pub change: i32, -} -/// A struct for following the changes of Total Value Locked (TVL) of a protocol component. -/// Note that if the ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BalanceChange { - /// The address of the ERC20 token whose balance changed. - #[prost(bytes="vec", tag="1")] - pub token: ::prost::alloc::vec::Vec, - /// The new balance of the token. - #[prost(bytes="vec", tag="2")] - pub balance: ::prost::alloc::vec::Vec, - /// The id of the component whose TVL is tracked. - #[prost(bytes="vec", tag="3")] - pub component_id: ::prost::alloc::vec::Vec, -} -/// Enum to specify the type of a change. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] -#[repr(i32)] -pub enum ChangeType { - Unspecified = 0, - Update = 1, - Creation = 2, - Deletion = 3, -} -impl ChangeType { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - ChangeType::Unspecified => "CHANGE_TYPE_UNSPECIFIED", - ChangeType::Update => "CHANGE_TYPE_UPDATE", - ChangeType::Creation => "CHANGE_TYPE_CREATION", - ChangeType::Deletion => "CHANGE_TYPE_DELETION", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "CHANGE_TYPE_UNSPECIFIED" => Some(Self::Unspecified), - "CHANGE_TYPE_UPDATE" => Some(Self::Update), - "CHANGE_TYPE_CREATION" => Some(Self::Creation), - "CHANGE_TYPE_DELETION" => Some(Self::Deletion), - _ => None, - } - } -} -// This file contains proto definitions specific to the VM integration. - -/// A key value entry into contract storage. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ContractSlot { - /// A contract's storage slot. - #[prost(bytes="vec", tag="2")] - pub slot: ::prost::alloc::vec::Vec, - /// The new value for this storage slot. - #[prost(bytes="vec", tag="3")] - pub value: ::prost::alloc::vec::Vec, -} -/// Changes made to a single contract's state. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ContractChange { - /// The contract's address - #[prost(bytes="vec", tag="1")] - pub address: ::prost::alloc::vec::Vec, - /// The new balance of the contract, empty bytes indicates no change. - #[prost(bytes="vec", tag="2")] - pub balance: ::prost::alloc::vec::Vec, - /// The new code of the contract, empty bytes indicates no change. - #[prost(bytes="vec", tag="3")] - pub code: ::prost::alloc::vec::Vec, - /// The changes to this contract's slots, empty sequence indicates no change. - #[prost(message, repeated, tag="4")] - pub slots: ::prost::alloc::vec::Vec, - /// Whether this is an update, a creation or a deletion. - #[prost(enumeration="ChangeType", tag="5")] - pub change: i32, -} -/// A set of changes aggregated by transaction. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct TransactionContractChanges { - /// The transaction instance that results in the changes. - #[prost(message, optional, tag="1")] - pub tx: ::core::option::Option, - /// Contains the changes induced by the above transaction, aggregated on a per-contract basis. - #[prost(message, repeated, tag="2")] - pub contract_changes: ::prost::alloc::vec::Vec, - /// An array of newly added components. - #[prost(message, repeated, tag="3")] - pub component_changes: ::prost::alloc::vec::Vec, - /// An array of balance changes to components. - #[prost(message, repeated, tag="4")] - pub balance_changes: ::prost::alloc::vec::Vec, -} -/// A set of transaction changes within a single block. -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BlockContractChanges { - /// The block for which these changes are collectively computed. - #[prost(message, optional, tag="1")] - pub block: ::core::option::Option, - /// The set of transaction changes observed in the specified block. - #[prost(message, repeated, tag="2")] - pub changes: ::prost::alloc::vec::Vec, -} -// @@protoc_insertion_point(module) diff --git a/substreams/ethereum-ambient/src/utils.rs b/substreams/ethereum-ambient/src/utils.rs new file mode 100644 index 0000000..67e072a --- /dev/null +++ b/substreams/ethereum-ambient/src/utils.rs @@ -0,0 +1,55 @@ +use anyhow::{anyhow, bail}; +use ethabi::{decode, ethereum_types::U256, ParamType, Token, Uint}; +use substreams_ethereum::pb::eth::v2::Call; +use tiny_keccak::{Hasher, Keccak}; + +pub fn encode_pool_hash(token_x: Vec, token_y: Vec, pool_idx: Vec) -> [u8; 32] { + let base_address = ethabi::Address::from_slice(&token_x); + let quote_address = ethabi::Address::from_slice(&token_y); + let pool_idx_uint = Uint::from_big_endian(&pool_idx); + + let encoded = ethabi::encode(&[ + Token::Address(base_address), + Token::Address(quote_address), + Token::Uint(pool_idx_uint), + ]); + + let mut hasher = Keccak::v256(); + hasher.update(&encoded); + let mut output = [0u8; 32]; + hasher.finalize(&mut output); + + output +} + +pub fn decode_flows_from_output(call: &Call) -> Result<(ethabi::Int, ethabi::Int), anyhow::Error> { + if let Ok(external_outputs) = decode(BASE_QUOTE_FLOW_OUTPUT, &call.return_data) { + let base_flow = external_outputs[0] + .to_owned() + .into_int() // Needs conversion into bytes for next step + .ok_or_else(|| anyhow!("Failed to convert base flow to i128".to_string()))?; + + let quote_flow = external_outputs[1] + .to_owned() + .into_int() // Needs conversion into bytes for next step + .ok_or_else(|| anyhow!("Failed to convert quote flow to i128".to_string()))?; + Ok((base_flow, quote_flow)) + } else { + bail!("Failed to decode swap call outputs.".to_string()); + } +} + +const BASE_QUOTE_FLOW_OUTPUT: &[ParamType] = &[ + // The token base and quote token flows associated with this swap action. + // Negative indicates a credit paid to the user (token balance of pool + // decreases), positive a debit collected from the user (token balance of pool + // increases). + ParamType::Int(128), // baseFlow + ParamType::Int(128), // quoteFlow +]; + +pub fn from_u256_to_vec(src: U256) -> Vec { + let mut buf = [0u8; 32]; + src.to_big_endian(&mut buf); + buf.to_vec() +} diff --git a/substreams/ethereum-ambient/substreams.yaml b/substreams/ethereum-ambient/substreams.yaml index 8007c2d..3541139 100644 --- a/substreams/ethereum-ambient/substreams.yaml +++ b/substreams/ethereum-ambient/substreams.yaml @@ -1,14 +1,13 @@ specVersion: v0.1.0 package: name: "substreams_ethereum_ambient" - version: v0.3.0 + version: v0.5.0 protobuf: files: - - vm.proto - - common.proto + - ambient.proto importPaths: - - ../../proto/tycho/evm/v1/ + - ./proto binaries: default: @@ -16,9 +15,35 @@ binaries: file: ../../target/wasm32-unknown-unknown/substreams/substreams_ethereum_ambient.wasm modules: - - name: map_changes + - name: map_pool_changes kind: map + initialBlock: 17361664 inputs: - source: sf.ethereum.type.v2.Block output: - type: proto:tycho.evm.state.v1.BlockContractChanges + type: proto:tycho.evm.v1.BlockPoolChanges + - name: store_pools + kind: store + initialBlock: 17361664 + updatePolicy: set + valueType: proto:tycho.evm.v1.ProtocolComponent + inputs: + - map: map_pool_changes + - name: store_pool_balances + kind: store + initialBlock: 17361664 + updatePolicy: add + valueType: bigint + inputs: + - map: map_pool_changes + - name: map_changes + kind: map + initialBlock: 17361664 + inputs: + - source: sf.ethereum.type.v2.Block + - map: map_pool_changes + - store: store_pool_balances + mode: deltas + - store: store_pools + output: + type: proto:tycho.evm.v1.BlockContractChanges