script to calculate the APY for pools

This commit is contained in:
2025-11-14 13:29:04 -04:00
parent 48fc46f801
commit 4cc280e011

View File

@@ -0,0 +1,278 @@
#!/usr/bin/env python3
"""
Script to calculate APY for all PartyPools.
Fetches all pools from PartyPlanner and calculates APY for each one.
"""
from web3 import Web3
from datetime import datetime
import json
import os
# Configuration
RPC_URL = "https://eth-sepolia.g.alchemy.com/v2/sJ7rLYKJzdRqXUs9cOiLVvHN8aTI30dn"
CHAIN_ID = 11155111
DEPLOYMENT_JSON_PATH = "../liqp-deployments.json"
DEPLOYMENT_ARTIFACTS_PATH = "../deployment"
TIME_RANGE_SECONDS = 3600 # 1 hour
# ABIs will be loaded dynamically from deployment artifacts
ERC20_ABI = []
PARTY_INFO_ABI = []
PARTY_PLANNER_ABI = []
POOL_ABI = []
def load_abi_from_deployment(chain_id, contract_name):
"""Load ABI from deployment artifacts"""
script_dir = os.path.dirname(os.path.abspath(__file__))
artifact_path = os.path.join(
script_dir,
DEPLOYMENT_ARTIFACTS_PATH,
str(chain_id),
'v1',
'out',
f'{contract_name}.sol',
f'{contract_name}.json'
)
try:
with open(artifact_path, 'r') as f:
artifact_data = json.load(f)
return artifact_data.get('abi', [])
except Exception as e:
print(f"Error loading ABI for {contract_name}: {e}")
return []
def q64_64_to_float(q64_64_value):
"""Convert ABDK Q64.64 fixed-point to float"""
return q64_64_value / (2 ** 64)
def find_block_by_timestamp(w3, target_timestamp, low, high):
"""Binary search to find block closest to target timestamp"""
if low >= high:
return low
mid = (low + high) // 2
mid_block = w3.eth.get_block(mid)
mid_timestamp = mid_block['timestamp']
if mid_timestamp < target_timestamp:
return find_block_by_timestamp(w3, target_timestamp, mid + 1, high)
else:
return find_block_by_timestamp(w3, target_timestamp, low, mid)
def calculate_pool_apy(w3, pool_address, party_info_contract, from_block, current_block):
"""Calculate APY for a single pool"""
try:
pool_address = Web3.to_checksum_address(pool_address)
pool_contract = w3.eth.contract(address=pool_address, abi=POOL_ABI)
token_addresses = pool_contract.functions.allTokens().call()
# Get token info and calculate TVL
token_info = {}
stablecoin_decimals = None
pool_tvl_usd = 0.0
token_symbols = []
for idx, token_addr in enumerate(token_addresses):
token_contract = w3.eth.contract(address=token_addr, abi=ERC20_ABI)
decimals = token_contract.functions.decimals().call()
symbol = token_contract.functions.symbol().call()
token_symbols.append(symbol)
balance_raw = token_contract.functions.balanceOf(pool_address).call()
balance_human = balance_raw / (10 ** decimals)
if idx == 0:
stablecoin_decimals = decimals
price_usd = 1.0
else:
price_q64_64 = party_info_contract.functions.price(pool_address, idx, 0).call()
price_ratio = q64_64_to_float(price_q64_64)
decimal_adjustment = 10 ** (stablecoin_decimals - decimals)
price_usd = price_ratio * decimal_adjustment
token_value_usd = balance_human * price_usd
pool_tvl_usd += token_value_usd
token_info[token_addr.lower()] = {'decimals': decimals, 'price_usd': price_usd}
# Fetch events and calculate fees
total_fees_usd = 0.0
event_count = 0
# Process Swap events
try:
logs = pool_contract.events.Swap().get_logs(from_block=from_block, to_block=current_block)
event_count += len(logs)
for log in logs:
token = token_info.get(log['args']['tokenIn'].lower())
if token:
fee = (log['args']['lpFee'] + log['args']['protocolFee']) / (10 ** token['decimals']) * token['price_usd']
total_fees_usd += fee
print(f" Swap - Block {log['blockNumber']}, TX {log['transactionHash'].hex()[:10]}..., Fee: ${fee:.6f}")
except Exception as e:
print(f" Error fetching Swap events: {e}")
# Process SwapMint events
try:
logs = pool_contract.events.SwapMint().get_logs(from_block=from_block, to_block=current_block)
event_count += len(logs)
for log in logs:
token = token_info.get(log['args']['tokenIn'].lower())
if token:
fee = (log['args']['lpFee'] + log['args']['protocolFee']) / (10 ** token['decimals']) * token['price_usd']
total_fees_usd += fee
print(f" SwapMint - Block {log['blockNumber']}, TX {log['transactionHash'].hex()[:10]}..., Fee: ${fee:.6f}")
except Exception as e:
print(f" Error fetching SwapMint events: {e}")
# Process BurnSwap events
try:
logs = pool_contract.events.BurnSwap().get_logs(from_block=from_block, to_block=current_block)
event_count += len(logs)
for log in logs:
token = token_info.get(log['args']['tokenOut'].lower())
if token:
fee = (log['args']['lpFee'] + log['args']['protocolFee']) / (10 ** token['decimals']) * token['price_usd']
total_fees_usd += fee
print(f" BurnSwap - Block {log['blockNumber']}, TX {log['transactionHash'].hex()[:10]}..., Fee: ${fee:.6f}")
except Exception as e:
print(f" Error fetching BurnSwap events: {e}")
# Process Flash events
try:
logs = pool_contract.events.Flash().get_logs(from_block=from_block, to_block=current_block)
event_count += len(logs)
for log in logs:
token = token_info.get(log['args']['token'].lower())
if token:
fee = (log['args']['lpFee'] + log['args']['protocolFee']) / (10 ** token['decimals']) * token['price_usd']
total_fees_usd += fee
print(f" Flash - Block {log['blockNumber']}, TX {log['transactionHash'].hex()[:10]}..., Fee: ${fee:.6f}")
except Exception as e:
print(f" Error fetching Flash events: {e}")
# Calculate APY
if pool_tvl_usd > 0 and TIME_RANGE_SECONDS:
days = TIME_RANGE_SECONDS / 86400
daily_return = (total_fees_usd / pool_tvl_usd) / days
apy = (pow(1 + daily_return, 365) - 1) * 100
else:
apy = 0
return {
'pool_address': pool_address,
'tokens': token_symbols,
'tvl_usd': pool_tvl_usd,
'fees_usd': total_fees_usd,
'event_count': event_count,
'apy': apy
}
except Exception as e:
print(f"Error processing pool {pool_address}: {e}")
return None
def main():
w3 = Web3(Web3.HTTPProvider(RPC_URL))
if not w3.is_connected():
print(f"Error: Cannot connect to RPC node at {RPC_URL}")
return
chain_id = w3.eth.chain_id
print(f"Connected to chain {chain_id}")
print(f"Current block: {w3.eth.block_number}")
# Load ABIs from deployment artifacts
print("Loading ABIs from deployment artifacts...")
erc20_abi = load_abi_from_deployment(chain_id, 'ERC20')
party_info_abi = load_abi_from_deployment(chain_id, 'PartyInfo')
party_planner_abi = load_abi_from_deployment(chain_id, 'PartyPlanner')
pool_abi = load_abi_from_deployment(chain_id, 'PartyPool')
if not erc20_abi or not party_info_abi or not party_planner_abi or not pool_abi:
print("Error: Failed to load one or more ABIs from deployment artifacts")
return
# Update global ABIs
global ERC20_ABI, PARTY_INFO_ABI, PARTY_PLANNER_ABI, POOL_ABI
ERC20_ABI = erc20_abi
PARTY_INFO_ABI = party_info_abi
PARTY_PLANNER_ABI = party_planner_abi
POOL_ABI = pool_abi
# Load deployment addresses
# tim is adding comments
script_dir = os.path.dirname(os.path.abspath(__file__))
deployment_path = os.path.join(script_dir, DEPLOYMENT_JSON_PATH)
try:
with open(deployment_path, 'r') as f:
deployment_data = json.load(f)
chain_data = deployment_data.get(str(chain_id), {}).get('v1', {})
party_info_address = chain_data.get('PartyInfo')
party_planner_address = chain_data.get('PartyPlanner')
if not party_info_address or not party_planner_address:
print(f"Error: PartyInfo or PartyPlanner not found in deployment JSON")
return
except Exception as e:
print(f"Error: Could not load deployment JSON: {e}")
return
party_info_address = Web3.to_checksum_address(party_info_address)
party_planner_address = Web3.to_checksum_address(party_planner_address)
print(f"PartyInfo: {party_info_address}")
print(f"PartyPlanner: {party_planner_address}")
# Get all pools from PartyPlanner
party_planner = w3.eth.contract(address=party_planner_address, abi=PARTY_PLANNER_ABI)
party_info_contract = w3.eth.contract(address=party_info_address, abi=PARTY_INFO_ABI)
pool_count = party_planner.functions.poolCount().call()
print(f"\nTotal pools: {pool_count}")
if pool_count == 0:
print("No pools found")
return
# Get all pools
all_pools = party_planner.functions.getAllPools(0, pool_count).call()
print(f"Fetched {len(all_pools)} pool addresses\n")
# Calculate block range for last 24 hours
current_block = w3.eth.block_number
current_timestamp = w3.eth.get_block(current_block)['timestamp']
target_timestamp = current_timestamp - TIME_RANGE_SECONDS
from_block = find_block_by_timestamp(w3, target_timestamp, 0, current_block)
print(f"Analyzing events from block {from_block} to {current_block}")
print(f"Time range: last {TIME_RANGE_SECONDS / 3600:.1f} hours\n")
# Calculate APY for each pool
print(f"{'='*110}")
print(f"{'Pool Address':<45} {'Tokens':<20} {'TVL (USD)':<15} {'Fees (24h)':<15} {'Events':<8} {'APY':<10}")
print(f"{'='*110}")
results = []
for pool_addr in all_pools:
result = calculate_pool_apy(w3, pool_addr, party_info_contract, from_block, current_block)
if result:
results.append(result)
token_str = '/'.join(result['tokens'][:3])
print(f"{result['pool_address']:<45} {token_str:<20} ${result['tvl_usd']:>13,.2f} ${result['fees_usd']:>13,.6f} {result['event_count']:>7} {result['apy']:>9.2f}%")
print(f"{'='*110}")
print(f"\nSummary:")
print(f" Total Pools: {len(results)}")
print(f" Total TVL: ${sum(r['tvl_usd'] for r in results):,.2f}")
print(f" Total 24h Fees: ${sum(r['fees_usd'] for r in results):,.6f}")
if results:
print(f" Avg APY: {sum(r['apy'] for r in results) / len(results):.2f}%")
if __name__ == "__main__":
main()