diff --git a/protocol-testing/Cargo.lock b/protocol-testing/Cargo.lock index be60f27..b42e8a6 100644 --- a/protocol-testing/Cargo.lock +++ b/protocol-testing/Cargo.lock @@ -1270,6 +1270,12 @@ dependencies = [ "syn 2.0.99", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dunce" version = "1.0.5" @@ -2715,6 +2721,7 @@ version = "0.1.0" dependencies = [ "alloy", "clap", + "dotenv", "figment", "hex", "postgres", diff --git a/protocol-testing/Cargo.toml b/protocol-testing/Cargo.toml index bd76d81..2b8ecf4 100644 --- a/protocol-testing/Cargo.toml +++ b/protocol-testing/Cargo.toml @@ -20,3 +20,4 @@ hex = "0.4.3" tracing-subscriber = "0.3.19" postgres = "0.19.10" serde_yaml = "0.9.34" +dotenv = "0.15.0" diff --git a/protocol-testing/src/tycho_runner.rs b/protocol-testing/src/tycho_runner.rs new file mode 100644 index 0000000..50e60f3 --- /dev/null +++ b/protocol-testing/src/tycho_runner.rs @@ -0,0 +1,174 @@ +use std::{ + env, + io::{BufRead, BufReader}, + path::Path, + process::{Child, Command, Stdio}, + sync::{ + mpsc::{self, Receiver, Sender}, + Arc, Mutex, + }, + thread, + time::Duration, +}; + +use dotenv::dotenv; +use tracing::debug; + +pub struct TychoRunner { + db_url: String, + initialized_accounts: Vec, + with_binary_logs: bool, +} + +// TODO: Currently Tycho-Indexer cannot be run as a lib. We need to expose the entrypoints to allow +// running it as a lib +impl TychoRunner { + pub fn new(db_url: String, initialized_accounts: Vec, with_binary_logs: bool) -> Self { + Self { db_url, initialized_accounts, with_binary_logs } + } + + pub fn run_tycho( + &self, + spkg_path: &str, + start_block: u64, + end_block: u64, + protocol_type_names: &Vec, + ) -> Result<(), Box> { + // Expects a .env present in the same folder as package root (where Cargo.toml is) + dotenv().ok(); + + let mut cmd = Command::new("tycho-indexer"); + cmd.env("RUST_LOG", "tycho_indexer=info"); + + let all_accounts = self.initialized_accounts.clone(); + + cmd.args([ + "--database-url", + self.db_url.as_str(), + "run", + "--spkg", + spkg_path, + "--module", + "map_protocol_changes", + "--protocol-type-names", + &protocol_type_names.join(","), + "--start-block", + &start_block.to_string(), + "--stop-block", + &(end_block + 2).to_string(), // +2 is to make up for the cache in the index side + ]); + + if !all_accounts.is_empty() { + cmd.args([ + "--initialized-accounts", + &all_accounts.join(","), + "--initialization-block", + &start_block.to_string(), + ]); + } + + cmd.stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + let mut process = match cmd.spawn() { + Ok(p) => p, + Err(e) => { + println!("Error running Tycho indexer: {}", e); + return Err(e.into()); + } + }; + + if self.with_binary_logs { + Self::handle_process_output(&mut process); + } + + match process.wait() { + Ok(status) => { + if !status.success() { + return Err(format!("Process exited with non-zero status: {}", status).into()); + } + } + Err(e) => { + println!("Error waiting for Tycho indexer: {}", e); + return Err(e.into()); + } + } + + Ok(()) + } + + fn run_with_rpc_server(&self, func: F) -> R + where + F: FnOnce() -> R, + { + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + let db_url = self.db_url.clone(); + let with_binary_logs = self.with_binary_logs; + + // Start the RPC server in a separate thread + let rpc_thread = thread::spawn(move || { + let binary_path = "tycho-indexer"; + + let mut cmd = Command::new(binary_path) + .args(&["--database-url", db_url.as_str(), "rpc"]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .env("RUST_LOG", "info") + .spawn() + .expect("Failed to start RPC server"); + + if with_binary_logs { + Self::handle_process_output(&mut cmd); + } + + match rx.recv() { + Ok(_) => { + debug!("Received termination message, stopping RPC server..."); + cmd.kill() + .expect("Failed to kill RPC server"); + } + Err(_) => { + // Channel closed, terminate anyway + let _ = cmd.kill(); + } + } + }); + + // Give the RPC server time to start + thread::sleep(Duration::from_secs(3)); + + // Run the provided function + let result = func(); + + tx.send(true) + .expect("Failed to send termination message"); + + // Wait for the RPC thread to finish + if rpc_thread.join().is_err() { + eprintln!("Failed to join RPC thread"); + } + + result + } + + // Helper method to handle process output in separate threads + fn handle_process_output(child: &mut Child) { + if let Some(stdout) = child.stdout.take() { + thread::spawn(move || { + let reader = BufReader::new(stdout); + for line in reader.lines().flatten() { + println!("{}", line); + } + }); + } + + if let Some(stderr) = child.stderr.take() { + thread::spawn(move || { + let reader = BufReader::new(stderr); + for line in reader.lines().flatten() { + eprintln!("{}", line); + } + }); + } + } +} diff --git a/protocol-testing/src/utils.rs b/protocol-testing/src/utils.rs index 6175473..0902344 100644 --- a/protocol-testing/src/utils.rs +++ b/protocol-testing/src/utils.rs @@ -1,4 +1,9 @@ -use std::{error::Error, fs, path::Path, process::Command}; +use std::{ + error::Error, + fs, + path::{Path, PathBuf}, + process::Command, +}; use figment::{ providers::{Format, Yaml}, @@ -7,19 +12,16 @@ use figment::{ }; /// Build a Substreams package with modifications to the YAML file. -pub fn build_spkg(yaml_file_path: &str, modify_func: F) -> Result> -where - F: FnOnce(&mut Value) -> Result<(), Box>, -{ +pub fn build_spkg(yaml_file_path: &PathBuf, initial_block: u64) -> Result> { // Create a backup file of the unmodified Substreams protocol YAML config file. - let backup_file_path = format!("{}.backup", yaml_file_path); + let backup_file_path = yaml_file_path.with_extension("backup"); fs::copy(yaml_file_path, &backup_file_path)?; let figment = Figment::new().merge(Yaml::file(yaml_file_path)); let mut data: Value = figment.extract()?; // Apply the modification function to update the YAML files - modify_func(&mut data).expect("Failed to modify the YAML config file."); + modify_initial_block(&mut data, initial_block); let parent_dir = Path::new(yaml_file_path) .parent() @@ -79,7 +81,7 @@ where } /// Update the initial block for all modules in the configuration data. -pub fn modify_initial_block(data: &mut Value, start_block: usize) { +pub fn modify_initial_block(data: &mut Value, start_block: u64) { if let Value::Dict(_, ref mut dict) = data { if let Some(Value::Array(_, modules)) = dict.get_mut("modules") { for module in modules.iter_mut() {