feat: add Tycho Runner to run separate tycho processes

This commit is contained in:
Thales Lima
2025-03-11 13:17:42 -03:00
committed by Tamara
parent 4425fe1680
commit e3ae70ab43
4 changed files with 192 additions and 8 deletions

View File

@@ -1270,6 +1270,12 @@ dependencies = [
"syn 2.0.99", "syn 2.0.99",
] ]
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]] [[package]]
name = "dunce" name = "dunce"
version = "1.0.5" version = "1.0.5"
@@ -2715,6 +2721,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"alloy", "alloy",
"clap", "clap",
"dotenv",
"figment", "figment",
"hex", "hex",
"postgres", "postgres",

View File

@@ -20,3 +20,4 @@ hex = "0.4.3"
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
postgres = "0.19.10" postgres = "0.19.10"
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
dotenv = "0.15.0"

View File

@@ -0,0 +1,174 @@
use std::{
env,
io::{BufRead, BufReader},
path::Path,
process::{Child, Command, Stdio},
sync::{
mpsc::{self, Receiver, Sender},
Arc, Mutex,
},
thread,
time::Duration,
};
use dotenv::dotenv;
use tracing::debug;
pub struct TychoRunner {
db_url: String,
initialized_accounts: Vec<String>,
with_binary_logs: bool,
}
// TODO: Currently Tycho-Indexer cannot be run as a lib. We need to expose the entrypoints to allow
// running it as a lib
impl TychoRunner {
pub fn new(db_url: String, initialized_accounts: Vec<String>, with_binary_logs: bool) -> Self {
Self { db_url, initialized_accounts, with_binary_logs }
}
pub fn run_tycho(
&self,
spkg_path: &str,
start_block: u64,
end_block: u64,
protocol_type_names: &Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
// Expects a .env present in the same folder as package root (where Cargo.toml is)
dotenv().ok();
let mut cmd = Command::new("tycho-indexer");
cmd.env("RUST_LOG", "tycho_indexer=info");
let all_accounts = self.initialized_accounts.clone();
cmd.args([
"--database-url",
self.db_url.as_str(),
"run",
"--spkg",
spkg_path,
"--module",
"map_protocol_changes",
"--protocol-type-names",
&protocol_type_names.join(","),
"--start-block",
&start_block.to_string(),
"--stop-block",
&(end_block + 2).to_string(), // +2 is to make up for the cache in the index side
]);
if !all_accounts.is_empty() {
cmd.args([
"--initialized-accounts",
&all_accounts.join(","),
"--initialization-block",
&start_block.to_string(),
]);
}
cmd.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut process = match cmd.spawn() {
Ok(p) => p,
Err(e) => {
println!("Error running Tycho indexer: {}", e);
return Err(e.into());
}
};
if self.with_binary_logs {
Self::handle_process_output(&mut process);
}
match process.wait() {
Ok(status) => {
if !status.success() {
return Err(format!("Process exited with non-zero status: {}", status).into());
}
}
Err(e) => {
println!("Error waiting for Tycho indexer: {}", e);
return Err(e.into());
}
}
Ok(())
}
fn run_with_rpc_server<F, R>(&self, func: F) -> R
where
F: FnOnce() -> R,
{
let (tx, rx): (Sender<bool>, Receiver<bool>) = mpsc::channel();
let db_url = self.db_url.clone();
let with_binary_logs = self.with_binary_logs;
// Start the RPC server in a separate thread
let rpc_thread = thread::spawn(move || {
let binary_path = "tycho-indexer";
let mut cmd = Command::new(binary_path)
.args(&["--database-url", db_url.as_str(), "rpc"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("RUST_LOG", "info")
.spawn()
.expect("Failed to start RPC server");
if with_binary_logs {
Self::handle_process_output(&mut cmd);
}
match rx.recv() {
Ok(_) => {
debug!("Received termination message, stopping RPC server...");
cmd.kill()
.expect("Failed to kill RPC server");
}
Err(_) => {
// Channel closed, terminate anyway
let _ = cmd.kill();
}
}
});
// Give the RPC server time to start
thread::sleep(Duration::from_secs(3));
// Run the provided function
let result = func();
tx.send(true)
.expect("Failed to send termination message");
// Wait for the RPC thread to finish
if rpc_thread.join().is_err() {
eprintln!("Failed to join RPC thread");
}
result
}
// Helper method to handle process output in separate threads
fn handle_process_output(child: &mut Child) {
if let Some(stdout) = child.stdout.take() {
thread::spawn(move || {
let reader = BufReader::new(stdout);
for line in reader.lines().flatten() {
println!("{}", line);
}
});
}
if let Some(stderr) = child.stderr.take() {
thread::spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines().flatten() {
eprintln!("{}", line);
}
});
}
}
}

View File

@@ -1,4 +1,9 @@
use std::{error::Error, fs, path::Path, process::Command}; use std::{
error::Error,
fs,
path::{Path, PathBuf},
process::Command,
};
use figment::{ use figment::{
providers::{Format, Yaml}, providers::{Format, Yaml},
@@ -7,19 +12,16 @@ use figment::{
}; };
/// Build a Substreams package with modifications to the YAML file. /// Build a Substreams package with modifications to the YAML file.
pub fn build_spkg<F>(yaml_file_path: &str, modify_func: F) -> Result<String, Box<dyn Error>> pub fn build_spkg(yaml_file_path: &PathBuf, initial_block: u64) -> Result<String, Box<dyn Error>> {
where
F: FnOnce(&mut Value) -> Result<(), Box<dyn Error>>,
{
// Create a backup file of the unmodified Substreams protocol YAML config file. // Create a backup file of the unmodified Substreams protocol YAML config file.
let backup_file_path = format!("{}.backup", yaml_file_path); let backup_file_path = yaml_file_path.with_extension("backup");
fs::copy(yaml_file_path, &backup_file_path)?; fs::copy(yaml_file_path, &backup_file_path)?;
let figment = Figment::new().merge(Yaml::file(yaml_file_path)); let figment = Figment::new().merge(Yaml::file(yaml_file_path));
let mut data: Value = figment.extract()?; let mut data: Value = figment.extract()?;
// Apply the modification function to update the YAML files // Apply the modification function to update the YAML files
modify_func(&mut data).expect("Failed to modify the YAML config file."); modify_initial_block(&mut data, initial_block);
let parent_dir = Path::new(yaml_file_path) let parent_dir = Path::new(yaml_file_path)
.parent() .parent()
@@ -79,7 +81,7 @@ where
} }
/// Update the initial block for all modules in the configuration data. /// Update the initial block for all modules in the configuration data.
pub fn modify_initial_block(data: &mut Value, start_block: usize) { pub fn modify_initial_block(data: &mut Value, start_block: u64) {
if let Value::Dict(_, ref mut dict) = data { if let Value::Dict(_, ref mut dict) = data {
if let Some(Value::Array(_, modules)) = dict.get_mut("modules") { if let Some(Value::Array(_, modules)) = dict.get_mut("modules") {
for module in modules.iter_mut() { for module in modules.iter_mut() {