From 4ca59d2bb60310cd78398624655b8ec412c653da Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Sun, 21 Dec 2025 15:32:42 +0800 Subject: [PATCH] feat: add agent module with Nginx service commands and routes - Introduced a new agent module with commands for managing Nginx configurations. - Implemented `NginxService` for handling reload, validation, and configuration writing. - Added routes for status, validation, and configuration writing using Axum. - Created necessary command files: `reload.rs`, `run.rs`, `validate.rs`, `write_config.rs`. - Updated `Cargo.toml` and `Cargo.lock` to include new dependencies. - Added `.gitignore` for the agent module. - Updated `justfile` to include OpenAPI generation for the agent. --- Cargo.lock | 137 ++++++++++- Cargo.toml | 3 +- apps/agent/.gitignore | 1 + apps/agent/Cargo.toml | 14 ++ apps/agent/src/commands.rs | 292 ++++++++++++++++++++++++ apps/agent/src/commands/reload.rs | 98 ++++++++ apps/agent/src/commands/run.rs | 85 +++++++ apps/agent/src/commands/validate.rs | 47 ++++ apps/agent/src/commands/write_config.rs | 131 +++++++++++ apps/agent/src/main.rs | 83 +++++++ apps/agent/src/routes.rs | 130 +++++++++++ justfile | 5 + 12 files changed, 1023 insertions(+), 3 deletions(-) create mode 100644 apps/agent/.gitignore create mode 100644 apps/agent/Cargo.toml create mode 100644 apps/agent/src/commands.rs create mode 100644 apps/agent/src/commands/reload.rs create mode 100644 apps/agent/src/commands/run.rs create mode 100644 apps/agent/src/commands/validate.rs create mode 100644 apps/agent/src/commands/write_config.rs create mode 100644 apps/agent/src/main.rs create mode 100644 apps/agent/src/routes.rs diff --git a/Cargo.lock b/Cargo.lock index 6966840..5826b40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,6 +489,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "chrono-tz" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" +dependencies = [ + "chrono", + "phf", +] + [[package]] name = "clap" version = "4.5.53" @@ -690,6 +700,17 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "croner" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aa42bcd3d846ebf66e15bd528d1087f75d1c6c1c66ebff626178a106353c576" +dependencies = [ + "chrono", + "derive_builder", + "strum", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -790,6 +811,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", + "strsim", "syn 2.0.110", ] @@ -864,6 +886,37 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling 0.20.11", + "proc-macro2", + "quote", + "syn 2.0.110", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn 2.0.110", +] + [[package]] name = "derive_more" version = "2.0.1" @@ -1474,9 +1527,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1744436df46f0bde35af3eda22aeaba453aada65d8f1c171cd8a5f59030bd69f" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" dependencies = [ "atomic-waker", "bytes", @@ -2017,6 +2070,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "num-integer" version = "0.1.46" @@ -2324,6 +2388,24 @@ dependencies = [ "serde", ] +[[package]] +name = "phf" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06005508882fb681fd97892ecff4b7fd0fee13ef1aa569f8695dae7ab9099981" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -3339,6 +3421,12 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.11" @@ -3650,6 +3738,21 @@ name = "strum" version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.110", +] [[package]] name = "subtle" @@ -3856,6 +3959,22 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tokio-cron-scheduler" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f50e41f200fd8ed426489bd356910ede4f053e30cebfbd59ef0f856f0d7432a" +dependencies = [ + "chrono", + "chrono-tz", + "croner", + "num-derive", + "num-traits", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "tokio-macros" version = "2.6.0" @@ -4699,6 +4818,20 @@ dependencies = [ "hashlink", ] +[[package]] +name = "yanpm-agent" +version = "0.1.0" +dependencies = [ + "axum", + "clap", + "serde", + "serde_json", + "tokio", + "tokio-cron-scheduler", + "tracing", + "tracing-subscriber", +] + [[package]] name = "yansi" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 9c3e719..c0031ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,9 @@ [workspace] -members = [ +members = [ "apps/api", "apps/container", "apps/cli", + "apps/agent", "public/shared", "public/database", "public/migration" diff --git a/apps/agent/.gitignore b/apps/agent/.gitignore new file mode 100644 index 0000000..f0a56ac --- /dev/null +++ b/apps/agent/.gitignore @@ -0,0 +1 @@ +*.sock \ No newline at end of file diff --git a/apps/agent/Cargo.toml b/apps/agent/Cargo.toml new file mode 100644 index 0000000..53c344f --- /dev/null +++ b/apps/agent/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "yanpm-agent" +version = "0.1.0" +edition = "2024" + +[dependencies] +axum = { version = "0.8.7", features = ["form", "http1", "json", "matched-path", "original-uri", "query", "tokio", "tower-log", "tracing", "macros"] } +tokio = { version = "1", features = ["fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } +tracing = { version = "0.1.41", features = ["std", "attributes"] } +tracing-subscriber = { version = "0.3.20", features = ["smallvec", "fmt", "ansi", "tracing-log", "std", "json", "serde", "serde_json", "time", "tracing"] } +serde_json = { version = "1.0.145", features = ["std"] } +serde = { version = "1.0.228", features = ["std", "derive"] } +tokio-cron-scheduler = { version = "0.15.1", features = ["signal"] } +clap = { version = "4", features = ["derive"] } diff --git a/apps/agent/src/commands.rs b/apps/agent/src/commands.rs new file mode 100644 index 0000000..377bd6c --- /dev/null +++ b/apps/agent/src/commands.rs @@ -0,0 +1,292 @@ +mod reload; +mod run; +mod validate; +mod write_config; + +use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, +}; + +use tokio::sync::{Mutex, RwLock}; +use tokio_cron_scheduler::{Job, JobScheduler}; +use tracing::{error, info}; + +use crate::commands::write_config::INTERNAL_CONFIG_FOLDER_NAME; + +const OLD_CONFIG_CLEANUP_THRESHOLD: u64 = 3600; + +pub struct NginxService { + // lock for nginx reload, and timestamp tracking + nginx_lock: Mutex<()>, + last_applied: AtomicU64, + // lock for write_config per (config_name, timestamp) + #[allow(clippy::type_complexity)] + write_config_lock: RwLock>>>, + // commands + reload_cmd: Arc, + validate_cmd: Arc, + write_config_cmd: Arc, +} + +impl NginxService { + pub async fn new( + scheduler: Arc, + nginx_config_dir: std::path::PathBuf, + ) -> Result, Box> { + let nginx_service = Arc::new(NginxService { + nginx_lock: Mutex::new(()), + last_applied: AtomicU64::new(0), + write_config_lock: RwLock::new(HashMap::new()), + // commands + reload_cmd: Arc::new(reload::ReloadCommand::default()), + validate_cmd: Arc::new(validate::ValidateCommand::new(nginx_config_dir.clone())), + write_config_cmd: Arc::new(write_config::WriteConfigCommand::new(nginx_config_dir)), + }); + let mut nginx_service_clone = nginx_service.clone(); + + scheduler + .clone() + // cleanup every 10 minutes + .add(Job::new_async("0 */10 * * * *", move |_uuid, _l| { + info!("Running nginx_service cleanup job"); + let nginx_service_clone = nginx_service_clone.clone(); + let job = Box::pin(async move { + nginx_service_clone.cleanup_unused_lock().await; + }); + info!("NginxService cleanup job completed"); + job + })?) + .await?; + + nginx_service_clone = nginx_service.clone(); + + scheduler + .clone() + // cleanup every hour + .add(Job::new_async("0 0 */1 * * *", move |_uuid, _l| { + info!("Running nginx_service old config cleanup job"); + let nginx_service_clone = nginx_service_clone.clone(); + let job = Box::pin(async move { + nginx_service_clone.cleanup_old_configs().await; + }); + info!("NginxService old config cleanup job completed"); + job + })?) + .await?; + + Ok(nginx_service) + } + + pub async fn validate_and_reload( + &self, + config_name: &str, + timestamp: u64, + ) -> Result<(i32, String), Box> { + let cur = self.last_applied.load(Ordering::SeqCst); + if cur > timestamp { + return Err("Another operation is in progress with higher timestamp value".into()); + } + + // acquire write lock to update nginx_lock + let _nginx_guard = self.nginx_lock.lock().await; + // acquire write lock for this config+timestamp + let rw_lock = self.acquire_file_write_lock(config_name, timestamp).await; + let _guard = rw_lock.write().await; + + match self + .reload_cmd + .validate_and_reload(config_name, timestamp, self.validate_cmd.clone()) + .await + { + Ok((code, output)) => { + // update last_applied + self.last_applied.store(timestamp, Ordering::SeqCst); + Ok((code, output)) + } + Err(e) => Err(e), + } + } + + pub async fn write_config( + &self, + config_name: &str, + timestamp: u64, + content: &str, + ) -> Result<(), Box> { + let rw_lock = self.acquire_file_write_lock(config_name, timestamp).await; + let _guard = rw_lock.write().await; + // call the write_config command + self.write_config_cmd + .write_config(config_name, timestamp, content) + .await + } + + pub async fn validate( + &self, + config_name: &str, + timestamp: u64, + ) -> Result<(i32, String), Box> { + self.validate_cmd.validate(config_name, timestamp).await + } + + async fn cleanup_unused_lock(&self) { + let mut _write_lock = self.write_config_lock.write().await; + (*_write_lock).retain(|_, lock| { + // retain only locks that are currently held (readers or writers) + lock.try_write().is_err() + }); + } + + async fn cleanup_old_configs(&self) { + // list all files within nginx_config_dir/YANPM that is older than now - OLD_CONFIG_CLEANUP_THRESHOLD + let cutoff = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + - OLD_CONFIG_CLEANUP_THRESHOLD; + + let nginx_config_dir = self.validate_cmd.nginx_config_dir(); + let yanpm_dir = nginx_config_dir.join(INTERNAL_CONFIG_FOLDER_NAME); + + let read_dir = match tokio::fs::read_dir(&yanpm_dir).await { + Ok(rd) => rd, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // directory does not exist, nothing to clean up + return; + } + Err(e) => { + error!( + "Error reading {} config directory {}: {}", + INTERNAL_CONFIG_FOLDER_NAME, + yanpm_dir.display(), + e + ); + return; + } + }; + + tokio::pin!(read_dir); + while let Some(entry) = read_dir.next_entry().await.unwrap_or(None) { + let metadata = match entry.metadata().await { + Ok(md) => md, + Err(e) => { + error!( + "Error getting metadata for file {}: {}", + entry.path().display(), + e + ); + continue; + } + }; + if let Ok(modified) = metadata.modified() + && let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) + { + let mtime_secs = duration.as_secs(); + if mtime_secs < cutoff { + // file is older than cutoff, remove it + if let Err(e) = tokio::fs::remove_file(entry.path()).await { + error!( + "Error removing old config file {}: {}", + entry.path().display(), + e + ); + } else { + info!("Removed old config file {}", entry.path().display()); + } + } + } + } + } + + async fn acquire_file_write_lock(&self, config_name: &str, timestamp: u64) -> Arc> { + let mut write_lock = self.write_config_lock.write().await; + write_lock + .entry((config_name.to_string(), timestamp)) + .or_insert_with(|| Arc::new(RwLock::new(()))) + .clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::error::Error; + use std::sync::Arc as StdArc; + use tokio::time::{Duration, sleep}; + + impl NginxService { + // Test helper that simulates a long-running reload without invoking external commands. + pub async fn test_simulated_reload( + &self, + config_name: &str, + timestamp: u64, + delay_ms: u64, + ) -> Result<(), Box> { + // pre-check + let cur = self.last_applied.load(Ordering::SeqCst); + if cur >= timestamp { + return Err("stale".into()); + } + + // acquire exclusive lock and re-check + let _nginx_guard = self.nginx_lock.lock().await; + let cur2 = self.last_applied.load(Ordering::SeqCst); + if cur2 >= timestamp { + return Err("stale".into()); + } + + // per-file lock + let rw_lock = self.acquire_file_write_lock(config_name, timestamp).await; + let _guard = rw_lock.write().await; + + // simulate operation + sleep(Duration::from_millis(delay_ms)).await; + + // on success update last_applied + let mut prev = self.last_applied.load(Ordering::SeqCst); + while prev < timestamp { + match self.last_applied.compare_exchange( + prev, + timestamp, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => break, + Err(next) => prev = next, + } + } + + Ok(()) + } + } + + #[tokio::test] + async fn concurrent_stale_is_rejected() { + let scheduler = StdArc::new(JobScheduler::new().await.unwrap()); + let svc = NginxService::new(scheduler.clone(), std::env::temp_dir()) + .await + .unwrap(); + + let s1 = svc.clone(); + let h1 = tokio::spawn(async move { s1.test_simulated_reload("cfg", 2, 200).await }); + + // let second start shortly after first so it will wait for the mutex + sleep(Duration::from_millis(20)).await; + + let s2 = svc.clone(); + let h2 = tokio::spawn(async move { s2.test_simulated_reload("cfg", 1, 10).await }); + + let r1 = h1.await.unwrap(); + assert!(r1.is_ok(), "first (newer) task should succeed"); + + let r2 = h2.await.unwrap(); + assert!( + r2.is_err(), + "second (older) task should be rejected as stale" + ); + } +} diff --git a/apps/agent/src/commands/reload.rs b/apps/agent/src/commands/reload.rs new file mode 100644 index 0000000..601c6a3 --- /dev/null +++ b/apps/agent/src/commands/reload.rs @@ -0,0 +1,98 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use tokio::sync::Mutex; +use tracing::error; + +use crate::commands::write_config::INTERNAL_CONFIG_FOLDER_NAME; +use crate::commands::{run::run_cmd, validate::ValidateCommand}; + +pub struct ReloadCommand { + is_reloading: Mutex, +} + +struct ReloadResetGuard<'a> { + guard: tokio::sync::MutexGuard<'a, bool>, +} + +impl<'a> Drop for ReloadResetGuard<'a> { + fn drop(&mut self) { + *self.guard = false; + } +} + +impl Default for ReloadCommand { + fn default() -> Self { + Self { + is_reloading: Mutex::new(false), + } + } +} + +impl ReloadCommand { + pub async fn validate_and_reload( + &self, + config_name: &str, + timestamp: u64, + validate_cmd: Arc, + ) -> Result<(i32, String), Box> { + // ensure the written fragment exists + validate_cmd.validate(config_name, timestamp).await?; + + // Now atomically swap the YANPM.conf symlink to point to the new fragment + // so nginx -t validates the composed main config. If validation fails, + // attempt to restore the previous symlink. + let filename = crate::commands::run::to_file_name(config_name, timestamp)?; + let nginx_dir = validate_cmd.nginx_config_dir(); + let symlink_path = nginx_dir.join("YANPM.conf"); + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let tmp_name = format!("YANPM.conf.tmp.{}.{}", std::process::id(), now); + let tmp_path = nginx_dir.join(&tmp_name); + + // prepare relative target: INTERNAL_CONFIG_FOLDER_NAME/ + let rel_target = Path::new(INTERNAL_CONFIG_FOLDER_NAME).join(&filename); + + // read previous target if exists + let previous_target = std::fs::read_link(&symlink_path).ok(); + + // Acquire reload guard before mutating the symlink to avoid races + let reloading_lock = self.is_reloading.lock().await; + if *reloading_lock { + return Err("Reload already in progress".into()); + } + // set flag to true and ensure it is reset on drop + let mut mut_guard = reloading_lock; + *mut_guard = true; + let _reset_guard = ReloadResetGuard { guard: mut_guard }; + + // create temporary symlink and atomically rename into place + std::os::unix::fs::symlink(&rel_target, &tmp_path)?; + tokio::fs::rename(&tmp_path, &symlink_path).await?; + + // validate composed main config now that symlink points to new fragment + if let Err(e) = validate_cmd.validate_all().await { + // restore previous symlink state while still holding the guard + if let Some(prev) = previous_target { + let restore_tmp = + nginx_dir.join(format!("YANPM.conf.restore.{}.{}", std::process::id(), now)); + std::os::unix::fs::symlink(&prev, &restore_tmp)?; + if let Err(err) = tokio::fs::rename(&restore_tmp, &symlink_path).await { + error!( + "Failed to restore previous YANPM.conf symlink after validation error: {}", + err + ); + } + } else if let Err(err) = tokio::fs::remove_file(&symlink_path).await { + error!( + "Failed to remove YANPM.conf symlink after validation error: {}", + err + ); + } + return Err(e); + } + + // reload the running nginx master process (no -c) so it reloads its configured main config + run_cmd("nginx", &["-s", "reload"], 10).await + } +} diff --git a/apps/agent/src/commands/run.rs b/apps/agent/src/commands/run.rs new file mode 100644 index 0000000..5f60862 --- /dev/null +++ b/apps/agent/src/commands/run.rs @@ -0,0 +1,85 @@ +use std::time::Duration; + +use tokio::{process::Command, time::timeout}; +use tracing::error; + +pub fn to_file_name( + config_name: &str, + timestamp: u64, +) -> Result> { + // reject empty or unsafe names to avoid path traversal or invalid filesystem chars + if config_name.is_empty() { + return Err("config_name is empty".into()); + } + if config_name.len() > 255 { + return Err("config_name too long".into()); + } + if config_name.contains('/') || config_name.contains('\\') || config_name.contains("..") { + return Err("config_name contains invalid path characters".into()); + } + if !config_name + .chars() + .all(|c| c.is_ascii_alphanumeric() || "-._".contains(c)) + { + return Err("config_name contains invalid characters".into()); + } + + Ok(format!("{}_{}.conf", timestamp, config_name)) +} + +pub async fn run_cmd( + cmd: &str, + args: &[&str], + dur_s: u64, +) -> Result<(i32, String), Box> { + let mut c = Command::new(cmd); + c.args(args); + let res = timeout(Duration::from_secs(dur_s), c.output()).await; + let out = match res { + Ok(Ok(out)) => out, + Ok(Err(e)) => return Err(Box::new(e)), + Err(_) => { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "command timeout", + ))); + } + }; + let code = out.status.code().unwrap_or(-1); + let output = String::from_utf8_lossy(&[out.stdout, out.stderr].concat()).to_string(); + if code != 0 { + error!("command failed ({}): {}", code, output); + return Err(format!("command failed ({}): {}", code, output).into()); + } + Ok((code, output)) +} + +#[cfg(test)] +mod tests { + use super::to_file_name; + + #[test] + fn to_file_name_valid() { + let res = to_file_name("myconf", 1234).expect("should succeed"); + assert_eq!(res, "1234_myconf.conf"); + } + + #[test] + fn to_file_name_empty() { + assert!(to_file_name("", 1).is_err()); + } + + #[test] + fn to_file_name_invalid_chars() { + assert!(to_file_name("bad/name", 1).is_err()); + assert!(to_file_name("bad\\name", 1).is_err()); + assert!(to_file_name("bad..name", 1).is_err()); + assert!(to_file_name("bad$name", 1).is_err()); + } + + #[test] + fn to_file_name_too_long() { + let long = "a".repeat(300); + assert!(to_file_name(&long, 1).is_err()); + } +} diff --git a/apps/agent/src/commands/validate.rs b/apps/agent/src/commands/validate.rs new file mode 100644 index 0000000..ed02987 --- /dev/null +++ b/apps/agent/src/commands/validate.rs @@ -0,0 +1,47 @@ +use crate::commands::{run::run_cmd, write_config::INTERNAL_CONFIG_FOLDER_NAME}; +use std::path::PathBuf; + +pub struct ValidateCommand { + nginx_config_dir: PathBuf, +} + +impl ValidateCommand { + pub fn new(nginx_config_dir: PathBuf) -> Self { + Self { nginx_config_dir } + } + + pub fn nginx_config_dir(&self) -> PathBuf { + self.nginx_config_dir.clone() + } + + pub async fn validate_all( + &self, + ) -> Result<(i32, String), Box> { + run_cmd("nginx", &["-t"], 10).await + } + + pub async fn validate( + &self, + config_name: &str, + timestamp: u64, + ) -> Result<(i32, String), Box> { + let filename = crate::commands::run::to_file_name(config_name, timestamp)?; + // fragments are written into the YANPM subdirectory + let full_path = self + .nginx_config_dir + .join(INTERNAL_CONFIG_FOLDER_NAME) + .join(&filename); + + // ensure the fragment file exists + if tokio::fs::metadata(&full_path).await.is_err() { + return Err(format!("Config file not found: {}", full_path.display()).into()); + } + + run_cmd( + "nginx", + &["-t", "-c", full_path.to_str().ok_or("invalid config path")?], + 10, + ) + .await + } +} diff --git a/apps/agent/src/commands/write_config.rs b/apps/agent/src/commands/write_config.rs new file mode 100644 index 0000000..350a4e7 --- /dev/null +++ b/apps/agent/src/commands/write_config.rs @@ -0,0 +1,131 @@ +use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::io::AsyncWriteExt; + +use crate::commands::run::to_file_name; + +pub const INTERNAL_CONFIG_FOLDER_NAME: &str = "YANPM"; +const FILE_SIZE_LIMIT: usize = 10 * 1024 * 1024; // 10MB + +pub struct WriteConfigCommand { + nginx_config_dir: PathBuf, +} + +impl WriteConfigCommand { + pub fn new(nginx_config_dir: PathBuf) -> Self { + Self { nginx_config_dir } + } + pub async fn write_config( + &self, + config_name: &str, + timestamp: u64, + content: &str, + ) -> Result<(), Box> { + let filename = to_file_name(config_name, timestamp)?; + let path = self.nginx_config_dir.clone(); + // ensure main config dir exists + tokio::fs::create_dir_all(&path).await?; + + // create YANPM subdir where fragment files live + let yanpm_dir = path.join(INTERNAL_CONFIG_FOLDER_NAME); + tokio::fs::create_dir_all(&yanpm_dir).await?; + let final_path = yanpm_dir.join(&filename); + + // limit size to 10MB + if content.len() > FILE_SIZE_LIMIT { + return Err(format!( + "content exceeds {}MB size limit", + FILE_SIZE_LIMIT / (1024 * 1024) + ) + .into()); + } + + // create a temporary filename in the same directory for atomic replace + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let tmp_filename = format!("{}.tmp.{}.{}", filename, std::process::id(), now); + // create tmp file in the same directory as final file to ensure atomic rename + let tmp_path = yanpm_dir.join(tmp_filename); + + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&tmp_path) + .await?; + file.write_all(content.as_bytes()).await?; + // ensure data is flushed to disk; propagate errors + file.sync_all().await?; + + // atomically move the tmp file into the YANPM dir + tokio::fs::rename(&tmp_path, &final_path).await?; + + // set explicit permissions (rw-r-----) + tokio::fs::set_permissions(&final_path, std::fs::Permissions::from_mode(0o640)).await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::{INTERNAL_CONFIG_FOLDER_NAME, WriteConfigCommand}; + use std::time::SystemTime; + use std::time::UNIX_EPOCH; + + #[tokio::test] + async fn write_config_success_and_cleanup() { + let base = std::env::temp_dir().join(format!( + "yanpm_test_{}_{}", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + // ensure clean + let _ = tokio::fs::remove_dir_all(&base).await; + let cmd = WriteConfigCommand::new(base.clone()); + + let config_name = "unittest"; + let timestamp = 42u64; + let content = "hello world"; + + cmd.write_config(config_name, timestamp, content) + .await + .expect("write should succeed"); + + let filename = super::to_file_name(config_name, timestamp).unwrap(); + let final_path = base.join(INTERNAL_CONFIG_FOLDER_NAME).join(&filename); + let data = tokio::fs::read_to_string(&final_path) + .await + .expect("file should exist"); + assert_eq!(data, content); + + // cleanup + tokio::fs::remove_dir_all(&base).await.expect("cleanup"); + } + + #[tokio::test] + async fn write_config_size_limit() { + let base = std::env::temp_dir().join(format!( + "yanpm_test_{}_{}", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let _ = tokio::fs::remove_dir_all(&base).await; + let cmd = WriteConfigCommand::new(base.clone()); + + // exceed 10MB limit + let large = vec![b'a'; 10 * 1024 * 1024 + 1]; + let large_str = String::from_utf8_lossy(&large).to_string(); + + let res = cmd.write_config("big", 1, &large_str).await; + assert!(res.is_err()); + + let _ = tokio::fs::remove_dir_all(&base).await; + } +} diff --git a/apps/agent/src/main.rs b/apps/agent/src/main.rs new file mode 100644 index 0000000..f977490 --- /dev/null +++ b/apps/agent/src/main.rs @@ -0,0 +1,83 @@ +#![forbid(unsafe_code)] + +mod commands; +mod routes; + +use axum::routing::get; +use axum::{Router, routing::post}; +use clap::Parser; +use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::net::UnixListener; +use tracing::{error, info}; + +use crate::commands::NginxService; +use crate::routes::{status, validate, validate_and_reload, write_config}; + +#[derive(Parser)] +struct Args { + /// Unix socket path to bind the daemon to + sock: String, + + /// Directory where generated nginx config files will be written + #[arg(long, default_value = "/etc/nginx/conf.d")] + nginx_config_dir: PathBuf, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + let sock = args.sock; + + let path = PathBuf::from(&sock); + if let Some(dir) = path.parent() { + tokio::fs::create_dir_all(dir).await?; + // permissive; set tighter perms in production via image/build steps + tokio::fs::set_permissions(dir, std::fs::Permissions::from_mode(0o770)).await?; + } + // If an existing path exists at the socket location, ensure it's a socket + match tokio::fs::metadata(&path).await { + Ok(md) => { + use std::os::unix::fs::FileTypeExt; + if md.file_type().is_socket() { + tokio::fs::remove_file(&path).await?; + } else { + return Err( + format!("Socket path {} exists and is not a socket", path.display()).into(), + ); + } + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => return Err(e.into()), + } + + // bind using tokio's UnixListener (avoids converting a blocking std listener) + let listener = UnixListener::bind(&path)?; + // set socket perms to 0660 (best-effort) + if let Err(err) = + tokio::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o660)).await + { + error!( + "Warning: failed to set permissions on socket {}: {}", + path.display(), + err + ); + } + + let scheduler = Arc::new(tokio_cron_scheduler::JobScheduler::new().await?); + + let app = Router::new() + .route("/status", get(status)) + .route("/validate_and_reload", post(validate_and_reload)) + .route("/validate", post(validate)) + .route("/write_config", post(write_config)) + .with_state(NginxService::new(scheduler.clone(), args.nginx_config_dir).await?); + + scheduler.start().await?; + + info!("Starting yanpm-daemon on unix socket: {}", sock); + axum::serve::serve(listener, app).await?; + + Ok(()) +} diff --git a/apps/agent/src/routes.rs b/apps/agent/src/routes.rs new file mode 100644 index 0000000..77c8733 --- /dev/null +++ b/apps/agent/src/routes.rs @@ -0,0 +1,130 @@ +use axum::Json; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, from_value}; +use std::sync::Arc; +use tracing::warn; + +use crate::commands::NginxService; + +#[derive(Serialize)] +pub struct StatusResp { + pub ok: bool, +} + +pub async fn status() -> impl IntoResponse { + let resp = StatusResp { ok: true }; + (axum::http::StatusCode::OK, axum::Json(resp)) +} + +#[derive(Serialize)] +pub struct ValidateAndReloadResp { + pub rc: i32, + pub ro: String, +} + +#[derive(Deserialize)] +pub struct ValidateBody { + config_name: String, + timestamp: u64, +} + +pub async fn validate( + State(nginx_controller): State>, + Json(payload): Json, +) -> impl IntoResponse { + let params: ValidateBody = match from_value(payload) { + Ok(req) => req, + Err(e) => { + warn!("Invalid validate request: {}", e); + return (StatusCode::BAD_REQUEST).into_response(); + } + }; + + let (_code, _output) = match nginx_controller + .validate(¶ms.config_name, params.timestamp) + .await + { + Ok(res) => res, + Err(e) => { + let resp = serde_json::json!({ "error": e.to_string() }); + return (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(resp)).into_response(); + } + }; + + (axum::http::StatusCode::OK,).into_response() +} + +#[derive(Deserialize)] +pub struct ValidateAndReloadBody { + config_name: String, + timestamp: u64, +} + +pub async fn validate_and_reload( + State(nginx_controller): State>, + Json(payload): Json, +) -> impl IntoResponse { + let params: ValidateAndReloadBody = match from_value(payload) { + Ok(req) => req, + Err(e) => { + warn!("Invalid validate_and_reload request: {}", e); + return (StatusCode::BAD_REQUEST).into_response(); + } + }; + + let (code, output) = match nginx_controller + .validate_and_reload(¶ms.config_name, params.timestamp) + .await + { + Ok(res) => res, + Err(e) => { + let resp = ValidateAndReloadResp { + rc: -1, + ro: e.to_string(), + }; + return (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(resp)).into_response(); + } + }; + + let resp = ValidateAndReloadResp { + rc: code, + ro: output, + }; + (axum::http::StatusCode::OK, axum::Json(resp)).into_response() +} + +#[derive(Deserialize)] +pub struct WriteConfigBody { + config_name: String, + timestamp: u64, + content: String, +} + +pub async fn write_config( + State(nginx_controller): State>, + Json(payload): Json, +) -> impl IntoResponse { + let body: WriteConfigBody = match from_value(payload) { + Ok(req) => req, + Err(e) => { + warn!("Invalid write_config request: {}", e); + return (StatusCode::BAD_REQUEST).into_response(); + } + }; + + match nginx_controller + .write_config(&body.config_name, body.timestamp, &body.content) + .await + { + Ok(_) => (), + Err(e) => { + let resp = serde_json::json!({ "error": e.to_string() }); + return (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(resp)).into_response(); + } + }; + + (axum::http::StatusCode::OK,).into_response() +} diff --git a/justfile b/justfile index 91832fc..37cf4ba 100644 --- a/justfile +++ b/justfile @@ -48,6 +48,11 @@ generate-openapi: # Generate API client for frontend cd apps/frontend && \ pnpm generate:openapi + # Generate OpenAPI spec for agent + cd apps/agent && \ + cargo run -- --generate-openapi --openapi-output ./openapi.yaml + # TODO: Generate API client for agent in api + generate-all: generate-entity generate-openapi