feat: Implement NginxHandler with reload, stop, validate, and config management methods

This commit is contained in:
GW_MC
2026-04-11 07:32:20 +00:00
parent a023cbc082
commit e831640540
6 changed files with 334 additions and 5 deletions

21
Cargo.lock generated
View File

@@ -1343,6 +1343,17 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "fs4"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4"
dependencies = [
"rustix",
"tokio",
"windows-sys 0.59.0",
]
[[package]]
name = "funty"
version = "2.0.0"
@@ -2429,6 +2440,7 @@ dependencies = [
"chrono",
"clap",
"config",
"fs4",
"futures",
"hex",
"hostname",
@@ -5124,6 +5136,15 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"

View File

@@ -56,9 +56,6 @@ futures = "0.3"
toml = "0.9"
config = "0.15"
# HTTP client
reqwest = { version = "0.13.2", default-features = false, features = ["json"] }
# Crypto
sha2 = "0.10"
hex = "0.4"

View File

@@ -57,6 +57,7 @@ zip = { workspace = true }
# CLI
clap = { workspace = true, features = ["derive"] }
anyhow = "1.0.102"
fs4 = { version = "0.13.1", features = ["tokio"] }
[dev-dependencies]
tokio-test.workspace = true

View File

@@ -14,6 +14,7 @@ use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMa
mod cli;
mod config;
mod connector;
mod service;
#[tokio::main]
async fn main() {
@@ -60,13 +61,13 @@ async fn main() {
}
// send a dummy heartbeat to verify the connection is working
let client = master_connector.get_client();
let mut client = master_connector.get_client().lock().await.clone();
let request = nxmesh_proto::HealthReport {
..Default::default()
};
match client.lock().await.report_health(request).await {
match client.report_health(request).await {
Ok(_) => info!("Successfully sent health report to master."),
Err(e) => {
error!("Failed to send health report to master: {}", e);

View File

@@ -0,0 +1 @@
pub mod nginx_handler;

View File

@@ -0,0 +1,308 @@
use std::sync::Arc;
use anyhow::Result;
use fs4::tokio::AsyncFileExt;
use tokio::{io::AsyncWriteExt, process::Command};
use tracing::{debug, warn};
use crate::config::settings::NginxSettings;
// TODO: custom error type
#[async_trait::async_trait]
pub trait NginxHandler {
// Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used.
async fn reload(&self, config_path: Option<&str>) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn validate(&self, config_path: Option<&str>) -> Result<()>;
async fn get_version(&self) -> Result<String>;
async fn get_status(&self) -> Result<String>;
// Write a new config file for nginx.
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
async fn write_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<()>;
// Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed.
async fn append_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<()>;
// clean up old config files that are applied to nginx
// keep only latest n deployments.
async fn cleanup_config(&self, n: usize) -> Result<()>;
}
pub struct NginxHandlerImpl {
settings: Arc<NginxSettings>,
}
impl NginxHandlerImpl {
pub fn new(settings: Arc<NginxSettings>) -> Self {
Self { settings }
}
fn get_nginx_command(&self) -> String {
// TODO: rename the setting for better clarity, it can be a binary path or a custom command
self.settings
.nginx_binary_path
.clone()
.unwrap_or_else(|| "nginx".to_string())
}
fn validate_config_path(config_path: &str) -> Result<()> {
if !std::path::Path::new(config_path).exists() {
anyhow::bail!("Config file not found at path: {}", config_path);
}
if !std::path::Path::new(config_path).is_file() {
anyhow::bail!("Config path is not a file: {}", config_path);
}
Ok(())
}
fn apply_config_path_to_command_vecs<'a>(
command: &'a mut Vec<String>,
config_path: &str,
) -> Result<&'a mut Vec<String>> {
// if given a config path, add it to the end of the command arguments to override the default config path used
Self::validate_config_path(config_path)?;
let parent_dir = match std::path::Path::new(config_path).parent() {
Some(dir) => dir,
// return root
None => std::path::Path::new("/"),
};
// set prefix path to the parent directory of the config file to ensure nginx can find all related files (e.g. certs, conf.d, etc.)
command.push("-p".to_string());
command.push(parent_dir.to_string_lossy().to_string());
// add the config file path to the command arguments to override the default config path used by nginx
command.push("-c".to_string());
command.push(config_path.to_string());
Ok(command)
}
fn get_deployment_dir(&self) -> std::path::PathBuf {
std::path::Path::new(&self.settings.nginx_config_path).join("deployments")
}
fn get_deployment_dir_path(&self, deployment_id: &str) -> std::path::PathBuf {
self.get_deployment_dir().join(deployment_id)
}
async fn get_deployment_config_path(
&self,
deployment_id: &str,
output_path: &str,
create_dir_if_not_exists: bool,
) -> Result<std::path::PathBuf> {
let output_path_obj = std::path::Path::new(output_path);
if output_path_obj.is_absolute() {
anyhow::bail!("Output path must be a relative path");
}
if output_path_obj
.components()
.any(|comp| comp == std::path::Component::ParentDir)
{
anyhow::bail!("Output path must not contain parent directory traversal");
}
let deployment_config_dir = self.get_deployment_dir_path(deployment_id);
if create_dir_if_not_exists {
tokio::fs::create_dir_all(&deployment_config_dir).await?;
}
Ok(deployment_config_dir.join(output_path))
}
}
#[async_trait::async_trait]
impl NginxHandler for NginxHandlerImpl {
async fn reload(&self, config_path: Option<&str>) -> Result<()> {
// TODO: add timeout for the command execution
let reload_command_str = self.settings.override_nginx_reload_command.clone();
let program = match reload_command_str.first() {
Some(cmd) => cmd,
None => &self.get_nginx_command(),
};
let mut reload_command_vec = reload_command_str[1..].to_vec();
// if given a config path, add it to the end of the command arguments to override the default config path used
if let Some(path) = config_path {
Self::apply_config_path_to_command_vecs(&mut reload_command_vec, path)?;
}
let output = Command::new(program)
.args(&reload_command_vec)
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Failed to reload nginx: {}", error_info.trim());
}
let success_info = String::from_utf8_lossy(&output.stdout);
debug!("Nginx reloaded successfully: {}", success_info.trim());
Ok(())
}
async fn stop(&self) -> Result<()> {
let output = Command::new(self.get_nginx_command())
.arg("-s")
.arg("stop")
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Failed to stop nginx: {}", error_info.trim());
}
let success_info = String::from_utf8_lossy(&output.stdout);
debug!("Nginx stopped successfully: {}", success_info.trim());
Ok(())
}
async fn validate(&self, config_path: Option<&str>) -> Result<()> {
// TODO: add timeout for the command execution
let validate_command_str = self.settings.override_nginx_test_command.clone();
let program = match validate_command_str.first() {
Some(cmd) => cmd,
None => &self.get_nginx_command(),
};
let mut validate_args = validate_command_str[1..].to_vec();
// if given a config path, add it to the end of the command arguments to override the default config path used
if let Some(path) = config_path {
Self::apply_config_path_to_command_vecs(&mut validate_args, path)?;
}
let output = Command::new(program).args(&validate_args).output().await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Nginx config validation failed: {}", error_info.trim());
}
let success_info = String::from_utf8_lossy(&output.stdout);
debug!("Nginx config validation succeeded: {}", success_info.trim());
Ok(())
}
async fn get_version(&self) -> Result<String> {
let output = Command::new(self.get_nginx_command())
.arg("-v")
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Failed to get nginx version: {}", error_info.trim());
}
let version_info = String::from_utf8_lossy(&output.stderr);
Ok(version_info.trim().to_string())
}
async fn get_status(&self) -> Result<String> {
let output = Command::new(self.get_nginx_command())
.arg("-t")
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Failed to get nginx status: {}", error_info.trim());
}
let status_info = String::from_utf8_lossy(&output.stderr);
Ok(status_info.trim().to_string())
}
async fn write_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<()> {
let full_output_path = self
.get_deployment_config_path(deployment_id, output_path, true)
.await?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(full_output_path)
.await?;
// lock the file for writing to prevent concurrent write issue
file.allocate(config_content.len() as u64).await?;
file.lock_exclusive()?;
file.write_all(config_content.as_bytes()).await?;
file.unlock()?;
file.flush().await?;
Ok(())
}
async fn append_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<()> {
let full_output_path = self
.get_deployment_config_path(deployment_id, output_path, true)
.await?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(full_output_path)
.await?;
// lock the file for writing to prevent concurrent write issue
file.allocate(file.metadata().await?.len() + config_content.len() as u64)
.await?;
file.lock_exclusive()?;
file.write_all(config_content.as_bytes()).await?;
file.unlock()?;
file.flush().await?;
Ok(())
}
async fn cleanup_config(&self, n: usize) -> Result<()> {
let deployment_dir = self.get_deployment_dir();
// loop through all files in the deployment dir and delete them
let mut entries = tokio::fs::read_dir(&deployment_dir).await?;
let mut deployment_ids = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let file_type = entry.file_type().await?;
if file_type.is_dir()
&& let Some(deployment_id) = entry.file_name().to_str()
{
deployment_ids.push(deployment_id.to_string());
}
}
// sort the deployment ids by modified time in descending order and keep the latest n deployments, delete the rest
deployment_ids.sort_by_key(|id| {
let path = self.get_deployment_dir_path(id);
std::fs::metadata(path)
.and_then(|meta| meta.modified())
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
});
for deployment_id in deployment_ids.into_iter().skip(n) {
let path = self.get_deployment_dir_path(&deployment_id);
// ensure path is within the deplyment and nginx directory to prevent accidental deletion of other files
if !path.starts_with(&deployment_dir)
|| !path.starts_with(&self.settings.nginx_config_path)
{
warn!(
"Skipping deletion of path outside of deployment or nginx config directory: {:?}",
path
);
continue;
}
tokio::fs::remove_dir_all(path).await?;
}
Ok(())
}
}