Optimize imports and return BlockProtocolChanges msg

This commit is contained in:
Thales Lima
2024-07-15 15:05:03 +02:00
committed by tvinagre
parent f6fba2805a
commit c075fdb668
7 changed files with 105 additions and 96 deletions

View File

@@ -2,10 +2,8 @@ from decimal import Decimal
from logging import getLogger
from typing import Any
from protosim_py import SimulationEngine
from tycho.tycho.exceptions import TychoDecodeError
from tycho.tycho.models import EVMBlock, EthereumToken, DatabaseType
from tycho.tycho.models import EVMBlock, EthereumToken
from tycho.tycho.pool_state import ThirdPartyPool
from tycho.tycho.utils import decode_tycho_exchange
@@ -87,7 +85,8 @@ class ThirdPartyPoolTychoDecoder:
"stateless_contracts": stateless_contracts,
}
def decode_balances(self, snap, tokens):
@staticmethod
def decode_balances(snap, tokens):
balances = {}
for addr, balance in snap["state"]["balances"].items():
checksum_addr = addr
@@ -97,9 +96,9 @@ class ThirdPartyPoolTychoDecoder:
)
return balances
@staticmethod
def apply_update(
self,
pool: ThirdPartyPool,
pool: ThirdPartyPool,
pool_update: dict[str, Any],
balance_updates: dict[str, Any],
block: EVMBlock,

View File

@@ -53,3 +53,7 @@ class OutOfGas(RecoverableSimulationException):
"""
pass
class TychoClientException(Exception):
pass

View File

@@ -1,9 +1,12 @@
import datetime
from dataclasses import dataclass
from enum import Enum, IntEnum, auto
from typing import Union
from pydantic import BaseModel, Field
from tycho.tycho.pool_state import ThirdPartyPool
Address = str
@@ -43,3 +46,23 @@ class Capability(IntEnum):
ConstantPrice = auto()
TokenBalanceIndependent = auto()
ScaledPrice = auto()
class SynchronizerState(Enum):
started = "started"
ready = "ready"
stale = "stale"
delayed = "delayed"
advanced = "advanced"
ended = "ended"
@dataclass(repr=False)
class BlockProtocolChanges:
block: EVMBlock
pool_states: dict[Address, ThirdPartyPool]
"""All updated pools"""
removed_pools: set[Address]
sync_states: dict[str, SynchronizerState]
deserialization_time: float
"""The time it took to deserialize the pool states from the tycho feed message"""

View File

@@ -7,26 +7,20 @@ from fractions import Fraction
from logging import getLogger
from typing import Optional, cast, TypeVar
from eth_typing import HexStr
from protosim_py import SimulationEngine, AccountInfo
from pydantic import BaseModel, PrivateAttr, Field
from tycho.tycho.adapter_contract import AdapterContract
from tycho.tycho.constants import MAX_BALANCE, EXTERNAL_ACCOUNT
from tycho.tycho.exceptions import RecoverableSimulationException
from tycho.tycho.models import (
EVMBlock,
DatabaseType,
Capability,
Address,
EthereumToken,
)
from tycho.tycho.models import EVMBlock, Capability, Address, EthereumToken
from tycho.tycho.utils import (
create_engine,
get_contract_bytecode,
frac_to_decimal,
ERC20OverwriteFactory,
)
from eth_typing import HexStr
ADAPTER_ADDRESS = "0xA2C5C98A892fD6656a7F39A2f63228C0Bc846270"
@@ -167,8 +161,6 @@ class ThirdPartyPool(BaseModel):
sell_token: EthereumToken,
sell_amount: Decimal,
buy_token: EthereumToken,
slippage: Decimal = Decimal(0),
create_new_pool: bool = True,
) -> tuple[Decimal, int, TPoolState]:
# if the pool has a hard limit and the sell amount exceeds that, simulate and
# raise a partial trade
@@ -276,7 +268,7 @@ class ThirdPartyPool(BaseModel):
simulate the same block. This is fine, see
https://datarevenue.atlassian.net/browse/ROC-1301
Not naming this method _copy to not confuse with pydantic's .copy method.
Not naming this method _copy to not confuse with Pydantic's .copy method.
"""
return type(self)(
exchange=self.exchange,
@@ -379,8 +371,8 @@ def _merge(a: dict, b: dict, path=None):
to keep track of the ancestry of nested dictionaries.
Returns:
a (dict): The merged dictionary which includes all key-value pairs from b
added into a. If they have nested dictionaries with same keys, those are also merged.
a (dict): The merged dictionary which includes all key-value pairs from `b`
added into `a`. If they have nested dictionaries with same keys, those are also merged.
On key conflicts, preference is given to values from b.
"""
if path is None:

View File

@@ -1,44 +1,34 @@
import asyncio
import json
import platform
import requests
import time
from asyncio.subprocess import STDOUT, PIPE
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from http.client import HTTPException
from logging import getLogger
from typing import Tuple, Any, Optional, Dict
from typing import Any, Optional, Dict
from protosim_py import (
AccountUpdate,
AccountInfo,
BlockHeader,
TychoDB,
SimulationEngine,
)
import requests
from protosim_py import AccountUpdate, AccountInfo, BlockHeader
from tycho.tycho.constants import (
TYCHO_CLIENT_LOG_FOLDER,
TYCHO_CLIENT_FOLDER,
EXTERNAL_ACCOUNT,
MAX_BALANCE,
)
from tycho.tycho.constants import TYCHO_CLIENT_LOG_FOLDER, TYCHO_CLIENT_FOLDER
from tycho.tycho.decoders import ThirdPartyPoolTychoDecoder
from tycho.tycho.exceptions import APIRequestError
from tycho.tycho.models import Blockchain, EVMBlock, EthereumToken
from tycho.tycho.pool_state import ThirdPartyPool
from tycho.tycho.utils import create_engine, TychoDBSingleton
from tycho.tycho.exceptions import APIRequestError, TychoClientException
from tycho.tycho.models import (
Blockchain,
EVMBlock,
EthereumToken,
BlockProtocolChanges,
SynchronizerState,
)
from tycho.tycho.tycho_db import TychoDBSingleton
from tycho.tycho.utils import create_engine
log = getLogger(__name__)
class TychoClientException(Exception):
pass
class TokenLoader:
def __init__(
self,
@@ -128,7 +118,7 @@ class TychoPoolStateStreamAdapter:
# Create engine
# TODO: This should be initialized outside the adapter?
TychoDBSingleton.initialize(tycho_http_url=self.tycho_url)
self._engine = create_engine([], state_block=None, trace=True)
self._engine = create_engine([], trace=True)
# Loads tokens from Tycho
self._tokens: dict[str, EthereumToken] = TokenLoader(
@@ -222,7 +212,7 @@ class TychoPoolStateStreamAdapter:
sync_state = msg["sync_states"][self.protocol]
state_msg = msg["state_msgs"][self.protocol]
log.info(f"Received sync state for {self.protocol}: {sync_state}")
if not sync_state["status"] != "ready":
if not sync_state["status"] != SynchronizerState.ready.value:
raise ValueError("Tycho-indexer is not synced")
except KeyError:
raise ValueError("Invalid message received from tycho-client.")
@@ -265,10 +255,9 @@ class TychoPoolStateStreamAdapter:
log.info(f"Could not to decode {failed_count}/{total} pool snapshots")
return BlockProtocolChanges(
block=self.current_block,
pool_states=pool_states,
block=block,
pool_states=decoded_pools,
removed_pools=removed_pools,
sync_states=exchanges_states,
deserialization_time=round(deserialization_time, 3),
)

View File

@@ -0,0 +1,48 @@
from protosim_py import TychoDB
class TychoDBSingleton:
"""
A singleton wrapper around the TychoDB class.
This class ensures that there is only one instance of TychoDB throughout the lifetime of the program,
avoiding the overhead of creating multiple instances.
"""
_instance = None
@classmethod
def initialize(cls, tycho_http_url: str):
"""
Initialize the TychoDB instance with the given URLs.
Parameters
----------
tycho_http_url : str
The URL of the Tycho HTTP server.
"""
cls._instance = TychoDB(tycho_http_url=tycho_http_url)
@classmethod
def get_instance(cls) -> TychoDB:
"""
Retrieve the singleton instance of TychoDB.
If the TychoDB instance does not exist, it creates a new one.
If it already exists, it returns the existing instance.
Returns
-------
TychoDB
The singleton instance of TychoDB.
"""
if cls._instance is None:
raise ValueError(
"TychoDB instance not initialized. Call initialize() first."
)
return cls._instance
@classmethod
def clear_instance(cls):
cls._instance = None

View File

@@ -10,12 +10,13 @@ from typing import Final, Any
import eth_abi
from eth_typing import HexStr
from hexbytes import HexBytes
from protosim_py import SimulationEngine, TychoDB, AccountInfo
from protosim_py import SimulationEngine, AccountInfo
from web3 import Web3
from tycho.tycho.constants import EXTERNAL_ACCOUNT, MAX_BALANCE
from tycho.tycho.exceptions import OutOfGas
from tycho.tycho.models import Address, EthereumToken
from tycho.tycho.tycho_db import TychoDBSingleton
log = getLogger(__name__)
@@ -25,53 +26,6 @@ def decode_tycho_exchange(exchange: str) -> (str, bool):
return (exchange.split(":")[1], False) if "vm:" in exchange else (exchange, True)
class TychoDBSingleton:
"""
A singleton wrapper around the TychoDB class.
This class ensures that there is only one instance of TychoDB throughout the lifetime of the program,
avoiding the overhead of creating multiple instances.
"""
_instance = None
@classmethod
def initialize(cls, tycho_http_url: str):
"""
Initialize the TychoDB instance with the given URLs.
Parameters
----------
tycho_http_url : str
The URL of the Tycho HTTP server.
"""
cls._instance = TychoDB(tycho_http_url=tycho_http_url)
@classmethod
def get_instance(cls) -> TychoDB:
"""
Retrieve the singleton instance of TychoDB.
If the TychoDB instance does not exist, it creates a new one.
If it already exists, it returns the existing instance.
Returns
-------
TychoDB
The singleton instance of TychoDB.
"""
if cls._instance is None:
raise ValueError(
"TychoDB instance not initialized. Call initialize() first."
)
return cls._instance
@classmethod
def clear_instance(cls):
cls._instance = None
def create_engine(
mocked_tokens: list[Address], trace: bool = False
) -> SimulationEngine: