Datafeed limping

This commit is contained in:
7400
2024-02-28 13:52:06 -08:00
parent 9a5d7a1ce5
commit d65af4e3b7
5 changed files with 262 additions and 44 deletions

171
src/charts/streaming.js Normal file
View File

@@ -0,0 +1,171 @@
import { parseFullSymbol } from './helpers.js';
const socket = io('wss://streamer.cryptocompare.com');
const channelToSubscription = new Map();
socket.on('connect', () => {
console.log('[socket] Connected');
});
socket.on('disconnect', (reason) => {
console.log('[socket] Disconnected:', reason);
});
socket.on('error', (error) => {
console.log('[socket] Error:', error);
});
socket.on('m', data => {
console.log('[socket] Message:', data);
const [
eventTypeStr,
exchange,
fromSymbol,
toSymbol,
,
,
tradeTimeStr,
,
tradePriceStr,
] = data.split('~');
if (parseInt(eventTypeStr) !== 0) {
// Skip all non-trading events
return;
}
const tradePrice = parseFloat(tradePriceStr);
const tradeTime = parseInt(tradeTimeStr);
const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`;
const subscriptionItem = channelToSubscription.get(channelString);
if (subscriptionItem === undefined) {
return;
}
const lastDailyBar = subscriptionItem.lastDailyBar;
const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time, subscriptionItem.resolution);
console.log("tradeTime ", tradeTime, new Date(tradeTime))
console.log("lastDailyBar.time", lastDailyBar.time, new Date(lastDailyBar.time))
console.log("nextDailyBarTime ", nextDailyBarTime, new Date(nextDailyBarTime))
let bar;
if (tradeTime >= nextDailyBarTime) {
bar = {
time: nextDailyBarTime,
open: tradePrice,
high: tradePrice,
low: tradePrice,
close: tradePrice,
};
console.log('[socket] Generate new bar', bar);
console.log("time:", bar.time.toString(), new Date(bar.time).toUTCString())
} else {
bar = {
...lastDailyBar,
high: Math.max(lastDailyBar.high, tradePrice),
low: Math.min(lastDailyBar.low, tradePrice),
close: tradePrice,
};
console.log('[socket] Update the latest bar by price', tradePrice);
}
subscriptionItem.lastDailyBar = bar;
// Send data to every subscriber of that symbol
subscriptionItem.handlers.forEach(handler => handler.callback(bar));
});
function getNextDailyBarTime(barTime, res) {
const date = new Date(barTime);
const resDigits = res.slice(0, -1)
if (res.endsWith("W")) {
date.setDate(date.getDate() + parseInt(resDigits)*7);
} else if (res.endsWith("D")) {
date.setDate(date.getDate() + parseInt(resDigits));
} else {
date.setMinutes(date.getMinutes() + parseInt(res))
}
return date.getTime();
}
export function subscribeOnStream(
symbolInfo,
resolution,
onRealtimeCallback,
subscriberUID,
onResetCacheNeededCallback,
lastDailyBar,
) {
// return;
const parsedSymbol = parseFullSymbol(symbolInfo.full_name);
const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`;
const handler = {
id: subscriberUID,
callback: onRealtimeCallback,
};
let subscriptionItem = channelToSubscription.get(channelString);
if (subscriptionItem) {
// Already subscribed to the channel, use the existing subscription
subscriptionItem.handlers.push(handler);
return;
}
subscriptionItem = {
subscriberUID,
resolution,
lastDailyBar,
handlers: [handler],
};
channelToSubscription.set(channelString, subscriptionItem);
console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString);
socket.emit('SubAdd', { subs: [channelString] });
}
export function unsubscribeFromStream(subscriberUID) {
// return;
// Find a subscription with id === subscriberUID
for (const channelString of channelToSubscription.keys()) {
const subscriptionItem = channelToSubscription.get(channelString);
const handlerIndex = subscriptionItem.handlers
.findIndex(handler => handler.id === subscriberUID);
if (handlerIndex !== -1) {
// Remove from handlers
subscriptionItem.handlers.splice(handlerIndex, 1);
if (subscriptionItem.handlers.length === 0) {
// Unsubscribe from the channel if it was the last handler
console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString);
socket.emit('SubRemove', { subs: [channelString] });
channelToSubscription.delete(channelString);
break;
}
}
}
}
function sim() {
// Assuming these variables hold the data you extracted earlier
const eventTypeStr = "0";
const exchange = "Uniswap";
const fromSymbol = "WETH";
const toSymbol = "USD";
const tradeTimeStr = (Date.now()).toString();
const tradePriceStr = (55+Date.now()%23).toString();
// Constructing the original string
const data = [
eventTypeStr,
exchange,
fromSymbol,
toSymbol,
'', // Placeholder for the fifth element
'', // Placeholder for the sixth element
tradeTimeStr,
'', // Placeholder for the eighth element
tradePriceStr,
].join('~');
socket._callbacks['$m'][0](data);
}
// window.sim = sim;
socket._callbacks['$connect'][0]();
setInterval(sim, 10*1000);
;