feat: using Transfer and other various fixes

This commit is contained in:
0xMochan
2024-05-23 17:43:46 -04:00
parent 379baebfb7
commit 8c37a74ea6
11 changed files with 9288 additions and 246 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -28,15 +28,6 @@
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
]
},
{
"name": "crvUSD/USDC",
"address": "4dece678ceceb27446b35c672dc7d61f30bad69e",
"tx_hash": "4b288a16fe2f1c556752478989d88ef6e7a2c9a0cd258a12b11b232a112dc279",
"tokens": [
"a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
"f939e0a03fb07f59a73314e73794be0e57ac1b4e"
]
},
{
"name": "susd",
"address": "A5407eAE9Ba41422680e2e00537571bcC53efBfD",

View File

@@ -3,6 +3,8 @@ from typing import Any
PARAMETERS = "params.json"
EMPTY = "0x0000000000000000000000000000000000000000"
INITIAL_BLOCK = 17258001
def encode_json_to_query_params(params: list[dict[str, Any]]):
encoded_params = []
@@ -11,8 +13,9 @@ def encode_json_to_query_params(params: list[dict[str, Any]]):
address: str = param["address"]
tx_hash: str = param["tx_hash"]
tokens: list[str] = param["tokens"]
attributes: dict[str, str] = param["attributes"]
attributes: dict[str, str] = param.get("attributes", {})
attributes["name"] = param["name"]
attributes["factory_name"] = "NA"
attributes["factory"] = EMPTY
encoded_address = f"address={address}"

View File

@@ -0,0 +1 @@
substreams gui -e mainnet.eth.streamingfast.io:443 substreams.yaml map_protocol_changes --start-block 19933736 --stop-block +50 -p map_components=`python params.py`

View File

@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@ pub mod pool_crypto_swap_ng;
pub mod pool_tricrypto2;
pub mod crypto_swap_ng_factory;
pub mod test;
pub mod meta_registry;
pub mod pool_steth;
pub mod pool;
pub mod tricrypto_factory;

View File

@@ -5,10 +5,7 @@ use itertools::Itertools;
use substreams::{
pb::substreams::StoreDeltas,
scalar::BigInt,
store::{
StoreAdd, StoreAddBigInt, StoreAddInt64, StoreGet, StoreGetInt64, StoreGetString, StoreNew,
StoreSet, StoreSetString,
},
store::{StoreAddBigInt, StoreGet, StoreGetString, StoreNew, StoreSet, StoreSetString},
};
use substreams_ethereum::pb::eth;
@@ -112,7 +109,6 @@ pub fn map_relative_balances(
block
.logs()
.filter_map(|log| emit_deltas(log, &tokens_store))
.flat_map(|deltas| deltas.into_iter())
.collect::<Vec<_>>()
},
})

View File

@@ -1,7 +1,4 @@
use substreams::{
scalar::BigInt,
store::{StoreGet, StoreGetInt64, StoreGetString},
};
use substreams::store::{StoreGet, StoreGetString};
use substreams_ethereum::{block_view::LogView, Event};
use tycho_substreams::prelude::*;
@@ -16,219 +13,42 @@ fn tx_from_log(log: &LogView) -> Transaction {
}
}
/// This function emits balance deltas for mints, burns, and exchanges in Curve pools. Since some
/// pools contain differing ABIs, we load in several examples of abis in order to best match the
/// topic ID to the correct event. The repetition in this function is dervived from the fact that
/// most of these events have similar structures, but the specific topic id differs.
pub fn emit_deltas(log: LogView, tokens_store: &StoreGetString) -> Option<Vec<BalanceDelta>> {
let pool_key = format!("pool:{}", hex::encode(&log.address()));
let tokens = tokens_store
fn get_pool_tokens(pool_address: &Vec<u8>, tokens_store: &StoreGetString) -> Option<Vec<String>> {
let pool_key = format!("pool:{}", hex::encode(&pool_address));
Some(
tokens_store
.get_last(pool_key)?
.split(":")
.map(|token| token.to_owned())
.collect::<Vec<_>>();
.collect::<Vec<_>>(),
)
}
if let Some(event) = abi::pool::events::TokenExchange::match_and_decode(log) {
token_change_deltas(tokens, event, log)
} else if let Some(event) = abi::pool_3pool::events::TokenExchange::match_and_decode(log) {
token_change_deltas(
tokens,
abi::pool::events::TokenExchange {
sold_id: event.sold_id,
bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
tokens_bought: event.tokens_bought,
buyer: event.buyer,
},
log,
)
} else if let Some(event) = abi::pool_steth::events::TokenExchange::match_and_decode(log) {
token_change_deltas(
tokens,
abi::pool::events::TokenExchange {
sold_id: event.sold_id,
bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
tokens_bought: event.tokens_bought,
buyer: event.buyer,
},
log,
)
} else if let Some(event) = abi::pool_tricrypto::events::TokenExchange::match_and_decode(log) {
token_change_deltas(
tokens,
abi::pool::events::TokenExchange {
sold_id: event.sold_id,
bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
tokens_bought: event.tokens_bought,
buyer: event.buyer,
},
log,
)
} else if let Some(event) = abi::pool_tricrypto2::events::TokenExchange::match_and_decode(log) {
token_change_deltas(
tokens,
abi::pool::events::TokenExchange {
sold_id: event.sold_id,
bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
tokens_bought: event.tokens_bought,
buyer: event.buyer,
},
log,
)
} else if let Some(event) =
abi::pool_crypto_swap_ng::events::TokenExchange::match_and_decode(log)
{
token_change_deltas(
tokens,
abi::pool::events::TokenExchange {
sold_id: event.sold_id,
bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
tokens_bought: event.tokens_bought,
buyer: event.buyer,
},
log,
)
} else if let Some(event) =
abi::pool_crypto_swap_ng::events::TokenExchangeUnderlying::match_and_decode(log)
{
token_change_deltas(
tokens,
abi::pool::events::TokenExchange {
sold_id: event.sold_id,
bought_id: event.bought_id,
tokens_sold: event.tokens_sold,
tokens_bought: event.tokens_bought,
buyer: event.buyer,
},
log,
)
} else if let Some(event) = abi::pool::events::AddLiquidity::match_and_decode(log) {
add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_3pool::events::AddLiquidity::match_and_decode(log) {
add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_steth::events::AddLiquidity::match_and_decode(log) {
add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_tricrypto::events::AddLiquidity::match_and_decode(log) {
add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_tricrypto2::events::AddLiquidity::match_and_decode(log) {
add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) =
abi::pool_crypto_swap_ng::events::AddLiquidity::match_and_decode(log)
{
add_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool::events::RemoveLiquidity::match_and_decode(log) {
remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_3pool::events::RemoveLiquidity::match_and_decode(log) {
remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_steth::events::RemoveLiquidity::match_and_decode(log) {
remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_tricrypto::events::RemoveLiquidity::match_and_decode(log)
{
remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) = abi::pool_tricrypto2::events::RemoveLiquidity::match_and_decode(log)
{
remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
} else if let Some(event) =
abi::pool_crypto_swap_ng::events::RemoveLiquidity::match_and_decode(log)
{
remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
// } else if let Some(event) =
// abi::pool_crypto_swap_ng::events::RemoveLiquidityImbalance::match_and_decode(log)
// {
// remove_liquidity_deltas(event.token_amounts.into(), &tokens, log)
// } else if let Some(event) =
// abi::pool_crypto_swap_ng::events::RemoveLiquidityOne::match_and_decode(log)
// {
// Some(vec![
// BalanceDelta {
// ord: log.log.ordinal,
// tx: Some(tx_from_log(&log)),
// token: hex::decode(sold_token_id.clone()).unwrap(),
// delta: event.tokens_sold.to_signed_bytes_be(),
// component_id: hex::encode(log.address()).into(),
// },
// BalanceDelta {
// ord: log.log.ordinal,
// tx: Some(tx_from_log(&log)),
// token: hex::decode(sold_token_id.clone()).unwrap(),
// delta: event.tokens_sold.to_signed_bytes_be(),
// component_id: hex::encode(log.address()).into(),
// },
// ])
/// TODO rewrite
pub fn emit_deltas(log: LogView, tokens_store: &StoreGetString) -> Option<BalanceDelta> {
let transfer = abi::ERC20::events::Transfer::match_and_decode(log)?;
let (component_id, pool_tokens, is_incoming) =
if let Some(pool_tokens) = get_pool_tokens(&transfer.to, tokens_store) {
(hex::encode(&transfer.to), pool_tokens, true)
} else if let Some(pool_tokens) = get_pool_tokens(&transfer.from, tokens_store) {
(hex::encode(&transfer.from), pool_tokens, false)
} else {
return None;
};
let token_id = hex::encode(log.address());
if pool_tokens.contains(&token_id) {
let delta = if is_incoming { transfer.value } else { transfer.value * -1 };
Some(BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: hex::decode(token_id).unwrap(),
delta: delta.to_signed_bytes_be(),
component_id: component_id.into(),
})
} else {
substreams::log::info!("Token {:?} not in pool: {:?}", token_id, &component_id);
None
}
}
fn token_change_deltas(
tokens: Vec<String>,
event: abi::pool::events::TokenExchange,
log: LogView<'_>,
) -> Option<Vec<BalanceDelta>> {
let tokens_bought_delta: BigInt = event.tokens_bought * -1;
let sold_token_id = &tokens[event.sold_id.to_u64() as usize];
let bought_token_id = &tokens[event.bought_id.to_u64() as usize];
Some(vec![
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: hex::decode(sold_token_id.clone()).unwrap(),
delta: event.tokens_sold.to_signed_bytes_be(),
component_id: hex::encode(log.address()).into(),
},
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: hex::decode(bought_token_id.clone()).unwrap(),
delta: tokens_bought_delta.to_signed_bytes_be(),
component_id: hex::encode(log.address()).into(),
},
])
}
fn add_liquidity_deltas(
amounts: Vec<BigInt>,
tokens: &Vec<String>,
log: LogView<'_>,
) -> Option<Vec<BalanceDelta>> {
Some(
amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: hex::decode(token_id).unwrap(),
delta: token_amount.to_signed_bytes_be(),
component_id: hex::encode(log.address()).into(),
})
.collect::<Vec<_>>(),
)
}
fn remove_liquidity_deltas(
amounts: Vec<BigInt>,
tokens: &Vec<String>,
log: LogView<'_>,
) -> Option<Vec<BalanceDelta>> {
Some(
amounts
.iter()
.zip(tokens)
.map(move |(token_amount, token_id)| {
let token_amount_neg: BigInt = token_amount.clone() * -1;
BalanceDelta {
ord: log.log.ordinal,
tx: Some(tx_from_log(&log)),
token: hex::decode(token_id).unwrap(),
delta: token_amount_neg.to_signed_bytes_be(),
component_id: hex::encode(log.address()).into(),
}
})
.collect::<Vec<_>>(),
)
}

View File

@@ -9,8 +9,7 @@ use tycho_substreams::prelude::*;
use substreams::scalar::BigInt;
const EMPTY_BYTES32: [u8; 32] = [0; 32];
const EMPTY_ADDRESS: [u8; 20] = hex!("0000000000000000000000000000000000000000");
const META_REGISTRY: [u8; 20] = hex!("F98B45FA17DE75FB1aD0e7aFD971b0ca00e379fC");
const CRYPTO_POOL_FACTORY: [u8; 20] = hex!("F18056Bbd320E96A48e3Fbf8bC061322531aac99");
const META_POOL_FACTORY: [u8; 20] = hex!("B9fC157394Af804a3578134A6585C0dc9cc990d4");
@@ -42,10 +41,28 @@ impl SerializableVecBigInt for Vec<BigInt> {
}
}
/// Converts address bytes into a string containing a leading `0x`.
fn address_to_bytes_with_0x(address: &[u8; 20]) -> Vec<u8> {
format!("0x{}", hex::encode(address)).into_bytes()
}
/// This massive function matches factory address to specific logic to construct
/// `ProtocolComponent`s. While, most of the logic is readily replicable, several factories differ
/// in information density resulting in needing other information sources such as decoding calls
/// or even making RPC calls to provide extra details.
///
/// Each `ProtocolComponent` contains the following static attributes:
/// - `pool_type`: The type of pool, such as `crypto_pool`, `plain_pool`, `metapool`, etc.
/// - `name`: The name of the pool.
/// - `factory_name`: The name of the factory that created the pool.
/// - `factory`: The address of the factory that created the pool.
///
/// The basic flow of this function is as follows:
/// - Match the factory address
/// - Decode the relevant event from the log
/// - Attempt to decode the cooresponding function call (based on the permutation of the ABI)
/// - Optionally make an RPC call to produce further information (see metapools)
/// - Construct the cooresponding `ProtocolComponent`
pub fn address_map(
call_address: &[u8; 20],
log: &Log,
@@ -137,6 +154,7 @@ pub fn address_map(
)
})?;
// The return data of several of these calls contain the actual component id
let component_id = &call.return_data[12..];
Some(ProtocolComponent {
@@ -201,6 +219,13 @@ pub fn address_map(
let component_id = &call.return_data[12..];
// The `add_pool.base_pool` may only refer to the contract of the base pool and not
// the token itself. This means we **have** to make an RPC call to the
// `meta_registry` in order to get the real LP token address.
let get_lp_token =
abi::meta_registry::functions::GetLpToken1 { pool: add_pool.base_pool.clone() };
let lp_token = get_lp_token.call(META_REGISTRY.to_vec())?;
Some(ProtocolComponent {
id: hex::encode(component_id),
tx: Some(Transaction {
@@ -209,8 +234,8 @@ pub fn address_map(
hash: tx.hash.clone(),
index: tx.index.into(),
}),
tokens: vec![pool_added.coin, add_pool.base_pool.clone()],
contracts: vec![component_id.into(), add_pool.base_pool.clone()],
tokens: vec![pool_added.coin, lp_token],
contracts: vec![component_id.into(), add_pool.base_pool],
static_att: vec![
Attribute {
name: "pool_type".into(),
@@ -268,6 +293,10 @@ pub fn address_map(
let component_id = &call.return_data[12..];
let get_lp_token =
abi::meta_registry::functions::GetLpToken1 { pool: add_pool.base_pool.clone() };
let lp_token = get_lp_token.call(META_REGISTRY.to_vec())?;
Some(ProtocolComponent {
id: hex::encode(component_id),
tx: Some(Transaction {
@@ -276,8 +305,8 @@ pub fn address_map(
hash: tx.hash.clone(),
index: tx.index.into(),
}),
tokens: vec![pool_added.coin, add_pool.base_pool.clone()],
contracts: vec![component_id.into(), add_pool.base_pool.clone()],
tokens: vec![pool_added.coin, lp_token],
contracts: vec![component_id.into(), add_pool.base_pool],
static_att: vec![
Attribute {
name: "pool_type".into(),
@@ -367,6 +396,11 @@ pub fn address_map(
let add_pool =
abi::crypto_swap_ng_factory::functions::DeployMetapool::match_and_decode(call)?;
let component_id = &call.return_data[12..];
let get_lp_token =
abi::meta_registry::functions::GetLpToken1 { pool: add_pool.base_pool.clone() };
let lp_token = get_lp_token.call(META_REGISTRY.to_vec())?;
Some(ProtocolComponent {
id: hex::encode(component_id),
tx: Some(Transaction {
@@ -375,8 +409,8 @@ pub fn address_map(
hash: tx.hash.clone(),
index: tx.index.into(),
}),
tokens: vec![pool_added.coin, pool_added.base_pool],
contracts: vec![component_id.into()],
tokens: vec![pool_added.coin, lp_token],
contracts: vec![component_id.into(), pool_added.base_pool],
static_att: vec![
Attribute {
name: "pool_type".into(),
@@ -558,6 +592,11 @@ pub fn address_map(
return None;
};
let component_id = &call.return_data[12..];
let get_lp_token =
abi::meta_registry::functions::GetLpToken1 { pool: add_pool.base_pool.clone() };
let lp_token = get_lp_token.call(META_REGISTRY.to_vec())?;
Some(ProtocolComponent {
id: hex::encode(component_id),
tx: Some(Transaction {
@@ -566,8 +605,8 @@ pub fn address_map(
hash: tx.hash.clone(),
index: tx.index.into(),
}),
tokens: vec![pool_added.coin, pool_added.base_pool],
contracts: vec![component_id.into()],
tokens: vec![pool_added.coin, lp_token],
contracts: vec![component_id.into(), pool_added.base_pool],
static_att: vec![
Attribute {
name: "pool_type".into(),

View File

@@ -20,7 +20,7 @@ binaries:
modules:
- name: map_components
kind: map
initialBlock: 18830232
initialBlock: 19933736
inputs:
- params: string
- source: sf.ethereum.type.v2.Block
@@ -29,7 +29,7 @@ modules:
- name: store_component_tokens
kind: store
initialBlock: 18830232
initialBlock: 19933736
updatePolicy: set
valueType: string
inputs:
@@ -37,17 +37,16 @@ modules:
- name: map_relative_balances
kind: map
initialBlock: 18830232 # An arbitrary block that should change based on your requirements
initialBlock: 19933736 # An arbitrary block that should change based on your requirements
inputs:
- source: sf.ethereum.type.v2.Block
- store: store_components
- store: store_component_tokens
output:
type: proto:tycho.evm.v1.BlockBalanceDeltas
- name: store_balances
kind: store
initialBlock: 18830232
initialBlock: 19933736
updatePolicy: add
valueType: bigint
inputs:
@@ -55,7 +54,7 @@ modules:
- name: map_protocol_changes
kind: map
initialBlock: 18830232
initialBlock: 19933736
inputs:
- source: sf.ethereum.type.v2.Block
- map: map_components