346 lines
12 KiB
Python
346 lines
12 KiB
Python
import asyncio
import json
import platform
import time
from asyncio.subprocess import STDOUT, PIPE
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from http.client import HTTPException
from logging import getLogger
from typing import Any, Optional, Dict

import requests
from protosim_py import AccountUpdate, AccountInfo, BlockHeader

from .constants import TYCHO_CLIENT_LOG_FOLDER, TYCHO_CLIENT_FOLDER
from .decoders import ThirdPartyPoolTychoDecoder
from .exceptions import APIRequestError, TychoClientException
from .models import Blockchain, EVMBlock, EthereumToken, SynchronizerState, Address
from .pool_state import ThirdPartyPool
from .tycho_db import TychoDBSingleton
from .utils import create_engine
|
|
# Module-level logger, named after this module via ``__name__``.
log = getLogger(__name__)
|
|
|
|
|
|
class TokenLoader:
    """Loads token definitions from the Tycho RPC ``tokens`` endpoint.

    Pages are requested with HTTP POST until a page comes back empty or
    shorter than the page size. Every returned token dict is turned into an
    ``EthereumToken`` and stored under its ``address`` attribute.
    """

    def __init__(
        self,
        tycho_url: str,
        blockchain: Blockchain,
        min_token_quality: Optional[int] = 0,
    ):
        """
        :param tycho_url: Base URL of the Tycho RPC; the endpoint path is appended to it verbatim
        :param blockchain: Blockchain enum; its ``value`` is formatted into the endpoint path
        :param min_token_quality: Value sent as the ``min_quality`` filter with every request
        """
        self.tycho_url = tycho_url
        self.blockchain = blockchain
        self.min_token_quality = min_token_quality
        self.endpoint = "/v1/{}/tokens"
        # Page size used for every request; also the threshold for detecting the last page.
        self._token_limit = 10000

    def get_tokens(self) -> dict[str, EthereumToken]:
        """Loads all tokens from Tycho RPC"""
        return self._load_tokens({"min_quality": self.min_token_quality})

    def get_token_subset(self, addresses: list[str]) -> dict[str, EthereumToken]:
        """Loads a subset of tokens from Tycho RPC"""
        return self._load_tokens(
            {"min_quality": self.min_token_quality, "addresses": addresses}
        )

    def _load_tokens(self, params: dict) -> dict[str, EthereumToken]:
        """Fetch every page matching ``params`` and index the tokens by address.

        :param params: Request filters; not modified by this method
        :return: Mapping of ``EthereumToken.address`` to the token
        :raises APIRequestError: If any page request returns an HTTP error status
        """
        url = self.tycho_url + self.endpoint.format(self.blockchain.value)
        page = 0

        start = time.monotonic()
        all_tokens = []
        while data := self._get_all_with_pagination(
            url=url,
            page=page,
            limit=self._token_limit,
            params=params,
        ):
            all_tokens.extend(data)
            page += 1
            # A short page means there is nothing left to fetch.
            if len(data) < self._token_limit:
                break

        log.info(f"Loaded {len(all_tokens)} tokens in {time.monotonic() - start:.2f}s")

        formatted_tokens = {}
        for token in all_tokens:
            formatted = EthereumToken(**token)
            formatted_tokens[formatted.address] = formatted

        return formatted_tokens

    @staticmethod
    def _get_all_with_pagination(
        url: str, params: Optional[Dict] = None, page: int = 0, limit: int = 50
    ) -> list[dict]:
        """Request a single page of tokens.

        :param url: Full URL of the tokens endpoint
        :param params: Extra JSON body fields; copied, never mutated
        :param page: Page index placed in the ``pagination`` field
        :param limit: Page size placed in the ``pagination`` field
        :return: The ``tokens`` entry of the JSON response
        :raises APIRequestError: If the response carries an HTTP error status
        """
        # Work on a copy so the caller's dict does not silently gain a "pagination" key.
        payload = dict(params) if params else {}
        payload["pagination"] = {"page": page, "page_size": limit}

        r = requests.post(url, json=payload)
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # raise_for_status() raises requests.HTTPError, not http.client.HTTPException.
            # Log the raw body: an error response is not guaranteed to be valid JSON.
            log.error(f"Request status {r.status_code} with content {r.text}")
            raise APIRequestError("Failed to load token configurations") from e
        return r.json()["tokens"]
|
|
|
|
|
|
@dataclass(repr=False)
class BlockProtocolChanges:
    """Pool changes decoded for a single block.

    ``repr`` generation is disabled on the dataclass, so instances use the
    default object representation.
    """

    #: Block the changes belong to
    block: EVMBlock

    #: All updated pools
    pool_states: dict[Address, ThirdPartyPool]

    #: Pools removed in this block
    removed_pools: set[Address]

    #: The time it took to deserialize the pool states from the tycho feed message
    deserialization_time: float
|
|
|
|
|
|
class TychoPoolStateStreamAdapter:
    """Async iterator over pool snapshots produced by the tycho-client binary.

    ``start()`` launches the binary as a subprocess; every line it prints is
    parsed as a JSON message and decoded into a ``BlockProtocolChanges``.
    ``build_snapshot_message`` and ``process_snapshot`` can also be used
    directly, without running the subprocess.
    """

    def __init__(
        self,
        tycho_url: str,
        protocol: str,
        decoder: ThirdPartyPoolTychoDecoder,
        blockchain: Blockchain,
        min_tvl: Optional[Decimal] = 10,
        min_token_quality: Optional[int] = 0,
        include_state=True,
    ):
        """
        :param tycho_url: URL to connect to Tycho DB, without scheme (``http://`` is
            prepended when loading tokens)
        :param protocol: Name of the protocol that you're testing; stored with a ``vm:`` prefix
        :param decoder: Decoder whose ``decode_snapshot`` turns snapshot states into pools
        :param blockchain: Blockchain enum
        :param min_tvl: Minimum TVL to consider a pool
        :param min_token_quality: Minimum token quality to consider a token
        :param include_state: Include state in the stream
        """
        self.min_token_quality = min_token_quality
        self.tycho_url = tycho_url
        self.min_tvl = min_tvl
        # Subprocess handle; stays None until start() is awaited.
        self.tycho_client = None
        self.protocol = f"vm:{protocol}"
        self._include_state = include_state
        self._blockchain = blockchain
        self._decoder = decoder

        # Create engine
        # TODO: This should be initialized outside the adapter?
        TychoDBSingleton.initialize(tycho_http_url=self.tycho_url)
        self._engine = create_engine([], trace=False)

        # Loads tokens from Tycho
        self._tokens: dict[str, EthereumToken] = TokenLoader(
            tycho_url=f"http://{self.tycho_url}",
            blockchain=self._blockchain,
            min_token_quality=self.min_token_quality,
        ).get_tokens()

    async def start(self):
        """Start the tycho-client Rust binary through subprocess"""
        bin_path = self._get_binary_path()

        cmd = [
            "--log-folder",
            str(TYCHO_CLIENT_LOG_FOLDER),
            "--tycho-url",
            self.tycho_url,
            "--min-tvl",
            str(self.min_tvl),
        ]
        if not self._include_state:
            cmd.append("--no-state")
        cmd.extend(["--exchange", self.protocol])

        log.debug(f"Starting tycho-client binary at {bin_path}. CMD: {cmd}")
        # stdout=PIPE means that the output is piped directly to this Python process
        # stderr=STDOUT combines the stderr and stdout streams
        # limit is the stream reader buffer limit; presumably set this high because
        # a single message line can be very large (confirm before lowering it).
        self.tycho_client = await asyncio.create_subprocess_exec(
            str(bin_path), *cmd, stdout=PIPE, stderr=STDOUT, limit=2**64
        )

    @staticmethod
    def _get_binary_path():
        """Determines the correct binary path based on the OS and architecture.

        :raises ValueError: If the OS is neither Linux nor Darwin
        """
        # OS name -> (tag used in the binary file name, machine string that selects arm64)
        platforms = {
            "Linux": ("linux", "aarch64"),
            "Darwin": ("mac", "arm64"),
        }
        os_name = platform.system()
        if os_name not in platforms:
            raise ValueError(f"Unsupported OS: {os_name}")

        os_tag, arm_machine = platforms[os_name]
        # Any machine other than the arm one falls back to the x64 binary.
        arch_tag = "arm64" if platform.machine() == arm_machine else "x64"
        return TYCHO_CLIENT_FOLDER / f"tycho-client-{os_tag}-{arch_tag}"

    def __aiter__(self):
        return self

    async def __anext__(self) -> BlockProtocolChanges:
        """Read the next line from the tycho-client and decode it.

        :raises StopAsyncIteration: When the stream is at EOF or the client exited with code 0
        :raises RuntimeError: If ``start()`` has not been awaited yet
        :raises Exception: If the client exited with a non-zero code or printed a line
            that is not valid UTF-8 JSON
        """
        if self.tycho_client is None:
            raise RuntimeError("Tycho client is not running; call start() first.")

        stdout = self.tycho_client.stdout
        if stdout.at_eof():
            raise StopAsyncIteration
        line = await stdout.readline()

        if not line:
            # An empty read means the process closed its output; inspect the exit code.
            exit_code = await self.tycho_client.wait()
            if exit_code == 0:
                log.debug("Tycho client exited cleanly.")
                raise StopAsyncIteration
            log.error(
                f"Tycho client failed with exit code: {exit_code}."
                f" Tycho logs: {self._read_log_tail()}"
            )
            raise Exception("Tycho-client failed.")

        try:
            msg = json.loads(line.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as err:
            log.exception(
                f"Invalid JSON output on tycho. Original line: {line}."
                f" Tycho logs: {self._read_log_tail()}"
            )
            raise Exception("Tycho-client failed.") from err
        return self.process_tycho_message(msg)

    @staticmethod
    def _read_log_tail(n_lines: int = 10) -> list[str]:
        """Return the last ``n_lines`` lines of ``dev_logs.log``, or ``[]`` if unreadable."""
        try:
            with open(
                TYCHO_CLIENT_LOG_FOLDER / "dev_logs.log", "r", errors="replace"
            ) as f:
                # A bounded deque keeps only the tail instead of the whole file.
                return list(deque(f, maxlen=n_lines))
        except OSError:
            # A missing log file must not hide the failure being reported.
            return []

    @staticmethod
    def build_snapshot_message(
        protocol_components: dict, protocol_states: dict, contract_states: dict
    ) -> dict[str, Any]:
        """Combine components, protocol states and contract states into one snapshot dict.

        :param protocol_components: Dict with a ``protocol_components`` list; each entry has an ``id``
        :param protocol_states: Dict with a ``states`` list; each entry has a ``component_id``
        :param contract_states: Dict with an ``accounts`` list; each entry has an ``address``
        :return: ``{"vm_storage": {address: account}, "states": {pool_id: {"component": ...,
            "state": ...}}}``; ``state`` is absent for components without a matching state
        """
        vm_states = {state["address"]: state for state in contract_states["accounts"]}
        states = {
            component["id"]: {"component": component}
            for component in protocol_components["protocol_components"]
        }
        for state in protocol_states["states"]:
            pool_id = state["component_id"]
            if pool_id not in states:
                # States without a known component are skipped.
                log.debug(f"{pool_id} was present in snapshot but not in components")
                continue
            states[pool_id]["state"] = state

        return {"vm_storage": vm_states, "states": states}

    def process_tycho_message(self, msg) -> BlockProtocolChanges:
        """Validate a tycho-client message and decode the snapshot for this protocol.

        Only the ``snapshot`` entry of the protocol's state message is processed.

        :raises ValueError: See ``_validate_sync_states``
        """
        self._validate_sync_states(msg)

        state_msg = msg["state_msgs"][self.protocol]

        block_info = msg["block"]
        block = EVMBlock(
            id=block_info["id"],
            # Naive datetime in local time; _process_vm_storage converts it back
            # with .timestamp(), so the round trip is consistent.
            ts=datetime.fromtimestamp(block_info["timestamp"]),
            hash_=block_info["hash"],
        )

        return self.process_snapshot(block, state_msg["snapshot"])

    def process_snapshot(
        self, block: EVMBlock, state_msg: dict
    ) -> BlockProtocolChanges:
        """Apply the snapshot's VM storage and decode its pool states.

        :param block: Block the snapshot belongs to
        :param state_msg: Snapshot dict with ``vm_storage`` and ``states`` entries
        :return: Decoded pools keyed by ``id_``, an empty ``removed_pools`` set and the
            elapsed time rounded to 3 decimals
        """
        start = time.monotonic()
        removed_pools = set()

        self._process_vm_storage(state_msg["vm_storage"], block)

        # decode new components
        decoded_pools, failed_pools = self._decoder.decode_snapshot(
            state_msg["states"], block, self._tokens
        )
        decoded_count = len(decoded_pools)
        failed_count = len(failed_pools)

        # remap pools to their pool ids
        decoded_pools = {p.id_: p for p in decoded_pools.values()}
        deserialization_time = time.monotonic() - start
        total = decoded_count + failed_count
        log.debug(
            f"Received {total} snapshots. n_decoded: {decoded_count}, n_failed: {failed_count}"
        )
        if failed_count > 0:
            log.info(f"Could not to decode {failed_count}/{total} pool snapshots")

        return BlockProtocolChanges(
            block=block,
            pool_states=decoded_pools,
            removed_pools=removed_pools,
            deserialization_time=round(deserialization_time, 3),
        )

    def _validate_sync_states(self, msg):
        """Ensure the message reports this protocol's synchronizer as ready.

        :raises ValueError: If the sync state is missing from the message, or its
            status differs from ``SynchronizerState.ready``
        """
        try:
            sync_state = msg["sync_states"][self.protocol]
            log.info(f"Received sync state for {self.protocol}: {sync_state}")
            status = sync_state["status"]
        except KeyError as err:
            raise ValueError("Invalid message received from tycho-client.") from err

        # Raise for every status other than ready. The earlier `not ... !=` double
        # negative raised exactly when the synchronizer was ready.
        if status != SynchronizerState.ready.value:
            raise ValueError("Tycho-indexer is not synced")

    def _process_vm_storage(self, storage: dict[str, Any], block: EVMBlock):
        """Initialise engine accounts and push the storage updates to the Tycho DB.

        :param storage: Account entries; only the values are used. Each needs
            ``address``, ``native_balance``, ``code``, ``slots`` and ``chain``
        :param block: Block used to build the header passed with the update
        """
        vm_updates = []
        for storage_update in storage.values():
            address = storage_update["address"]
            balance = int(storage_update["native_balance"], 16)
            # Drops the first two characters; assumes a "0x" prefix (TODO confirm)
            code = bytearray.fromhex(storage_update["code"][2:])

            # init accounts
            self._engine.init_account(
                address=address,
                account=AccountInfo(balance=balance, nonce=0, code=code),
                mocked=False,
                permanent_storage=None,
            )

            # apply account updates
            slots = {int(k, 16): int(v, 16) for k, v in storage_update["slots"].items()}
            vm_updates.append(
                AccountUpdate(
                    address=address,
                    chain=storage_update["chain"],
                    slots=slots,
                    balance=balance,
                    code=code,
                    change="Update",
                )
            )

        block_header = BlockHeader(block.id, block.hash_, int(block.ts.timestamp()))
        TychoDBSingleton.get_instance().update(vm_updates, block_header)
|