159 lines
5.8 KiB
Python
159 lines
5.8 KiB
Python
"""
|
|
Test script for CCXT DataSource adapter (Free Version).
|
|
|
|
This demonstrates how to use the free CCXT adapter (not ccxt.pro) with various
|
|
exchanges. It uses polling instead of WebSocket for real-time updates.
|
|
|
|
CCXT is configured to use Decimal mode internally for precision, but OHLCV data
|
|
is converted to float for optimal DataFrame/analysis performance.
|
|
"""
|
|
|
|
import asyncio
|
|
from decimal import Decimal
|
|
from datetime import datetime, timedelta
|
|
|
|
from src.datasource.adapters.ccxt_adapter import CCXTDataSource
|
|
|
|
|
|
async def test_binance_datasource():
    """Exercise the CCXT adapter end to end against Binance.

    Runs five checks, in order, against a single ``CCXTDataSource`` created
    with ``exchange_id="binance"`` and ``poll_interval=5``: datafeed
    configuration, symbol search, symbol resolution, historical bars, and a
    short polling subscription.

    The datasource is closed even when a check raises. The completion line is
    printed only when every check passed, so a failing run never ends with a
    success mark.

    Raises:
        AssertionError: if the latest historical bar's close or volume is not
            a ``float``.
        Exception: anything raised by the adapter propagates to the caller.
    """
    print("=" * 60)
    print("Testing CCXT DataSource with Binance (Free Version)")
    print("=" * 60)

    # Initialize Binance datasource with faster polling for testing
    binance = CCXTDataSource(exchange_id="binance", poll_interval=5)

    try:
        await _check_config(binance)
        await _check_symbol_search(binance)
        await _check_symbol_resolution(binance)
        await _check_historical_bars(binance)
        await _check_polling_subscription(binance)
    finally:
        await binance.close()

    print("\n✓ Binance datasource test complete")


async def _check_config(datasource):
    """Test 1: fetch the datafeed configuration and print its main fields."""
    print("\n1. Getting datafeed configuration...")
    config = await datasource.get_config()
    print(f" Name: {config.name}")
    print(f" Description: {config.description}")
    print(f" Supported resolutions: {config.supported_resolutions[:5]}...")
    print(f" Exchanges: {config.exchanges}")


async def _check_symbol_search(datasource):
    """Test 2: search for BTC symbols and print the first three matches."""
    print("\n2. Searching for BTC symbols...")
    results = await datasource.search_symbols("BTC", limit=5)
    print(f" Found {len(results)} symbols:")
    for result in results[:3]:
        print(f" - {result.symbol}: {result.description}")


async def _check_symbol_resolution(datasource):
    """Test 3: resolve BTC/USDT and print its metadata and column list."""
    print("\n3. Resolving symbol metadata for BTC/USDT...")
    symbol_info = await datasource.resolve_symbol("BTC/USDT")
    print(f" Symbol: {symbol_info.symbol}")
    print(f" Name: {symbol_info.name}")
    print(f" Type: {symbol_info.type}")
    print(f" Exchange: {symbol_info.exchange}")
    print(" Columns:")
    for col in symbol_info.columns:
        print(f" - {col.name} ({col.type}): {col.description}")


async def _check_historical_bars(datasource):
    """Test 4: fetch recent 1-hour BTC/USDT bars and verify OHLCV floats."""
    print("\n4. Fetching historical 1-hour bars for BTC/USDT...")
    end_time = int(datetime.now().timestamp())
    start_time = end_time - (24 * 3600)  # Last 24 hours

    history = await datasource.get_bars(
        symbol="BTC/USDT",
        resolution="60",  # 1 hour
        from_time=start_time,
        to_time=end_time,
        countback=10,
    )

    print(f" Retrieved {len(history.bars)} bars")
    if not history.bars:
        # Nothing to inspect; the type checks below need at least one bar.
        return

    latest = history.bars[-1]
    print(" Latest bar:")
    print(f" Time: {datetime.fromtimestamp(latest.time)}")
    for field in ("open", "high", "low", "close", "volume"):
        value = latest.data[field]
        print(f" {field.capitalize()}: {value} (type: {type(value).__name__})")

    # Verify OHLCV uses float (converted from Decimal for DataFrame performance)
    assert isinstance(latest.data['close'], float), "OHLCV price should be float type!"
    assert isinstance(latest.data['volume'], float), "OHLCV volume should be float type!"
    print(" ✓ OHLCV data type verified: using native float (CCXT uses Decimal internally)")


async def _check_polling_subscription(datasource):
    """Test 5: subscribe to 1-minute bars, wait one poll cycle, unsubscribe."""
    print("\n5. Testing polling-based subscription...")
    # NOTE(review): reads the adapter's private ``_poll_interval`` attribute;
    # there is no public accessor visible from this script.
    poll_interval = datasource._poll_interval
    print(f" Note: Using free CCXT with {poll_interval}s polling interval")
    ticks_seen = 0

    def on_tick(data):
        nonlocal ticks_seen
        ticks_seen += 1
        if ticks_seen == 1:
            # Only the first tick is reported and type-checked.
            print(f" Received tick: close={data['close']} (type: {type(data['close']).__name__})")
            assert isinstance(data['close'], float), "Polled OHLCV data should use float!"

    subscription_id = await datasource.subscribe_bars(
        symbol="BTC/USDT",
        resolution="1",  # 1 minute
        on_tick=on_tick,
    )
    try:
        print(f" Subscription ID: {subscription_id}")
        print(f" Waiting {poll_interval + 2} seconds for first poll...")
        await asyncio.sleep(poll_interval + 2)
    finally:
        # Always drop the subscription, even if the wait is cancelled or fails.
        await datasource.unsubscribe_bars(subscription_id)

    print(f" ✓ Subscription test complete (received {ticks_seen} tick(s))")
|
|
|
|
|
|
async def test_multiple_exchanges():
    """Smoke-test the adapter against several exchanges.

    For each of ``binance``, ``coinbase`` and ``kraken`` this creates a
    ``CCXTDataSource``, fetches its configuration and searches for ETH
    symbols. A failure on one exchange is printed and does not stop the
    remaining exchanges from being tested (best-effort by design).

    Every datasource that was successfully constructed is closed, including
    when one of the calls on it raises.
    """
    print("\n" + "=" * 60)
    print("Testing Multiple Exchanges")
    print("=" * 60)

    exchanges_to_test = ["binance", "coinbase", "kraken"]

    for exchange_id in exchanges_to_test:
        print(f"\nTesting {exchange_id}...")
        try:
            datasource = CCXTDataSource(exchange_id=exchange_id)
            try:
                config = await datasource.get_config()
                print(f" ✓ {config.name}")

                # Try to search for ETH
                results = await datasource.search_symbols("ETH", limit=3)
                print(f" ✓ Found {len(results)} ETH symbols")
            finally:
                # Release the exchange connection on failure as well; an
                # error raised by close() itself is reported by the handler
                # below, exactly like any other per-exchange failure.
                await datasource.close()
        except Exception as e:
            print(f" ✗ Error: {e}")
|
|
|
|
|
|
async def main():
    """Run every test coroutine in sequence and report the overall outcome.

    Any exception raised by a test is caught here: a failure line and the
    traceback are printed, and the function returns normally.
    """
    rule = "=" * 60

    print("\nCCXT DataSource Adapter Test Suite")
    print(rule)

    try:
        for run_suite in (test_binance_datasource, test_multiple_exchanges):
            await run_suite()

        print("\n" + rule)
        print("All tests completed successfully! ✓")
        print(rule)
    except Exception as exc:
        print(f"\n✗ Test failed: {exc}")

        # Imported lazily: only needed on the failure path.
        import traceback

        traceback.print_exc()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|