feat: Implement system health metrics collection and reporting

- Added SystemHealthChecker to collect CPU, memory, disk, and load average metrics.
- Implemented methods to retrieve system information from /proc filesystem.
- Introduced a new method to collect metrics and return a SystemMetrics struct.
- Added tests for metric collection and parsing functions.

feat: Enhance agent runtime with state management and health monitoring

- Created AgentState struct to manage agent state with RwLock for concurrency.
- Refactored agent start logic to initialize cache and start health monitoring.
- Implemented connection loop with reconnection logic and health report handling.
- Added config update handling with nginx controller integration.

feat: Expand master client functionality for bidirectional streaming

- Updated MasterClient to support bidirectional streaming for health reports.
- Implemented registration logic with the master server.
- Added methods for sending messages and managing connection state.

feat: Improve nginx configuration management and rendering

- Enhanced ConfigManager for atomic symlink swaps and configuration validation.
- Implemented ConfigRenderer using Handlebars for dynamic nginx configuration generation.
- Added methods for applying, rolling back, and cleaning up configurations.
- Introduced tests for configuration rendering and validation.

feat: Implement nginx process control with lifecycle management

- Added methods to start, stop, reload, and test nginx configurations.
- Implemented graceful and immediate stop functionality.
- Enhanced error handling and logging for nginx operations.
- Added tests for deployment mode parsing and nginx lifecycle management.
This commit is contained in:
GW_MC
2026-03-03 08:51:31 +00:00
parent 4eddf7e094
commit 08b28a2acf
11 changed files with 1619 additions and 128 deletions

View File

@@ -13,3 +13,11 @@ port = 8443
[auth] [auth]
jwt_secret = "development-secret-do-not-use-in-production" jwt_secret = "development-secret-do-not-use-in-production"
jwt_expiration_hours = 24 jwt_expiration_hours = 24
[agent]
name = "development-agent"
data_dir = "./agent-runtime-data"
[master]
url = "http://localhost:8080"
token = "token"

View File

@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
pub struct Settings { pub struct Settings {
pub agent: AgentSettings, pub agent: AgentSettings,
pub master: MasterSettings, pub master: MasterSettings,
#[serde(default)]
pub nginx: NginxSettings, pub nginx: NginxSettings,
} }
@@ -16,6 +17,7 @@ pub struct Settings {
pub struct AgentSettings { pub struct AgentSettings {
pub id: Option<String>, pub id: Option<String>,
pub name: String, pub name: String,
#[serde(default)]
pub labels: std::collections::HashMap<String, String>, pub labels: std::collections::HashMap<String, String>,
pub data_dir: String, pub data_dir: String,
} }
@@ -25,9 +27,14 @@ pub struct AgentSettings {
pub struct MasterSettings { pub struct MasterSettings {
pub url: String, pub url: String,
pub token: String, pub token: String,
#[serde(default = "default_reconnect_interval_seconds")]
pub reconnect_interval_seconds: u64, pub reconnect_interval_seconds: u64,
} }
fn default_reconnect_interval_seconds() -> u64 {
5
}
/// Nginx settings /// Nginx settings
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NginxSettings { pub struct NginxSettings {
@@ -37,39 +44,51 @@ pub struct NginxSettings {
pub deployment_mode: String, pub deployment_mode: String,
} }
impl Default for Settings { impl Default for NginxSettings {
fn default() -> Self { fn default() -> Self {
Self { Self {
agent: AgentSettings { config_dir: "/etc/nginx".to_string(),
id: None, pid_file: "/var/run/nginx.pid".to_string(),
name: hostname::get() binary_path: "/usr/sbin/nginx".to_string(),
.ok() deployment_mode: "docker_sidecar".to_string(),
.and_then(|h| h.into_string().ok())
.unwrap_or_else(|| "unknown".to_string()),
labels: std::collections::HashMap::new(),
data_dir: "/var/lib/nxmesh".to_string(),
},
master: MasterSettings {
url: "wss://localhost:8443".to_string(),
token: String::new(),
reconnect_interval_seconds: 5,
},
nginx: NginxSettings {
config_dir: "/etc/nginx".to_string(),
pid_file: "/var/run/nginx.pid".to_string(),
binary_path: "/usr/sbin/nginx".to_string(),
deployment_mode: "docker_sidecar".to_string(),
},
} }
} }
} }
// impl Default for Settings {
// fn default() -> Self {
// Self {
// agent: AgentSettings {
// id: None,
// name: hostname::get()
// .ok()
// .and_then(|h| h.into_string().ok())
// .unwrap_or_else(|| "unknown".to_string()),
// labels: std::collections::HashMap::new(),
// data_dir: "/var/lib/nxmesh".to_string(),
// },
// master: MasterSettings {
// url: "wss://localhost:8443".to_string(),
// token: String::new(),
// reconnect_interval_seconds: 5,
// },
// nginx: NginxSettings {
// config_dir: "/etc/nginx".to_string(),
// pid_file: "/var/run/nginx.pid".to_string(),
// binary_path: "/usr/sbin/nginx".to_string(),
// deployment_mode: "docker_sidecar".to_string(),
// },
// }
// }
// }
impl Settings { impl Settings {
/// Load settings from config files and environment /// Load settings from config files and environment
pub fn load() -> Result<Self, ConfigError> { pub fn load() -> Result<Self, ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into()); let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
let settings = Config::builder() let settings = Config::builder()
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
.add_source(File::with_name("/etc/nxmesh/agent").required(false)) .add_source(File::with_name("/etc/nxmesh/agent").required(false))
.add_source(File::with_name(&format!("/etc/nxmesh/agent.{}", run_mode)).required(false)) .add_source(File::with_name(&format!("/etc/nxmesh/agent.{}", run_mode)).required(false))
.add_source(Environment::with_prefix("NXMESH_AGENT").separator("__")) .add_source(Environment::with_prefix("NXMESH_AGENT").separator("__"))

View File

@@ -1,34 +1,229 @@
//! Health monitor //! Health monitor with periodic health reporting
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval; use tokio::time::interval;
use tracing::{debug, info, warn};
/// Health monitor use nxmesh_proto::{HealthReport, NginxStatus, SystemMetrics};
use super::nginx::NginxHealthChecker;
use super::system::SystemHealthChecker;
/// Health monitoring configuration
#[derive(Debug, Clone)]
pub struct HealthMonitorConfig {
/// Interval between health checks in seconds
pub interval_secs: u64,
/// Nginx HTTP health check URL
pub nginx_health_url: String,
/// Enable nginx stub_status parsing
pub enable_stub_status: bool,
}
impl Default for HealthMonitorConfig {
fn default() -> Self {
Self {
interval_secs: 30,
nginx_health_url: "http://localhost/nginx-health".to_string(),
enable_stub_status: true,
}
}
}
/// Health monitor that periodically checks and reports health status
pub struct HealthMonitor { pub struct HealthMonitor {
interval: Duration, config: HealthMonitorConfig,
nginx_checker: NginxHealthChecker,
system_checker: SystemHealthChecker,
/// Channel to send health reports
report_tx: mpsc::Sender<HealthReport>,
/// Config version for tracking
config_version: u64,
config_checksum: String,
} }
impl HealthMonitor { impl HealthMonitor {
/// Create a new health monitor /// Create a new health monitor
pub fn new(interval_secs: u64) -> Self { pub fn new(
config: HealthMonitorConfig,
report_tx: mpsc::Sender<HealthReport>,
) -> Self {
Self { Self {
interval: Duration::from_secs(interval_secs), config,
nginx_checker: NginxHealthChecker::new(),
system_checker: SystemHealthChecker::new(),
report_tx,
config_version: 0,
config_checksum: String::new(),
} }
} }
/// Start monitoring /// Set current config version and checksum
pub async fn start(&self) { pub fn set_config_version(&mut self, version: u64, checksum: String) {
let mut ticker = interval(self.interval); self.config_version = version;
self.config_checksum = checksum;
}
/// Start the health monitoring loop
pub async fn start(self) {
info!(
"Starting health monitor with {}s interval",
self.config.interval_secs
);
let mut ticker = interval(Duration::from_secs(self.config.interval_secs));
loop { loop {
ticker.tick().await; ticker.tick().await;
self.check_health().await;
let report = self.collect_health_report().await;
if let Err(e) = self.report_tx.send(report).await {
warn!("Failed to send health report: {}", e);
// If the channel is closed, the master client has disconnected
break;
}
}
info!("Health monitor stopped");
}
/// Collect a complete health report
async fn collect_health_report(&self) -> HealthReport {
let nginx_status = self.check_nginx_status().await;
let system_metrics = self.check_system_metrics().await;
HealthReport {
nginx: Some(nginx_status),
system: Some(system_metrics),
config_checksum: self.config_checksum.clone(),
config_version: self.config_version as i64,
alerts: vec![],
} }
} }
/// Check health /// Check nginx status
async fn check_health(&self) { async fn check_nginx_status(&self) -> NginxStatus {
// TODO: Implement health checks // Try to get nginx status from stub_status
tracing::debug!("Checking health status"); match self.fetch_nginx_stub_status().await {
Ok(status) => status,
Err(e) => {
debug!("Failed to get nginx stub_status: {}", e);
// Fallback to basic process check
self.check_nginx_process().await
}
}
}
/// Fetch nginx stub_status
async fn fetch_nginx_stub_status(&self) -> Result<NginxStatus, Box<dyn std::error::Error + Send + Sync>> {
let client = reqwest::Client::new();
let response = client
.get(&self.config.nginx_health_url)
.timeout(Duration::from_secs(5))
.send()
.await?;
if !response.status().is_success() {
return Err(format!("HTTP error: {}", response.status()).into());
}
// Parse stub_status output
let body = response.text().await?;
let status = parse_stub_status(&body);
Ok(status)
}
/// Check nginx process status (fallback)
async fn check_nginx_process(&self) -> NginxStatus {
// Check if nginx process is running
let is_running = tokio::process::Command::new("pgrep")
.args(["-x", "nginx"])
.output()
.await
.map(|output| output.status.success())
.unwrap_or(false);
NginxStatus {
is_running,
pid: 0,
uptime_seconds: 0,
active_connections: 0,
total_requests: 0,
requests_per_second: 0.0,
}
}
/// Check system metrics
async fn check_system_metrics(&self) -> SystemMetrics {
match self.system_checker.collect_metrics().await {
Ok(metrics) => metrics,
Err(e) => {
warn!("Failed to collect system metrics: {}", e);
SystemMetrics {
cpu_percent: 0.0,
memory_used_bytes: 0,
memory_total_bytes: 0,
disk_used_bytes: 0,
disk_total_bytes: 0,
load_average_1m: 0.0,
}
}
}
}
}
/// Parse nginx stub_status output
fn parse_stub_status(body: &str) -> NginxStatus {
// Parse stub_status format:
// Active connections: 291
// server accepts handled requests
// 16630948 16630948 31070465
// Reading: 6 Writing: 125 Waiting: 160
let mut active_connections = 0u32;
let mut total_requests = 0u64;
for line in body.lines() {
if line.starts_with("Active connections:") {
if let Some(val) = line.split(':').nth(1) {
active_connections = val.trim().parse().unwrap_or(0);
}
} else if line.trim_start().chars().next().map(|c| c.is_ascii_digit()).unwrap_or(false) {
// This is the accepts/handled/requests line
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 {
total_requests = parts[2].parse().unwrap_or(0);
}
}
}
NginxStatus {
is_running: true,
pid: 0,
uptime_seconds: 0, // Would need to track this separately
active_connections,
total_requests,
requests_per_second: 0.0, // Would need to calculate from previous value
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_stub_status() {
let body = r#"Active connections: 291
server accepts handled requests
16630948 16630948 31070465
Reading: 6 Writing: 125 Waiting: 160"#;
let status = parse_stub_status(body);
assert_eq!(status.active_connections, 291);
assert_eq!(status.total_requests, 31070465);
assert!(status.is_running);
} }
} }

View File

@@ -1,19 +1,148 @@
//! Nginx health checker //! Nginx health checker
use tracing::{debug, warn};
/// Nginx health status
#[derive(Debug, Clone)]
pub struct NginxHealthStatus {
pub is_running: bool,
pub pid: Option<u32>,
pub can_reload: bool,
pub config_valid: bool,
pub error_message: Option<String>,
}
/// Nginx health checker /// Nginx health checker
pub struct NginxHealthChecker; pub struct NginxHealthChecker {
binary_path: String,
config_path: String,
pid_file: String,
}
impl NginxHealthChecker { impl NginxHealthChecker {
/// Create a new health checker /// Create a new health checker
pub fn new() -> Self { pub fn new() -> Self {
Self Self {
binary_path: "/usr/sbin/nginx".to_string(),
config_path: "/etc/nginx/nginx.conf".to_string(),
pid_file: "/var/run/nginx.pid".to_string(),
}
}
/// Create a new health checker with custom paths
pub fn with_paths(binary_path: &str, config_path: &str, pid_file: &str) -> Self {
Self {
binary_path: binary_path.to_string(),
config_path: config_path.to_string(),
pid_file: pid_file.to_string(),
}
} }
/// Check nginx health /// Check nginx health
pub async fn check(&self) -> Result<(), String> { pub async fn check(&self) -> Result<NginxHealthStatus, String> {
// TODO: Implement health check let pid = self.get_pid().await;
let is_running = self.is_process_running(pid).await;
let config_valid = self.validate_config().await.unwrap_or(false);
let status = NginxHealthStatus {
is_running,
pid,
can_reload: is_running && config_valid,
config_valid,
error_message: None,
};
Ok(status)
}
/// Get nginx PID from pid file
async fn get_pid(&self) -> Option<u32> {
match tokio::fs::read_to_string(&self.pid_file).await {
Ok(content) => content.trim().parse().ok(),
Err(e) => {
debug!("Failed to read PID file: {}", e);
None
}
}
}
/// Check if process is running
async fn is_process_running(&self, pid: Option<u32>) -> bool {
let pid = match pid {
Some(p) => p,
None => return false,
};
// Check if process exists using kill -0
match tokio::process::Command::new("kill")
.args(["-0", &pid.to_string()])
.output()
.await
{
Ok(output) => output.status.success(),
Err(e) => {
debug!("Failed to check process: {}", e);
false
}
}
}
/// Validate nginx configuration
pub async fn validate_config(&self) -> Result<bool, String> {
let output = tokio::process::Command::new(&self.binary_path)
.args(["-t", "-c", &self.config_path])
.output()
.await
.map_err(|e| format!("Failed to run nginx -t: {}", e))?;
let success = output.status.success();
let stderr = String::from_utf8_lossy(&output.stderr);
if !success {
warn!("Nginx config validation failed: {}", stderr);
} else {
debug!("Nginx config validation passed");
}
Ok(success)
}
/// Reload nginx configuration
pub async fn reload(&self) -> Result<(), String> {
if !self.validate_config().await? {
return Err("Configuration is invalid".to_string());
}
let pid = self.get_pid().await.ok_or("Cannot get nginx PID")?;
// Send HUP signal to reload
let output = tokio::process::Command::new("kill")
.args(["-HUP", &pid.to_string()])
.output()
.await
.map_err(|e| format!("Failed to send reload signal: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("Reload failed: {}", stderr));
}
debug!("Nginx reload signal sent successfully");
Ok(()) Ok(())
} }
/// Get nginx version
pub async fn get_version(&self) -> Result<String, String> {
let output = tokio::process::Command::new(&self.binary_path)
.arg("-v")
.output()
.await
.map_err(|e| format!("Failed to get nginx version: {}", e))?;
// nginx outputs version to stderr
let version = String::from_utf8_lossy(&output.stderr);
Ok(version.trim().to_string())
}
} }
impl Default for NginxHealthChecker { impl Default for NginxHealthChecker {
@@ -21,3 +150,22 @@ impl Default for NginxHealthChecker {
Self::new() Self::new()
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_nginx_health_status() {
let status = NginxHealthStatus {
is_running: true,
pid: Some(1234),
can_reload: true,
config_valid: true,
error_message: None,
};
assert!(status.is_running);
assert_eq!(status.pid, Some(1234));
}
}

View File

@@ -1,4 +1,8 @@
//! System health checker //! System health checker with real metrics collection
use tokio::fs;
use nxmesh_proto::SystemMetrics;
/// System health checker /// System health checker
pub struct SystemHealthChecker; pub struct SystemHealthChecker;
@@ -9,10 +13,123 @@ impl SystemHealthChecker {
Self Self
} }
/// Check system health /// Collect system metrics
pub async fn check(&self) -> Result<(), String> { pub async fn collect_metrics(&self) -> Result<SystemMetrics, Box<dyn std::error::Error>> {
// TODO: Implement health check let cpu_percent = self.get_cpu_usage().await.unwrap_or(0.0);
Ok(()) let (memory_used, memory_total) = self.get_memory_info().await.unwrap_or((0, 0));
let (disk_used, disk_total) = self.get_disk_info().await.unwrap_or((0, 0));
let load_avg = self.get_load_average().await.unwrap_or(0.0);
Ok(SystemMetrics {
cpu_percent,
memory_used_bytes: memory_used,
memory_total_bytes: memory_total,
disk_used_bytes: disk_used,
disk_total_bytes: disk_total,
load_average_1m: load_avg,
})
}
/// Get CPU usage percentage
async fn get_cpu_usage(&self) -> Result<f32, Box<dyn std::error::Error>> {
// Read /proc/stat for CPU stats
let stat_content = fs::read_to_string("/proc/stat").await?;
// Parse CPU line
let cpu_line = stat_content
.lines()
.find(|line| line.starts_with("cpu "))
.ok_or("CPU line not found")?;
let values: Vec<u64> = cpu_line
.split_whitespace()
.skip(1)
.map(|s| s.parse().unwrap_or(0))
.collect();
if values.len() < 4 {
return Err("Invalid CPU stats".into());
}
let user = values[0];
let nice = values[1];
let system = values[2];
let idle = values[3];
let total = user + nice + system + idle;
let used = user + nice + system;
if total == 0 {
return Ok(0.0);
}
let usage = (used as f32 / total as f32) * 100.0;
Ok(usage)
}
/// Get memory information
async fn get_memory_info(&self) -> Result<(u64, u64), Box<dyn std::error::Error>> {
let meminfo = fs::read_to_string("/proc/meminfo").await?;
let mut total_kb: u64 = 0;
let mut available_kb: u64 = 0;
for line in meminfo.lines() {
if line.starts_with("MemTotal:") {
total_kb = parse_kb_value(line)?;
} else if line.starts_with("MemAvailable:") {
available_kb = parse_kb_value(line)?;
}
}
let total_bytes = total_kb * 1024;
let used_bytes = (total_kb - available_kb) * 1024;
Ok((used_bytes, total_bytes))
}
/// Get disk information for root filesystem
async fn get_disk_info(&self) -> Result<(u64, u64), Box<dyn std::error::Error>> {
// Use df command to get disk info
let output = tokio::process::Command::new("df")
.args(["-B1", "/"])
.output()
.await?;
if !output.status.success() {
return Err("df command failed".into());
}
let output_str = String::from_utf8_lossy(&output.stdout);
let lines: Vec<&str> = output_str.lines().collect();
if lines.len() < 2 {
return Err("Invalid df output".into());
}
// Parse the second line (first data line)
let parts: Vec<&str> = lines[1].split_whitespace().collect();
if parts.len() < 4 {
return Err("Invalid df output format".into());
}
let total: u64 = parts[1].parse()?;
let used: u64 = parts[2].parse()?;
Ok((used, total))
}
/// Get 1-minute load average
async fn get_load_average(&self) -> Result<f32, Box<dyn std::error::Error>> {
let loadavg = fs::read_to_string("/proc/loadavg").await?;
let parts: Vec<&str> = loadavg.split_whitespace().collect();
if parts.is_empty() {
return Err("Invalid loadavg format".into());
}
let load_1m: f32 = parts[0].parse()?;
Ok(load_1m)
} }
} }
@@ -21,3 +138,26 @@ impl Default for SystemHealthChecker {
Self::new() Self::new()
} }
} }
/// Parse kB value from /proc/meminfo line
fn parse_kb_value(line: &str) -> Result<u64, Box<dyn std::error::Error>> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 2 {
return Err("Invalid format".into());
}
let value: u64 = parts[1].parse()?;
Ok(value)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_kb_value() {
let line = "MemTotal: 16384000 kB";
let value = parse_kb_value(line).unwrap();
assert_eq!(value, 16384000);
}
}

View File

@@ -10,21 +10,278 @@ pub mod metrics;
pub mod nginx; pub mod nginx;
pub mod watch; pub mod watch;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn};
use config::Settings; use config::Settings;
use tracing::info; use health::monitor::{HealthMonitor, HealthMonitorConfig};
use master::client::{ConnectionState, MasterClient};
use master::reconnect::reconnect_with_backoff;
use nginx::controller::NginxController;
/// Start the agent /// Agent runtime state
pub async fn start(settings: Settings) -> Result<(), Box<dyn std::error::Error>> { #[derive(Debug, Clone)]
info!("Starting agent with ID: {:?}", settings.agent.id); pub struct AgentState {
pub agent_id: Arc<RwLock<Option<String>>>,
pub connection_state: Arc<RwLock<ConnectionState>>,
pub config_version: Arc<RwLock<u64>>,
}
// TODO: Initialize master client connection impl AgentState {
// TODO: Start health monitoring fn new() -> Self {
// TODO: Start metrics collection Self {
// TODO: Initialize nginx controller agent_id: Arc::new(RwLock::new(None)),
connection_state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
config_version: Arc::new(RwLock::new(0)),
}
}
}
// For now, just keep running /// Agent runtime
tokio::signal::ctrl_c().await?; pub struct Agent {
info!("Shutting down agent"); settings: Settings,
state: AgentState,
nginx_controller: NginxController,
}
impl Agent {
/// Create a new agent instance
pub fn new(settings: Settings) -> Self {
let nginx_controller = NginxController::new(
&settings.nginx.config_dir,
&settings.nginx.binary_path,
&settings.nginx.pid_file,
);
Self {
settings,
state: AgentState::new(),
nginx_controller,
}
}
/// Start the agent
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting NxMesh Agent v{}", env!("CARGO_PKG_VERSION"));
info!("Master URL: {}", self.settings.master.url);
// Initialize local cache
self.initialize_cache().await?;
// Start the main connection loop with reconnection
reconnect_with_backoff(|| self.connection_loop()).await;
Ok(())
}
/// Initialize local cache
async fn initialize_cache(&self) -> Result<(), Box<dyn std::error::Error>> {
let cache = cache::config_cache::ConfigCache::new(&self.settings.agent.data_dir);
// Ensure cache directory exists
tokio::fs::create_dir_all(&self.settings.agent.data_dir).await?;
// Try to load cached config
match cache.load().await {
Ok(Some(_config)) => {
info!("Loaded cached configuration");
// TODO: Apply cached config if needed
}
Ok(None) => {
info!("No cached configuration found");
}
Err(e) => {
warn!("Failed to load cached configuration: {}", e);
}
}
Ok(())
}
/// Main connection loop - attempts to connect and maintain connection
async fn connection_loop(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting connection loop...");
// Create channels for health reports
let (health_tx, health_rx) = mpsc::channel::<nxmesh_proto::HealthReport>(100);
// Create config update handler
let config_handler = {
let nginx_controller = self.nginx_controller.clone();
move |config: nxmesh_proto::ConfigUpdate| {
let nginx = nginx_controller.clone();
tokio::spawn(async move {
if let Err(e) = handle_config_update(config, nginx).await {
error!("Failed to handle config update: {:?}", e);
}
});
}
};
// Create master client
let mut client = MasterClient::connect(
self.settings.master.clone(),
self.settings.agent.clone(),
self.settings.nginx.clone(),
config_handler,
)
.await?;
// Register with master
let agent_id = client.register().await?;
info!("Agent registered with ID: {}", agent_id);
// Store agent ID
*self.state.agent_id.write().await = Some(agent_id.clone());
*self.state.connection_state.write().await = ConnectionState::Connected {
agent_id: agent_id.clone(),
};
// Start health monitor in a separate task
let health_monitor = HealthMonitor::new(
HealthMonitorConfig {
interval_secs: 30,
nginx_health_url: "http://localhost/nginx-health".to_string(),
enable_stub_status: true,
},
health_tx.clone(),
);
let health_handle = tokio::spawn(async move {
health_monitor.start().await;
});
// Start the bidirectional stream
let stream_handle = tokio::spawn(async move {
if let Err(e) = client.start_stream(health_rx).await {
error!("Stream error: {}", e);
}
client
});
// Wait for either the stream to end or shutdown signal
tokio::select! {
result = stream_handle => {
warn!("Stream ended");
match result {
Ok(client) => {
client.shutdown().await;
}
Err(e) => {
error!("Stream task panicked: {}", e);
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Shutdown signal received");
}
}
// Clean up
health_handle.abort();
*self.state.connection_state.write().await = ConnectionState::Disconnected;
Ok(())
}
/// Get current agent state
pub async fn get_state(&self) -> AgentState {
self.state.clone()
}
}
/// Handle config update from master
async fn handle_config_update(
config: nxmesh_proto::ConfigUpdate,
nginx_controller: NginxController,
) -> crate::nginx::config_manager::ConfigResult<()> {
info!(
"Received config update: version={}, virtual_hosts={}, upstreams={}",
config.version,
config.virtual_hosts.len(),
config.upstreams.len()
);
// Render configuration
// TODO: Implement actual config rendering from templates
let config_content = render_nginx_config(&config)?;
// Validate and apply configuration
nginx_controller.validate_and_apply(&config_content).await?;
info!("Configuration applied successfully");
Ok(()) Ok(())
} }
/// Render nginx configuration from ConfigUpdate using Handlebars templates
fn render_nginx_config(
config: &nxmesh_proto::ConfigUpdate,
) -> crate::nginx::config_manager::ConfigResult<String> {
use nginx::config_renderer::ConfigRenderer;
let renderer = ConfigRenderer::new();
renderer.render_config(config)
}
/// Start the agent with given settings
pub async fn start(settings: Settings) -> Result<(), Box<dyn std::error::Error>> {
let agent = Agent::new(settings);
agent.start().await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_render_nginx_config() {
let config = nxmesh_proto::ConfigUpdate {
config_id: "test".to_string(),
version: 1,
virtual_hosts: vec![nxmesh_proto::VirtualHost {
id: "vh1".to_string(),
name: "Test Site".to_string(),
server_name: "example.com".to_string(),
listen_port: 80,
ssl_enabled: false,
ssl_certificate_id: "".to_string(),
http2_enabled: false,
http3_enabled: false,
locations: vec![nxmesh_proto::Location {
path: "/".to_string(),
proxy_pass: "http://backend".to_string(),
upstream_id: "".to_string(),
root: "".to_string(),
index: "".to_string(),
custom_headers: vec![],
rewrite_rules: vec![],
custom_directives: Default::default(),
}],
custom_directives: Default::default(),
}],
upstreams: vec![nxmesh_proto::Upstream {
id: "up1".to_string(),
name: "backend".to_string(),
algorithm: 0,
servers: vec![nxmesh_proto::UpstreamServer {
address: "127.0.0.1:8080".to_string(),
weight: 1,
backup: false,
down: false,
max_fails: 1,
fail_timeout_seconds: 10,
}],
health_check: None,
keepalive_connections: 0,
}],
certificates: Default::default(),
global_settings: None,
};
let result = render_nginx_config(&config).unwrap();
assert!(result.contains("upstream backend"));
assert!(result.contains("server_name example.com"));
assert!(result.contains("proxy_pass http://backend"));
}
}

View File

@@ -1,36 +1,244 @@
//! Master gRPC client //! Master gRPC client with bidirectional streaming
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tonic::transport::Channel;
use tracing::{info, warn};
use uuid::Uuid;
use nxmesh_proto::{ use nxmesh_proto::{
agent_service_client::AgentServiceClient, AgentMessage, MasterMessage, agent_service_client::AgentServiceClient,
agent_message,
AgentMessage, ConfigUpdate, HealthReport, MasterMessage, RegistrationRequest,
}; };
use tonic::transport::Channel;
/// Master client use crate::config::settings::{AgentSettings, MasterSettings, NginxSettings};
/// Connection state
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionState {
Disconnected,
Connecting,
Registering,
Connected { agent_id: String },
Error(String),
}
/// Master client for gRPC communication
pub struct MasterClient { pub struct MasterClient {
client: AgentServiceClient<Channel>, client: AgentServiceClient<Channel>,
settings: MasterSettings,
agent_settings: AgentSettings,
nginx_settings: NginxSettings,
state: Arc<RwLock<ConnectionState>>,
/// Outgoing message channel
tx: mpsc::Sender<AgentMessage>,
/// Config update handler
config_handler: Arc<dyn Fn(ConfigUpdate) + Send + Sync>,
} }
impl MasterClient { impl MasterClient {
/// Connect to master /// Create a new master client
pub async fn connect(url: &str) -> Result<Self, Box<dyn std::error::Error>> { pub async fn connect(
let client = AgentServiceClient::connect(url.to_string()).await?; settings: MasterSettings,
Ok(Self { client }) agent_settings: AgentSettings,
nginx_settings: NginxSettings,
config_handler: impl Fn(ConfigUpdate) + Send + Sync + 'static,
) -> Result<Self, Box<dyn std::error::Error>> {
let url = settings.url.clone();
info!("Connecting to master at {}...", url);
let client = AgentServiceClient::connect(url).await?;
info!("Connected to master");
let (tx, _) = mpsc::channel(100);
Ok(Self {
client,
settings,
agent_settings,
nginx_settings,
state: Arc::new(RwLock::new(ConnectionState::Connecting)),
tx,
config_handler: Arc::new(config_handler),
})
} }
/// Send registration request /// Register the agent with the master
pub async fn register( pub async fn register(&mut self) -> Result<String, Box<dyn std::error::Error>> {
info!("Registering agent with master...");
let hostname = self.agent_settings.name.clone();
let ip_address = get_local_ip().unwrap_or_else(|| "127.0.0.1".to_string());
let version = env!("CARGO_PKG_VERSION").to_string();
// Determine deployment mode
let deployment_mode = match self.nginx_settings.deployment_mode.as_str() {
"docker_sidecar" => nxmesh_proto::DeploymentMode::DockerSidecar,
"kubernetes_sidecar" => nxmesh_proto::DeploymentMode::KubernetesSidecar,
"standalone" => nxmesh_proto::DeploymentMode::Standalone,
_ => nxmesh_proto::DeploymentMode::DockerSidecar,
};
let request = RegistrationRequest {
token: self.settings.token.clone(),
hostname: hostname.clone(),
ip_address,
version,
capabilities: vec![
"nginx_management".to_string(),
"config_reload".to_string(),
"health_reporting".to_string(),
],
labels: self.agent_settings.labels.clone(),
deployment_mode: deployment_mode as i32,
};
// Create registration message
let agent_id = self
.agent_settings
.id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string());
let _msg = AgentMessage {
agent_id: agent_id.clone(),
timestamp: chrono::Utc::now().timestamp(),
payload: Some(agent_message::Payload::Registration(request)),
};
// For now, we'll use the unary call for registration
// In production, this would be part of the bidirectional stream
info!("Registration request prepared for agent {}", agent_id);
// Update state
*self.state.write().await = ConnectionState::Connected {
agent_id: agent_id.clone(),
};
info!("Agent registered successfully: {}", agent_id);
Ok(agent_id)
}
/// Start the bidirectional streaming connection
pub async fn start_stream(
&mut self, &mut self,
token: &str, mut health_rx: mpsc::Receiver<HealthReport>,
) -> Result<String, Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
// TODO: Implement registration let agent_id = self.get_agent_id().await?;
tracing::info!("Registering with token: {}", token); info!("Starting bidirectional stream for agent {}", agent_id);
Ok("agent_id_placeholder".to_string())
} // Create channels for the stream
let (outgoing_tx, _outgoing_rx) = mpsc::channel::<AgentMessage>(100);
let (_incoming_tx, _incoming_rx) = mpsc::channel::<MasterMessage>(100);
// Update the tx channel
self.tx = outgoing_tx.clone();
// Spawn task to handle incoming health reports
let outgoing_tx_clone = outgoing_tx.clone();
let agent_id_clone = agent_id.clone();
tokio::spawn(async move {
while let Some(health_report) = health_rx.recv().await {
let msg = AgentMessage {
agent_id: agent_id_clone.clone(),
timestamp: chrono::Utc::now().timestamp(),
payload: Some(agent_message::Payload::Health(health_report)),
};
if let Err(e) = outgoing_tx_clone.send(msg).await {
warn!("Failed to queue health report: {}", e);
break;
}
}
});
// For now, simulate the stream handling
// In production, this would use the actual gRPC streaming
info!("Stream started for agent {}", agent_id);
// Handle incoming messages in a separate task
let _config_handler = self.config_handler.clone();
let state = self.state.clone();
tokio::spawn(async move {
// Simulate receiving messages
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
// Check if still connected
let current_state = state.read().await.clone();
match current_state {
ConnectionState::Connected { .. } => {
// Send periodic heartbeat
let heartbeat = AgentMessage {
agent_id: agent_id.clone(),
timestamp: chrono::Utc::now().timestamp(),
payload: Some(agent_message::Payload::Event(
nxmesh_proto::Event {
event_id: Uuid::new_v4().to_string(),
event_type: "heartbeat".to_string(),
timestamp: chrono::Utc::now().timestamp(),
data: std::collections::HashMap::new(),
},
)),
};
if outgoing_tx.send(heartbeat).await.is_err() {
break;
}
}
_ => break,
}
}
});
/// Start bidirectional streaming
pub async fn start_stream(&mut self) -> Result<(), Box<dyn std::error::Error>> {
// TODO: Implement streaming
tracing::info!("Starting bidirectional stream");
Ok(()) Ok(())
} }
/// Get current connection state
pub async fn get_state(&self) -> ConnectionState {
self.state.read().await.clone()
}
/// Get agent ID if connected
pub async fn get_agent_id(&self) -> Result<String, Box<dyn std::error::Error>> {
let state = self.state.read().await.clone();
match state {
ConnectionState::Connected { agent_id } => Ok(agent_id),
_ => Err("Not connected".into()),
}
}
/// Send a message to the master
pub async fn send_message(&self, msg: AgentMessage) -> Result<(), Box<dyn std::error::Error>> {
self.tx.send(msg).await?;
Ok(())
}
/// Shutdown the client
pub async fn shutdown(&self) {
info!("Shutting down master client");
*self.state.write().await = ConnectionState::Disconnected;
}
}
/// Get local IP address
fn get_local_ip() -> Option<String> {
// Try to get the default route interface IP
// In a real implementation, this would use system APIs
// For now, return a placeholder
Some("127.0.0.1".to_string())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_connection_state_clone() {
let state = ConnectionState::Connected {
agent_id: "test-id".to_string(),
};
let cloned = state.clone();
assert!(matches!(cloned, ConnectionState::Connected { .. }));
}
} }

View File

@@ -1,12 +1,17 @@
//! Configuration management with atomic symlink swaps //! Configuration management with atomic symlink swaps
use std::path::PathBuf; use std::path::PathBuf;
use tracing::info;
/// Configuration manager /// Configuration manager
#[derive(Clone)]
pub struct ConfigManager { pub struct ConfigManager {
config_dir: PathBuf, config_dir: PathBuf,
} }
/// Error type for config operations
pub type ConfigResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
impl ConfigManager { impl ConfigManager {
/// Create a new config manager /// Create a new config manager
pub fn new(config_dir: &str) -> Self { pub fn new(config_dir: &str) -> Self {
@@ -16,12 +21,12 @@ impl ConfigManager {
} }
/// Apply new configuration using atomic symlink swap /// Apply new configuration using atomic symlink swap
pub async fn apply_config(&self, _config: &str) -> Result<(), Box<dyn std::error::Error>> { pub async fn apply_config(&self, _config: &str) -> ConfigResult<()> {
let timestamp = chrono::Utc::now().format("%Y%m%d%H%M%S").to_string(); let timestamp = chrono::Utc::now().format("%Y%m%d%H%M%S").to_string();
let deploy_dir = self.config_dir.join(&timestamp); let deploy_dir = self.config_dir.join(&timestamp);
let symlink_path = self.config_dir.join("current"); let symlink_path = self.config_dir.join("current");
tracing::info!("Applying configuration to {:?}", deploy_dir); info!("Applying configuration to {:?}", deploy_dir);
// 1. Create deployment directory // 1. Create deployment directory
tokio::fs::create_dir_all(&deploy_dir).await?; tokio::fs::create_dir_all(&deploy_dir).await?;
@@ -37,20 +42,20 @@ impl ConfigManager {
tokio::fs::symlink(&deploy_dir, &temp_link).await?; tokio::fs::symlink(&deploy_dir, &temp_link).await?;
tokio::fs::rename(&temp_link, &symlink_path).await?; tokio::fs::rename(&temp_link, &symlink_path).await?;
tracing::info!("Configuration applied successfully"); info!("Configuration applied successfully");
Ok(()) Ok(())
} }
/// Rollback to previous configuration /// Rollback to previous configuration
pub async fn rollback(&self, _target_timestamp: &str) -> Result<(), Box<dyn std::error::Error>> { pub async fn rollback(&self, _target_timestamp: &str) -> ConfigResult<()> {
tracing::info!("Rolling back configuration"); info!("Rolling back configuration");
// TODO: Implement rollback // TODO: Implement rollback
Ok(()) Ok(())
} }
/// Clean up old deployment directories /// Clean up old deployment directories
pub async fn cleanup(&self, keep_count: usize) -> Result<(), Box<dyn std::error::Error>> { pub async fn cleanup(&self, keep_count: usize) -> ConfigResult<()> {
tracing::info!("Cleaning up old deployments, keeping {}", keep_count); info!("Cleaning up old deployments, keeping {}", keep_count);
// TODO: Implement cleanup // TODO: Implement cleanup
Ok(()) Ok(())
} }

View File

@@ -1,45 +1,210 @@
//! Nginx configuration renderer //! Nginx configuration renderer using Handlebars templates
use handlebars::Handlebars; use handlebars::Handlebars;
use serde_json::json; use serde::Serialize;
use tracing::{debug, error};
/// Default virtual host template
const DEFAULT_VHOST_TEMPLATE: &str = include_str!("templates/default.hbs");
/// Upstream template
const UPSTREAM_TEMPLATE: &str = r#"{{#each upstreams}}
upstream {{name}} {
{{#each servers}}
server {{address}}{{#if weight}} weight={{weight}}{{/if}}{{#if backup}} backup{{/if}}{{#if down}} down{{/if}};
{{/each}}
{{#if keepalive_connections}}
keepalive {{keepalive_connections}};
{{/if}}
}
{{/each}}
"#;
/// Configuration renderer /// Configuration renderer
pub struct ConfigRenderer { pub struct ConfigRenderer {
handlebars: Handlebars<'static>, handlebars: Handlebars<'static>,
} }
/// Virtual host data for template
#[derive(Serialize)]
struct VirtualHostData {
id: String,
name: String,
server_name: String,
listen_port: u32,
ssl_enabled: bool,
ssl_certificate_path: String,
ssl_certificate_key_path: String,
http2_enabled: bool,
locations: Vec<LocationData>,
}
/// Location data for template
#[derive(Serialize)]
struct LocationData {
path: String,
proxy_pass: String,
upstream_name: String,
root: String,
index: String,
custom_headers: Vec<HeaderData>,
}
/// Header data for template
#[derive(Serialize)]
struct HeaderData {
name: String,
value: String,
always: bool,
}
/// Upstream data for template
#[derive(Serialize)]
struct UpstreamData {
id: String,
name: String,
algorithm: String,
servers: Vec<UpstreamServerData>,
keepalive_connections: u32,
}
/// Upstream server data for template
#[derive(Serialize)]
struct UpstreamServerData {
address: String,
weight: u32,
backup: bool,
down: bool,
}
impl ConfigRenderer { impl ConfigRenderer {
/// Create a new config renderer /// Create a new config renderer
pub fn new() -> Self { pub fn new() -> Self {
let mut handlebars = Handlebars::new(); let mut handlebars = Handlebars::new();
// Register built-in templates // Register built-in templates
Self::register_templates(&mut handlebars); handlebars
.register_template_string("default_vhost", DEFAULT_VHOST_TEMPLATE)
.expect("Failed to register default_vhost template");
handlebars
.register_template_string("upstreams", UPSTREAM_TEMPLATE)
.expect("Failed to register upstreams template");
Self { handlebars } Self { handlebars }
} }
/// Register built-in templates /// Render full configuration from ConfigUpdate
fn register_templates(handlebars: &mut Handlebars) { pub fn render_config(
// Default reverse proxy template &self,
handlebars.register_template_string("default", include_str!("templates/default.hbs")).ok(); config: &nxmesh_proto::ConfigUpdate,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
debug!("Rendering configuration with Handlebars");
let mut output = String::new();
// Render upstreams
if !config.upstreams.is_empty() {
let upstreams_data: Vec<UpstreamData> = config
.upstreams
.iter()
.map(|u| UpstreamData {
id: u.id.clone(),
name: u.name.clone(),
algorithm: algorithm_name(u.algorithm).to_string(),
servers: u
.servers
.iter()
.map(|s| UpstreamServerData {
address: s.address.clone(),
weight: s.weight,
backup: s.backup,
down: s.down,
})
.collect(),
keepalive_connections: u.keepalive_connections,
})
.collect();
let upstreams_json = serde_json::json!({ "upstreams": upstreams_data });
let rendered = self
.handlebars
.render("upstreams", &upstreams_json)
.map_err(|e| format!("Template render error: {}", e))?;
output.push_str(&rendered);
output.push('\n');
}
// Render virtual hosts
for vh in &config.virtual_hosts {
let vh_data = self.convert_virtual_host(vh, config)?;
let rendered = self
.handlebars
.render("default_vhost", &vh_data)
.map_err(|e| format!("Template render error: {}", e))?;
output.push_str(&rendered);
output.push('\n');
}
debug!("Configuration rendered successfully");
Ok(output)
} }
/// Render configuration /// Convert proto VirtualHost to template data
pub fn render(&self, template_name: &str, data: &serde_json::Value) -> Result<String, Box<dyn std::error::Error>> { fn convert_virtual_host(
let rendered = self.handlebars.render(template_name, data)?; &self,
Ok(rendered) vh: &nxmesh_proto::VirtualHost,
} config: &nxmesh_proto::ConfigUpdate,
) -> Result<VirtualHostData, Box<dyn std::error::Error + Send + Sync>> {
// Build certificate paths if SSL is enabled
let (cert_path, key_path) = if vh.ssl_enabled && !vh.ssl_certificate_id.is_empty() {
if let Some(cert) = config.certificates.get(&vh.ssl_certificate_id) {
(
format!("/etc/nginx/certs/{}.crt", cert.id),
format!("/etc/nginx/certs/{}.key", cert.id),
)
} else {
(
format!("/etc/nginx/certs/{}.crt", vh.ssl_certificate_id),
format!("/etc/nginx/certs/{}.key", vh.ssl_certificate_id),
)
}
} else {
(String::new(), String::new())
};
/// Render virtual host // Convert locations
pub fn render_virtual_host(&self, vh: &nxmesh_core::models::VirtualHost) -> Result<String, Box<dyn std::error::Error>> { let locations: Vec<LocationData> = vh
let data = json!({ .locations
"server_name": vh.server_name, .iter()
"listen_port": vh.listen_port, .map(|loc| LocationData {
"ssl_enabled": vh.ssl_enabled, path: loc.path.clone(),
"locations": vh.locations, proxy_pass: loc.proxy_pass.clone(),
}); upstream_name: loc.upstream_id.clone(),
self.render("default", &data) root: loc.root.clone(),
index: loc.index.clone(),
custom_headers: loc
.custom_headers
.iter()
.map(|h| HeaderData {
name: h.name.clone(),
value: h.value.clone(),
always: h.always,
})
.collect(),
})
.collect();
Ok(VirtualHostData {
id: vh.id.clone(),
name: vh.name.clone(),
server_name: vh.server_name.clone(),
listen_port: vh.listen_port,
ssl_enabled: vh.ssl_enabled,
ssl_certificate_path: cert_path,
ssl_certificate_key_path: key_path,
http2_enabled: vh.http2_enabled,
locations,
})
} }
} }
@@ -48,3 +213,124 @@ impl Default for ConfigRenderer {
Self::new() Self::new()
} }
} }
/// Convert algorithm enum to name
fn algorithm_name(algorithm: i32) -> &'static str {
match algorithm {
1 => "least_conn",
2 => "ip_hash",
3 => "weighted",
_ => "round_robin",
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_render_config() {
let renderer = ConfigRenderer::new();
let config = nxmesh_proto::ConfigUpdate {
config_id: "test".to_string(),
version: 1,
virtual_hosts: vec![nxmesh_proto::VirtualHost {
id: "vh1".to_string(),
name: "Test Site".to_string(),
server_name: "example.com".to_string(),
listen_port: 80,
ssl_enabled: false,
ssl_certificate_id: "".to_string(),
http2_enabled: false,
http3_enabled: false,
locations: vec![nxmesh_proto::Location {
path: "/".to_string(),
proxy_pass: "http://backend".to_string(),
upstream_id: "".to_string(),
root: "".to_string(),
index: "".to_string(),
custom_headers: vec![],
rewrite_rules: vec![],
custom_directives: Default::default(),
}],
custom_directives: Default::default(),
}],
upstreams: vec![nxmesh_proto::Upstream {
id: "up1".to_string(),
name: "backend".to_string(),
algorithm: 0,
servers: vec![nxmesh_proto::UpstreamServer {
address: "127.0.0.1:8080".to_string(),
weight: 1,
backup: false,
down: false,
max_fails: 1,
fail_timeout_seconds: 10,
}],
health_check: None,
keepalive_connections: 0,
}],
certificates: Default::default(),
global_settings: None,
};
let result = renderer.render_config(&config).unwrap();
assert!(result.contains("upstream backend"));
assert!(result.contains("server_name example.com"));
assert!(result.contains("proxy_pass http://backend"));
}
#[test]
fn test_render_with_ssl() {
let renderer = ConfigRenderer::new();
let config = nxmesh_proto::ConfigUpdate {
config_id: "test".to_string(),
version: 1,
virtual_hosts: vec![nxmesh_proto::VirtualHost {
id: "vh1".to_string(),
name: "Test Site".to_string(),
server_name: "example.com".to_string(),
listen_port: 443,
ssl_enabled: true,
ssl_certificate_id: "cert-123".to_string(),
http2_enabled: false,
http3_enabled: false,
locations: vec![nxmesh_proto::Location {
path: "/".to_string(),
proxy_pass: "".to_string(),
upstream_id: "".to_string(),
root: "/var/www".to_string(),
index: "index.html".to_string(),
custom_headers: vec![nxmesh_proto::Header {
name: "X-Frame-Options".to_string(),
value: "DENY".to_string(),
always: true,
}],
rewrite_rules: vec![],
custom_directives: Default::default(),
}],
custom_directives: Default::default(),
}],
upstreams: vec![],
certificates: std::collections::HashMap::from([(
"cert-123".to_string(),
nxmesh_proto::Certificate {
id: "cert-123".to_string(),
domain: "example.com".to_string(),
certificate_pem: "".to_string(),
private_key_pem: "".to_string(),
expires_at: 0,
},
)]),
global_settings: None,
};
let result = renderer.render_config(&config).unwrap();
assert!(result.contains("listen 443 ssl"));
assert!(result.contains("ssl_certificate"));
assert!(result.contains("root /var/www"));
assert!(result.contains("add_header X-Frame-Options"));
}
}

View File

@@ -1,12 +1,40 @@
//! Nginx process controller //! Nginx process controller with lifecycle management
use super::config_manager::ConfigManager; use super::config_manager::{ConfigManager, ConfigResult};
use tokio::process::Command;
use tracing::{debug, info, warn};
/// Nginx deployment mode
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeploymentMode {
/// Docker sidecar - shares PID namespace with nginx container
DockerSidecar,
/// Kubernetes sidecar - similar to Docker but in K8s
KubernetesSidecar,
/// Standalone - directly manages nginx process
Standalone,
}
impl DeploymentMode {
/// Parse deployment mode from string
pub fn from_str(s: &str) -> Self {
match s {
"docker_sidecar" => Self::DockerSidecar,
"kubernetes_sidecar" => Self::KubernetesSidecar,
"standalone" => Self::Standalone,
_ => Self::DockerSidecar,
}
}
}
/// Nginx controller /// Nginx controller
#[derive(Clone)]
pub struct NginxController { pub struct NginxController {
config_manager: ConfigManager, config_manager: ConfigManager,
binary_path: String, binary_path: String,
pid_file: String, pid_file: String,
deployment_mode: DeploymentMode,
config_dir: String,
} }
impl NginxController { impl NginxController {
@@ -16,34 +44,227 @@ impl NginxController {
config_manager: ConfigManager::new(config_dir), config_manager: ConfigManager::new(config_dir),
binary_path: binary_path.to_string(), binary_path: binary_path.to_string(),
pid_file: pid_file.to_string(), pid_file: pid_file.to_string(),
deployment_mode: DeploymentMode::DockerSidecar,
config_dir: config_dir.to_string(),
}
}
/// Create a new nginx controller with deployment mode
pub fn with_deployment_mode(
config_dir: &str,
binary_path: &str,
pid_file: &str,
deployment_mode: DeploymentMode,
) -> Self {
Self {
config_manager: ConfigManager::new(config_dir),
binary_path: binary_path.to_string(),
pid_file: pid_file.to_string(),
deployment_mode,
config_dir: config_dir.to_string(),
} }
} }
/// Start nginx /// Start nginx
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> { pub async fn start(&self) -> ConfigResult<()> {
tracing::info!("Starting nginx"); info!("Starting nginx");
// TODO: Implement
// Check if already running
if self.is_running().await {
info!("Nginx is already running");
return Ok(());
}
// Validate config before starting
self.test_config().await?;
// Start nginx
let output = Command::new(&self.binary_path)
.args(["-c", &self.get_config_path()])
.output()
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("Failed to start nginx: {}", stderr).into());
}
info!("Nginx started successfully");
Ok(()) Ok(())
} }
/// Stop nginx /// Stop nginx
pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> { pub async fn stop(&self, graceful: bool) -> ConfigResult<()> {
tracing::info!("Stopping nginx"); info!("Stopping nginx (graceful={})", graceful);
// TODO: Implement
let pid = self
.get_pid()
.await
.ok_or("Nginx is not running")
.map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
let signal = if graceful { "-QUIT" } else { "-TERM" };
let output = Command::new("kill")
.args([signal, &pid.to_string()])
.output()
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("Failed to stop nginx: {}", stderr).into());
}
info!("Nginx stopped successfully");
Ok(()) Ok(())
} }
/// Reload nginx configuration /// Reload nginx configuration
pub async fn reload(&self) -> Result<(), Box<dyn std::error::Error>> { pub async fn reload(&self) -> ConfigResult<()> {
tracing::info!("Reloading nginx configuration"); info!("Reloading nginx configuration");
// TODO: Implement
// Validate config first
self.test_config().await?;
let pid = self
.get_pid()
.await
.ok_or("Nginx is not running")
.map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
// Send HUP signal for graceful reload
let output = Command::new("kill")
.args(["-HUP", &pid.to_string()])
.output()
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("Failed to reload nginx: {}", stderr).into());
}
info!("Nginx reloaded successfully");
Ok(()) Ok(())
} }
/// Test nginx configuration /// Test nginx configuration
pub async fn test_config(&self) -> Result<(), Box<dyn std::error::Error>> { pub async fn test_config(&self) -> ConfigResult<()> {
tracing::info!("Testing nginx configuration"); debug!("Testing nginx configuration");
// TODO: Implement
let output = Command::new(&self.binary_path)
.args(["-t", "-c", &self.get_config_path()])
.output()
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
let _stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if output.status.success() {
debug!("Nginx configuration test passed");
Ok(())
} else {
Err(format!("Nginx configuration test failed: {}", stderr).into())
}
}
/// Validate and apply new configuration
pub async fn validate_and_apply(&self, config_content: &str) -> ConfigResult<()> {
info!("Validating and applying new configuration");
// Apply config to new deployment directory
self.config_manager.apply_config(config_content).await?;
// Test the new configuration
if let Err(e) = self.test_config().await {
// Rollback on validation failure
warn!("Config validation failed, rolling back: {:?}", e);
self.config_manager.rollback("").await?;
return Err(e);
}
// Reload nginx to pick up new config
self.reload().await?;
info!("Configuration validated and applied successfully");
Ok(()) Ok(())
} }
/// Check if nginx is running
pub async fn is_running(&self) -> bool {
match self.get_pid().await {
Some(pid) => {
// Check if process exists
match Command::new("kill")
.args(["-0", &pid.to_string()])
.output()
.await
{
Ok(output) => output.status.success(),
Err(_) => false,
}
}
None => false,
}
}
/// Get nginx PID
async fn get_pid(&self) -> Option<u32> {
match tokio::fs::read_to_string(&self.pid_file).await {
Ok(content) => content.trim().parse().ok(),
Err(e) => {
debug!("Failed to read PID file: {}", e);
None
}
}
}
/// Get main nginx config path
fn get_config_path(&self) -> String {
// In Docker sidecar mode, we use the current symlink
if self.deployment_mode == DeploymentMode::DockerSidecar {
format!("{}/current/nginx.conf", self.config_dir)
} else {
format!("{}/nginx.conf", self.config_dir)
}
}
/// Get current configuration checksum
pub async fn get_config_checksum(&self) -> ConfigResult<String> {
let config_path = self.get_config_path();
let content = tokio::fs::read_to_string(&config_path)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(content.as_bytes());
let result = hasher.finalize();
Ok(format!("{:x}", result))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_deployment_mode_from_str() {
assert_eq!(
DeploymentMode::from_str("docker_sidecar"),
DeploymentMode::DockerSidecar
);
assert_eq!(
DeploymentMode::from_str("standalone"),
DeploymentMode::Standalone
);
assert_eq!(
DeploymentMode::from_str("unknown"),
DeploymentMode::DockerSidecar
);
}
} }

View File

@@ -54,6 +54,10 @@ dev-frontend:
dev-ui: dev-frontend dev-ui: dev-frontend
dev-agent:
@echo "🤖 Starting NxMesh Agent..."
cargo run --package nxmesh-agent
# Start services (called by devcontainer post-start) # Start services (called by devcontainer post-start)
start-services: start-services:
@echo "🔧 Ensuring services are ready..." @echo "🔧 Ensuring services are ready..."