62 lines
2.1 KiB
JavaScript
62 lines
2.1 KiB
JavaScript
import {getProvider} from "./blockchain.js";
|
|
import {ethers} from "ethers";
|
|
import {erc20Abi} from "./abi.js";
|
|
import {pool} from "./db.js";
|
|
|
|
// Upper bound on the number of blocks covered by one batch in watchChain.
// Batch ranges are inclusive, so a batch spans fromBlock .. fromBlock + BATCH_SIZE - 1.
const BATCH_SIZE = 100
|
|
|
|
/**
 * Runs one catch-up pass for a chain: fetches logs for every registered filter from the
 * last processed block up to the current chain head and hands them to the callbacks.
 *
 * The last processed block is stored in the `progress` table (one row per chain). When no
 * row exists yet, one is created and scanning starts at the current head block. The range
 * is walked in inclusive batches of at most BATCH_SIZE blocks. Every batch runs in its own
 * database transaction: callbacks are invoked as `callback(provider, db, logs)` with the
 * transaction's client, the progress row is advanced to the batch's last block, and the
 * transaction is committed. If anything fails, the open transaction is rolled back, so the
 * failed batch is retried on the next pass.
 *
 * @param {*} chainId - chain identifier; passed to getProvider and used as the `chain` key
 *   of the progress table
 * @param {Iterable<[Object, Function]>} filterCallbacks - `[filter, callback]` pairs. Each
 *   filter's `fromBlock` / `toBlock` properties are overwritten in place for every batch.
 * @returns {Promise<void>} resolves once the pass has reached the head block observed at
 *   the start of the call
 * @throws {Error} if the progress table holds more than one row for the chain; errors from
 *   the provider, the database or a callback are rethrown unchanged
 */
async function watchChain(chainId, filterCallbacks) {
    const provider = getProvider(chainId);
    const db = await pool.connect();
    // Tracks whether a BEGIN is outstanding so ROLLBACK is only issued when it is meaningful.
    let inTransaction = false;
    try {
        // Everything after connect() lives inside the try so the client is always released.
        const block = await provider.getBlockNumber();
        const result = await db.query('select block from progress where chain=$1', [chainId]);

        let fromBlock;
        if (result.rowCount === 0) {
            console.log('initializing chain', chainId);
            fromBlock = block;
            // Recorded as block-1 so that `block` itself counts as not yet processed.
            await db.query('insert into progress values ($1,$2)', [chainId, block - 1]);
        } else if (result.rowCount === 1) {
            // Number() keeps the arithmetic numeric even if the driver hands the column back
            // as a string. NOTE(review): confirm the column type of progress.block.
            fromBlock = Number(result.rows[0].block) + 1;
        } else {
            throw new Error(`Found ${result.rowCount} rows for progress table chain ${chainId}`);
        }

        // Walk forward until the head is reached; nothing happens when there are no new blocks.
        while (fromBlock <= block) {
            const toBlock = Math.min(block, fromBlock + BATCH_SIZE - 1); // toBlock is inclusive
            const promises = [];
            await db.query('BEGIN');
            inTransaction = true;
            for (const [filter, callback] of filterCallbacks) {
                filter.fromBlock = fromBlock;
                filter.toBlock = toBlock;
                console.log('filter', filter);
                const found = await provider.getLogs(filter);
                // Callbacks are started immediately and awaited together below.
                promises.push(callback(provider, db, found));
            }
            await Promise.all(promises);
            // Both statements must finish before the next batch starts or the client is released.
            await db.query('update progress set block=$1 where chain=$2', [toBlock, chainId]);
            await db.query('COMMIT');
            inTransaction = false;
            fromBlock = toBlock + 1;
        }
    } catch (e) {
        if (inTransaction) {
            try {
                await db.query('ROLLBACK');
            } catch (rollbackError) {
                // Report but do not let a failed rollback hide the error that caused it.
                console.error('rollback failed for chain', chainId, rollbackError);
            }
        }
        throw e;
    } finally {
        db.release();
    }
}
|
|
|
|
|
|
/**
 * Starts polling a chain: calls watchChain(chainId, filterCallbacks) every `period`
 * milliseconds.
 *
 * A tick is skipped while the previous pass is still running, so two passes never work on
 * the same block range at the same time. A failing pass is logged and does not stop the
 * watcher; the next tick tries again.
 *
 * @param {*} chainId - chain identifier forwarded to watchChain
 * @param {number} period - polling interval in milliseconds, as given to setInterval
 * @param {Iterable<[Object, Function]>} filterCallbacks - `[filter, callback]` pairs
 *   forwarded to watchChain
 * @returns {*} the handle returned by setInterval; pass it to clearInterval to stop polling
 */
export function startWatcher(chainId, period, filterCallbacks) {
    let running = false;
    return setInterval(async () => {
        if (running) {
            return; // previous pass has not finished yet
        }
        running = true;
        try {
            await watchChain(chainId, filterCallbacks);
        } catch (e) {
            // Without this the rejection would be unhandled inside the timer callback.
            console.error(`watcher for chain ${chainId} failed`, e);
        } finally {
            running = false;
        }
    }, period);
}
|
|
|
|
|
|
// test
|
|
// const erc20 = new ethers.Interface(erc20Abi);
|
|
// const filter = erc20.encodeFilterTopics('Transfer', [null, null])
|
|
// async function handle(provider, db, events) {
|
|
// console.log(`got ${events.length} Transfers`)
|
|
// }
|
|
// startWatcher(42161, 1000, [[filter, handle]])
|