redis pub/sub fix
This commit is contained in:
13
cache.js
13
cache.js
@@ -1,12 +1,19 @@
|
||||
import {createClient} from "redis";
|
||||
|
||||
export const redis = createClient({
|
||||
|
||||
async function createRedisClient() {
|
||||
const client = createClient({
|
||||
url: process.env.DEXORDER_REDIS_URL || 'redis://localhost:6379',
|
||||
returnBuffers: false,
|
||||
})
|
||||
client.on('error', (err) => console.log('Redis Client Error', err));
|
||||
await client.connect();
|
||||
return client
|
||||
}
|
||||
|
||||
redis.on('error', (err) => console.log('Redis Client Error', err));
|
||||
await redis.connect();
|
||||
|
||||
export const redis = await createRedisClient()
|
||||
export const redisSubscriber = await createRedisClient()
|
||||
|
||||
|
||||
export class CacheSet {
|
||||
|
||||
4
chain.js
4
chain.js
@@ -5,6 +5,7 @@ export const chainInfo = JSON.parse(fs.readFileSync('../contract/version.json'))
|
||||
console.log('chainInfo', chainInfo)
|
||||
|
||||
export async function joinChain( socket, chainId ) {
|
||||
try {
|
||||
if (socket.chainId)
|
||||
socket.leave(socket.chainId)
|
||||
socket.join(chainId)
|
||||
@@ -12,4 +13,7 @@ export async function joinChain( socket, chainId ) {
|
||||
const items = await marks.items(chainId);
|
||||
for (let [token, mark] of items)
|
||||
socket.emit('mark.usd', chainId, token, mark)
|
||||
} catch (e) {
|
||||
console.error('joinChain', e)
|
||||
}
|
||||
}
|
||||
|
||||
10
faucet.js
10
faucet.js
@@ -6,6 +6,16 @@ import {metadata} from "./metadata.js";
|
||||
|
||||
|
||||
export async function gib( chainId, owner, vault, tokenAmounts ) {
|
||||
try {
|
||||
return await doGib( chainId, owner, vault, tokenAmounts )
|
||||
}
|
||||
catch (e) {
|
||||
console.error('gib failed', e)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export async function doGib( chainId, owner, vault, tokenAmounts ) {
|
||||
if (!owner || !vault) return
|
||||
if (chainId === 421614) {
|
||||
// Arbitrum-Sepolia
|
||||
|
||||
11
init.js
Normal file
11
init.js
Normal file
@@ -0,0 +1,11 @@
|
||||
export function initLog(app) {
|
||||
process.on('uncaughtException', (err) => {
|
||||
console.error('Uncaught Exception:', err);
|
||||
process.exit(1); // Exit with code 1
|
||||
});
|
||||
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
|
||||
process.exit(1); // Exit with code 1
|
||||
});
|
||||
}
|
||||
10
io.js
10
io.js
@@ -1,7 +1,7 @@
|
||||
import {createServer} from "http";
|
||||
import {Server} from "socket.io";
|
||||
import { createAdapter } from "@socket.io/redis-adapter";
|
||||
import {redis} from "./cache.js";
|
||||
import {redis, redisSubscriber} from "./cache.js";
|
||||
import {fileURLToPath} from "url";
|
||||
import path from "path";
|
||||
import express from "express";
|
||||
@@ -29,12 +29,14 @@ app.set('views', path.join(__dirname, 'views')); // Set the views directory
|
||||
|
||||
app.use(express.static(path.join(__dirname, 'public')));
|
||||
app.use(cors())
|
||||
app.use((err, req, res, next) => {
|
||||
console.error(err.stack);
|
||||
res.status(500).send('Something went wrong!');
|
||||
});
|
||||
|
||||
initSnapShare(app)
|
||||
|
||||
export const httpServer = createServer(app)
|
||||
export const io = new Server(httpServer, socketIoOptions)
|
||||
const pubClient = redis.duplicate();
|
||||
await pubClient.connect()
|
||||
const adapter = createAdapter(pubClient, redis, {/*key:'socket.io'*/})
|
||||
const adapter = createAdapter(redis, redisSubscriber, {/*key:'socket.io'*/})
|
||||
io.adapter(adapter)
|
||||
|
||||
2
main.js
2
main.js
@@ -1,8 +1,10 @@
|
||||
import 'dotenv/config'
|
||||
import {httpServer} from "./io.js";
|
||||
import {initIO} from "./route.js";
|
||||
import {initLog} from "./init.js";
|
||||
|
||||
|
||||
initLog();
|
||||
initIO();
|
||||
|
||||
const port = parseInt(process.env.DEXORDER_PORT) || 3001;
|
||||
|
||||
5
pool.js
5
pool.js
@@ -19,6 +19,7 @@ export function unsubPools( socket, chainId, addresses ) {
|
||||
|
||||
|
||||
export async function subOHLCs( socket, chainId, poolPeriods) {
|
||||
try {
|
||||
console.log('subOHLCs', chainId, poolPeriods)
|
||||
for (const key of poolPeriods) {
|
||||
const room = `${chainId}|${key}`;
|
||||
@@ -32,6 +33,10 @@ export async function subOHLCs( socket, chainId, poolPeriods) {
|
||||
console.log('joined room', room)
|
||||
}
|
||||
}
|
||||
catch (e) {
|
||||
console.error('subOHLCs', e)
|
||||
}
|
||||
}
|
||||
|
||||
export function unsubOHLCs( socket, chainId, poolPeriods ) {
|
||||
console.log('unsubOHLCs', chainId, poolPeriods)
|
||||
|
||||
Reference in New Issue
Block a user