Files
server/watcher.js
2023-09-01 18:58:04 -04:00

104 lines
3.8 KiB
JavaScript

import {getProvider} from "./blockchain.js";
import {pool} from "./db.js";
import {getInterface} from "./abi.js";
/** Upper bound on the number of blocks covered by a single log-fetching batch. */
const BATCH_SIZE = 100
// watcher collects all events from the blockchain and then matches them against our local filters, invoking any
// registered callbacks. we do the filtering/switching locally to prevent invoking a separate api call for
// each type of event
/**
 * Catches the local `progress` table up to the chain head: fetches all logs in batches of at most
 * BATCH_SIZE blocks, dispatches every log to the callbacks registered for any of its topics, and
 * records the last processed block after each batch.
 *
 * Each batch runs inside its own BEGIN/COMMIT transaction on a single pooled client, so the callbacks'
 * writes and the progress update for that batch are committed (or rolled back) together.
 *
 * @param {number} chainId - chain to process; also the `chain` key of the row in the `progress` table
 * @param {Object<string, Function>} topicEventCallbacks - map of topic -> async callback(provider, db, log)
 * @returns {Promise<void>} resolves once every block up to the head observed at call time is processed
 * @throws rethrows any provider, database or callback error after issuing ROLLBACK for the open batch;
 *         also throws if the progress table holds more than one row for the chain
 */
async function processLogs(chainId, topicEventCallbacks) {
    const provider = getProvider(chainId)
    const block = await provider.getBlockNumber()
    const db = await pool.connect()
    try {
        let fromBlock
        const result = await db.query('select block from progress where chain=$1', [chainId])
        if (result.rowCount === 0) {
            console.log('initializing chain', chainId)
            fromBlock = block
            // awaited so that a failed insert surfaces here instead of as an unhandled rejection
            await db.query('insert into progress values ($1,$2)', [chainId, block - 1])
        } else if (result.rowCount === 1) {
            // The column type is not visible from here; node-postgres returns BIGINT values as strings,
            // and Number() keeps the `+ 1` arithmetic (rather than string concatenation) either way.
            fromBlock = Number(result.rows[0].block) + 1
        } else {
            throw new Error(`Found ${result.rowCount} rows for progress table chain ${chainId}`)
        }
        // Loop until caught up with the head; does nothing when already up to date.
        while (fromBlock <= block) {
            const toBlock = Math.min(block, fromBlock + BATCH_SIZE - 1) // toBlock is inclusive
            // Fetch before opening the transaction so it is not held open during the RPC round trip.
            const logs = await provider.getLogs({fromBlock, toBlock})
            try {
                await db.query('BEGIN')
                const promises = []
                for (const log of logs) {
                    for (const topic of log.topics) {
                        const cb = topicEventCallbacks[topic]
                        if (cb !== undefined)
                            promises.push(cb(provider, db, log))
                    }
                }
                await Promise.all(promises)
                await db.query('update progress set block=$1 where chain=$2', [toBlock, chainId])
                // Must be awaited: the client may only be released once the commit has settled.
                await db.query('COMMIT')
            } catch (e) {
                await db.query('ROLLBACK')
                throw e
            }
            fromBlock = toBlock + 1
        }
    } finally {
        db.release()
    }
}
/**
 * Registers the given event filters and starts polling the chain for matching logs every `period` ms.
 *
 * Every topic produced by encoding a filter is mapped to a wrapper that parses the raw log with the
 * filter's interface, attaches the emitting contract address as `info.address`, and then invokes the
 * user callback. If two filters produce the same topic, the later one replaces the earlier one.
 *
 * @param {number} chainId - chain to watch
 * @param {number} period - polling interval in milliseconds, passed to setInterval
 * @param {Iterable} filterCallbacks - entries of the form [[className, eventName, ...args], callback],
 *        where callback is invoked as callback(provider, db, info)
 * @returns {Promise<void>} resolves once the topics are registered and the polling timer is started
 */
export async function startWatcher(chainId, period, filterCallbacks) {
    const topicEventCallbacks = {} // topic: callback(provider, db, log)
    for (const [[className, eventName, ...args], callback] of filterCallbacks) {
        const interf = await getInterface(className)
        const event = interf.getEvent(eventName)
        const topics = interf.encodeFilterTopics(event, args)
        for (const topic of topics) {
            topicEventCallbacks[topic] = async (provider, db, log) => {
                let info
                try {
                    info = interf.parseLog(log)
                } catch (e) {
                    // Deliberately best-effort: a log that shares this topic but cannot be decoded
                    // with this interface is skipped rather than failing the whole batch.
                    return
                }
                if (info !== null) {
                    info.address = log.address
                    await callback(provider, db, info)
                }
            }
        }
    }
    console.log('registered topics', Object.keys(topicEventCallbacks))
    // setInterval does not wait for an async callback to finish. Without this guard a slow run would
    // overlap with the next tick, and both would read the same progress row and process the same blocks.
    let running = false
    setInterval(async () => {
        if (running)
            return
        running = true
        try {
            await processLogs(chainId, topicEventCallbacks)
        } catch (e) {
            console.error('error during processLogs',e)
        } finally {
            running = false
        }
    }, period)
}
// test
// const erc20 = new ethers.Interface(erc20Abi);
// const filter = erc20.encodeFilterTopics('Transfer', [null, null])
// async function handle(provider, db, events) {
// console.log(`got ${events.length} Transfers`)
// }
// startWatcher(42161, 1000, [[filter, handle]])