Merge pull request #57 from propeller-heads/tl/sdk-improvements

SDK Improvements
This commit is contained in:
tvinagre
2024-08-08 17:58:36 -03:00
committed by GitHub
39 changed files with 801 additions and 2817 deletions

View File

@@ -16,7 +16,7 @@ This is planned to be resolved with the dynamic contract indexing module.
|--------------------|-------|---------------------------------------------------------------------------------------------------------|
| pool_type | str | A unique identifier per pool type. Set depending on the factory |
| normalized weights | json | The normalised weights of a weighted pool. |
| pool_id | bytes | The balancer pool id. |
| pool_id | str | A hex encoded balancer pool id. |
| rate_providers | json | A list of rate provider addresses. |
| bpt                | bytes | The balancer lp token, set if the pool supports entering and exiting LP positions via the swap interface. |
| main_token | bytes | The main token address for a linear pool |

View File

@@ -0,0 +1,169 @@
substreams_yaml_path: ./substreams.yaml
protocol_type_names:
- "balancer_pool"
adapter_contract: "BalancerV2SwapAdapter"
adapter_build_signature: "constructor(address)"
adapter_build_args: "0xBA12222222228d8Ba445958a75a0704d566BF2C8"
skip_balance_check: true
initialized_accounts:
- "0xba12222222228d8ba445958a75a0704d566bf2c8"
# Uncomment entries below to include composable stable pool dependencies
# wstETH dependencies
# - "0x72D07D7DcA67b8A406aD1Ec34ce969c90bFEE768"
# - "0xb8ffc3cd6e7cf5a098a1c92f48009765b24088dc"
# - "0xae7ab96520de3a18e5e111b5eaab095312d7fe84"
# - "0x7f39c581f595b53c5cb19bd0b3f8da6c935e2ca0"
# - "0x2b33cf282f867a7ff693a66e11b0fcc5552e4425"
# - "0x17144556fd3424edc8fc8a4c940b2d04936d17eb"
# sfrxETH dependencies
# - "0x302013E7936a39c358d07A3Df55dc94EC417E3a1"
# - "0xac3e018457b222d93114458476f3e3416abbe38f"
# rETH dependencies
# - "0x1a8F81c256aee9C640e14bB0453ce247ea0DFE6F"
# - "0x07fcabcbe4ff0d80c2b1eb42855c0131b6cba2f4"
# - "0x1d8f8f00cfa6758d7be78336684788fb0ee0fa46"
# - "0xae78736cd615f374d3085123a210448e74fc6393"
tests:
# WeightedPoolFactory - 0x897888115Ada5773E02aA29F775430BFB5F34c51
- name: test_weighted_pool_creation
start_block: 20128706
stop_block: 20128806
expected_components:
- id: "0xe96a45f66bdDA121B24F0a861372A72E8889523d"
tokens:
- "0x38C2a4a7330b22788374B8Ff70BBa513C8D848cA"
- "0x514910771AF9Ca656af840dff83E8264EcF986CA"
static_attributes:
rate_providers: "0x5b22307830303030303030303030303030303030303030303030303030303030303030303030303030303030222c22307830303030303030303030303030303030303030303030303030303030303030303030303030303030225d"
pool_id: "0x307865393661343566363662646461313231623234663061383631333732613732653838383935323364303030323030303030303030303030303030303030363962"
normalized_weights: "0x5b22307830623161326263326563353030303030222c22307830326336386166306262313430303030225d"
fee: "0x11c37937e08000"
manual_updates: "0x01"
pool_type: "0x5765696768746564506f6f6c466163746f7279"
creation_tx: "0xa63c671046ad2075ec8ea83ac21199cf3e3a5f433e72ec4c117cbabfb9b18de2"
# WeightedPool2TokensFactory - 0xA5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0
- name: weighted_legacy_creation
start_block: 13148365
stop_block: 13148465
expected_components:
- id: "0xBF96189Eee9357a95C7719f4F5047F76bdE804E5"
tokens:
- "0x5A98FcBEA516Cf06857215779Fd812CA3beF1B32"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
static_attributes:
pool_id: "0x307862663936313839656565393335376139356337373139663466353034376637366264653830346535303030323030303030303030303030303030303030303837"
weights: "0x5b22307830623161326263326563353030303030222c22307830326336386166306262313430303030225d"
fee: "0x08e1bc9bf04000"
manual_updates: "0x01"
pool_type: "0x5765696768746564506f6f6c32546f6b656e73466163746f7279"
creation_tx: "0xdced662e41b1608c386551bbc89894a10321fd8bd58782e22077d1044cf99cb5"
# ComposableStablePoolFactory - 0xDB8d758BCb971e482B2C45f7F8a7740283A1bd3A
- name: test_composable_stable_pool_creation
start_block: 17677300
stop_block: 17678400
expected_components:
- id: "0x42ED016F826165C2e5976fe5bC3df540C5aD0Af7"
tokens:
- "0x42ed016f826165c2e5976fe5bc3df540c5ad0af7"
- "0x7f39C581F595B53c5cb19bD0b3f8dA6c935E2Ca0"
- "0xac3E018457B222d93114458476f3E3416Abbe38F"
- "0xae78736Cd615f374D3085123A210448E74Fc6393"
static_attributes:
rate_providers: "0x5b22307837326430376437646361363762386134303661643165633334636539363963393062666565373638222c22307833303230313365373933366133396333353864303761336466353564633934656334313765336131222c22307831613866383163323536616565396336343065313462623034353363653234376561306466653666225d"
pool_id: "0x307834326564303136663832363136356332653539373666653562633364663534306335616430616637303030303030303030303030303030303030303030353862"
bpt: "0x42ed016f826165c2e5976fe5bc3df540c5ad0af7"
fee: "0x5af3107a4000"
manual_updates: "0x01"
pool_type: "0x436f6d706f7361626c65537461626c65506f6f6c466163746f7279"
skip_simulation: true
creation_tx: "0x53ff6bab0d8a76a998e29e59da8068ad906ae85507a1c2fbf2505e2cb52fd754"
# ERC4626LinearPoolFactory - 0x813EE7a840CE909E7Fea2117A44a90b8063bd4fd
- name: test_erc4626_linear_pool_creation
start_block: 17480142
stop_block: 17480242
expected_components:
- id: "0x3fCb7085B8F2F473F80bF6D879cAe99eA4DE9344"
tokens:
- "0x39Dd7790e75C6F663731f7E1FdC0f35007D3879b"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
- "0x3fcb7085b8f2f473f80bf6d879cae99ea4de9344"
static_attributes:
pool_id: "0x307833666362373038356238663266343733663830626636643837396361653939656134646539333434303030303030303030303030303030303030303030353664"
wrapped_token: "0x39dd7790e75c6f663731f7e1fdc0f35007d3879b"
fee: "0x00b5e620f48000"
manual_updates: "0x01"
pool_type: "0x455243343632364c696e656172506f6f6c466163746f7279"
upper_target: "0x108b2a2c28029094000000"
bpt: "0x3fcb7085b8f2f473f80bf6d879cae99ea4de9344"
main_token: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
skip_simulation: true
creation_tx: "0x5ff97870685370bab3876a4335d28c42e24659064fe78b486d6fb1b37b992877"
# EulerLinearPoolFactory - 0x5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347
- name: test_euler_linear_pool_creation
start_block: 16588117
stop_block: 16588217
expected_components:
- id: "0xD4e7C1F3DA1144c9E2CfD1b015eDA7652b4a4399"
tokens:
- "0xD4e7C1F3DA1144c9E2CfD1b015eDA7652b4a4399"
- "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
- "0xEb91861f8A4e1C12333F42DCE8fB0Ecdc28dA716"
static_attributes:
pool_id: "0x307864346537633166336461313134346339653263666431623031356564613736353262346134333939303030303030303030303030303030303030303030343661"
wrapped_token: "0xeb91861f8a4e1c12333f42dce8fb0ecdc28da716"
fee: "0x00b5e620f48000"
manual_updates: "0x01"
pool_type: "0x45756c65724c696e656172506f6f6c466163746f7279"
upper_target: "0x108b2a2c28029094000000"
bpt: "0xd4e7c1f3da1144c9e2cfd1b015eda7652b4a4399"
main_token: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
skip_simulation: true
creation_tx: "0x4a9ea683052afefdae3d189862868c3a7dc8f431d1d9828b6bfd9451a8816426"
# SiloLinearPoolFactory - 0x4E11AEec21baF1660b1a46472963cB3DA7811C89
- name: test_silo_linear_pool_creation
start_block: 17173185
stop_block: 17173187
expected_components:
- id: "0x74CBfAF94A3577c539a9dCEE9870A6349a33b34f"
tokens:
- "0x192E67544694a7bAA2DeA94f9B1Df58BB3395A12"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
- "0x74cbfaf94a3577c539a9dcee9870a6349a33b34f"
static_attributes:
pool_id: "0x307837346362666166393461333537376335333961396463656539383730613633343961333362333466303030303030303030303030303030303030303030353334"
wrapped_token: "0x192e67544694a7baa2dea94f9b1df58bb3395a12"
fee: "0x00e8d4a51000"
manual_updates: "0x01"
pool_type: "0x53696c6f4c696e656172506f6f6c466163746f7279"
upper_target: "0x00"
bpt: "0x74cbfaf94a3577c539a9dcee9870a6349a33b34f"
main_token: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
skip_simulation: true
creation_tx: "0x215c9f4256ab450368132f4063611ae8cdd98e80bea7e44ecf0600ed1d757018"
# YearnLinearPoolFactory - 0x5F5222Ffa40F2AEd6380D022184D6ea67C776eE0a
- name: test_yearn_linear_pool_creation
start_block: 17052601
stop_block: 17052605
expected_components:
- id: "0xac5b4ef7ede2f2843a704e96dcaa637f4ba3dc3f"
tokens:
- "0x806E02Dea8d4a0882caD9fA3Fa75B212328692dE"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
- "0xac5b4ef7ede2f2843a704e96dcaa637f4ba3dc3f"
static_attributes:
pool_id: "0x307861633562346566376564653266323834336137303465393664636161363337663462613364633366303030303030303030303030303030303030303030353164"
wrapped_token: "0x806e02dea8d4a0882cad9fa3fa75b212328692de"
fee: "0x00e8d4a51000"
manual_updates: "0x01"
pool_type: "0x596561726e4c696e656172506f6f6c466163746f7279"
upper_target: "0x00"
bpt: "0xac5b4ef7ede2f2843a704e96dcaa637f4ba3dc3f"
main_token: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
skip_simulation: true
creation_tx: "0x497aa03ce84d236c183204ddfc6762c8e4158da1ebc5e7e18e7f6cceaa497a2a"

View File

@@ -70,7 +70,10 @@ pub fn address_map(
"normalized_weights",
&json_serialize_bigint_list(&create_call.normalized_weights),
),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
(
"rate_providers",
&json_serialize_address_list(&create_call.rate_providers),
@@ -100,7 +103,10 @@ pub fn address_map(
.with_tokens(&tokens_registered.tokens)
.with_attributes(&[
("pool_type", "ComposableStablePoolFactory".as_bytes()),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
("bpt", &pool_created.pool),
(
"fee",
@@ -137,7 +143,10 @@ pub fn address_map(
.upper_target
.to_signed_bytes_be(),
),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
("manual_updates", &[1u8]),
("bpt", &pool_created.pool),
("main_token", &create_call.main_token),
@@ -172,7 +181,10 @@ pub fn address_map(
.upper_target
.to_signed_bytes_be(),
),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
("manual_updates", &[1u8]),
("bpt", &pool_created.pool),
("main_token", &create_call.main_token),
@@ -255,7 +267,10 @@ pub fn address_map(
.upper_target
.to_signed_bytes_be(),
),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
("manual_updates", &[1u8]),
("bpt", &pool_created.pool),
("main_token", &create_call.main_token),
@@ -290,7 +305,10 @@ pub fn address_map(
.upper_target
.to_signed_bytes_be(),
),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
("manual_updates", &[1u8]),
("bpt", &pool_created.pool),
("main_token", &create_call.main_token),
@@ -321,7 +339,10 @@ pub fn address_map(
.with_attributes(&[
("pool_type", "WeightedPool2TokensFactory".as_bytes()),
("weights", &json_serialize_bigint_list(&create_call.weights)),
("pool_id", &pool_registered.pool_id),
(
"pool_id",
format!("0x{}", hex::encode(pool_registered.pool_id)).as_bytes(),
),
(
"fee",
&create_call

View File

@@ -1,125 +0,0 @@
substreams_yaml_path: ./substreams.yaml
protocol_type_names:
- "balancer_pool"
adapter_contract: "BalancerV2SwapAdapter"
skip_balance_check: true
initialized_accounts:
- "0xba12222222228d8ba445958a75a0704d566bf2c8"
# Uncomment entries below to include composable stable pool dependencies
# wstETH dependencies
# - "0x72D07D7DcA67b8A406aD1Ec34ce969c90bFEE768"
# - "0xb8ffc3cd6e7cf5a098a1c92f48009765b24088dc"
# - "0xae7ab96520de3a18e5e111b5eaab095312d7fe84"
# - "0x7f39c581f595b53c5cb19bd0b3f8da6c935e2ca0"
# - "0x2b33cf282f867a7ff693a66e11b0fcc5552e4425"
# - "0x17144556fd3424edc8fc8a4c940b2d04936d17eb"
# sfrxETH dependencies
# - "0x302013E7936a39c358d07A3Df55dc94EC417E3a1"
# - "0xac3e018457b222d93114458476f3e3416abbe38f"
# rETH dependencies
# - "0x1a8F81c256aee9C640e14bB0453ce247ea0DFE6F"
# - "0x07fcabcbe4ff0d80c2b1eb42855c0131b6cba2f4"
# - "0x1d8f8f00cfa6758d7be78336684788fb0ee0fa46"
# - "0xae78736cd615f374d3085123a210448e74fc6393"
tests:
# WeightedPoolFactory - 0x897888115Ada5773E02aA29F775430BFB5F34c51
- name: test_weighted_pool_creation
start_block: 20128706
stop_block: 20128806
expected_state:
protocol_components:
- id: "0xe96a45f66bdDA121B24F0a861372A72E8889523d"
tokens:
- "0x38C2a4a7330b22788374B8Ff70BBa513C8D848cA"
- "0x514910771AF9Ca656af840dff83E8264EcF986CA"
static_attributes: null
creation_tx: "0xa63c671046ad2075ec8ea83ac21199cf3e3a5f433e72ec4c117cbabfb9b18de2"
# WeightedPool2TokensFactory - 0xA5bf2ddF098bb0Ef6d120C98217dD6B141c74EE0
- name: weighted_legacy_creation
start_block: 13148365
stop_block: 13148465
expected_state:
protocol_components:
- id: "0xBF96189Eee9357a95C7719f4F5047F76bdE804E5"
tokens:
- "0x5A98FcBEA516Cf06857215779Fd812CA3beF1B32"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
static_attributes: null
creation_tx: "0xdced662e41b1608c386551bbc89894a10321fd8bd58782e22077d1044cf99cb5"
# ComposableStablePoolFactory - 0xDB8d758BCb971e482B2C45f7F8a7740283A1bd3A
- name: test_composable_stable_pool_creation
start_block: 17677300
stop_block: 17678400
expected_state:
protocol_components:
- id: "0x42ED016F826165C2e5976fe5bC3df540C5aD0Af7"
tokens:
- "0x42ed016f826165c2e5976fe5bc3df540c5ad0af7"
- "0x7f39C581F595B53c5cb19bD0b3f8dA6c935E2Ca0"
- "0xac3E018457B222d93114458476f3E3416Abbe38F"
- "0xae78736Cd615f374D3085123A210448E74Fc6393"
static_attributes: null
skip_simulation: true
creation_tx: "0x53ff6bab0d8a76a998e29e59da8068ad906ae85507a1c2fbf2505e2cb52fd754"
# ERC4626LinearPoolFactory - 0x813EE7a840CE909E7Fea2117A44a90b8063bd4fd
- name: test_erc4626_linear_pool_creation
start_block: 17480142
stop_block: 17480242
expected_state:
protocol_components:
- id: "0x3fCb7085B8F2F473F80bF6D879cAe99eA4DE9344"
tokens:
- "0x39Dd7790e75C6F663731f7E1FdC0f35007D3879b"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
- "0x3fcb7085b8f2f473f80bf6d879cae99ea4de9344"
static_attributes: null
skip_simulation: true
creation_tx: "0x5ff97870685370bab3876a4335d28c42e24659064fe78b486d6fb1b37b992877"
# EulerLinearPoolFactory - 0x5F43FBa61f63Fa6bFF101a0A0458cEA917f6B347
- name: test_euler_linear_pool_creation
start_block: 16588117
stop_block: 16588217
expected_state:
protocol_components:
- id: "0xD4e7C1F3DA1144c9E2CfD1b015eDA7652b4a4399"
tokens:
- "0xD4e7C1F3DA1144c9E2CfD1b015eDA7652b4a4399"
- "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
- "0xEb91861f8A4e1C12333F42DCE8fB0Ecdc28dA716"
static_attributes: null
skip_simulation: true
creation_tx: "0x4a9ea683052afefdae3d189862868c3a7dc8f431d1d9828b6bfd9451a8816426"
# SiloLinearPoolFactory - 0x4E11AEec21baF1660b1a46472963cB3DA7811C89
- name: test_silo_linear_pool_creation
start_block: 17173185
stop_block: 17173187
expected_state:
protocol_components:
- id: "0x74CBfAF94A3577c539a9dCEE9870A6349a33b34f"
tokens:
- "0x192E67544694a7bAA2DeA94f9B1Df58BB3395A12"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
- "0x74cbfaf94a3577c539a9dcee9870a6349a33b34f"
static_attributes: null
skip_simulation: true
creation_tx: "0x215c9f4256ab450368132f4063611ae8cdd98e80bea7e44ecf0600ed1d757018"
# YearnLinearPoolFactory - 0x5F5222Ffa40F2AEd6380D022184D6ea67C776eE0a
- name: test_yearn_linear_pool_creation
start_block: 17052601
stop_block: 17052605
expected_state:
protocol_components:
- id: "0xac5b4ef7ede2f2843a704e96dcaa637f4ba3dc3f"
tokens:
- "0x806E02Dea8d4a0882caD9fA3Fa75B212328692dE"
- "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
- "0xac5b4ef7ede2f2843a704e96dcaa637f4ba3dc3f"
static_attributes: null
skip_simulation: true
creation_tx: "0x497aa03ce84d236c183204ddfc6762c8e4158da1ebc5e7e18e7f6cceaa497a2a"

View File

@@ -1,9 +1,9 @@
#![allow(clippy::all)]
pub mod crypto_pool_factory;
pub mod crypto_swap_ng_factory;
pub mod erc20;
pub mod meta_pool_factory;
pub mod meta_registry;
pub mod stableswap_factory;
pub mod crypto_swap_ng_factory;
pub mod meta_registry;
pub mod tricrypto_factory;
pub mod twocrypto_factory;
pub mod erc20;
pub mod meta_pool_factory;

View File

@@ -0,0 +1,38 @@
substreams_yaml_path: ./substreams.yaml
adapter_contract: "SwapAdapter.evm.runtime"
adapter_build_signature: "constructor(address)"
adapter_build_args: "0x0000000000000000000000000000000000000000"
skip_balance_check: false
initialized_accounts:
- "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84" # Needed for ....
protocol_type_names:
- "type_name_1"
- "type_name_2"
tests:
- name: test_pool_creation
start_block: 123
stop_block: 456
initialized_accounts:
- "0x0c0e5f2fF0ff18a3be9b835635039256dC4B4963" # Needed for ....
expected_components:
- id: "0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7"
tokens:
- "0xdac17f958d2ee523a2206206994597c13d831ec7"
- "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
- "0x6b175474e89094c44da98b954eedeac495271d0f"
static_attributes:
attr_1: "value"
attr_2: "value"
creation_tx: "0x20793bbf260912aae189d5d261ff003c9b9166da8191d8f9d63ff1c7722f3ac6"
skip_simulation: false
- name: test_something_else
start_block: 123
stop_block: 456
expected_components:
- id: "0xdc24316b9ae028f1497c275eb9192a3ea0f67022"
tokens:
- "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE"
- "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84"
static_attributes: null
creation_tx: "0xfac67ecbd423a5b915deff06045ec9343568edaec34ae95c43d35f2c018afdaa"
skip_simulation: true # If true, always add a reason

View File

@@ -1,34 +0,0 @@
substreams_yaml_path: ./substreams.yaml
adapter_contract: "SwapAdapter.evm.runtime"
skip_balance_check: false
protocol_type_names:
- "type_name_1"
- "type_name_2"
tests:
- name: test_pool_creation
start_block: 123
stop_block: 456
initialized_accounts:
- "0x0c0e5f2fF0ff18a3be9b835635039256dC4B4963" # Needed for ....
expected_state:
protocol_components:
- id: "0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7"
tokens:
- "0xdac17f958d2ee523a2206206994597c13d831ec7"
- "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
- "0x6b175474e89094c44da98b954eedeac495271d0f"
static_attributes:
creation_tx: "0x20793bbf260912aae189d5d261ff003c9b9166da8191d8f9d63ff1c7722f3ac6"
skip_simulation: false
- name: test_something_else
start_block: 123
stop_block: 456
expected_state:
protocol_components:
- id: "0xdc24316b9ae028f1497c275eb9192a3ea0f67022"
tokens:
- "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE"
- "0xae7ab96520DE3A18E5e111B5EaAb095312D7fE84"
static_attributes:
creation_tx: "0xfac67ecbd423a5b915deff06045ec9343568edaec34ae95c43d35f2c018afdaa"
skip_simulation: true # If true, always add a reason

View File

@@ -1,4 +1,5 @@
SUBSTREAMS_PATH=../substreams/ethereum-curve
RPC_URL=https://mainnet.infura.io/v3/your-infura-key
DATABASE_URL: "postgres://postgres:mypassword@db:5432/tycho_indexer_0"
SUBSTREAMS_API_TOKEN="changeme"
export SUBSTREAMS_PACKAGE=ethereum-curve
export RPC_URL=https://mainnet.infura.io/v3/your-infura-key
export DATABASE_URL="postgres://postgres:mypassword@db:5432/tycho_indexer_0"
export SUBSTREAMS_API_TOKEN="changeme"
export DOMAIN_OWNER="AWSAccountId"

View File

@@ -4,10 +4,18 @@ FROM --platform=linux/amd64 continuumio/miniconda3:24.4.0-0
# Set the working directory in the container to /app
WORKDIR /app
# Install Foundry
RUN apt-get update && apt-get install -y curl
RUN curl -L https://foundry.paradigm.xyz | bash
RUN /bin/bash -c "source $HOME/.bashrc && $HOME/.foundry/bin/foundryup"
#
# Add Foundry to PATH
ENV PATH /root/.foundry/bin:$PATH
# Add current directory code to /app in container
ADD . /app/testing
RUN chmod +x /app/testing/tycho-indexer-linux-x64
RUN chmod +x /app/testing/tycho-indexer
# Create a new conda environment and install pip
RUN conda create -n myenv pip python=3.9
@@ -21,6 +29,7 @@ RUN apt-get update \
&& pip install psycopg2 \
&& apt-get clean
ARG PIP_INDEX_URL
RUN /bin/bash -c "source activate myenv && cd testing && pip install --no-cache-dir -r requirements.txt && cd -"
# Make port 80 available to the world outside this container
@@ -31,4 +40,4 @@ RUN wget -c https://github.com/streamingfast/substreams/releases/download/v1.8.0
RUN mv substreams /usr/local/bin/substreams && chmod +x /usr/local/bin/substreams
# Run the command to start your application
CMD ["python", "testing/cli.py", "--test_yaml_path", "/app/substreams/my_substream/test_assets.yaml", "--with_binary_logs", "--db_url", "postgres://postgres:mypassword@db:5432"]
CMD ["python", "testing/src/runner/cli.py", "--package", "my_substream", "--with_binary_logs", "--db_url", "postgres://postgres:mypassword@db:5432"]

View File

@@ -9,11 +9,13 @@ The testing suite builds the `.spkg` for your Substreams module, indexes a speci
## Prerequisites
- Latest version of our indexer, Tycho. Please contact us to obtain the latest version. Once acquired, place it in the `/testing/` directory.
- Access to PropellerHeads' private PyPI repository. Please contact us to obtain access.
- Docker installed on your machine.
## Test Configuration
Tests are defined in a `yaml` file. A template can be found at `substreams/ethereum-template/test_assets.yaml`. The configuration file should include:
Tests are defined in a `yaml` file. A template can be found at
`substreams/ethereum-template/integration_test.tycho.yaml`. The configuration file should include:
- The target Substreams config file.
- The expected protocol types.
@@ -32,13 +34,33 @@ Please place this Runtime file under the respective `substream` directory inside
Export the required environment variables for the execution. You can find the available environment variables in the `.env.default` file.
Please create a `.env` file in the `testing` directory and set the required environment variables.
The variable SUBSTREAMS_PATH should be a relative reference to the directory containing the Substreams module that you want to test.
#### Environment Variables
Example: `SUBSTREAMS_PATH=../substreams/ethereum-curve`
**SUBSTREAMS_PACKAGE**
- **Description**: Specifies the Substreams module that you want to test
- **Example**: `export SUBSTREAMS_PACKAGE=ethereum-balancer`
**DATABASE_URL**
- **Description**: The connection string for the PostgreSQL database. It includes the username, password, host, port, and database name. It's already set to the default for the Docker container.
- **Example**: `export DATABASE_URL="postgres://postgres:mypassword@localhost:5431/tycho_indexer_0"`
**RPC_URL**
- **Description**: The URL for the Ethereum RPC endpoint. This is used to fetch the storage data. The node needs to be an archive node, and support [debug_storageRangeAt](https://www.quicknode.com/docs/ethereum/debug_storageRangeAt).
- **Example**: `export RPC_URL="https://ethereum-mainnet.core.chainstack.com/123123123123"`
**SUBSTREAMS_API_TOKEN**
- **Description**: The API token for accessing Substreams services. This token is required for authentication.
- **Example**: `export SUBSTREAMS_API_TOKEN=eyJhbGci...`
**DOMAIN_OWNER**
- **Description**: The domain owner identifier for PropellerHeads' AWS account, used for authenticating on the private PyPI repository.
- **Example**: `export DOMAIN_OWNER=123456789`
### Step 2: Build and Run the Testing Script
Run the testing script using Docker Compose:
To build the testing script, run the following commands:
```bash
source pre_build.sh
docker compose build
docker compose run app
```

View File

@@ -17,17 +17,30 @@ services:
build:
context: .
dockerfile: Dockerfile
args:
PIP_INDEX_URL: ${PIP_INDEX_URL}
volumes:
- ${SUBSTREAMS_PATH}:/app/substreams/my_substream
- ../substreams:/app/substreams
- ../proto:/app/proto
- ./tycho-indexer:/app/testing/tycho-indexer
- ./tycho-indexer:/bin/tycho-indexer
- ./src/runner/runner.py:/app/testing/src.py
- ../evm:/app/evm
- ./src/runner:/app/testing/src/runner
ports:
- "80:80"
depends_on:
- db
env_file:
- ".env"
environment:
- DATABASE_URL=postgres://postgres:mypassword@db:5432/tycho_indexer_0
command:
- "python"
- "testing/src/runner/cli.py"
- "--package"
- ${SUBSTREAMS_PACKAGE}
- "--tycho-logs"
- "--db-url"
- "postgres://postgres:mypassword@db:5432/tycho_indexer_0"
volumes:
postgres_data:

26
testing/pre_build.sh Executable file
View File

@@ -0,0 +1,26 @@
#!/bin/bash
# Resolve AWS CodeArtifact credentials and export PIP_INDEX_URL for the Docker build.
# Intended to be *sourced* (`source pre_build.sh`) so the exported variables reach the
# caller's shell, but it is also marked executable, so direct execution must work too.

# Enable automatic export of all defined variables
set -a
# Source the .env file
source .env
# Disable automatic export (optional, if you want to stop exporting variables)
set +a

# Check if DOMAIN_OWNER is set
if [ -z "$DOMAIN_OWNER" ]; then
    echo "DOMAIN_OWNER environment variable is not set."
    # BUGFIX: `return` is only valid in a sourced script; when executed directly it
    # fails and the script would keep running with DOMAIN_OWNER unset. Fall back to
    # `exit` in that case.
    return 1 2>/dev/null || exit 1
fi

# Fetch the CODEARTIFACT_AUTH_TOKEN (30-minute validity)
CODEARTIFACT_AUTH_TOKEN=$(aws --region eu-central-1 codeartifact get-authorization-token --domain propeller --domain-owner $DOMAIN_OWNER --query authorizationToken --output text --duration 1800)

# Set the PIP_INDEX_URL
PIP_INDEX_URL="https://aws:${CODEARTIFACT_AUTH_TOKEN}@propeller-${DOMAIN_OWNER}.d.codeartifact.eu-central-1.amazonaws.com/pypi/protosim/simple/"

# Export the variables
export CODEARTIFACT_AUTH_TOKEN
export PIP_INDEX_URL

View File

@@ -2,4 +2,5 @@ psycopg2==2.9.9
PyYAML==6.0.1
Requests==2.32.2
web3==5.31.3
-e ./tycho-client
tycho-indexer-client>=0.7.0
protosim_py>=0.5.0

View File

@@ -0,0 +1,65 @@
import os
import subprocess
from typing import Optional
class AdapterContractBuilder:
    """Builds and locates compiled SwapAdapter runtime bytecode in a Foundry source tree."""

    def __init__(self, src_path: str):
        # Root of the Foundry project; expected to contain `out/` and
        # `scripts/buildRuntime.sh`.
        self.src_path = src_path

    def find_contract(self, adapter_contract: str) -> str:
        """
        Finds the contract file in the provided source path.

        :param adapter_contract: The contract name to be found.
        :return: The path to the contract file.
        :raises FileNotFoundError: If the compiled runtime file does not exist.
        """
        contract_path = os.path.join(
            self.src_path,
            "out",
            f"{adapter_contract}.sol",
            f"{adapter_contract}.evm.runtime",
        )
        if not os.path.exists(contract_path):
            raise FileNotFoundError(f"Contract {adapter_contract} not found.")
        return contract_path

    def build_target(
        self, adapter_contract: str, signature: Optional[str], args: Optional[str]
    ) -> str:
        """
        Runs the buildRuntime Bash script in a subprocess with the provided arguments.

        :param adapter_contract: The contract name to be passed to the script.
        :param signature: The constructor signature to be passed to the script.
        :param args: The constructor arguments to be passed to the script.
        :return: The path to the contract file.
        """
        script_path = "scripts/buildRuntime.sh"
        cmd = [script_path, "-c", adapter_contract]
        if signature:
            # Only forward constructor data when a signature is provided.
            cmd.extend(["-s", signature, "-a", args])
        try:
            # BUGFIX: previously the conditionally-built `cmd` was ignored and the raw
            # argument list (possibly containing None values) was passed to
            # subprocess.run, which raises TypeError whenever `signature` is None.
            result = subprocess.run(
                cmd,
                cwd=self.src_path,
                capture_output=True,
                text=True,
                check=True,
            )
            # Print standard output and error for debugging
            print("Output:\n", result.stdout)
            if result.stderr:
                print("Errors:\n", result.stderr)
            return self.find_contract(adapter_contract)
        except subprocess.CalledProcessError as e:
            # NOTE(review): build failures are reported but swallowed, so callers
            # receive None despite the `-> str` annotation — kept for backward
            # compatibility with existing callers.
            print(f"An error occurred: {e}")
            print("Error Output:\n", e.stderr)

View File

@@ -6,29 +6,20 @@ def main() -> None:
parser = argparse.ArgumentParser(
description="Run indexer within a specified range of blocks"
)
parser.add_argument("--package", type=str, help="Name of the package to test.")
parser.add_argument(
"--package", type=str, help="Name of the package to test."
)
parser.add_argument(
"--tycho-logs",
action="store_true",
help="Flag to activate logs from Tycho.",
"--tycho-logs", action="store_true", help="Flag to activate logs from Tycho."
)
parser.add_argument(
"--db-url", type=str, help="Postgres database URL for the Tycho indexer."
)
parser.add_argument(
"--vm-traces",
action="store_true",
help="Enable tracing during vm simulations.",
"--vm-traces", action="store_true", help="Enable tracing during vm simulations."
)
args = parser.parse_args()
test_runner = TestRunner(
args.package,
args.tycho_logs,
db_url=args.db_url,
vm_traces=args.vm_traces,
args.package, args.tycho_logs, db_url=args.db_url, vm_traces=args.vm_traces
)
test_runner.run_tests()

View File

@@ -0,0 +1,125 @@
import difflib
from hexbytes import HexBytes
from pydantic import BaseModel, Field, validator
from typing import List, Dict, Optional
from tycho_client.dto import ProtocolComponent
class ProtocolComponentExpectation(BaseModel):
    """Represents a ProtocolComponent with its main attributes."""

    id: str = Field(..., description="Identifier of the protocol component")
    tokens: List[HexBytes] = Field(
        ...,
        description="List of token addresses associated with the protocol component",
    )
    static_attributes: Optional[Dict[str, HexBytes]] = Field(
        default_factory=dict, description="Static attributes of the protocol component"
    )
    creation_tx: HexBytes = Field(
        ..., description="Hash of the transaction that created the protocol component"
    )

    @validator("id", pre=True, always=True)
    def lower_id(cls, v):
        # Lowercase so comparisons are case-insensitive.
        return v.lower()

    @validator("tokens", pre=True, always=True)
    def convert_tokens_to_hexbytes(cls, v):
        # Lowercase, wrap in HexBytes, and sort so token ordering never matters.
        normalized = [HexBytes(token.lower()) for token in v]
        return sorted(normalized)

    @validator("static_attributes", pre=True, always=True)
    def convert_static_attributes_to_hexbytes(cls, v):
        # A missing/empty mapping normalizes to an empty dict.
        if not v:
            return {}
        return {key: HexBytes(value.lower()) for key, value in v.items()}

    @validator("creation_tx", pre=True, always=True)
    def convert_creation_tx_to_hexbytes(cls, v):
        return HexBytes(v.lower())

    def compare(
        self, other: "ProtocolComponentExpectation", colorize_output: bool = True
    ) -> Optional[str]:
        """Compares the current instance with another ProtocolComponent instance and returns a message with the
        differences or None if there are no differences."""

        # ANSI color per ndiff marker: removals red, additions green, hints yellow.
        ansi_codes = {"-": "\033[91m", "+": "\033[92m", "?": "\033[93m"}

        def colorize_diff(diff):
            painted = []
            for line in diff:
                code = ansi_codes.get(line[:1])
                painted.append(f"{code}{line}\033[0m" if code else line)
            return "\n".join(painted)

        mismatches = []
        for name, mine in self.__dict__.items():
            theirs = getattr(other, name, None)
            if mine == theirs:
                continue
            diff_lines = list(difflib.ndiff([str(mine)], [str(theirs)]))
            rendered = (
                colorize_diff(diff_lines) if colorize_output else "\n".join(diff_lines)
            )
            mismatches.append(f"Field '{name}' mismatch:\n{rendered}")

        return "\n".join(mismatches) if mismatches else None
class ProtocolComponentWithTestConfig(ProtocolComponentExpectation):
    """Represents a ProtocolComponent with its main attributes and test configuration."""

    # When True the runner skips simulation for this component; defaults to False.
    # Test-config convention (see template): when set to true, add a reason.
    skip_simulation: Optional[bool] = Field(
        False,
        description="Flag indicating whether to skip simulation for this component",
    )
class IntegrationTest(BaseModel):
    """Configuration for an individual test."""

    name: str = Field(..., description="Name of the test")
    start_block: int = Field(..., description="Starting block number for the test")
    stop_block: int = Field(..., description="Stopping block number for the test")
    # Per-test accounts; a separate top-level list also exists on
    # IntegrationTestsConfig. NOTE(review): how the two lists combine is decided
    # by the runner — confirm there.
    initialized_accounts: Optional[List[str]] = Field(
        None, description="List of initialized account addresses"
    )
    expected_components: List[ProtocolComponentWithTestConfig] = Field(
        ..., description="List of protocol components expected in the indexed state"
    )
class IntegrationTestsConfig(BaseModel):
    """Main integration test configuration.

    Parsed from a protocol's `integration_test.tycho.yaml` file; see the template
    config for an example of each field.
    """

    substreams_yaml_path: str = Field(
        "./substreams.yaml", description="Path of the Substreams YAML file"
    )
    adapter_contract: str = Field(
        ..., description="Name of the SwapAdapter contract for this protocol"
    )
    # Optional constructor signature/args used when building the adapter runtime
    # (e.g. signature "constructor(address)" with a vault address as args).
    adapter_build_signature: Optional[str] = Field(
        None, description="SwapAdapter's constructor signature"
    )
    adapter_build_args: Optional[str] = Field(
        None, description="Arguments for the SwapAdapter constructor"
    )
    initialized_accounts: Optional[List[str]] = Field(
        None,
        description="List of initialized account addresses. These accounts will be initialized for every tests",
    )
    skip_balance_check: bool = Field(
        ..., description="Flag to skip balance check for all tests"
    )
    protocol_type_names: List[str] = Field(
        ..., description="List of protocol type names for the tested protocol"
    )
    tests: List[IntegrationTest] = Field(..., description="List of integration tests")

View File

@@ -2,20 +2,41 @@ import itertools
import os
import shutil
import subprocess
import traceback
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from pathlib import Path
import traceback
from typing import List
import yaml
from protosim_py.evm.decoders import ThirdPartyPoolTychoDecoder
from protosim_py.evm.storage import TychoDBSingleton
from protosim_py.models import EVMBlock
from pydantic import BaseModel
from tycho_client.decoders import ThirdPartyPoolTychoDecoder
from tycho_client.models import Blockchain, EVMBlock
from tycho_client.tycho_adapter import TychoPoolStateStreamAdapter
from tycho_client.dto import (
Chain,
ProtocolComponentsParams,
ProtocolStateParams,
ContractStateParams,
ProtocolComponent,
ResponseProtocolState,
HexBytes,
ResponseAccount,
Snapshot,
ContractId,
)
from tycho_client.rpc_client import TychoRPCClient
from models import (
IntegrationTestsConfig,
ProtocolComponentWithTestConfig,
ProtocolComponentExpectation,
)
from adapter_builder import AdapterContractBuilder
from evm import get_token_balance, get_block_header
from tycho import TychoRunner, TychoRPCClient
from tycho import TychoRunner
from utils import build_snapshot_message, token_factory
class TestResult:
@@ -32,10 +53,10 @@ class TestResult:
return cls(success=False, message=message)
def load_config(yaml_path: str) -> dict:
"""Load YAML configuration from a specified file path."""
def parse_config(yaml_path: str) -> IntegrationTestsConfig:
with open(yaml_path, "r") as file:
return yaml.safe_load(file)
yaml_content = yaml.safe_load(file)
return IntegrationTestsConfig(**yaml_content)
class SimulationFailure(BaseModel):
@@ -46,120 +67,139 @@ class SimulationFailure(BaseModel):
class TestRunner:
def __init__(self, package: str, with_binary_logs: bool, db_url: str, vm_traces: bool):
def __init__(
self, package: str, with_binary_logs: bool, db_url: str, vm_traces: bool
):
self.repo_root = os.getcwd()
config_path = os.path.join(self.repo_root, "substreams", package, "test_assets.yaml")
self.config = load_config(config_path)
config_path = os.path.join(
self.repo_root, "substreams", package, "integration_test.tycho.yaml"
)
self.config: IntegrationTestsConfig = parse_config(config_path)
self.spkg_src = os.path.join(self.repo_root, "substreams", package)
self.adapters_src = os.path.join(self.repo_root, "evm")
self.tycho_runner = TychoRunner(db_url, with_binary_logs, self.config["initialized_accounts"])
self.adapter_contract_builder = AdapterContractBuilder(
os.path.join(self.repo_root, "evm")
)
self.tycho_runner = TychoRunner(
db_url, with_binary_logs, self.config.initialized_accounts
)
self.tycho_rpc_client = TychoRPCClient()
self._token_factory_func = token_factory(self.tycho_rpc_client)
self.db_url = db_url
self._vm_traces = vm_traces
self._chain = Blockchain.ethereum
self._chain = Chain.ethereum
def run_tests(self) -> None:
"""Run all tests specified in the configuration."""
print(f"Running tests ...")
for test in self.config["tests"]:
failed_tests = []
for test in self.config.tests:
self.tycho_runner.empty_database(self.db_url)
spkg_path = self.build_spkg(
os.path.join(self.spkg_src, self.config["substreams_yaml_path"]),
lambda data: self.update_initial_block(data, test["start_block"]),
os.path.join(self.spkg_src, self.config.substreams_yaml_path),
lambda data: self.update_initial_block(data, test.start_block),
)
self.tycho_runner.run_tycho(
spkg_path,
test["start_block"],
test["stop_block"],
self.config["protocol_type_names"],
test.get("initialized_accounts", []),
test.start_block,
test.stop_block,
self.config.protocol_type_names,
test.initialized_accounts or [],
)
result = self.tycho_runner.run_with_rpc_server(
self.validate_state, test["expected_state"], test["stop_block"]
self.validate_state, test.expected_components, test.stop_block
)
if result.success:
print(f"{test['name']} passed.")
print(f"\n{test.name} passed.\n")
else:
print(f"❗️ {test['name']} failed: {result.message}")
print(f"\n❗️ {test.name} failed: {result.message}\n")
def validate_state(self, expected_state: dict, stop_block: int) -> TestResult:
print(
"\nTest finished! \n"
f"Passed: {len(self.config.tests) - len(failed_tests)}/{len(self.config.tests)}\n"
)
if failed_tests:
print("Failed tests:")
for failed_test in failed_tests:
print(failed_test)
def validate_state(
self,
expected_components: List[ProtocolComponentWithTestConfig],
stop_block: int,
) -> TestResult:
"""Validate the current protocol state against the expected state."""
protocol_components = self.tycho_rpc_client.get_protocol_components()
protocol_states = self.tycho_rpc_client.get_protocol_state()
components = {
component["id"]: component
for component in protocol_components["protocol_components"]
protocol_components = self.tycho_rpc_client.get_protocol_components(
ProtocolComponentsParams(protocol_system="test_protocol")
)
protocol_states = self.tycho_rpc_client.get_protocol_state(
ProtocolStateParams(protocol_system="test_protocol")
)
components_by_id = {
component.id: component for component in protocol_components
}
try:
for expected_component in expected_state.get("protocol_components", []):
comp_id = expected_component["id"].lower()
if comp_id not in components:
for expected_component in expected_components:
comp_id = expected_component.id.lower()
if comp_id not in components_by_id:
return TestResult.Failed(
f"'{comp_id}' not found in protocol components."
)
component = components[comp_id]
for key, value in expected_component.items():
if key not in ["tokens", "static_attributes", "creation_tx"]:
continue
if key not in component:
return TestResult.Failed(
f"Missing '{key}' in component '{comp_id}'."
)
if isinstance(value, list):
if set(map(str.lower, value)) != set(
map(str.lower, component[key])
):
return TestResult.Failed(
f"List mismatch for key '{key}': {value} != {component[key]}"
)
elif value is not None and value.lower() != component[key]:
return TestResult.Failed(
f"Value mismatch for key '{key}': {value} != {component[key]}"
)
diff = ProtocolComponentExpectation(
**components_by_id[comp_id].dict()
).compare(ProtocolComponentExpectation(**expected_component.dict()))
if diff is not None:
return TestResult.Failed(diff)
token_balances: dict[str, dict[str, int]] = defaultdict(dict)
for component in protocol_components["protocol_components"]:
comp_id = component["id"].lower()
for token in component["tokens"]:
token_lower = token.lower()
token_balances: dict[str, dict[HexBytes, int]] = defaultdict(dict)
for component in protocol_components:
comp_id = component.id.lower()
for token in component.tokens:
state = next(
(
s
for s in protocol_states["states"]
if s["component_id"].lower() == comp_id
for s in protocol_states
if s.component_id.lower() == comp_id
),
None,
)
if state:
balance_hex = state["balances"].get(token_lower, "0x0")
balance_hex = state.balances.get(token, HexBytes("0x00"))
else:
balance_hex = "0x0"
tycho_balance = int(balance_hex, 16)
token_balances[comp_id][token_lower] = tycho_balance
balance_hex = HexBytes("0x00")
tycho_balance = int(balance_hex)
token_balances[comp_id][token] = tycho_balance
if self.config["skip_balance_check"] is not True:
if not self.config.skip_balance_check:
node_balance = get_token_balance(token, comp_id, stop_block)
if node_balance != tycho_balance:
return TestResult.Failed(
f"Balance mismatch for {comp_id}:{token} at block {stop_block}: got {node_balance} "
f"from rpc call and {tycho_balance} from Substreams"
)
contract_states = self.tycho_rpc_client.get_contract_state()
filtered_components = {'protocol_components': [pc for pc in protocol_components["protocol_components"] if
pc["id"] in [c["id"].lower() for c in
expected_state["protocol_components"] if
c.get("skip_simulation", False) is False]]}
contract_states = self.tycho_rpc_client.get_contract_state(
ContractStateParams(
contract_ids=[
ContractId(chain=self._chain, address=a)
for component in protocol_components
for a in component.contract_ids
]
)
)
filtered_components = [
pc
for pc in protocol_components
if pc.id
in [c.id for c in expected_components if c.skip_simulation is False]
]
simulation_failures = self.simulate_get_amount_out(
stop_block,
protocol_states,
filtered_components,
contract_states,
stop_block, protocol_states, filtered_components, contract_states
)
if len(simulation_failures):
error_msgs = []
@@ -178,13 +218,14 @@ class TestRunner:
return TestResult.Failed(error_message)
def simulate_get_amount_out(
self,
block_number: int,
protocol_states: dict,
protocol_components: dict,
contract_state: dict,
self,
block_number: int,
protocol_states: list[ResponseProtocolState],
protocol_components: list[ProtocolComponent],
contract_states: list[ResponseAccount],
) -> dict[str, list[SimulationFailure]]:
protocol_type_names = self.config["protocol_type_names"]
TychoDBSingleton.initialize()
protocol_type_names = self.config.protocol_type_names
block_header = get_block_header(block_number)
block: EVMBlock = EVMBlock(
@@ -194,56 +235,62 @@ class TestRunner:
)
failed_simulations: dict[str, list[SimulationFailure]] = dict()
for protocol in protocol_type_names:
adapter_contract = os.path.join(
self.adapters_src, "out", f"{self.config['adapter_contract']}.sol",
f"{self.config['adapter_contract']}.evm.runtime"
)
decoder = ThirdPartyPoolTychoDecoder(adapter_contract, 0, trace=self._vm_traces)
stream_adapter = TychoPoolStateStreamAdapter(
tycho_url="0.0.0.0:4242",
protocol=protocol,
decoder=decoder,
blockchain=self._chain,
)
snapshot_message = stream_adapter.build_snapshot_message(
protocol_components, protocol_states, contract_state
)
decoded = stream_adapter.process_snapshot(block, snapshot_message)
for pool_state in decoded.pool_states.values():
pool_id = pool_state.id_
if not pool_state.balances:
raise ValueError(f"Missing balances for pool {pool_id}")
for sell_token, buy_token in itertools.permutations(
pool_state.tokens, 2
):
# Try to sell 0.1% of the protocol balance
sell_amount = Decimal("0.001") * pool_state.balances[sell_token.address]
try:
amount_out, gas_used, _ = pool_state.get_amount_out(
sell_token, sell_amount, buy_token
try:
adapter_contract = self.adapter_contract_builder.find_contract(
self.config.adapter_contract
)
except FileNotFoundError:
adapter_contract = self.adapter_contract_builder.build_target(
self.config.adapter_contract,
self.config.adapter_build_signature,
self.config.adapter_build_args,
)
decoder = ThirdPartyPoolTychoDecoder(
token_factory_func=self._token_factory_func,
adapter_contract=adapter_contract,
minimum_gas=0,
trace=self._vm_traces,
)
snapshot_message: Snapshot = build_snapshot_message(
protocol_states, protocol_components, contract_states
)
decoded = decoder.decode_snapshot(snapshot_message, block)
for pool_state in decoded.values():
pool_id = pool_state.id_
if not pool_state.balances:
raise ValueError(f"Missing balances for pool {pool_id}")
for sell_token, buy_token in itertools.permutations(pool_state.tokens, 2):
# Try to sell 0.1% of the protocol balance
sell_amount = Decimal("0.001") * pool_state.balances[sell_token.address]
try:
amount_out, gas_used, _ = pool_state.get_amount_out(
sell_token, sell_amount, buy_token
)
print(
f"Amount out for {pool_id}: {sell_amount} {sell_token} -> {amount_out} {buy_token} - "
f"Gas used: {gas_used}"
)
except Exception as e:
print(
f"Error simulating get_amount_out for {pool_id}: {sell_token} -> {buy_token}. "
f"Error: {e}"
)
if pool_id not in failed_simulations:
failed_simulations[pool_id] = []
failed_simulations[pool_id].append(
SimulationFailure(
pool_id=pool_id,
sell_token=str(sell_token),
buy_token=str(buy_token),
error=str(e),
)
print(
f"Amount out for {pool_id}: {sell_amount} {sell_token} -> {amount_out} {buy_token} - "
f"Gas used: {gas_used}"
)
except Exception as e:
print(
f"Error simulating get_amount_out for {pool_id}: {sell_token} -> {buy_token}. "
f"Error: {e}"
)
if pool_id not in failed_simulations:
failed_simulations[pool_id] = []
failed_simulations[pool_id].append(
SimulationFailure(
pool_id=pool_id,
sell_token=str(sell_token),
buy_token=str(buy_token),
error=str(e),
)
)
continue
)
continue
return failed_simulations
@staticmethod

View File

@@ -4,7 +4,6 @@ import threading
import time
import psycopg2
import requests
from psycopg2 import sql
import os
@@ -39,51 +38,24 @@ def find_binary_file(file_name):
binary_path = find_binary_file("tycho-indexer")
class TychoRPCClient:
def __init__(self, rpc_url: str = "http://0.0.0.0:4242"):
self.rpc_url = rpc_url
def get_protocol_components(self) -> dict:
"""Retrieve protocol components from the RPC server."""
url = self.rpc_url + "/v1/ethereum/protocol_components"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {"protocol_system": "test_protocol"}
response = requests.post(url, headers=headers, json=data)
return response.json()
def get_protocol_state(self) -> dict:
"""Retrieve protocol state from the RPC server."""
url = self.rpc_url + "/v1/ethereum/protocol_state"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {}
response = requests.post(url, headers=headers, json=data)
return response.json()
def get_contract_state(self) -> dict:
"""Retrieve contract state from the RPC server."""
url = self.rpc_url + "/v1/ethereum/contract_state?include_balances=false"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {}
response = requests.post(url, headers=headers, json=data)
return response.json()
class TychoRunner:
def __init__(self, db_url: str, with_binary_logs: bool = False, initialized_accounts: list[str] = None):
def __init__(
self,
db_url: str,
with_binary_logs: bool = False,
initialized_accounts: list[str] = None,
):
self.with_binary_logs = with_binary_logs
self._db_url = db_url
self._initialized_accounts = initialized_accounts or []
def run_tycho(
self,
spkg_path: str,
start_block: int,
end_block: int,
protocol_type_names: list,
initialized_accounts: list,
self,
spkg_path: str,
start_block: int,
end_block: int,
protocol_type_names: list,
initialized_accounts: list,
) -> None:
"""Run the Tycho indexer with the specified SPKG and block range."""
@@ -109,14 +81,24 @@ class TychoRunner:
str(start_block),
"--stop-block",
# +2 is to make up for the cache in the index side.
str(end_block + 2)
] + (["--initialized-accounts", ",".join(all_accounts)] if all_accounts else []),
str(end_block + 2),
]
+ (
[
"--initialized-accounts",
",".join(all_accounts),
"--initialization-block",
str(start_block),
]
if all_accounts
else []
),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=env,
)
)
with process.stdout:
for line in iter(process.stdout.readline, ""):
@@ -149,12 +131,7 @@ class TychoRunner:
env["RUST_LOG"] = "info"
process = subprocess.Popen(
[
binary_path,
"--database-url",
self._db_url,
"rpc"
],
[binary_path, "--database-url", self._db_url, "rpc"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
@@ -204,7 +181,7 @@ class TychoRunner:
def empty_database(db_url: str) -> None:
"""Drop and recreate the Tycho indexer database."""
try:
conn = psycopg2.connect(db_url[:db_url.rfind('/')])
conn = psycopg2.connect(db_url[: db_url.rfind("/")])
conn.autocommit = True
cursor = conn.cursor()

View File

@@ -0,0 +1,80 @@
from logging import getLogger
from typing import Union
from eth_utils import to_checksum_address
from protosim_py.models import EthereumToken
from tycho_client.dto import (
ResponseProtocolState,
ProtocolComponent,
ResponseAccount,
ComponentWithState,
Snapshot,
HexBytes,
TokensParams,
PaginationParams,
)
from tycho_client.rpc_client import TychoRPCClient
log = getLogger(__name__)
def build_snapshot_message(
protocol_states: list[ResponseProtocolState],
protocol_components: list[ProtocolComponent],
account_states: list[ResponseAccount],
) -> Snapshot:
vm_storage = {state.address: state for state in account_states}
states = {}
for component in protocol_components:
pool_id = component.id
states[pool_id] = {"component": component}
for state in protocol_states:
pool_id = state.component_id
if pool_id not in states:
log.warning(f"State for pool {pool_id} not found in components")
continue
states[pool_id]["state"] = state
states = {id_: ComponentWithState(**state) for id_, state in states.items()}
return Snapshot(states=states, vm_storage=vm_storage)
def token_factory(rpc_client: TychoRPCClient) -> callable(HexBytes):
_client = rpc_client
_token_cache: dict[str, EthereumToken] = {}
def factory(requested_addresses: Union[str, list[str]]) -> list[EthereumToken]:
if not isinstance(requested_addresses, list):
requested_addresses = [to_checksum_address(requested_addresses)]
else:
requested_addresses = [to_checksum_address(a) for a in requested_addresses]
response = dict()
to_fetch = []
for address in requested_addresses:
if address in _token_cache:
response[address] = _token_cache[address]
else:
to_fetch.append(address)
if to_fetch:
pagination = PaginationParams(page_size=len(to_fetch), page=0)
params = TokensParams(token_addresses=to_fetch, pagination=pagination)
tokens = _client.get_tokens(params)
for token in tokens:
address = to_checksum_address(token.address)
eth_token = EthereumToken(
symbol=token.symbol,
address=address,
decimals=token.decimals,
gas=token.gas,
)
response[address] = eth_token
_token_cache[address] = eth_token
return [response[address] for address in requested_addresses]
return factory

View File

@@ -1,3 +0,0 @@
include wheels/*.whl
include tycho_client/assets/*
include tycho_client/bins/*

View File

@@ -1,38 +0,0 @@
# Tycho Adapter
This repository contains the Tycho Adapter, a tool that allows you to interact with the Tycho API.
## Installation
### Prerequisites
- Python 3.9
### Install with pip
```shell
# Create conda environment
conda create -n tycho pip python=3.9
# Activate environment
conda activate tycho
# Install packages
pip install -r requirements.txt
```
## Usage
```python
from tycho_client.decoders import ThirdPartyPoolTychoDecoder
from tycho_client.models import Blockchain
from tycho_client.tycho_adapter import TychoPoolStateStreamAdapter
decoder = ThirdPartyPoolTychoDecoder(
"MyProtocolSwapAdapter.evm.runtime", minimum_gas=0, hard_limit=False
)
stream_adapter = TychoPoolStateStreamAdapter(
tycho_url="0.0.0.0:4242",
protocol="my_protocol",
decoder=decoder,
blockchain=Blockchain.ethereum,
)
```

View File

@@ -1,6 +0,0 @@
requests==2.32.2
eth-abi==2.2.0
eth-typing==2.3.0
eth-utils==1.9.5
hexbytes==0.3.1
pydantic==2.8.2

View File

@@ -1,36 +0,0 @@
from setuptools import setup, find_packages
def read_requirements():
with open("requirements.txt") as req:
content = req.read()
requirements = content.split("\n")
return [req for req in requirements if req and not req.startswith("#")]
setup(
name="tycho-client",
version="0.1.0",
author="Propeller Heads",
description="A package for interacting with the Tycho API.",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
packages=find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires="~=3.9",
install_requires=[
"requests==2.32.2",
"eth-abi==2.2.0",
"eth-typing==2.3.0",
"eth-utils==1.9.5",
"hexbytes==0.3.1",
"pydantic==2.8.2",
"protosim_py==0.4.11",
],
package_data={"tycho-client": ["../wheels/*", "./assets/*", "./bins/*"]},
include_package_data=True,
)

View File

@@ -1,211 +0,0 @@
import logging
import time
from decimal import Decimal
from fractions import Fraction
from typing import Any, Union, NamedTuple
import eth_abi
from eth_abi.exceptions import DecodingError
from eth_typing import HexStr
from eth_utils import keccak
from eth_utils.abi import collapse_if_tuple
from hexbytes import HexBytes
from protosim_py import (
SimulationEngine,
SimulationParameters,
SimulationResult,
StateUpdate,
)
from .constants import EXTERNAL_ACCOUNT
from .models import Address, EthereumToken, EVMBlock, Capability
from .utils import load_abi, maybe_coerce_error
log = logging.getLogger(__name__)
TStateOverwrites = dict[Address, dict[int, int]]
class Trade(NamedTuple):
"""
Trade represents a simple trading operation with fields:
received_amount: Amount received from the trade
gas_used: Amount of gas used in the transaction
price: Price at which the trade was executed
"""
received_amount: float
gas_used: float
price: float
class ProtoSimResponse:
def __init__(self, return_value: Any, simulation_result: SimulationResult):
self.return_value = return_value
self.simulation_result = simulation_result
class ProtoSimContract:
def __init__(self, address: Address, abi_name: str, engine: SimulationEngine):
self.abi = load_abi(abi_name)
self.address = address
self.engine = engine
self._default_tx_env = dict(
caller=EXTERNAL_ACCOUNT, to=self.address, value=0, overrides={}
)
functions = [f for f in self.abi if f["type"] == "function"]
self._functions = {f["name"]: f for f in functions}
if len(self._functions) != len(functions):
raise ValueError(
f"ProtoSimContract does not support overloaded function names! "
f"Encountered while loading {abi_name}."
)
def _encode_input(self, fname: str, args: list) -> bytearray:
func = self._functions[fname]
types = [collapse_if_tuple(t) for t in func["inputs"]]
selector = keccak(text=f"{fname}({','.join(types)})")[:4]
return bytearray(selector + eth_abi.encode(types, args))
def _decode_output(self, fname: str, encoded: list[int]) -> Any:
func = self._functions[fname]
types = [collapse_if_tuple(t) for t in func["outputs"]]
return eth_abi.decode(types, bytearray(encoded))
def call(
self,
fname: str,
*args: list[Union[int, str, bool, bytes]],
block_number,
timestamp: int = None,
overrides: TStateOverwrites = None,
caller: Address = EXTERNAL_ACCOUNT,
value: int = 0,
) -> ProtoSimResponse:
call_data = self._encode_input(fname, *args)
params = SimulationParameters(
data=call_data,
to=self.address,
block_number=block_number,
timestamp=timestamp or int(time.time()),
overrides=overrides or {},
caller=caller,
value=value,
)
sim_result = self._simulate(params)
try:
output = self._decode_output(fname, sim_result.result)
except DecodingError:
log.warning("Failed to decode output")
output = None
return ProtoSimResponse(output, sim_result)
def _simulate(self, params: SimulationParameters) -> "SimulationResult":
"""Run simulation and handle errors.
It catches a RuntimeError:
- if it's ``Execution reverted``, re-raises a RuntimeError
with a Tenderly link added
- if it's ``Out of gas``, re-raises a RecoverableSimulationException
- otherwise it just re-raises the original error.
"""
try:
simulation_result = self.engine.run_sim(params)
return simulation_result
except RuntimeError as err:
try:
coerced_err = maybe_coerce_error(err, self, params.gas_limit)
except Exception:
log.exception("Couldn't coerce error. Re-raising the original one.")
raise err
msg = str(coerced_err)
if "Revert!" in msg:
raise type(coerced_err)(msg, repr(self)) from err
else:
raise coerced_err
class AdapterContract(ProtoSimContract):
"""
The AdapterContract provides an interface to interact with the protocols implemented
by third parties using the `propeller-protocol-lib`.
"""
def __init__(self, address: Address, engine: SimulationEngine):
super().__init__(address, "ISwapAdapter", engine)
def price(
self,
pair_id: HexStr,
sell_token: EthereumToken,
buy_token: EthereumToken,
amounts: list[int],
block: EVMBlock,
overwrites: TStateOverwrites = None,
) -> list[Fraction]:
args = [HexBytes(pair_id), sell_token.address, buy_token.address, amounts]
res = self.call(
"price",
args,
block_number=block.id,
timestamp=int(block.ts.timestamp()),
overrides=overwrites,
)
return list(map(lambda x: Fraction(*x), res.return_value[0]))
def swap(
self,
pair_id: HexStr,
sell_token: EthereumToken,
buy_token: EthereumToken,
is_buy: bool,
amount: Decimal,
block: EVMBlock,
overwrites: TStateOverwrites = None,
) -> tuple[Trade, dict[str, StateUpdate]]:
args = [
HexBytes(pair_id),
sell_token.address,
buy_token.address,
int(is_buy),
amount,
]
res = self.call(
"swap",
args,
block_number=block.id,
timestamp=int(block.ts.timestamp()),
overrides=overwrites,
)
amount, gas, price = res.return_value[0]
return Trade(amount, gas, Fraction(*price)), res.simulation_result.state_updates
def get_limits(
self,
pair_id: HexStr,
sell_token: EthereumToken,
buy_token: EthereumToken,
block: EVMBlock,
overwrites: TStateOverwrites = None,
) -> tuple[int, int]:
args = [HexBytes(pair_id), sell_token.address, buy_token.address]
res = self.call(
"getLimits",
args,
block_number=block.id,
timestamp=int(block.ts.timestamp()),
overrides=overwrites,
)
return res.return_value[0]
def get_capabilities(
self, pair_id: HexStr, sell_token: EthereumToken, buy_token: EthereumToken
) -> set[Capability]:
args = [HexBytes(pair_id), sell_token.address, buy_token.address]
res = self.call("getCapabilities", args, block_number=1)
return set(map(Capability, res.return_value[0]))
def min_gas_usage(self) -> int:
res = self.call("minGasUsage", [], block_number=1)
return res.return_value[0]

View File

@@ -1,78 +0,0 @@
// SPDX-License-Identifier: MIT
// OpenZeppelin Contracts (last updated v4.9.0) (token/ERC20/IERC20.sol)
pragma solidity ^0.8.19;
/**
* @dev Interface of the ERC20 standard as defined in the EIP.
*/
interface IERC20 {
/**
* @dev Emitted when `value` tokens are moved from one account (`from`) to
* another (`to`).
*
* Note that `value` may be zero.
*/
event Transfer(address indexed from, address indexed to, uint256 value);
/**
* @dev Emitted when the allowance of a `spender` for an `owner` is set by
* a call to {approve}. `value` is the new allowance.
*/
event Approval(address indexed owner, address indexed spender, uint256 value);
/**
* @dev Returns the amount of tokens in existence.
*/
function totalSupply() external view returns (uint256);
/**
* @dev Returns the amount of tokens owned by `account`.
*/
function balanceOf(address account) external view returns (uint256);
/**
* @dev Moves `amount` tokens from the caller's account to `to`.
*
* Returns a boolean value indicating whether the operation succeeded.
*
* Emits a {Transfer} event.
*/
function transfer(address to, uint256 amount) external returns (bool);
/**
* @dev Returns the remaining number of tokens that `spender` will be
* allowed to spend on behalf of `owner` through {transferFrom}. This is
* zero by default.
*
* This value changes when {approve} or {transferFrom} are called.
*/
function allowance(address owner, address spender) external view returns (uint256);
/**
* @dev Sets `amount` as the allowance of `spender` over the caller's tokens.
*
* Returns a boolean value indicating whether the operation succeeded.
*
* IMPORTANT: Beware that changing an allowance with this method brings the risk
* that someone may use both the old and the new allowance by unfortunate
* transaction ordering. One possible solution to mitigate this race
* condition is to first reduce the spender's allowance to 0 and set the
* desired value afterwards:
* https://github.com/ethereum/EIPs/issues/20#issuecomment-263524729
*
* Emits an {Approval} event.
*/
function approve(address spender, uint256 amount) external returns (bool);
/**
* @dev Moves `amount` tokens from `from` to `to` using the
* allowance mechanism. `amount` is then deducted from the caller's
* allowance.
*
* Returns a boolean value indicating whether the operation succeeded.
*
* Emits a {Transfer} event.
*/
function transferFrom(address from, address to, uint256 amount) external returns (bool);
}

View File

@@ -1,250 +0,0 @@
[
{
"inputs": [
{
"internalType": "uint256",
"name": "limit",
"type": "uint256"
}
],
"name": "LimitExceeded",
"type": "error"
},
{
"inputs": [
{
"internalType": "string",
"name": "reason",
"type": "string"
}
],
"name": "NotImplemented",
"type": "error"
},
{
"inputs": [
{
"internalType": "string",
"name": "reason",
"type": "string"
}
],
"name": "Unavailable",
"type": "error"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "poolId",
"type": "bytes32"
},
{
"internalType": "contract IERC20",
"name": "sellToken",
"type": "address"
},
{
"internalType": "contract IERC20",
"name": "buyToken",
"type": "address"
}
],
"name": "getCapabilities",
"outputs": [
{
"internalType": "enum ISwapAdapterTypes.Capability[]",
"name": "capabilities",
"type": "uint8[]"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "poolId",
"type": "bytes32"
},
{
"internalType": "contract IERC20",
"name": "sellToken",
"type": "address"
},
{
"internalType": "contract IERC20",
"name": "buyToken",
"type": "address"
}
],
"name": "getLimits",
"outputs": [
{
"internalType": "uint256[]",
"name": "limits",
"type": "uint256[]"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "offset",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "limit",
"type": "uint256"
}
],
"name": "getPoolIds",
"outputs": [
{
"internalType": "bytes32[]",
"name": "ids",
"type": "bytes32[]"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "poolId",
"type": "bytes32"
}
],
"name": "getTokens",
"outputs": [
{
"internalType": "contract IERC20[]",
"name": "tokens",
"type": "address[]"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "poolId",
"type": "bytes32"
},
{
"internalType": "contract IERC20",
"name": "sellToken",
"type": "address"
},
{
"internalType": "contract IERC20",
"name": "buyToken",
"type": "address"
},
{
"internalType": "uint256[]",
"name": "specifiedAmounts",
"type": "uint256[]"
}
],
"name": "price",
"outputs": [
{
"components": [
{
"internalType": "uint256",
"name": "numerator",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "denominator",
"type": "uint256"
}
],
"internalType": "struct ISwapAdapterTypes.Fraction[]",
"name": "prices",
"type": "tuple[]"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "poolId",
"type": "bytes32"
},
{
"internalType": "contract IERC20",
"name": "sellToken",
"type": "address"
},
{
"internalType": "contract IERC20",
"name": "buyToken",
"type": "address"
},
{
"internalType": "enum ISwapAdapterTypes.OrderSide",
"name": "side",
"type": "uint8"
},
{
"internalType": "uint256",
"name": "specifiedAmount",
"type": "uint256"
}
],
"name": "swap",
"outputs": [
{
"components": [
{
"internalType": "uint256",
"name": "calculatedAmount",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "gasUsed",
"type": "uint256"
},
{
"components": [
{
"internalType": "uint256",
"name": "numerator",
"type": "uint256"
},
{
"internalType": "uint256",
"name": "denominator",
"type": "uint256"
}
],
"internalType": "struct ISwapAdapterTypes.Fraction",
"name": "price",
"type": "tuple"
}
],
"internalType": "struct ISwapAdapterTypes.Trade",
"name": "trade",
"type": "tuple"
}
],
"stateMutability": "nonpayable",
"type": "function"
}
]

View File

@@ -1,363 +0,0 @@
// SPDX-License-Identifier: MIT
// OpenZeppelin Contracts (last updated v4.9.0) (token/ERC20/ERC20.sol)
pragma solidity ^0.8.19;
import "./IERC20.sol";
/**
* @dev Provides information about the current execution context, including the
* sender of the transaction and its data. While these are generally available
* via msg.sender and msg.data, they should not be accessed in such a direct
* manner, since when dealing with meta-transactions the account sending and
* paying for execution may not be the actual sender (as far as an application
* is concerned).
*
* This contract is only required for intermediate, library-like contracts.
*/
abstract contract Context {
function _msgSender() internal view virtual returns (address) {
return msg.sender;
}
function _msgData() internal view virtual returns (bytes calldata) {
return msg.data;
}
}
/**
* @dev Interface for the optional metadata functions from the ERC20 standard.
*
* _Available since v4.1._
*/
// Metadata getters that extend the core IERC20 interface.
interface IERC20Metadata is IERC20 {
    /**
     * @dev Returns the name of the token.
     */
    function name() external view returns (string memory);

    /**
     * @dev Returns the symbol of the token.
     */
    function symbol() external view returns (string memory);

    /**
     * @dev Returns the decimals places of the token.
     */
    function decimals() external view returns (uint8);
}
/**
* @dev Implementation of the {IERC20} interface.
*
* This implementation is agnostic to the way tokens are created. This means
* that a supply mechanism has to be added in a derived contract using {_mint}.
*
* TIP: For a detailed writeup see our guide
* https://forum.openzeppelin.com/t/how-to-implement-erc20-supply-mechanisms/226[How
* to implement supply mechanisms].
*
* The default value of {decimals} is 18. To change this, you should override
* this function so it returns a different value.
*
* We have followed general OpenZeppelin Contracts guidelines: functions revert
* instead returning `false` on failure. This behavior is nonetheless
* conventional and does not conflict with the expectations of ERC20
* applications.
*
* Additionally, an {Approval} event is emitted on calls to {transferFrom}.
* This allows applications to reconstruct the allowance for all accounts just
* by listening to said events. Other implementations of the EIP may not emit
* these events, as it isn't required by the specification.
*
* Finally, the non-standard {decreaseAllowance} and {increaseAllowance}
* functions have been added to mitigate the well-known issues around setting
* allowances. See {IERC20-approve}.
*/
contract ERC20 is Context, IERC20, IERC20Metadata {
    // Token balance per account.
    mapping(address => uint256) private _balances;

    // Remaining allowance per (owner => spender) pair.
    mapping(address => mapping(address => uint256)) private _allowances;

    // Sum of all balances; only changed by _update on mint/burn paths.
    uint256 private _totalSupply;

    // Metadata, set once in the constructor.
    string private _name;
    string private _symbol;
    // NOTE(review): unlike upstream OpenZeppelin v4.9 (where decimals() is a
    // fixed 18 unless overridden), this variant stores a constructor-supplied
    // decimals value.
    uint8 private _decimals;

    /**
     * @dev Sets the values for {name}, {symbol} and {decimals}.
     *
     * All three of these values are immutable: they can only be set once during
     * construction.
     */
    constructor(string memory name_, string memory symbol_, uint8 decimals_) {
        _name = name_;
        _symbol = symbol_;
        _decimals = decimals_;
    }

    /**
     * @dev Returns the name of the token.
     */
    function name() public view virtual returns (string memory) {
        return _name;
    }

    /**
     * @dev Returns the symbol of the token, usually a shorter version of the
     * name.
     */
    function symbol() public view virtual returns (string memory) {
        return _symbol;
    }

    /**
     * @dev Returns the number of decimals used to get its user representation.
     * For example, if `decimals` equals `2`, a balance of `505` tokens should
     * be displayed to a user as `5.05` (`505 / 10 ** 2`).
     *
     * Tokens usually opt for a value of 18, imitating the relationship between
     * Ether and Wei. This is the default value returned by this function, unless
     * it's overridden.
     *
     * NOTE: This information is only used for _display_ purposes: it in
     * no way affects any of the arithmetic of the contract, including
     * {IERC20-balanceOf} and {IERC20-transfer}.
     */
    function decimals() public view virtual returns (uint8) {
        return _decimals;
    }

    /**
     * @dev See {IERC20-totalSupply}.
     */
    function totalSupply() public view virtual returns (uint256) {
        return _totalSupply;
    }

    /**
     * @dev See {IERC20-balanceOf}.
     */
    function balanceOf(address account) public view virtual returns (uint256) {
        return _balances[account];
    }

    /**
     * @dev See {IERC20-transfer}.
     *
     * Requirements:
     *
     * - `to` cannot be the zero address.
     * - the caller must have a balance of at least `amount`.
     */
    function transfer(address to, uint256 amount) public virtual returns (bool) {
        address owner = _msgSender();
        _transfer(owner, to, amount);
        return true;
    }

    /**
     * @dev See {IERC20-allowance}.
     */
    function allowance(address owner, address spender) public view virtual returns (uint256) {
        return _allowances[owner][spender];
    }

    /**
     * @dev See {IERC20-approve}.
     *
     * NOTE: If `amount` is the maximum `uint256`, the allowance is not updated on
     * `transferFrom`. This is semantically equivalent to an infinite approval.
     *
     * Requirements:
     *
     * - `spender` cannot be the zero address.
     */
    function approve(address spender, uint256 amount) public virtual returns (bool) {
        address owner = _msgSender();
        _approve(owner, spender, amount);
        return true;
    }

    /**
     * @dev See {IERC20-transferFrom}.
     *
     * Emits an {Approval} event indicating the updated allowance. This is not
     * required by the EIP. See the note at the beginning of {ERC20}.
     *
     * NOTE: Does not update the allowance if the current allowance
     * is the maximum `uint256`.
     *
     * Requirements:
     *
     * - `from` and `to` cannot be the zero address.
     * - `from` must have a balance of at least `amount`.
     * - the caller must have allowance for ``from``'s tokens of at least
     * `amount`.
     */
    function transferFrom(address from, address to, uint256 amount) public virtual returns (bool) {
        address spender = _msgSender();
        // Reverts if the spender's allowance is insufficient.
        _spendAllowance(from, spender, amount);
        _transfer(from, to, amount);
        return true;
    }

    /**
     * @dev Atomically increases the allowance granted to `spender` by the caller.
     *
     * This is an alternative to {approve} that can be used as a mitigation for
     * problems described in {IERC20-approve}.
     *
     * Emits an {Approval} event indicating the updated allowance.
     *
     * Requirements:
     *
     * - `spender` cannot be the zero address.
     */
    function increaseAllowance(address spender, uint256 addedValue) public virtual returns (bool) {
        address owner = _msgSender();
        _approve(owner, spender, allowance(owner, spender) + addedValue);
        return true;
    }

    /**
     * @dev Atomically decreases the allowance granted to `spender` by the caller.
     *
     * This is an alternative to {approve} that can be used as a mitigation for
     * problems described in {IERC20-approve}.
     *
     * Emits an {Approval} event indicating the updated allowance.
     *
     * Requirements:
     *
     * - `spender` cannot be the zero address.
     * - `spender` must have allowance for the caller of at least
     * `subtractedValue`.
     */
    function decreaseAllowance(address spender, uint256 subtractedValue) public virtual returns (bool) {
        address owner = _msgSender();
        uint256 currentAllowance = allowance(owner, spender);
        require(currentAllowance >= subtractedValue, "ERC20: decreased allowance below zero");
        unchecked {
            // Safe: require above guarantees no underflow.
            _approve(owner, spender, currentAllowance - subtractedValue);
        }
        return true;
    }

    /**
     * @dev Moves `amount` of tokens from `from` to `to`.
     *
     * This internal function is equivalent to {transfer}, and can be used to
     * e.g. implement automatic token fees, slashing mechanisms, etc.
     *
     * Emits a {Transfer} event.
     *
     * NOTE: This function is not virtual, {_update} should be overridden instead.
     */
    function _transfer(address from, address to, uint256 amount) internal {
        require(from != address(0), "ERC20: transfer from the zero address");
        require(to != address(0), "ERC20: transfer to the zero address");
        _update(from, to, amount);
    }

    /**
     * @dev Transfers `amount` of tokens from `from` to `to`, or alternatively mints (or burns) if `from` (or `to`) is
     * the zero address. All customizations to transfers, mints, and burns should be done by overriding this function.
     *
     * Emits a {Transfer} event.
     */
    function _update(address from, address to, uint256 amount) internal virtual {
        // from == 0 means mint: create supply instead of debiting a balance.
        if (from == address(0)) {
            _totalSupply += amount;
        } else {
            uint256 fromBalance = _balances[from];
            require(fromBalance >= amount, "ERC20: transfer amount exceeds balance");
            unchecked {
                // Overflow not possible: amount <= fromBalance <= totalSupply.
                _balances[from] = fromBalance - amount;
            }
        }
        // to == 0 means burn: destroy supply instead of crediting a balance.
        if (to == address(0)) {
            unchecked {
                // Overflow not possible: amount <= totalSupply or amount <= fromBalance <= totalSupply.
                _totalSupply -= amount;
            }
        } else {
            unchecked {
                // Overflow not possible: balance + amount is at most totalSupply, which we know fits into a uint256.
                _balances[to] += amount;
            }
        }
        emit Transfer(from, to, amount);
    }

    /**
     * @dev Creates `amount` tokens and assigns them to `account`, by transferring it from address(0).
     * Relies on the `_update` mechanism
     *
     * Emits a {Transfer} event with `from` set to the zero address.
     *
     * NOTE: This function is not virtual, {_update} should be overridden instead.
     */
    function _mint(address account, uint256 amount) internal {
        require(account != address(0), "ERC20: mint to the zero address");
        _update(address(0), account, amount);
    }

    /**
     * @dev Destroys `amount` tokens from `account`, by transferring it to address(0).
     * Relies on the `_update` mechanism.
     *
     * Emits a {Transfer} event with `to` set to the zero address.
     *
     * NOTE: This function is not virtual, {_update} should be overridden instead
     */
    function _burn(address account, uint256 amount) internal {
        require(account != address(0), "ERC20: burn from the zero address");
        _update(account, address(0), amount);
    }

    /**
     * @dev Sets `amount` as the allowance of `spender` over the `owner` s tokens.
     *
     * This internal function is equivalent to `approve`, and can be used to
     * e.g. set automatic allowances for certain subsystems, etc.
     *
     * Emits an {Approval} event.
     *
     * Requirements:
     *
     * - `owner` cannot be the zero address.
     * - `spender` cannot be the zero address.
     */
    function _approve(address owner, address spender, uint256 amount) internal virtual {
        require(owner != address(0), "ERC20: approve from the zero address");
        require(spender != address(0), "ERC20: approve to the zero address");
        _allowances[owner][spender] = amount;
        emit Approval(owner, spender, amount);
    }

    /**
     * @dev Updates `owner` s allowance for `spender` based on spent `amount`.
     *
     * Does not update the allowance amount in case of infinite allowance.
     * Revert if not enough allowance is available.
     *
     * Might emit an {Approval} event.
     */
    function _spendAllowance(address owner, address spender, uint256 amount) internal virtual {
        uint256 currentAllowance = allowance(owner, spender);
        // type(uint256).max is treated as infinite approval and never decremented.
        if (currentAllowance != type(uint256).max) {
            require(currentAllowance >= amount, "ERC20: insufficient allowance");
            unchecked {
                // Safe: require above guarantees no underflow.
                _approve(owner, spender, currentAllowance - amount);
            }
        }
    }
}

View File

@@ -1,12 +0,0 @@
from pathlib import Path
from typing import Final
# Directory of static assets shipped with the package
# (presumably contract artifacts/bytecode — confirm against utils loaders).
ASSETS_FOLDER = Path(__file__).parent / "assets"
# Directory holding the bundled tycho client binaries.
TYCHO_CLIENT_FOLDER = Path(__file__).parent / "bins"
# Directory the tycho client writes its log files into.
TYCHO_CLIENT_LOG_FOLDER = TYCHO_CLIENT_FOLDER / "logs"
EXTERNAL_ACCOUNT: Final[str] = "0xf847a638E44186F3287ee9F8cAF73FF4d4B80784"
"""This is a dummy address used as a transaction sender"""
UINT256_MAX: Final[int] = 2 ** 256 - 1
MAX_BALANCE: Final[int] = UINT256_MAX // 2
"""0.5 of the maximal possible balance to avoid overflow errors"""

View File

@@ -1,164 +0,0 @@
import time
from decimal import Decimal
from logging import getLogger
from typing import Any
import eth_abi
from eth_utils import keccak
from protosim_py import SimulationEngine, SimulationParameters, AccountInfo
from .constants import EXTERNAL_ACCOUNT
from .exceptions import TychoDecodeError
from .models import EVMBlock, EthereumToken
from .pool_state import ThirdPartyPool
from .tycho_db import TychoDBSingleton
from .utils import decode_tycho_exchange, get_code_for_address
log = getLogger(__name__)
class ThirdPartyPoolTychoDecoder:
    """ThirdPartyPool decoder for protocol messages from the Tycho feed."""

    def __init__(self, adapter_contract: str, minimum_gas: int, trace: bool):
        """
        Parameters
        ----------
        adapter_contract
            Name of the swap adapter contract, used to locate its bytecode.
        minimum_gas
            Minimum gas estimate forwarded to each decoded pool.
        trace
            Whether decoded pools should run their simulations with tracing.
        """
        self.adapter_contract = adapter_contract
        self.minimum_gas = minimum_gas
        self.trace = trace

    def decode_snapshot(
        self,
        snapshot: dict[str, Any],
        block: EVMBlock,
        tokens: dict[str, EthereumToken],
    ) -> tuple[dict[str, ThirdPartyPool], list[str]]:
        """Decode a full snapshot message into pool states.

        Returns a tuple of (successfully decoded pools keyed by pool id,
        component ids of pools that failed to decode).
        """
        pools = {}
        failed_pools = []
        for snap in snapshot.values():
            try:
                pool = self.decode_pool_state(snap, block, tokens)
                pools[pool.id_] = pool
            except TychoDecodeError as e:
                # A single broken component must not abort the whole snapshot.
                log.error(f"Failed to decode third party snapshot: {e}")
                failed_pools.append(snap["component"]["id"])
                continue
        return pools, failed_pools

    def decode_pool_state(
        self, snap: dict, block: EVMBlock, tokens: dict[str, EthereumToken]
    ) -> ThirdPartyPool:
        """Decode a single component snapshot into a ThirdPartyPool.

        Raises
        ------
        TychoDecodeError
            If the component references a token we do not know about.
        """
        component = snap["component"]
        exchange, _ = decode_tycho_exchange(component["protocol_system"])
        try:
            pool_tokens = tuple(tokens[t] for t in component["tokens"])
        except KeyError as e:
            raise TychoDecodeError(
                "Unsupported token", pool_id=component["id"]
            ) from e
        balances = self.decode_balances(snap, pool_tokens)
        optional_attributes = self.decode_optional_attributes(component, snap, block.id)
        return ThirdPartyPool(
            # a protocol-specific pool_id (e.g. balancer) takes precedence
            # over the generic component id
            id_=optional_attributes.pop("pool_id", component["id"]),
            tokens=pool_tokens,
            balances=balances,
            block=block,
            spot_prices={},
            trading_fee=Decimal("0"),
            exchange=exchange,
            adapter_contract_name=self.adapter_contract,
            minimum_gas=self.minimum_gas,
            trace=self.trace,
            **optional_attributes,
        )

    @staticmethod
    def decode_optional_attributes(component, snap, block_number):
        """Extract optional pool attributes from a component snapshot.

        Collects the balance owner, the protocol-specific pool id (falling
        back to the component id) and the bytecode of any stateless contracts
        referenced by either the static or the dynamic attributes.
        """
        # Handle optional state attributes
        attributes = snap["state"]["attributes"]
        balance_owner = attributes.get("balance_owner")
        stateless_contracts = {}
        static_attributes = snap["component"]["static_attributes"]
        pool_id = static_attributes.get("pool_id") or component["id"]
        index = 0
        while f"stateless_contract_addr_{index}" in static_attributes:
            encoded_address = static_attributes[f"stateless_contract_addr_{index}"]
            decoded = bytes.fromhex(
                encoded_address[2:] if encoded_address.startswith("0x") else encoded_address
            ).decode("utf-8")
            if decoded.startswith("call"):
                # the address must be resolved dynamically via an eth_call
                address = ThirdPartyPoolTychoDecoder.get_address_from_call(
                    block_number, decoded
                )
            else:
                address = decoded
            # fall back to fetching the code from the chain if not provided
            code = static_attributes.get(
                f"stateless_contract_code_{index}"
            ) or get_code_for_address(address)
            stateless_contracts[address] = code
            index += 1
        index = 0
        while f"stateless_contract_addr_{index}" in attributes:
            address = attributes[f"stateless_contract_addr_{index}"]
            code = attributes.get(
                f"stateless_contract_code_{index}"
            ) or get_code_for_address(address)
            stateless_contracts[address] = code
            index += 1
        return {
            "balance_owner": balance_owner,
            "pool_id": pool_id,
            "stateless_contracts": stateless_contracts,
        }

    @staticmethod
    def get_address_from_call(block_number, decoded):
        """Resolve a dynamic contract address by simulating an eth_call.

        ``decoded`` has the shape ``call:<target>:<signature>``; the call's
        return value is ABI-decoded as a single address.
        """
        db = TychoDBSingleton.get_instance()
        engine = SimulationEngine.new_with_tycho_db(db=db)
        engine.init_account(
            address="0x0000000000000000000000000000000000000000",
            account=AccountInfo(balance=0, nonce=0),
            mocked=False,
            permanent_storage=None,
        )
        # first 4 bytes of the keccak of the signature = function selector
        selector = keccak(text=decoded.split(":")[-1])[:4]
        sim_result = engine.run_sim(
            SimulationParameters(
                data=bytearray(selector),
                to=decoded.split(":")[1],
                block_number=block_number,
                timestamp=int(time.time()),
                overrides={},
                caller=EXTERNAL_ACCOUNT,
                value=0,
            )
        )
        address = eth_abi.decode(["address"], bytearray(sim_result.result))
        return address[0]

    @staticmethod
    def decode_balances(snap, tokens):
        """Decode hex-encoded component balances into token decimal amounts."""
        balances = {}
        for addr, balance in snap["state"]["balances"].items():
            token = next(t for t in tokens if t.address == addr)
            balances[token.address] = token.from_onchain_amount(
                int(balance, 16)  # balances are big endian encoded
            )
        return balances

    @staticmethod
    def apply_update(
        pool: ThirdPartyPool,
        pool_update: dict[str, Any],
        balance_updates: dict[str, Any],
        block: EVMBlock,
    ) -> ThirdPartyPool:
        """Apply a delta message to an existing pool state and return it."""
        # check for and apply optional state attributes
        attributes = pool_update.get("updated_attributes")
        if attributes:
            # TODO: handle balance_owner and stateless_contracts updates
            pass
        for addr, balance_msg in balance_updates.items():
            token = next(t for t in pool.tokens if t.address == addr)
            balance = int(balance_msg["balance"], 16)  # balances are big endian encoded
            pool.balances[token.address] = token.from_onchain_amount(balance)
        pool.block = block
        # we clear simulation cache and overwrites on the pool and trigger a recalculation of spot prices
        pool.clear_all_cache()
        return pool

View File

@@ -1,59 +0,0 @@
from decimal import Decimal
class TychoDecodeError(Exception):
    """Raised when a message from the Tycho feed cannot be decoded.

    The id of the offending pool is kept on the exception so callers can
    record and skip it.
    """

    def __init__(self, msg: str, pool_id: str):
        self.pool_id = pool_id
        super().__init__(msg)
class APIRequestError(Exception):
    """Raised when a request to the Tycho RPC API fails."""
    pass
class TradeSimulationException(Exception):
    """Base class for errors raised while simulating a trade on a pool."""

    def __init__(self, message, pool_id: str):
        super().__init__(message)
        # remember which pool produced the failure
        self.pool_id = pool_id
class RecoverableSimulationException(TradeSimulationException):
    """Marks that the simulation could not fully fulfill the requested order.

    Provides a partial trade that is valid but does not fully fulfill the conditions
    requested.

    Parameters
    ----------
    message
        Error message
    pool_id
        ID of a pool that caused the error
    partial_trade
        A tuple of (bought_amount, gas_used, new_pool_state, sold_amount), or
        ``None`` when no partial trade could be produced.
    """

    def __init__(
        self,
        message,
        pool_id: str,
        # string annotation: ThirdPartyPool is a forward reference, and the
        # default is None so the type is optional
        partial_trade: "tuple[Decimal, int, ThirdPartyPool, Decimal] | None" = None,
    ):
        super().__init__(message, pool_id)
        self.partial_trade = partial_trade
class OutOfGas(RecoverableSimulationException):
    """This exception indicates that the underlying VM **likely** ran out of gas.

    It is not easy to judge whether it was really due to out of gas, as the details
    of the SC being called might be hiding this. E.g. out of gas may happen while
    calling an external contract, which might show as the external call failing, although
    it was due to a lack of gas.
    """

    # No extra state beyond RecoverableSimulationException; the subclass only
    # narrows the meaning of the failure.
    pass
class TychoClientException(Exception):
    """Raised for errors originating from the Tycho client itself."""
    pass

View File

@@ -1,127 +0,0 @@
import datetime
from decimal import Decimal, localcontext, Context, ROUND_FLOOR, InvalidOperation
from enum import Enum, IntEnum, auto
from fractions import Fraction
from logging import getLogger
from typing import Union
from pydantic import BaseModel, Field, PrivateAttr
# Type alias used throughout the SDK for hex-encoded account addresses.
Address = str
log = getLogger(__name__)
class Blockchain(Enum):
    # Supported chains; the string values are the identifiers interpolated
    # into Tycho RPC endpoint paths.
    ethereum = "ethereum"
    arbitrum = "arbitrum"
    polygon = "polygon"
    zksync = "zksync"
class EVMBlock(BaseModel):
    # Block number.
    id: int
    # Block timestamp; defaults to object creation time.
    # NOTE(review): datetime.utcnow returns a naive datetime and is deprecated
    # in Python 3.12 — consider datetime.now(timezone.utc); confirm callers
    # tolerate timezone-aware values first.
    ts: datetime.datetime = Field(default_factory=datetime.datetime.utcnow)
    # Block hash as a hex string.
    hash_: str
class EthereumToken(BaseModel):
    """An ERC-20 token with helpers to convert between on-chain integer
    amounts and human-readable decimal amounts."""

    symbol: str
    address: str
    decimals: int
    # gas cost estimate(s) for transferring this token
    gas: Union[int, list[int]] = 29000
    # cached result of __hash__; lazily filled on first use
    _hash: int = PrivateAttr(default=None)

    def to_onchain_amount(self, amount: Union[float, Decimal, str]) -> int:
        """Converts floating-point numerals to an integer, by shifting right by the
        token's maximum amount of decimals (e.g.: 1.000000 becomes 1000000).

        For the reverse operation please see self.from_onchain_amount
        """
        if not isinstance(amount, Decimal):
            log.warning(f"Expected variable of type Decimal. Got {type(amount)}.")
        # high-precision floor-rounding context to avoid float artifacts
        with localcontext(Context(rounding=ROUND_FLOOR, prec=256)):
            amount = Decimal(str(amount)) * (10 ** self.decimals)
            try:
                amount = amount.quantize(Decimal("1.0"))
            except InvalidOperation:
                log.error(
                    f"Quantize failed for {self.symbol}, {amount}, {self.decimals}"
                )
            return int(amount)

    def from_onchain_amount(
        self, onchain_amount: Union[int, Fraction], quantize: bool = True
    ) -> Decimal:
        """Converts an Integer to a quantized decimal, by shifting left by the token's
        maximum amount of decimals (e.g.: 1000000 becomes 1.000000 for a 6-decimal token

        For the reverse operation please see self.to_onchain_amount

        If the onchain_amount is too low, then using quantize can underflow without
        raising and the offchain amount returned is 0.
        See _decimal.Decimal.quantize docstrings for details.
        Quantize is needed for UniswapV2.
        """
        with localcontext(Context(rounding=ROUND_FLOOR, prec=256)):
            if isinstance(onchain_amount, Fraction):
                return (
                    Decimal(onchain_amount.numerator)
                    / Decimal(onchain_amount.denominator)
                    / Decimal(10 ** self.decimals)
                ).quantize(Decimal(f"{1 / 10 ** self.decimals}"))
            if quantize is True:
                try:
                    amount = (
                        Decimal(str(onchain_amount)) / 10 ** self.decimals
                    ).quantize(Decimal(f"{1 / 10 ** self.decimals}"))
                except InvalidOperation:
                    # value too small/large to quantize; return unquantized
                    amount = Decimal(str(onchain_amount)) / Decimal(10 ** self.decimals)
            else:
                amount = Decimal(str(onchain_amount)) / Decimal(10 ** self.decimals)
            return amount

    def __repr__(self):
        return self.symbol

    def __str__(self):
        return self.symbol

    def __eq__(self, other) -> bool:
        # this is faster than calling custom __hash__, due to cache check
        # NOTE(review): assumes `other` has an .address attribute; comparing
        # against non-token objects raises AttributeError — confirm callers
        # never do that.
        return other.address == self.address

    def __hash__(self) -> int:
        if self._hash is None:
            # caching the hash saves time during graph search
            self._hash = hash(self.address)
        return self._hash
class DatabaseType(Enum):
    """Strategy used by the simulation engine to source contract state."""

    # Make call to the node each time it needs a storage (unless cached from a previous call).
    rpc_reader = "rpc_reader"
    # Connect to Tycho and cache the whole state of a target contract, the state is continuously updated by Tycho.
    # To use this we need Tycho to be configured to index the target contract state.
    tycho = "tycho"
class Capability(IntEnum):
    """Features a swap adapter may support for a pool.

    Values are assigned by auto() in declaration order (1..9) — do not
    reorder members, the integer values are presumably shared with the
    adapter contracts; confirm before changing.
    """

    SellSide = auto()
    BuySide = auto()
    PriceFunction = auto()
    FeeOnTransfer = auto()
    ConstantPrice = auto()
    TokenBalanceIndependent = auto()
    ScaledPrice = auto()
    HardLimits = auto()
    MarginalPrice = auto()
class SynchronizerState(Enum):
    """Lifecycle/health states reported by a protocol stream synchronizer."""

    started = "started"
    ready = "ready"
    stale = "stale"
    delayed = "delayed"
    advanced = "advanced"
    ended = "ended"

View File

@@ -1,347 +0,0 @@
import functools
import itertools
from collections import defaultdict
from copy import deepcopy
from decimal import Decimal
from fractions import Fraction
from logging import getLogger
from typing import Optional, cast, TypeVar, Annotated
from eth_typing import HexStr
from protosim_py import SimulationEngine, AccountInfo
from pydantic import BaseModel, PrivateAttr, Field
from .adapter_contract import AdapterContract
from .constants import MAX_BALANCE, EXTERNAL_ACCOUNT
from .exceptions import RecoverableSimulationException
from .models import EVMBlock, Capability, Address, EthereumToken
from .utils import (
create_engine,
get_contract_bytecode,
frac_to_decimal,
ERC20OverwriteFactory,
)
# Fixed address the adapter contract is initialized at inside the simulation
# engine (see ThirdPartyPool._set_engine).
ADAPTER_ADDRESS = "0xA2C5C98A892fD6656a7F39A2f63228C0Bc846270"
log = getLogger(__name__)
# Type variable so methods like get_amount_out return the caller's subclass.
TPoolState = TypeVar("TPoolState", bound="ThirdPartyPool")
class ThirdPartyPool(BaseModel):
    """A third-party protocol pool whose prices and swaps are computed by
    executing the protocol's swap adapter contract in an EVM simulation
    engine, instead of reimplementing the protocol math in Python."""

    id_: str
    tokens: tuple[EthereumToken, ...]
    balances: dict[Address, Decimal]
    block: EVMBlock
    spot_prices: dict[tuple[EthereumToken, EthereumToken], Decimal]
    trading_fee: Decimal
    exchange: str
    minimum_gas: int
    _engine: SimulationEngine = PrivateAttr(default=None)
    adapter_contract_name: str
    """The adapters contract name. Used to look up the byte code for the adapter."""
    _adapter_contract: AdapterContract = PrivateAttr(default=None)
    stateless_contracts: dict[str, bytes] = {}
    """The address to bytecode map of all stateless contracts used by the protocol for simulations."""
    capabilities: set[Capability] = Field(default_factory=lambda: {Capability.SellSide})
    """The supported capabilities of this pool."""
    balance_owner: Optional[str] = None
    """The contract address for where protocol balances are stored (i.e. a vault contract).
    If given, balances will be overwritten here instead of on the pool contract during simulations."""
    # NOTE(review): the inner Field's default_factory (`defaultdict[dict]`) returns a
    # generic alias rather than a dict instance; the outer default_factory below is
    # what actually takes effect. Left untouched to avoid changing pydantic behavior.
    block_lasting_overwrites: defaultdict[
        Address,
        Annotated[dict[int, int], Field(default_factory=lambda: defaultdict[dict])],
    ] = Field(default_factory=lambda: defaultdict(dict))
    """Storage overwrites that will be applied to all simulations. They will be cleared
    when ``clear_all_cache`` is called, i.e. usually at each block. Hence the name."""
    trace: bool = False

    def __init__(self, **data):
        super().__init__(**data)
        self._set_engine(data.get("engine", None))
        self.balance_owner = data.get("balance_owner", None)
        self._adapter_contract = AdapterContract(ADAPTER_ADDRESS, self._engine)
        self._set_capabilities()
        # spot prices may be supplied (e.g. by _duplicate); only compute when missing
        if len(self.spot_prices) == 0:
            self._set_spot_prices()

    def _set_engine(self, engine: Optional[SimulationEngine]):
        """Set instance's simulation engine. If no engine given, make a default one.

        If engine is already set, this is a noop.
        The engine will have the specified adapter contract mocked, as well as the
        tokens used by the pool.

        Parameters
        ----------
        engine
            Optional simulation engine instance.
        """
        if self._engine is not None:
            return
        else:
            engine = create_engine([t.address for t in self.tokens], trace=self.trace)
            engine.init_account(
                address="0x0000000000000000000000000000000000000000",
                account=AccountInfo(balance=0, nonce=0),
                mocked=False,
                permanent_storage=None,
            )
            # 0x...04 is the identity precompile address
            engine.init_account(
                address="0x0000000000000000000000000000000000000004",
                account=AccountInfo(balance=0, nonce=0),
                mocked=False,
                permanent_storage=None,
            )
            # deploy the adapter bytecode at a fixed, well-known address
            engine.init_account(
                address=ADAPTER_ADDRESS,
                account=AccountInfo(
                    balance=MAX_BALANCE,
                    nonce=0,
                    code=get_contract_bytecode(self.adapter_contract_name),
                ),
                mocked=False,
                permanent_storage=None,
            )
            for addr, bytecode in self.stateless_contracts.items():
                engine.init_account(
                    address=addr,
                    account=AccountInfo(balance=0, nonce=0, code=bytecode),
                    mocked=False,
                    permanent_storage=None,
                )
            self._engine = engine

    def _set_spot_prices(self):
        """Set the spot prices for this pool.

        We currently require the price function capability for now.
        """
        self._ensure_capability(Capability.PriceFunction)
        for t0, t1 in itertools.permutations(self.tokens, 2):
            # probe the price with 1% of the sell limit
            sell_amount = t0.to_onchain_amount(
                self.get_sell_amount_limit(t0, t1) * Decimal("0.01")
            )
            frac = self._adapter_contract.price(
                cast(HexStr, self.id_),
                t0,
                t1,
                [sell_amount],
                block=self.block,
                overwrites=self.block_lasting_overwrites,
            )[0]
            if Capability.ScaledPrice in self.capabilities:
                self.spot_prices[(t0, t1)] = frac_to_decimal(frac)
            else:
                # adapter returned a raw price; rescale by the decimal difference
                scaled = frac * Fraction(10 ** t0.decimals, 10 ** t1.decimals)
                self.spot_prices[(t0, t1)] = frac_to_decimal(scaled)

    def _ensure_capability(self, capability: Capability):
        """Ensures the protocol/adapter implement a certain capability.

        Raises
        ------
        NotImplementedError
            If the capability is not supported by this pool.
        """
        if capability not in self.capabilities:
            # was `raise NotImplemented(...)`: NotImplemented is a constant, not
            # an exception, and raising it is a TypeError at runtime
            raise NotImplementedError(f"{capability} not available!")

    def _set_capabilities(self):
        """Sets capabilities of the pool.

        The pool's capability set is the intersection over all ordered token
        pairs; a warning is logged if pairs disagree.
        """
        capabilities = []
        for t0, t1 in itertools.permutations(self.tokens, 2):
            capabilities.append(
                self._adapter_contract.get_capabilities(cast(HexStr, self.id_), t0, t1)
            )
        max_capabilities = max(map(len, capabilities))
        self.capabilities = functools.reduce(set.intersection, capabilities)
        if len(self.capabilities) < max_capabilities:
            # typo fix: "hash different" -> "has different"
            log.warning(
                f"Pool {self.id_} has different capabilities depending on the token pair!"
            )

    def get_amount_out(
        self: TPoolState,
        sell_token: EthereumToken,
        sell_amount: Decimal,
        buy_token: EthereumToken,
    ) -> tuple[Decimal, int, TPoolState]:
        """Simulate selling ``sell_amount`` of ``sell_token`` for ``buy_token``.

        Returns a tuple of (buy_amount, gas_used, new_pool_state).

        Raises
        ------
        RecoverableSimulationException
            If the pool enforces hard limits and the sell amount exceeds them;
            the exception carries a partial trade at the sell limit.
        """
        # if the pool has a hard limit and the sell amount exceeds that, simulate and
        # raise a partial trade
        if Capability.HardLimits in self.capabilities:
            sell_limit = self.get_sell_amount_limit(sell_token, buy_token)
            if sell_amount > sell_limit:
                partial_trade = self._get_amount_out(sell_token, sell_limit, buy_token)
                raise RecoverableSimulationException(
                    "Sell amount exceeds sell limit",
                    repr(self),
                    partial_trade + (sell_limit,),
                )
        return self._get_amount_out(sell_token, sell_amount, buy_token)

    def _get_amount_out(
        self: TPoolState,
        sell_token: EthereumToken,
        sell_amount: Decimal,
        buy_token: EthereumToken,
    ) -> tuple[Decimal, int, TPoolState]:
        """Run the swap simulation and build the resulting pool state."""
        trade, state_changes = self._adapter_contract.swap(
            cast(HexStr, self.id_),
            sell_token,
            buy_token,
            False,
            sell_token.to_onchain_amount(sell_amount),
            block=self.block,
            overwrites=self._get_overwrites(sell_token, buy_token),
        )
        new_state = self._duplicate()
        # carry the storage writes of this swap into the new state's overwrites
        for address, state_update in state_changes.items():
            for slot, value in state_update.storage.items():
                new_state.block_lasting_overwrites[address][slot] = value
        new_price = frac_to_decimal(trade.price)
        if new_price != Decimal(0):
            new_state.spot_prices = {
                (sell_token, buy_token): new_price,
                (buy_token, sell_token): Decimal(1) / new_price,
            }
        buy_amount = buy_token.from_onchain_amount(trade.received_amount)
        return buy_amount, trade.gas_used, new_state

    def _get_overwrites(
        self, sell_token: EthereumToken, buy_token: EthereumToken, **kwargs
    ) -> dict[Address, dict[int, int]]:
        """Get an overwrites dictionary to use in a simulation.

        The returned overwrites include block-lasting overwrites set on the instance
        level, and token-specific overwrites that depend on passed tokens.
        """
        token_overwrites = self._get_token_overwrites(sell_token, buy_token, **kwargs)
        return _merge(self.block_lasting_overwrites, token_overwrites)

    def _get_token_overwrites(
        self, sell_token: EthereumToken, buy_token: EthereumToken, max_amount=None
    ) -> dict[Address, dict[int, int]]:
        """Creates overwrites for a token.

        Funds external account with enough tokens to execute swaps. Also creates a
        corresponding approval to the adapter contract.

        If the protocol reads its own token balances, the balances for the underlying
        pool contract will also be overwritten.
        """
        res = []
        if Capability.TokenBalanceIndependent not in self.capabilities:
            res = [self._get_balance_overwrites()]
        # avoids recursion if using this method with get_sell_amount_limit
        if max_amount is None:
            max_amount = sell_token.to_onchain_amount(
                self.get_sell_amount_limit(sell_token, buy_token)
            )
        overwrites = ERC20OverwriteFactory(sell_token)
        overwrites.set_balance(max_amount, EXTERNAL_ACCOUNT)
        overwrites.set_allowance(
            allowance=max_amount, owner=EXTERNAL_ACCOUNT, spender=ADAPTER_ADDRESS
        )
        res.append(overwrites.get_protosim_overwrites())
        # we need to merge the dictionaries because balance overwrites may target
        # the same token address.
        res = functools.reduce(_merge, res)
        return res

    def _get_balance_overwrites(self) -> dict[Address, dict[int, int]]:
        """Build balance overwrites so the engine sees the pool's bookkept
        balances, written either at the balance owner or at the pool itself."""
        balance_overwrites = {}
        address = self.balance_owner or self.id_
        for t in self.tokens:
            overwrites = ERC20OverwriteFactory(t)
            overwrites.set_balance(
                t.to_onchain_amount(self.balances[t.address]), address
            )
            balance_overwrites.update(overwrites.get_protosim_overwrites())
        return balance_overwrites

    def _duplicate(self) -> "ThirdPartyPool":
        """Make a new instance identical to self that shares the same simulation engine.

        Note that the new and current state become coupled in a way that they must
        simulate the same block. This is fine, see
        https://datarevenue.atlassian.net/browse/ROC-1301

        Not naming this method _copy to not confuse with Pydantic's .copy method.
        """
        return type(self)(
            exchange=self.exchange,
            adapter_contract_name=self.adapter_contract_name,
            block=self.block,
            id_=self.id_,
            tokens=self.tokens,
            spot_prices=self.spot_prices.copy(),
            trading_fee=self.trading_fee,
            block_lasting_overwrites=deepcopy(self.block_lasting_overwrites),
            engine=self._engine,
            balances=self.balances,
            minimum_gas=self.minimum_gas,
            balance_owner=self.balance_owner,
            stateless_contracts=self.stateless_contracts,
        )

    def get_sell_amount_limit(
        self, sell_token: EthereumToken, buy_token: EthereumToken
    ) -> Decimal:
        """
        Retrieves the sell amount of the given token.

        For pools with more than 2 tokens, the sell limit is obtain for all possible buy token
        combinations and the minimum is returned.
        """
        limit = self._adapter_contract.get_limits(
            cast(HexStr, self.id_),
            sell_token,
            buy_token,
            block=self.block,
            overwrites=self._get_overwrites(
                sell_token, buy_token, max_amount=MAX_BALANCE // 100
            ),
        )[0]
        return sell_token.from_onchain_amount(limit)

    def clear_all_cache(self):
        """Drop cached simulation state and overwrites, then refresh prices.

        Intended to be called once per new block."""
        self._engine.clear_temp_storage()
        self.block_lasting_overwrites = defaultdict(dict)
        self._set_spot_prices()
def _merge(a: dict, b: dict, path=None):
"""
Merges two dictionaries (a and b) deeply. This means it will traverse and combine
their nested dictionaries too if present.
Parameters:
a (dict): The first dictionary to merge.
b (dict): The second dictionary to merge into the first one.
path (list, optional): An internal parameter used during recursion
to keep track of the ancestry of nested dictionaries.
Returns:
a (dict): The merged dictionary which includes all key-value pairs from `b`
added into `a`. If they have nested dictionaries with same keys, those are also merged.
On key conflicts, preference is given to values from b.
"""
if path is None:
path = []
for key in b:
if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict):
_merge(a[key], b[key], path + [str(key)])
else:
a[key] = b[key]
return a

View File

@@ -1,345 +0,0 @@
import asyncio
import json
import platform
import time
from asyncio.subprocess import STDOUT, PIPE
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from http.client import HTTPException
from logging import getLogger
from typing import Any, Optional, Dict
import requests
from protosim_py import AccountUpdate, AccountInfo, BlockHeader
from .constants import TYCHO_CLIENT_LOG_FOLDER, TYCHO_CLIENT_FOLDER
from .decoders import ThirdPartyPoolTychoDecoder
from .exceptions import APIRequestError, TychoClientException
from .models import Blockchain, EVMBlock, EthereumToken, SynchronizerState, Address
from .pool_state import ThirdPartyPool
from .tycho_db import TychoDBSingleton
from .utils import create_engine
log = getLogger(__name__)
class TokenLoader:
    """Loads token metadata from the Tycho RPC token endpoint."""

    def __init__(
        self,
        tycho_url: str,
        blockchain: Blockchain,
        min_token_quality: Optional[int] = 0,
    ):
        """
        Parameters
        ----------
        tycho_url
            Base URL of the Tycho RPC server.
        blockchain
            Chain whose tokens should be loaded.
        min_token_quality
            Minimum quality filter forwarded to the API.
        """
        self.tycho_url = tycho_url
        self.blockchain = blockchain
        self.min_token_quality = min_token_quality
        self.endpoint = "/v1/{}/tokens"
        self._token_limit = 10000

    def get_tokens(self) -> dict[str, EthereumToken]:
        """Loads all tokens from Tycho RPC"""
        return self._load_tokens({"min_quality": self.min_token_quality})

    def get_token_subset(self, addresses: list[str]) -> dict[str, EthereumToken]:
        """Loads a subset of tokens from Tycho RPC"""
        return self._load_tokens(
            {"min_quality": self.min_token_quality, "addresses": addresses}
        )

    def _load_tokens(self, params: Dict) -> dict[str, EthereumToken]:
        """Shared pagination + decoding logic for the public loaders."""
        url = self.tycho_url + self.endpoint.format(self.blockchain.value)
        page = 0
        start = time.monotonic()
        all_tokens = []
        while data := self._get_all_with_pagination(
            url=url,
            page=page,
            limit=self._token_limit,
            params=params,
        ):
            all_tokens.extend(data)
            page += 1
            # a short page means we've reached the end; no need for another call
            if len(data) < self._token_limit:
                break
        log.info(f"Loaded {len(all_tokens)} tokens in {time.monotonic() - start:.2f}s")
        formatted_tokens = dict()
        for token in all_tokens:
            formatted = EthereumToken(**token)
            formatted_tokens[formatted.address] = formatted
        return formatted_tokens

    @staticmethod
    def _get_all_with_pagination(
        url: str, params: Optional[Dict] = None, page: int = 0, limit: int = 50
    ) -> Dict:
        """POST one page of the token query and return the decoded token list.

        Raises
        ------
        APIRequestError
            If the server responds with an error status.
        """
        if params is None:
            params = {}
        params["pagination"] = {"page": page, "page_size": limit}
        r = requests.post(url, json=params)
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # bug fix: raise_for_status raises requests.HTTPError, which is NOT
            # a subclass of http.client.HTTPException — the old handler never
            # fired and APIRequestError was never raised.
            log.error(f"Request status {r.status_code} with content {r.json()}")
            raise APIRequestError("Failed to load token configurations") from e
        return r.json()["tokens"]
@dataclass(repr=False)
class BlockProtocolChanges:
    """Decoded protocol/pool changes for a single block from the tycho stream."""

    # The block these changes were observed in.
    block: EVMBlock
    pool_states: dict[Address, ThirdPartyPool]
    """All updated pools"""
    # Addresses of pools removed in this block.
    removed_pools: set[Address]
    deserialization_time: float
    """The time it took to deserialize the pool states from the tycho feed message"""
class TychoPoolStateStreamAdapter:
    """Streams pool-state updates from the tycho-client Rust binary.

    Spawns tycho-client as a subprocess, reads its line-delimited JSON
    output and decodes each message into ``BlockProtocolChanges`` using the
    configured decoder. Usable as an async iterator after ``start()``.
    """

    def __init__(
        self,
        tycho_url: str,
        protocol: str,
        decoder: ThirdPartyPoolTychoDecoder,
        blockchain: Blockchain,
        min_tvl: Optional[Decimal] = 10,
        min_token_quality: Optional[int] = 0,
        include_state=True,
    ):
        """
        :param tycho_url: URL to connect to Tycho DB
        :param protocol: Name of the protocol that you're testing
        :param decoder: Decoder used to turn pool snapshots into pool objects
        :param blockchain: Blockchain enum
        :param min_tvl: Minimum TVL to consider a pool
        :param min_token_quality: Minimum token quality to consider a token
        :param include_state: Include state in the stream
        """
        self.min_token_quality = min_token_quality
        self.tycho_url = tycho_url
        self.min_tvl = min_tvl
        self.tycho_client = None
        # tycho-client identifies VM protocols by a "vm:" prefix
        self.protocol = f"vm:{protocol}"
        self._include_state = include_state
        self._blockchain = blockchain
        self._decoder = decoder
        # Create engine
        # TODO: This should be initialized outside the adapter?
        TychoDBSingleton.initialize(tycho_http_url=self.tycho_url)
        self._engine = create_engine([], trace=False)
        # Loads tokens from Tycho
        self._tokens: dict[str, EthereumToken] = TokenLoader(
            tycho_url=f"http://{self.tycho_url}",
            blockchain=self._blockchain,
            min_token_quality=self.min_token_quality,
        ).get_tokens()

    async def start(self):
        """Start the tycho-client Rust binary through subprocess"""
        # stdout=PIPE means that the output is piped directly to this Python process
        # stderr=STDOUT combines the stderr and stdout streams
        bin_path = self._get_binary_path()
        cmd = [
            "--log-folder",
            str(TYCHO_CLIENT_LOG_FOLDER),
            "--tycho-url",
            self.tycho_url,
            "--min-tvl",
            str(self.min_tvl),
        ]
        if not self._include_state:
            cmd.append("--no-state")
        cmd.append("--exchange")
        cmd.append(self.protocol)
        log.debug(f"Starting tycho-client binary at {bin_path}. CMD: {cmd}")
        self.tycho_client = await asyncio.create_subprocess_exec(
            str(bin_path), *cmd, stdout=PIPE, stderr=STDOUT, limit=2 ** 64
        )

    @staticmethod
    def _get_binary_path():
        """Determines the correct binary path based on the OS and architecture."""
        os_name = platform.system()
        if os_name == "Linux":
            architecture = platform.machine()
            if architecture == "aarch64":
                return TYCHO_CLIENT_FOLDER / "tycho-client-linux-arm64"
            else:
                return TYCHO_CLIENT_FOLDER / "tycho-client-linux-x64"
        elif os_name == "Darwin":
            architecture = platform.machine()
            if architecture == "arm64":
                return TYCHO_CLIENT_FOLDER / "tycho-client-mac-arm64"
            else:
                return TYCHO_CLIENT_FOLDER / "tycho-client-mac-x64"
        else:
            raise ValueError(f"Unsupported OS: {os_name}")

    def __aiter__(self):
        return self

    async def __anext__(self) -> BlockProtocolChanges:
        """Read and decode the next tycho-client message.

        Raises:
            StopAsyncIteration: on EOF or a clean client exit.
            Exception: when the client fails or emits invalid JSON; the last
                lines of the client log file are appended to the logged error.
        """
        if self.tycho_client.stdout.at_eof():
            raise StopAsyncIteration
        line = await self.tycho_client.stdout.readline()
        try:
            if not line:
                exit_code = await self.tycho_client.wait()
                if exit_code == 0:
                    # Clean exit, handle accordingly, possibly without raising an error
                    log.debug("Tycho client exited cleanly.")
                    raise StopAsyncIteration
                else:
                    line = f"Tycho client failed with exit code: {exit_code}"
                    # Non-zero exit code, handle accordingly, possibly by raising an error
                    raise TychoClientException(line)
            msg = json.loads(line.decode("utf-8"))
        except (json.JSONDecodeError, TychoClientException):
            # Read the last 10 lines from the log file available under TYCHO_CLIENT_LOG_FOLDER
            # and raise an exception with the last 10 lines
            error_msg = f"Invalid JSON output on tycho. Original line: {line}."
            with open(TYCHO_CLIENT_LOG_FOLDER / "dev_logs.log", "r") as f:
                lines = f.readlines()
                last_lines = lines[-10:]
            error_msg += f" Tycho logs: {last_lines}"
            log.exception(error_msg)
            raise Exception("Tycho-client failed.")
        return self.process_tycho_message(msg)

    @staticmethod
    def build_snapshot_message(
        protocol_components: dict, protocol_states: dict, contract_states: dict
    ) -> dict[str, ThirdPartyPool]:
        """Combine component, state and contract messages into one snapshot dict."""
        vm_states = {state["address"]: state for state in contract_states["accounts"]}
        states = {}
        for component in protocol_components["protocol_components"]:
            pool_id = component["id"]
            states[pool_id] = {"component": component}
        for state in protocol_states["states"]:
            pool_id = state["component_id"]
            if pool_id not in states:
                log.debug(f"{pool_id} was present in snapshot but not in components")
                continue
            states[pool_id]["state"] = state
        snapshot = {"vm_storage": vm_states, "states": states}
        return snapshot

    def process_tycho_message(self, msg) -> BlockProtocolChanges:
        """Validate sync state and decode one full tycho message."""
        self._validate_sync_states(msg)
        state_msg = msg["state_msgs"][self.protocol]
        block = EVMBlock(
            id=msg["block"]["id"],
            ts=datetime.fromtimestamp(msg["block"]["timestamp"]),
            hash_=msg["block"]["hash"],
        )
        return self.process_snapshot(block, state_msg["snapshot"])

    def process_snapshot(
        self, block: EVMBlock, state_msg: dict
    ) -> BlockProtocolChanges:
        """Apply VM storage updates and decode pool snapshots for a block."""
        start = time.monotonic()
        removed_pools = set()
        decoded_count = 0
        failed_count = 0
        self._process_vm_storage(state_msg["vm_storage"], block)
        # decode new components
        decoded_pools, failed_pools = self._decoder.decode_snapshot(
            state_msg["states"], block, self._tokens
        )
        decoded_count += len(decoded_pools)
        failed_count += len(failed_pools)
        decoded_pools = {
            p.id_: p for p in decoded_pools.values()
        }  # remap pools to their pool ids
        deserialization_time = time.monotonic() - start
        total = decoded_count + failed_count
        log.debug(
            f"Received {total} snapshots. n_decoded: {decoded_count}, n_failed: {failed_count}"
        )
        if failed_count > 0:
            # typo fix: "Could not to decode" -> "Could not decode"
            log.info(f"Could not decode {failed_count}/{total} pool snapshots")
        return BlockProtocolChanges(
            block=block,
            pool_states=decoded_pools,
            removed_pools=removed_pools,
            deserialization_time=round(deserialization_time, 3),
        )

    def _validate_sync_states(self, msg):
        """Raise ValueError if the sync state is missing or not ready."""
        try:
            sync_state = msg["sync_states"][self.protocol]
            log.info(f"Received sync state for {self.protocol}: {sync_state}")
            # BUGFIX: the previous condition `if not status != ready` raised
            # exactly when the indexer WAS ready; we must raise when it is not.
            if sync_state["status"] != SynchronizerState.ready.value:
                raise ValueError("Tycho-indexer is not synced")
        except KeyError:
            raise ValueError("Invalid message received from tycho-client.")

    def _process_vm_storage(self, storage: dict[str, Any], block: EVMBlock):
        """Initialize accounts on the engine and push storage updates to TychoDB."""
        vm_updates = []
        for storage_update in storage.values():
            address = storage_update["address"]
            balance = int(storage_update["native_balance"], 16)
            code = bytearray.fromhex(storage_update["code"][2:])
            # init accounts
            self._engine.init_account(
                address=address,
                account=AccountInfo(balance=balance, nonce=0, code=code),
                mocked=False,
                permanent_storage=None,
            )
            # apply account updates
            slots = {int(k, 16): int(v, 16) for k, v in storage_update["slots"].items()}
            vm_updates.append(
                AccountUpdate(
                    address=address,
                    chain=storage_update["chain"],
                    slots=slots,
                    balance=balance,
                    code=code,
                    change="Update",
                )
            )
        block_header = BlockHeader(block.id, block.hash_, int(block.ts.timestamp()))
        TychoDBSingleton.get_instance().update(vm_updates, block_header)

View File

@@ -1,48 +0,0 @@
from protosim_py import TychoDB
class TychoDBSingleton:
    """Process-wide holder for a single shared ``TychoDB`` instance.

    Keeping exactly one instance for the lifetime of the program avoids
    the overhead of constructing TychoDB repeatedly.
    """

    _instance = None

    @classmethod
    def initialize(cls, tycho_http_url: str):
        """Create (or replace) the shared TychoDB instance.

        Parameters
        ----------
        tycho_http_url : str
            The URL of the Tycho HTTP server.
        """
        cls._instance = TychoDB(tycho_http_url=tycho_http_url)

    @classmethod
    def get_instance(cls) -> TychoDB:
        """Return the shared TychoDB instance.

        Returns
        -------
        TychoDB
            The singleton instance of TychoDB.

        Raises
        ------
        ValueError
            If ``initialize`` has not been called yet.
        """
        instance = cls._instance
        if instance is None:
            raise ValueError(
                "TychoDB instance not initialized. Call initialize() first."
            )
        return instance

    @classmethod
    def clear_instance(cls):
        """Drop the shared instance so a fresh one can be initialized."""
        cls._instance = None

View File

@@ -1,355 +0,0 @@
import json
import os
from decimal import Decimal
from fractions import Fraction
from functools import lru_cache
from logging import getLogger
from pathlib import Path
from typing import Final, Any
import eth_abi
from eth_typing import HexStr
from hexbytes import HexBytes
from protosim_py import SimulationEngine, AccountInfo
import requests
from web3 import Web3
from .constants import EXTERNAL_ACCOUNT, MAX_BALANCE, ASSETS_FOLDER
from .exceptions import OutOfGas
from .models import Address, EthereumToken
from .tycho_db import TychoDBSingleton
log = getLogger(__name__)
def decode_tycho_exchange(exchange: str) -> (str, bool):
    """Strip an optional ``vm:`` marker from a tycho exchange name.

    Returns a ``(name, flag)`` tuple where ``flag`` is ``False`` when the
    ``vm:`` marker was present and ``True`` otherwise.

    NOTE(review): the original inline comment described the flag with the
    opposite polarity ("True if vm prefix was present"); the values
    documented here match what the code actually returns — confirm the
    intended meaning against callers.
    """
    if "vm:" in exchange:
        return exchange.split(":")[1], False
    return exchange, True
def create_engine(
    mocked_tokens: list[Address], trace: bool = False
) -> SimulationEngine:
    """Build a simulation engine backed by the shared TychoDB instance.

    A mocked ERC20 contract is installed at every address in
    ``mocked_tokens``, and an external account with maximal balance is
    added so simulations have a funded sender.

    Parameters
    ----------
    mocked_tokens
        Addresses at which the mocked ERC20 bytecode should be inserted.
    trace
        Whether to trace calls; debugging only, may print a lot of data
        to stdout.
    """
    engine = SimulationEngine.new_with_tycho_db(
        db=TychoDBSingleton.get_instance(), trace=trace
    )
    # get_contract_bytecode is lru-cached, so hoisting it is purely cosmetic.
    erc20_code = get_contract_bytecode(ASSETS_FOLDER / "ERC20.bin")
    for token_address in mocked_tokens:
        engine.init_account(
            address=token_address,
            account=AccountInfo(balance=0, nonce=0, code=erc20_code),
            mocked=True,
            permanent_storage=None,
        )
    engine.init_account(
        address=EXTERNAL_ACCOUNT,
        account=AccountInfo(balance=MAX_BALANCE, nonce=0, code=None),
        mocked=False,
        permanent_storage=None,
    )
    return engine
class ERC20OverwriteFactory:
    """Collects storage overwrites for the mocked ERC20 contract.

    Balance, allowance and total-supply values are accumulated per storage
    slot and can be rendered in the format expected by protosim
    (``get_protosim_overwrites``) or geth state overrides
    (``get_geth_overwrites``).
    """

    def __init__(self, token: EthereumToken):
        """
        Initialize the ERC20OverwriteFactory.

        Parameters:
            token: The token whose storage should be overwritten.
        """
        self._token = token
        self._overwrites = dict()
        # Storage layout of the mocked ERC20 contract (assets/ERC20.bin):
        # balances at slot 0, allowances at slot 1, totalSupply at slot 2.
        self._balance_slot: Final[int] = 0
        self._allowance_slot: Final[int] = 1
        self._total_supply_slot: Final[int] = 2

    def set_balance(self, balance: int, owner: Address):
        """
        Set the balance for a given owner.

        Parameters:
            balance: The balance value.
            owner: The owner's address.
        """
        storage_index = get_storage_slot_at_key(HexStr(owner), self._balance_slot)
        self._overwrites[storage_index] = balance
        # BUGFIX: added missing space between owner and value in the log message.
        log.log(
            5,
            f"Override balance: token={self._token.address} owner={owner} "
            f"value={balance} slot={storage_index}",
        )

    def set_allowance(self, allowance: int, spender: Address, owner: Address):
        """
        Set the allowance for a given spender and owner.

        Parameters:
            allowance: The allowance value.
            spender: The spender's address.
            owner: The owner's address.
        """
        # Nested mapping lookup: allowances[owner][spender].
        storage_index = get_storage_slot_at_key(
            HexStr(spender),
            get_storage_slot_at_key(HexStr(owner), self._allowance_slot),
        )
        self._overwrites[storage_index] = allowance
        # BUGFIX: added missing space between owner and spender in the log message.
        log.log(
            5,
            f"Override allowance: token={self._token.address} owner={owner} "
            f"spender={spender} value={allowance} slot={storage_index}",
        )

    def set_total_supply(self, supply: int):
        """
        Set the total supply of the token.

        Parameters:
            supply: The total supply value.
        """
        self._overwrites[self._total_supply_slot] = supply
        log.log(
            5,
            f"Override total supply: token={self._token.address} supply={supply}"
        )

    def get_protosim_overwrites(self) -> dict[Address, dict[int, int]]:
        """
        Get the overwrites dictionary of previously collected values.

        Returns:
            dict[Address, dict]: A dictionary containing the token's address
                and the overwrites.
        """
        # Protosim returns lowercase addresses in state updates returned from simulation
        return {self._token.address.lower(): self._overwrites}

    def get_geth_overwrites(self) -> dict[Address, dict[int, int]]:
        """
        Get the overwrites formatted for geth's state-override objects.

        Returns:
            dict[Address, dict]: The token address mapped to a ``stateDiff``
                of 32-char-padded hex slot values plus the mocked ERC20 ``code``.
        """
        formatted_overwrites = {
            # removeprefix only strips the literal "0x" marker; the previous
            # lstrip("0x") also stripped leading zero digits (the zfill masked
            # it, but the intent is clearer this way).
            HexBytes(key).hex(): "0x" + HexBytes(val).hex().removeprefix("0x").zfill(64)
            for key, val in self._overwrites.items()
        }
        code = "0x" + get_contract_bytecode(ASSETS_FOLDER / "ERC20.bin").hex()
        return {self._token.address: {"stateDiff": formatted_overwrites, "code": code}}
def get_storage_slot_at_key(key: Address, mapping_slot: int) -> int:
    """Get storage slot index of a value stored at a certain key in a mapping.

    The slot is ``keccak256(pad32(key) ++ pad32(mapping_slot))`` as defined
    by the Solidity storage layout for mappings.

    Parameters
    ----------
    key
        Key in a mapping. Meant for ethereum addresses; accepts only hex
        strings.
    mapping_slot
        Storage slot at which the mapping itself is stored. For a mapping
        declared as the first state variable this is 0 (e.g. ``balances``
        in the mocked ERC20 contract).

    Returns
    -------
    slot
        Index of the storage slot where the value at the given key lives.

    Examples
    --------
    Balance slot of an account for a mapping at slot 0::

        get_storage_slot_at_key("0xC63135E4bF73F637AF616DFd64cf701866BB2628", 0)

    For nested mappings (e.g. ERC20 ``allowances`` at slot 1,
    ``dict[owner, dict[spender, value]]``), apply the function twice::

        get_storage_slot_at_key("0xspender", get_storage_slot_at_key("0xowner", 1))

    See Also
    --------
    `Solidity Storage Layout documentation
    <https://docs.soliditylang.org/en/v0.8.13/internals/layout_in_storage.html#mappings-and-dynamic-arrays>`_
    """
    padded_key = bytes.fromhex(key[2:]).rjust(32, b"\0")
    slot_word = int.to_bytes(mapping_slot, 32, "big")
    return int.from_bytes(Web3.keccak(padded_key + slot_word), "big")
@lru_cache
def get_contract_bytecode(path: str) -> bytes:
    """Load contract bytecode from a file given an absolute path.

    Results are memoized with ``lru_cache``, so repeated loads of the same
    path do not touch the filesystem again.
    """
    with open(path, "rb") as fh:
        return fh.read()
def frac_to_decimal(frac: Fraction) -> Decimal:
    """Convert a ``Fraction`` to a ``Decimal`` by dividing its parts."""
    numerator, denominator = frac.numerator, frac.denominator
    return Decimal(numerator) / Decimal(denominator)
def load_abi(name_or_path: str) -> dict:
    """Load an ABI from an explicit path or from the bundled assets.

    If ``name_or_path`` resolves to an existing file it is loaded directly;
    otherwise it is treated as an asset name and looked up as
    ``assets/<name>.abi`` next to this module.

    Raises:
        FileNotFoundError: when neither location exists; the message lists
            the available asset names as suggestions.
    """
    abspath = os.path.abspath(name_or_path)
    if os.path.exists(abspath):
        path = abspath
    else:
        path = f"{os.path.dirname(os.path.abspath(__file__))}/assets/{name_or_path}.abi"
    try:
        with open(os.path.abspath(path)) as f:
            abi: dict = json.load(f)
    except FileNotFoundError:
        search_dir = f"{os.path.dirname(os.path.abspath(__file__))}/assets/"
        # Collect all asset names (relative, without the .abi extension)
        # so the error message can suggest alternatives to the user.
        available_files = []
        for dirpath, _dirnames, filenames in os.walk(search_dir):
            for filename in filenames:
                relative_path = os.path.relpath(
                    os.path.join(dirpath, filename), search_dir
                )
                available_files.append(relative_path.replace(".abi", ""))
        raise FileNotFoundError(
            f"File {name_or_path} not found. "
            f"Did you mean one of these? {', '.join(available_files)}"
        )
    return abi
# https://docs.soliditylang.org/en/latest/control-structures.html#panic-via-assert-and-error-via-require
# Maps the uint256 code carried by a Panic(uint256) revert to a readable name.
solidity_panic_codes = {
    0: "GenericCompilerPanic",
    1: "AssertionError",
    17: "ArithmeticOver/Underflow",
    18: "ZeroDivisionError",
    33: "UnknownEnumMember",  # typo fix: was "UnkownEnumMember"
    34: "BadStorageByteArrayEncoding",
    # BUGFIX: EmptyArray (pop() on an empty array) is panic code 0x31 (49)
    # per the Solidity docs; the previous key 51 (0x33) is not a panic code.
    0x31: "EmptyArray",
    0x32: "OutOfBounds",
    0x41: "OutOfMemory",
    0x51: "BadFunctionPointer",
}
def parse_solidity_error_message(data) -> str:
    """Best-effort decode of revert data returned by a failed call.

    Handles ``Error(string)``, ``Panic(uint256)``, legacy bare-string
    reverts, and custom errors that wrap a string after their selector;
    falls back to a generic message when nothing decodes.
    """
    data_bytes = HexBytes(data)
    error_string = f"Failed to decode: {data}"
    # data is encoded as Error(string), selector 0x08c379a0
    if data_bytes[:4] == HexBytes("0x08c379a0"):
        (error_string,) = eth_abi.decode(["string"], data_bytes[4:])
        return error_string
    # Panic(uint256), selector 0x4e487b71
    elif data_bytes[:4] == HexBytes("0x4e487b71"):
        (error_code,) = eth_abi.decode(["uint256"], data_bytes[4:])
        return solidity_panic_codes.get(error_code, f"Panic({error_code})")
    # old solidity: revert 'some string' case
    try:
        (error_string,) = eth_abi.decode(["string"], data_bytes)
        return error_string
    except Exception:
        pass
    # some custom error, maybe it wraps a string after its 4-byte selector?
    # (this block was previously duplicated verbatim; the copy was removed)
    try:
        (error_string,) = eth_abi.decode(["string"], data_bytes[4:])
        return error_string
    except Exception:
        pass
    return error_string
def maybe_coerce_error(
    err: RuntimeError, pool_state: Any, gas_limit: int = None
) -> Exception:
    """Map a simulation ``RuntimeError`` to a more descriptive exception.

    Inspects the error payload (``err.args[0]``, assumed to carry ``data``
    and ``gas_used`` attributes) and:

    - decodes revert data (hex-prefixed ``data``) into a readable reason
      via ``parse_solidity_error_message``;
    - returns an ``OutOfGas`` error when >= 97% of ``gas_limit`` was used,
      or when the payload itself reports "OutOfGas";
    - otherwise returns the (possibly re-worded) original error.

    Parameters:
        err: Error raised by the simulation engine.
        pool_state: Included (via ``repr``) in OutOfGas errors for context.
        gas_limit: Gas limit of the simulated call, if known.
            NOTE(review): may be None despite the ``int`` annotation.

    Returns:
        The coerced exception; this function never raises itself.
    """
    details = err.args[0]
    # we got bytes as data, so this was a revert
    if details.data.startswith("0x"):
        err = RuntimeError(
            f"Revert! Reason: {parse_solidity_error_message(details.data)}"
        )
        # we have gas information, check if this likely an out of gas err.
        if gas_limit is not None and details.gas_used is not None:
            # if we used up 97% or more issue a OutOfGas error.
            usage = details.gas_used / gas_limit
            if usage >= 0.97:
                return OutOfGas(
                    f"SimulationError: Likely out-of-gas. "
                    f"Used: {usage * 100:.2f}% of gas limit. "
                    f"Original error: {err}",
                    repr(pool_state),
                )
    elif "OutOfGas" in details.data:
        # Engine-reported out-of-gas; include usage only when the limit is known.
        if gas_limit is not None:
            usage = details.gas_used / gas_limit
            usage_msg = f"Used: {usage * 100:.2f}% of gas limit. "
        else:
            usage_msg = ""
        return OutOfGas(
            f"SimulationError: out-of-gas. {usage_msg}Original error: {details.data}",
            repr(pool_state),
        )
    return err
def exec_rpc_method(url, method, params, timeout=240) -> dict:
    """Execute a JSON-RPC method against ``url`` and return its result.

    Parameters:
        url: JSON-RPC endpoint.
        method: RPC method name, e.g. ``eth_getCode``.
        params: Positional parameter list for the method.
        timeout: Request timeout in seconds.

    Returns:
        The ``result`` field of the JSON-RPC response.

    Raises:
        RuntimeError: on a non-OK HTTP status, an RPC ``error`` response,
            or a malformed response carrying neither ``result`` nor ``error``.
    """
    payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    headers = {"Content-Type": "application/json"}
    r = requests.post(url, data=json.dumps(payload), headers=headers, timeout=timeout)
    if r.status_code >= 400:
        raise RuntimeError(
            "RPC failed: status_code not ok. (method {}: {})".format(
                method, r.status_code
            )
        )
    data = r.json()
    if "result" in data:
        return data["result"]
    elif "error" in data:
        raise RuntimeError(
            "RPC failed with Error {} - {}".format(data["error"], method)
        )
    # BUGFIX: previously fell through and returned None implicitly on a
    # response without result or error; fail loudly instead.
    raise RuntimeError(
        "RPC failed: malformed response without result or error (method {})".format(
            method
        )
    )
def get_code_for_address(address: str, connection_string: str = None):
    """Fetch deployed bytecode for ``address`` via ``eth_getCode``.

    Parameters:
        address: The account address to query.
        connection_string: RPC endpoint URL; falls back to the ``RPC_URL``
            environment variable when not given.

    Returns:
        The contract bytecode as ``bytes``, or ``None`` when the RPC call fails.

    Raises:
        EnvironmentError: when no endpoint is provided and ``RPC_URL`` is unset.
    """
    if connection_string is None:
        connection_string = os.getenv("RPC_URL")
    if connection_string is None:
        raise EnvironmentError("RPC_URL environment variable is not set")
    method = "eth_getCode"
    params = [address, "latest"]
    try:
        code = exec_rpc_method(connection_string, method, params)
        return bytes.fromhex(code[2:])
    except RuntimeError as e:
        # Use the module logger instead of print so failures reach log handlers.
        log.error(f"Error fetching code for address {address}: {e}")
        return None