from decimal import Decimal from logging import getLogger from typing import Any from .exceptions import TychoDecodeError from .models import EVMBlock, EthereumToken from .pool_state import ThirdPartyPool from .utils import decode_tycho_exchange log = getLogger(__name__) class ThirdPartyPoolTychoDecoder: """ThirdPartyPool decoder for protocol messages from the Tycho feed""" def __init__(self, adapter_contract: str, minimum_gas: int, hard_limit: bool): self.adapter_contract = adapter_contract self.minimum_gas = minimum_gas self.hard_limit = hard_limit def decode_snapshot( self, snapshot: dict[str, Any], block: EVMBlock, tokens: dict[str, EthereumToken], ) -> tuple[dict[str, ThirdPartyPool], list[str]]: pools = {} failed_pools = [] for snap in snapshot.values(): try: pool = self.decode_pool_state(snap, block, tokens) pools[pool.id_] = pool except TychoDecodeError as e: log.error(f"Failed to decode third party snapshot: {e}") failed_pools.append(snap["component"]["id"]) continue return pools, failed_pools def decode_pool_state( self, snap: dict, block: EVMBlock, tokens: dict[str, EthereumToken] ) -> ThirdPartyPool: component = snap["component"] exchange, _ = decode_tycho_exchange(component["protocol_system"]) try: tokens = tuple(tokens[t] for t in component["tokens"]) except KeyError as e: raise TychoDecodeError("Unsupported token", pool_id=component["id"]) balances = self.decode_balances(snap, tokens) optional_attributes = self.decode_optional_attributes(component, snap) return ThirdPartyPool( id_=optional_attributes.pop("pool_id", component["id"]), tokens=tokens, balances=balances, block=block, spot_prices={}, trading_fee=Decimal("0"), exchange=exchange, adapter_contract_name=self.adapter_contract, minimum_gas=self.minimum_gas, hard_sell_limit=self.hard_limit, trace=False, **optional_attributes, ) @staticmethod def decode_optional_attributes(component, snap): # Handle optional state attributes attributes = snap["state"]["attributes"] balance_owner = attributes.get("balance_owner") balance_owner = bytes.fromhex(balance_owner[2:] if balance_owner.startswith('0x') else balance_owner).decode( 'utf-8').lower() stateless_contracts = {} static_attributes = snap["component"]["static_attributes"] pool_id = static_attributes.get("pool_id") or component["id"] pool_id = bytes.fromhex(pool_id[2:]).decode().lower() index = 0 while f"stateless_contract_addr_{index}" in static_attributes: encoded_address = static_attributes[f"stateless_contract_addr_{index}"] address = bytes.fromhex( encoded_address[2:] if encoded_address.startswith('0x') else encoded_address).decode('utf-8') code = static_attributes.get(f"stateless_contract_code_{index}") or get_code_for_address(address) stateless_contracts[address] = code index += 1 index = 0 while f"stateless_contract_addr_{index}" in attributes: address = attributes[f"stateless_contract_addr_{index}"] code = attributes.get(f"stateless_contract_code_{index}") or get_code_for_address(address) stateless_contracts[address] = code index += 1 return { "balance_owner": balance_owner, "pool_id": pool_id, "stateless_contracts": stateless_contracts, } @staticmethod def decode_balances(snap, tokens): balances = {} for addr, balance in snap["state"]["balances"].items(): checksum_addr = addr token = next(t for t in tokens if t.address == checksum_addr) balances[token.address] = token.from_onchain_amount( int(balance, 16) # balances are big endian encoded ) return balances @staticmethod def apply_update( pool: ThirdPartyPool, pool_update: dict[str, Any], balance_updates: dict[str, Any], block: EVMBlock, ) -> ThirdPartyPool: # check for and apply optional state attributes attributes = pool_update.get("updated_attributes") if attributes: # TODO: handle balance_owner and stateless_contracts updates pass for addr, balance_msg in balance_updates.items(): token = [t for t in pool.tokens if t.address == addr][0] balance = int(balance_msg["balance"], 16) # balances are big endian encoded pool.balances[token.address] = token.from_onchain_amount(balance) pool.block = block # we clear simulation cache and overwrites on the pool and trigger a recalculation of spot prices pool.clear_all_cache() return pool