refactored watcher; produces logs
This commit is contained in:
118
watcher.js
118
watcher.js
@@ -1,54 +1,96 @@
|
||||
import {getProvider} from "./blockchain.js";
|
||||
import {ethers} from "ethers";
|
||||
import {erc20Abi} from "./abi.js";
|
||||
import {pool} from "./db.js";
|
||||
import {getInterface} from "./abi.js";
|
||||
|
||||
const BATCH_SIZE = 100 // the most blocks allowed per batch
|
||||
|
||||
async function watchChain(chainId, filterCallbacks) {
|
||||
|
||||
// watcher collects all events from the blockchain and then matches them against our local filters, invoking any
|
||||
// registered callbacks. we do the filtering/switching locally to prevent invoking a separate api call for
|
||||
// each type of event
|
||||
|
||||
|
||||
async function processLogs(chainId, topicEventCallbacks) {
|
||||
// console.log('processLogs')
|
||||
const provider = getProvider(chainId)
|
||||
const db = await pool.connect()
|
||||
const block = await provider.getBlockNumber()
|
||||
let fromBlock, toBlock
|
||||
const result = await db.query('select block from progress where chain=$1', [chainId])
|
||||
if( result.rowCount === 0 ) {
|
||||
console.log('initializing chain', chainId)
|
||||
fromBlock = block
|
||||
db.query('insert into progress values ($1,$2)', [chainId, block-1])
|
||||
}
|
||||
else if( result.rowCount === 1 ) {
|
||||
fromBlock = result.rows[0].block + 1
|
||||
}
|
||||
else
|
||||
throw Error(`Found ${result.rowCount} rows for progress table chain ${chainId}`)
|
||||
const db = await pool.connect()
|
||||
try {
|
||||
do {
|
||||
toBlock = Math.min(block, fromBlock + BATCH_SIZE-1) // toBlock is inclusive
|
||||
const promises = []
|
||||
await db.query('BEGIN')
|
||||
for (const [filter, callback] of filterCallbacks) {
|
||||
filter.fromBlock = fromBlock
|
||||
filter.toBlock = toBlock
|
||||
console.log('filter', filter)
|
||||
const found = await provider.getLogs(filter)
|
||||
promises.push(callback(provider, db, found))
|
||||
}
|
||||
await Promise.all(promises)
|
||||
db.query('update progress set block=$1 where chain=$2', [toBlock, chainId])
|
||||
db.query('COMMIT')
|
||||
fromBlock = toBlock + 1
|
||||
} while (toBlock > block)
|
||||
} catch (e) {
|
||||
await db.query('ROLLBACK')
|
||||
throw e
|
||||
} finally {
|
||||
let fromBlock, toBlock
|
||||
const result = await db.query('select block from progress where chain=$1', [chainId])
|
||||
if (result.rowCount === 0) {
|
||||
console.log('initializing chain', chainId)
|
||||
fromBlock = block
|
||||
db.query('insert into progress values ($1,$2)', [chainId, block - 1])
|
||||
} else if (result.rowCount === 1) {
|
||||
fromBlock = result.rows[0].block + 1
|
||||
} else
|
||||
throw Error(`Found ${result.rowCount} rows for progress table chain ${chainId}`)
|
||||
if (fromBlock > block)
|
||||
return
|
||||
try {
|
||||
do {
|
||||
toBlock = Math.min(block, fromBlock + BATCH_SIZE - 1) // toBlock is inclusive
|
||||
await db.query('BEGIN')
|
||||
const logs = await provider.getLogs({fromBlock, toBlock})
|
||||
// console.log(`logs for block range [${fromBlock},${toBlock}]`, logs)
|
||||
const promises = []
|
||||
for (const log of logs) {
|
||||
for (const topic of log.topics) {
|
||||
const cb = topicEventCallbacks[topic]
|
||||
if (cb !== undefined)
|
||||
promises.push(cb(provider, db, log))
|
||||
}
|
||||
}
|
||||
await Promise.all(promises)
|
||||
db.query('update progress set block=$1 where chain=$2', [toBlock, chainId])
|
||||
db.query('COMMIT')
|
||||
fromBlock = toBlock + 1
|
||||
} while (toBlock > block)
|
||||
} catch (e) {
|
||||
await db.query('ROLLBACK')
|
||||
throw e
|
||||
}
|
||||
}
|
||||
finally {
|
||||
db.release()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
export function startWatcher(chainId, period, filterCallbacks) {
|
||||
setInterval(async () => await watchChain(chainId, filterCallbacks), period)
|
||||
export async function startWatcher(chainId, period, filterCallbacks) {
|
||||
const topicEventCallbacks = {} // topic: callback(log)
|
||||
for (const [[className, eventName, ...args], callback] of filterCallbacks) {
|
||||
const interf = await getInterface(className)
|
||||
const event = interf.getEvent(eventName)
|
||||
const topics = interf.encodeFilterTopics(event, args)
|
||||
for( const topic of topics ) {
|
||||
topicEventCallbacks[topic] = async (provider, db, log) => {
|
||||
let info
|
||||
try {
|
||||
// info = interf.decodeEventLog(event, log.data, log.topics)
|
||||
info = interf.parseLog(log)
|
||||
}
|
||||
catch (e) {
|
||||
// console.error(`could not decode log for topic ${topic}`, log)
|
||||
return
|
||||
}
|
||||
if( info !== null ) {
|
||||
info.address = log.address
|
||||
await callback(provider, db, info)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
console.log('registered topics', Object.keys(topicEventCallbacks))
|
||||
|
||||
setInterval(async () => {
|
||||
try {
|
||||
await processLogs(chainId, topicEventCallbacks)
|
||||
} catch (e) {
|
||||
console.error('error during processLogs',e)
|
||||
}
|
||||
}, period)
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user