Clone proto and substreams from tycho-indexer

pistomat
2023-12-20 14:29:26 +01:00
parent 4f280b9b9c
commit 8a7271bc59
22 changed files with 2599 additions and 0 deletions

.gitignore vendored

@@ -1 +1,13 @@
### Rust ###
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Substreams spkg files are build artifacts
*.spkg
.env
.vscode
.idea
*.log

proto/buf.lock Normal file

@@ -0,0 +1,2 @@
# Generated by buf. DO NOT EDIT.
version: v1

proto/buf.yaml Normal file

@@ -0,0 +1,7 @@
version: v1
breaking:
use:
- FILE
lint:
use:
- BASIC


@@ -0,0 +1,36 @@
syntax = "proto3";
package sf.substreams.internal.v2;
import "google/protobuf/any.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/internal/v2;pbssinternal";
message StoreDeltas {
repeated StoreDelta store_deltas = 1;
}
message StoreDelta {
enum Operation {
UNSET = 0;
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 1;
uint64 ordinal = 2;
string key = 3;
bytes old_value = 4;
bytes new_value = 5;
}
message ModuleOutput {
string module_name = 1;
oneof data {
google.protobuf.Any map_output = 2;
StoreDeltas store_deltas = 3;
}
repeated string logs = 4;
bool debug_logs_truncated = 5;
bool cached = 6;
}
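
On the consumer side, these deltas fold into a local key/value view. The sketch below uses hand-written mirrors of the messages above so it is self-contained; real code would use the prost-generated types:

```rust
use std::collections::HashMap;

// Hand-written mirrors of the StoreDelta message above (sketch only).
enum Operation {
    Unset,
    Create,
    Update,
    Delete,
}

struct StoreDelta {
    operation: Operation,
    key: String,
    // old_value lets a consumer validate or reverse a delta; unused here.
    old_value: Vec<u8>,
    new_value: Vec<u8>,
}

// Fold one delta into a local key/value view of the store.
fn apply(store: &mut HashMap<String, Vec<u8>>, delta: StoreDelta) {
    match delta.operation {
        Operation::Create | Operation::Update => {
            store.insert(delta.key, delta.new_value);
        }
        Operation::Delete => {
            store.remove(&delta.key);
        }
        // UNSET is invalid; a real consumer would report an error here.
        Operation::Unset => {}
    }
}
```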


@@ -0,0 +1,88 @@
syntax = "proto3";
package sf.substreams.internal.v2;
import "sf/substreams/v1/modules.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/internal/v2;pbssinternal";
service Substreams {
rpc ProcessRange(ProcessRangeRequest) returns (stream ProcessRangeResponse);
}
message ProcessRangeRequest {
uint64 start_block_num = 1;
uint64 stop_block_num = 2;
string output_module = 3;
sf.substreams.v1.Modules modules = 4;
uint32 stage = 5; // 0-based index of stage to execute up to
}
message ProcessRangeResponse {
reserved 1; // previously string module_name = 1;
reserved 2; // previously in oneof(type): BlockRange processed_range
reserved 3; // previously in oneof(type): ProcessedBytes processed_bytes
oneof type {
Failed failed = 4;
Completed completed = 5;
Update update = 6;
}
}
message Update {
uint64 duration_ms = 1;
uint64 processed_blocks = 2;
uint64 total_bytes_read = 3;
uint64 total_bytes_written = 4;
repeated ModuleStats modules_stats = 5;
}
message ModuleStats {
string name = 1;
uint64 processing_time_ms = 2;
uint64 store_operation_time_ms = 3;
uint64 store_read_count = 4;
repeated ExternalCallMetric external_call_metrics = 5;
// store-specific (will be 0 on mappers)
uint64 store_write_count = 10;
uint64 store_deleteprefix_count = 11;
uint64 store_size_bytes = 12;
}
message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}
message Completed {
repeated BlockRange all_processed_ranges = 1;
// TraceId represents the producer's trace id that produced the partial files.
// This is present here so that the consumer can use it to identify the
// right partial files that need to be squashed together.
//
// The TraceId can be empty, in which case the tier1 consuming this message
// should assume that the tier2 that produced those partial files is not yet
// updated to produce a trace id and, as such, the tier1 should
// generate a legacy partial file name.
string trace_id = 2;
}
message Failed {
string reason = 1;
repeated string logs = 2;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 3;
}
message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;
}


@@ -0,0 +1,20 @@
syntax = "proto3";
package sf.substreams;
import "google/protobuf/descriptor.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams;pbsubstreams";
message FieldOptions {
// this option informs the `substreams pack` command that it should treat the corresponding manifest value as a path to a file, putting its content as bytes in this field.
// must be applied to a `bytes` or `string` field
bool load_from_file = 1;
// this option informs the `substreams pack` command that it should treat the corresponding manifest value as a path to a folder, zipping its content and putting the zip content as bytes in this field.
// must be applied to a `bytes` field
bool zip_from_folder = 2;
}
extend google.protobuf.FieldOptions {
optional FieldOptions options = 2200;
}


@@ -0,0 +1,235 @@
syntax = "proto3";
package sf.substreams.rpc.v2;
import "google/protobuf/any.proto";
import "sf/substreams/v1/clock.proto";
import "sf/substreams/v1/modules.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2;pbsubstreamsrpc";
service Stream {
rpc Blocks(Request) returns (stream Response);
}
message Request {
int64 start_block_num = 1;
string start_cursor = 2;
uint64 stop_block_num = 3;
// With final_blocks_only, you only receive blocks that are irreversible:
// 'final_block_height' will be equal to the current block and no 'undo_signal' will ever be sent.
bool final_blocks_only = 4;
// Substreams has two modes when executing your module(s): development mode and production
// mode. Development and production modes impact the execution of Substreams; important aspects
// of execution include:
// * The time required to reach the first byte.
// * The speed at which large ranges get executed.
// * The module logs and outputs sent back to the client.
//
// By default, the engine runs in development mode, with richer and deeper output. Differences
// between production and development modes include:
// * Forward parallel execution is enabled in production mode and disabled in development mode.
// * The time required to reach the first byte in development mode is faster than in production mode.
//
// Specific attributes of development mode include:
// * The client will receive all of the executed module's logs.
// * It's possible to request specific store snapshots in the execution tree (via `debug_initial_store_snapshot_for_modules`).
// * Output from multiple modules is possible.
//
// With production mode, however, you trade off functionality for high speed, enabling forward
// parallel execution of modules ahead of time.
bool production_mode = 5;
string output_module = 6;
sf.substreams.v1.Modules modules = 7;
// Available only in developer mode
repeated string debug_initial_store_snapshot_for_modules = 10;
}
message Response {
oneof message {
SessionInit session = 1; // Always sent first
ModulesProgress progress = 2; // Progress of data preparation, before sending in the stream of `data` events.
BlockScopedData block_scoped_data = 3;
BlockUndoSignal block_undo_signal = 4;
Error fatal_error = 5;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotData debug_snapshot_data = 10;
// Available only in developer mode, and only if `debug_initial_store_snapshot_for_modules` is set.
InitialSnapshotComplete debug_snapshot_complete = 11;
}
}
// BlockUndoSignal informs you that every bit of data
// with a block number above 'last_valid_block' has been reverted
// on-chain. Delete that data and restart from 'last_valid_cursor'.
message BlockUndoSignal {
sf.substreams.v1.BlockRef last_valid_block = 1;
string last_valid_cursor = 2;
}
message BlockScopedData {
MapModuleOutput output = 1;
sf.substreams.v1.Clock clock = 2;
string cursor = 3;
// Non-deterministic; allows a substreams sink to let go of its undo data.
uint64 final_block_height = 4;
repeated MapModuleOutput debug_map_outputs = 10;
repeated StoreModuleOutput debug_store_outputs = 11;
}
message SessionInit {
string trace_id = 1;
uint64 resolved_start_block = 2;
uint64 linear_handoff_block = 3;
uint64 max_parallel_workers = 4;
}
message InitialSnapshotComplete {
string cursor = 1;
}
message InitialSnapshotData {
string module_name = 1;
repeated StoreDelta deltas = 2;
uint64 sent_keys = 4;
uint64 total_keys = 3;
}
message MapModuleOutput {
string name = 1;
google.protobuf.Any map_output = 2;
// DebugOutputInfo is available in non-production mode only
OutputDebugInfo debug_info = 10;
}
// StoreModuleOutput messages are produced for store modules in development mode.
// It is not possible to retrieve store module outputs in production, with parallelization
// enabled. If you need the deltas directly, write a pass-through mapper module
// that will get them down to you.
message StoreModuleOutput {
string name = 1;
repeated StoreDelta debug_store_deltas = 2;
OutputDebugInfo debug_info = 10;
}
message OutputDebugInfo {
repeated string logs = 1;
// LogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 2;
bool cached = 3;
}
// ModulesProgress is a message that is sent every 500ms
message ModulesProgress {
// previously: repeated ModuleProgress modules = 1;
// these previous `modules` messages were sent in bursts and are not sent anymore.
reserved 1;
// List of jobs running on tier2 servers
repeated Job running_jobs = 2;
// Execution statistics for each module
repeated ModuleStats modules_stats = 3;
// Stages definition and completed block ranges
repeated Stage stages = 4;
ProcessedBytes processed_bytes = 5;
}
message ProcessedBytes {
uint64 total_bytes_read = 1;
uint64 total_bytes_written = 2;
}
message Error {
string module = 1;
string reason = 2;
repeated string logs = 3;
// FailureLogsTruncated is a flag that tells you if you received all the logs or if they
// were truncated because you logged too much (fixed limit currently is set to 128 KiB).
bool logs_truncated = 4;
}
message Job {
uint32 stage = 1;
uint64 start_block = 2;
uint64 stop_block = 3;
uint64 processed_blocks = 4;
uint64 duration_ms = 5;
}
message Stage {
repeated string modules = 1;
repeated BlockRange completed_ranges = 2;
}
// ModuleStats gathers metrics and statistics from each module, running on tier1 or tier2.
// All the 'count' and 'time_ms' values may include duplicates for each stage going over that module.
message ModuleStats {
// name of the module
string name = 1;
// total_processed_blocks is the sum of blocks sent to that module code
uint64 total_processed_block_count = 2;
// total_processing_time_ms is the sum of all time spent running that module code
uint64 total_processing_time_ms = 3;
// external_calls are chain-specific intrinsics, like "Ethereum RPC calls".
repeated ExternalCallMetric external_call_metrics = 4;
// total_store_operation_time_ms is the sum of all time spent running that module code waiting for a store operation (ex: read, write, delete...)
uint64 total_store_operation_time_ms = 5;
// total_store_read_count is the sum of all the store Read operations called from that module code
uint64 total_store_read_count = 6;
// total_store_write_count is the sum of all store Write operations called from that module code (store-only)
uint64 total_store_write_count = 10;
// total_store_deleteprefix_count is the sum of all store DeletePrefix operations called from that module code (store-only)
// note that DeletePrefix can be a costly operation on large stores
uint64 total_store_deleteprefix_count = 11;
// store_size_bytes is the uncompressed size of the full KV store for that module, from the last 'merge' operation (store-only)
uint64 store_size_bytes = 12;
// total_store_merging_time_ms is the time spent merging partial stores into a full KV store for that module (store-only)
uint64 total_store_merging_time_ms = 13;
// store_currently_merging is true if there is a merging operation (partial store to full KV store) on the way.
bool store_currently_merging = 14;
// highest_contiguous_block is the highest block in the highest merged full KV store of that module (store-only)
uint64 highest_contiguous_block = 15;
}
message ExternalCallMetric {
string name = 1;
uint64 count = 2;
uint64 time_ms = 3;
}
message StoreDelta {
enum Operation {
UNSET = 0;
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 1;
uint64 ordinal = 2;
string key = 3;
bytes old_value = 4;
bytes new_value = 5;
}
message BlockRange {
uint64 start_block = 2;
uint64 end_block = 3;
}
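
On the consumer side, `BlockUndoSignal` drives re-org handling: everything above `last_valid_block` must be dropped and the stream resumed from `last_valid_cursor`. A minimal sketch, with hypothetical `Sink` helpers and hand-written mirrors of the messages above:

```rust
// Sketch only; real code would use the prost-generated types and a real sink.
struct BlockRef {
    id: String,
    number: u64,
}

struct BlockUndoSignal {
    last_valid_block: BlockRef,
    last_valid_cursor: String,
}

struct Sink;

impl Sink {
    fn delete_above(&mut self, _block_number: u64) { /* drop reverted rows */ }
    fn save_cursor(&mut self, _cursor: &str) { /* persist atomically with the data */ }

    fn on_undo(&mut self, undo: BlockUndoSignal) {
        // Every bit of data above `last_valid_block` has been reverted
        // on-chain: delete it, then restart from `last_valid_cursor`.
        self.delete_above(undo.last_valid_block.number);
        self.save_cursor(&undo.last_valid_cursor);
    }
}
```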


@@ -0,0 +1,142 @@
syntax = "proto3";
package sf.substreams.sink.service.v1;
import "sf/substreams/v1/package.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/sink/service/v1;pbsinksvc";
service Provider {
rpc Deploy(DeployRequest) returns (DeployResponse);
rpc Update(UpdateRequest) returns (UpdateResponse);
rpc Info(InfoRequest) returns (InfoResponse);
rpc List(ListRequest) returns (ListResponse);
rpc Pause(PauseRequest) returns (PauseResponse);
rpc Stop(StopRequest) returns (StopResponse);
rpc Resume(ResumeRequest) returns (ResumeResponse);
rpc Remove(RemoveRequest) returns (RemoveResponse);
}
message DeployRequest {
sf.substreams.v1.Package substreams_package = 1;
bool development_mode = 2;
repeated Parameter parameters = 3;
}
message Parameter {
string key = 1;
string value = 2;
}
message DeployResponse {
DeploymentStatus status = 1;
// deployment_id is a short name (max 8 characters) that uniquely identifies your deployment
string deployment_id = 2;
map<string, string> services = 3;
string reason = 4;
string motd = 5;
}
message UpdateRequest {
sf.substreams.v1.Package substreams_package = 1;
string deployment_id = 2;
bool reset = 3;
}
message UpdateResponse {
DeploymentStatus status = 1;
map<string, string> services = 2;
string reason = 3;
string motd = 4;
}
message InfoRequest {
string deployment_id = 1;
}
message InfoResponse {
DeploymentStatus status = 1;
map<string, string> services = 2;
string reason = 3;
PackageInfo package_info = 4;
SinkProgress progress = 5;
string motd = 6;
}
message SinkProgress {
uint64 last_processed_block = 1;
}
message PackageInfo {
string name = 1;
string version = 2;
string output_module_name = 3;
string output_module_hash = 4;
}
message ListRequest {}
message ListResponse {
repeated DeploymentWithStatus deployments = 1;
}
message DeploymentWithStatus {
string id = 1;
DeploymentStatus status = 2;
string reason = 3;
PackageInfo package_info = 4;
SinkProgress progress = 5;
string motd = 6;
}
enum DeploymentStatus {
UNKNOWN = 0;
RUNNING = 1;
FAILING = 2;
PAUSED = 3;
STOPPED = 4;
STARTING = 5;
PAUSING = 6;
STOPPING = 7;
REMOVING = 8;
RESUMING = 9;
}
message RemoveRequest {
string deployment_id = 1;
}
message RemoveResponse {
DeploymentStatus previous_status = 1;
}
message PauseRequest {
string deployment_id = 1;
}
message PauseResponse {
DeploymentStatus previous_status = 1;
DeploymentStatus new_status = 2;
}
message StopRequest {
string deployment_id = 1;
}
message StopResponse {
DeploymentStatus previous_status = 1;
DeploymentStatus new_status = 2;
}
message ResumeRequest {
string deployment_id = 1;
}
message ResumeResponse {
DeploymentStatus previous_status = 1;
DeploymentStatus new_status = 2;
}


@@ -0,0 +1,20 @@
syntax = "proto3";
package sf.substreams.v1;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
// Clock is a pointer to a block with an added timestamp
message Clock {
string id = 1;
uint64 number = 2;
google.protobuf.Timestamp timestamp = 3;
}
// BlockRef is a pointer to a block for which we don't know the timestamp
message BlockRef {
string id = 1;
uint64 number = 2;
}


@@ -0,0 +1,98 @@
syntax = "proto3";
package sf.substreams.v1;
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
message Modules {
repeated Module modules = 1;
repeated Binary binaries = 2;
}
// Binary represents some code compiled to its binary form.
message Binary {
string type = 1;
bytes content = 2;
}
message Module {
string name = 1;
oneof kind {
KindMap kind_map = 2;
KindStore kind_store = 3;
}
uint32 binary_index = 4;
string binary_entrypoint = 5;
repeated Input inputs = 6;
Output output = 7;
uint64 initial_block = 8;
message KindMap {
string output_type = 1;
}
message KindStore {
// The `update_policy` determines the functions available to mutate the store
// (like `set()`, `set_if_not_exists()` or `sum()`, etc.) in
// order to ensure that parallel operations are possible and deterministic.
//
// Say a store accumulates keys from block 0 to 1M, and a second store
// accumulates keys from block 1M to 2M. When we want to use this
// store as a dependency for a downstream module, we will merge the
// two stores according to this policy.
UpdatePolicy update_policy = 1;
string value_type = 2;
enum UpdatePolicy {
UPDATE_POLICY_UNSET = 0;
// Provides a store where you can `set()` keys, and the latest key wins
UPDATE_POLICY_SET = 1;
// Provides a store where you can `set_if_not_exists()` keys, and the first key wins
UPDATE_POLICY_SET_IF_NOT_EXISTS = 2;
// Provides a store where you can `add_*()` keys, where two stores merge by summing their values.
UPDATE_POLICY_ADD = 3;
// Provides a store where you can `min_*()` keys, where two stores merge by leaving the minimum value.
UPDATE_POLICY_MIN = 4;
// Provides a store where you can `max_*()` keys, where two stores merge by leaving the maximum value.
UPDATE_POLICY_MAX = 5;
// Provides a store where you can `append()` keys, where two stores merge by concatenating the bytes in order.
UPDATE_POLICY_APPEND = 6;
}
}
message Input {
oneof input {
Source source = 1;
Map map = 2;
Store store = 3;
Params params = 4;
}
message Source {
string type = 1; // ex: "sf.ethereum.type.v1.Block"
}
message Map {
string module_name = 1; // ex: "block_to_pairs"
}
message Store {
string module_name = 1;
Mode mode = 2;
enum Mode {
UNSET = 0;
GET = 1;
DELTAS = 2;
}
}
message Params {
string value = 1;
}
}
message Output {
string type = 1;
}
}
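
To make the merge semantics concrete, the sketch below (not the engine's actual implementation) shows how two partial stores combine under `UPDATE_POLICY_ADD`: when the store covering blocks 0 to 1M is merged with the store covering 1M to 2M, values under the same key are summed:

```rust
use std::collections::HashMap;

// Merge two partial stores under the ADD policy by summing values per key.
fn merge_add(first: HashMap<String, i64>, second: HashMap<String, i64>) -> HashMap<String, i64> {
    let mut merged = first;
    for (key, value) in second {
        *merged.entry(key).or_insert(0) += value;
    }
    merged
}
```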


@@ -0,0 +1,42 @@
syntax = "proto3";
package sf.substreams.v1;
import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";
import "sf/substreams/v1/modules.proto";
option go_package = "github.com/streamingfast/substreams/pb/sf/substreams/v1;pbsubstreams";
message Package {
// Needs to be one so this file can be used _directly_ as a
// buf `Image` and/or a ProtoSet for grpcurl and other tools
repeated google.protobuf.FileDescriptorProto proto_files = 1;
reserved 2 to 4; // Reserved for future: in case protosets adds fields
uint64 version = 5;
sf.substreams.v1.Modules modules = 6;
repeated ModuleMetadata module_meta = 7;
repeated PackageMetadata package_meta = 8;
// Source network for Substreams to fetch its data from.
string network = 9;
google.protobuf.Any sink_config = 10;
string sink_module = 11;
// image is the bytes of a JPEG, WebP or PNG file. Max size is 2 MiB
bytes image = 12;
}
message PackageMetadata {
string version = 1;
string url = 2;
string name = 3;
string doc = 4;
}
message ModuleMetadata {
// Corresponds to the index in `Package.metadata.package_meta`
uint64 package_index = 1;
string doc = 2;
}


@@ -0,0 +1,44 @@
syntax = "proto3";
package tycho.evm.v1;
message Block {
bytes hash = 1;
bytes parent_hash = 2;
uint64 number = 3;
uint64 ts = 4;
}
message Transaction {
bytes hash = 1;
bytes from = 2;
bytes to = 3;
uint64 index = 4;
}
enum ChangeType {
CHANGE_TYPE_UNSPECIFIED = 0;
CHANGE_TYPE_UPDATE = 1;
CHANGE_TYPE_CREATION = 2;
CHANGE_TYPE_DELETION = 3;
}
message Attribute {
string name = 1;
bytes value = 2;
ChangeType change = 3;
}
message ProtocolComponent {
string id = 1;
repeated bytes tokens = 2;
repeated string contracts = 3;
repeated Attribute static_att = 4;
ChangeType change = 5;
}
message BalanceChange {
bytes token = 1;
bytes balance = 2;
bytes component_id = 3;
}


@@ -0,0 +1,22 @@
syntax = "proto3";
package tycho.evm.v1;
import "tycho/evm/v1/common.proto";
message EntityChanges {
string component_id = 1;
repeated Attribute attributes = 2;
}
message TransactionEntityChanges {
Transaction tx = 1;
repeated EntityChanges entity_changes = 2;
repeated ProtocolComponent component_changes = 3;
repeated BalanceChange balance_changes = 4;
}
message BlockEntityChanges {
Block block = 1;
repeated TransactionEntityChanges changes = 2;
}


@@ -0,0 +1,34 @@
syntax = "proto3";
package tycho.evm.v1;
import "tycho/evm/v1/common.proto";
message ContractSlot {
bytes slot = 2;
bytes value = 3;
}
message ContractChange {
bytes address = 1;
// empty bytes indicates no change
bytes balance = 2;
// empty bytes indicates no change
bytes code = 3;
// empty sequence indicates no change
repeated ContractSlot slots = 4;
// Whether this is an update, creation or deletion
ChangeType change = 5;
}
message TransactionContractChanges {
Transaction tx = 1;
repeated ContractChange contract_changes = 2;
repeated ProtocolComponent component_changes = 3;
repeated BalanceChange balance_changes = 4;
}
message BlockContractChanges {
Block block = 1;
repeated TransactionContractChanges changes = 2;
}


substreams/Readme.md Normal file

@@ -0,0 +1,95 @@
# Substreams packages
This directory contains all substreams packages that are used by the extractors to access certain data from different
blockchains.
## Adding a new package
To add a new package, add a new folder. The naming convention is `[CHAIN]-[PROTOCOL_SYSTEM]`. In this new folder add a manifest
file `substreams.yaml`. You can use the template below to get started:
```yaml
specVersion: v0.1.0
package:
  name: 'substreams_[CHAIN]_[PROTOCOL_SYSTEM]'
  version: v0.1.0
protobuf:
  files:
    - vm.proto
    - common.proto
  importPaths:
    # This is different compared to the substreams example:
    # we need to share protobuf definitions with tycho. You
    # are invited to reuse existing definitions if they are
    # useful to you.
    - ../../proto/evm/v1
    # any private message types only used in internal modules
    # can remain local to the crate
    - ./proto
binaries:
  default:
    type: wasm/rust-v1
    # this points to the workspace target directory; we use a special
    # substreams build profile to optimise wasm binaries
    file: ../../target/wasm32-unknown-unknown/substreams/substreams_[CHAIN]_[PROTOCOL_SYSTEM].wasm
modules:
  # sample module providing access to blocks
  - name: map_block
    kind: map
    inputs:
      - source: sf.ethereum.type.v2.Block
    output:
      type: proto:acme.block_meta.v1.BlockMeta
```
Substreams packages are Rust crates, so we also need a `Cargo.toml`.
The example from the official docs will serve us just fine:
```toml
[package]
name = "substreams_[CHAIN]_[PROTOCOL_SYSTEM]"
version = "0.1.0"
edition = "2021"

[lib]
name = "substreams_[CHAIN]_[PROTOCOL_SYSTEM]"
crate-type = ["cdylib"]

[dependencies]
substreams = "0.5"
substreams-ethereum = "0.9"
prost = "0.11"
```
Now we can generate the Rust protobuf code:
```
substreams protogen substreams.yaml --exclude-paths="sf/substreams,google"
```
The command above should put the generated Rust files under `/src/pb`. You
can start using these now in your module handlers. See
the [official substreams documentation](https://thegraph.com/docs/en/substreams/getting-started/quickstart/#create-substreams-module-handlers)
on how to implement module handlers.
You can also look into already existing substreams packages to see how it
is done; e.g. [ethereum-ambient](./ethereum-ambient/) provides a pretty good
example of how to get access to raw contract storage. A minimal handler sketch follows below.
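
For orientation, here is a minimal map handler sketch modeled on `map_changes` from ethereum-ambient. The `map_noop` name and the empty change set are illustrative only, and the `crate::pb` import assumes protobuf code generated under `src/pb` as in that package:

```rust
use substreams_ethereum::pb::eth;

// Generated protobuf types, as produced by `substreams protogen` above.
use crate::pb::tycho::evm::v1 as tycho;

#[substreams::handlers::map]
fn map_noop(
    block: eth::v2::Block,
) -> Result<tycho::BlockContractChanges, substreams::errors::Error> {
    // Emit an empty change set carrying only the block metadata.
    Ok(tycho::BlockContractChanges {
        block: Some(tycho::Block {
            number: block.number,
            hash: block.hash.clone(),
            parent_hash: block
                .header
                .as_ref()
                .expect("Block header not present")
                .parent_hash
                .clone(),
            ts: block.timestamp_seconds(),
        }),
        changes: Vec::new(),
    })
}
```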
## Tests
To create a block test asset for ethereum do the following:
- Follow [this tutorial](https://substreams.streamingfast.io/tutorials/overview/map_block_meta_module). Make sure you
set up the substreams-explorer repo in the same directory as this repo.
- Comment out `image: ./ethereum.png` in `ethereum-explorer/substreams.yaml`
- Add `prost-types = "0.11.0"` to `ethereum-explorer/Cargo.toml`
- Make sure you set up your key env vars.
- Run `sh scripts/download-ethereum-block-to-s3 BLOCK_NUMBER`
Do not commit the block files (they are quite big).

substreams/ethereum-ambient/Cargo.lock generated Normal file

File diff suppressed because it is too large


@@ -0,0 +1,18 @@
[package]
name = "substreams-ethereum-ambient"
version = "0.3.0"
edition = "2021"
[lib]
name = "substreams_ethereum_ambient"
crate-type = ["cdylib"]
[dependencies]
substreams = "0.5"
substreams-ethereum = "0.9"
prost = "0.11"
hex-literal = "0.4.1"
ethabi = "18.0.0"
hex = "0.4.2"
bytes = "1.5.0"
anyhow = "1.0.75"


@@ -0,0 +1,389 @@
use std::collections::{hash_map::Entry, HashMap};
use anyhow::{anyhow, bail};
use ethabi::{decode, ParamType};
use hex_literal::hex;
use substreams_ethereum::pb::eth::{self};
use pb::tycho::evm::v1::{self as tycho, ChangeType};
mod pb;
const AMBIENT_CONTRACT: [u8; 20] = hex!("aaaaaaaaa24eeeb8d57d431224f73832bc34f688");
const INIT_POOL_CODE: u8 = 71;
const USER_CMD_FN_SIG: [u8; 4] = [0xA1, 0x51, 0x12, 0xF9];
struct SlotValue {
new_value: Vec<u8>,
start_value: Vec<u8>,
}
impl SlotValue {
fn has_changed(&self) -> bool {
self.start_value != self.new_value
}
}
// uses a map for slots, since protobuf does not
// allow bytes as hashmap keys
struct InterimContractChange {
address: Vec<u8>,
balance: Vec<u8>,
code: Vec<u8>,
slots: HashMap<Vec<u8>, SlotValue>,
change: tycho::ChangeType,
}
impl From<InterimContractChange> for tycho::ContractChange {
fn from(value: InterimContractChange) -> Self {
tycho::ContractChange {
address: value.address,
balance: value.balance,
code: value.code,
slots: value
.slots
.into_iter()
.filter(|(_, value)| value.has_changed())
.map(|(slot, value)| tycho::ContractSlot { slot, value: value.new_value })
.collect(),
change: value.change.into(),
}
}
}
/// Extracts all contract changes relevant to VM simulations.
///
/// This implementation currently has two major limitations:
/// 1. It is hardwired to only care about changes to the ambient main contract; this is ok for this
/// particular use case, but for a more general purpose implementation it is not ideal.
/// 2. Changes are processed separately. This means that if there are any side effects between them
/// (e.g. if an account is deleted and then created again, in Ethereum all of its storage is set
/// to 0, so there is a side effect between account creation and contract storage), these might
/// not be properly accounted for. Most of the time this should not be a major issue, but it may
/// lead to wrong results, so consume this implementation with care. See the example below for a
/// concrete case where this is problematic.
///
/// ## A very contrived example:
/// 1. Some existing contract receives a transaction that changes its state; the state is updated.
/// 2. Next, this contract has self destruct called on itself.
/// 3. The contract is created again using CREATE2 at the same address.
/// 4. The contract receives a transaction that changes its state.
/// 5. We would emit this as a contract creation with slots set from 1 and from 4, although we
/// should only emit the slots changed from 4.
#[substreams::handlers::map]
fn map_changes(
block: eth::v2::Block,
) -> Result<tycho::BlockContractChanges, substreams::errors::Error> {
let mut block_changes = tycho::BlockContractChanges { block: None, changes: Vec::new() };
let mut tx_change = tycho::TransactionContractChanges::default();
let mut changed_contracts: HashMap<Vec<u8>, InterimContractChange> = HashMap::new();
let created_accounts: HashMap<_, _> = block
.transactions()
.flat_map(|tx| {
tx.calls.iter().flat_map(|call| {
call.account_creations
.iter()
.map(|ac| (&ac.account, ac.ordinal))
})
})
.collect();
for block_tx in block.transactions() {
// extract storage changes
let mut storage_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.storage_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
storage_changes.sort_unstable_by_key(|change| change.ordinal);
let ambient_calls = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.filter(|call| call.address == AMBIENT_CONTRACT)
.collect::<Vec<_>>();
for call in ambient_calls {
if call.input.len() < 4 {
continue;
}
if call.input[0..4] == USER_CMD_FN_SIG {
let user_cmd_external_abi_types = &[
// index of the proxy sidecar the command is being called on
ParamType::Uint(16),
// call data for internal UserCmd method
ParamType::Bytes,
];
let user_cmd_internal_abi_types = &[
ParamType::Uint(8), // command
ParamType::Address, // base
ParamType::Address, // quote
ParamType::Uint(256), // pool index
ParamType::Uint(128), // price
];
// Decode external call to UserCmd
if let Ok(external_params) = decode(user_cmd_external_abi_types, &call.input[4..]) {
let cmd_bytes = external_params[1]
.to_owned()
.into_bytes()
.ok_or_else(|| {
anyhow!("Failed to convert to bytes: {:?}", &external_params[1])
})?;
// Call data is structured differently depending on the cmd code, so only
// decode if this is an init pool code.
if cmd_bytes[31] == INIT_POOL_CODE {
// Decode internal call to UserCmd
if let Ok(internal_params) = decode(user_cmd_internal_abi_types, &cmd_bytes)
{
let base = internal_params[1]
.to_owned()
.into_address()
.ok_or_else(|| {
anyhow!(
"Failed to convert to address: {:?}",
&internal_params[1]
)
})?
.to_fixed_bytes()
.to_vec();
let quote = internal_params[2]
.to_owned()
.into_address()
.ok_or_else(|| {
anyhow!(
"Failed to convert to address: {:?}",
&internal_params[2]
)
})?
.to_fixed_bytes()
.to_vec();
let pool_index = internal_params[3]
.to_owned()
.into_uint()
.ok_or_else(|| anyhow!("Failed to convert to u32".to_string()))?
.as_u32();
let static_attribute = tycho::Attribute {
name: String::from("pool_index"),
value: pool_index.to_be_bytes().to_vec(),
change: ChangeType::Creation.into(),
};
let mut tokens: Vec<Vec<u8>> = vec![base.clone(), quote.clone()];
tokens.sort();
let new_component = tycho::ProtocolComponent {
id: format!(
"{}{}{}",
hex::encode(base.clone()),
hex::encode(quote.clone()),
pool_index
),
tokens,
contracts: vec![hex::encode(AMBIENT_CONTRACT)],
static_att: vec![static_attribute],
change: ChangeType::Creation.into(),
};
tx_change
.component_changes
.push(new_component);
} else {
bail!("Failed to decode ABI internal call.".to_string());
}
}
} else {
bail!("Failed to decode ABI external call.".to_string());
}
}
}
// Note: some contracts change slot values and then change them back to their
// original value before the transaction ends. We remember the initial
// value before the first change and, at the end, filter out deltas
// that ended up not actually changing anything.
for storage_change in storage_changes.iter() {
match changed_contracts.entry(storage_change.address.clone()) {
// We already have an entry recording a change to this contract;
// only append the change for this storage slot
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
match contract_change
.slots
.entry(storage_change.key.clone())
{
// The storage slot was already changed before, simply
// update new_value
Entry::Occupied(mut v) => {
let slot_value = v.get_mut();
slot_value
.new_value
.copy_from_slice(&storage_change.new_value);
}
// The storage slot is being initialised for the first time
Entry::Vacant(v) => {
v.insert(SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
});
}
}
}
// Initialise a new contract change after observing a storage change
Entry::Vacant(e) => {
let mut slots = HashMap::new();
slots.insert(
storage_change.key.clone(),
SlotValue {
new_value: storage_change.new_value.clone(),
start_value: storage_change.old_value.clone(),
},
);
e.insert(InterimContractChange {
address: storage_change.address.clone(),
balance: Vec::new(),
code: Vec::new(),
slots,
change: if created_accounts.contains_key(&storage_change.address) {
ChangeType::Creation
} else {
ChangeType::Update
},
});
}
}
}
// extract balance changes
let mut balance_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.balance_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
balance_changes.sort_unstable_by_key(|change| change.ordinal);
for balance_change in balance_changes.iter() {
match changed_contracts.entry(balance_change.address.clone()) {
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
if let Some(new_balance) = &balance_change.new_value {
contract_change.balance.clear();
contract_change
.balance
.extend_from_slice(&new_balance.bytes);
}
}
Entry::Vacant(e) => {
if let Some(new_balance) = &balance_change.new_value {
e.insert(InterimContractChange {
address: balance_change.address.clone(),
balance: new_balance.bytes.clone(),
code: Vec::new(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&balance_change.address) {
ChangeType::Creation
} else {
ChangeType::Update
},
});
}
}
}
}
// extract code changes
let mut code_changes = block_tx
.calls
.iter()
.filter(|call| !call.state_reverted)
.flat_map(|call| {
call.code_changes
.iter()
.filter(|c| c.address == AMBIENT_CONTRACT)
})
.collect::<Vec<_>>();
code_changes.sort_unstable_by_key(|change| change.ordinal);
for code_change in code_changes.iter() {
match changed_contracts.entry(code_change.address.clone()) {
Entry::Occupied(mut e) => {
let contract_change = e.get_mut();
contract_change.code.clear();
contract_change
.code
.extend_from_slice(&code_change.new_code);
}
Entry::Vacant(e) => {
e.insert(InterimContractChange {
address: code_change.address.clone(),
balance: Vec::new(),
code: code_change.new_code.clone(),
slots: HashMap::new(),
change: if created_accounts.contains_key(&code_change.address) {
ChangeType::Creation
} else {
ChangeType::Update
},
});
}
}
}
// if there were any changes, add transaction and push the changes
if !storage_changes.is_empty() || !balance_changes.is_empty() || !code_changes.is_empty() {
tx_change.tx = Some(tycho::Transaction {
hash: block_tx.hash.clone(),
from: block_tx.from.clone(),
to: block_tx.to.clone(),
index: block_tx.index as u64,
});
// reuse the changed_contracts hash map by draining it; the next iteration
// will start empty. This avoids a costly reallocation.
for (_, change) in changed_contracts.drain() {
tx_change
.contract_changes
.push(change.into())
}
block_changes
.changes
.push(tx_change.clone());
// clear out the interim contract changes after we pushed those.
tx_change.tx = None;
tx_change.contract_changes.clear();
}
}
block_changes.block = Some(tycho::Block {
number: block.number,
hash: block.hash.clone(),
parent_hash: block
.header
.as_ref()
.expect("Block header not present")
.parent_hash
.clone(),
ts: block.timestamp_seconds(),
});
Ok(block_changes)
}


@@ -0,0 +1,10 @@
// @generated
pub mod tycho {
pub mod evm {
// @@protoc_insertion_point(attribute:tycho.evm.v1)
pub mod v1 {
include!("tycho.evm.v1.rs");
// @@protoc_insertion_point(tycho.evm.v1)
}
}
}


@@ -0,0 +1,166 @@
// @generated
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Block {
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="2")]
pub parent_hash: ::prost::alloc::vec::Vec<u8>,
#[prost(uint64, tag="3")]
pub number: u64,
#[prost(uint64, tag="4")]
pub ts: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transaction {
#[prost(bytes="vec", tag="1")]
pub hash: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="2")]
pub from: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="3")]
pub to: ::prost::alloc::vec::Vec<u8>,
#[prost(uint64, tag="4")]
pub index: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Attribute {
#[prost(string, tag="1")]
pub name: ::prost::alloc::string::String,
#[prost(bytes="vec", tag="2")]
pub value: ::prost::alloc::vec::Vec<u8>,
#[prost(enumeration="ChangeType", tag="3")]
pub change: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtocolComponent {
#[prost(string, tag="1")]
pub id: ::prost::alloc::string::String,
#[prost(bytes="vec", repeated, tag="2")]
pub tokens: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
#[prost(string, repeated, tag="3")]
pub contracts: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(message, repeated, tag="4")]
pub static_att: ::prost::alloc::vec::Vec<Attribute>,
#[prost(enumeration="ChangeType", tag="5")]
pub change: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BalanceChange {
#[prost(bytes="vec", tag="1")]
pub token: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="2")]
pub balance: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="3")]
pub component_id: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ChangeType {
Unspecified = 0,
Update = 1,
Creation = 2,
Deletion = 3,
}
impl ChangeType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
ChangeType::Unspecified => "CHANGE_TYPE_UNSPECIFIED",
ChangeType::Update => "CHANGE_TYPE_UPDATE",
ChangeType::Creation => "CHANGE_TYPE_CREATION",
ChangeType::Deletion => "CHANGE_TYPE_DELETION",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"CHANGE_TYPE_UNSPECIFIED" => Some(Self::Unspecified),
"CHANGE_TYPE_UPDATE" => Some(Self::Update),
"CHANGE_TYPE_CREATION" => Some(Self::Creation),
"CHANGE_TYPE_DELETION" => Some(Self::Deletion),
_ => None,
}
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EntityChanges {
#[prost(string, tag="1")]
pub component_id: ::prost::alloc::string::String,
#[prost(message, repeated, tag="2")]
pub attributes: ::prost::alloc::vec::Vec<Attribute>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionEntityChanges {
#[prost(message, optional, tag="1")]
pub tx: ::core::option::Option<Transaction>,
#[prost(message, repeated, tag="2")]
pub entity_changes: ::prost::alloc::vec::Vec<EntityChanges>,
#[prost(message, repeated, tag="3")]
pub component_changes: ::prost::alloc::vec::Vec<ProtocolComponent>,
#[prost(message, repeated, tag="4")]
pub balance_changes: ::prost::alloc::vec::Vec<BalanceChange>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockEntityChanges {
#[prost(message, optional, tag="1")]
pub block: ::core::option::Option<Block>,
#[prost(message, repeated, tag="2")]
pub changes: ::prost::alloc::vec::Vec<TransactionEntityChanges>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ContractSlot {
#[prost(bytes="vec", tag="2")]
pub slot: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="3")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ContractChange {
#[prost(bytes="vec", tag="1")]
pub address: ::prost::alloc::vec::Vec<u8>,
/// empty bytes indicates no change
#[prost(bytes="vec", tag="2")]
pub balance: ::prost::alloc::vec::Vec<u8>,
/// empty bytes indicates no change
#[prost(bytes="vec", tag="3")]
pub code: ::prost::alloc::vec::Vec<u8>,
/// empty sequence indicates no change
#[prost(message, repeated, tag="4")]
pub slots: ::prost::alloc::vec::Vec<ContractSlot>,
/// Whether this is an update, creation or deletion
#[prost(enumeration="ChangeType", tag="5")]
pub change: i32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionContractChanges {
#[prost(message, optional, tag="1")]
pub tx: ::core::option::Option<Transaction>,
#[prost(message, repeated, tag="2")]
pub contract_changes: ::prost::alloc::vec::Vec<ContractChange>,
#[prost(message, repeated, tag="3")]
pub component_changes: ::prost::alloc::vec::Vec<ProtocolComponent>,
#[prost(message, repeated, tag="4")]
pub balance_changes: ::prost::alloc::vec::Vec<BalanceChange>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockContractChanges {
#[prost(message, optional, tag="1")]
pub block: ::core::option::Option<Block>,
#[prost(message, repeated, tag="2")]
pub changes: ::prost::alloc::vec::Vec<TransactionContractChanges>,
}
// @@protoc_insertion_point(module)


@@ -0,0 +1,24 @@
specVersion: v0.1.0
package:
  name: "substreams_ethereum_ambient"
  version: v0.3.0
protobuf:
  files:
    - vm.proto
    - common.proto
  importPaths:
    - ../../proto/tycho/evm/v1/
binaries:
  default:
    type: wasm/rust-v1
    file: ../../target/wasm32-unknown-unknown/substreams/substreams_ethereum_ambient.wasm
modules:
  - name: map_changes
    kind: map
    inputs:
      - source: sf.ethereum.type.v2.Block
    output:
      type: proto:tycho.evm.v1.BlockContractChanges