Files
tycho-protocol-sdk/substreams/ethereum-curve/params.py
2024-05-07 10:16:22 -05:00

55 lines
1.6 KiB
Python

"""
This script could be changed to used `jq` and bash. Usage:
```bash
python params.py | substreams run -e mainnet.eth.streamingfast.io:443 substreams.yaml map_protocol_changes --start-block 11942410 --stop-block +100 -p map_components=
```
"""
import json
from typing import Any
PARAMETERS = "params.json"
def encode_json_to_query_params(params: list[dict[str, Any]]):
encoded_params = []
try:
for i, param in enumerate(params):
address: str = param["address"]
tx_hash: str = param["tx_hash"]
tokens: list[str] = param["tokens"]
attributes: dict[str, str] = param["attributes"]
encoded_address = f"address={address}"
encoded_tx_hash = f"tx_hash={tx_hash}"
encoded_tokens = "&".join([f"tokens[]={token}" for token in tokens])
encoded_attributes = "&".join(
[
f"attribute_keys[]={key}&attribute_vals[]={value}"
for key, value in attributes.items()
]
)
encoded_param = f"{encoded_address}&{encoded_tx_hash}&{encoded_tokens}&{encoded_attributes}"
encoded_param = encoded_param.rstrip("&")
encoded_params.append(encoded_param)
except KeyError as err:
raise KeyError(
f"Missing key in {PARAMETERS}.\n"
f"Index `{i}` object missing parameters.\n\n" + err.args[0]
)
return ",".join(encoded_params)
def main():
with open(PARAMETERS, "r") as f:
params = json.load(f)
print(encode_json_to_query_params(params))
if __name__ == "__main__":
main()