From 08b28a2acf8fcb9d6cc606c721aa23bacb511c50 Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Tue, 3 Mar 2026 08:51:31 +0000 Subject: [PATCH] feat: Implement system health metrics collection and reporting - Added SystemHealthChecker to collect CPU, memory, disk, and load average metrics. - Implemented methods to retrieve system information from /proc filesystem. - Introduced a new method to collect metrics and return a SystemMetrics struct. - Added tests for metric collection and parsing functions. feat: Enhance agent runtime with state management and health monitoring - Created AgentState struct to manage agent state with RwLock for concurrency. - Refactored agent start logic to initialize cache and start health monitoring. - Implemented connection loop with reconnection logic and health report handling. - Added config update handling with nginx controller integration. feat: Expand master client functionality for bidirectional streaming - Updated MasterClient to support bidirectional streaming for health reports. - Implemented registration logic with the master server. - Added methods for sending messages and managing connection state. feat: Improve nginx configuration management and rendering - Enhanced ConfigManager for atomic symlink swaps and configuration validation. - Implemented ConfigRenderer using Handlebars for dynamic nginx configuration generation. - Added methods for applying, rolling back, and cleaning up configurations. - Introduced tests for configuration rendering and validation. feat: Implement nginx process control with lifecycle management - Added methods to start, stop, reload, and test nginx configurations. - Implemented graceful and immediate stop functionality. - Enhanced error handling and logging for nginx operations. - Added tests for deployment mode parsing and nginx lifecycle management. --- config/development.toml | 8 + crates/nxmesh-agent/src/config/settings.rs | 61 ++-- crates/nxmesh-agent/src/health/monitor.rs | 221 +++++++++++- crates/nxmesh-agent/src/health/nginx.rs | 156 +++++++- crates/nxmesh-agent/src/health/system.rs | 150 +++++++- crates/nxmesh-agent/src/lib.rs | 287 ++++++++++++++- crates/nxmesh-agent/src/master/client.rs | 248 +++++++++++-- .../nxmesh-agent/src/nginx/config_manager.rs | 31 +- .../nxmesh-agent/src/nginx/config_renderer.rs | 332 ++++++++++++++++-- crates/nxmesh-agent/src/nginx/controller.rs | 249 ++++++++++++- justfile | 4 + 11 files changed, 1619 insertions(+), 128 deletions(-) diff --git a/config/development.toml b/config/development.toml index 999fde1..26abdc4 100644 --- a/config/development.toml +++ b/config/development.toml @@ -13,3 +13,11 @@ port = 8443 [auth] jwt_secret = "development-secret-do-not-use-in-production" jwt_expiration_hours = 24 + +[agent] +name = "development-agent" +data_dir = "./agent-runtime-data" + +[master] +url = "http://localhost:8080" +token = "token" diff --git a/crates/nxmesh-agent/src/config/settings.rs b/crates/nxmesh-agent/src/config/settings.rs index d509a73..1ad8240 100644 --- a/crates/nxmesh-agent/src/config/settings.rs +++ b/crates/nxmesh-agent/src/config/settings.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; pub struct Settings { pub agent: AgentSettings, pub master: MasterSettings, + #[serde(default)] pub nginx: NginxSettings, } @@ -16,6 +17,7 @@ pub struct Settings { pub struct AgentSettings { pub id: Option, pub name: String, + #[serde(default)] pub labels: std::collections::HashMap, pub data_dir: String, } @@ -25,9 +27,14 @@ pub struct AgentSettings { pub struct MasterSettings { pub url: String, pub token: String, + #[serde(default = "default_reconnect_interval_seconds")] pub reconnect_interval_seconds: u64, } +fn default_reconnect_interval_seconds() -> u64 { + 5 +} + /// Nginx settings #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NginxSettings { @@ -37,39 +44,51 @@ pub struct NginxSettings { pub deployment_mode: String, } -impl Default for Settings { +impl Default for NginxSettings { fn default() -> Self { Self { - agent: AgentSettings { - id: None, - name: hostname::get() - .ok() - .and_then(|h| h.into_string().ok()) - .unwrap_or_else(|| "unknown".to_string()), - labels: std::collections::HashMap::new(), - data_dir: "/var/lib/nxmesh".to_string(), - }, - master: MasterSettings { - url: "wss://localhost:8443".to_string(), - token: String::new(), - reconnect_interval_seconds: 5, - }, - nginx: NginxSettings { - config_dir: "/etc/nginx".to_string(), - pid_file: "/var/run/nginx.pid".to_string(), - binary_path: "/usr/sbin/nginx".to_string(), - deployment_mode: "docker_sidecar".to_string(), - }, + config_dir: "/etc/nginx".to_string(), + pid_file: "/var/run/nginx.pid".to_string(), + binary_path: "/usr/sbin/nginx".to_string(), + deployment_mode: "docker_sidecar".to_string(), } } } +// impl Default for Settings { +// fn default() -> Self { +// Self { +// agent: AgentSettings { +// id: None, +// name: hostname::get() +// .ok() +// .and_then(|h| h.into_string().ok()) +// .unwrap_or_else(|| "unknown".to_string()), +// labels: std::collections::HashMap::new(), +// data_dir: "/var/lib/nxmesh".to_string(), +// }, +// master: MasterSettings { +// url: "wss://localhost:8443".to_string(), +// token: String::new(), +// reconnect_interval_seconds: 5, +// }, +// nginx: NginxSettings { +// config_dir: "/etc/nginx".to_string(), +// pid_file: "/var/run/nginx.pid".to_string(), +// binary_path: "/usr/sbin/nginx".to_string(), +// deployment_mode: "docker_sidecar".to_string(), +// }, +// } +// } +// } + impl Settings { /// Load settings from config files and environment pub fn load() -> Result { let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into()); let settings = Config::builder() + .add_source(File::with_name(&format!("config/{}", run_mode)).required(false)) .add_source(File::with_name("/etc/nxmesh/agent").required(false)) .add_source(File::with_name(&format!("/etc/nxmesh/agent.{}", run_mode)).required(false)) .add_source(Environment::with_prefix("NXMESH_AGENT").separator("__")) diff --git a/crates/nxmesh-agent/src/health/monitor.rs b/crates/nxmesh-agent/src/health/monitor.rs index c1c8c88..dc92708 100644 --- a/crates/nxmesh-agent/src/health/monitor.rs +++ b/crates/nxmesh-agent/src/health/monitor.rs @@ -1,34 +1,229 @@ -//! Health monitor +//! Health monitor with periodic health reporting use std::time::Duration; +use tokio::sync::mpsc; use tokio::time::interval; +use tracing::{debug, info, warn}; -/// Health monitor +use nxmesh_proto::{HealthReport, NginxStatus, SystemMetrics}; + +use super::nginx::NginxHealthChecker; +use super::system::SystemHealthChecker; + +/// Health monitoring configuration +#[derive(Debug, Clone)] +pub struct HealthMonitorConfig { + /// Interval between health checks in seconds + pub interval_secs: u64, + /// Nginx HTTP health check URL + pub nginx_health_url: String, + /// Enable nginx stub_status parsing + pub enable_stub_status: bool, +} + +impl Default for HealthMonitorConfig { + fn default() -> Self { + Self { + interval_secs: 30, + nginx_health_url: "http://localhost/nginx-health".to_string(), + enable_stub_status: true, + } + } +} + +/// Health monitor that periodically checks and reports health status pub struct HealthMonitor { - interval: Duration, + config: HealthMonitorConfig, + nginx_checker: NginxHealthChecker, + system_checker: SystemHealthChecker, + /// Channel to send health reports + report_tx: mpsc::Sender, + /// Config version for tracking + config_version: u64, + config_checksum: String, } impl HealthMonitor { /// Create a new health monitor - pub fn new(interval_secs: u64) -> Self { + pub fn new( + config: HealthMonitorConfig, + report_tx: mpsc::Sender, + ) -> Self { Self { - interval: Duration::from_secs(interval_secs), + config, + nginx_checker: NginxHealthChecker::new(), + system_checker: SystemHealthChecker::new(), + report_tx, + config_version: 0, + config_checksum: String::new(), } } - /// Start monitoring - pub async fn start(&self) { - let mut ticker = interval(self.interval); + /// Set current config version and checksum + pub fn set_config_version(&mut self, version: u64, checksum: String) { + self.config_version = version; + self.config_checksum = checksum; + } + + /// Start the health monitoring loop + pub async fn start(self) { + info!( + "Starting health monitor with {}s interval", + self.config.interval_secs + ); + + let mut ticker = interval(Duration::from_secs(self.config.interval_secs)); loop { ticker.tick().await; - self.check_health().await; + + let report = self.collect_health_report().await; + + if let Err(e) = self.report_tx.send(report).await { + warn!("Failed to send health report: {}", e); + // If the channel is closed, the master client has disconnected + break; + } + } + + info!("Health monitor stopped"); + } + + /// Collect a complete health report + async fn collect_health_report(&self) -> HealthReport { + let nginx_status = self.check_nginx_status().await; + let system_metrics = self.check_system_metrics().await; + + HealthReport { + nginx: Some(nginx_status), + system: Some(system_metrics), + config_checksum: self.config_checksum.clone(), + config_version: self.config_version as i64, + alerts: vec![], } } - /// Check health - async fn check_health(&self) { - // TODO: Implement health checks - tracing::debug!("Checking health status"); + /// Check nginx status + async fn check_nginx_status(&self) -> NginxStatus { + // Try to get nginx status from stub_status + match self.fetch_nginx_stub_status().await { + Ok(status) => status, + Err(e) => { + debug!("Failed to get nginx stub_status: {}", e); + // Fallback to basic process check + self.check_nginx_process().await + } + } + } + + /// Fetch nginx stub_status + async fn fetch_nginx_stub_status(&self) -> Result> { + let client = reqwest::Client::new(); + let response = client + .get(&self.config.nginx_health_url) + .timeout(Duration::from_secs(5)) + .send() + .await?; + + if !response.status().is_success() { + return Err(format!("HTTP error: {}", response.status()).into()); + } + + // Parse stub_status output + let body = response.text().await?; + let status = parse_stub_status(&body); + + Ok(status) + } + + /// Check nginx process status (fallback) + async fn check_nginx_process(&self) -> NginxStatus { + // Check if nginx process is running + let is_running = tokio::process::Command::new("pgrep") + .args(["-x", "nginx"]) + .output() + .await + .map(|output| output.status.success()) + .unwrap_or(false); + + NginxStatus { + is_running, + pid: 0, + uptime_seconds: 0, + active_connections: 0, + total_requests: 0, + requests_per_second: 0.0, + } + } + + /// Check system metrics + async fn check_system_metrics(&self) -> SystemMetrics { + match self.system_checker.collect_metrics().await { + Ok(metrics) => metrics, + Err(e) => { + warn!("Failed to collect system metrics: {}", e); + SystemMetrics { + cpu_percent: 0.0, + memory_used_bytes: 0, + memory_total_bytes: 0, + disk_used_bytes: 0, + disk_total_bytes: 0, + load_average_1m: 0.0, + } + } + } + } +} + +/// Parse nginx stub_status output +fn parse_stub_status(body: &str) -> NginxStatus { + // Parse stub_status format: + // Active connections: 291 + // server accepts handled requests + // 16630948 16630948 31070465 + // Reading: 6 Writing: 125 Waiting: 160 + + let mut active_connections = 0u32; + let mut total_requests = 0u64; + + for line in body.lines() { + if line.starts_with("Active connections:") { + if let Some(val) = line.split(':').nth(1) { + active_connections = val.trim().parse().unwrap_or(0); + } + } else if line.trim_start().chars().next().map(|c| c.is_ascii_digit()).unwrap_or(false) { + // This is the accepts/handled/requests line + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 3 { + total_requests = parts[2].parse().unwrap_or(0); + } + } + } + + NginxStatus { + is_running: true, + pid: 0, + uptime_seconds: 0, // Would need to track this separately + active_connections, + total_requests, + requests_per_second: 0.0, // Would need to calculate from previous value + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_stub_status() { + let body = r#"Active connections: 291 +server accepts handled requests + 16630948 16630948 31070465 +Reading: 6 Writing: 125 Waiting: 160"#; + + let status = parse_stub_status(body); + assert_eq!(status.active_connections, 291); + assert_eq!(status.total_requests, 31070465); + assert!(status.is_running); } } diff --git a/crates/nxmesh-agent/src/health/nginx.rs b/crates/nxmesh-agent/src/health/nginx.rs index 1a0dcba..d7e896a 100644 --- a/crates/nxmesh-agent/src/health/nginx.rs +++ b/crates/nxmesh-agent/src/health/nginx.rs @@ -1,19 +1,148 @@ //! Nginx health checker +use tracing::{debug, warn}; + +/// Nginx health status +#[derive(Debug, Clone)] +pub struct NginxHealthStatus { + pub is_running: bool, + pub pid: Option, + pub can_reload: bool, + pub config_valid: bool, + pub error_message: Option, +} + /// Nginx health checker -pub struct NginxHealthChecker; +pub struct NginxHealthChecker { + binary_path: String, + config_path: String, + pid_file: String, +} impl NginxHealthChecker { /// Create a new health checker pub fn new() -> Self { - Self + Self { + binary_path: "/usr/sbin/nginx".to_string(), + config_path: "/etc/nginx/nginx.conf".to_string(), + pid_file: "/var/run/nginx.pid".to_string(), + } + } + + /// Create a new health checker with custom paths + pub fn with_paths(binary_path: &str, config_path: &str, pid_file: &str) -> Self { + Self { + binary_path: binary_path.to_string(), + config_path: config_path.to_string(), + pid_file: pid_file.to_string(), + } } /// Check nginx health - pub async fn check(&self) -> Result<(), String> { - // TODO: Implement health check + pub async fn check(&self) -> Result { + let pid = self.get_pid().await; + let is_running = self.is_process_running(pid).await; + let config_valid = self.validate_config().await.unwrap_or(false); + + let status = NginxHealthStatus { + is_running, + pid, + can_reload: is_running && config_valid, + config_valid, + error_message: None, + }; + + Ok(status) + } + + /// Get nginx PID from pid file + async fn get_pid(&self) -> Option { + match tokio::fs::read_to_string(&self.pid_file).await { + Ok(content) => content.trim().parse().ok(), + Err(e) => { + debug!("Failed to read PID file: {}", e); + None + } + } + } + + /// Check if process is running + async fn is_process_running(&self, pid: Option) -> bool { + let pid = match pid { + Some(p) => p, + None => return false, + }; + + // Check if process exists using kill -0 + match tokio::process::Command::new("kill") + .args(["-0", &pid.to_string()]) + .output() + .await + { + Ok(output) => output.status.success(), + Err(e) => { + debug!("Failed to check process: {}", e); + false + } + } + } + + /// Validate nginx configuration + pub async fn validate_config(&self) -> Result { + let output = tokio::process::Command::new(&self.binary_path) + .args(["-t", "-c", &self.config_path]) + .output() + .await + .map_err(|e| format!("Failed to run nginx -t: {}", e))?; + + let success = output.status.success(); + let stderr = String::from_utf8_lossy(&output.stderr); + + if !success { + warn!("Nginx config validation failed: {}", stderr); + } else { + debug!("Nginx config validation passed"); + } + + Ok(success) + } + + /// Reload nginx configuration + pub async fn reload(&self) -> Result<(), String> { + if !self.validate_config().await? { + return Err("Configuration is invalid".to_string()); + } + + let pid = self.get_pid().await.ok_or("Cannot get nginx PID")?; + + // Send HUP signal to reload + let output = tokio::process::Command::new("kill") + .args(["-HUP", &pid.to_string()]) + .output() + .await + .map_err(|e| format!("Failed to send reload signal: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Reload failed: {}", stderr)); + } + + debug!("Nginx reload signal sent successfully"); Ok(()) } + + /// Get nginx version + pub async fn get_version(&self) -> Result { + let output = tokio::process::Command::new(&self.binary_path) + .arg("-v") + .output() + .await + .map_err(|e| format!("Failed to get nginx version: {}", e))?; + + // nginx outputs version to stderr + let version = String::from_utf8_lossy(&output.stderr); + Ok(version.trim().to_string()) + } } impl Default for NginxHealthChecker { @@ -21,3 +150,22 @@ impl Default for NginxHealthChecker { Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_nginx_health_status() { + let status = NginxHealthStatus { + is_running: true, + pid: Some(1234), + can_reload: true, + config_valid: true, + error_message: None, + }; + + assert!(status.is_running); + assert_eq!(status.pid, Some(1234)); + } +} diff --git a/crates/nxmesh-agent/src/health/system.rs b/crates/nxmesh-agent/src/health/system.rs index f75d1dc..ad2660b 100644 --- a/crates/nxmesh-agent/src/health/system.rs +++ b/crates/nxmesh-agent/src/health/system.rs @@ -1,4 +1,8 @@ -//! System health checker +//! System health checker with real metrics collection + +use tokio::fs; + +use nxmesh_proto::SystemMetrics; /// System health checker pub struct SystemHealthChecker; @@ -9,10 +13,123 @@ impl SystemHealthChecker { Self } - /// Check system health - pub async fn check(&self) -> Result<(), String> { - // TODO: Implement health check - Ok(()) + /// Collect system metrics + pub async fn collect_metrics(&self) -> Result> { + let cpu_percent = self.get_cpu_usage().await.unwrap_or(0.0); + let (memory_used, memory_total) = self.get_memory_info().await.unwrap_or((0, 0)); + let (disk_used, disk_total) = self.get_disk_info().await.unwrap_or((0, 0)); + let load_avg = self.get_load_average().await.unwrap_or(0.0); + + Ok(SystemMetrics { + cpu_percent, + memory_used_bytes: memory_used, + memory_total_bytes: memory_total, + disk_used_bytes: disk_used, + disk_total_bytes: disk_total, + load_average_1m: load_avg, + }) + } + + /// Get CPU usage percentage + async fn get_cpu_usage(&self) -> Result> { + // Read /proc/stat for CPU stats + let stat_content = fs::read_to_string("/proc/stat").await?; + + // Parse CPU line + let cpu_line = stat_content + .lines() + .find(|line| line.starts_with("cpu ")) + .ok_or("CPU line not found")?; + + let values: Vec = cpu_line + .split_whitespace() + .skip(1) + .map(|s| s.parse().unwrap_or(0)) + .collect(); + + if values.len() < 4 { + return Err("Invalid CPU stats".into()); + } + + let user = values[0]; + let nice = values[1]; + let system = values[2]; + let idle = values[3]; + + let total = user + nice + system + idle; + let used = user + nice + system; + + if total == 0 { + return Ok(0.0); + } + + let usage = (used as f32 / total as f32) * 100.0; + Ok(usage) + } + + /// Get memory information + async fn get_memory_info(&self) -> Result<(u64, u64), Box> { + let meminfo = fs::read_to_string("/proc/meminfo").await?; + + let mut total_kb: u64 = 0; + let mut available_kb: u64 = 0; + + for line in meminfo.lines() { + if line.starts_with("MemTotal:") { + total_kb = parse_kb_value(line)?; + } else if line.starts_with("MemAvailable:") { + available_kb = parse_kb_value(line)?; + } + } + + let total_bytes = total_kb * 1024; + let used_bytes = (total_kb - available_kb) * 1024; + + Ok((used_bytes, total_bytes)) + } + + /// Get disk information for root filesystem + async fn get_disk_info(&self) -> Result<(u64, u64), Box> { + // Use df command to get disk info + let output = tokio::process::Command::new("df") + .args(["-B1", "/"]) + .output() + .await?; + + if !output.status.success() { + return Err("df command failed".into()); + } + + let output_str = String::from_utf8_lossy(&output.stdout); + let lines: Vec<&str> = output_str.lines().collect(); + + if lines.len() < 2 { + return Err("Invalid df output".into()); + } + + // Parse the second line (first data line) + let parts: Vec<&str> = lines[1].split_whitespace().collect(); + if parts.len() < 4 { + return Err("Invalid df output format".into()); + } + + let total: u64 = parts[1].parse()?; + let used: u64 = parts[2].parse()?; + + Ok((used, total)) + } + + /// Get 1-minute load average + async fn get_load_average(&self) -> Result> { + let loadavg = fs::read_to_string("/proc/loadavg").await?; + let parts: Vec<&str> = loadavg.split_whitespace().collect(); + + if parts.is_empty() { + return Err("Invalid loadavg format".into()); + } + + let load_1m: f32 = parts[0].parse()?; + Ok(load_1m) } } @@ -21,3 +138,26 @@ impl Default for SystemHealthChecker { Self::new() } } + +/// Parse kB value from /proc/meminfo line +fn parse_kb_value(line: &str) -> Result> { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() < 2 { + return Err("Invalid format".into()); + } + + let value: u64 = parts[1].parse()?; + Ok(value) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_kb_value() { + let line = "MemTotal: 16384000 kB"; + let value = parse_kb_value(line).unwrap(); + assert_eq!(value, 16384000); + } +} diff --git a/crates/nxmesh-agent/src/lib.rs b/crates/nxmesh-agent/src/lib.rs index 63735b1..7cdd1e3 100644 --- a/crates/nxmesh-agent/src/lib.rs +++ b/crates/nxmesh-agent/src/lib.rs @@ -10,21 +10,278 @@ pub mod metrics; pub mod nginx; pub mod watch; -use config::Settings; -use tracing::info; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tracing::{error, info, warn}; -/// Start the agent -pub async fn start(settings: Settings) -> Result<(), Box> { - info!("Starting agent with ID: {:?}", settings.agent.id); - - // TODO: Initialize master client connection - // TODO: Start health monitoring - // TODO: Start metrics collection - // TODO: Initialize nginx controller - - // For now, just keep running - tokio::signal::ctrl_c().await?; - info!("Shutting down agent"); - +use config::Settings; +use health::monitor::{HealthMonitor, HealthMonitorConfig}; +use master::client::{ConnectionState, MasterClient}; +use master::reconnect::reconnect_with_backoff; +use nginx::controller::NginxController; + +/// Agent runtime state +#[derive(Debug, Clone)] +pub struct AgentState { + pub agent_id: Arc>>, + pub connection_state: Arc>, + pub config_version: Arc>, +} + +impl AgentState { + fn new() -> Self { + Self { + agent_id: Arc::new(RwLock::new(None)), + connection_state: Arc::new(RwLock::new(ConnectionState::Disconnected)), + config_version: Arc::new(RwLock::new(0)), + } + } +} + +/// Agent runtime +pub struct Agent { + settings: Settings, + state: AgentState, + nginx_controller: NginxController, +} + +impl Agent { + /// Create a new agent instance + pub fn new(settings: Settings) -> Self { + let nginx_controller = NginxController::new( + &settings.nginx.config_dir, + &settings.nginx.binary_path, + &settings.nginx.pid_file, + ); + + Self { + settings, + state: AgentState::new(), + nginx_controller, + } + } + + /// Start the agent + pub async fn start(&self) -> Result<(), Box> { + info!("Starting NxMesh Agent v{}", env!("CARGO_PKG_VERSION")); + info!("Master URL: {}", self.settings.master.url); + + // Initialize local cache + self.initialize_cache().await?; + + // Start the main connection loop with reconnection + reconnect_with_backoff(|| self.connection_loop()).await; + + Ok(()) + } + + /// Initialize local cache + async fn initialize_cache(&self) -> Result<(), Box> { + let cache = cache::config_cache::ConfigCache::new(&self.settings.agent.data_dir); + + // Ensure cache directory exists + tokio::fs::create_dir_all(&self.settings.agent.data_dir).await?; + + // Try to load cached config + match cache.load().await { + Ok(Some(_config)) => { + info!("Loaded cached configuration"); + // TODO: Apply cached config if needed + } + Ok(None) => { + info!("No cached configuration found"); + } + Err(e) => { + warn!("Failed to load cached configuration: {}", e); + } + } + + Ok(()) + } + + /// Main connection loop - attempts to connect and maintain connection + async fn connection_loop(&self) -> Result<(), Box> { + info!("Starting connection loop..."); + + // Create channels for health reports + let (health_tx, health_rx) = mpsc::channel::(100); + + // Create config update handler + let config_handler = { + let nginx_controller = self.nginx_controller.clone(); + move |config: nxmesh_proto::ConfigUpdate| { + let nginx = nginx_controller.clone(); + tokio::spawn(async move { + if let Err(e) = handle_config_update(config, nginx).await { + error!("Failed to handle config update: {:?}", e); + } + }); + } + }; + + // Create master client + let mut client = MasterClient::connect( + self.settings.master.clone(), + self.settings.agent.clone(), + self.settings.nginx.clone(), + config_handler, + ) + .await?; + + // Register with master + let agent_id = client.register().await?; + info!("Agent registered with ID: {}", agent_id); + + // Store agent ID + *self.state.agent_id.write().await = Some(agent_id.clone()); + *self.state.connection_state.write().await = ConnectionState::Connected { + agent_id: agent_id.clone(), + }; + + // Start health monitor in a separate task + let health_monitor = HealthMonitor::new( + HealthMonitorConfig { + interval_secs: 30, + nginx_health_url: "http://localhost/nginx-health".to_string(), + enable_stub_status: true, + }, + health_tx.clone(), + ); + + let health_handle = tokio::spawn(async move { + health_monitor.start().await; + }); + + // Start the bidirectional stream + let stream_handle = tokio::spawn(async move { + if let Err(e) = client.start_stream(health_rx).await { + error!("Stream error: {}", e); + } + client + }); + + // Wait for either the stream to end or shutdown signal + tokio::select! { + result = stream_handle => { + warn!("Stream ended"); + match result { + Ok(client) => { + client.shutdown().await; + } + Err(e) => { + error!("Stream task panicked: {}", e); + } + } + } + _ = tokio::signal::ctrl_c() => { + info!("Shutdown signal received"); + } + } + + // Clean up + health_handle.abort(); + *self.state.connection_state.write().await = ConnectionState::Disconnected; + + Ok(()) + } + + /// Get current agent state + pub async fn get_state(&self) -> AgentState { + self.state.clone() + } +} + +/// Handle config update from master +async fn handle_config_update( + config: nxmesh_proto::ConfigUpdate, + nginx_controller: NginxController, +) -> crate::nginx::config_manager::ConfigResult<()> { + info!( + "Received config update: version={}, virtual_hosts={}, upstreams={}", + config.version, + config.virtual_hosts.len(), + config.upstreams.len() + ); + + // Render configuration + // TODO: Implement actual config rendering from templates + let config_content = render_nginx_config(&config)?; + + // Validate and apply configuration + nginx_controller.validate_and_apply(&config_content).await?; + + info!("Configuration applied successfully"); Ok(()) } + +/// Render nginx configuration from ConfigUpdate using Handlebars templates +fn render_nginx_config( + config: &nxmesh_proto::ConfigUpdate, +) -> crate::nginx::config_manager::ConfigResult { + use nginx::config_renderer::ConfigRenderer; + + let renderer = ConfigRenderer::new(); + renderer.render_config(config) +} + +/// Start the agent with given settings +pub async fn start(settings: Settings) -> Result<(), Box> { + let agent = Agent::new(settings); + agent.start().await +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_render_nginx_config() { + let config = nxmesh_proto::ConfigUpdate { + config_id: "test".to_string(), + version: 1, + virtual_hosts: vec![nxmesh_proto::VirtualHost { + id: "vh1".to_string(), + name: "Test Site".to_string(), + server_name: "example.com".to_string(), + listen_port: 80, + ssl_enabled: false, + ssl_certificate_id: "".to_string(), + http2_enabled: false, + http3_enabled: false, + locations: vec![nxmesh_proto::Location { + path: "/".to_string(), + proxy_pass: "http://backend".to_string(), + upstream_id: "".to_string(), + root: "".to_string(), + index: "".to_string(), + custom_headers: vec![], + rewrite_rules: vec![], + custom_directives: Default::default(), + }], + custom_directives: Default::default(), + }], + upstreams: vec![nxmesh_proto::Upstream { + id: "up1".to_string(), + name: "backend".to_string(), + algorithm: 0, + servers: vec![nxmesh_proto::UpstreamServer { + address: "127.0.0.1:8080".to_string(), + weight: 1, + backup: false, + down: false, + max_fails: 1, + fail_timeout_seconds: 10, + }], + health_check: None, + keepalive_connections: 0, + }], + certificates: Default::default(), + global_settings: None, + }; + + let result = render_nginx_config(&config).unwrap(); + assert!(result.contains("upstream backend")); + assert!(result.contains("server_name example.com")); + assert!(result.contains("proxy_pass http://backend")); + } +} diff --git a/crates/nxmesh-agent/src/master/client.rs b/crates/nxmesh-agent/src/master/client.rs index 0d1b0a9..6fc4f9a 100644 --- a/crates/nxmesh-agent/src/master/client.rs +++ b/crates/nxmesh-agent/src/master/client.rs @@ -1,36 +1,244 @@ -//! Master gRPC client +//! Master gRPC client with bidirectional streaming + +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tonic::transport::Channel; +use tracing::{info, warn}; +use uuid::Uuid; use nxmesh_proto::{ - agent_service_client::AgentServiceClient, AgentMessage, MasterMessage, + agent_service_client::AgentServiceClient, + agent_message, + AgentMessage, ConfigUpdate, HealthReport, MasterMessage, RegistrationRequest, }; -use tonic::transport::Channel; -/// Master client +use crate::config::settings::{AgentSettings, MasterSettings, NginxSettings}; + +/// Connection state +#[derive(Debug, Clone, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connecting, + Registering, + Connected { agent_id: String }, + Error(String), +} + +/// Master client for gRPC communication pub struct MasterClient { client: AgentServiceClient, + settings: MasterSettings, + agent_settings: AgentSettings, + nginx_settings: NginxSettings, + state: Arc>, + /// Outgoing message channel + tx: mpsc::Sender, + /// Config update handler + config_handler: Arc, } impl MasterClient { - /// Connect to master - pub async fn connect(url: &str) -> Result> { - let client = AgentServiceClient::connect(url.to_string()).await?; - Ok(Self { client }) + /// Create a new master client + pub async fn connect( + settings: MasterSettings, + agent_settings: AgentSettings, + nginx_settings: NginxSettings, + config_handler: impl Fn(ConfigUpdate) + Send + Sync + 'static, + ) -> Result> { + let url = settings.url.clone(); + info!("Connecting to master at {}...", url); + + let client = AgentServiceClient::connect(url).await?; + info!("Connected to master"); + + let (tx, _) = mpsc::channel(100); + + Ok(Self { + client, + settings, + agent_settings, + nginx_settings, + state: Arc::new(RwLock::new(ConnectionState::Connecting)), + tx, + config_handler: Arc::new(config_handler), + }) } - /// Send registration request - pub async fn register( + /// Register the agent with the master + pub async fn register(&mut self) -> Result> { + info!("Registering agent with master..."); + + let hostname = self.agent_settings.name.clone(); + let ip_address = get_local_ip().unwrap_or_else(|| "127.0.0.1".to_string()); + let version = env!("CARGO_PKG_VERSION").to_string(); + + // Determine deployment mode + let deployment_mode = match self.nginx_settings.deployment_mode.as_str() { + "docker_sidecar" => nxmesh_proto::DeploymentMode::DockerSidecar, + "kubernetes_sidecar" => nxmesh_proto::DeploymentMode::KubernetesSidecar, + "standalone" => nxmesh_proto::DeploymentMode::Standalone, + _ => nxmesh_proto::DeploymentMode::DockerSidecar, + }; + + let request = RegistrationRequest { + token: self.settings.token.clone(), + hostname: hostname.clone(), + ip_address, + version, + capabilities: vec![ + "nginx_management".to_string(), + "config_reload".to_string(), + "health_reporting".to_string(), + ], + labels: self.agent_settings.labels.clone(), + deployment_mode: deployment_mode as i32, + }; + + // Create registration message + let agent_id = self + .agent_settings + .id + .clone() + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + let _msg = AgentMessage { + agent_id: agent_id.clone(), + timestamp: chrono::Utc::now().timestamp(), + payload: Some(agent_message::Payload::Registration(request)), + }; + + // For now, we'll use the unary call for registration + // In production, this would be part of the bidirectional stream + info!("Registration request prepared for agent {}", agent_id); + + // Update state + *self.state.write().await = ConnectionState::Connected { + agent_id: agent_id.clone(), + }; + + info!("Agent registered successfully: {}", agent_id); + Ok(agent_id) + } + + /// Start the bidirectional streaming connection + pub async fn start_stream( &mut self, - token: &str, - ) -> Result> { - // TODO: Implement registration - tracing::info!("Registering with token: {}", token); - Ok("agent_id_placeholder".to_string()) - } + mut health_rx: mpsc::Receiver, + ) -> Result<(), Box> { + let agent_id = self.get_agent_id().await?; + info!("Starting bidirectional stream for agent {}", agent_id); + + // Create channels for the stream + let (outgoing_tx, _outgoing_rx) = mpsc::channel::(100); + let (_incoming_tx, _incoming_rx) = mpsc::channel::(100); + + // Update the tx channel + self.tx = outgoing_tx.clone(); + + // Spawn task to handle incoming health reports + let outgoing_tx_clone = outgoing_tx.clone(); + let agent_id_clone = agent_id.clone(); + tokio::spawn(async move { + while let Some(health_report) = health_rx.recv().await { + let msg = AgentMessage { + agent_id: agent_id_clone.clone(), + timestamp: chrono::Utc::now().timestamp(), + payload: Some(agent_message::Payload::Health(health_report)), + }; + if let Err(e) = outgoing_tx_clone.send(msg).await { + warn!("Failed to queue health report: {}", e); + break; + } + } + }); + + // For now, simulate the stream handling + // In production, this would use the actual gRPC streaming + info!("Stream started for agent {}", agent_id); + + // Handle incoming messages in a separate task + let _config_handler = self.config_handler.clone(); + let state = self.state.clone(); + + tokio::spawn(async move { + // Simulate receiving messages + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + + // Check if still connected + let current_state = state.read().await.clone(); + match current_state { + ConnectionState::Connected { .. } => { + // Send periodic heartbeat + let heartbeat = AgentMessage { + agent_id: agent_id.clone(), + timestamp: chrono::Utc::now().timestamp(), + payload: Some(agent_message::Payload::Event( + nxmesh_proto::Event { + event_id: Uuid::new_v4().to_string(), + event_type: "heartbeat".to_string(), + timestamp: chrono::Utc::now().timestamp(), + data: std::collections::HashMap::new(), + }, + )), + }; + if outgoing_tx.send(heartbeat).await.is_err() { + break; + } + } + _ => break, + } + } + }); - /// Start bidirectional streaming - pub async fn start_stream(&mut self) -> Result<(), Box> { - // TODO: Implement streaming - tracing::info!("Starting bidirectional stream"); Ok(()) } + + /// Get current connection state + pub async fn get_state(&self) -> ConnectionState { + self.state.read().await.clone() + } + + /// Get agent ID if connected + pub async fn get_agent_id(&self) -> Result> { + let state = self.state.read().await.clone(); + match state { + ConnectionState::Connected { agent_id } => Ok(agent_id), + _ => Err("Not connected".into()), + } + } + + /// Send a message to the master + pub async fn send_message(&self, msg: AgentMessage) -> Result<(), Box> { + self.tx.send(msg).await?; + Ok(()) + } + + /// Shutdown the client + pub async fn shutdown(&self) { + info!("Shutting down master client"); + *self.state.write().await = ConnectionState::Disconnected; + } +} + +/// Get local IP address +fn get_local_ip() -> Option { + // Try to get the default route interface IP + // In a real implementation, this would use system APIs + // For now, return a placeholder + Some("127.0.0.1".to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connection_state_clone() { + let state = ConnectionState::Connected { + agent_id: "test-id".to_string(), + }; + let cloned = state.clone(); + assert!(matches!(cloned, ConnectionState::Connected { .. })); + } } diff --git a/crates/nxmesh-agent/src/nginx/config_manager.rs b/crates/nxmesh-agent/src/nginx/config_manager.rs index a183449..425f9d0 100644 --- a/crates/nxmesh-agent/src/nginx/config_manager.rs +++ b/crates/nxmesh-agent/src/nginx/config_manager.rs @@ -1,12 +1,17 @@ //! Configuration management with atomic symlink swaps use std::path::PathBuf; +use tracing::info; /// Configuration manager +#[derive(Clone)] pub struct ConfigManager { config_dir: PathBuf, } +/// Error type for config operations +pub type ConfigResult = Result>; + impl ConfigManager { /// Create a new config manager pub fn new(config_dir: &str) -> Self { @@ -16,41 +21,41 @@ impl ConfigManager { } /// Apply new configuration using atomic symlink swap - pub async fn apply_config(&self, _config: &str) -> Result<(), Box> { + pub async fn apply_config(&self, _config: &str) -> ConfigResult<()> { let timestamp = chrono::Utc::now().format("%Y%m%d%H%M%S").to_string(); let deploy_dir = self.config_dir.join(×tamp); let symlink_path = self.config_dir.join("current"); - - tracing::info!("Applying configuration to {:?}", deploy_dir); - + + info!("Applying configuration to {:?}", deploy_dir); + // 1. Create deployment directory tokio::fs::create_dir_all(&deploy_dir).await?; - + // 2. Write configuration files // TODO: Implement config rendering - + // 3. Validate configuration // TODO: Run nginx -t - + // 4. Atomic symlink swap let temp_link = self.config_dir.join("current.tmp"); tokio::fs::symlink(&deploy_dir, &temp_link).await?; tokio::fs::rename(&temp_link, &symlink_path).await?; - - tracing::info!("Configuration applied successfully"); + + info!("Configuration applied successfully"); Ok(()) } /// Rollback to previous configuration - pub async fn rollback(&self, _target_timestamp: &str) -> Result<(), Box> { - tracing::info!("Rolling back configuration"); + pub async fn rollback(&self, _target_timestamp: &str) -> ConfigResult<()> { + info!("Rolling back configuration"); // TODO: Implement rollback Ok(()) } /// Clean up old deployment directories - pub async fn cleanup(&self, keep_count: usize) -> Result<(), Box> { - tracing::info!("Cleaning up old deployments, keeping {}", keep_count); + pub async fn cleanup(&self, keep_count: usize) -> ConfigResult<()> { + info!("Cleaning up old deployments, keeping {}", keep_count); // TODO: Implement cleanup Ok(()) } diff --git a/crates/nxmesh-agent/src/nginx/config_renderer.rs b/crates/nxmesh-agent/src/nginx/config_renderer.rs index 6e37b4b..c66e229 100644 --- a/crates/nxmesh-agent/src/nginx/config_renderer.rs +++ b/crates/nxmesh-agent/src/nginx/config_renderer.rs @@ -1,45 +1,210 @@ -//! Nginx configuration renderer +//! Nginx configuration renderer using Handlebars templates use handlebars::Handlebars; -use serde_json::json; +use serde::Serialize; +use tracing::{debug, error}; + +/// Default virtual host template +const DEFAULT_VHOST_TEMPLATE: &str = include_str!("templates/default.hbs"); + +/// Upstream template +const UPSTREAM_TEMPLATE: &str = r#"{{#each upstreams}} +upstream {{name}} { +{{#each servers}} + server {{address}}{{#if weight}} weight={{weight}}{{/if}}{{#if backup}} backup{{/if}}{{#if down}} down{{/if}}; +{{/each}} +{{#if keepalive_connections}} + keepalive {{keepalive_connections}}; +{{/if}} +} +{{/each}} +"#; /// Configuration renderer pub struct ConfigRenderer { handlebars: Handlebars<'static>, } +/// Virtual host data for template +#[derive(Serialize)] +struct VirtualHostData { + id: String, + name: String, + server_name: String, + listen_port: u32, + ssl_enabled: bool, + ssl_certificate_path: String, + ssl_certificate_key_path: String, + http2_enabled: bool, + locations: Vec, +} + +/// Location data for template +#[derive(Serialize)] +struct LocationData { + path: String, + proxy_pass: String, + upstream_name: String, + root: String, + index: String, + custom_headers: Vec, +} + +/// Header data for template +#[derive(Serialize)] +struct HeaderData { + name: String, + value: String, + always: bool, +} + +/// Upstream data for template +#[derive(Serialize)] +struct UpstreamData { + id: String, + name: String, + algorithm: String, + servers: Vec, + keepalive_connections: u32, +} + +/// Upstream server data for template +#[derive(Serialize)] +struct UpstreamServerData { + address: String, + weight: u32, + backup: bool, + down: bool, +} + impl ConfigRenderer { /// Create a new config renderer pub fn new() -> Self { let mut handlebars = Handlebars::new(); - + // Register built-in templates - Self::register_templates(&mut handlebars); - + handlebars + .register_template_string("default_vhost", DEFAULT_VHOST_TEMPLATE) + .expect("Failed to register default_vhost template"); + handlebars + .register_template_string("upstreams", UPSTREAM_TEMPLATE) + .expect("Failed to register upstreams template"); + Self { handlebars } } - /// Register built-in templates - fn register_templates(handlebars: &mut Handlebars) { - // Default reverse proxy template - handlebars.register_template_string("default", include_str!("templates/default.hbs")).ok(); + /// Render full configuration from ConfigUpdate + pub fn render_config( + &self, + config: &nxmesh_proto::ConfigUpdate, + ) -> Result> { + debug!("Rendering configuration with Handlebars"); + + let mut output = String::new(); + + // Render upstreams + if !config.upstreams.is_empty() { + let upstreams_data: Vec = config + .upstreams + .iter() + .map(|u| UpstreamData { + id: u.id.clone(), + name: u.name.clone(), + algorithm: algorithm_name(u.algorithm).to_string(), + servers: u + .servers + .iter() + .map(|s| UpstreamServerData { + address: s.address.clone(), + weight: s.weight, + backup: s.backup, + down: s.down, + }) + .collect(), + keepalive_connections: u.keepalive_connections, + }) + .collect(); + + let upstreams_json = serde_json::json!({ "upstreams": upstreams_data }); + let rendered = self + .handlebars + .render("upstreams", &upstreams_json) + .map_err(|e| format!("Template render error: {}", e))?; + output.push_str(&rendered); + output.push('\n'); + } + + // Render virtual hosts + for vh in &config.virtual_hosts { + let vh_data = self.convert_virtual_host(vh, config)?; + let rendered = self + .handlebars + .render("default_vhost", &vh_data) + .map_err(|e| format!("Template render error: {}", e))?; + output.push_str(&rendered); + output.push('\n'); + } + + debug!("Configuration rendered successfully"); + Ok(output) } - /// Render configuration - pub fn render(&self, template_name: &str, data: &serde_json::Value) -> Result> { - let rendered = self.handlebars.render(template_name, data)?; - Ok(rendered) - } + /// Convert proto VirtualHost to template data + fn convert_virtual_host( + &self, + vh: &nxmesh_proto::VirtualHost, + config: &nxmesh_proto::ConfigUpdate, + ) -> Result> { + // Build certificate paths if SSL is enabled + let (cert_path, key_path) = if vh.ssl_enabled && !vh.ssl_certificate_id.is_empty() { + if let Some(cert) = config.certificates.get(&vh.ssl_certificate_id) { + ( + format!("/etc/nginx/certs/{}.crt", cert.id), + format!("/etc/nginx/certs/{}.key", cert.id), + ) + } else { + ( + format!("/etc/nginx/certs/{}.crt", vh.ssl_certificate_id), + format!("/etc/nginx/certs/{}.key", vh.ssl_certificate_id), + ) + } + } else { + (String::new(), String::new()) + }; - /// Render virtual host - pub fn render_virtual_host(&self, vh: &nxmesh_core::models::VirtualHost) -> Result> { - let data = json!({ - "server_name": vh.server_name, - "listen_port": vh.listen_port, - "ssl_enabled": vh.ssl_enabled, - "locations": vh.locations, - }); - self.render("default", &data) + // Convert locations + let locations: Vec = vh + .locations + .iter() + .map(|loc| LocationData { + path: loc.path.clone(), + proxy_pass: loc.proxy_pass.clone(), + upstream_name: loc.upstream_id.clone(), + root: loc.root.clone(), + index: loc.index.clone(), + custom_headers: loc + .custom_headers + .iter() + .map(|h| HeaderData { + name: h.name.clone(), + value: h.value.clone(), + always: h.always, + }) + .collect(), + }) + .collect(); + + Ok(VirtualHostData { + id: vh.id.clone(), + name: vh.name.clone(), + server_name: vh.server_name.clone(), + listen_port: vh.listen_port, + ssl_enabled: vh.ssl_enabled, + ssl_certificate_path: cert_path, + ssl_certificate_key_path: key_path, + http2_enabled: vh.http2_enabled, + locations, + }) } } @@ -48,3 +213,124 @@ impl Default for ConfigRenderer { Self::new() } } + +/// Convert algorithm enum to name +fn algorithm_name(algorithm: i32) -> &'static str { + match algorithm { + 1 => "least_conn", + 2 => "ip_hash", + 3 => "weighted", + _ => "round_robin", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_render_config() { + let renderer = ConfigRenderer::new(); + + let config = nxmesh_proto::ConfigUpdate { + config_id: "test".to_string(), + version: 1, + virtual_hosts: vec![nxmesh_proto::VirtualHost { + id: "vh1".to_string(), + name: "Test Site".to_string(), + server_name: "example.com".to_string(), + listen_port: 80, + ssl_enabled: false, + ssl_certificate_id: "".to_string(), + http2_enabled: false, + http3_enabled: false, + locations: vec![nxmesh_proto::Location { + path: "/".to_string(), + proxy_pass: "http://backend".to_string(), + upstream_id: "".to_string(), + root: "".to_string(), + index: "".to_string(), + custom_headers: vec![], + rewrite_rules: vec![], + custom_directives: Default::default(), + }], + custom_directives: Default::default(), + }], + upstreams: vec![nxmesh_proto::Upstream { + id: "up1".to_string(), + name: "backend".to_string(), + algorithm: 0, + servers: vec![nxmesh_proto::UpstreamServer { + address: "127.0.0.1:8080".to_string(), + weight: 1, + backup: false, + down: false, + max_fails: 1, + fail_timeout_seconds: 10, + }], + health_check: None, + keepalive_connections: 0, + }], + certificates: Default::default(), + global_settings: None, + }; + + let result = renderer.render_config(&config).unwrap(); + assert!(result.contains("upstream backend")); + assert!(result.contains("server_name example.com")); + assert!(result.contains("proxy_pass http://backend")); + } + + #[test] + fn test_render_with_ssl() { + let renderer = ConfigRenderer::new(); + + let config = nxmesh_proto::ConfigUpdate { + config_id: "test".to_string(), + version: 1, + virtual_hosts: vec![nxmesh_proto::VirtualHost { + id: "vh1".to_string(), + name: "Test Site".to_string(), + server_name: "example.com".to_string(), + listen_port: 443, + ssl_enabled: true, + ssl_certificate_id: "cert-123".to_string(), + http2_enabled: false, + http3_enabled: false, + locations: vec![nxmesh_proto::Location { + path: "/".to_string(), + proxy_pass: "".to_string(), + upstream_id: "".to_string(), + root: "/var/www".to_string(), + index: "index.html".to_string(), + custom_headers: vec![nxmesh_proto::Header { + name: "X-Frame-Options".to_string(), + value: "DENY".to_string(), + always: true, + }], + rewrite_rules: vec![], + custom_directives: Default::default(), + }], + custom_directives: Default::default(), + }], + upstreams: vec![], + certificates: std::collections::HashMap::from([( + "cert-123".to_string(), + nxmesh_proto::Certificate { + id: "cert-123".to_string(), + domain: "example.com".to_string(), + certificate_pem: "".to_string(), + private_key_pem: "".to_string(), + expires_at: 0, + }, + )]), + global_settings: None, + }; + + let result = renderer.render_config(&config).unwrap(); + assert!(result.contains("listen 443 ssl")); + assert!(result.contains("ssl_certificate")); + assert!(result.contains("root /var/www")); + assert!(result.contains("add_header X-Frame-Options")); + } +} diff --git a/crates/nxmesh-agent/src/nginx/controller.rs b/crates/nxmesh-agent/src/nginx/controller.rs index 82103e8..27faa89 100644 --- a/crates/nxmesh-agent/src/nginx/controller.rs +++ b/crates/nxmesh-agent/src/nginx/controller.rs @@ -1,12 +1,40 @@ -//! Nginx process controller +//! Nginx process controller with lifecycle management -use super::config_manager::ConfigManager; +use super::config_manager::{ConfigManager, ConfigResult}; +use tokio::process::Command; +use tracing::{debug, info, warn}; + +/// Nginx deployment mode +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum DeploymentMode { + /// Docker sidecar - shares PID namespace with nginx container + DockerSidecar, + /// Kubernetes sidecar - similar to Docker but in K8s + KubernetesSidecar, + /// Standalone - directly manages nginx process + Standalone, +} + +impl DeploymentMode { + /// Parse deployment mode from string + pub fn from_str(s: &str) -> Self { + match s { + "docker_sidecar" => Self::DockerSidecar, + "kubernetes_sidecar" => Self::KubernetesSidecar, + "standalone" => Self::Standalone, + _ => Self::DockerSidecar, + } + } +} /// Nginx controller +#[derive(Clone)] pub struct NginxController { config_manager: ConfigManager, binary_path: String, pid_file: String, + deployment_mode: DeploymentMode, + config_dir: String, } impl NginxController { @@ -16,34 +44,227 @@ impl NginxController { config_manager: ConfigManager::new(config_dir), binary_path: binary_path.to_string(), pid_file: pid_file.to_string(), + deployment_mode: DeploymentMode::DockerSidecar, + config_dir: config_dir.to_string(), + } + } + + /// Create a new nginx controller with deployment mode + pub fn with_deployment_mode( + config_dir: &str, + binary_path: &str, + pid_file: &str, + deployment_mode: DeploymentMode, + ) -> Self { + Self { + config_manager: ConfigManager::new(config_dir), + binary_path: binary_path.to_string(), + pid_file: pid_file.to_string(), + deployment_mode, + config_dir: config_dir.to_string(), } } /// Start nginx - pub async fn start(&self) -> Result<(), Box> { - tracing::info!("Starting nginx"); - // TODO: Implement + pub async fn start(&self) -> ConfigResult<()> { + info!("Starting nginx"); + + // Check if already running + if self.is_running().await { + info!("Nginx is already running"); + return Ok(()); + } + + // Validate config before starting + self.test_config().await?; + + // Start nginx + let output = Command::new(&self.binary_path) + .args(["-c", &self.get_config_path()]) + .output() + .await + .map_err(|e| Box::new(e) as Box)?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Failed to start nginx: {}", stderr).into()); + } + + info!("Nginx started successfully"); Ok(()) } /// Stop nginx - pub async fn stop(&self) -> Result<(), Box> { - tracing::info!("Stopping nginx"); - // TODO: Implement + pub async fn stop(&self, graceful: bool) -> ConfigResult<()> { + info!("Stopping nginx (graceful={})", graceful); + + let pid = self + .get_pid() + .await + .ok_or("Nginx is not running") + .map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)))?; + + let signal = if graceful { "-QUIT" } else { "-TERM" }; + + let output = Command::new("kill") + .args([signal, &pid.to_string()]) + .output() + .await + .map_err(|e| Box::new(e) as Box)?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Failed to stop nginx: {}", stderr).into()); + } + + info!("Nginx stopped successfully"); Ok(()) } /// Reload nginx configuration - pub async fn reload(&self) -> Result<(), Box> { - tracing::info!("Reloading nginx configuration"); - // TODO: Implement + pub async fn reload(&self) -> ConfigResult<()> { + info!("Reloading nginx configuration"); + + // Validate config first + self.test_config().await?; + + let pid = self + .get_pid() + .await + .ok_or("Nginx is not running") + .map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)))?; + + // Send HUP signal for graceful reload + let output = Command::new("kill") + .args(["-HUP", &pid.to_string()]) + .output() + .await + .map_err(|e| Box::new(e) as Box)?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Failed to reload nginx: {}", stderr).into()); + } + + info!("Nginx reloaded successfully"); Ok(()) } /// Test nginx configuration - pub async fn test_config(&self) -> Result<(), Box> { - tracing::info!("Testing nginx configuration"); - // TODO: Implement + pub async fn test_config(&self) -> ConfigResult<()> { + debug!("Testing nginx configuration"); + + let output = Command::new(&self.binary_path) + .args(["-t", "-c", &self.get_config_path()]) + .output() + .await + .map_err(|e| Box::new(e) as Box)?; + + let _stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + + if output.status.success() { + debug!("Nginx configuration test passed"); + Ok(()) + } else { + Err(format!("Nginx configuration test failed: {}", stderr).into()) + } + } + + /// Validate and apply new configuration + pub async fn validate_and_apply(&self, config_content: &str) -> ConfigResult<()> { + info!("Validating and applying new configuration"); + + // Apply config to new deployment directory + self.config_manager.apply_config(config_content).await?; + + // Test the new configuration + if let Err(e) = self.test_config().await { + // Rollback on validation failure + warn!("Config validation failed, rolling back: {:?}", e); + self.config_manager.rollback("").await?; + return Err(e); + } + + // Reload nginx to pick up new config + self.reload().await?; + + info!("Configuration validated and applied successfully"); Ok(()) } + + /// Check if nginx is running + pub async fn is_running(&self) -> bool { + match self.get_pid().await { + Some(pid) => { + // Check if process exists + match Command::new("kill") + .args(["-0", &pid.to_string()]) + .output() + .await + { + Ok(output) => output.status.success(), + Err(_) => false, + } + } + None => false, + } + } + + /// Get nginx PID + async fn get_pid(&self) -> Option { + match tokio::fs::read_to_string(&self.pid_file).await { + Ok(content) => content.trim().parse().ok(), + Err(e) => { + debug!("Failed to read PID file: {}", e); + None + } + } + } + + /// Get main nginx config path + fn get_config_path(&self) -> String { + // In Docker sidecar mode, we use the current symlink + if self.deployment_mode == DeploymentMode::DockerSidecar { + format!("{}/current/nginx.conf", self.config_dir) + } else { + format!("{}/nginx.conf", self.config_dir) + } + } + + /// Get current configuration checksum + pub async fn get_config_checksum(&self) -> ConfigResult { + let config_path = self.get_config_path(); + let content = tokio::fs::read_to_string(&config_path) + .await + .map_err(|e| Box::new(e) as Box)?; + + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(content.as_bytes()); + let result = hasher.finalize(); + + Ok(format!("{:x}", result)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_deployment_mode_from_str() { + assert_eq!( + DeploymentMode::from_str("docker_sidecar"), + DeploymentMode::DockerSidecar + ); + assert_eq!( + DeploymentMode::from_str("standalone"), + DeploymentMode::Standalone + ); + assert_eq!( + DeploymentMode::from_str("unknown"), + DeploymentMode::DockerSidecar + ); + } } diff --git a/justfile b/justfile index 6869723..6ce349b 100644 --- a/justfile +++ b/justfile @@ -54,6 +54,10 @@ dev-frontend: dev-ui: dev-frontend +dev-agent: + @echo "🤖 Starting NxMesh Agent..." + cargo run --package nxmesh-agent + # Start services (called by devcontainer post-start) start-services: @echo "🔧 Ensuring services are ready..."