Merge pull request #3 from propeller-heads/mp/ENG-2443-Move-Ambient-substreams-implementation-into-protocol-lib-repo

Substreams integration docs
This commit is contained in:
pistomat
2023-12-21 18:51:16 +01:00
committed by GitHub
36 changed files with 4270 additions and 9 deletions

12
.gitignore vendored
View File

@@ -1 +1,13 @@
### Rust ###
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Substreams spkg files are build artifacts
*.spkg
.env
.vscode
.idea
*.log

5
README.md Normal file
View File

@@ -0,0 +1,5 @@
# Propeller Protocol Lib
Protocol lib is a library used by Propellerheads.xyz solvers to integrate decentralized protocols. Currently, only swap/exchange protocols are supported.
Please refer to the [README.md](docs/README.md) for more information.

View File

@@ -22,7 +22,7 @@ While VM integration is certainly the quickest and probably most accessible one
### Indexing
For indexing purposes, it is required that you provide a [substreams](https://thegraph.com/docs/en/substreams/) package that emits a specified set of messages. If your protocol already has a [substreams](https://thegraph.com/docs/en/substreams/) package for indexing implemented, you can adjust it to emit the required messages.
_Specifications coming soon._
For indexing purposes, it is required that you provide a [substreams](https://substreams.streamingfast.io/) package that emits a specified set of messages. If your protocol already has a [substreams package](https://github.com/messari/substreams) package for indexing implemented, you can adjust it to emit the required messages.
**VM Integration** Currently the only supported integration is for EVM protocols in order to complement the Solidity protocol logic. **[Read more here.](indexing/vm-integration/README.md)**
**Custom Entity Integration** Coming soon, this integration will complement the upcoming native Rust protocol logic.

View File

@@ -11,4 +11,4 @@
## Indexing
* [Substreams Integration](indexing/substreams-integration/README.md)
* [Tutorial: UniswapV2](indexing/substreams-integration/tutorial-uniswapv2.md)
* [Tutorial: Ambient](indexing/substreams-integration/tutorial-ambient.md)

View File

@@ -1,3 +1,97 @@
# Substreams Integration
Coming Soon
## Example
We have integrated the **Ambient** protocol as a reference, see `/substreams/ethereum-ambient` for more information.
## Step by step
1. Install [Rust](https://www.rust-lang.org/tools/install), you can do so with the following command:
```bash
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
```
1. Install [Substreams CLI](https://substreams.streamingfast.io/getting-started/installing-the-cli), you can either use brew:
```bash
brew install streamingfast/tap/substreams
```
use precompiled binaries
```bash
# Use correct binary for your platform
LINK=$(curl -s https://api.github.com/repos/streamingfast/substreams/releases/latest | awk '/download.url.*linux/ {print $2}' | sed 's/"//g')
curl -L $LINK | tar zxf -
```
or compile from source:
```bash
git clone https://github.com/streamingfast/substreams
cd substreams
go install -v ./cmd/substreams
```
1. Start by making a local copy of the Propeller Protocol Lib repository:
```bash
git clone https://github.com/propeller-heads/propeller-protocol-lib
```
## Understanding the Substreams integration
Substreams is a new indexing technology, which uses Rust modules to compose raw blockchain data streams into higher level data streams, in out case specific to the protocol. These modules together with the protobuf definitions and manifest are then wrapped into SPKG packages (more info [here](https://substreams.streamingfast.io/quick-access/glossary#spkg-.spkg)) that is then run remotely on the Substreams server.
For more information, read the [quick explanation of Substreams](https://thegraph.com/docs/en/substreams/) or jump into the [Substreams documentation](https://substreams.streamingfast.io/). It describes the functions that need to be implemented as well as the manifest file.
### ProtoBuf files
Generally these describe the raw blockchain data that we get on the input stream and the output data that we want to produce using the Rust module.
If you are unfamiliar with ProtoBuf at all, you can start with the [official documentation](https://protobuf.dev/overview/).
First get familiar with the raw ProtoBuf definitions provided by us:
- [common.proto](../../../proto/tycho/evm/v1/common.proto) - Common types used by all integration types
- [vm.proto](../../../proto/tycho/evm/v1/vm.proto) - Types specific to the VM integration
You can also create your own intermediate ProtoBufs. These files should reside in your own substreams package, e.g. `./substreams/ethereum-template/proto/custom-messages.proto`. You have to link these files in the `substreams.yaml` file, see the [manifest docs](https://substreams.streamingfast.io/developers-guide/creating-your-manifest) for more information or you can look at the official substreams example integration of [UniswapV2](https://github.com/messari/substreams/blob/master/uniswap-v2/substreams.yaml#L20-L22).
*Note: Internally we are referring to the substreams integration as `Tycho`, which is why our protobuf files are under the `proto/tycho` directory.*
### Rust module
The goal of the rust module is to implement the logic that will transform the raw blockchain data into the desired output data.
*This is the actual integration code that you will be writing!*
The module is a Rust library that is compiled into a SPKG (`.spkg`) file using the Substreams CLI and then loaded by the Substreams server. It is defined by the `lib.rs` file (see the [Ambient reference example](../../../substreams/ethereum-ambient/src/lib.rs)).
Read our [Substreams README.md](../../../substreams/README.md) for more information on how to write the Rust module.
### How to implement the integration
1. Create a new directory for your integration by cloning the template, rename all the references to `ethereum-template` to `[CHAIN]-[PROTOCOL_SYSTEM]`:
```bash
cp -r ./substreams/ethereum-template ./substreams/[CHAIN]-[PROTOCOL_SYSTEM]
```
1. Implement the logic in the Rust module `lib.rs`. The main function to implement is the `map_changes` function, which is called for every block.
```rust
#[substreams::handlers::map]
fn map_changes(
block: eth::v2::Block,
) -> Result<tycho::BlockContractChanges, substreams::errors::Error> {}
```
The `map_changes` function takes a raw block as input and returns a `BlockContractChanges` struct, which is derived from the `BlockContractChanges` protobuf message in [vm.proto](../../../proto/tycho/evm/v1/vm.proto).
1. The `BlockContractChanges` is a list of `TransactionContractChanges`, which includes these main fields:
- list of `ContractChange` - All storage slots that have changed in the transaction for every contract tracked by any ProtocolComponent
- list of `ProtocolComponent` - All the protocol component changes in the transaction
- list of `BalanceChange` - All the contract component changes in the transaction
See the [Ambient reference example](../../../substreams/ethereum-ambient/src/lib.rs) for more information.
1. If you are more advanced with Substreams, you can define more steps than a single "map" step, including defining your own protobuf files. Add these protobuf files in your `pb` folder and update the manifest accordingly. This allows for better parallelization of the indexing process. See the official documentation of [modules](https://substreams.streamingfast.io/concepts-and-fundamentals/modules#modules-basics-overview).
### Testing
Read the [Substreams testing docs](../../../substreams/README.md#testing-your-implementation) for more information on how to test your integration.

View File

@@ -1,2 +0,0 @@
# Tutorial: UniswapV2

View File

@@ -24,12 +24,12 @@ Following exchanges have been integrated using VM approach:
foundryup
```
2. Start by making a local copy of the Propeller Protocol Lib repository:
1. Start by making a local copy of the Propeller Protocol Lib repository:
```bash
git clone https://github.com/propeller-heads/propeller-protocol-lib
```
3. Install forge dependencies:
1. Install forge dependencies:
```bash
cd ./propeller-protocol-lib/evm/
forge install

2
proto/buf.lock Normal file
View File

@@ -0,0 +1,2 @@
# Generated by buf. DO NOT EDIT.
version: v1

7
proto/buf.yaml Normal file
View File

@@ -0,0 +1,7 @@
version: v1
breaking:
use:
- FILE
lint:
use:
- BASIC

View File

@@ -0,0 +1,36 @@
syntax = "proto3";
package sf.substreams.internal.v2;
import "google/protobuf/any.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/internal/v2;pbssinternal";
message StoreDeltas {
repeated StoreDelta store_deltas = 1;
}
message StoreDelta {
enum Operation {
UNSET = 0;
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 1;
uint64 ordinal = 2;
string key = 3;
bytes old_value = 4;
bytes new_value = 5;
}
message ModuleOutput {
string module_name = 1;
oneof data {
google.protobuf.Any map_output = 2;
StoreDeltas store_deltas = 3;
}
repeated string logs = 4;
bool debug_logs_truncated = 5;
bool cached = 6;
}

View File

@@ -0,0 +1,88 @@
syntax = "proto3";
package sf.substreams.internal.v2;
import "sf/substreams/v1/modules.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/internal/v2;pbssinternal";
service Substreams {
rpc ProcessRange(ProcessRangeRequest) returns (stream ProcessRangeResponse);
}
message ProcessRangeRequest {
uint64 start_block_num = 1;
uint64 stop_block_num = 2;
string output_module = 3;
sf.substreams.v1.Modules modules = 4;
uint32 stage = 5; // 0-based index of stage to execute up to
}
message ProcessRangeResponse {
reserved 1; // previously string module_name = 1;
reserved 2; // previously in oneof(type): BlockRange processed_range
reserved 3; // previously in oneof(type): ProcessedBytes processed_bytes
oneof type {
Failed failed = 4;
Completed completed = 5;
Update update = 6;
}
}
message Update {
uint64 duration_ms = 1;
uint64 processed_blocks = 2;
uint64 total_bytes_read = 3;
uint64 total_bytes_written = 4;
repeated ModuleStats modules_stats = 5;
}
message ModuleStats {
string name = 1;
uint64 processing_time_ms = 2;
uint64 store_operation_time_ms = 3;
uint64 store_read_count = 4;
repeated ExternalCallMetric external_call_metrics = 5;
// store-specific (will be 0 on mappers)
uint64 store_write_count = 10;
uint64 store_deleteprefix_count = 11;
uint64 store_size_bytes = 12;
}
message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}
message Completed {
repeated BlockRange all_processed_ranges = 1;
// TraceId represents the producer's trace id that produced the partial files.
// This is present here so that the consumer can use it to identify the
// right partial files that needs to be squashed together.
//
// The TraceId can be empty in which case it should be assumed by the tier1
// consuming this message that the tier2 that produced those partial files
// is not yet updated to produce a trace id and a such, the tier1 should
// generate a legacy partial file name.
string trace_id = 2;
}
message Failed {
string reason = 1;
repeated string logs = 2;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 3;
}
message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;
}

View File

@@ -0,0 +1,20 @@
syntax = "proto3";
package sf.substreams;
import "google/protobuf/descriptor.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams;pbsubstreams";
message FieldOptions {
// this option informs the `substreams pack` command that it should treat the corresponding manifest value as a path to a file, putting its content as bytes in this field.
// must be applied to a `bytes` or `string` field
bool load_from_file = 1;
// this option informs the `substreams pack` command that it should treat the corresponding manifest value as a path to a folder, zipping its content and putting the zip content as bytes in this field.
// must be applied to a `bytes` field
bool zip_from_folder = 2;
}
extend google.protobuf.FieldOptions {
optional FieldOptions options = 2200;
}

View File

@@ -0,0 +1,235 @@
syntax = "proto3";
package sf.substreams.rpc.v2;
import "google/protobuf/any.proto";
import "sf/substreams/v1/clock.proto";
import "sf/substreams/v1/modules.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2;pbsubstreamsrpc";
service Stream {
rpc Blocks(Request) returns (stream Response);
}
message Request {
int64 start_block_num = 1;
string start_cursor = 2;
uint64 stop_block_num = 3;
// With final_block_only, you only receive blocks that are irreversible:
// 'final_block_height' will be equal to current block and no 'undo_signal' will ever be sent
bool final_blocks_only = 4;
// Substreams has two mode when executing your module(s) either development mode or production
// mode. Development and production modes impact the execution of Substreams, important aspects
// of execution include:
// * The time required to reach the first byte.
// * The speed that large ranges get executed.
// * The module logs and outputs sent back to the client.
//
// By default, the engine runs in developer mode, with richer and deeper output. Differences
// between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in development mode
// * The time required to reach the first byte in development mode is faster than in production mode.
//
// Specific attributes of development mode include:
// * The client will receive all of the executed module's logs.
// * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`).
// * Multiple module's output is possible.
//
// With production mode`, however, you trade off functionality for high speed enabling forward
// parallel execution of module ahead of time.
bool production_mode = 5;
string output_module = 6;
sf.substreams.v1.Modules modules = 7;
// Available only in developer mode
repeated string debug_initial_store_snapshot_for_modules = 10;
}
message Response {
oneof message {
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events.
BlockScopedData block_scoped_data = 3;
BlockUndoSignal block_undo_signal = 4;
Error fatal_error = 5;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotData debug_snapshot_data = 10;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotComplete debug_snapshot_complete = 11;
}
}
// BlockUndoSignal informs you that every bit of data
// with a block number above 'last_valid_block' has been reverted
// on-chain. Delete that data and restart from 'last_valid_cursor'
message BlockUndoSignal {
sf.substreams.v1.BlockRef last_valid_block = 1;
string last_valid_cursor = 2;
}
message BlockScopedData {
MapModuleOutput output = 1;
sf.substreams.v1.Clock clock = 2;
string cursor = 3;
// Non-deterministic, allows substreams-sink to let go of their undo data.
uint64 final_block_height = 4;
repeated MapModuleOutput debug_map_outputs = 10;
repeated StoreModuleOutput debug_store_outputs = 11;
}
message SessionInit {
string trace_id = 1;
uint64 resolved_start_block = 2;
uint64 linear_handoff_block = 3;
uint64 max_parallel_workers = 4;
}
message InitialSnapshotComplete {
string cursor = 1;
}
message InitialSnapshotData {
string module_name = 1;
repeated StoreDelta deltas = 2;
uint64 sent_keys = 4;
uint64 total_keys = 3;
}
message MapModuleOutput {
string name = 1;
google.protobuf.Any map_output = 2;
// DebugOutputInfo is available in non-production mode only
OutputDebugInfo debug_info = 10;
}
// StoreModuleOutput are produced for store modules in development mode.
// It is not possible to retrieve store models in production, with parallelization
// enabled. If you need the deltas directly, write a pass through mapper module
// that will get them down to you.
message StoreModuleOutput {
string name = 1;
repeated StoreDelta debug_store_deltas = 2;
OutputDebugInfo debug_info = 10;
}
message OutputDebugInfo {
repeated string logs = 1;
// LogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 2;
bool cached = 3;
}
// ModulesProgress is a message that is sent every 500ms
message ModulesProgress {
// previously: repeated ModuleProgress modules = 1;
// these previous `modules` messages were sent in bursts and are not sent anymore.
reserved 1;
// List of jobs running on tier2 servers
repeated Job running_jobs = 2;
// Execution statistics for each module
repeated ModuleStats modules_stats = 3;
// Stages definition and completed block ranges
repeated Stage stages = 4;
ProcessedBytes processed_bytes = 5;
}
message ProcessedBytes {
uint64 total_bytes_read = 1;
uint64 total_bytes_written = 2;
}
message Error {
string module = 1;
string reason = 2;
repeated string logs = 3;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 4;
}
message Job {
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
}
message Stage {
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
}
// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2
// All the 'count' and 'time_ms' values may include duplicate for each stage going over that module
message ModuleStats {
// name of the module
string name = 1;
// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module code
uint64 total_processing_time_ms = 3;
//// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;
// total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called from that module code
uint64 total_store_read_count = 6;
// total_store_write_count is the sum of all store Write operations called from that module code (store-only)
uint64 total_store_write_count = 10;
// total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only)
// note that DeletePrefix can be a costly operation on large stores
uint64 total_store_deleteprefix_count = 11;
// store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;
// total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;
// store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way.
bool store_currently_merging = 14;
// highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only)
uint64 highest_contiguous_block = 15;
}
message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}
message StoreDelta {
enum Operation {
UNSET = 0;
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 1;
uint64 ordinal = 2;
string key = 3;
bytes old_value = 4;
bytes new_value = 5;
}
message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;
}

View File

@@ -0,0 +1,142 @@
syntax = "proto3";
package sf.substreams.sink.service.v1;
import "sf/substreams/v1/package.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/sink/service/v1;pbsinksvc";
service Provider {
rpc Deploy(DeployRequest) returns (DeployResponse);
rpc Update(UpdateRequest) returns (UpdateResponse);
rpc Info(InfoRequest) returns (InfoResponse);
rpc List(ListRequest) returns (ListResponse);
rpc Pause(PauseRequest) returns (PauseResponse);
rpc Stop(StopRequest) returns (StopResponse);
rpc Resume(ResumeRequest) returns (ResumeResponse);
rpc Remove(RemoveRequest) returns (RemoveResponse);
}
message DeployRequest {
sf.substreams.v1.Package substreams_package = 1;
bool development_mode = 2;
repeated Parameter parameters = 3;
}
message Parameter {
string key = 1;
string value = 2;
}
message DeployResponse {
DeploymentStatus status = 1;
// deployment_id is a short name (max 8 characters) that uniquely identifies your deployment
string deployment_id = 2;
map<string, string> services = 3;
string reason = 4;
string motd = 5;
}
message UpdateRequest {
sf.substreams.v1.Package substreams_package = 1;
string deployment_id = 2;
bool reset = 3;
}
message UpdateResponse {
DeploymentStatus status = 1;
map<string, string> services = 2;
string reason = 3;
string motd = 4;
}
message InfoRequest {
string deployment_id = 1;
}
message InfoResponse {
DeploymentStatus status = 1;
map<string, string> services = 2;
string reason = 3;
PackageInfo package_info = 4;
SinkProgress progress = 5;
string motd = 6;
}
message SinkProgress {
uint64 last_processed_block = 1;
}
message PackageInfo {
string name = 1;
string version = 2;
string output_module_name = 3;
string output_module_hash = 4;
}
message ListRequest {}
message ListResponse {
repeated DeploymentWithStatus deployments = 1;
}
message DeploymentWithStatus {
string id = 1;
DeploymentStatus status = 2;
string reason = 3;
PackageInfo package_info = 4;
SinkProgress progress = 5;
string motd = 6;
}
enum DeploymentStatus {
UNKNOWN = 0;
RUNNING = 1;
FAILING = 2;
PAUSED = 3;
STOPPED = 4;
STARTING = 5;
PAUSING = 6;
STOPPING = 7;
REMOVING = 8;
RESUMING = 9;
}
message RemoveRequest {
string deployment_id = 1;
}
message RemoveResponse {
DeploymentStatus previous_status = 1;
}
message PauseRequest {
string deployment_id = 1;
}
message PauseResponse {
DeploymentStatus previous_status = 1;
DeploymentStatus new_status = 2;
}
message StopRequest {
string deployment_id = 1;
}
message StopResponse {
DeploymentStatus previous_status = 1;
DeploymentStatus new_status = 2;
}
message ResumeRequest {
string deployment_id = 1;
}
message ResumeResponse {
DeploymentStatus previous_status = 1;
DeploymentStatus new_status = 2;
}

View File

@@ -0,0 +1,20 @@
syntax = "proto3";
package sf.substreams.v1;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
// Clock is a pointer to a block with added timestamp
message Clock {
string id = 1;
uint64 number = 2;
google.protobuf.Timestamp timestamp = 3;
}
// BlockRef is a pointer to a block to which we don't know the timestamp
message BlockRef {
string id = 1;
uint64 number = 2;
}

View File

@@ -0,0 +1,98 @@
syntax = "proto3";
package sf.substreams.v1;
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
message Modules {
repeated Module modules = 1;
repeated Binary binaries = 2;
}
// Binary represents some code compiled to its binary form.
message Binary {
string type = 1;
bytes content = 2;
}
message Module {
string name = 1;
oneof kind {
KindMap kind_map = 2;
KindStore kind_store = 3;
}
uint32 binary_index = 4;
string binary_entrypoint = 5;
repeated Input inputs = 6;
Output output = 7;
uint64 initial_block = 8;
message KindMap {
string output_type = 1;
}
message KindStore {
// The `update_policy` determines the functions available to mutate the store
// (like `set()`, `set_if_not_exists()` or `sum()`, etc..) in
// order to ensure that parallel operations are possible and deterministic
//
// Say a store cumulates keys from block 0 to 1M, and a second store
// cumulates keys from block 1M to 2M. When we want to use this
// store as a dependency for a downstream module, we will merge the
// two stores according to this policy.
UpdatePolicy update_policy = 1;
string value_type = 2;
enum UpdatePolicy {
UPDATE_POLICY_UNSET = 0;
// Provides a store where you can `set()` keys, and the latest key wins
UPDATE_POLICY_SET = 1;
// Provides a store where you can `set_if_not_exists()` keys, and the first key wins
UPDATE_POLICY_SET_IF_NOT_EXISTS = 2;
// Provides a store where you can `add_*()` keys, where two stores merge by summing its values.
UPDATE_POLICY_ADD = 3;
// Provides a store where you can `min_*()` keys, where two stores merge by leaving the minimum value.
UPDATE_POLICY_MIN = 4;
// Provides a store where you can `max_*()` keys, where two stores merge by leaving the maximum value.
UPDATE_POLICY_MAX = 5;
// Provides a store where you can `append()` keys, where two stores merge by concatenating the bytes in order.
UPDATE_POLICY_APPEND = 6;
}
}
message Input {
oneof input {
Source source = 1;
Map map = 2;
Store store = 3;
Params params = 4;
}
message Source {
string type = 1; // ex: "sf.ethereum.type.v1.Block"
}
message Map {
string module_name = 1; // ex: "block_to_pairs"
}
message Store {
string module_name = 1;
Mode mode = 2;
enum Mode {
UNSET = 0;
GET = 1;
DELTAS = 2;
}
}
message Params {
string value = 1;
}
}
message Output {
string type = 1;
}
}

View File

@@ -0,0 +1,42 @@
syntax = "proto3";
package sf.substreams.v1;
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
import "sf/substreams/v1/modules.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
message Package {
// Needs to be one so this file can be used _directly_ as a
// buf `Image` andor a ProtoSet for grpcurl and other tools
repeated google.protobuf.FileDescriptorProto proto_files = 1;
reserved 2 to 4; // Reserved for future: in case protosets adds fields
uint64 version = 5;
sf.substreams.v1.Modules modules = 6;
repeated ModuleMetadata module_meta = 7;
repeated PackageMetadata package_meta = 8;
// Source network for Substreams to fetch its data from.
string network = 9;
google.protobuf.Any sink_config = 10;
string sink_module = 11;
// image is the bytes to a JPEG, WebP or PNG file. Max size is 2 MiB
bytes image = 12;
}
message PackageMetadata {
string version = 1;
string url = 2;
string name = 3;
string doc = 4;
}
message ModuleMetadata {
// Corresponds to the index in `Package.metadata.package_meta`
uint64 package_index = 1;
string doc = 2;
}

View File

@@ -0,0 +1,83 @@
syntax = "proto3";
package tycho.evm.v1;
// This file contains the proto definitions for Substreams common to all integrations.
// A struct describing a block.
message Block {
// The blocks hash.
bytes hash = 1;
// The parent blocks hash.
bytes parent_hash = 2;
// The block number.
uint64 number = 3;
// The block timestamp.
uint64 ts = 4;
}
// A struct describing a transaction.
message Transaction {
// The transaction hash.
bytes hash = 1;
// The sender of the transaction.
bytes from = 2;
// The receiver of the transaction.
bytes to = 3;
// The transactions index within the block.
uint64 index = 4;
}
// Enum to specify the type of a change.
enum ChangeType {
CHANGE_TYPE_UNSPECIFIED = 0;
CHANGE_TYPE_UPDATE = 1;
CHANGE_TYPE_CREATION = 2;
CHANGE_TYPE_DELETION = 3;
}
// A custom struct representing an arbitrary attribute of a protocol component.
// This is mainly used by the native integration to track the necessary information about the protocol.
message Attribute {
// The name of the attribute.
string name = 1;
// The value of the attribute.
bytes value = 2;
// The type of change the attribute underwent.
ChangeType change = 3;
}
// A struct describing a part of the protocol.
// Note: For example this can be a UniswapV2 pair, that would track the two ERC20 tokens used by the pair,
// the contract would be only the single sontract. The attributes would be empty for the VM integration,
// because we track all the relevant info via storage slots and balance changes.
// It can also be a wrapping contract, like WETH, that has a constant price, but it allows swapping tokens.
// This is why the name ProtocolComponent is used instead of "Pool" or "Pair".
message ProtocolComponent {
// A unique identifier for the component within the protocol.
// Can be a stringified address or a string describing the trading pair.
string id = 1;
// Addresses of the ERC20 tokens used by the component.
repeated bytes tokens = 2;
// Addresses of the contracts used by the component.
// Usually it is a single contract, but some protocols use multiple contracts.
repeated bytes contracts = 3;
// Attributes of the component. Used mainly be the native integration.
// The inner ChangeType of the attribute has to match the ChangeType of the ProtocolComponent.
repeated Attribute static_att = 4;
// Type of change the component underwent.
ChangeType change = 5;
}
// A struct for following the changes of Total Value Locked (TVL) of a protocol component.
// Note that if the ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole.
// E.g. for UniswapV2 pair WETH/USDC, this tracks the USDC and WETH balance of the pair contract.
message BalanceChange {
// The address of the ERC20 token whose balance changed.
bytes token = 1;
// The new balance of the token.
bytes balance = 2;
// The id of the component whose TVL is tracked.
// If the protocol component includes multiple contracts, the balance change must be aggregated to reflect how much tokens can be traded.
bytes component_id = 3;
}

View File

@@ -0,0 +1,32 @@
syntax = "proto3";
package tycho.evm.v1;
import "tycho/evm/v1/common.proto";
// This file contains the definition for the native integration of Substreams.
// A component is a set of attributes that are associated with a custom entity.
message EntityChanges {
// A unique identifier of the entity within the protocol.
string component_id = 1;
// The set of attributes that are associated with the entity.
repeated Attribute attributes = 2;
}
message TransactionEntityChanges {
Transaction tx = 1;
repeated EntityChanges entity_changes = 2;
// An array of newly added components.
repeated ProtocolComponent component_changes = 3;
// An array of balance changes to components.
repeated BalanceChange balance_changes = 4;
}
// A set of transaction changes within a single block.
message BlockEntityChanges {
// The block for which these changes are collectively computed.
Block block = 1;
// The set of transaction changes observed in the specified block.
repeated TransactionEntityChanges changes = 2;
}

View File

@@ -0,0 +1,50 @@
syntax = "proto3";
package tycho.evm.v1;
import "tycho/evm/v1/common.proto";
// This file contains proto definitions specific to the VM integration.
// A key value entry into contract storage.
message ContractSlot {
// A contract's storage slot.
bytes slot = 2;
// The new value for this storage slot.
bytes value = 3;
}
// Changes made to a single contract's state.
message ContractChange {
// The contract's address
bytes address = 1;
// The new balance of the contract, empty bytes indicates no change.
bytes balance = 2;
// The new code of the contract, empty bytes indicates no change.
bytes code = 3;
// The changes to this contract's slots, empty sequence indicates no change.
repeated ContractSlot slots = 4;
// Whether this is an update, a creation or a deletion.
ChangeType change = 5;
}
// A set of changes aggregated by transaction.
message TransactionContractChanges {
// The transaction instance that results in the changes.
Transaction tx = 1;
// Contains the changes induced by the above transaction, aggregated on a per-contract basis.
// Must include changes to every contract that is tracked by all ProtocolComponents.
repeated ContractChange contract_changes = 2;
// An array of any component changes.
repeated ProtocolComponent component_changes = 3;
// An array of balance changes to components.
repeated BalanceChange balance_changes = 4;
}
// A set of transaction changes within a single block.
message BlockContractChanges {
// The block for which these changes are collectively computed.
Block block = 1;
// The set of transaction changes observed in the specified block.
repeated TransactionContractChanges changes = 2;
}

View File

99
substreams/Readme.md Normal file
View File

@@ -0,0 +1,99 @@
# Subtreams packages
This directory contains all substream packages that are used by the extractors to access certain data from diffrent
blockchains.
## Adding a new package
To add a new package add folder. The naming convention is `[CHAIN]-[PROTOCOL_SYSTEM]`.
### Manifest
In this new folder add a manifest file `substreams.yaml`. You can use the template below to get started:
```yaml
specVersion: v0.1.0
package:
name: 'substreams_[CHAIN]_[PROTOCOL_SYSTEM]'
version: v0.1.0
protobuf:
files:
- vm.proto
- common.proto
importPaths:
# This is different compared to the substreams example,
# we need to share protobuf definitions with tycho you
# are invited to reuse existing definitions if they are
# useful to you.
- ../../proto/evm/v1
# any private message types only used in internal modules
# can remain local to the crate.
- ./proto
binaries:
default:
type: wasm/rust-v1
# this points to the workspace target directory we use a special
# substreams build profile to optimise wasm binaries
file: ../../target/wasm32-unknown-unknown/substreams/substreams_[CHAIN]_[PROTOCOL_SYSTEM].wasm
modules:
- name: map_changes
kind: map
inputs:
- source: sf.ethereum.type.v2.Block
output:
type: proto:tycho.evm.state.v1.BlockContractChanges
```
Substreams packages are Rust crates so we also need a `cargo.toml`.
The example from the official docs will serve us just well:
```toml
[package]
name = "substreams_[CHAIN]_[PROTOCOL_SYSTEM]"
version = "0.1.0"
edition = "2021"
[lib]
name = "substreams_[CHAIN]_[PROTOCOL_SYSTEM]"
crate-type = ["cdylib"]
[dependencies]
substreams = "0.5"
substreams-ethereum = "0.9"
prost = "0.11"
```
There are already some generated rust files in the `src/pb` directory. These are generated from the protobuf files in the
Now we can generate the Rust protobuf code:
```
substreams protogen substreams.yaml --exclude-paths="sf/substreams,google"
```
The command above should put the generate rust files under `/src/pb`. You
can start using these now in your module handlers: See
the [official substreams documentation](https://thegraph.com/docs/en/substreams/getting-started/quickstart/#create-substreams-module-handlers)
on
how to implement module handlers.
You can also look into already existing substreams packages to see how it
is done. E.g. [ethereum-ambient](./ethereum-ambient/) provides a pretty good
example of how to get access to raw contract storage.
# Tests
To create a block test asset for ethereum do the following:
- Follow [this tutorial](https://substreams.streamingfast.io/tutorials/overview/map_block_meta_module). Make sure you
set up the substreams-explorer repo in the same directory as this repo.
- Comment out `image: ./ethereum.png` in `ethereum-explorer/substreams.yaml`
- Add `prost-types = "0.11.0"` to `ethereum-explorer/Cargo.toml`
- Make sure you set up your key env vars.
- Run `sh scripts/download-ethereum-block-to-s3 BLOCK_NUMBER`
Do not commit the block files (they are quite big).

1177
substreams/ethereum-ambient/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,18 @@
[package]
name = "substreams-ethereum-ambient"
version = "0.3.0"
edition = "2021"
[lib]
name = "substreams_ethereum_ambient"
crate-type = ["cdylib"]
[dependencies]
substreams = "0.5"
substreams-ethereum = "0.9"
prost = "0.11"
hex-literal = "0.4.1"
ethabi = "18.0.0"
hex = "0.4.2"
bytes = "1.5.0"
anyhow = "1.0.75"

View File

@@ -0,0 +1,12 @@
version: v1
plugins:
- plugin: buf.build/community/neoeinstein-prost:v0.2.2
out: src/pb
opt:
- file_descriptor_set=false
- plugin: buf.build/community/neoeinstein-prost-crate:v0.3.1
out: src/pb
opt:
- no_features

View File

@@ -0,0 +1,384 @@
use std::collections::{hash_map::Entry, HashMap};
use anyhow::{anyhow, bail};
use ethabi::{decode, ParamType};
use hex_literal::hex;
use substreams_ethereum::pb::eth::{self};
use pb::tycho::evm::v1::{self as tycho};
mod pb;
const AMBIENT_CONTRACT: [u8; 20] = hex!("aaaaaaaaa24eeeb8d57d431224f73832bc34f688");
const INIT_POOL_CODE: u8 = 71;
const USER_CMD_FN_SIG: [u8; 4] = [0xA1, 0x51, 0x12, 0xF9];
struct SlotValue {
new_value: Vec<u8>,
start_value: Vec<u8>,
}
impl SlotValue {
fn has_changed(&self) -> bool {
self.start_value != self.new_value
}
}
// Uses a map for slots, protobuf does not allow bytes in hashmap keys
struct InterimContractChange {
address: Vec<u8>,
balance: Vec<u8>,
code: Vec<u8>,
slots: HashMap<Vec<u8>, SlotValue>,
change: tycho::ChangeType,
}
impl From<InterimContractChange> for tycho::ContractChange {
fn from(value: InterimContractChange) -> Self {
tycho::ContractChange {
address: value.address,
balance: value.balance,
code: value.code,
slots: value
.slots
.into_iter()
.filter(|(_, value)| value.has_changed())
.map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value })
.collect(),
change: value.change.into(),
}
}
}
/// Extracts all contract changes relevant to vm simulations
///
/// This is the main logic of the substreams integration. It takes a raw ethereum block on input and extracts the BlockContractChanges stream. It includes tracking:
/// - new pool initializations
/// - all storage slot changes for the Ambient contract
/// - all ERC20 balance changes for the Ambient pools
/// - all code changes and balance updates of the Ambient contract
///
/// Generally we detect all changes in transactions sequentially and detect if it is a CREATE or UPDATE change based on already present data.
#[substreams::handlers::map]
fn map_changes(
block: eth::v2::Block,
) -> Result<tycho::BlockContractChanges, substreams::errors::Error> {
let mut block_changes = tycho::BlockContractChanges::default();
let mut tx_change = tycho::TransactionContractChanges::default();
let mut changed_contracts: HashMap<Vec<u8>, InterimContractChange> = HashMap::new();
// Collect all accounts created in this block
let created_accounts: HashMap<_, _> = block
.transactions()
.flat_map(|tx| {
tx.calls.iter().flat_map(|call| {
call.account_creations
.iter()
.map(|ac| (&ac.account, ac.ordinal))
})
})
.collect();
for block_tx in block.transactions() {
// Extract storage changes for all contracts relevant to this ProtocolComponent (i.e. Ambient)
let mut storage_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.storage_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
storage_changes.sort_unstable_by_key(|change| change.ordinal);
// Detect all call to the Ambient contracts, even inner calls
let ambient_calls = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.filter(|call| call.address == AMBIENT_CONTRACT)
.collect::<Vec<_>>();
// Detect all pool initializations
// Official documentation: https://docs.ambient.finance/developers/dex-contract-interface/pool-initialization
for call in ambient_calls {
if call.input.len() < 4 {
continue;
}
if call.input[0..4] == USER_CMD_FN_SIG {
let user_cmd_external_abi_types = &[
// index of the proxy sidecar the command is being called on
ParamType::Uint(16),
// call data for internal UserCmd method
ParamType::Bytes,
];
let user_cmd_internal_abi_types = &[
ParamType::Uint(8), // command
ParamType::Address, // base
ParamType::Address, // quote
ParamType::Uint(256), // pool index
ParamType::Uint(128), // price
];
// Decode external call to UserCmd
if let Ok(external_params) = decode(user_cmd_external_abi_types, &call.input[4..]) {
let cmd_bytes = external_params[1]
.to_owned()
.into_bytes()
.ok_or_else(|| {
anyhow!("Failed to convert to bytes: {:?}", &external_params[1])
})?;
// Call data is structured differently depending on the cmd code, so only
// decode if this is an init pool code.
if cmd_bytes[31] == INIT_POOL_CODE {
// Decode internal call to UserCmd
if let Ok(internal_params) = decode(user_cmd_internal_abi_types, &cmd_bytes)
{
let base = internal_params[1]
.to_owned()
.into_address()
.ok_or_else(|| {
anyhow!(
"Failed to convert to address: {:?}",
&internal_params[1]
)
})?
.to_fixed_bytes()
.to_vec();
let quote = internal_params[2]
.to_owned()
.into_address()
.ok_or_else(|| {
anyhow!(
"Failed to convert to address: {:?}",
&internal_params[2]
)
})?
.to_fixed_bytes()
.to_vec();
let pool_index = internal_params[3]
.to_owned()
.into_uint()
.ok_or_else(|| anyhow!("Failed to convert to u32".to_string()))?
.as_u32();
let static_attribute = tycho::Attribute {
name: String::from("pool_index"),
value: pool_index.to_be_bytes().to_vec(),
change: tycho::ChangeType::Creation.into(),
};
let mut tokens: Vec<Vec<u8>> = vec![base.clone(), quote.clone()];
tokens.sort();
let new_component = tycho::ProtocolComponent {
id: format!(
"{}{}{}",
hex::encode(base.clone()),
hex::encode(quote.clone()),
pool_index
),
tokens,
contracts: vec![AMBIENT_CONTRACT.to_vec()],
static_att: vec![static_attribute],
change: tycho::ChangeType::Creation.into(),
};
tx_change
.component_changes
.push(new_component);
} else {
bail!("Failed to decode ABI internal call.".to_string());
}
}
} else {
bail!("Failed to decode ABI external call.".to_string());
}
}
}
// Extract all contract changes.
// We cache the data in a general interim contract > slot > value data structure.
// Note: some contracts change slot values and change them back to their
// original value before the transactions ends we remember the initial
// value before the first change and in the end filter found deltas
// that ended up not actually changing anything.
for storage_change in storage_changes.iter() {
match changed_contracts.entry(storage_change.address.clone()) {
// We have already an entry recording a change about this contract
// only append the change about this storage slot
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
match contract_change
.slots
.entry(storage_change.key.clone())
{
// The storage slot was already changed before, simply
// update new_value
Entry::Occupied(mut v) => {
let slot_value = v.get_mut();
slot_value
.new_value
.copy_from_slice(&storage_change.new_value);
}
// The storage slots is being initialised for the first time
Entry::Vacant(v) => {
v.insert(SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
});
}
}
}
// Intialise a new contract change after observing a storage change
Entry::Vacant(e) => {
let mut slots = HashMap::new();
slots.insert(
storage_change.key.clone(),
SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
},
);
e.insert(InterimContractChange {
address: storage_change.address.clone(),
balance: Vec::new(),
code: Vec::new(),
slots,
change: if created_accounts.contains_key(&storage_change.address) {
tycho::ChangeType::Creation
} else {
tycho::ChangeType::Update
},
});
}
}
}
// Extract balance changes
let mut balance_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.balance_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
balance_changes.sort_unstable_by_key(|change| change.ordinal);
for balance_change in balance_changes.iter() {
match changed_contracts.entry(balance_change.address.clone()) {
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
if let Some(new_balance) = &balance_change.new_value {
contract_change.balance.clear();
contract_change
.balance
.extend_from_slice(&new_balance.bytes);
}
}
Entry::Vacant(e) => {
if let Some(new_balance) = &balance_change.new_value {
e.insert(InterimContractChange {
address: balance_change.address.clone(),
balance: new_balance.bytes.clone(),
code: Vec::new(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&balance_change.address) {
tycho::ChangeType::Creation
} else {
tycho::ChangeType::Update
},
});
}
}
}
}
// Extract code changes
let mut code_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.code_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
code_changes.sort_unstable_by_key(|change| change.ordinal);
for code_change in code_changes.iter() {
match changed_contracts.entry(code_change.address.clone()) {
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
contract_change.code.clear();
contract_change
.code
.extend_from_slice(&code_change.new_code);
}
Entry::Vacant(e) => {
e.insert(InterimContractChange {
address: code_change.address.clone(),
balance: Vec::new(),
code: code_change.new_code.clone(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&code_change.address) {
tycho::ChangeType::Creation
} else {
tycho::ChangeType::Update
},
});
}
}
}
// If there were any changes, add transaction and push the changes
if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() {
tx_change.tx = Some(tycho::Transaction {
hash: block_tx.hash.clone(),
from: block_tx.from.clone(),
to: block_tx.to.clone(),
index: block_tx.index as u64,
});
// reuse changed_contracts hash map by draining it, next iteration
// will start empty. This avoids a costly reallocation
for (_, change) in changed_contracts.drain() {
tx_change
.contract_changes
.push(change.into())
}
block_changes
.changes
.push(tx_change.clone());
// clear out the interim contract changes after we pushed those.
tx_change.tx = None;
tx_change.contract_changes.clear();
}
}
block_changes.block = Some(tycho::Block {
number: block.number,
hash: block.hash.clone(),
parent_hash: block
.header
.as_ref()
.expect("Block header not present")
.parent_hash
.clone(),
ts: block.timestamp_seconds(),
});
Ok(block_changes)
}

View File

@@ -0,0 +1,10 @@
// @generated
pub mod tycho {
pub mod evm {
// @@protoc_insertion_point(attribute:tycho.evm.v1)
pub mod v1 {
include!("tycho.evm.v1.rs");
// @@protoc_insertion_point(tycho.evm.v1)
}
}
}

View File

@@ -0,0 +1,183 @@
// @generated
// This file contains the proto definitions for Substreams common to all integrations.
/// A struct describing a block.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Block {
/// The blocks hash.
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
/// The parent blocks hash.
#[prost(bytes="vec", tag="2")]
pub parent_hash: ::prost::alloc::vec::Vec<u8>,
/// The block number.
#[prost(uint64, tag="3")]
pub number: u64,
/// The block timestamp.
#[prost(uint64, tag="4")]
pub ts: u64,
}
/// A struct describing a transaction.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transaction {
/// The transaction hash.
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
/// The sender of the transaction.
#[prost(bytes="vec", tag="2")]
pub from: ::prost::alloc::vec::Vec<u8>,
/// The receiver of the transaction.
#[prost(bytes="vec", tag="3")]
pub to: ::prost::alloc::vec::Vec<u8>,
/// The transactions index within the block.
#[prost(uint64, tag="4")]
pub index: u64,
}
/// A custom struct representing an arbitrary attribute of a protocol component.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Attribute {
/// The name of the attribute.
#[prost(string, tag="1")]
pub name: ::prost::alloc::string::String,
/// The value of the attribute.
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
/// The type of change the attribute underwent.
#[prost(enumeration="ChangeType", tag="3")]
pub change: i32,
}
/// A struct describing a part of the protocol.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtocolComponent {
/// A unique identifier for the component within the protocol.
/// Can be a stringified address or a string describing the trading pair.
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
/// Addresses of the ERC20 tokens used by the component.
#[prost(bytes="vec", repeated, tag="2")]
pub tokens: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Addresses of the contracts used by the component.
#[prost(bytes="vec", repeated, tag="3")]
pub contracts: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Attributes of the component.
/// The inner ChangeType of the attribute has to match the ChangeType of the ProtocolComponent.
#[prost(message, repeated, tag="4")]
pub static_att: ::prost::alloc::vec::Vec<Attribute>,
/// Type of change the component underwent.
#[prost(enumeration="ChangeType", tag="5")]
pub change: i32,
}
/// A struct for following the changes of Total Value Locked (TVL) of a protocol component.
/// Note that if the ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BalanceChange {
/// The address of the ERC20 token whose balance changed.
#[prost(bytes="vec", tag="1")]
pub token: ::prost::alloc::vec::Vec<u8>,
/// The new balance of the token.
#[prost(bytes="vec", tag="2")]
pub balance: ::prost::alloc::vec::Vec<u8>,
/// The id of the component whose TVL is tracked.
#[prost(bytes="vec", tag="3")]
pub component_id: ::prost::alloc::vec::Vec<u8>,
}
/// Enum to specify the type of a change.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ChangeType {
Unspecified = 0,
Update = 1,
Creation = 2,
Deletion = 3,
}
impl ChangeType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
ChangeType::Unspecified => "CHANGE_TYPE_UNSPECIFIED",
ChangeType::Update => "CHANGE_TYPE_UPDATE",
ChangeType::Creation => "CHANGE_TYPE_CREATION",
ChangeType::Deletion => "CHANGE_TYPE_DELETION",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CHANGE_TYPE_UNSPECIFIED" => Some(Self::Unspecified),
"CHANGE_TYPE_UPDATE" => Some(Self::Update),
"CHANGE_TYPE_CREATION" => Some(Self::Creation),
"CHANGE_TYPE_DELETION" => Some(Self::Deletion),
_ => None,
}
}
}
// This file contains proto definitions specific to the VM integration.
/// A key value entry into contract storage.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ContractSlot {
/// A contract's storage slot.
#[prost(bytes="vec", tag="2")]
pub slot: ::prost::alloc::vec::Vec<u8>,
/// The new value for this storage slot.
#[prost(bytes="vec", tag="3")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
/// Changes made to a single contract's state.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ContractChange {
/// The contract's address
#[prost(bytes="vec", tag="1")]
pub address: ::prost::alloc::vec::Vec<u8>,
/// The new balance of the contract, empty bytes indicates no change.
#[prost(bytes="vec", tag="2")]
pub balance: ::prost::alloc::vec::Vec<u8>,
/// The new code of the contract, empty bytes indicates no change.
#[prost(bytes="vec", tag="3")]
pub code: ::prost::alloc::vec::Vec<u8>,
/// The changes to this contract's slots, empty sequence indicates no change.
#[prost(message, repeated, tag="4")]
pub slots: ::prost::alloc::vec::Vec<ContractSlot>,
/// Whether this is an update, a creation or a deletion.
#[prost(enumeration="ChangeType", tag="5")]
pub change: i32,
}
/// A set of changes aggregated by transaction.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionContractChanges {
/// The transaction instance that results in the changes.
#[prost(message, optional, tag="1")]
pub tx: ::core::option::Option<Transaction>,
/// Contains the changes induced by the above transaction, aggregated on a per-contract basis.
#[prost(message, repeated, tag="2")]
pub contract_changes: ::prost::alloc::vec::Vec<ContractChange>,
/// An array of newly added components.
#[prost(message, repeated, tag="3")]
pub component_changes: ::prost::alloc::vec::Vec<ProtocolComponent>,
/// An array of balance changes to components.
#[prost(message, repeated, tag="4")]
pub balance_changes: ::prost::alloc::vec::Vec<BalanceChange>,
}
/// A set of transaction changes within a single block.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockContractChanges {
/// The block for which these changes are collectively computed.
#[prost(message, optional, tag="1")]
pub block: ::core::option::Option<Block>,
/// The set of transaction changes observed in the specified block.
#[prost(message, repeated, tag="2")]
pub changes: ::prost::alloc::vec::Vec<TransactionContractChanges>,
}
// @@protoc_insertion_point(module)

View File

@@ -0,0 +1,24 @@
specVersion: v0.1.0
package:
name: "substreams_ethereum_ambient"
version: v0.3.0
protobuf:
files:
- vm.proto
- common.proto
importPaths:
- ../../proto/tycho/evm/v1/
binaries:
default:
type: wasm/rust-v1
file: ../../target/wasm32-unknown-unknown/substreams/substreams_ethereum_ambient.wasm
modules:
- name: map_changes
kind: map
inputs:
- source: sf.ethereum.type.v2.Block
output:
type: proto:tycho.evm.state.v1.BlockContractChanges

1135
substreams/ethereum-template/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,13 @@
[package]
name = "substreams-ethereum-template"
version = "0.1.0"
edition = "2021"
[lib]
name = "substreams_ethereum_template"
crate-type = ["cdylib"]
[dependencies]
substreams = "0.5"
substreams-ethereum = "0.9"
prost = "0.11"

View File

@@ -0,0 +1,12 @@
version: v1
plugins:
- plugin: buf.build/community/neoeinstein-prost:v0.2.2
out: src/pb
opt:
- file_descriptor_set=false
- plugin: buf.build/community/neoeinstein-prost-crate:v0.3.1
out: src/pb
opt:
- no_features

View File

@@ -0,0 +1,13 @@
use substreams_ethereum::pb::eth;
use pb::tycho::evm::v1::{self as tycho};
mod pb;
#[substreams::handlers::map]
fn map_changes(
block: eth::v2::Block,
) -> Result<tycho::BlockContractChanges, substreams::errors::Error> {
todo!("Not implemented")
}

View File

@@ -0,0 +1,10 @@
// @generated
pub mod tycho {
pub mod evm {
// @@protoc_insertion_point(attribute:tycho.evm.v1)
pub mod v1 {
include!("tycho.evm.v1.rs");
// @@protoc_insertion_point(tycho.evm.v1)
}
}
}

View File

@@ -0,0 +1,183 @@
// @generated
// This file contains the proto definitions for Substreams common to all integrations.
/// A struct describing a block.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Block {
/// The blocks hash.
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
/// The parent blocks hash.
#[prost(bytes="vec", tag="2")]
pub parent_hash: ::prost::alloc::vec::Vec<u8>,
/// The block number.
#[prost(uint64, tag="3")]
pub number: u64,
/// The block timestamp.
#[prost(uint64, tag="4")]
pub ts: u64,
}
/// A struct describing a transaction.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transaction {
/// The transaction hash.
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
/// The sender of the transaction.
#[prost(bytes="vec", tag="2")]
pub from: ::prost::alloc::vec::Vec<u8>,
/// The receiver of the transaction.
#[prost(bytes="vec", tag="3")]
pub to: ::prost::alloc::vec::Vec<u8>,
/// The transactions index within the block.
#[prost(uint64, tag="4")]
pub index: u64,
}
/// A custom struct representing an arbitrary attribute of a protocol component.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Attribute {
/// The name of the attribute.
#[prost(string, tag="1")]
pub name: ::prost::alloc::string::String,
/// The value of the attribute.
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
/// The type of change the attribute underwent.
#[prost(enumeration="ChangeType", tag="3")]
pub change: i32,
}
/// A struct describing a part of the protocol.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtocolComponent {
/// A unique identifier for the component within the protocol.
/// Can be a stringified address or a string describing the trading pair.
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
/// Addresses of the ERC20 tokens used by the component.
#[prost(bytes="vec", repeated, tag="2")]
pub tokens: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Addresses of the contracts used by the component.
#[prost(bytes="vec", repeated, tag="3")]
pub contracts: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
/// Attributes of the component.
/// The inner ChangeType of the attribute has to match the ChangeType of the ProtocolComponent.
#[prost(message, repeated, tag="4")]
pub static_att: ::prost::alloc::vec::Vec<Attribute>,
/// Type of change the component underwent.
#[prost(enumeration="ChangeType", tag="5")]
pub change: i32,
}
/// A struct for following the changes of Total Value Locked (TVL) of a protocol component.
/// Note that if the ProtocolComponent contains multiple contracts, the TVL is tracked for the component as a whole.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BalanceChange {
/// The address of the ERC20 token whose balance changed.
#[prost(bytes="vec", tag="1")]
pub token: ::prost::alloc::vec::Vec<u8>,
/// The new balance of the token.
#[prost(bytes="vec", tag="2")]
pub balance: ::prost::alloc::vec::Vec<u8>,
/// The id of the component whose TVL is tracked.
#[prost(bytes="vec", tag="3")]
pub component_id: ::prost::alloc::vec::Vec<u8>,
}
/// Enum to specify the type of a change.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ChangeType {
Unspecified = 0,
Update = 1,
Creation = 2,
Deletion = 3,
}
impl ChangeType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
ChangeType::Unspecified => "CHANGE_TYPE_UNSPECIFIED",
ChangeType::Update => "CHANGE_TYPE_UPDATE",
ChangeType::Creation => "CHANGE_TYPE_CREATION",
ChangeType::Deletion => "CHANGE_TYPE_DELETION",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CHANGE_TYPE_UNSPECIFIED" => Some(Self::Unspecified),
"CHANGE_TYPE_UPDATE" => Some(Self::Update),
"CHANGE_TYPE_CREATION" => Some(Self::Creation),
"CHANGE_TYPE_DELETION" => Some(Self::Deletion),
_ => None,
}
}
}
// This file contains proto definitions specific to the VM integration.
/// A key value entry into contract storage.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ContractSlot {
/// A contract's storage slot.
#[prost(bytes="vec", tag="2")]
pub slot: ::prost::alloc::vec::Vec<u8>,
/// The new value for this storage slot.
#[prost(bytes="vec", tag="3")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
/// Changes made to a single contract's state.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ContractChange {
/// The contract's address
#[prost(bytes="vec", tag="1")]
pub address: ::prost::alloc::vec::Vec<u8>,
/// The new balance of the contract, empty bytes indicates no change.
#[prost(bytes="vec", tag="2")]
pub balance: ::prost::alloc::vec::Vec<u8>,
/// The new code of the contract, empty bytes indicates no change.
#[prost(bytes="vec", tag="3")]
pub code: ::prost::alloc::vec::Vec<u8>,
/// The changes to this contract's slots, empty sequence indicates no change.
#[prost(message, repeated, tag="4")]
pub slots: ::prost::alloc::vec::Vec<ContractSlot>,
/// Whether this is an update, a creation or a deletion.
#[prost(enumeration="ChangeType", tag="5")]
pub change: i32,
}
/// A set of changes aggregated by transaction.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionContractChanges {
/// The transaction instance that results in the changes.
#[prost(message, optional, tag="1")]
pub tx: ::core::option::Option<Transaction>,
/// Contains the changes induced by the above transaction, aggregated on a per-contract basis.
#[prost(message, repeated, tag="2")]
pub contract_changes: ::prost::alloc::vec::Vec<ContractChange>,
/// An array of newly added components.
#[prost(message, repeated, tag="3")]
pub component_changes: ::prost::alloc::vec::Vec<ProtocolComponent>,
/// An array of balance changes to components.
#[prost(message, repeated, tag="4")]
pub balance_changes: ::prost::alloc::vec::Vec<BalanceChange>,
}
/// A set of transaction changes within a single block.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockContractChanges {
/// The block for which these changes are collectively computed.
#[prost(message, optional, tag="1")]
pub block: ::core::option::Option<Block>,
/// The set of transaction changes observed in the specified block.
#[prost(message, repeated, tag="2")]
pub changes: ::prost::alloc::vec::Vec<TransactionContractChanges>,
}
// @@protoc_insertion_point(module)

View File

@@ -0,0 +1,24 @@
specVersion: v0.1.0
package:
name: "substreams_ethereum_template"
version: v0.1.0
protobuf:
files:
- vm.proto
- common.proto
importPaths:
- ../../proto/tycho/evm/v1/
binaries:
default:
type: wasm/rust-v1
file: ../../target/wasm32-unknown-unknown/substreams/substreams_ethereum_template.wasm
modules:
- name: map_changes
kind: map
inputs:
- source: sf.ethereum.type.v2.Block
output:
type: proto:tycho.evm.state.v1.BlockContractChanges