#!/usr/bin/env python3
"""
Async ZMQ client for historical OHLC data requests via Relay gateway.
Uses async pub/sub pattern: submit request → wait for notification → query Iceberg
"""

import json
import struct
import time
import uuid
from datetime import datetime, timezone

import zmq

# Protocol constants
PROTOCOL_VERSION = 0x01
MSG_TYPE_SUBMIT_REQUEST = 0x10
MSG_TYPE_SUBMIT_RESPONSE = 0x11
MSG_TYPE_HISTORY_READY = 0x12

# How long to wait for the relay's immediate submit ack, in milliseconds.
SUBMIT_ACK_TIMEOUT_MS = 5000


class AsyncHistoryClient:
    """Client that submits historical OHLC requests and waits for completion.

    A REQ socket carries the request and its immediate ack; a SUB socket,
    subscribed to ``RESPONSE:{client_id}``, carries the later
    HistoryReadyNotification.
    """

    def __init__(self, relay_host='relay', request_port=5559, data_port=5558):
        """Prepare endpoints and a random client id; no sockets are opened yet.

        Args:
            relay_host: Host name of the relay.
            request_port: Port of the relay's request (REQ/REP) endpoint.
            data_port: Port of the relay's publish (PUB/SUB) endpoint.
        """
        self.context = zmq.Context()
        self.request_socket = None
        self.subscribe_socket = None
        self.relay_endpoint_req = f"tcp://{relay_host}:{request_port}"
        self.relay_endpoint_sub = f"tcp://{relay_host}:{data_port}"
        self.client_id = f"client-{uuid.uuid4().hex[:8]}"

    def _open_request_socket(self):
        """Create and connect a fresh REQ socket that never blocks shutdown."""
        sock = self.context.socket(zmq.REQ)
        # LINGER=0: unsent requests are dropped on close instead of keeping
        # context.term() blocked when the relay is unreachable.
        sock.setsockopt(zmq.LINGER, 0)
        sock.connect(self.relay_endpoint_req)
        return sock

    def _reset_request_socket(self):
        """Replace the REQ socket after a missed reply.

        A REQ socket must receive a reply before it may send again, so after
        an ack timeout the old socket cannot be reused for another request.
        """
        if self.request_socket is not None:
            self.request_socket.close(linger=0)
        self.request_socket = self._open_request_socket()

    def _close_sockets(self):
        """Close whichever sockets are open and forget them."""
        if self.request_socket is not None:
            self.request_socket.close(linger=0)
            self.request_socket = None
        if self.subscribe_socket is not None:
            self.subscribe_socket.close(linger=0)
            self.subscribe_socket = None

    def connect(self):
        """Connect to Relay endpoints"""
        # Calling connect() again must not leak the previous sockets.
        self._close_sockets()

        # REQ socket for submitting requests (gets immediate ack)
        self.request_socket = self._open_request_socket()
        print(f"Connected REQ socket to Relay at {self.relay_endpoint_req}")

        # SUB socket for receiving notifications
        self.subscribe_socket = self.context.socket(zmq.SUB)
        self.subscribe_socket.setsockopt(zmq.LINGER, 0)
        self.subscribe_socket.connect(self.relay_endpoint_sub)

        # CRITICAL: Subscribe to our client-specific response topic BEFORE
        # submitting any requests. This prevents a race condition where the
        # notification arrives before we subscribe. The notification topic is
        # deterministic: RESPONSE:{client_id} (we generate client_id).
        response_topic = f"RESPONSE:{self.client_id}"
        self.subscribe_socket.subscribe(response_topic.encode())
        print(f"Connected SUB socket to Relay at {self.relay_endpoint_sub}")
        print(f"Subscribed to topic: {response_topic}")
        print("✓ Safe to submit requests - already subscribed to notifications")

    def request_historical_ohlc(self, ticker, start_time, end_time, period_seconds, limit=None, timeout_secs=60):
        """
        Request historical OHLC data (async pattern).

        Flow:
            1. Submit request → get immediate ack with request_id
            2. Wait for HistoryReadyNotification on pub/sub
            3. Query Iceberg with the table information
               (or notification includes data)

        Args:
            ticker: Market identifier (e.g., "BINANCE:BTC/USDT")
            start_time: Start timestamp in microseconds since epoch
            end_time: End timestamp in microseconds since epoch
            period_seconds: OHLC period in seconds (e.g., 3600 for 1h)
            limit: Optional limit on number of candles; a falsy value
                (None or 0) is not sent at all
            timeout_secs: How long to wait for notification (default 60s)

        Returns:
            Notification dict, or None if the submit was rejected, the ack or
            the notification timed out, or the notification was unparseable.
        """
        # Generate request ID
        request_id = f"{self.client_id}-{int(time.time() * 1000)}"

        # Build SubmitHistoricalRequest
        request = {
            'request_id': request_id,
            'ticker': ticker,
            'start_time': start_time,
            'end_time': end_time,
            'period_seconds': period_seconds,
            'client_id': self.client_id,  # For response routing
        }
        if limit:
            request['limit'] = limit

        print("\n=== Step 1: Submitting Request ===")
        print(f"Request ID: {request_id}")
        print(f"Ticker: {ticker}")
        print(f"Period: {period_seconds}s ({period_seconds // 3600}h)")
        print(f"Start: {datetime.fromtimestamp(start_time / 1_000_000, tz=timezone.utc).isoformat()}")
        print(f"End: {datetime.fromtimestamp(end_time / 1_000_000, tz=timezone.utc).isoformat()}")
        print(f"Client ID: {self.client_id}")
        if limit:
            print(f"Limit: {limit}")

        # Encode request
        request_data = json.dumps(request).encode('utf-8')

        # Send: [version byte] [type byte + data]
        version_frame = struct.pack('B', PROTOCOL_VERSION)
        message_frame = struct.pack('B', MSG_TYPE_SUBMIT_REQUEST) + request_data
        self.request_socket.send_multipart([version_frame, message_frame])

        # Receive immediate SubmitResponse
        if not self.request_socket.poll(SUBMIT_ACK_TIMEOUT_MS):
            print("❌ Timeout waiting for submit response")
            # The REQ socket is now stuck awaiting a reply; swap it for a
            # fresh one so this client can submit again later.
            self._reset_request_socket()
            return None

        response_frames = self.request_socket.recv_multipart()
        submit_response = self._parse_submit_response(response_frames)
        if not submit_response or submit_response.get('status') != 'QUEUED':
            print(f"❌ Request submission failed: {submit_response}")
            return None

        print("\n✅ Request queued successfully")
        print(f"Notification topic: {submit_response.get('notification_topic')}")

        return self._wait_for_notification(request_id, timeout_secs)

    def _wait_for_notification(self, request_id, timeout_secs):
        """Block until the notification for *request_id* arrives or time runs out.

        Notifications carrying a different ``request_id`` (for example a late
        answer to an earlier request of this client) are skipped, and the wait
        continues against the same overall deadline.

        Returns:
            Notification dict, or None on timeout or parse failure.
        """
        # Step 2: Wait for HistoryReadyNotification
        print("\n=== Step 2: Waiting for Notification ===")
        print(f"⏳ Waiting up to {timeout_secs}s for HistoryReadyNotification...")
        print(" (Ingestor fetches → Kafka → Flink → Iceberg → Notification)")

        deadline = time.monotonic() + timeout_secs
        while True:
            # Clamp at 0 so the socket is still checked once when time is up.
            remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
            if not self.subscribe_socket.poll(remaining_ms):
                print(f"\n❌ Timeout waiting for notification ({timeout_secs}s)")
                print(" Possible reasons:")
                print(" - Ingestor still fetching data from exchange")
                print(" - Flink still processing Kafka stream")
                print(" - Flink writing to Iceberg")
                return None

            notification_frames = self.subscribe_socket.recv_multipart()
            notification = self._parse_history_ready(notification_frames)
            if not notification:
                print("❌ Failed to parse notification")
                return None

            received_id = notification.get('request_id')
            if received_id is not None and received_id != request_id:
                print(f"⚠️ Ignoring notification for a different request: {received_id}")
                continue

            print("\n=== Step 3: Notification Received ===")
            return notification

    def _parse_submit_response(self, frames):
        """Parse SubmitResponse from relay

        Expects exactly ``[version][type byte + JSON]``.

        Returns:
            The decoded JSON object as a dict, or None if the frames are
            malformed, the version or type is wrong, or the payload is not a
            UTF-8 JSON object.
        """
        if len(frames) != 2:
            print(f"❌ Invalid submit response: expected 2 frames, got {len(frames)}")
            return None

        version_frame, message_frame = frames
        if len(version_frame) != 1:
            return None

        version = struct.unpack('B', version_frame)[0]
        if version != PROTOCOL_VERSION:
            print(f"❌ Unsupported protocol version: {version}")
            return None

        if len(message_frame) < 1:
            return None

        msg_type = message_frame[0]
        msg_data = message_frame[1:]
        if msg_type != MSG_TYPE_SUBMIT_RESPONSE:
            print(f"❌ Unexpected message type: 0x{msg_type:02x}")
            return None

        try:
            response = json.loads(msg_data.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            print(f"❌ Failed to parse response: {e}")
            return None

        # Callers use .get() on the result, so anything but an object is invalid.
        if not isinstance(response, dict):
            print(f"❌ Failed to parse response: expected JSON object, got {type(response).__name__}")
            return None
        return response

    def _parse_history_ready(self, frames):
        """Parse HistoryReadyNotification from Flink via relay

        Expects ``[topic][version][type byte + JSON]``; frames after the third
        are ignored.

        Returns:
            The decoded JSON object as a dict, or None if the frames are
            malformed, the version is wrong, or the payload is not a UTF-8
            JSON object.
        """
        # Topic, version and message frames are all required.
        if len(frames) < 3:
            print(f"❌ Invalid notification: expected at least 3 frames, got {len(frames)}")
            return None

        topic_frame, version_frame, message_frame = frames[:3]

        # The topic is only logged, so never fail on undecodable bytes.
        topic = topic_frame.decode('utf-8', errors='replace')
        print(f"📬 Received on topic: {topic}")

        if len(version_frame) != 1:
            print("❌ Invalid version frame")
            return None

        version = struct.unpack('B', version_frame)[0]
        if version != PROTOCOL_VERSION:
            print(f"❌ Unsupported protocol version: {version}")
            return None

        if len(message_frame) < 1:
            print("❌ Empty message frame")
            return None

        msg_type = message_frame[0]
        msg_data = message_frame[1:]
        print(f"Message type: 0x{msg_type:02x}")

        # Best effort: an unexpected type is reported but still parsed.
        if msg_type != MSG_TYPE_HISTORY_READY:
            print(f"⚠️ Unexpected message type: expected 0x{MSG_TYPE_HISTORY_READY:02x}, got 0x{msg_type:02x}")

        try:
            notification = json.loads(msg_data.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            print(f"❌ Failed to parse notification: {e}")
            print(f"Raw data: {msg_data[:200]}...")
            return None

        if not isinstance(notification, dict):
            print(f"❌ Failed to parse notification: expected JSON object, got {type(notification).__name__}")
            print(f"Raw data: {msg_data[:200]}...")
            return None

        print(f"\nRequest ID: {notification.get('request_id')}")
        print(f"Status: {notification.get('status')}")
        print(f"Ticker: {notification.get('ticker')}")
        print(f"Period: {notification.get('period_seconds')}s")

        if notification.get('error_message'):
            print(f"❌ Error: {notification['error_message']}")

        if notification.get('status') == 'OK':
            print("✅ Data ready in Iceberg")
            print(f" Namespace: {notification.get('iceberg_namespace', 'N/A')}")
            print(f" Table: {notification.get('iceberg_table', 'N/A')}")
            print(f" Row count: {notification.get('row_count', 0)}")
            completed_at = notification.get('completed_at')
            if completed_at:
                # completed_at is handled as microseconds since epoch, like
                # the request timestamps.
                ts = datetime.fromtimestamp(completed_at / 1_000_000, tz=timezone.utc)
                print(f" Completed at: {ts.isoformat()}")

        return notification

    def close(self):
        """Close connections"""
        # Sockets are closed with linger=0 so term() cannot block on requests
        # that were queued for an unreachable relay.
        self._close_sockets()
        self.context.term()
        print("\n🔌 Connection closed")


def main():
    """Test the async historical data request pattern"""
    client = AsyncHistoryClient(relay_host='relay', request_port=5559, data_port=5558)

    try:
        # Connect
        client.connect()

        # Request BINANCE:BTC/USDT 1h candles for the first 7 days of
        # January 2025 (epoch 1735689600 is 2025-01-01T00:00:00Z).
        start_time_us = 1735689600 * 1_000_000  # Jan 1, 2025 00:00:00 UTC
        end_time_us = 1736294399 * 1_000_000  # Jan 7, 2025 23:59:59 UTC

        notification = client.request_historical_ohlc(
            ticker='BINANCE:BTC/USDT',
            start_time=start_time_us,
            end_time=end_time_us,
            period_seconds=3600,  # 1 hour
            limit=168,  # 7 days * 24 hours
            timeout_secs=60
        )

        if notification:
            status = notification.get('status')
            if status == 'OK':
                print("\n🎉 Success! Data is ready in Iceberg")
                print(f"📊 Query Iceberg to retrieve {notification.get('row_count', 0)} records")
                print("\nNext steps:")
                print(" 1. Connect to Iceberg")
                print(f" 2. Query table: {notification.get('iceberg_table')}")
                print(" 3. Filter by time range and ticker")
            elif status == 'NOT_FOUND':
                print("\n⚠️ No data found for the requested period")
            elif status == 'ERROR':
                print(f"\n❌ Error: {notification.get('error_message')}")
            elif status == 'TIMEOUT':
                print("\n⏱️ Request timed out on server side")
        else:
            print("\n❌ Request failed or timed out")

    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
    except Exception as e:
        # Top-level boundary of a test script: report everything, then clean up.
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        client.close()


if __name__ == '__main__':
    main()