mod reload; mod run; mod validate; mod write_config; use std::{ collections::HashMap, sync::{ Arc, atomic::{AtomicU64, Ordering}, }, }; use tokio::sync::{Mutex, RwLock}; use tokio_cron_scheduler::{Job, JobScheduler}; use tracing::{error, info}; use crate::commands::write_config::INTERNAL_CONFIG_FOLDER_NAME; const OLD_CONFIG_CLEANUP_THRESHOLD: u64 = 3600; pub struct NginxService { // lock for nginx reload, and timestamp tracking nginx_lock: Mutex<()>, last_applied: AtomicU64, // lock for write_config per (config_name, timestamp) #[allow(clippy::type_complexity)] write_config_lock: RwLock>>>, // commands reload_cmd: Arc, validate_cmd: Arc, write_config_cmd: Arc, } impl NginxService { pub async fn new( scheduler: Arc, nginx_config_dir: std::path::PathBuf, ) -> Result, Box> { let nginx_service = Arc::new(NginxService { nginx_lock: Mutex::new(()), last_applied: AtomicU64::new(0), write_config_lock: RwLock::new(HashMap::new()), // commands reload_cmd: Arc::new(reload::ReloadCommand::default()), validate_cmd: Arc::new(validate::ValidateCommand::new(nginx_config_dir.clone())), write_config_cmd: Arc::new(write_config::WriteConfigCommand::new(nginx_config_dir)), }); let mut nginx_service_clone = nginx_service.clone(); scheduler .clone() // cleanup every 10 minutes .add(Job::new_async("0 */10 * * * *", move |_uuid, _l| { info!("Running nginx_service cleanup job"); let nginx_service_clone = nginx_service_clone.clone(); let job = Box::pin(async move { nginx_service_clone.cleanup_unused_lock().await; }); info!("NginxService cleanup job completed"); job })?) .await?; nginx_service_clone = nginx_service.clone(); scheduler .clone() // cleanup every hour .add(Job::new_async("0 0 */1 * * *", move |_uuid, _l| { info!("Running nginx_service old config cleanup job"); let nginx_service_clone = nginx_service_clone.clone(); let job = Box::pin(async move { nginx_service_clone.cleanup_old_configs().await; }); info!("NginxService old config cleanup job completed"); job })?) .await?; Ok(nginx_service) } pub async fn validate_and_reload( &self, config_name: &str, timestamp: u64, ) -> Result<(i32, String), Box> { let cur = self.last_applied.load(Ordering::SeqCst); if cur > timestamp { return Err("Another operation is in progress with higher timestamp value".into()); } // acquire write lock to update nginx_lock let _nginx_guard = self.nginx_lock.lock().await; // acquire write lock for this config+timestamp let rw_lock = self.acquire_file_write_lock(config_name, timestamp).await; let _guard = rw_lock.write().await; match self .reload_cmd .validate_and_reload(config_name, timestamp, self.validate_cmd.clone()) .await { Ok((code, output)) => { // update last_applied self.last_applied.store(timestamp, Ordering::SeqCst); Ok((code, output)) } Err(e) => Err(e), } } pub async fn write_config( &self, config_name: &str, timestamp: u64, content: &str, ) -> Result<(), Box> { let rw_lock = self.acquire_file_write_lock(config_name, timestamp).await; let _guard = rw_lock.write().await; // call the write_config command self.write_config_cmd .write_config(config_name, timestamp, content) .await } pub async fn validate( &self, config_name: &str, timestamp: u64, ) -> Result<(i32, String), Box> { self.validate_cmd.validate(config_name, timestamp).await } async fn cleanup_unused_lock(&self) { let mut _write_lock = self.write_config_lock.write().await; (*_write_lock).retain(|_, lock| { // retain only locks that are currently held (readers or writers) lock.try_write().is_err() }); } async fn cleanup_old_configs(&self) { // list all files within nginx_config_dir/YANPM that is older than now - OLD_CONFIG_CLEANUP_THRESHOLD let cutoff = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() - OLD_CONFIG_CLEANUP_THRESHOLD; let nginx_config_dir = self.validate_cmd.nginx_config_dir(); let yanpm_dir = nginx_config_dir.join(INTERNAL_CONFIG_FOLDER_NAME); let read_dir = match tokio::fs::read_dir(&yanpm_dir).await { Ok(rd) => rd, Err(e) if e.kind() == std::io::ErrorKind::NotFound => { // directory does not exist, nothing to clean up return; } Err(e) => { error!( "Error reading {} config directory {}: {}", INTERNAL_CONFIG_FOLDER_NAME, yanpm_dir.display(), e ); return; } }; tokio::pin!(read_dir); while let Some(entry) = read_dir.next_entry().await.unwrap_or(None) { let metadata = match entry.metadata().await { Ok(md) => md, Err(e) => { error!( "Error getting metadata for file {}: {}", entry.path().display(), e ); continue; } }; if let Ok(modified) = metadata.modified() && let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) { let mtime_secs = duration.as_secs(); if mtime_secs < cutoff { // file is older than cutoff, remove it if let Err(e) = tokio::fs::remove_file(entry.path()).await { error!( "Error removing old config file {}: {}", entry.path().display(), e ); } else { info!("Removed old config file {}", entry.path().display()); } } } } } async fn acquire_file_write_lock(&self, config_name: &str, timestamp: u64) -> Arc> { let mut write_lock = self.write_config_lock.write().await; write_lock .entry((config_name.to_string(), timestamp)) .or_insert_with(|| Arc::new(RwLock::new(()))) .clone() } } #[cfg(test)] mod tests { use super::*; use std::error::Error; use std::sync::Arc as StdArc; use tokio::time::{Duration, sleep}; impl NginxService { // Test helper that simulates a long-running reload without invoking external commands. pub async fn test_simulated_reload( &self, config_name: &str, timestamp: u64, delay_ms: u64, ) -> Result<(), Box> { // pre-check let cur = self.last_applied.load(Ordering::SeqCst); if cur >= timestamp { return Err("stale".into()); } // acquire exclusive lock and re-check let _nginx_guard = self.nginx_lock.lock().await; let cur2 = self.last_applied.load(Ordering::SeqCst); if cur2 >= timestamp { return Err("stale".into()); } // per-file lock let rw_lock = self.acquire_file_write_lock(config_name, timestamp).await; let _guard = rw_lock.write().await; // simulate operation sleep(Duration::from_millis(delay_ms)).await; // on success update last_applied let mut prev = self.last_applied.load(Ordering::SeqCst); while prev < timestamp { match self.last_applied.compare_exchange( prev, timestamp, Ordering::SeqCst, Ordering::SeqCst, ) { Ok(_) => break, Err(next) => prev = next, } } Ok(()) } } #[tokio::test] async fn concurrent_stale_is_rejected() { let scheduler = StdArc::new(JobScheduler::new().await.unwrap()); let svc = NginxService::new(scheduler.clone(), std::env::temp_dir()) .await .unwrap(); let s1 = svc.clone(); let h1 = tokio::spawn(async move { s1.test_simulated_reload("cfg", 2, 200).await }); // let second start shortly after first so it will wait for the mutex sleep(Duration::from_millis(20)).await; let s2 = svc.clone(); let h2 = tokio::spawn(async move { s2.test_simulated_reload("cfg", 1, 10).await }); let r1 = h1.await.unwrap(); assert!(r1.is_ok(), "first (newer) task should succeed"); let r2 = h2.await.unwrap(); assert!( r2.is_err(), "second (older) task should be rejected as stale" ); } }