Streaming is limping

This commit is contained in:
7400
2024-03-17 18:24:19 -07:00
parent fb64b9a754
commit b6c0357a1b
5 changed files with 259 additions and 179 deletions

View File

@@ -43,7 +43,7 @@ export function initWidget(el) {
// debug: true, // debug: true,
autosize: true, autosize: true,
symbol: 'default', symbol: 'default',
interval: '15', interval: '1',
container: el, container: el,
datafeed: DataFeed, // use this for ohlc datafeed: DataFeed, // use this for ohlc
locale: "en", locale: "en",

View File

@@ -1,4 +1,4 @@
import {subscribeOnStream, unsubscribeFromStream,} from './streaming.js'; // import {subscribeOnStream, unsubscribeFromStream,} from './streaming.js';
import {jBars, tvResolutionToPeriodString} from './jBars.js'; import {jBars, tvResolutionToPeriodString} from './jBars.js';
import {metadata} from "@/version.js"; import {metadata} from "@/version.js";
@@ -177,6 +177,33 @@ export function lookupBaseQuote(baseAddr, quoteAddr) {
return _symbols[key] return _symbols[key]
} }
function poolIsInverted(pool) {
let p
for (p of metadata.p) {
if (p.a==pool.substr(0,42)) {
let fullname = `${p.q}${p.b}`
let symbol = lookupSymbol(fullname)
if (symbol in [undefined, null]) {
throw error(`poolIsInverted: pool fullname not found: ${fullname}`)
}
return symbol.inverted
// return p.x.data.inverted ^ symbol.inverted
}
}
throw error(`poolIsInverted: pool not found in metadata.json: ${pool}`)
}
export function maybeInvertBar (pool, bar) {
if (poolIsInverted(pool)) {
bar.open = 1/bar.open
let high = bar.high
bar.high = 1/bar.low
bar.low = 1/high
bar.close = 1/bar.close
}
return bar
}
export const DataFeed = { export const DataFeed = {
onReady: (callback) => { onReady: (callback) => {
console.log('[onReady]: Method call'); console.log('[onReady]: Method call');
@@ -252,7 +279,7 @@ export const DataFeed = {
// todo need to consider the selected fee tier // todo need to consider the selected fee tier
let bars, metadata; let bars, metadata;
const pool = useChartOrderStore().selectedPool; const pool = useChartOrderStore().selectedPool;
[bars, metadata] = await jBars(symbolInfo, pool[0], from, to, resolution); // This is the one that does all the work [bars, metadata] = await jBars(lookupSymbol(symbolInfo.ticker), pool[0], from, to, resolution); // This is the one that does all the work
if (firstDataRequest) { if (firstDataRequest) {
lastBarsCache.set(symbolInfo.full_name, { lastBarsCache.set(symbolInfo.full_name, {
...bars[bars.length - 1], ...bars[bars.length - 1],
@@ -267,6 +294,8 @@ export const DataFeed = {
} }
}, },
subscribeBarsOnRealtimeCallback: null,
subscribeBars: ( subscribeBars: (
symbolInfo, symbolInfo,
resolution, resolution,
@@ -279,16 +308,17 @@ export const DataFeed = {
const poolAddr = useChartOrderStore().selectedPool[0]; const poolAddr = useChartOrderStore().selectedPool[0];
const period = tvResolutionToPeriodString(resolution); const period = tvResolutionToPeriodString(resolution);
subscriptions[subscriberUID] = [chainId, poolAddr, period] subscriptions[subscriberUID] = [chainId, poolAddr, period]
DataFeed.subscribeBarsOnRealtimeCallback = onRealtimeCallback;
subOHLC(chainId, poolAddr, period) subOHLC(chainId, poolAddr, period)
return; // disable // return; // disable
subscribeOnStream( // subscribeOnStream(
symbolInfo, // symbolInfo,
resolution, // resolution,
onRealtimeCallback, // onRealtimeCallback,
subscriberUID, // subscriberUID,
onResetCacheNeededCallback, // onResetCacheNeededCallback,
lastBarsCache.get(symbolInfo.full_name), // lastBarsCache.get(symbolInfo.full_name),
); // );
}, },
unsubscribeBars: (subscriberUID) => { unsubscribeBars: (subscriberUID) => {
@@ -296,9 +326,56 @@ export const DataFeed = {
const [chainId, poolAddr, period] = subscriptions[subscriberUID] const [chainId, poolAddr, period] = subscriptions[subscriberUID]
delete subscriptions[subscriberUID] delete subscriptions[subscriberUID]
unsubOHLC(chainId, poolAddr, period) unsubOHLC(chainId, poolAddr, period)
return; // disable // return; // disable
unsubscribeFromStream(subscriberUID); // unsubscribeFromStream(subscriberUID);
}, },
poolCallbackState : {lastBar: null},
poolCallback(chainId, pool, ohlcs) {
let ohlc = ohlcs.at(-1);
// for (const ohlc of ohlcs) {
let date = new Date(ohlc[0]*1000)
let close = ohlc[4] // close
let bar = {
time: date.getTime(),
open: ohlc[1] ?? close, // open
high: ohlc[2] ?? close, // high
low: ohlc[3] ?? close, // low
close: close,
}
// if (poolIsInverted(pool)) {
// bar.open = 1/bar.open
// let high = bar.high
// bar.high = 1/bar.low
// bar.low = 1/high
// bar.close = 1/bar.close
// }
bar = maybeInvertBar(pool, bar)
if (bar.high<bar.open||bar.high<bar.low||bar.high<bar.close ||
bar.low>bar.open||bar.low>bar.high||bar.low>bar.close) {
// throw error("poolCallback: bar.high/low inconsistent.")
console.log("poolCallback: bar.high/low inconsistent.")
}
console.log('DataFeed.poolCallback', date.toGMTString(), ohlcs, bar)
// No last bar then initialize bar
if ( DataFeed.poolCallbackState.lastBar===null) {
DataFeed.poolCallbackState.lastBar = bar
}
// bar is less than last bar then ignore
else if (bar.time<DataFeed.poolCallbackState.lastBar.time ) {
}
// bar equal to last bar then replace last bar
else if (bar.time==DataFeed.poolCallbackState.lastBar.time ) {
DataFeed.poolCallbackState.lastBar = bar
}
// new bar, then render and replace last bar
else {
DataFeed.subscribeBarsOnRealtimeCallback(bar)
DataFeed.poolCallbackState.lastBar = bar
}
// }
}
}; };
let defaultSymbol = null let defaultSymbol = null

View File

@@ -1,5 +1,5 @@
import {useStore} from "@/store/store.js"; import {useStore} from "@/store/store.js";
import {metadataMap} from "@/version.js"; // import {metadataMap} from "@/version.js";
const file_res = ['1m', '3m', '5m', '10m', '15m', '30m', '1H', '2H', '4H', '8H', '12H', '1D', '2D', '3D', '1W',]; const file_res = ['1m', '3m', '5m', '10m', '15m', '30m', '1H', '2H', '4H', '8H', '12H', '1D', '2D', '3D', '1W',];
const supported_res = ['1', '3', '5', '10', '15', '30', '60', '120', '240', '480', '720', '1D', '2D', '3D', '1W',]; const supported_res = ['1', '3', '5', '10', '15', '30', '60', '120', '240', '480', '720', '1D', '2D', '3D', '1W',];
@@ -13,7 +13,7 @@ export function tvResolutionToPeriodString(res) {
return resMap[res] return resMap[res]
} }
export async function jBars (symbolInfo, contract, from, to, res) { export async function jBars (symbol, contract, from, to, res) {
console.log('[jBars]: Method call', res, from, to); console.log('[jBars]: Method call', res, from, to);
const toDate = new Date(to*1000); const toDate = new Date(to*1000);
@@ -85,18 +85,18 @@ export async function jBars (symbolInfo, contract, from, to, res) {
let baseURL = "https://alpha.dexorder.trade/ohlc/" let baseURL = "https://alpha.dexorder.trade/ohlc/"
let chainId = useStore().chainId let chainId = useStore().chainId
const meta = metadataMap[contract] // const meta = metadataMap[contract]
if (meta===undefined) { // if (meta===undefined) {
console.log('warning: no symbol', contract) // console.log('warning: no symbol', contract)
return [bars, {noData:true}] // return [bars, {noData:true}]
} // }
// console.log('metadata', contract, metadataMap, meta) // console.log('metadata', contract, metadataMap, meta)
let inverted = symbolInfo.inverted let inverted = symbol.inverted
if (meta.x?.data) { if (symbol.x?.data) {
baseURL = meta.x.data.uri baseURL = symbol.x.data.uri
chainId = meta.x.data.chain chainId = symbol.x.data.chain
contract = meta.x.data.symbol contract = symbol.x.data.symbol
inverted ^= meta.x.data.inverted inverted ^= symbol.x.data.inverted
} }
let url = `${baseURL}${chainId}/${contract}/${fres}${yrdir}/${contract}-${fres}${yrmo}${date}.json`; let url = `${baseURL}${chainId}/${contract}/${fres}${yrdir}/${contract}-${fres}${yrmo}${date}.json`;

View File

@@ -1,171 +1,172 @@
import { parseFullSymbol } from './helpers.js'; // import { parseFullSymbol } from './helpers.js';
// import { subPrices } from '@/blockchain/prices.js';
const socket = io('wss://streamer.cryptocompare.com'); // const socket = io('wss://streamer.cryptocompare.com');
const channelToSubscription = new Map(); // const channelToSubscription = new Map();
socket.on('connect', () => { // socket.on('connect', () => {
console.log('[socket] Connected'); // console.log('[socket] Connected');
}); // });
socket.on('disconnect', (reason) => { // socket.on('disconnect', (reason) => {
console.log('[socket] Disconnected:', reason); // console.log('[socket] Disconnected:', reason);
}); // });
socket.on('error', (error) => { // socket.on('error', (error) => {
console.log('[socket] Error:', error); // console.log('[socket] Error:', error);
}); // });
socket.on('m', data => { // socket.on('m', data => {
console.log('[socket] Message:', data); // console.log('[socket] Message:', data);
const [ // const [
eventTypeStr, // eventTypeStr,
exchange, // exchange,
fromSymbol, // fromSymbol,
toSymbol, // toSymbol,
, // ,
, // ,
tradeTimeStr, // tradeTimeStr,
, // ,
tradePriceStr, // tradePriceStr,
] = data.split('~'); // ] = data.split('~');
if (parseInt(eventTypeStr) !== 0) { // if (parseInt(eventTypeStr) !== 0) {
// Skip all non-trading events // // Skip all non-trading events
return;
}
const tradePrice = parseFloat(tradePriceStr);
const tradeTime = parseInt(tradeTimeStr);
const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`;
const subscriptionItem = channelToSubscription.get(channelString);
if (subscriptionItem === undefined) {
return;
}
const lastDailyBar = subscriptionItem.lastDailyBar;
const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time, subscriptionItem.resolution);
console.log("tradeTime ", tradeTime, new Date(tradeTime))
console.log("lastDailyBar.time", lastDailyBar.time, new Date(lastDailyBar.time))
console.log("nextDailyBarTime ", nextDailyBarTime, new Date(nextDailyBarTime))
let bar;
if (tradeTime >= nextDailyBarTime) {
bar = {
time: nextDailyBarTime,
open: tradePrice,
high: tradePrice,
low: tradePrice,
close: tradePrice,
};
console.log('[socket] Generate new bar', bar);
console.log("time:", bar.time.toString(), new Date(bar.time).toUTCString())
} else {
bar = {
...lastDailyBar,
high: Math.max(lastDailyBar.high, tradePrice),
low: Math.min(lastDailyBar.low, tradePrice),
close: tradePrice,
};
console.log('[socket] Update the latest bar by price', tradePrice);
}
subscriptionItem.lastDailyBar = bar;
// Send data to every subscriber of that symbol
subscriptionItem.handlers.forEach(handler => handler.callback(bar));
});
function getNextDailyBarTime(barTime, res) {
const date = new Date(barTime);
const resDigits = res.slice(0, -1)
if (res.endsWith("W")) {
date.setDate(date.getDate() + parseInt(resDigits)*7);
} else if (res.endsWith("D")) {
date.setDate(date.getDate() + parseInt(resDigits));
} else {
date.setMinutes(date.getMinutes() + parseInt(res))
}
return date.getTime();
}
export function subscribeOnStream(
symbolInfo,
resolution,
onRealtimeCallback,
subscriberUID,
onResetCacheNeededCallback,
lastDailyBar,
) {
// return; // return;
const parsedSymbol = parseFullSymbol(symbolInfo.full_name); // }
const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`; // const tradePrice = parseFloat(tradePriceStr);
const handler = { // const tradeTime = parseInt(tradeTimeStr);
id: subscriberUID, // const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`;
callback: onRealtimeCallback, // const subscriptionItem = channelToSubscription.get(channelString);
}; // if (subscriptionItem === undefined) {
let subscriptionItem = channelToSubscription.get(channelString);
if (subscriptionItem) {
// Already subscribed to the channel, use the existing subscription
subscriptionItem.handlers.push(handler);
return;
}
subscriptionItem = {
subscriberUID,
resolution,
lastDailyBar,
handlers: [handler],
};
channelToSubscription.set(channelString, subscriptionItem);
console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString);
socket.emit('SubAdd', { subs: [channelString] });
}
export function unsubscribeFromStream(subscriberUID) {
// return; // return;
// Find a subscription with id === subscriberUID // }
for (const channelString of channelToSubscription.keys()) { // const lastDailyBar = subscriptionItem.lastDailyBar;
const subscriptionItem = channelToSubscription.get(channelString); // const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time, subscriptionItem.resolution);
const handlerIndex = subscriptionItem.handlers
.findIndex(handler => handler.id === subscriberUID);
if (handlerIndex !== -1) { // console.log("tradeTime ", tradeTime, new Date(tradeTime))
// Remove from handlers // console.log("lastDailyBar.time", lastDailyBar.time, new Date(lastDailyBar.time))
subscriptionItem.handlers.splice(handlerIndex, 1); // console.log("nextDailyBarTime ", nextDailyBarTime, new Date(nextDailyBarTime))
if (subscriptionItem.handlers.length === 0) { // let bar;
// Unsubscribe from the channel if it was the last handler // if (tradeTime >= nextDailyBarTime) {
console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString); // bar = {
socket.emit('SubRemove', { subs: [channelString] }); // time: nextDailyBarTime,
channelToSubscription.delete(channelString); // open: tradePrice,
break; // high: tradePrice,
} // low: tradePrice,
} // close: tradePrice,
} // };
} // console.log('[socket] Generate new bar', bar);
// console.log("time:", bar.time.toString(), new Date(bar.time).toUTCString())
// } else {
// bar = {
// ...lastDailyBar,
// high: Math.max(lastDailyBar.high, tradePrice),
// low: Math.min(lastDailyBar.low, tradePrice),
// close: tradePrice,
// };
// console.log('[socket] Update the latest bar by price', tradePrice);
// }
// subscriptionItem.lastDailyBar = bar;
function sim() { // // Send data to every subscriber of that symbol
// Assuming these variables hold the data you extracted earlier // subscriptionItem.handlers.forEach(handler => handler.callback(bar));
const eventTypeStr = "0"; // });
const exchange = "Uniswap";
const fromSymbol = "WETH";
const toSymbol = "USD";
const tradeTimeStr = (Date.now()).toString();
const tradePriceStr = (55+Date.now()%23).toString();
// Constructing the original string // function getNextDailyBarTime(barTime, res) {
const data = [ // const date = new Date(barTime);
eventTypeStr, // const resDigits = res.slice(0, -1)
exchange, // if (res.endsWith("W")) {
fromSymbol, // date.setDate(date.getDate() + parseInt(resDigits)*7);
toSymbol, // } else if (res.endsWith("D")) {
'', // Placeholder for the fifth element // date.setDate(date.getDate() + parseInt(resDigits));
'', // Placeholder for the sixth element // } else {
tradeTimeStr, // date.setMinutes(date.getMinutes() + parseInt(res))
'', // Placeholder for the eighth element // }
tradePriceStr, // return date.getTime();
].join('~'); // }
socket._callbacks['$m'][0](data);
} // export function subscribeOnStream(
// symbolInfo,
// resolution,
// onRealtimeCallback,
// subscriberUID,
// onResetCacheNeededCallback,
// lastDailyBar,
// ) {
// // return;
// const parsedSymbol = parseFullSymbol(symbolInfo.full_name);
// const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`;
// const handler = {
// id: subscriberUID,
// callback: onRealtimeCallback,
// };
// let subscriptionItem = channelToSubscription.get(channelString);
// if (subscriptionItem) {
// // Already subscribed to the channel, use the existing subscription
// subscriptionItem.handlers.push(handler);
// return;
// }
// subscriptionItem = {
// subscriberUID,
// resolution,
// lastDailyBar,
// handlers: [handler],
// };
// channelToSubscription.set(channelString, subscriptionItem);
// console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString);
// socket.emit('SubAdd', { subs: [channelString] });
// }
// export function unsubscribeFromStream(subscriberUID) {
// // return;
// // Find a subscription with id === subscriberUID
// for (const channelString of channelToSubscription.keys()) {
// const subscriptionItem = channelToSubscription.get(channelString);
// const handlerIndex = subscriptionItem.handlers
// .findIndex(handler => handler.id === subscriberUID);
// if (handlerIndex !== -1) {
// // Remove from handlers
// subscriptionItem.handlers.splice(handlerIndex, 1);
// if (subscriptionItem.handlers.length === 0) {
// // Unsubscribe from the channel if it was the last handler
// console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString);
// socket.emit('SubRemove', { subs: [channelString] });
// channelToSubscription.delete(channelString);
// break;
// }
// }
// }
// }
// function sim() {
// // Assuming these variables hold the data you extracted earlier
// const eventTypeStr = "0";
// const exchange = "Uniswap";
// const fromSymbol = "WETH";
// const toSymbol = "USD";
// const tradeTimeStr = (Date.now()).toString();
// const tradePriceStr = (55+Date.now()%23).toString();
// // Constructing the original string
// const data = [
// eventTypeStr,
// exchange,
// fromSymbol,
// toSymbol,
// '', // Placeholder for the fifth element
// '', // Placeholder for the sixth element
// tradeTimeStr,
// '', // Placeholder for the eighth element
// tradePriceStr,
// ].join('~');
// socket._callbacks['$m'][0](data);
// }
// window.sim = sim; // window.sim = sim;
socket._callbacks['$connect'][0](); // socket._callbacks['$connect'][0]();
setInterval(sim, 10*1000); // setInterval(sim, 10*1000);
; ;

View File

@@ -2,6 +2,7 @@ import {io} from "socket.io-client";
import {useStore} from "@/store/store.js"; import {useStore} from "@/store/store.js";
import {flushOrders} from "@/blockchain/wallet.js"; import {flushOrders} from "@/blockchain/wallet.js";
import {parseOrderStatus} from "@/blockchain/orderlib.js"; import {parseOrderStatus} from "@/blockchain/orderlib.js";
import { DataFeed } from "./charts/datafeed";
export const socket = io(import.meta.env.VITE_WS_URL || undefined, {transports: ["websocket"]}) export const socket = io(import.meta.env.VITE_WS_URL || undefined, {transports: ["websocket"]})
@@ -23,6 +24,7 @@ socket.on('p', async (chainId, pool, price) => {
socket.on('ohlc', async (chainId, pool, ohlcs) => { socket.on('ohlc', async (chainId, pool, ohlcs) => {
console.log('pool bars', pool, ohlcs) console.log('pool bars', pool, ohlcs)
DataFeed.poolCallback(chainId, pool, ohlcs)
}) })
socket.on('vb', async (chainId, vault, balances) => { socket.on('vb', async (chainId, vault, balances) => {