""" Test client for TradingView-compatible datafeed WebSocket API. Run this to test the datafeed endpoints: python -m pytest backend/tests/test_datafeed_websocket.py -v -s Or run directly: python backend/tests/test_datafeed_websocket.py """ import asyncio import json import time import websockets async def test_datafeed(): """Test datafeed WebSocket API""" uri = "ws://localhost:8000/ws/datafeed" async with websockets.connect(uri) as websocket: print("✓ Connected to datafeed WebSocket") # Test 1: Get config print("\n--- Test 1: Get Config ---") request = {"type": "get_config", "request_id": "req_1"} await websocket.send(json.dumps(request)) response = json.loads(await websocket.recv()) print(f"Response: {json.dumps(response, indent=2)}") assert response["type"] == "get_config_response" print("✓ Config retrieved") # Test 2: Search symbols print("\n--- Test 2: Search Symbols ---") request = {"type": "search_symbols", "request_id": "req_2", "query": "BTC"} await websocket.send(json.dumps(request)) response = json.loads(await websocket.recv()) print(f"Response: {json.dumps(response, indent=2)}") assert response["type"] == "search_symbols_response" assert len(response["results"]) > 0 print(f"✓ Found {len(response['results'])} symbols") # Test 3: Resolve symbol print("\n--- Test 3: Resolve Symbol ---") request = {"type": "resolve_symbol", "request_id": "req_3", "symbol": "DEMO:BTC/USD"} await websocket.send(json.dumps(request)) response = json.loads(await websocket.recv()) print(f"Response: {json.dumps(response, indent=2)}") assert response["type"] == "resolve_symbol_response" symbol_info = response["symbol_info"] print(f"✓ Symbol resolved: {symbol_info['name']}") print(f" Available columns: {[c['name'] for c in symbol_info['columns']]}") # Test 4: Get historical bars print("\n--- Test 4: Get Historical Bars ---") now = int(time.time()) from_time = now - 3600 # 1 hour ago request = { "type": "get_bars", "request_id": "req_4", "symbol": "DEMO:BTC/USD", "resolution": "5", "from_time": from_time, "to_time": now, "countback": 10, } await websocket.send(json.dumps(request)) response = json.loads(await websocket.recv()) print(f"Response type: {response['type']}") if response["type"] == "get_bars_response": history = response["history"] print(f"✓ Received {len(history['bars'])} bars") if history["bars"]: print(f" First bar: {history['bars'][0]}") print(f" Last bar: {history['bars'][-1]}") else: print(f"Error: {response}") # Test 5: Subscribe to real-time updates print("\n--- Test 5: Subscribe to Real-time Updates ---") request = { "type": "subscribe_bars", "request_id": "req_5", "symbol": "DEMO:BTC/USD", "resolution": "5", "subscription_id": "sub_1", } await websocket.send(json.dumps(request)) response = json.loads(await websocket.recv()) print(f"Subscription response: {json.dumps(response, indent=2)}") assert response["type"] == "subscribe_bars_response" assert response["success"] is True print("✓ Subscribed successfully") # Wait for a few updates print("\n Waiting for real-time updates (10 seconds)...") update_count = 0 try: for _ in range(3): # Wait for up to 3 messages response = await asyncio.wait_for(websocket.recv(), timeout=5.0) message = json.loads(response) if message["type"] == "bar_update": update_count += 1 print(f" Update {update_count}: {message['bar']}") except asyncio.TimeoutError: print(f" No more updates received (got {update_count} updates)") # Test 6: Unsubscribe print("\n--- Test 6: Unsubscribe ---") request = { "type": "unsubscribe_bars", "request_id": "req_6", "subscription_id": "sub_1", } await websocket.send(json.dumps(request)) response = json.loads(await websocket.recv()) print(f"Unsubscribe response: {json.dumps(response, indent=2)}") assert response["type"] == "unsubscribe_bars_response" assert response["success"] is True print("✓ Unsubscribed successfully") print("\n=== All tests passed! ===") if __name__ == "__main__": print("Starting datafeed WebSocket tests...") print("Make sure the backend server is running on http://localhost:8000") print() asyncio.run(test_datafeed())