Files
ai/test/history_client/client_async.py
2026-03-11 18:47:11 -04:00

309 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Async ZMQ client for historical OHLC data requests via Relay gateway.
Uses async pub/sub pattern: submit request → wait for notification → query Iceberg
"""
import zmq
import struct
import json
import time
import uuid
from datetime import datetime, timezone
# Wire-protocol constants.
# Every message is sent as [version frame][message frame]: the version frame is
# a single byte, and the message frame starts with one message-type byte
# followed by a UTF-8 JSON payload.
PROTOCOL_VERSION: int = 0x01          # contents of the one-byte version frame

# Message-type bytes (first byte of the message frame).
MSG_TYPE_SUBMIT_REQUEST: int = 0x10   # sent by the client on the REQ socket
MSG_TYPE_SUBMIT_RESPONSE: int = 0x11  # immediate acknowledgement on the REQ socket
MSG_TYPE_HISTORY_READY: int = 0x12    # completion notification on the SUB socket
class AsyncHistoryClient:
    """Client for the Relay's asynchronous historical-OHLC request flow.

    Two ZMQ sockets are used:

    * REQ (``request_port``): submit a request and receive an immediate
      SubmitResponse acknowledgement.
    * SUB (``data_port``): receive the HistoryReadyNotification published on
      the topic ``RESPONSE:{client_id}``.

    Frames are ``[version byte][type byte + UTF-8 JSON]``; messages on the SUB
    socket are additionally preceded by a topic frame.
    """

    def __init__(self, relay_host='relay', request_port=5559, data_port=5558):
        """Build the endpoint URLs and a random client id.

        No sockets are opened here; call :meth:`connect` before requesting data.

        Args:
            relay_host: Hostname of the Relay gateway.
            request_port: Port for the REQ (submit/acknowledge) socket.
            data_port: Port for the SUB (notification) socket.
        """
        self.context = zmq.Context()
        self.request_socket = None
        self.subscribe_socket = None
        self.relay_endpoint_req = f"tcp://{relay_host}:{request_port}"
        self.relay_endpoint_sub = f"tcp://{relay_host}:{data_port}"
        # The response topic is derived from this id (see connect), so distinct
        # clients must get distinct ids to avoid receiving each other's replies.
        self.client_id = f"client-{uuid.uuid4().hex[:8]}"

    def connect(self):
        """Open the REQ and SUB sockets and subscribe to this client's response topic."""
        self._open_request_socket()
        print(f"Connected REQ socket to Relay at {self.relay_endpoint_req}")

        if self.subscribe_socket is not None:
            # Re-connecting: don't leak the previous SUB socket.
            self.subscribe_socket.close(linger=0)
        self.subscribe_socket = self.context.socket(zmq.SUB)
        self.subscribe_socket.connect(self.relay_endpoint_sub)
        # Subscribe to the client-specific topic BEFORE any request is submitted,
        # so an early notification is not dropped by the publisher. The topic is
        # deterministic because we generate client_id ourselves.
        # NOTE: ZMQ sends subscriptions to the publisher asynchronously ("slow
        # joiner"), so this narrows rather than fully closes the race; it relies
        # on request processing outlasting the subscription handshake.
        response_topic = f"RESPONSE:{self.client_id}"
        self.subscribe_socket.subscribe(response_topic.encode())
        print(f"Connected SUB socket to Relay at {self.relay_endpoint_sub}")
        print(f"Subscribed to topic: {response_topic}")
        print(f"✓ Safe to submit requests - already subscribed to notifications")

    def _open_request_socket(self):
        """(Re)create the REQ socket and connect it to the Relay request endpoint."""
        if self.request_socket is not None:
            # linger=0 discards an unanswered request instead of letting it
            # block context.term() later.
            self.request_socket.close(linger=0)
        self.request_socket = self.context.socket(zmq.REQ)
        self.request_socket.connect(self.relay_endpoint_req)

    def request_historical_ohlc(self, ticker, start_time, end_time, period_seconds, limit=None, timeout_secs=60):
        """
        Request historical OHLC data (async pattern).

        Flow:
            1. Submit request -> get immediate ack with request_id
            2. Wait for HistoryReadyNotification on pub/sub
            3. Query Iceberg with the table information (or notification includes data)

        Args:
            ticker: Market identifier (e.g., "BINANCE:BTC/USDT")
            start_time: Start timestamp in microseconds since epoch
            end_time: End timestamp in microseconds since epoch
            period_seconds: OHLC period in seconds (e.g., 3600 for 1h)
            limit: Optional limit on number of candles (sent whenever not None)
            timeout_secs: How long to wait for the notification (default 60s)

        Returns:
            The notification dict, or None if submission failed, the ack or the
            notification timed out, or the notification could not be parsed.

        Raises:
            RuntimeError: if connect() has not been called.
        """
        if self.request_socket is None or self.subscribe_socket is None:
            raise RuntimeError("connect() must be called before request_historical_ohlc()")

        request_id = f"{self.client_id}-{int(time.time() * 1000)}"
        # SubmitHistoricalRequest payload
        request = {
            'request_id': request_id,
            'ticker': ticker,
            'start_time': start_time,
            'end_time': end_time,
            'period_seconds': period_seconds,
            'client_id': self.client_id,  # For response routing
        }
        if limit is not None:
            request['limit'] = limit

        print(f"\n=== Step 1: Submitting Request ===")
        print(f"Request ID: {request_id}")
        print(f"Ticker: {ticker}")
        print(f"Period: {period_seconds}s ({self._describe_period(period_seconds)})")
        print(f"Start: {self._format_us(start_time)}")
        print(f"End: {self._format_us(end_time)}")
        print(f"Client ID: {self.client_id}")
        if limit is not None:
            print(f"Limit: {limit}")

        # Send: [version byte] [type byte + JSON data]
        version_frame = struct.pack('B', PROTOCOL_VERSION)
        message_frame = struct.pack('B', MSG_TYPE_SUBMIT_REQUEST) + json.dumps(request).encode('utf-8')
        self.request_socket.send_multipart([version_frame, message_frame])

        if not self.request_socket.poll(5000):  # 5 second timeout for ack
            print("❌ Timeout waiting for submit response")
            # A REQ socket that sent but never received cannot send again
            # (EFSM); replace it so this client remains usable.
            self._open_request_socket()
            return None

        submit_response = self._parse_submit_response(self.request_socket.recv_multipart())
        if not submit_response or submit_response.get('status') != 'QUEUED':
            print(f"❌ Request submission failed: {submit_response}")
            return None
        print(f"\n✅ Request queued successfully")
        print(f"Notification topic: {submit_response.get('notification_topic')}")

        return self._wait_for_notification(request_id, timeout_secs)

    def _wait_for_notification(self, request_id, timeout_secs):
        """Wait up to timeout_secs for the HistoryReadyNotification matching request_id.

        Notifications carrying a different request_id (e.g. a late reply to an
        earlier request that timed out locally) are skipped. A notification
        without a request_id is accepted.
        """
        print(f"\n=== Step 2: Waiting for Notification ===")
        print(f"⏳ Waiting up to {timeout_secs}s for HistoryReadyNotification...")
        print(f" (Ingestor fetches → Kafka → Flink → Iceberg → Notification)")

        deadline = time.monotonic() + timeout_secs
        while True:
            # Clamp at 0 so the final poll is a non-blocking check, not an infinite wait.
            remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
            if not self.subscribe_socket.poll(remaining_ms):
                print(f"\n❌ Timeout waiting for notification ({timeout_secs}s)")
                print(" Possible reasons:")
                print(" - Ingestor still fetching data from exchange")
                print(" - Flink still processing Kafka stream")
                print(" - Flink writing to Iceberg")
                return None

            notification = self._parse_history_ready(self.subscribe_socket.recv_multipart())
            if notification is None:
                print("❌ Failed to parse notification")
                return None

            received_id = notification.get('request_id')
            if received_id is not None and received_id != request_id:
                print(f"⚠️ Ignoring notification for a different request: {received_id}")
                continue

            print(f"\n=== Step 3: Notification Received ===")
            return notification

    @staticmethod
    def _format_us(timestamp_us):
        """Render a microseconds-since-epoch timestamp as an ISO-8601 UTC string."""
        return datetime.fromtimestamp(timestamp_us / 1_000_000, tz=timezone.utc).isoformat()

    @staticmethod
    def _describe_period(seconds):
        """Return a compact label for a period in seconds, e.g. 3600 -> '1h', 900 -> '15m'."""
        for unit_seconds, suffix in ((86400, 'd'), (3600, 'h'), (60, 'm')):
            if seconds >= unit_seconds and seconds % unit_seconds == 0:
                return f"{seconds // unit_seconds}{suffix}"
        return f"{seconds}s"

    def _unpack_envelope(self, version_frame, message_frame):
        """Validate the version frame and split the message frame.

        Returns:
            ``(msg_type, payload_bytes)``, or None (after printing the reason)
            if either frame is malformed or the version is unsupported.
        """
        if len(version_frame) != 1:
            print(f"❌ Invalid version frame")
            return None
        version = struct.unpack('B', version_frame)[0]
        if version != PROTOCOL_VERSION:
            print(f"❌ Unsupported protocol version: {version}")
            return None
        if len(message_frame) < 1:
            print(f"❌ Empty message frame")
            return None
        return message_frame[0], message_frame[1:]

    def _parse_submit_response(self, frames):
        """Parse a SubmitResponse ``[version][type + JSON]`` from the relay.

        Returns:
            The decoded JSON object, or None if the message is malformed.
        """
        if len(frames) != 2:
            print(f"❌ Invalid submit response: expected 2 frames, got {len(frames)}")
            return None
        envelope = self._unpack_envelope(frames[0], frames[1])
        if envelope is None:
            return None
        msg_type, msg_data = envelope
        if msg_type != MSG_TYPE_SUBMIT_RESPONSE:
            print(f"❌ Unexpected message type: 0x{msg_type:02x}")
            return None
        try:
            response = json.loads(msg_data.decode('utf-8'))
        except ValueError as e:  # covers JSONDecodeError and UnicodeDecodeError
            print(f"❌ Failed to parse response: {e}")
            return None
        if not isinstance(response, dict):
            print(f"❌ Failed to parse response: expected a JSON object, got {type(response).__name__}")
            return None
        return response

    def _parse_history_ready(self, frames):
        """Parse a HistoryReadyNotification ``[topic][version][type + JSON]`` from the SUB socket.

        Frames beyond the third are ignored. A message type other than
        HISTORY_READY only triggers a warning; parsing still proceeds.

        Returns:
            The decoded notification dict, or None if the message is malformed.
        """
        if len(frames) < 3:
            print(f"❌ Invalid notification: expected at least 3 frames, got {len(frames)}")
            return None
        topic_frame, version_frame, message_frame = frames[0], frames[1], frames[2]

        # errors='replace': a malformed topic should not abort parsing of the body.
        topic = topic_frame.decode('utf-8', errors='replace')
        print(f"📬 Received on topic: {topic}")

        envelope = self._unpack_envelope(version_frame, message_frame)
        if envelope is None:
            return None
        msg_type, msg_data = envelope
        print(f"Message type: 0x{msg_type:02x}")
        if msg_type != MSG_TYPE_HISTORY_READY:
            print(f"⚠️ Unexpected message type: expected 0x{MSG_TYPE_HISTORY_READY:02x}, got 0x{msg_type:02x}")

        try:
            notification = json.loads(msg_data.decode('utf-8'))
        except ValueError as e:  # covers JSONDecodeError and UnicodeDecodeError
            print(f"❌ Failed to parse notification: {e}")
            print(f"Raw data: {msg_data[:200]}...")
            return None
        if not isinstance(notification, dict):
            print(f"❌ Failed to parse notification: expected a JSON object, got {type(notification).__name__}")
            return None

        self._print_notification(notification)
        return notification

    def _print_notification(self, notification):
        """Print a human-readable summary of a decoded notification dict."""
        print(f"\nRequest ID: {notification.get('request_id')}")
        print(f"Status: {notification.get('status')}")
        print(f"Ticker: {notification.get('ticker')}")
        print(f"Period: {notification.get('period_seconds')}s")
        if notification.get('error_message'):
            print(f"❌ Error: {notification['error_message']}")
        if notification.get('status') == 'OK':
            print(f"✅ Data ready in Iceberg")
            print(f" Namespace: {notification.get('iceberg_namespace', 'N/A')}")
            print(f" Table: {notification.get('iceberg_table', 'N/A')}")
            print(f" Row count: {notification.get('row_count', 0)}")
            completed_at = notification.get('completed_at')
            if completed_at:
                print(f" Completed at: {self._format_us(completed_at)}")

    def close(self):
        """Close both sockets and terminate the ZMQ context.

        Sockets are closed with linger=0 so term() cannot block on unsent
        messages. Safe to call more than once.
        """
        for attr in ('request_socket', 'subscribe_socket'):
            sock = getattr(self, attr)
            if sock is not None:
                sock.close(linger=0)
                setattr(self, attr, None)
        if not self.context.closed:
            self.context.term()
        print("\n🔌 Connection closed")
def main():
    """Run one end-to-end historical request against a live Relay and report the outcome.

    Requests hourly BINANCE:BTC/USDT candles for 2026-01-01 through 2026-01-07
    (UTC), waits up to 60 s for the HistoryReadyNotification, and prints next
    steps according to its ``status``. The client is always closed on exit.
    """
    client = AsyncHistoryClient(relay_host='relay', request_port=5559, data_port=5558)
    try:
        client.connect()

        # First 7 days of January 2026 (UTC), in microseconds since the epoch.
        # Derived from datetimes so the numbers cannot drift from the stated range.
        first_day = datetime(2026, 1, 1, tzinfo=timezone.utc)
        last_second = datetime(2026, 1, 7, 23, 59, 59, tzinfo=timezone.utc)
        start_time_us = int(first_day.timestamp()) * 1_000_000
        end_time_us = int(last_second.timestamp()) * 1_000_000

        notification = client.request_historical_ohlc(
            ticker='BINANCE:BTC/USDT',
            start_time=start_time_us,
            end_time=end_time_us,
            period_seconds=3600,  # 1 hour
            limit=168,  # 7 days * 24 hours
            timeout_secs=60,
        )

        if not notification:
            print("\n❌ Request failed or timed out")
            return

        status = notification.get('status')
        if status == 'OK':
            print(f"\n🎉 Success! Data is ready in Iceberg")
            print(f"📊 Query Iceberg to retrieve {notification.get('row_count', 0)} records")
            print(f"\nNext steps:")
            print(f" 1. Connect to Iceberg")
            print(f" 2. Query table: {notification.get('iceberg_table')}")
            print(f" 3. Filter by time range and ticker")
        elif status == 'NOT_FOUND':
            print(f"\n⚠️ No data found for the requested period")
        elif status == 'ERROR':
            print(f"\n❌ Error: {notification.get('error_message')}")
        elif status == 'TIMEOUT':
            print(f"\n⏱️ Request timed out on server side")
        else:
            # Previously an unrecognised status produced no output at all.
            print(f"\n⚠️ Unexpected notification status: {status}")
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
    except Exception as e:
        # Top-level boundary of a manual test script: report, then clean up below.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        client.close()


if __name__ == '__main__':
    main()