Files
ai/backend/tests/test_ccxt_datasource.py
2026-03-02 01:36:14 -04:00

157 lines
5.7 KiB
Python

"""
Test script for CCXT DataSource adapter (Free Version).
This demonstrates how to use the free CCXT adapter (not ccxt.pro) with various
exchanges. It uses polling instead of WebSocket for real-time updates and
verifies that Decimal precision is maintained throughout.
"""
import asyncio
from decimal import Decimal
from datetime import datetime, timedelta
from src.datasource.adapters.ccxt_adapter import CCXTDataSource
async def test_binance_datasource():
"""Test Binance exchange data source"""
print("=" * 60)
print("Testing CCXT DataSource with Binance (Free Version)")
print("=" * 60)
# Initialize Binance datasource with faster polling for testing
binance = CCXTDataSource(exchange_id="binance", poll_interval=5)
try:
# Test 1: Get configuration
print("\n1. Getting datafeed configuration...")
config = await binance.get_config()
print(f" Name: {config.name}")
print(f" Description: {config.description}")
print(f" Supported resolutions: {config.supported_resolutions[:5]}...")
print(f" Exchanges: {config.exchanges}")
# Test 2: Search symbols
print("\n2. Searching for BTC symbols...")
results = await binance.search_symbols("BTC", limit=5)
print(f" Found {len(results)} symbols:")
for result in results[:3]:
print(f" - {result.symbol}: {result.description}")
# Test 3: Resolve symbol
print("\n3. Resolving symbol metadata for BTC/USDT...")
symbol_info = await binance.resolve_symbol("BTC/USDT")
print(f" Symbol: {symbol_info.symbol}")
print(f" Name: {symbol_info.name}")
print(f" Type: {symbol_info.type}")
print(f" Exchange: {symbol_info.exchange}")
print(f" Columns:")
for col in symbol_info.columns:
print(f" - {col.name} ({col.type}): {col.description}")
# Test 4: Get historical bars
print("\n4. Fetching historical 1-hour bars for BTC/USDT...")
end_time = int(datetime.now().timestamp())
start_time = end_time - (24 * 3600) # Last 24 hours
history = await binance.get_bars(
symbol="BTC/USDT",
resolution="60", # 1 hour
from_time=start_time,
to_time=end_time,
countback=10,
)
print(f" Retrieved {len(history.bars)} bars")
if history.bars:
latest = history.bars[-1]
print(f" Latest bar:")
print(f" Time: {datetime.fromtimestamp(latest.time)}")
print(f" Open: {latest.data['open']} (type: {type(latest.data['open']).__name__})")
print(f" High: {latest.data['high']} (type: {type(latest.data['high']).__name__})")
print(f" Low: {latest.data['low']} (type: {type(latest.data['low']).__name__})")
print(f" Close: {latest.data['close']} (type: {type(latest.data['close']).__name__})")
print(f" Volume: {latest.data['volume']} (type: {type(latest.data['volume']).__name__})")
# Verify Decimal precision
assert isinstance(latest.data['close'], Decimal), "Price should be Decimal type!"
assert isinstance(latest.data['volume'], Decimal), "Volume should be Decimal type!"
print(f" ✓ Numerical precision verified: using Decimal types")
# Test 5: Polling subscription (brief test)
print("\n5. Testing polling-based subscription...")
print(f" Note: Using free CCXT with {binance._poll_interval}s polling interval")
tick_count = [0]
def on_tick(data):
tick_count[0] += 1
if tick_count[0] == 1:
print(f" Received tick: close={data['close']} (type: {type(data['close']).__name__})")
assert isinstance(data['close'], Decimal), "Polled data should use Decimal!"
subscription_id = await binance.subscribe_bars(
symbol="BTC/USDT",
resolution="1", # 1 minute
on_tick=on_tick,
)
print(f" Subscription ID: {subscription_id}")
print(f" Waiting {binance._poll_interval + 2} seconds for first poll...")
await asyncio.sleep(binance._poll_interval + 2)
# Unsubscribe
await binance.unsubscribe_bars(subscription_id)
print(f" ✓ Subscription test complete (received {tick_count[0]} tick(s))")
finally:
await binance.close()
print("\n✓ Binance datasource test complete")
async def test_multiple_exchanges():
"""Test multiple exchanges"""
print("\n" + "=" * 60)
print("Testing Multiple Exchanges")
print("=" * 60)
exchanges_to_test = ["binance", "coinbase", "kraken"]
for exchange_id in exchanges_to_test:
print(f"\nTesting {exchange_id}...")
try:
datasource = CCXTDataSource(exchange_id=exchange_id)
config = await datasource.get_config()
print(f"{config.name}")
# Try to search for ETH
results = await datasource.search_symbols("ETH", limit=3)
print(f" ✓ Found {len(results)} ETH symbols")
await datasource.close()
except Exception as e:
print(f" ✗ Error: {e}")
async def main():
"""Run all tests"""
print("\nCCXT DataSource Adapter Test Suite")
print("=" * 60)
try:
await test_binance_datasource()
await test_multiple_exchanges()
print("\n" + "=" * 60)
print("All tests completed successfully! ✓")
print("=" * 60)
except Exception as e:
print(f"\n✗ Test failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())