This commit is contained in:
2026-03-02 01:36:14 -04:00
parent d907c5765e
commit 311df9aeda
21 changed files with 2772 additions and 33 deletions

View File

@@ -0,0 +1,97 @@
"""
Example: Integrating CCXT DataSource into main.py
This shows how to register a CCXT exchange datasource in your application.
"""
# In your backend/src/main.py, add this to the lifespan function:
"""
from datasource.adapters.ccxt_adapter import CCXTDataSource
@asynccontextmanager
async def lifespan(app: FastAPI):
global agent_executor
# Initialize data sources
demo_source = DemoDataSource()
datasource_registry.register("demo", demo_source)
subscription_manager.register_source("demo", demo_source)
# Add CCXT Binance datasource
binance_source = CCXTDataSource(
exchange_id="binance",
poll_interval=60, # Poll every 60 seconds for updates
)
datasource_registry.register("binance", binance_source)
subscription_manager.register_source("binance", binance_source)
# Add CCXT Coinbase datasource
coinbase_source = CCXTDataSource(
exchange_id="coinbase",
poll_interval=60,
)
datasource_registry.register("coinbase", coinbase_source)
subscription_manager.register_source("coinbase", coinbase_source)
# Add CCXT Kraken datasource
kraken_source = CCXTDataSource(
exchange_id="kraken",
poll_interval=60,
)
datasource_registry.register("kraken", kraken_source)
subscription_manager.register_source("kraken", kraken_source)
logger.info("DataSource infrastructure initialized with demo + CCXT sources")
# ... rest of initialization ...
yield
# Cleanup
await binance_source.close()
await coinbase_source.close()
await kraken_source.close()
# ... rest of cleanup ...
"""
# Usage examples:
# 1. Connect to datafeed WebSocket and request Binance data:
"""
{
"type": "request",
"source": "binance",
"method": "search_symbols",
"params": {
"query": "BTC"
}
}
"""
# 2. Get historical bars from Binance:
"""
{
"type": "request",
"source": "binance",
"method": "get_bars",
"params": {
"symbol": "BTC/USDT",
"resolution": "60",
"from_time": 1234567890,
"to_time": 1234567999
}
}
"""
# 3. Subscribe to polling updates:
"""
{
"type": "subscribe",
"source": "binance",
"symbol": "BTC/USDT",
"resolution": "1"
}
"""
print("See comments above for integration examples!")

View File

@@ -0,0 +1,174 @@
# System Prompt
You are an AI trading assistant for an AI-native algorithmic trading platform. Your role is to help traders design, implement, and manage trading strategies through natural language interaction.
## Your Core Identity
You are a **strategy authoring assistant**, not a strategy executor. You help users:
- Design trading strategies from natural language descriptions
- Interpret chart annotations and technical requirements
- Generate strategy executables (code artifacts)
- Manage and monitor live trading state
- Analyze market data and provide insights
## Your Capabilities
### State Management
You have read/write access to synchronized state stores:
- **OrderStore**: Active swap orders and order configurations
- **ChartStore**: Current chart view state (symbol, time range, interval)
- `symbol`: Trading pair currently being viewed (e.g., "BINANCE:BTC/USDT")
- `start_time`: Start of visible chart range (Unix timestamp in seconds)
- `end_time`: End of visible chart range (Unix timestamp in seconds)
- `interval`: Chart interval/timeframe (e.g., "15", "60", "D")
- Use your tools to read current state and update it as needed
- All state changes are automatically synchronized with connected clients
### Strategy Authoring
- Help users express trading intent through conversation
- Translate natural language to concrete strategy specifications
- Understand technical analysis concepts (support/resistance, indicators, patterns)
- Generate self-contained, deterministic strategy executables
- Validate strategy logic for correctness and safety
### Data & Analysis
- Access to market data through abstract feed specifications
- Can compute indicators and perform technical analysis
- Understand OHLCV data, order books, and market microstructure
- Interpret unstructured data (news, sentiment, on-chain metrics)
## Communication Style
- **Technical & Direct**: Users are knowledgeable traders, be precise
- **Safety First**: Never make destructive changes without confirmation
- **Explain Actions**: When modifying state, explain what you're doing
- **Ask Questions**: If intent is unclear, ask for clarification
- **Concise**: Be brief but complete, avoid unnecessary elaboration
## Key Principles
1. **Strategies are Deterministic**: Generated strategies run without LLM involvement at runtime
2. **Local Execution**: The platform runs locally for security; you're design-time only
3. **Schema Validation**: All outputs must conform to platform schemas
4. **Risk Awareness**: Always consider position sizing, exposure limits, and risk management
5. **Versioning**: Every strategy artifact is version-controlled with full auditability
## Your Limitations
- You **DO NOT** execute trades directly
- You **DO NOT** have access to live market data in real-time (users provide it)
- You **CANNOT** modify the order kernel or execution layer
- You **SHOULD NOT** make assumptions about user risk tolerance without asking
- You **MUST NOT** provide trading or investment advice
## Memory & Context
You have access to:
- Full conversation history with semantic search
- Project documentation (design, architecture, data formats)
- Past strategy discussions and decisions
- Relevant context retrieved automatically based on current conversation
## Tools Available
### State Management Tools
- `list_sync_stores()`: See available state stores
- `read_sync_state(store_name)`: Read current state
- `write_sync_state(store_name, updates)`: Update state
- `get_store_schema(store_name)`: Inspect state structure
### Data Source Tools
- `list_data_sources()`: List available data sources (exchanges)
- `search_symbols(query, type, exchange, limit)`: Search for trading symbols
- `get_symbol_info(source_name, symbol)`: Get metadata for a symbol
- `get_historical_data(source_name, symbol, resolution, from_time, to_time, countback)`: Get historical bars
- **`get_chart_data(countback)`**: Get data for the chart the user is currently viewing
- This is the **preferred** way to access chart data when analyzing what the user is looking at
- Automatically reads ChartStore to determine symbol, timeframe, and visible range
- Returns OHLCV data plus any custom columns for the visible chart range
- **`analyze_chart_data(python_script, countback)`**: Execute Python analysis on current chart data
- Automatically fetches current chart data and converts to pandas DataFrame
- Execute custom Python scripts with access to pandas, numpy, matplotlib
- Captures matplotlib plots as base64 images for display to user
- Returns result DataFrames and any printed output
- **Use this for technical analysis, indicator calculations, statistical analysis, and visualization**
## Important Behavioral Rules
### Chart Context Awareness
When a user asks about "this chart", "the chart", "what I'm viewing", or similar references to their current view:
1. **ALWAYS** first use `read_sync_state("ChartStore")` to see what they're viewing
2. **NEVER** ask the user to upload an image or tell you what symbol they're looking at
3. The user is viewing a live trading chart in the UI - you can access what they see via ChartStore
4. After reading ChartStore, you can use `get_chart_data()` to get the actual candle data
5. For technical analysis questions, use `analyze_chart_data()` with Python scripts
Examples of questions that require checking ChartStore first:
- "Can you see this chart?"
- "What are the swing highs and lows?"
- "Is this in an uptrend?"
- "What's the current price?"
- "Analyze this chart"
- "What am I looking at?"
### Data Analysis Workflow
1. **Check ChartStore** → Know what the user is viewing
2. **Get data** with `get_chart_data()` → Fetch the actual OHLCV bars
3. **Analyze** with `analyze_chart_data()` → Run Python analysis if needed
4. **Respond** with insights based on the actual data
## Working with Users
1. **Understand Intent**: Ask clarifying questions about strategy goals
2. **Design Together**: Collaborate on strategy logic iteratively
3. **Validate**: Ensure strategy makes sense before generating code
4. **Test**: Encourage backtesting and paper trading first
5. **Monitor**: Help users interpret live strategy behavior
## Example Interactions
**User**: "Can you see this chart? What are the swing highs and lows?"
**You**: *[uses read_sync_state("ChartStore") to check what they're viewing]*
*[uses get_chart_data() to fetch the candle data]*
*[uses analyze_chart_data with Python script to identify swing points]*
"Yes! You're viewing BINANCE:BTC/USDT on the 15-minute chart. I've analyzed the visible range and identified the following swing points:
- Swing High: $43,420 at 14:15 UTC
- Swing Low: $42,980 at 12:30 UTC
- Current swing high: $43,280 at 16:45 UTC (still forming)
The chart shows..."
**User**: "Create a mean reversion strategy on ETH with RSI"
**You**: "I'll help you design a mean reversion strategy for ETH using RSI. A few questions:
- What RSI levels indicate oversold/overbought? (e.g., 30/70)
- What timeframe? (e.g., 1h, 15m)
- Position sizing as % of balance?
- Stop loss and take profit levels?"
**User**: "What am I looking at?"
**You**: *[uses read_sync_state("ChartStore")]*
"You're currently viewing BINANCE:BTC/USDT on a 15-minute chart, looking at the range from 2024-01-15 10:00 to 2024-01-15 18:30."
**User**: "Show me ETH on the 1-hour chart"
**You**: *[uses write_sync_state("ChartStore", {"chart_state": {"symbol": "BINANCE:ETH/USDT", "interval": "60"}})]*
"I've switched your chart to BINANCE:ETH/USDT on the 1-hour timeframe."
**User**: "What's the current price?"
**You**: *[uses get_chart_data(countback=1)]*
"Based on your current chart (BINANCE:BTC/USDT, 15min), the latest close price is $43,250.50 as of 14:30 UTC."
**User**: "Calculate the average price over the visible range"
**You**: *[uses get_chart_data()]*
*[analyzes the returned bars data]*
"Over the visible time range (last 4 hours, 16 candles), the average close price is $43,180.25, with a high of $43,420 and low of $42,980."
**User**: "Calculate RSI and show me a chart"
**You**: *[uses analyze_chart_data with Python script to calculate RSI and create plot]*
"I've calculated the 14-period RSI for your chart. The current RSI is 58.3, indicating neutral momentum. Here's the chart showing price and RSI over the visible range." *[image displayed to user]*
**User**: "Is this in an uptrend?"
**You**: *[uses analyze_chart_data to calculate 20/50 moving averages and analyze trend]*
"Yes, based on the moving averages analysis, the chart is in an uptrend. The 20-period SMA ($43,150) is above the 50-period SMA ($42,800), and both are sloping upward. Price is currently trading above both averages."
---
Remember: You are a collaborative partner in strategy design, not an autonomous trader. Always prioritize safety, clarity, and user intent.

View File

@@ -30,3 +30,7 @@ aiofiles>=24.0.0
# Environment configuration
python-dotenv>=1.0.0
# Secrets management
cryptography>=42.0.0
argon2-cffi>=23.0.0

View File

@@ -572,8 +572,8 @@ async def analyze_chart_data(python_script: str, countback: Optional[int] = None
plot_urls = []
# Determine uploads directory (relative to this file)
uploads_dir = Path(__file__).parent.parent.parent / "uploads"
uploads_dir.mkdir(exist_ok=True)
uploads_dir = Path(__file__).parent.parent.parent / "data" / "uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
try:
with redirect_stdout(stdout_capture), redirect_stderr(stderr_capture):

View File

@@ -14,7 +14,7 @@ from pydantic import BaseModel
import uuid
import shutil
from sync.protocol import HelloMessage, PatchMessage
from sync.protocol import HelloMessage, PatchMessage, AuthMessage, AuthResponseMessage
from sync.registry import SyncRegistry
from gateway.hub import Gateway
from gateway.channels.websocket import WebSocketChannel
@@ -26,6 +26,7 @@ from schema.chart_state import ChartState
from datasource.registry import DataSourceRegistry
from datasource.subscription_manager import SubscriptionManager
from datasource.websocket_handler import DatafeedWebSocketHandler
from secrets_manager import SecretsStore, InvalidMasterPassword
# Configure logging
logging.basicConfig(
@@ -52,6 +53,9 @@ agent_executor = None
datasource_registry = DataSourceRegistry()
subscription_manager = SubscriptionManager()
# Global secrets store
secrets_store = SecretsStore()
@asynccontextmanager
async def lifespan(app: FastAPI):
@@ -76,8 +80,19 @@ async def lifespan(app: FastAPI):
logger.warning(f"CCXT not available: {e}. Only demo source will be available.")
logger.info("To use real exchange data, install ccxt: pip install ccxt>=4.0.0")
# Get API keys from environment
anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY")
# Get API keys from secrets store if unlocked, otherwise fall back to environment
anthropic_api_key = None
if secrets_store.is_unlocked:
anthropic_api_key = secrets_store.get("ANTHROPIC_API_KEY")
if anthropic_api_key:
logger.info("Loaded API key from encrypted secrets store")
# Fall back to environment variable
if not anthropic_api_key:
anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY")
if anthropic_api_key:
logger.info("Loaded API key from environment")
if not anthropic_api_key:
logger.error("ANTHROPIC_API_KEY not found in environment!")
@@ -117,8 +132,8 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
# Create uploads directory
UPLOAD_DIR = Path(__file__).parent.parent / "uploads"
UPLOAD_DIR.mkdir(exist_ok=True)
UPLOAD_DIR = Path(__file__).parent.parent / "data" / "uploads"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Mount static files for serving uploads
app.mount("/uploads", StaticFiles(directory=str(UPLOAD_DIR)), name="uploads")
@@ -142,14 +157,6 @@ registry.register(chart_store, store_name="ChartStore")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
registry.websocket = websocket
# Create WebSocket channel for agent communication
channel_id = f"ws_{id(websocket)}"
client_id = f"client_{id(websocket)}"
logger.info(f"WebSocket connected - channel_id: {channel_id}, client_id: {client_id}")
ws_channel = WebSocketChannel(channel_id, websocket, session_id="default")
gateway.register_channel(ws_channel)
# Helper function to send responses
async def send_response(response):
@@ -158,6 +165,168 @@ async def websocket_endpoint(websocket: WebSocket):
except Exception as e:
logger.error(f"Error sending response: {e}")
# Authentication state
is_authenticated = False
# Wait for authentication message (must be first message)
try:
auth_timeout = 30 # 30 seconds to authenticate
auth_data = await asyncio.wait_for(websocket.receive_text(), timeout=auth_timeout)
auth_message_json = json.loads(auth_data)
if auth_message_json.get("type") != "auth":
logger.warning("First message was not auth message")
await send_response(AuthResponseMessage(
success=False,
message="First message must be authentication"
))
await websocket.close(code=1008, reason="Authentication required")
return
auth_msg = AuthMessage(**auth_message_json)
logger.info("Received authentication message")
# Check if secrets store needs initialization
if not secrets_store.is_initialized:
logger.info("Secrets store not initialized, performing first-time setup")
# Require password confirmation for initialization
if not auth_msg.confirm_password:
await send_response(AuthResponseMessage(
success=False,
needs_confirmation=True,
message="First-time setup: password confirmation required"
))
await websocket.close(code=1008, reason="Password confirmation required")
return
if auth_msg.password != auth_msg.confirm_password:
await send_response(AuthResponseMessage(
success=False,
needs_confirmation=True,
message="Passwords do not match"
))
await websocket.close(code=1008, reason="Password confirmation failed")
return
# Initialize secrets store
try:
secrets_store.initialize(auth_msg.password)
# Migrate ANTHROPIC_API_KEY from environment if present
env_key = os.environ.get("ANTHROPIC_API_KEY")
if env_key:
secrets_store.set("ANTHROPIC_API_KEY", env_key)
logger.info("Migrated ANTHROPIC_API_KEY from environment to secrets store")
is_authenticated = True
await send_response(AuthResponseMessage(
success=True,
message="Secrets store initialized successfully"
))
logger.info("Secrets store initialized and authenticated")
except Exception as e:
logger.error(f"Failed to initialize secrets store: {e}")
await send_response(AuthResponseMessage(
success=False,
message=f"Initialization failed: {str(e)}"
))
await websocket.close(code=1011, reason="Initialization failed")
return
else:
# Unlock existing secrets store (or verify password if already unlocked)
try:
# If already unlocked, just verify the password is correct
if secrets_store.is_unlocked:
# Verify password by creating a temporary store and attempting unlock
from secrets_manager import SecretsStore as TempStore
temp_store = TempStore(data_dir=secrets_store.data_dir)
temp_store.unlock(auth_msg.password) # This will throw if wrong password
logger.info("Password verified (store already unlocked)")
else:
secrets_store.unlock(auth_msg.password)
logger.info("Secrets store unlocked successfully")
# Check if user wants to change password
password_changed = False
if auth_msg.change_to_password:
# Validate password change request
if not auth_msg.confirm_new_password:
await send_response(AuthResponseMessage(
success=False,
message="New password confirmation required"
))
await websocket.close(code=1008, reason="Password confirmation required")
return
if auth_msg.change_to_password != auth_msg.confirm_new_password:
await send_response(AuthResponseMessage(
success=False,
message="New passwords do not match"
))
await websocket.close(code=1008, reason="Password confirmation mismatch")
return
# Change the password
try:
secrets_store.change_master_password(auth_msg.password, auth_msg.change_to_password)
password_changed = True
logger.info("Master password changed successfully")
except Exception as e:
logger.error(f"Failed to change password: {e}")
await send_response(AuthResponseMessage(
success=False,
message=f"Failed to change password: {str(e)}"
))
await websocket.close(code=1011, reason="Password change failed")
return
is_authenticated = True
response_message = "Password changed successfully" if password_changed else "Authentication successful"
await send_response(AuthResponseMessage(
success=True,
password_changed=password_changed,
message=response_message
))
except InvalidMasterPassword:
logger.warning("Invalid password attempt")
await send_response(AuthResponseMessage(
success=False,
message="Invalid password"
))
await websocket.close(code=1008, reason="Invalid password")
return
except Exception as e:
logger.error(f"Authentication error: {e}")
await send_response(AuthResponseMessage(
success=False,
message="Authentication failed"
))
await websocket.close(code=1011, reason="Authentication error")
return
except asyncio.TimeoutError:
logger.warning("Authentication timeout")
await websocket.close(code=1008, reason="Authentication timeout")
return
except WebSocketDisconnect:
logger.info("Client disconnected during authentication")
return
except Exception as e:
logger.error(f"Error during authentication: {e}")
await websocket.close(code=1011, reason="Authentication error")
return
# Now authenticated - proceed with normal WebSocket handling
registry.websocket = websocket
# Create WebSocket channel for agent communication
channel_id = f"ws_{id(websocket)}"
client_id = f"client_{id(websocket)}"
logger.info(f"WebSocket authenticated - channel_id: {channel_id}, client_id: {client_id}")
ws_channel = WebSocketChannel(channel_id, websocket, session_id="default")
gateway.register_channel(ws_channel)
try:
while True:
data = await websocket.receive_text()

View File

@@ -1,8 +1,24 @@
from typing import Any, Dict, List, Literal, Union
from typing import Any, Dict, List, Literal, Optional, Union
from pydantic import BaseModel
class AuthMessage(BaseModel):
"""Authentication message (must be first message from client)"""
type: Literal["auth"] = "auth"
password: str
confirm_password: Optional[str] = None # Required only for initialization
change_to_password: Optional[str] = None # If provided, change password after auth
confirm_new_password: Optional[str] = None # Required if change_to_password is set
class AuthResponseMessage(BaseModel):
"""Authentication response from server"""
type: Literal["auth_response"] = "auth_response"
success: bool
needs_confirmation: bool = False # True if this is first-time setup
password_changed: bool = False # True if password was changed
message: str
class SnapshotMessage(BaseModel):
type: Literal["snapshot"] = "snapshot"
store: str
@@ -20,7 +36,7 @@ class HelloMessage(BaseModel):
seqs: Dict[str, int]
# Union type for all messages from backend to frontend
BackendMessage = Union[SnapshotMessage, PatchMessage]
BackendMessage = Union[SnapshotMessage, PatchMessage, AuthResponseMessage]
# Union type for all messages from frontend to backend
FrontendMessage = Union[HelloMessage, PatchMessage]
FrontendMessage = Union[AuthMessage, HelloMessage, PatchMessage]

View File

@@ -0,0 +1,199 @@
"""
Example client demonstrating how to integrate with the datafeed WebSocket API.
This shows how a TradingView integration or custom charting client would
interact with the datafeed.
"""
import asyncio
import json
import time
from typing import Callable, Dict
import websockets
class DatafeedClient:
"""Client for TradingView-compatible datafeed WebSocket API"""
def __init__(self, uri: str = "ws://localhost:8000/ws/datafeed"):
self.uri = uri
self.websocket = None
self.request_id_counter = 0
self.pending_requests: Dict[str, asyncio.Future] = {}
self.subscriptions: Dict[str, Callable] = {}
self._receive_task = None
async def connect(self):
"""Connect to the datafeed WebSocket"""
self.websocket = await websockets.connect(self.uri)
self._receive_task = asyncio.create_task(self._receive_loop())
print(f"Connected to {self.uri}")
async def disconnect(self):
"""Disconnect from the datafeed"""
if self._receive_task:
self._receive_task.cancel()
if self.websocket:
await self.websocket.close()
async def _receive_loop(self):
"""Background task to receive and route messages"""
try:
async for message in self.websocket:
data = json.loads(message)
msg_type = data.get("type")
# Route bar updates to subscription callbacks
if msg_type == "bar_update":
sub_id = data.get("subscription_id")
if sub_id in self.subscriptions:
callback = self.subscriptions[sub_id]
callback(data["bar"])
# Route responses to pending requests
elif "request_id" in data:
req_id = data["request_id"]
if req_id in self.pending_requests:
future = self.pending_requests.pop(req_id)
future.set_result(data)
except asyncio.CancelledError:
pass
except Exception as e:
print(f"Error in receive loop: {e}")
def _next_request_id(self) -> str:
"""Generate next request ID"""
self.request_id_counter += 1
return f"req_{self.request_id_counter}"
async def _send_request(self, request: dict) -> dict:
"""Send a request and wait for response"""
req_id = self._next_request_id()
request["request_id"] = req_id
# Create future for response
future = asyncio.Future()
self.pending_requests[req_id] = future
# Send request
await self.websocket.send(json.dumps(request))
# Wait for response
return await future
async def get_config(self) -> dict:
"""Get datafeed configuration"""
response = await self._send_request({"type": "get_config"})
return response["config"]
async def search_symbols(self, query: str) -> list:
"""Search for symbols"""
response = await self._send_request({"type": "search_symbols", "query": query})
return response["results"]
async def resolve_symbol(self, symbol: str) -> dict:
"""Get symbol metadata"""
response = await self._send_request({"type": "resolve_symbol", "symbol": symbol})
return response["symbol_info"]
async def get_bars(
self, symbol: str, resolution: str, from_time: int, to_time: int, countback: int = None
) -> dict:
"""Get historical bars"""
request = {
"type": "get_bars",
"symbol": symbol,
"resolution": resolution,
"from_time": from_time,
"to_time": to_time,
}
if countback:
request["countback"] = countback
response = await self._send_request(request)
return response["history"]
async def subscribe_bars(
self, symbol: str, resolution: str, subscription_id: str, callback: Callable
):
"""Subscribe to real-time bar updates"""
self.subscriptions[subscription_id] = callback
response = await self._send_request({
"type": "subscribe_bars",
"symbol": symbol,
"resolution": resolution,
"subscription_id": subscription_id,
})
if not response.get("success"):
raise Exception(f"Subscription failed: {response.get('message')}")
async def unsubscribe_bars(self, subscription_id: str):
"""Unsubscribe from updates"""
self.subscriptions.pop(subscription_id, None)
await self._send_request({
"type": "unsubscribe_bars",
"subscription_id": subscription_id,
})
async def main():
"""Example usage of the DatafeedClient"""
client = DatafeedClient()
try:
# Connect
await client.connect()
# Get config
config = await client.get_config()
print(f"\nDatafeed: {config['name']}")
print(f"Supported resolutions: {config['supported_resolutions']}")
# Search for BTC
results = await client.search_symbols("BTC")
print(f"\nSearch results for 'BTC': {len(results)} found")
for result in results:
print(f" - {result['symbol']}: {result['description']}")
# Get symbol info
if results:
symbol = results[0]["symbol"]
info = await client.resolve_symbol(symbol)
print(f"\nSymbol info for {symbol}:")
print(f" Name: {info['name']}")
print(f" Type: {info['type']}")
print(f" Columns: {[c['name'] for c in info['columns']]}")
# Get historical data
now = int(time.time())
from_time = now - 3600 # 1 hour ago
history = await client.get_bars(symbol, "5", from_time, now, countback=10)
print(f"\nHistorical data: {len(history['bars'])} bars")
if history["bars"]:
print(f" First bar time: {history['bars'][0]['time']}")
print(f" Last bar close: {history['bars'][-1]['data']['close']}")
# Subscribe to real-time updates
print(f"\nSubscribing to real-time updates for {symbol}...")
def on_bar_update(bar):
print(f" [UPDATE] Time: {bar['time']}, Close: {bar['close']}")
await client.subscribe_bars(symbol, "5", "my_subscription", on_bar_update)
print(" Waiting for updates (15 seconds)...")
await asyncio.sleep(15)
# Unsubscribe
await client.unsubscribe_bars("my_subscription")
print(" Unsubscribed")
finally:
await client.disconnect()
if __name__ == "__main__":
print("=== Datafeed Client Example ===\n")
print("Make sure the backend server is running on http://localhost:8000")
asyncio.run(main())

View File

@@ -0,0 +1,156 @@
"""
Test script for CCXT DataSource adapter (Free Version).
This demonstrates how to use the free CCXT adapter (not ccxt.pro) with various
exchanges. It uses polling instead of WebSocket for real-time updates and
verifies that Decimal precision is maintained throughout.
"""
import asyncio
from decimal import Decimal
from datetime import datetime, timedelta
from src.datasource.adapters.ccxt_adapter import CCXTDataSource
async def test_binance_datasource():
"""Test Binance exchange data source"""
print("=" * 60)
print("Testing CCXT DataSource with Binance (Free Version)")
print("=" * 60)
# Initialize Binance datasource with faster polling for testing
binance = CCXTDataSource(exchange_id="binance", poll_interval=5)
try:
# Test 1: Get configuration
print("\n1. Getting datafeed configuration...")
config = await binance.get_config()
print(f" Name: {config.name}")
print(f" Description: {config.description}")
print(f" Supported resolutions: {config.supported_resolutions[:5]}...")
print(f" Exchanges: {config.exchanges}")
# Test 2: Search symbols
print("\n2. Searching for BTC symbols...")
results = await binance.search_symbols("BTC", limit=5)
print(f" Found {len(results)} symbols:")
for result in results[:3]:
print(f" - {result.symbol}: {result.description}")
# Test 3: Resolve symbol
print("\n3. Resolving symbol metadata for BTC/USDT...")
symbol_info = await binance.resolve_symbol("BTC/USDT")
print(f" Symbol: {symbol_info.symbol}")
print(f" Name: {symbol_info.name}")
print(f" Type: {symbol_info.type}")
print(f" Exchange: {symbol_info.exchange}")
print(f" Columns:")
for col in symbol_info.columns:
print(f" - {col.name} ({col.type}): {col.description}")
# Test 4: Get historical bars
print("\n4. Fetching historical 1-hour bars for BTC/USDT...")
end_time = int(datetime.now().timestamp())
start_time = end_time - (24 * 3600) # Last 24 hours
history = await binance.get_bars(
symbol="BTC/USDT",
resolution="60", # 1 hour
from_time=start_time,
to_time=end_time,
countback=10,
)
print(f" Retrieved {len(history.bars)} bars")
if history.bars:
latest = history.bars[-1]
print(f" Latest bar:")
print(f" Time: {datetime.fromtimestamp(latest.time)}")
print(f" Open: {latest.data['open']} (type: {type(latest.data['open']).__name__})")
print(f" High: {latest.data['high']} (type: {type(latest.data['high']).__name__})")
print(f" Low: {latest.data['low']} (type: {type(latest.data['low']).__name__})")
print(f" Close: {latest.data['close']} (type: {type(latest.data['close']).__name__})")
print(f" Volume: {latest.data['volume']} (type: {type(latest.data['volume']).__name__})")
# Verify Decimal precision
assert isinstance(latest.data['close'], Decimal), "Price should be Decimal type!"
assert isinstance(latest.data['volume'], Decimal), "Volume should be Decimal type!"
print(f" ✓ Numerical precision verified: using Decimal types")
# Test 5: Polling subscription (brief test)
print("\n5. Testing polling-based subscription...")
print(f" Note: Using free CCXT with {binance._poll_interval}s polling interval")
tick_count = [0]
def on_tick(data):
tick_count[0] += 1
if tick_count[0] == 1:
print(f" Received tick: close={data['close']} (type: {type(data['close']).__name__})")
assert isinstance(data['close'], Decimal), "Polled data should use Decimal!"
subscription_id = await binance.subscribe_bars(
symbol="BTC/USDT",
resolution="1", # 1 minute
on_tick=on_tick,
)
print(f" Subscription ID: {subscription_id}")
print(f" Waiting {binance._poll_interval + 2} seconds for first poll...")
await asyncio.sleep(binance._poll_interval + 2)
# Unsubscribe
await binance.unsubscribe_bars(subscription_id)
print(f" ✓ Subscription test complete (received {tick_count[0]} tick(s))")
finally:
await binance.close()
print("\n✓ Binance datasource test complete")
async def test_multiple_exchanges():
"""Test multiple exchanges"""
print("\n" + "=" * 60)
print("Testing Multiple Exchanges")
print("=" * 60)
exchanges_to_test = ["binance", "coinbase", "kraken"]
for exchange_id in exchanges_to_test:
print(f"\nTesting {exchange_id}...")
try:
datasource = CCXTDataSource(exchange_id=exchange_id)
config = await datasource.get_config()
print(f"{config.name}")
# Try to search for ETH
results = await datasource.search_symbols("ETH", limit=3)
print(f" ✓ Found {len(results)} ETH symbols")
await datasource.close()
except Exception as e:
print(f" ✗ Error: {e}")
async def main():
"""Run all tests"""
print("\nCCXT DataSource Adapter Test Suite")
print("=" * 60)
try:
await test_binance_datasource()
await test_multiple_exchanges()
print("\n" + "=" * 60)
print("All tests completed successfully! ✓")
print("=" * 60)
except Exception as e:
print(f"\n✗ Test failed: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,130 @@
"""
Test client for TradingView-compatible datafeed WebSocket API.
Run this to test the datafeed endpoints:
python -m pytest backend/tests/test_datafeed_websocket.py -v -s
Or run directly:
python backend/tests/test_datafeed_websocket.py
"""
import asyncio
import json
import time
import websockets
async def test_datafeed():
"""Test datafeed WebSocket API"""
uri = "ws://localhost:8000/ws/datafeed"
async with websockets.connect(uri) as websocket:
print("✓ Connected to datafeed WebSocket")
# Test 1: Get config
print("\n--- Test 1: Get Config ---")
request = {"type": "get_config", "request_id": "req_1"}
await websocket.send(json.dumps(request))
response = json.loads(await websocket.recv())
print(f"Response: {json.dumps(response, indent=2)}")
assert response["type"] == "get_config_response"
print("✓ Config retrieved")
# Test 2: Search symbols
print("\n--- Test 2: Search Symbols ---")
request = {"type": "search_symbols", "request_id": "req_2", "query": "BTC"}
await websocket.send(json.dumps(request))
response = json.loads(await websocket.recv())
print(f"Response: {json.dumps(response, indent=2)}")
assert response["type"] == "search_symbols_response"
assert len(response["results"]) > 0
print(f"✓ Found {len(response['results'])} symbols")
# Test 3: Resolve symbol
print("\n--- Test 3: Resolve Symbol ---")
request = {"type": "resolve_symbol", "request_id": "req_3", "symbol": "DEMO:BTC/USD"}
await websocket.send(json.dumps(request))
response = json.loads(await websocket.recv())
print(f"Response: {json.dumps(response, indent=2)}")
assert response["type"] == "resolve_symbol_response"
symbol_info = response["symbol_info"]
print(f"✓ Symbol resolved: {symbol_info['name']}")
print(f" Available columns: {[c['name'] for c in symbol_info['columns']]}")
# Test 4: Get historical bars
print("\n--- Test 4: Get Historical Bars ---")
now = int(time.time())
from_time = now - 3600 # 1 hour ago
request = {
"type": "get_bars",
"request_id": "req_4",
"symbol": "DEMO:BTC/USD",
"resolution": "5",
"from_time": from_time,
"to_time": now,
"countback": 10,
}
await websocket.send(json.dumps(request))
response = json.loads(await websocket.recv())
print(f"Response type: {response['type']}")
if response["type"] == "get_bars_response":
history = response["history"]
print(f"✓ Received {len(history['bars'])} bars")
if history["bars"]:
print(f" First bar: {history['bars'][0]}")
print(f" Last bar: {history['bars'][-1]}")
else:
print(f"Error: {response}")
# Test 5: Subscribe to real-time updates
print("\n--- Test 5: Subscribe to Real-time Updates ---")
request = {
"type": "subscribe_bars",
"request_id": "req_5",
"symbol": "DEMO:BTC/USD",
"resolution": "5",
"subscription_id": "sub_1",
}
await websocket.send(json.dumps(request))
response = json.loads(await websocket.recv())
print(f"Subscription response: {json.dumps(response, indent=2)}")
assert response["type"] == "subscribe_bars_response"
assert response["success"] is True
print("✓ Subscribed successfully")
# Wait for a few updates
print("\n Waiting for real-time updates (10 seconds)...")
update_count = 0
try:
for _ in range(3): # Wait for up to 3 messages
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
message = json.loads(response)
if message["type"] == "bar_update":
update_count += 1
print(f" Update {update_count}: {message['bar']}")
except asyncio.TimeoutError:
print(f" No more updates received (got {update_count} updates)")
# Test 6: Unsubscribe
print("\n--- Test 6: Unsubscribe ---")
request = {
"type": "unsubscribe_bars",
"request_id": "req_6",
"subscription_id": "sub_1",
}
await websocket.send(json.dumps(request))
response = json.loads(await websocket.recv())
print(f"Unsubscribe response: {json.dumps(response, indent=2)}")
assert response["type"] == "unsubscribe_bars_response"
assert response["success"] is True
print("✓ Unsubscribed successfully")
print("\n=== All tests passed! ===")
if __name__ == "__main__":
print("Starting datafeed WebSocket tests...")
print("Make sure the backend server is running on http://localhost:8000")
print()
asyncio.run(test_datafeed())