201 lines
6.8 KiB
Python
201 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simple ZMQ client to query historical OHLC data via the Relay gateway.
|
|
Tests the request-response pattern for historical data retrieval.
|
|
"""
|
|
|
|
import zmq
|
|
import struct
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
# Protocol constants
|
|
PROTOCOL_VERSION = 0x01
|
|
MSG_TYPE_OHLC_REQUEST = 0x07
|
|
MSG_TYPE_RESPONSE = 0x08
|
|
|
|
class HistoryClient:
|
|
def __init__(self, relay_host='relay', client_request_port=5559):
|
|
self.context = zmq.Context()
|
|
self.socket = None
|
|
self.relay_endpoint = f"tcp://{relay_host}:{client_request_port}"
|
|
|
|
def connect(self):
|
|
"""Connect to Relay's client request endpoint (REQ/REP)"""
|
|
self.socket = self.context.socket(zmq.REQ)
|
|
self.socket.connect(self.relay_endpoint)
|
|
print(f"Connected to Relay at {self.relay_endpoint}")
|
|
|
|
def request_historical_ohlc(self, ticker, start_time, end_time, period_seconds, limit=None):
|
|
"""
|
|
Request historical OHLC data via Relay.
|
|
|
|
Args:
|
|
ticker: Market identifier (e.g., "BINANCE:BTC/USDT")
|
|
start_time: Start timestamp in microseconds since epoch
|
|
end_time: End timestamp in microseconds since epoch
|
|
period_seconds: OHLC period in seconds (e.g., 3600 for 1h)
|
|
limit: Optional limit on number of candles
|
|
|
|
Returns:
|
|
Response dict with status, data, etc.
|
|
"""
|
|
request_id = f"test-{int(time.time() * 1000)}"
|
|
|
|
# Build OHLCRequest message (simplified - would use protobuf in production)
|
|
request = {
|
|
'request_id': request_id,
|
|
'ticker': ticker,
|
|
'start_time': start_time,
|
|
'end_time': end_time,
|
|
'period_seconds': period_seconds
|
|
}
|
|
|
|
if limit:
|
|
request['limit'] = limit
|
|
|
|
print(f"\n=== Sending OHLCRequest ===")
|
|
print(f"Request ID: {request_id}")
|
|
print(f"Ticker: {ticker}")
|
|
print(f"Period: {period_seconds}s ({period_seconds // 3600}h)")
|
|
print(f"Start: {datetime.fromtimestamp(start_time / 1_000_000, tz=timezone.utc).isoformat()}")
|
|
print(f"End: {datetime.fromtimestamp(end_time / 1_000_000, tz=timezone.utc).isoformat()}")
|
|
if limit:
|
|
print(f"Limit: {limit}")
|
|
|
|
# Encode request (placeholder - would use actual protobuf)
|
|
request_data = json.dumps(request).encode('utf-8')
|
|
|
|
# Send message: [version byte] [type byte + data]
|
|
version_frame = struct.pack('B', PROTOCOL_VERSION)
|
|
message_frame = struct.pack('B', MSG_TYPE_OHLC_REQUEST) + request_data
|
|
|
|
self.socket.send_multipart([version_frame, message_frame])
|
|
print("\n⏳ Waiting for response via Relay...")
|
|
|
|
# Receive response with timeout
|
|
if self.socket.poll(30000): # 30 second timeout
|
|
response_frames = self.socket.recv_multipart()
|
|
return self._parse_response(response_frames)
|
|
else:
|
|
print("❌ Request timed out (30s)")
|
|
return None
|
|
|
|
def _parse_response(self, frames):
|
|
"""Parse response frames via Relay"""
|
|
if len(frames) != 2:
|
|
print(f"❌ Invalid response: expected 2 frames, got {len(frames)}")
|
|
return None
|
|
|
|
version_frame = frames[0]
|
|
message_frame = frames[1]
|
|
|
|
if len(version_frame) != 1:
|
|
print(f"❌ Invalid version frame length: {len(version_frame)}")
|
|
return None
|
|
|
|
version = struct.unpack('B', version_frame)[0]
|
|
if version != PROTOCOL_VERSION:
|
|
print(f"❌ Unsupported protocol version: {version}")
|
|
return None
|
|
|
|
if len(message_frame) < 1:
|
|
print(f"❌ Invalid message frame length: {len(message_frame)}")
|
|
return None
|
|
|
|
msg_type = message_frame[0]
|
|
msg_data = message_frame[1:]
|
|
|
|
print(f"\n=== Received Response ===")
|
|
print(f"Protocol version: {version}")
|
|
print(f"Message type: 0x{msg_type:02x}")
|
|
|
|
if msg_type != MSG_TYPE_RESPONSE:
|
|
print(f"❌ Unexpected message type: expected 0x{MSG_TYPE_RESPONSE:02x}, got 0x{msg_type:02x}")
|
|
return None
|
|
|
|
# Parse response (placeholder - would use actual protobuf)
|
|
try:
|
|
response = json.loads(msg_data.decode('utf-8'))
|
|
|
|
print(f"Request ID: {response.get('request_id', 'N/A')}")
|
|
print(f"Status: {response.get('status', 'UNKNOWN')}")
|
|
|
|
if response.get('error_message'):
|
|
print(f"Error: {response['error_message']}")
|
|
|
|
data = response.get('data', [])
|
|
total_records = response.get('total_records', len(data))
|
|
|
|
print(f"Total records: {total_records}")
|
|
print(f"Is final: {response.get('is_final', True)}")
|
|
|
|
if data and len(data) > 0:
|
|
print(f"\n📊 Sample data (first 3 records):")
|
|
for i, record in enumerate(data[:3]):
|
|
print(f" {i+1}. {record}")
|
|
|
|
return response
|
|
|
|
except json.JSONDecodeError as e:
|
|
print(f"❌ Failed to parse response JSON: {e}")
|
|
print(f"Raw data: {msg_data[:100]}...")
|
|
return None
|
|
|
|
def close(self):
|
|
"""Close the connection"""
|
|
if self.socket:
|
|
self.socket.close()
|
|
self.context.term()
|
|
print("\n🔌 Connection closed")
|
|
|
|
|
|
def main():
|
|
"""Test the historical data request"""
|
|
|
|
# Create client
|
|
client = HistoryClient(relay_host='relay', client_request_port=5559)
|
|
|
|
try:
|
|
# Connect to Relay
|
|
client.connect()
|
|
|
|
# Request BINANCE:BTC/USDT 1h candles for first 7 days of January 2026
|
|
# January 1, 2026 00:00:00 UTC = 1735689600 seconds = 1735689600000000 microseconds
|
|
# January 7, 2026 23:59:59 UTC = 1736294399 seconds = 1736294399000000 microseconds
|
|
|
|
start_time_us = 1735689600 * 1_000_000 # Jan 1, 2026 00:00:00 UTC
|
|
end_time_us = 1736294399 * 1_000_000 # Jan 7, 2026 23:59:59 UTC
|
|
|
|
response = client.request_historical_ohlc(
|
|
ticker='BINANCE:BTC/USDT',
|
|
start_time=start_time_us,
|
|
end_time=end_time_us,
|
|
period_seconds=3600, # 1 hour
|
|
limit=168 # 7 days * 24 hours = 168 candles
|
|
)
|
|
|
|
if response:
|
|
print("\n✅ Request completed successfully!")
|
|
status = response.get('status', 'UNKNOWN')
|
|
if status == 'OK':
|
|
print(f"📈 Received {response.get('total_records', 0)} candles")
|
|
else:
|
|
print(f"⚠️ Request status: {status}")
|
|
else:
|
|
print("\n❌ Request failed!")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\n⚠️ Interrupted by user")
|
|
except Exception as e:
|
|
print(f"\n❌ Error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
client.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|