Files
tycho-protocol-sdk/testing/tycho-client/tycho_client/tycho_adapter.py
2024-08-01 12:03:01 +02:00

346 lines
12 KiB
Python

import asyncio
import json
import platform
import time
from asyncio.subprocess import STDOUT, PIPE
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from http.client import HTTPException
from logging import getLogger
from typing import Any, Optional, Dict
import requests
from protosim_py import AccountUpdate, AccountInfo, BlockHeader
from .constants import TYCHO_CLIENT_LOG_FOLDER, TYCHO_CLIENT_FOLDER
from .decoders import ThirdPartyPoolTychoDecoder
from .exceptions import APIRequestError, TychoClientException
from .models import Blockchain, EVMBlock, EthereumToken, SynchronizerState, Address
from .pool_state import ThirdPartyPool
from .tycho_db import TychoDBSingleton
from .utils import create_engine
# Module-level logger, named after this module.
log = getLogger(name=__name__)
class TokenLoader:
    """Loads token metadata from the Tycho RPC ``/v1/{chain}/tokens`` endpoint.

    Tokens are fetched page by page (``_token_limit`` tokens per request) and
    returned as :class:`EthereumToken` instances keyed by their address.
    """

    def __init__(
        self,
        tycho_url: str,
        blockchain: Blockchain,
        min_token_quality: Optional[int] = 0,
    ):
        """
        :param tycho_url: Base URL of the Tycho RPC; the endpoint path is
            appended to it verbatim, so it must include the scheme.
        :param blockchain: Chain whose tokens are requested; its ``value`` is
            substituted into the endpoint path.
        :param min_token_quality: Sent to the RPC as ``min_quality``.
        """
        self.tycho_url = tycho_url
        self.blockchain = blockchain
        self.min_token_quality = min_token_quality
        self.endpoint = "/v1/{}/tokens"
        # Page size used for every paginated request.
        self._token_limit = 10000

    def get_tokens(self) -> dict[str, EthereumToken]:
        """Loads all tokens from Tycho RPC"""
        return self._load_tokens({"min_quality": self.min_token_quality})

    def get_token_subset(self, addresses: list[str]) -> dict[str, EthereumToken]:
        """Loads a subset of tokens from Tycho RPC

        :param addresses: Token addresses to request (sent as ``addresses``).
        """
        return self._load_tokens(
            {"min_quality": self.min_token_quality, "addresses": addresses}
        )

    def _load_tokens(self, params: Dict) -> dict[str, EthereumToken]:
        """Fetch every page matching ``params`` and index the tokens by address."""
        url = self.tycho_url + self.endpoint.format(self.blockchain.value)
        page = 0
        start = time.monotonic()
        all_tokens = []
        while data := self._get_all_with_pagination(
            url=url, page=page, limit=self._token_limit, params=params
        ):
            all_tokens.extend(data)
            page += 1
            # A page shorter than the limit is the last one; skip the extra request.
            if len(data) < self._token_limit:
                break
        log.info(f"Loaded {len(all_tokens)} tokens in {time.monotonic() - start:.2f}s")
        formatted_tokens = {}
        for token in all_tokens:
            formatted = EthereumToken(**token)
            formatted_tokens[formatted.address] = formatted
        return formatted_tokens

    @staticmethod
    def _get_all_with_pagination(
        url: str, params: Optional[Dict] = None, page: int = 0, limit: int = 50
    ) -> list[dict]:
        """POST one page of a paginated token query and return its ``tokens`` list.

        :param url: Full endpoint URL.
        :param params: JSON body fields. Not mutated: the ``pagination`` field
            is added to a copy.
        :param page: Zero-based page index.
        :param limit: Page size.
        :raises APIRequestError: If the server answers with an HTTP error status.
        """
        body = {**(params or {}), "pagination": {"page": page, "page_size": limit}}
        r = requests.post(url, json=body)
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # Log the raw text: an error response body is not guaranteed to be JSON.
            log.error(f"Request status {r.status_code} with content {r.text}")
            raise APIRequestError("Failed to load token configurations") from e
        return r.json()["tokens"]
@dataclass(repr=False)
class BlockProtocolChanges:
    """Pool-state changes decoded from one tycho-client message for a block."""

    # Block these changes belong to.
    block: EVMBlock
    # All updated pools.
    pool_states: dict[Address, ThirdPartyPool]
    # Pools reported as removed.
    removed_pools: set[Address]
    # Time it took to deserialize the pool states from the tycho feed message.
    deserialization_time: float
class TychoPoolStateStreamAdapter:
    """Streams decoded pool states for one VM protocol from the tycho-client binary.

    :meth:`start` launches the platform-specific tycho-client executable as a
    subprocess. Iterating the adapter with ``async for`` then reads one JSON
    message per stdout line and converts it into a :class:`BlockProtocolChanges`.
    """

    def __init__(
        self,
        tycho_url: str,
        protocol: str,
        decoder: ThirdPartyPoolTychoDecoder,
        blockchain: Blockchain,
        min_tvl: Optional[Decimal] = 10,
        min_token_quality: Optional[int] = 0,
        include_state=True,
    ):
        """
        :param tycho_url: URL to connect to Tycho DB, without scheme
            (``http://`` is prepended when loading tokens)
        :param protocol: Name of the protocol that you're testing; it is
            prefixed with ``vm:`` to form the exchange name
        :param decoder: Decoder that turns snapshot states into pools
        :param blockchain: Blockchain enum
        :param min_tvl: Minimum TVL to consider a pool
        :param min_token_quality: Minimum token quality to consider a token
        :param include_state: Include state in the stream
        """
        self.min_token_quality = min_token_quality
        self.tycho_url = tycho_url
        self.min_tvl = min_tvl
        # Set by start(); must be awaited before iterating.
        self.tycho_client = None
        self.protocol = f"vm:{protocol}"
        self._include_state = include_state
        self._blockchain = blockchain
        self._decoder = decoder
        # Create engine
        # TODO: This should be initialized outside the adapter?
        TychoDBSingleton.initialize(tycho_http_url=self.tycho_url)
        self._engine = create_engine([], trace=False)
        # Loads tokens from Tycho (blocking HTTP requests).
        self._tokens: dict[str, EthereumToken] = TokenLoader(
            tycho_url=f"http://{self.tycho_url}",
            blockchain=self._blockchain,
            min_token_quality=self.min_token_quality,
        ).get_tokens()

    async def start(self):
        """Start the tycho-client Rust binary through subprocess"""
        bin_path = self._get_binary_path()
        cmd = [
            "--log-folder",
            str(TYCHO_CLIENT_LOG_FOLDER),
            "--tycho-url",
            self.tycho_url,
            "--min-tvl",
            str(self.min_tvl),
        ]
        if not self._include_state:
            cmd.append("--no-state")
        cmd += ["--exchange", self.protocol]
        log.debug(f"Starting tycho-client binary at {bin_path}. CMD: {cmd}")
        # stdout=PIPE pipes the output directly to this Python process and
        # stderr=STDOUT merges stderr into the same stream. The huge `limit`
        # keeps StreamReader.readline() from rejecting very long JSON lines.
        self.tycho_client = await asyncio.create_subprocess_exec(
            str(bin_path), *cmd, stdout=PIPE, stderr=STDOUT, limit=2**64
        )

    @staticmethod
    def _get_binary_path():
        """Determines the correct binary path based on the OS and architecture.

        :raises ValueError: On operating systems other than Linux and macOS.
        """
        os_name = platform.system()
        architecture = platform.machine()
        if os_name == "Linux":
            # 64-bit ARM is normally "aarch64" on Linux; accept "arm64" as well.
            arch = "arm64" if architecture in ("aarch64", "arm64") else "x64"
            return TYCHO_CLIENT_FOLDER / f"tycho-client-linux-{arch}"
        if os_name == "Darwin":
            arch = "arm64" if architecture == "arm64" else "x64"
            return TYCHO_CLIENT_FOLDER / f"tycho-client-mac-{arch}"
        raise ValueError(f"Unsupported OS: {os_name}")

    def __aiter__(self):
        return self

    async def __anext__(self) -> BlockProtocolChanges:
        """Read and decode the next message from tycho-client.

        Requires :meth:`start` to have been awaited.

        :raises StopAsyncIteration: When stdout is exhausted and the client
            exited with code 0.
        :raises Exception: When the client exits with a non-zero code or emits
            a line that is not valid UTF-8 JSON. The last lines of the client's
            log file are included in the logged error, and the underlying
            error is chained as the cause.
        """
        if self.tycho_client.stdout.at_eof():
            raise StopAsyncIteration
        line = await self.tycho_client.stdout.readline()
        try:
            if not line:
                exit_code = await self.tycho_client.wait()
                if exit_code == 0:
                    log.debug("Tycho client exited cleanly.")
                    raise StopAsyncIteration
                line = f"Tycho client failed with exit code: {exit_code}"
                raise TychoClientException(line)
            msg = json.loads(line.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError, TychoClientException) as e:
            error_msg = f"Invalid JSON output on tycho. Original line: {line}."
            error_msg += f" Tycho logs: {self._read_last_log_lines()}"
            log.exception(error_msg)
            raise Exception("Tycho-client failed.") from e
        return self.process_tycho_message(msg)

    @staticmethod
    def _read_last_log_lines(n: int = 10) -> list[str]:
        """Return the last ``n`` lines of tycho-client's ``dev_logs.log``.

        Best effort: the result only enriches an error message, so a missing
        or unreadable log file yields a placeholder entry instead of raising
        and masking the original failure.
        """
        try:
            with open(
                TYCHO_CLIENT_LOG_FOLDER / "dev_logs.log",
                "r",
                encoding="utf-8",
                errors="replace",
            ) as f:
                return f.readlines()[-n:]
        except OSError as e:
            return [f"<could not read tycho logs: {e}>"]

    @staticmethod
    def build_snapshot_message(
        protocol_components: dict, protocol_states: dict, contract_states: dict
    ) -> dict[str, dict]:
        """Combine component, state and contract payloads into one snapshot dict.

        :param protocol_components: Dict with a ``protocol_components`` list;
            each entry must have an ``id``.
        :param protocol_states: Dict with a ``states`` list; each entry must
            have a ``component_id``.
        :param contract_states: Dict with an ``accounts`` list; each entry must
            have an ``address``.
        :returns: ``{"vm_storage": {address: account}, "states": {pool_id:
            {"component": ..., "state": ...}}}``, the layout that
            :meth:`process_snapshot` reads. States whose component is unknown
            are skipped; a component without a state has no ``"state"`` key.
        """
        vm_states = {state["address"]: state for state in contract_states["accounts"]}
        states = {
            component["id"]: {"component": component}
            for component in protocol_components["protocol_components"]
        }
        for state in protocol_states["states"]:
            pool_id = state["component_id"]
            if pool_id not in states:
                log.debug(f"{pool_id} was present in snapshot but not in components")
                continue
            states[pool_id]["state"] = state
        return {"vm_storage": vm_states, "states": states}

    def process_tycho_message(self, msg) -> BlockProtocolChanges:
        """Validate a tycho-client message and decode this protocol's snapshot.

        :raises ValueError: If the protocol's sync state is missing or not ready.
        """
        self._validate_sync_states(msg)
        state_msg = msg["state_msgs"][self.protocol]
        block = EVMBlock(
            id=msg["block"]["id"],
            ts=datetime.fromtimestamp(msg["block"]["timestamp"]),
            hash_=msg["block"]["hash"],
        )
        return self.process_snapshot(block, state_msg["snapshot"])

    def process_snapshot(
        self, block: EVMBlock, state_msg: dict
    ) -> BlockProtocolChanges:
        """Load a snapshot's VM storage and decode its pool states.

        :param block: Block the snapshot belongs to.
        :param state_msg: Snapshot dict with ``vm_storage`` and ``states`` keys.
        :returns: The decoded pools keyed by ``id_``. ``removed_pools`` is
            always empty here, and ``deserialization_time`` is in seconds,
            rounded to milliseconds.
        """
        start = time.monotonic()
        self._process_vm_storage(state_msg["vm_storage"], block)
        decoded_pools, failed_pools = self._decoder.decode_snapshot(
            state_msg["states"], block, self._tokens
        )
        decoded_count = len(decoded_pools)
        failed_count = len(failed_pools)
        # Re-key the decoded pools by their pool ids.
        pool_states = {p.id_: p for p in decoded_pools.values()}
        deserialization_time = time.monotonic() - start
        total = decoded_count + failed_count
        log.debug(
            f"Received {total} snapshots. n_decoded: {decoded_count}, n_failed: {failed_count}"
        )
        if failed_count > 0:
            log.info(f"Could not decode {failed_count}/{total} pool snapshots")
        return BlockProtocolChanges(
            block=block,
            pool_states=pool_states,
            removed_pools=set(),
            deserialization_time=round(deserialization_time, 3),
        )

    def _validate_sync_states(self, msg):
        """Ensure tycho-client reports this protocol's synchronizer as ready.

        :raises ValueError: If the ``sync_states`` entry (or its ``status``)
            is missing, or if the status is anything other than ready.
        """
        try:
            sync_state = msg["sync_states"][self.protocol]
            log.info(f"Received sync state for {self.protocol}: {sync_state}")
            status = sync_state["status"]
        except KeyError as e:
            raise ValueError("Invalid message received from tycho-client.") from e
        # Reject every status except ready.
        if status != SynchronizerState.ready.value:
            raise ValueError("Tycho-indexer is not synced")

    def _process_vm_storage(self, storage: dict[str, Any], block: EVMBlock):
        """Load snapshot accounts into the simulation engine and the Tycho DB.

        Each account is first initialized in ``self._engine`` with its balance
        and code. All accounts, including their storage slots, are then sent
        to ``TychoDBSingleton`` as one batch of ``"Update"`` changes tagged
        with ``block``'s header.
        """
        vm_updates = []
        for storage_update in storage.values():
            address = storage_update["address"]
            # Balance and slot keys/values arrive as hex strings.
            balance = int(storage_update["native_balance"], 16)
            # Drop the two-character prefix (presumably "0x") before hex-decoding.
            code = bytearray.fromhex(storage_update["code"][2:])
            self._engine.init_account(
                address=address,
                account=AccountInfo(balance=balance, nonce=0, code=code),
                mocked=False,
                permanent_storage=None,
            )
            slots = {int(k, 16): int(v, 16) for k, v in storage_update["slots"].items()}
            vm_updates.append(
                AccountUpdate(
                    address=address,
                    chain=storage_update["chain"],
                    slots=slots,
                    balance=balance,
                    code=code,
                    change="Update",
                )
            )
        block_header = BlockHeader(block.id, block.hash_, int(block.ts.timestamp()))
        TychoDBSingleton.get_instance().update(vm_updates, block_header)