diff --git a/abi.js b/abi.js index 4e758b1..129b815 100644 --- a/abi.js +++ b/abi.js @@ -1,3 +1,9 @@ +import {readFile} from './misc.js' +import {ethers} from "ethers"; + +const ABI_BASE_PATH = '../contract/out' + + export const erc20Abi = [ 'function name() view returns (string)', 'function symbol() view returns (string)', @@ -26,6 +32,7 @@ const TimedOrderSpec = '(' + 'bool amountIsInput' + ')' + export const timedOrderAbi = [ 'event TimedOrderCreated (address owner, uint64 index, Spec spec)', 'event TimedOrderFilled (address owner, uint64 index, uint256 amountIn, uint256 amountOut)', @@ -35,8 +42,31 @@ export const timedOrderAbi = [ ] +const vaultDeployerAbi = [ + 'function deployVault(address owner) returns (address vault)', + 'event VaultCreated( address deployer, address owner )', +] + + export const abi = { 'ERC20': erc20Abi, 'TimedOrder': timedOrderAbi, + 'VaultDeployer': vaultDeployerAbi, } + +export async function getAbi(className) { + let found = abi[className] + if (found === undefined) { + console.log('warning: loading ABI from filesystem for '+className) + const data = await readFile(ABI_BASE_PATH + `/${className}.sol/${className}.json`) + found = JSON.parse(data.toString())['abi'] + abi[className] = found + } + return found +} + + +export async function getInterface(className) { + return new ethers.Interface(await getAbi(className)) +} diff --git a/blockchain.js b/blockchain.js index ac35fd6..c49249a 100644 --- a/blockchain.js +++ b/blockchain.js @@ -1,15 +1,22 @@ import {ethers} from "ethers"; -const providers = {} // indexed by chain id +export const chains = {} +const _chainInfo = [ + {id:42161, name:'Arbitrum'}, +] +for( const chain of _chainInfo ) + chains[chain.id] = chain + +const providers = {} // indexed by chain id export function getProvider(chainId) { let result = providers[chainId] if( result === undefined ) { - const rpc_url = process.env['DEXORDER_RPC_URL_'+chainId] + let rpc_url = process.env['DEXORDER_RPC_URL_'+chainId] if( rpc_url === undefined ) { - console.error('No provider found for chainId',chainId) - return null + console.error(`WARNING: No provider found for chainId ${chainId}. Using localhost.`) + rpc_url = 'http://localhost:8545' } result = rpc_url.startsWith('ws') ? new ethers.WebSocketProvider(rpc_url, chainId) : @@ -19,24 +26,28 @@ export function getProvider(chainId) { return result } -const signers = {} // indexed by chain id, value is an array to be used in round-robin fashion -const signerIndexes = {} +const signers = {} // indexed by chain id -export function signer(chainId) { - let chainSigners = signers[chainId] - if (chainSigners === undefined) { - chainSigners = [] +// todo multiple signers per chain, checked out of a pool +export function getSigner(chainId) { + let signer = signers[chainId] + if (signer === undefined) { const private_keys = process.env['DEXORDER_ACCOUNTS_' + chainId] - for (const match of private_keys.matchAll(/([^,]+),?/g)) - chainSigners.push(new ethers.Wallet(match[1])) - signers[chainId] = chainSigners - signerIndexes[chainId] = 0 + if( !private_keys ) { + console.log(`DEXORDER_ACCOUNTS_${chainId} not defined`) + return null // todo fatal + } + // for (const match of private_keys.matchAll(/([^,]+),?/g)) + // signer.push(new ethers.Wallet(match[1])) + signer = new ethers.Wallet(private_keys, getProvider(chainId)) + signers[chainId] = signer } - const result = chainSigners[signerIndexes[chainId]] - signerIndexes[chainId]++ - if( signerIndexes[chainId] >= chainSigners.length ) - signerIndexes[chainId] = 0 - return result + return signer + // const result = signer[signerIndexes[chainId]] + // signerIndexes[chainId]++ + // if( signerIndexes[chainId] >= signer.length ) + // signerIndexes[chainId] = 0 + // return result } diff --git a/cache.js b/cache.js new file mode 100644 index 0000000..2ed68c9 --- /dev/null +++ b/cache.js @@ -0,0 +1,19 @@ +// implement a cluster-wide cache which is in-memory for each instance + +export class Cache { + + constructor(name) { + this.name = name + this.cache = {} + } + + async get(key) { + return this.cache[key] + } + + async set(key, value) { + this.cache[key] = value + // todo broadcast + } +} + diff --git a/io.js b/io.js new file mode 100644 index 0000000..17d81d2 --- /dev/null +++ b/io.js @@ -0,0 +1,8 @@ +import {createServer} from "http"; +import {Server} from "socket.io"; + +const options = {} +if( process.env.DEXORDER_CORS ) + options['cors'] = {origin:process.env.DEXORDER_CORS} +export const httpServer = createServer() +export const io = new Server(httpServer, options) diff --git a/main.js b/main.js index 19e13c6..5fa1ec4 100644 --- a/main.js +++ b/main.js @@ -1,14 +1,25 @@ import 'dotenv/config' -import { createServer } from "http"; -import { Server } from "socket.io" import {lookupToken} from "./token.js"; +import {startWatcher} from "./watcher.js"; +import {chains} from "./blockchain.js"; +import {watchErc20Transfer, watchVaultCreated} from "./vault.js"; +import {httpServer, io} from "./io.js"; -const options = {} -if( process.env.DEXORDER_CORS ) - options['cors'] = {origin:process.env.DEXORDER_CORS} -const httpServer = createServer() -const io = new Server(httpServer, options) + +// setup watcher + +const filterCallbacks = [ + // format is [[className, eventName, ...eventArgs], callback(provider, database, logInfo)] + [['VaultDeployer','VaultCreated', null, null], watchVaultCreated], + [['ERC20', 'Transfer'], watchErc20Transfer], +] + +for( const chain of Object.values(chains) ) + await startWatcher( chain.id, 1000, filterCallbacks ) + + +// setup socket.io io.on("connection", (socket) => { // initially, only anonymous messages are allowed @@ -23,4 +34,3 @@ io.on("connection", (socket) => { const port = parseInt(process.env.DEXORDER_PORT) || 3000; httpServer.listen(port) console.log('Started server on port '+port) -console.log(options) diff --git a/migrations/20230829202027-initial.js b/migrations/20230829202027-initial.js index f7286a5..cdaae8b 100644 --- a/migrations/20230829202027-initial.js +++ b/migrations/20230829202027-initial.js @@ -22,7 +22,7 @@ exports.up = async function (db) { await db.createTable('eoa', { chain: { type: 'int', primaryKey: true }, address: { type: 'bytea', primaryKey: true}, - vaulted: 'boolean', + // vaulted: 'boolean', }) await db.createTable('tokenusage', { chain: { type: 'int', primaryKey: true }, diff --git a/misc.js b/misc.js new file mode 100644 index 0000000..73f2099 --- /dev/null +++ b/misc.js @@ -0,0 +1,4 @@ +import fs from "fs"; +import util from "util"; + +export const readFile = (fileName) => util.promisify(fs.readFile)(fileName, 'utf8'); diff --git a/vault.js b/vault.js index aa14066..7b051e9 100644 --- a/vault.js +++ b/vault.js @@ -1,5 +1,51 @@ +import {ethers} from "ethers"; +import {getAbi} from "./abi.js"; +import {getProvider, getSigner} from "./blockchain.js"; +import {Cache} from './cache.js'; + +const DEPLOYER_ADDRESS = '0xF99aB16Bd8398EAf12407D05A0F8824316008E99' +const VAULT_INIT_CODE_HASH = '0xbf043f7035d5aa3be2b3c94df5b256fbe24675689327af4ab71c48194c463031' + +const vaults = new Cache('vaults') // vault:owner + +const deployerAbi = await getAbi('VaultDeployer'); + + +function newVault(address, owner) { + return {address, owner, balances: {}} +} + +export function vaultAddress(chainId, ownerAddress) { + const encoded = ethers.AbiCoder.defaultAbiCoder().encode(['address'], [ownerAddress]) + const salt = ethers.keccak256(encoded) + return ethers.getCreate2Address(DEPLOYER_ADDRESS, salt, VAULT_INIT_CODE_HASH) +} + export function loginAddress(socket, chainId, address) { // todo check for existing vault - // todo send welcome with basic info -} \ No newline at end of file + if (!vaults[address]) { + // + } else { + // todo send welcome with basic info and extra tokens + + } +} + + +export async function ensureVault(socket, chainId, owner) { + const address = vaultAddress(chainId, owner) + if (!vaults[address]) { + const deployer = new ethers.Contract(DEPLOYER_ADDRESS, deployerAbi, getSigner(chainId)) + await deployer.deployVault(owner) + } +} + + +export async function watchVaultCreated(provider, db, event) { + console.log(`vault created`, event) +} + +export async function watchErc20Transfer(provider, db, event) { + console.log('Transfer', event) +} diff --git a/watcher.js b/watcher.js index c951953..bff29b6 100644 --- a/watcher.js +++ b/watcher.js @@ -1,54 +1,96 @@ import {getProvider} from "./blockchain.js"; -import {ethers} from "ethers"; -import {erc20Abi} from "./abi.js"; import {pool} from "./db.js"; +import {getInterface} from "./abi.js"; const BATCH_SIZE = 100 // the most blocks allowed per batch -async function watchChain(chainId, filterCallbacks) { + +// watcher collects all events from the blockchain and then matches them against our local filters, invoking any +// registered callbacks. we do the filtering/switching locally to prevent invoking a separate api call for +// each type of event + + +async function processLogs(chainId, topicEventCallbacks) { + // console.log('processLogs') const provider = getProvider(chainId) - const db = await pool.connect() const block = await provider.getBlockNumber() - let fromBlock, toBlock - const result = await db.query('select block from progress where chain=$1', [chainId]) - if( result.rowCount === 0 ) { - console.log('initializing chain', chainId) - fromBlock = block - db.query('insert into progress values ($1,$2)', [chainId, block-1]) - } - else if( result.rowCount === 1 ) { - fromBlock = result.rows[0].block + 1 - } - else - throw Error(`Found ${result.rowCount} rows for progress table chain ${chainId}`) + const db = await pool.connect() try { - do { - toBlock = Math.min(block, fromBlock + BATCH_SIZE-1) // toBlock is inclusive - const promises = [] - await db.query('BEGIN') - for (const [filter, callback] of filterCallbacks) { - filter.fromBlock = fromBlock - filter.toBlock = toBlock - console.log('filter', filter) - const found = await provider.getLogs(filter) - promises.push(callback(provider, db, found)) - } - await Promise.all(promises) - db.query('update progress set block=$1 where chain=$2', [toBlock, chainId]) - db.query('COMMIT') - fromBlock = toBlock + 1 - } while (toBlock > block) - } catch (e) { - await db.query('ROLLBACK') - throw e - } finally { + let fromBlock, toBlock + const result = await db.query('select block from progress where chain=$1', [chainId]) + if (result.rowCount === 0) { + console.log('initializing chain', chainId) + fromBlock = block + db.query('insert into progress values ($1,$2)', [chainId, block - 1]) + } else if (result.rowCount === 1) { + fromBlock = result.rows[0].block + 1 + } else + throw Error(`Found ${result.rowCount} rows for progress table chain ${chainId}`) + if (fromBlock > block) + return + try { + do { + toBlock = Math.min(block, fromBlock + BATCH_SIZE - 1) // toBlock is inclusive + await db.query('BEGIN') + const logs = await provider.getLogs({fromBlock, toBlock}) + // console.log(`logs for block range [${fromBlock},${toBlock}]`, logs) + const promises = [] + for (const log of logs) { + for (const topic of log.topics) { + const cb = topicEventCallbacks[topic] + if (cb !== undefined) + promises.push(cb(provider, db, log)) + } + } + await Promise.all(promises) + db.query('update progress set block=$1 where chain=$2', [toBlock, chainId]) + db.query('COMMIT') + fromBlock = toBlock + 1 + } while (toBlock > block) + } catch (e) { + await db.query('ROLLBACK') + throw e + } + } + finally { db.release() } } -export function startWatcher(chainId, period, filterCallbacks) { - setInterval(async () => await watchChain(chainId, filterCallbacks), period) +export async function startWatcher(chainId, period, filterCallbacks) { + const topicEventCallbacks = {} // topic: callback(log) + for (const [[className, eventName, ...args], callback] of filterCallbacks) { + const interf = await getInterface(className) + const event = interf.getEvent(eventName) + const topics = interf.encodeFilterTopics(event, args) + for( const topic of topics ) { + topicEventCallbacks[topic] = async (provider, db, log) => { + let info + try { + // info = interf.decodeEventLog(event, log.data, log.topics) + info = interf.parseLog(log) + } + catch (e) { + // console.error(`could not decode log for topic ${topic}`, log) + return + } + if( info !== null ) { + info.address = log.address + await callback(provider, db, info) + } + } + } + } + console.log('registered topics', Object.keys(topicEventCallbacks)) + + setInterval(async () => { + try { + await processLogs(chainId, topicEventCallbacks) + } catch (e) { + console.error('error during processLogs',e) + } + }, period) }