From b6c0357a1bedcedd79fcd64b068e766a5a669bd7 Mon Sep 17 00:00:00 2001 From: 7400 <7400> Date: Sun, 17 Mar 2024 18:24:19 -0700 Subject: [PATCH] Streaming is limping --- src/charts/chart.js | 2 +- src/charts/datafeed.js | 103 ++++++++++++-- src/charts/jBars.js | 26 ++-- src/charts/streaming.js | 305 ++++++++++++++++++++-------------------- src/socket.js | 2 + 5 files changed, 259 insertions(+), 179 deletions(-) diff --git a/src/charts/chart.js b/src/charts/chart.js index f396078..eba3b2e 100644 --- a/src/charts/chart.js +++ b/src/charts/chart.js @@ -43,7 +43,7 @@ export function initWidget(el) { // debug: true, autosize: true, symbol: 'default', - interval: '15', + interval: '1', container: el, datafeed: DataFeed, // use this for ohlc locale: "en", diff --git a/src/charts/datafeed.js b/src/charts/datafeed.js index 05ac3f1..e6f6961 100644 --- a/src/charts/datafeed.js +++ b/src/charts/datafeed.js @@ -1,4 +1,4 @@ -import {subscribeOnStream, unsubscribeFromStream,} from './streaming.js'; +// import {subscribeOnStream, unsubscribeFromStream,} from './streaming.js'; import {jBars, tvResolutionToPeriodString} from './jBars.js'; import {metadata} from "@/version.js"; @@ -177,6 +177,33 @@ export function lookupBaseQuote(baseAddr, quoteAddr) { return _symbols[key] } +function poolIsInverted(pool) { + let p + for (p of metadata.p) { + if (p.a==pool.substr(0,42)) { + let fullname = `${p.q}${p.b}` + let symbol = lookupSymbol(fullname) + if (symbol in [undefined, null]) { + throw error(`poolIsInverted: pool fullname not found: ${fullname}`) + } + return symbol.inverted + // return p.x.data.inverted ^ symbol.inverted + } + } + throw error(`poolIsInverted: pool not found in metadata.json: ${pool}`) +} + +export function maybeInvertBar (pool, bar) { + if (poolIsInverted(pool)) { + bar.open = 1/bar.open + let high = bar.high + bar.high = 1/bar.low + bar.low = 1/high + bar.close = 1/bar.close + } + return bar +} + export const DataFeed = { onReady: (callback) => { console.log('[onReady]: Method call'); @@ -252,7 +279,7 @@ export const DataFeed = { // todo need to consider the selected fee tier let bars, metadata; const pool = useChartOrderStore().selectedPool; - [bars, metadata] = await jBars(symbolInfo, pool[0], from, to, resolution); // This is the one that does all the work + [bars, metadata] = await jBars(lookupSymbol(symbolInfo.ticker), pool[0], from, to, resolution); // This is the one that does all the work if (firstDataRequest) { lastBarsCache.set(symbolInfo.full_name, { ...bars[bars.length - 1], @@ -267,6 +294,8 @@ export const DataFeed = { } }, + subscribeBarsOnRealtimeCallback: null, + subscribeBars: ( symbolInfo, resolution, @@ -279,16 +308,17 @@ export const DataFeed = { const poolAddr = useChartOrderStore().selectedPool[0]; const period = tvResolutionToPeriodString(resolution); subscriptions[subscriberUID] = [chainId, poolAddr, period] + DataFeed.subscribeBarsOnRealtimeCallback = onRealtimeCallback; subOHLC(chainId, poolAddr, period) - return; // disable - subscribeOnStream( - symbolInfo, - resolution, - onRealtimeCallback, - subscriberUID, - onResetCacheNeededCallback, - lastBarsCache.get(symbolInfo.full_name), - ); + // return; // disable + // subscribeOnStream( + // symbolInfo, + // resolution, + // onRealtimeCallback, + // subscriberUID, + // onResetCacheNeededCallback, + // lastBarsCache.get(symbolInfo.full_name), + // ); }, unsubscribeBars: (subscriberUID) => { @@ -296,9 +326,56 @@ export const DataFeed = { const [chainId, poolAddr, period] = subscriptions[subscriberUID] delete subscriptions[subscriberUID] unsubOHLC(chainId, poolAddr, period) - return; // disable - unsubscribeFromStream(subscriberUID); + // return; // disable + // unsubscribeFromStream(subscriberUID); }, + + poolCallbackState : {lastBar: null}, + + poolCallback(chainId, pool, ohlcs) { + let ohlc = ohlcs.at(-1); + // for (const ohlc of ohlcs) { + let date = new Date(ohlc[0]*1000) + let close = ohlc[4] // close + let bar = { + time: date.getTime(), + open: ohlc[1] ?? close, // open + high: ohlc[2] ?? close, // high + low: ohlc[3] ?? close, // low + close: close, + } + // if (poolIsInverted(pool)) { + // bar.open = 1/bar.open + // let high = bar.high + // bar.high = 1/bar.low + // bar.low = 1/high + // bar.close = 1/bar.close + // } + bar = maybeInvertBar(pool, bar) + if (bar.highbar.open||bar.low>bar.high||bar.low>bar.close) { + // throw error("poolCallback: bar.high/low inconsistent.") + console.log("poolCallback: bar.high/low inconsistent.") + } + console.log('DataFeed.poolCallback', date.toGMTString(), ohlcs, bar) + // No last bar then initialize bar + if ( DataFeed.poolCallbackState.lastBar===null) { + DataFeed.poolCallbackState.lastBar = bar + } + // bar is less than last bar then ignore + else if (bar.time { - console.log('[socket] Connected'); -}); +// socket.on('connect', () => { +// console.log('[socket] Connected'); +// }); -socket.on('disconnect', (reason) => { - console.log('[socket] Disconnected:', reason); -}); +// socket.on('disconnect', (reason) => { +// console.log('[socket] Disconnected:', reason); +// }); -socket.on('error', (error) => { - console.log('[socket] Error:', error); -}); +// socket.on('error', (error) => { +// console.log('[socket] Error:', error); +// }); -socket.on('m', data => { - console.log('[socket] Message:', data); - const [ - eventTypeStr, - exchange, - fromSymbol, - toSymbol, - , - , - tradeTimeStr, - , - tradePriceStr, - ] = data.split('~'); +// socket.on('m', data => { +// console.log('[socket] Message:', data); +// const [ +// eventTypeStr, +// exchange, +// fromSymbol, +// toSymbol, +// , +// , +// tradeTimeStr, +// , +// tradePriceStr, +// ] = data.split('~'); - if (parseInt(eventTypeStr) !== 0) { - // Skip all non-trading events - return; - } - const tradePrice = parseFloat(tradePriceStr); - const tradeTime = parseInt(tradeTimeStr); - const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`; - const subscriptionItem = channelToSubscription.get(channelString); - if (subscriptionItem === undefined) { - return; - } - const lastDailyBar = subscriptionItem.lastDailyBar; - const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time, subscriptionItem.resolution); +// if (parseInt(eventTypeStr) !== 0) { +// // Skip all non-trading events +// return; +// } +// const tradePrice = parseFloat(tradePriceStr); +// const tradeTime = parseInt(tradeTimeStr); +// const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`; +// const subscriptionItem = channelToSubscription.get(channelString); +// if (subscriptionItem === undefined) { +// return; +// } +// const lastDailyBar = subscriptionItem.lastDailyBar; +// const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time, subscriptionItem.resolution); - console.log("tradeTime ", tradeTime, new Date(tradeTime)) - console.log("lastDailyBar.time", lastDailyBar.time, new Date(lastDailyBar.time)) - console.log("nextDailyBarTime ", nextDailyBarTime, new Date(nextDailyBarTime)) +// console.log("tradeTime ", tradeTime, new Date(tradeTime)) +// console.log("lastDailyBar.time", lastDailyBar.time, new Date(lastDailyBar.time)) +// console.log("nextDailyBarTime ", nextDailyBarTime, new Date(nextDailyBarTime)) - let bar; - if (tradeTime >= nextDailyBarTime) { - bar = { - time: nextDailyBarTime, - open: tradePrice, - high: tradePrice, - low: tradePrice, - close: tradePrice, - }; - console.log('[socket] Generate new bar', bar); - console.log("time:", bar.time.toString(), new Date(bar.time).toUTCString()) - } else { - bar = { - ...lastDailyBar, - high: Math.max(lastDailyBar.high, tradePrice), - low: Math.min(lastDailyBar.low, tradePrice), - close: tradePrice, - }; - console.log('[socket] Update the latest bar by price', tradePrice); - } - subscriptionItem.lastDailyBar = bar; +// let bar; +// if (tradeTime >= nextDailyBarTime) { +// bar = { +// time: nextDailyBarTime, +// open: tradePrice, +// high: tradePrice, +// low: tradePrice, +// close: tradePrice, +// }; +// console.log('[socket] Generate new bar', bar); +// console.log("time:", bar.time.toString(), new Date(bar.time).toUTCString()) +// } else { +// bar = { +// ...lastDailyBar, +// high: Math.max(lastDailyBar.high, tradePrice), +// low: Math.min(lastDailyBar.low, tradePrice), +// close: tradePrice, +// }; +// console.log('[socket] Update the latest bar by price', tradePrice); +// } +// subscriptionItem.lastDailyBar = bar; - // Send data to every subscriber of that symbol - subscriptionItem.handlers.forEach(handler => handler.callback(bar)); -}); +// // Send data to every subscriber of that symbol +// subscriptionItem.handlers.forEach(handler => handler.callback(bar)); +// }); -function getNextDailyBarTime(barTime, res) { - const date = new Date(barTime); - const resDigits = res.slice(0, -1) - if (res.endsWith("W")) { - date.setDate(date.getDate() + parseInt(resDigits)*7); - } else if (res.endsWith("D")) { - date.setDate(date.getDate() + parseInt(resDigits)); - } else { - date.setMinutes(date.getMinutes() + parseInt(res)) - } - return date.getTime(); -} +// function getNextDailyBarTime(barTime, res) { +// const date = new Date(barTime); +// const resDigits = res.slice(0, -1) +// if (res.endsWith("W")) { +// date.setDate(date.getDate() + parseInt(resDigits)*7); +// } else if (res.endsWith("D")) { +// date.setDate(date.getDate() + parseInt(resDigits)); +// } else { +// date.setMinutes(date.getMinutes() + parseInt(res)) +// } +// return date.getTime(); +// } -export function subscribeOnStream( - symbolInfo, - resolution, - onRealtimeCallback, - subscriberUID, - onResetCacheNeededCallback, - lastDailyBar, -) { - // return; - const parsedSymbol = parseFullSymbol(symbolInfo.full_name); - const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`; - const handler = { - id: subscriberUID, - callback: onRealtimeCallback, - }; - let subscriptionItem = channelToSubscription.get(channelString); - if (subscriptionItem) { - // Already subscribed to the channel, use the existing subscription - subscriptionItem.handlers.push(handler); - return; - } - subscriptionItem = { - subscriberUID, - resolution, - lastDailyBar, - handlers: [handler], - }; - channelToSubscription.set(channelString, subscriptionItem); - console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString); - socket.emit('SubAdd', { subs: [channelString] }); -} +// export function subscribeOnStream( +// symbolInfo, +// resolution, +// onRealtimeCallback, +// subscriberUID, +// onResetCacheNeededCallback, +// lastDailyBar, +// ) { +// // return; +// const parsedSymbol = parseFullSymbol(symbolInfo.full_name); +// const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`; +// const handler = { +// id: subscriberUID, +// callback: onRealtimeCallback, +// }; +// let subscriptionItem = channelToSubscription.get(channelString); +// if (subscriptionItem) { +// // Already subscribed to the channel, use the existing subscription +// subscriptionItem.handlers.push(handler); +// return; +// } +// subscriptionItem = { +// subscriberUID, +// resolution, +// lastDailyBar, +// handlers: [handler], +// }; +// channelToSubscription.set(channelString, subscriptionItem); +// console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString); +// socket.emit('SubAdd', { subs: [channelString] }); +// } -export function unsubscribeFromStream(subscriberUID) { - // return; - // Find a subscription with id === subscriberUID - for (const channelString of channelToSubscription.keys()) { - const subscriptionItem = channelToSubscription.get(channelString); - const handlerIndex = subscriptionItem.handlers - .findIndex(handler => handler.id === subscriberUID); +// export function unsubscribeFromStream(subscriberUID) { +// // return; +// // Find a subscription with id === subscriberUID +// for (const channelString of channelToSubscription.keys()) { +// const subscriptionItem = channelToSubscription.get(channelString); +// const handlerIndex = subscriptionItem.handlers +// .findIndex(handler => handler.id === subscriberUID); - if (handlerIndex !== -1) { - // Remove from handlers - subscriptionItem.handlers.splice(handlerIndex, 1); +// if (handlerIndex !== -1) { +// // Remove from handlers +// subscriptionItem.handlers.splice(handlerIndex, 1); - if (subscriptionItem.handlers.length === 0) { - // Unsubscribe from the channel if it was the last handler - console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString); - socket.emit('SubRemove', { subs: [channelString] }); - channelToSubscription.delete(channelString); - break; - } - } - } -} +// if (subscriptionItem.handlers.length === 0) { +// // Unsubscribe from the channel if it was the last handler +// console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString); +// socket.emit('SubRemove', { subs: [channelString] }); +// channelToSubscription.delete(channelString); +// break; +// } +// } +// } +// } -function sim() { - // Assuming these variables hold the data you extracted earlier - const eventTypeStr = "0"; - const exchange = "Uniswap"; - const fromSymbol = "WETH"; - const toSymbol = "USD"; - const tradeTimeStr = (Date.now()).toString(); - const tradePriceStr = (55+Date.now()%23).toString(); +// function sim() { +// // Assuming these variables hold the data you extracted earlier +// const eventTypeStr = "0"; +// const exchange = "Uniswap"; +// const fromSymbol = "WETH"; +// const toSymbol = "USD"; +// const tradeTimeStr = (Date.now()).toString(); +// const tradePriceStr = (55+Date.now()%23).toString(); - // Constructing the original string - const data = [ - eventTypeStr, - exchange, - fromSymbol, - toSymbol, - '', // Placeholder for the fifth element - '', // Placeholder for the sixth element - tradeTimeStr, - '', // Placeholder for the eighth element - tradePriceStr, - ].join('~'); - socket._callbacks['$m'][0](data); -} +// // Constructing the original string +// const data = [ +// eventTypeStr, +// exchange, +// fromSymbol, +// toSymbol, +// '', // Placeholder for the fifth element +// '', // Placeholder for the sixth element +// tradeTimeStr, +// '', // Placeholder for the eighth element +// tradePriceStr, +// ].join('~'); +// socket._callbacks['$m'][0](data); +// } // window.sim = sim; -socket._callbacks['$connect'][0](); -setInterval(sim, 10*1000); +// socket._callbacks['$connect'][0](); +// setInterval(sim, 10*1000); ; diff --git a/src/socket.js b/src/socket.js index 4952e29..d333a6b 100644 --- a/src/socket.js +++ b/src/socket.js @@ -2,6 +2,7 @@ import {io} from "socket.io-client"; import {useStore} from "@/store/store.js"; import {flushOrders} from "@/blockchain/wallet.js"; import {parseOrderStatus} from "@/blockchain/orderlib.js"; +import { DataFeed } from "./charts/datafeed"; export const socket = io(import.meta.env.VITE_WS_URL || undefined, {transports: ["websocket"]}) @@ -23,6 +24,7 @@ socket.on('p', async (chainId, pool, price) => { socket.on('ohlc', async (chainId, pool, ohlcs) => { console.log('pool bars', pool, ohlcs) + DataFeed.poolCallback(chainId, pool, ohlcs) }) socket.on('vb', async (chainId, vault, balances) => {