import { parseFullSymbol } from './helpers.js'; const socket = io('wss://streamer.cryptocompare.com'); const channelToSubscription = new Map(); socket.on('connect', () => { console.log('[socket] Connected'); }); socket.on('disconnect', (reason) => { console.log('[socket] Disconnected:', reason); }); socket.on('error', (error) => { console.log('[socket] Error:', error); }); socket.on('m', data => { console.log('[socket] Message:', data); const [ eventTypeStr, exchange, fromSymbol, toSymbol, , , tradeTimeStr, , tradePriceStr, ] = data.split('~'); if (parseInt(eventTypeStr) !== 0) { // Skip all non-trading events return; } const tradePrice = parseFloat(tradePriceStr); const tradeTime = parseInt(tradeTimeStr); const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`; const subscriptionItem = channelToSubscription.get(channelString); if (subscriptionItem === undefined) { return; } const lastDailyBar = subscriptionItem.lastDailyBar; const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time, subscriptionItem.resolution); console.log("tradeTime ", tradeTime, new Date(tradeTime)) console.log("lastDailyBar.time", lastDailyBar.time, new Date(lastDailyBar.time)) console.log("nextDailyBarTime ", nextDailyBarTime, new Date(nextDailyBarTime)) let bar; if (tradeTime >= nextDailyBarTime) { bar = { time: nextDailyBarTime, open: tradePrice, high: tradePrice, low: tradePrice, close: tradePrice, }; console.log('[socket] Generate new bar', bar); console.log("time:", bar.time.toString(), new Date(bar.time).toUTCString()) } else { bar = { ...lastDailyBar, high: Math.max(lastDailyBar.high, tradePrice), low: Math.min(lastDailyBar.low, tradePrice), close: tradePrice, }; console.log('[socket] Update the latest bar by price', tradePrice); } subscriptionItem.lastDailyBar = bar; // Send data to every subscriber of that symbol subscriptionItem.handlers.forEach(handler => handler.callback(bar)); }); function getNextDailyBarTime(barTime, res) { const date = new Date(barTime); const resDigits = res.slice(0, -1) if (res.endsWith("W")) { date.setDate(date.getDate() + parseInt(resDigits)*7); } else if (res.endsWith("D")) { date.setDate(date.getDate() + parseInt(resDigits)); } else { date.setMinutes(date.getMinutes() + parseInt(res)) } return date.getTime(); } export function subscribeOnStream( symbolInfo, resolution, onRealtimeCallback, subscriberUID, onResetCacheNeededCallback, lastDailyBar, ) { // return; const parsedSymbol = parseFullSymbol(symbolInfo.full_name); const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`; const handler = { id: subscriberUID, callback: onRealtimeCallback, }; let subscriptionItem = channelToSubscription.get(channelString); if (subscriptionItem) { // Already subscribed to the channel, use the existing subscription subscriptionItem.handlers.push(handler); return; } subscriptionItem = { subscriberUID, resolution, lastDailyBar, handlers: [handler], }; channelToSubscription.set(channelString, subscriptionItem); console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString); socket.emit('SubAdd', { subs: [channelString] }); } export function unsubscribeFromStream(subscriberUID) { // return; // Find a subscription with id === subscriberUID for (const channelString of channelToSubscription.keys()) { const subscriptionItem = channelToSubscription.get(channelString); const handlerIndex = subscriptionItem.handlers .findIndex(handler => handler.id === subscriberUID); if (handlerIndex !== -1) { // Remove from handlers subscriptionItem.handlers.splice(handlerIndex, 1); if (subscriptionItem.handlers.length === 0) { // Unsubscribe from the channel if it was the last handler console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString); socket.emit('SubRemove', { subs: [channelString] }); channelToSubscription.delete(channelString); break; } } } } function sim() { // Assuming these variables hold the data you extracted earlier const eventTypeStr = "0"; const exchange = "Uniswap"; const fromSymbol = "WETH"; const toSymbol = "USD"; const tradeTimeStr = (Date.now()).toString(); const tradePriceStr = (55+Date.now()%23).toString(); // Constructing the original string const data = [ eventTypeStr, exchange, fromSymbol, toSymbol, '', // Placeholder for the fifth element '', // Placeholder for the sixth element tradeTimeStr, '', // Placeholder for the eighth element tradePriceStr, ].join('~'); socket._callbacks['$m'][0](data); } // window.sim = sim; socket._callbacks['$connect'][0](); setInterval(sim, 10*1000); ;