""" Example client demonstrating how to integrate with the datafeed WebSocket API. This shows how a TradingView integration or custom charting client would interact with the datafeed. """ import asyncio import json import time from typing import Callable, Dict import websockets class DatafeedClient: """Client for TradingView-compatible datafeed WebSocket API""" def __init__(self, uri: str = "ws://localhost:8000/ws/datafeed"): self.uri = uri self.websocket = None self.request_id_counter = 0 self.pending_requests: Dict[str, asyncio.Future] = {} self.subscriptions: Dict[str, Callable] = {} self._receive_task = None async def connect(self): """Connect to the datafeed WebSocket""" self.websocket = await websockets.connect(self.uri) self._receive_task = asyncio.create_task(self._receive_loop()) print(f"Connected to {self.uri}") async def disconnect(self): """Disconnect from the datafeed""" if self._receive_task: self._receive_task.cancel() if self.websocket: await self.websocket.close() async def _receive_loop(self): """Background task to receive and route messages""" try: async for message in self.websocket: data = json.loads(message) msg_type = data.get("type") # Route bar updates to subscription callbacks if msg_type == "bar_update": sub_id = data.get("subscription_id") if sub_id in self.subscriptions: callback = self.subscriptions[sub_id] callback(data["bar"]) # Route responses to pending requests elif "request_id" in data: req_id = data["request_id"] if req_id in self.pending_requests: future = self.pending_requests.pop(req_id) future.set_result(data) except asyncio.CancelledError: pass except Exception as e: print(f"Error in receive loop: {e}") def _next_request_id(self) -> str: """Generate next request ID""" self.request_id_counter += 1 return f"req_{self.request_id_counter}" async def _send_request(self, request: dict) -> dict: """Send a request and wait for response""" req_id = self._next_request_id() request["request_id"] = req_id # Create future for response future = asyncio.Future() self.pending_requests[req_id] = future # Send request await self.websocket.send(json.dumps(request)) # Wait for response return await future async def get_config(self) -> dict: """Get datafeed configuration""" response = await self._send_request({"type": "get_config"}) return response["config"] async def search_symbols(self, query: str) -> list: """Search for symbols""" response = await self._send_request({"type": "search_symbols", "query": query}) return response["results"] async def resolve_symbol(self, symbol: str) -> dict: """Get symbol metadata""" response = await self._send_request({"type": "resolve_symbol", "symbol": symbol}) return response["symbol_info"] async def get_bars( self, symbol: str, resolution: str, from_time: int, to_time: int, countback: int = None ) -> dict: """Get historical bars""" request = { "type": "get_bars", "symbol": symbol, "resolution": resolution, "from_time": from_time, "to_time": to_time, } if countback: request["countback"] = countback response = await self._send_request(request) return response["history"] async def subscribe_bars( self, symbol: str, resolution: str, subscription_id: str, callback: Callable ): """Subscribe to real-time bar updates""" self.subscriptions[subscription_id] = callback response = await self._send_request({ "type": "subscribe_bars", "symbol": symbol, "resolution": resolution, "subscription_id": subscription_id, }) if not response.get("success"): raise Exception(f"Subscription failed: {response.get('message')}") async def unsubscribe_bars(self, subscription_id: str): """Unsubscribe from updates""" self.subscriptions.pop(subscription_id, None) await self._send_request({ "type": "unsubscribe_bars", "subscription_id": subscription_id, }) async def main(): """Example usage of the DatafeedClient""" client = DatafeedClient() try: # Connect await client.connect() # Get config config = await client.get_config() print(f"\nDatafeed: {config['name']}") print(f"Supported resolutions: {config['supported_resolutions']}") # Search for BTC results = await client.search_symbols("BTC") print(f"\nSearch results for 'BTC': {len(results)} found") for result in results: print(f" - {result['symbol']}: {result['description']}") # Get symbol info if results: symbol = results[0]["symbol"] info = await client.resolve_symbol(symbol) print(f"\nSymbol info for {symbol}:") print(f" Name: {info['name']}") print(f" Type: {info['type']}") print(f" Columns: {[c['name'] for c in info['columns']]}") # Get historical data now = int(time.time()) from_time = now - 3600 # 1 hour ago history = await client.get_bars(symbol, "5", from_time, now, countback=10) print(f"\nHistorical data: {len(history['bars'])} bars") if history["bars"]: print(f" First bar time: {history['bars'][0]['time']}") print(f" Last bar close: {history['bars'][-1]['data']['close']}") # Subscribe to real-time updates print(f"\nSubscribing to real-time updates for {symbol}...") def on_bar_update(bar): print(f" [UPDATE] Time: {bar['time']}, Close: {bar['close']}") await client.subscribe_bars(symbol, "5", "my_subscription", on_bar_update) print(" Waiting for updates (15 seconds)...") await asyncio.sleep(15) # Unsubscribe await client.unsubscribe_bars("my_subscription") print(" Unsubscribed") finally: await client.disconnect() if __name__ == "__main__": print("=== Datafeed Client Example ===\n") print("Make sure the backend server is running on http://localhost:8000") asyncio.run(main())