From e831640540891702966a49e503621d37b096d714 Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Sat, 11 Apr 2026 07:32:20 +0000 Subject: [PATCH] feat: Implement NginxHandler with reload, stop, validate, and config management methods --- Cargo.lock | 21 ++ Cargo.toml | 3 - apps/nxmesh-agent/Cargo.toml | 1 + apps/nxmesh-agent/src/main.rs | 5 +- apps/nxmesh-agent/src/service/mod.rs | 1 + .../nxmesh-agent/src/service/nginx_handler.rs | 308 ++++++++++++++++++ 6 files changed, 334 insertions(+), 5 deletions(-) create mode 100644 apps/nxmesh-agent/src/service/mod.rs create mode 100644 apps/nxmesh-agent/src/service/nginx_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 984f48c..8b33a46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1343,6 +1343,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" +[[package]] +name = "fs4" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" +dependencies = [ + "rustix", + "tokio", + "windows-sys 0.59.0", +] + [[package]] name = "funty" version = "2.0.0" @@ -2429,6 +2440,7 @@ dependencies = [ "chrono", "clap", "config", + "fs4", "futures", "hex", "hostname", @@ -5124,6 +5136,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/Cargo.toml b/Cargo.toml index 94de3ae..c18b346 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,9 +56,6 @@ futures = "0.3" toml = "0.9" config = "0.15" -# HTTP client -reqwest = { version = "0.13.2", default-features = false, features = ["json"] } - # Crypto sha2 = "0.10" hex = "0.4" diff --git a/apps/nxmesh-agent/Cargo.toml b/apps/nxmesh-agent/Cargo.toml index 2186cac..97dc5e2 100644 --- a/apps/nxmesh-agent/Cargo.toml +++ b/apps/nxmesh-agent/Cargo.toml @@ -57,6 +57,7 @@ zip = { workspace = true } # CLI clap = { workspace = true, features = ["derive"] } anyhow = "1.0.102" +fs4 = { version = "0.13.1", features = ["tokio"] } [dev-dependencies] tokio-test.workspace = true diff --git a/apps/nxmesh-agent/src/main.rs b/apps/nxmesh-agent/src/main.rs index e3dc71f..e812c37 100644 --- a/apps/nxmesh-agent/src/main.rs +++ b/apps/nxmesh-agent/src/main.rs @@ -14,6 +14,7 @@ use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMa mod cli; mod config; mod connector; +mod service; #[tokio::main] async fn main() { @@ -60,13 +61,13 @@ async fn main() { } // send a dummy heartbeat to verify the connection is working - let client = master_connector.get_client(); + let mut client = master_connector.get_client().lock().await.clone(); let request = nxmesh_proto::HealthReport { ..Default::default() }; - match client.lock().await.report_health(request).await { + match client.report_health(request).await { Ok(_) => info!("Successfully sent health report to master."), Err(e) => { error!("Failed to send health report to master: {}", e); diff --git a/apps/nxmesh-agent/src/service/mod.rs b/apps/nxmesh-agent/src/service/mod.rs new file mode 100644 index 0000000..87747d2 --- /dev/null +++ b/apps/nxmesh-agent/src/service/mod.rs @@ -0,0 +1 @@ +pub mod nginx_handler; diff --git a/apps/nxmesh-agent/src/service/nginx_handler.rs b/apps/nxmesh-agent/src/service/nginx_handler.rs new file mode 100644 index 0000000..30acbaa --- /dev/null +++ b/apps/nxmesh-agent/src/service/nginx_handler.rs @@ -0,0 +1,308 @@ +use std::sync::Arc; + +use anyhow::Result; +use fs4::tokio::AsyncFileExt; +use tokio::{io::AsyncWriteExt, process::Command}; +use tracing::{debug, warn}; + +use crate::config::settings::NginxSettings; + +// TODO: custom error type + +#[async_trait::async_trait] +pub trait NginxHandler { + // Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used. + async fn reload(&self, config_path: Option<&str>) -> Result<()>; + async fn stop(&self) -> Result<()>; + async fn validate(&self, config_path: Option<&str>) -> Result<()>; + async fn get_version(&self) -> Result; + async fn get_status(&self) -> Result; + // Write a new config file for nginx. + // The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed. + async fn write_config( + &self, + deployment_id: &str, + config_content: &str, + output_path: &str, + ) -> Result<()>; + // Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed. + async fn append_config( + &self, + deployment_id: &str, + config_content: &str, + output_path: &str, + ) -> Result<()>; + + // clean up old config files that are applied to nginx + // keep only latest n deployments. + async fn cleanup_config(&self, n: usize) -> Result<()>; +} + +pub struct NginxHandlerImpl { + settings: Arc, +} + +impl NginxHandlerImpl { + pub fn new(settings: Arc) -> Self { + Self { settings } + } + + fn get_nginx_command(&self) -> String { + // TODO: rename the setting for better clarity, it can be a binary path or a custom command + self.settings + .nginx_binary_path + .clone() + .unwrap_or_else(|| "nginx".to_string()) + } + + fn validate_config_path(config_path: &str) -> Result<()> { + if !std::path::Path::new(config_path).exists() { + anyhow::bail!("Config file not found at path: {}", config_path); + } + if !std::path::Path::new(config_path).is_file() { + anyhow::bail!("Config path is not a file: {}", config_path); + } + Ok(()) + } + + fn apply_config_path_to_command_vecs<'a>( + command: &'a mut Vec, + config_path: &str, + ) -> Result<&'a mut Vec> { + // if given a config path, add it to the end of the command arguments to override the default config path used + Self::validate_config_path(config_path)?; + let parent_dir = match std::path::Path::new(config_path).parent() { + Some(dir) => dir, + // return root + None => std::path::Path::new("/"), + }; + // set prefix path to the parent directory of the config file to ensure nginx can find all related files (e.g. certs, conf.d, etc.) + command.push("-p".to_string()); + command.push(parent_dir.to_string_lossy().to_string()); + // add the config file path to the command arguments to override the default config path used by nginx + command.push("-c".to_string()); + command.push(config_path.to_string()); + Ok(command) + } + + fn get_deployment_dir(&self) -> std::path::PathBuf { + std::path::Path::new(&self.settings.nginx_config_path).join("deployments") + } + + fn get_deployment_dir_path(&self, deployment_id: &str) -> std::path::PathBuf { + self.get_deployment_dir().join(deployment_id) + } + + async fn get_deployment_config_path( + &self, + deployment_id: &str, + output_path: &str, + create_dir_if_not_exists: bool, + ) -> Result { + let output_path_obj = std::path::Path::new(output_path); + if output_path_obj.is_absolute() { + anyhow::bail!("Output path must be a relative path"); + } + if output_path_obj + .components() + .any(|comp| comp == std::path::Component::ParentDir) + { + anyhow::bail!("Output path must not contain parent directory traversal"); + } + + let deployment_config_dir = self.get_deployment_dir_path(deployment_id); + if create_dir_if_not_exists { + tokio::fs::create_dir_all(&deployment_config_dir).await?; + } + Ok(deployment_config_dir.join(output_path)) + } +} + +#[async_trait::async_trait] +impl NginxHandler for NginxHandlerImpl { + async fn reload(&self, config_path: Option<&str>) -> Result<()> { + // TODO: add timeout for the command execution + let reload_command_str = self.settings.override_nginx_reload_command.clone(); + let program = match reload_command_str.first() { + Some(cmd) => cmd, + None => &self.get_nginx_command(), + }; + + let mut reload_command_vec = reload_command_str[1..].to_vec(); + // if given a config path, add it to the end of the command arguments to override the default config path used + if let Some(path) = config_path { + Self::apply_config_path_to_command_vecs(&mut reload_command_vec, path)?; + } + + let output = Command::new(program) + .args(&reload_command_vec) + .output() + .await?; + if !output.status.success() { + let error_info = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Failed to reload nginx: {}", error_info.trim()); + } + let success_info = String::from_utf8_lossy(&output.stdout); + debug!("Nginx reloaded successfully: {}", success_info.trim()); + + Ok(()) + } + + async fn stop(&self) -> Result<()> { + let output = Command::new(self.get_nginx_command()) + .arg("-s") + .arg("stop") + .output() + .await?; + + if !output.status.success() { + let error_info = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Failed to stop nginx: {}", error_info.trim()); + } + let success_info = String::from_utf8_lossy(&output.stdout); + debug!("Nginx stopped successfully: {}", success_info.trim()); + + Ok(()) + } + + async fn validate(&self, config_path: Option<&str>) -> Result<()> { + // TODO: add timeout for the command execution + let validate_command_str = self.settings.override_nginx_test_command.clone(); + let program = match validate_command_str.first() { + Some(cmd) => cmd, + None => &self.get_nginx_command(), + }; + let mut validate_args = validate_command_str[1..].to_vec(); + // if given a config path, add it to the end of the command arguments to override the default config path used + if let Some(path) = config_path { + Self::apply_config_path_to_command_vecs(&mut validate_args, path)?; + } + + let output = Command::new(program).args(&validate_args).output().await?; + if !output.status.success() { + let error_info = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Nginx config validation failed: {}", error_info.trim()); + } + let success_info = String::from_utf8_lossy(&output.stdout); + debug!("Nginx config validation succeeded: {}", success_info.trim()); + Ok(()) + } + + async fn get_version(&self) -> Result { + let output = Command::new(self.get_nginx_command()) + .arg("-v") + .output() + .await?; + + if !output.status.success() { + let error_info = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Failed to get nginx version: {}", error_info.trim()); + } + + let version_info = String::from_utf8_lossy(&output.stderr); + Ok(version_info.trim().to_string()) + } + + async fn get_status(&self) -> Result { + let output = Command::new(self.get_nginx_command()) + .arg("-t") + .output() + .await?; + + if !output.status.success() { + let error_info = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("Failed to get nginx status: {}", error_info.trim()); + } + + let status_info = String::from_utf8_lossy(&output.stderr); + Ok(status_info.trim().to_string()) + } + + async fn write_config( + &self, + deployment_id: &str, + config_content: &str, + output_path: &str, + ) -> Result<()> { + let full_output_path = self + .get_deployment_config_path(deployment_id, output_path, true) + .await?; + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(full_output_path) + .await?; + // lock the file for writing to prevent concurrent write issue + file.allocate(config_content.len() as u64).await?; + file.lock_exclusive()?; + file.write_all(config_content.as_bytes()).await?; + file.unlock()?; + file.flush().await?; + + Ok(()) + } + + async fn append_config( + &self, + deployment_id: &str, + config_content: &str, + output_path: &str, + ) -> Result<()> { + let full_output_path = self + .get_deployment_config_path(deployment_id, output_path, true) + .await?; + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .create(true) + .append(true) + .open(full_output_path) + .await?; + // lock the file for writing to prevent concurrent write issue + file.allocate(file.metadata().await?.len() + config_content.len() as u64) + .await?; + file.lock_exclusive()?; + file.write_all(config_content.as_bytes()).await?; + file.unlock()?; + file.flush().await?; + + Ok(()) + } + + async fn cleanup_config(&self, n: usize) -> Result<()> { + let deployment_dir = self.get_deployment_dir(); + // loop through all files in the deployment dir and delete them + let mut entries = tokio::fs::read_dir(&deployment_dir).await?; + let mut deployment_ids = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + let file_type = entry.file_type().await?; + if file_type.is_dir() + && let Some(deployment_id) = entry.file_name().to_str() + { + deployment_ids.push(deployment_id.to_string()); + } + } + // sort the deployment ids by modified time in descending order and keep the latest n deployments, delete the rest + deployment_ids.sort_by_key(|id| { + let path = self.get_deployment_dir_path(id); + std::fs::metadata(path) + .and_then(|meta| meta.modified()) + .unwrap_or(std::time::SystemTime::UNIX_EPOCH) + }); + for deployment_id in deployment_ids.into_iter().skip(n) { + let path = self.get_deployment_dir_path(&deployment_id); + // ensure path is within the deplyment and nginx directory to prevent accidental deletion of other files + if !path.starts_with(&deployment_dir) + || !path.starts_with(&self.settings.nginx_config_path) + { + warn!( + "Skipping deletion of path outside of deployment or nginx config directory: {:?}", + path + ); + continue; + } + tokio::fs::remove_dir_all(path).await?; + } + Ok(()) + } +}