Add a way to pull stateless contracts code from node, add more settings to test_assets.yaml, add logic to allow dynamic stateless contract by calling another contract
165 lines
6.2 KiB
Python
165 lines
6.2 KiB
Python
import time
|
|
from decimal import Decimal
|
|
from logging import getLogger
|
|
from typing import Any
|
|
|
|
import eth_abi
|
|
from eth_utils import keccak
|
|
from protosim_py import SimulationEngine, SimulationParameters, AccountInfo
|
|
|
|
from .constants import EXTERNAL_ACCOUNT
|
|
from .exceptions import TychoDecodeError
|
|
from .models import EVMBlock, EthereumToken
|
|
from .pool_state import ThirdPartyPool
|
|
from .tycho_db import TychoDBSingleton
|
|
from .utils import decode_tycho_exchange, get_code_for_address
|
|
|
|
log = getLogger(__name__)
|
|
|
|
|
|
class ThirdPartyPoolTychoDecoder:
|
|
"""ThirdPartyPool decoder for protocol messages from the Tycho feed"""
|
|
|
|
def __init__(self, adapter_contract: str, minimum_gas: int, hard_limit: bool):
|
|
self.adapter_contract = adapter_contract
|
|
self.minimum_gas = minimum_gas
|
|
self.hard_limit = hard_limit
|
|
|
|
def decode_snapshot(
|
|
self,
|
|
snapshot: dict[str, Any],
|
|
block: EVMBlock,
|
|
tokens: dict[str, EthereumToken],
|
|
) -> tuple[dict[str, ThirdPartyPool], list[str]]:
|
|
pools = {}
|
|
failed_pools = []
|
|
for snap in snapshot.values():
|
|
try:
|
|
pool = self.decode_pool_state(snap, block, tokens)
|
|
pools[pool.id_] = pool
|
|
except TychoDecodeError as e:
|
|
log.error(f"Failed to decode third party snapshot: {e}")
|
|
failed_pools.append(snap["component"]["id"])
|
|
continue
|
|
|
|
return pools, failed_pools
|
|
|
|
def decode_pool_state(
|
|
self, snap: dict, block: EVMBlock, tokens: dict[str, EthereumToken]
|
|
) -> ThirdPartyPool:
|
|
component = snap["component"]
|
|
exchange, _ = decode_tycho_exchange(component["protocol_system"])
|
|
|
|
try:
|
|
tokens = tuple(tokens[t] for t in component["tokens"])
|
|
except KeyError as e:
|
|
raise TychoDecodeError("Unsupported token", pool_id=component["id"])
|
|
|
|
balances = self.decode_balances(snap, tokens)
|
|
optional_attributes = self.decode_optional_attributes(component, snap, block.id)
|
|
|
|
return ThirdPartyPool(
|
|
id_=optional_attributes.pop("pool_id", component["id"]),
|
|
tokens=tokens,
|
|
balances=balances,
|
|
block=block,
|
|
spot_prices={},
|
|
trading_fee=Decimal("0"),
|
|
exchange=exchange,
|
|
adapter_contract_name=self.adapter_contract,
|
|
minimum_gas=self.minimum_gas,
|
|
hard_sell_limit=self.hard_limit,
|
|
trace=True,
|
|
**optional_attributes,
|
|
)
|
|
|
|
@staticmethod
|
|
def decode_optional_attributes(component, snap, block_number):
|
|
# Handle optional state attributes
|
|
attributes = snap["state"]["attributes"]
|
|
pool_id = attributes.get("pool_id") or component["id"]
|
|
balance_owner = attributes.get("balance_owner")
|
|
stateless_contracts = {}
|
|
static_attributes = snap["component"]["static_attributes"]
|
|
|
|
index = 0
|
|
while f"stateless_contract_addr_{index}" in static_attributes:
|
|
encoded_address = static_attributes[f"stateless_contract_addr_{index}"]
|
|
decoded = bytes.fromhex(encoded_address[2:] if encoded_address.startswith('0x') else encoded_address).decode('utf-8')
|
|
if decoded.startswith("call"):
|
|
address = ThirdPartyPoolTychoDecoder.get_address_from_call(block_number, decoded)
|
|
else:
|
|
address = decoded
|
|
|
|
code = static_attributes.get(f"stateless_contract_code_{index}") or get_code_for_address(address)
|
|
stateless_contracts[address] = code
|
|
index += 1
|
|
|
|
index = 0
|
|
while f"stateless_contract_addr_{index}" in attributes:
|
|
address = attributes[f"stateless_contract_addr_{index}"]
|
|
code = attributes.get(f"stateless_contract_code_{index}") or get_code_for_address(address)
|
|
stateless_contracts[address] = code
|
|
index += 1
|
|
return {
|
|
"balance_owner": balance_owner,
|
|
"pool_id": pool_id,
|
|
"stateless_contracts": stateless_contracts,
|
|
}
|
|
|
|
@staticmethod
|
|
def get_address_from_call(block_number, decoded):
|
|
db = TychoDBSingleton.get_instance()
|
|
engine = SimulationEngine.new_with_tycho_db(db=db)
|
|
engine.init_account(
|
|
address="0x0000000000000000000000000000000000000000",
|
|
account=AccountInfo(balance=0, nonce=0),
|
|
mocked=False,
|
|
permanent_storage=None,
|
|
)
|
|
selector = keccak(text=decoded.split(":")[-1])[:4]
|
|
sim_result = engine.run_sim(SimulationParameters(
|
|
data=bytearray(selector),
|
|
to=decoded.split(':')[1],
|
|
block_number=block_number,
|
|
timestamp=int(time.time()),
|
|
overrides={},
|
|
caller=EXTERNAL_ACCOUNT,
|
|
value=0,
|
|
))
|
|
address = eth_abi.decode(["address"], bytearray(sim_result.result))
|
|
return address[0]
|
|
|
|
@staticmethod
|
|
def decode_balances(snap, tokens):
|
|
balances = {}
|
|
for addr, balance in snap["state"]["balances"].items():
|
|
checksum_addr = addr
|
|
token = next(t for t in tokens if t.address == checksum_addr)
|
|
balances[token.address] = token.from_onchain_amount(
|
|
int(balance, 16) # balances are big endian encoded
|
|
)
|
|
return balances
|
|
|
|
@staticmethod
|
|
def apply_update(
|
|
pool: ThirdPartyPool,
|
|
pool_update: dict[str, Any],
|
|
balance_updates: dict[str, Any],
|
|
block: EVMBlock,
|
|
) -> ThirdPartyPool:
|
|
# check for and apply optional state attributes
|
|
attributes = pool_update.get("updated_attributes")
|
|
if attributes:
|
|
# TODO: handle balance_owner and stateless_contracts updates
|
|
pass
|
|
|
|
for addr, balance_msg in balance_updates.items():
|
|
token = [t for t in pool.tokens if t.address == addr][0]
|
|
balance = int(balance_msg["balance"], 16) # balances are big endian encoded
|
|
pool.balances[token.address] = token.from_onchain_amount(balance)
|
|
pool.block = block
|
|
# we clear simulation cache and overwrites on the pool and trigger a recalculation of spot prices
|
|
pool.clear_all_cache()
|
|
return pool
|