diff --git a/apps/nxmesh-agent/Cargo.toml b/apps/nxmesh-agent/Cargo.toml index 077ffda..8c833b8 100644 --- a/apps/nxmesh-agent/Cargo.toml +++ b/apps/nxmesh-agent/Cargo.toml @@ -29,9 +29,6 @@ tracing-subscriber.workspace = true # gRPC tonic.workspace = true -# HTTP -reqwest.workspace = true - # Async async-trait.workspace = true futures.workspace = true @@ -54,6 +51,12 @@ uuid.workspace = true # Hostname hostname = "0.4" +# Certificates +zip = { workspace = true } + +# CLI +clap = { workspace = true, features = ["derive"] } + [dev-dependencies] tokio-test.workspace = true mockall.workspace = true diff --git a/apps/nxmesh-agent/src/cli/mod.rs b/apps/nxmesh-agent/src/cli/mod.rs new file mode 100644 index 0000000..676c83b --- /dev/null +++ b/apps/nxmesh-agent/src/cli/mod.rs @@ -0,0 +1,81 @@ +use clap::{Parser, Subcommand}; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +pub struct Cli { + /// Start the agent server + #[arg(short, long, group = "mode")] + pub serve: bool, + + #[command(subcommand)] + pub command: Option, +} + +#[derive(Subcommand)] +pub enum Commands { + #[command(about = "Import certificates for agent from zip file or separate cert and key files")] + ImportCerts { + // Zip file input, mutually exclusive with separate cert and key file inputs + /// Zip file containing ca.pem cert.pem and key.pem + #[arg(value_name = "ZIP_FILE", group = "input_source")] + zip: Option, + /// Certificate name in zip file, required if using zip input + #[arg( + long, + group = "input_source", + requires = "zip", + default_value = "cert.pem", + value_name = "CERT_NAME" + )] + cert_name: Option, + /// Key name in zip file, required if using zip input + #[arg( + long, + group = "input_source", + requires = "zip", + default_value = "key.pem", + value_name = "KEY_NAME" + )] + key_name: Option, + /// CA certificate name in zip file, required if using zip input + #[arg( + long, + group = "input_source", + requires = "zip", + default_value = "ca.pem", + value_name = "CA_NAME" + )] + ca_name: Option, + + // Separate cert and key file inputs, required if not using zip input + /// Certificate file path + #[arg( + long, + group = "input_source", + requires = "key", + conflicts_with = "zip", + value_name = "CERT_FILE" + )] + cert: Option, + + /// Key file path + #[arg( + long, + group = "input_source", + requires = "cert", + conflicts_with = "zip", + value_name = "KEY_FILE" + )] + key: Option, + + /// Master CA certificate file path for verifying master identity, optional if the CA certificate is already trusted by the system + /// This is required if the master server uses a self-signed certificate that is not trusted by the system + #[arg( + long, + group = "input_source", + conflicts_with = "zip", + value_name = "CA_CERT_FILE" + )] + ca_cert: Option, + }, +} diff --git a/apps/nxmesh-agent/src/config/mod.rs b/apps/nxmesh-agent/src/config/mod.rs new file mode 100644 index 0000000..6e98cef --- /dev/null +++ b/apps/nxmesh-agent/src/config/mod.rs @@ -0,0 +1 @@ +pub mod settings; diff --git a/apps/nxmesh-agent/src/config/settings.rs b/apps/nxmesh-agent/src/config/settings.rs new file mode 100644 index 0000000..2a579c1 --- /dev/null +++ b/apps/nxmesh-agent/src/config/settings.rs @@ -0,0 +1,333 @@ +use config::{Config, ConfigError, Environment, File}; +use serde::{Deserialize, Deserializer, Serialize}; +use std::{os::unix::fs::PermissionsExt, str::FromStr}; +use tracing::level_filters::LevelFilter; + +const NGINX_BINARY_PATH_TEMPLATE: &str = "{{nginx_binary_path}}"; +const NGINX_DEFAULT_BINARY: &str = "nginx"; + +type ValidationError = String; + +trait Validate { + fn validate(&self) -> Result<(), ValidationError>; +} + +/// Agent settings +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Settings { + pub grpc: GrpcSettings, + #[serde(default)] + pub log: LogSettings, + pub nginx: Option, +} + +/// gRPC client settings +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GrpcSettings { + pub connection_string: String, + pub m_auth: MAuthSettings, + #[serde(default)] + pub cors: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MAuthSettings { + Tls(TLSSettings), +} + +/// TLS certificate settings +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TLSSettings { + RawPath { + ca_path: String, + cert_path: String, + key_path: String, + }, + ZipPath { + cert_zip_path: String, + }, +} + +/// CORS settings +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct CorsSettings { + #[serde(default)] + pub allowed_origins: Vec, + #[serde(default)] + pub allowed_methods: Vec, + #[serde(default)] + pub allowed_headers: Vec, + #[serde(default)] + pub allow_credentials: bool, +} + +/// Logging settings +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LogSettings { + #[serde( + deserialize_with = "deserialize_level_filter", + serialize_with = "serialize_level_filter" + )] + pub level: LevelFilter, +} + +impl Default for LogSettings { + fn default() -> Self { + Self { + level: default_log_level(), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NginxSettings { + #[serde(default = "default_nginx_config_path")] + pub nginx_config_path: String, + // #[serde(default = "default_nginx_binary_path")] + #[serde(default)] + pub nginx_binary_path: Option, + // commands + #[serde(default = "default_nginx_reload_command")] + pub override_nginx_reload_command: Vec, + #[serde(default = "default_nginx_test_command")] + pub override_nginx_test_command: Vec, + // timeouts + #[serde(default = "default_nginx_reload_timeout_seconds")] + pub nginx_reload_timeout_seconds: u64, + #[serde(default = "default_nginx_test_timeout_seconds")] + pub nginx_test_timeout_seconds: u64, +} + +impl Validate for Settings { + fn validate(&self) -> Result<(), ValidationError> { + self.grpc.validate()?; + if let Some(nginx) = &self.nginx { + nginx.validate()?; + } + Ok(()) + } +} + +impl Settings { + /// Load settings from config files and environment + pub fn load() -> Result { + let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into()); + + let settings = Config::builder() + .add_source(File::with_name("config/default").required(false)) + .add_source(File::with_name(&format!("config/{}", run_mode)).required(false)) + .add_source(File::with_name("config/agent/default").required(false)) + .add_source(File::with_name(&format!("config/agent/{}", run_mode)).required(false)) + .add_source(Environment::with_prefix("NXMESH").separator("__")) + .build()?; + + let mut settings: Self = settings.try_deserialize()?; + + settings.validate().map_err(ConfigError::Message)?; + + if let Some(nginx) = &mut settings.nginx { + nginx.validate().map_err(ConfigError::Message)?; + + // replace binary path template in commands with actual binary path, if the template is present + nginx + .override_nginx_reload_command + .iter_mut() + .for_each(|cmd| { + *cmd = cmd.replace( + NGINX_BINARY_PATH_TEMPLATE, + &nginx + .nginx_binary_path + .clone() + .unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()), + ); + }); + nginx + .override_nginx_test_command + .iter_mut() + .for_each(|cmd| { + *cmd = cmd.replace( + NGINX_BINARY_PATH_TEMPLATE, + &nginx + .nginx_binary_path + .clone() + .unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()), + ); + }); + } + + Ok(settings) + } +} + +impl Validate for GrpcSettings { + fn validate(&self) -> Result<(), ValidationError> { + if self.connection_string.is_empty() { + return Err("gRPC connection string cannot be empty".into()); + } + self.m_auth.validate()?; + if let Some(cors) = &self.cors { + cors.validate()?; + } + Ok(()) + } +} + +impl Validate for MAuthSettings { + fn validate(&self) -> Result<(), ValidationError> { + match self { + MAuthSettings::Tls(tls_settings) => tls_settings.validate()?, + } + Ok(()) + } +} + +impl Validate for TLSSettings { + fn validate(&self) -> Result<(), ValidationError> { + match self { + TLSSettings::RawPath { + ca_path, + cert_path, + key_path, + } => { + if !std::path::Path::new(ca_path).exists() { + return Err(format!("CA file not found: {}", ca_path)); + } + if !std::path::Path::new(cert_path).exists() { + return Err(format!("Certificate file not found: {}", cert_path)); + } + if !std::path::Path::new(key_path).exists() { + return Err(format!("Key file not found: {}", key_path)); + } + } + TLSSettings::ZipPath { cert_zip_path } => { + if !std::path::Path::new(cert_zip_path).exists() { + return Err(format!("Certificate zip file not found: {}", cert_zip_path)); + } + } + } + Ok(()) + } +} + +impl Validate for CorsSettings { + fn validate(&self) -> Result<(), ValidationError> { + Ok(()) + } +} + +impl Validate for NginxSettings { + fn validate(&self) -> Result<(), ValidationError> { + match &self.nginx_binary_path { + Some(path) if path.is_empty() => { + return Err("Nginx binary path cannot be empty".into()); + } + Some(path) if !std::path::Path::new(path).exists() => { + return Err(format!("Nginx binary not found: {}", path)); + } + Some(path) + if !std::fs::metadata(path) + .map_err(|e| format!("Failed to read nginx binary metadata: {}", e))? + .permissions() + .mode() + & 0o111 + != 0 => + { + return Err(format!("Nginx binary is not executable: {}", path)); + } + _ => {} + } + if self.nginx_config_path.is_empty() { + return Err("Nginx config path cannot be empty".into()); + } + if !std::path::Path::new(&self.nginx_config_path).exists() { + return Err(format!( + "Nginx config file not found: {}", + self.nginx_config_path + )); + } + + // ensure reload and test commands contain the binary path template + if !&self + .override_nginx_reload_command + .join(" ") + .contains(NGINX_BINARY_PATH_TEMPLATE) + { + return Err(format!( + "Nginx reload command must contain the binary path template '{}': {}", + NGINX_BINARY_PATH_TEMPLATE, + self.override_nginx_reload_command.join(" ") + )); + } + if !&self + .override_nginx_test_command + .join(" ") + .contains(NGINX_BINARY_PATH_TEMPLATE) + { + return Err(format!( + "Nginx test command must contain the binary path template '{}': {}", + NGINX_BINARY_PATH_TEMPLATE, + self.override_nginx_test_command.join(" ") + )); + } + Ok(()) + } +} + +fn default_log_level() -> LevelFilter { + LevelFilter::INFO +} + +fn default_nginx_config_path() -> String { + "/etc/nginx/nginx.conf".into() +} + +fn default_nginx_reload_command() -> Vec { + vec![ + NGINX_BINARY_PATH_TEMPLATE.to_string(), + "-s".to_string(), + "reload".to_string(), + ] +} + +fn default_nginx_test_command() -> Vec { + vec![NGINX_BINARY_PATH_TEMPLATE.to_string(), "-t".to_string()] +} + +fn default_nginx_reload_timeout_seconds() -> u64 { + 30 +} + +fn default_nginx_test_timeout_seconds() -> u64 { + 30 +} + +fn deserialize_level_filter<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + LevelFilter::from_str(&s).map_err(serde::de::Error::custom) +} + +fn serialize_level_filter(level: &LevelFilter, serializer: S) -> Result +where + S: serde::Serializer, +{ + serializer.serialize_str(&level.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_esnure_send_and_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + assert_send_sync::(); + assert_send_sync::(); + assert_send_sync::(); + assert_send_sync::(); + assert_send_sync::(); + } +} diff --git a/apps/nxmesh-agent/src/connector/master/mod.rs b/apps/nxmesh-agent/src/connector/master/mod.rs new file mode 100644 index 0000000..da408b2 --- /dev/null +++ b/apps/nxmesh-agent/src/connector/master/mod.rs @@ -0,0 +1,40 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +pub mod ssh; + +pub type AgentClient = nxmesh_proto::agent_service_client::AgentServiceClient; + +#[async_trait::async_trait] +pub trait MasterConnectorTrait: Send + Sync { + async fn connect( + &mut self, + settings: &crate::config::settings::Settings, + ) -> Result<(), Box>; + fn get_client(&self) -> Arc>; +} + +pub struct MasterConnector { + connector: Box, +} + +impl MasterConnector { + pub fn new(connector: Box) -> Self { + Self { connector } + } +} + +#[async_trait::async_trait] +impl MasterConnectorTrait for MasterConnector { + async fn connect( + &mut self, + settings: &crate::config::settings::Settings, + ) -> Result<(), Box> { + self.connector.connect(settings).await + } + + fn get_client(&self) -> Arc> { + self.connector.get_client() + } +} diff --git a/apps/nxmesh-agent/src/connector/master/ssh.rs b/apps/nxmesh-agent/src/connector/master/ssh.rs new file mode 100644 index 0000000..2414bf4 --- /dev/null +++ b/apps/nxmesh-agent/src/connector/master/ssh.rs @@ -0,0 +1,132 @@ +use std::{fs::File, io::Read, sync::Arc}; + +use tokio::{fs::read, sync::Mutex}; + +use nxmesh_proto::agent_service_client::AgentServiceClient; +use tonic::transport::{Certificate, ClientTlsConfig, Identity}; +use tracing::warn; + +use crate::config::settings::{self, MAuthSettings, TLSSettings}; + +use super::{AgentClient, MasterConnectorTrait}; + +pub struct SshMasterConnector { + client: Arc>, +} + +impl SshMasterConnector { + pub async fn new( + settings: crate::config::settings::GrpcSettings, + ) -> Result> { + let tls_config = Self::generate_tls_config(&settings.m_auth).await?; + // Create a gRPC channel + let endpoint = tonic::transport::Channel::from_shared(settings.connection_string.clone()) + .map_err(|e| format!("Failed to create gRPC endpoint: {}", e))? + .tls_config(tls_config) + .map_err(|e| { + format!( + "Failed to set TLS config: {}. Ensure TLS settings and certificates are correct.", + e + ) + })? + .connect_timeout(std::time::Duration::from_secs(5)) + .timeout(std::time::Duration::from_secs(10)) + .connect_lazy(); + + // Create the gRPC client + let client = Arc::new(Mutex::new(AgentServiceClient::new(endpoint))); + Ok(Self { client }) + } + + async fn generate_tls_config( + settings: &MAuthSettings, + ) -> Result> { + let tls_config = match &settings { + MAuthSettings::Tls(tls_settings) => { + let (ca, cert, key) = match tls_settings { + TLSSettings::RawPath { + ca_path, + cert_path, + key_path, + } => { + // Read the certificate and key from the specified file paths + let ca = read(ca_path).await?; + let cert = read(cert_path).await?; + let key = read(key_path).await?; + (ca, cert, key) + } + TLSSettings::ZipPath { cert_zip_path } => { + // Extract the certificate and key from the zip file + Self::extract_certificate(cert_zip_path).await? + } + }; + + // TODO: allow skipping SANs validation if specified in the settings + ClientTlsConfig::new() + .ca_certificate(Certificate::from_pem(&ca)) + .identity(Identity::from_pem(&cert, &key)) + } + #[allow(unreachable_patterns)] + _ => { + return Err("TLS settings are required for SSH connection".into()); + } + }; + + Ok(tls_config) + } + + async fn extract_certificate( + cert_zip_path: &str, + ) -> Result<(Vec, Vec, Vec), Box> { + // unzip the file and extract the cert, ca and key + let file = File::open(cert_zip_path)?; + let mut archive = zip::ZipArchive::new(file)?; + let mut cert = Vec::new(); + let mut key = Vec::new(); + let mut ca = Vec::new(); + + for i in 0..archive.len() { + let mut file = archive.by_index(i)?; + let outpath = match file.enclosed_name() { + Some(path) => path.to_owned(), + None => continue, + }; + let file_name = outpath + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or_default(); + if file_name != "cert.pem" && file_name != "key.pem" && file_name != "ca.pem" { + warn!("Unexpected file in certificate zip: {}", file_name); + continue; + } + if file_name == "cert.pem" { + file.read_to_end(&mut cert)?; + } else if file_name == "key.pem" { + file.read_to_end(&mut key)?; + } else if file_name == "ca.pem" { + file.read_to_end(&mut ca)?; + } + } + + if cert.is_empty() || key.is_empty() || ca.is_empty() { + return Err("Certificate zip must contain cert.pem, key.pem and ca.pem".into()); + } + + Ok((ca, cert, key)) + } +} + +#[async_trait::async_trait] +impl MasterConnectorTrait for SshMasterConnector { + async fn connect( + &mut self, + _settings: &crate::config::settings::Settings, + ) -> Result<(), Box> { + // ensure connection if required + Ok(()) + } + + fn get_client(&self) -> Arc> { + self.client.clone() + } +} diff --git a/apps/nxmesh-agent/src/connector/mod.rs b/apps/nxmesh-agent/src/connector/mod.rs new file mode 100644 index 0000000..0d22726 --- /dev/null +++ b/apps/nxmesh-agent/src/connector/mod.rs @@ -0,0 +1 @@ +pub mod master; \ No newline at end of file diff --git a/apps/nxmesh-agent/src/main.rs b/apps/nxmesh-agent/src/main.rs index e69de29..e3dc71f 100644 --- a/apps/nxmesh-agent/src/main.rs +++ b/apps/nxmesh-agent/src/main.rs @@ -0,0 +1,93 @@ +#![forbid(unsafe_code)] +#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)] + +use std::process::exit; + +use tracing::{error, info}; +use tracing_subscriber::{ + Layer, filter::LevelFilter, fmt, layer::SubscriberExt, registry::Registry, reload, + util::SubscriberInitExt, +}; + +use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMasterConnector}; + +mod cli; +mod config; +mod connector; + +#[tokio::main] +async fn main() { + // install a global subscriber for logging + let reload_handle = install_tracing_subscriber(); + // Load configuration settings + let settings = match config::settings::Settings::load() { + Ok(s) => s, + Err(e) => { + error!("Failed to load configuration: {}", e); + std::process::exit(1); + } + }; + + reload_handle + .modify(|filter| *filter = Box::new(settings.log.level)) + .inspect_err(|e| { + error!( + "Failed to set log level: {}. Continuing with default level.", + e + ) + }) + // ignore errors here since we can still run with the default log level + .ok(); + + // print the loaded settings for debugging + // info!("Loaded settings: {:#?}", settings); + + info!("Starting NxMesh Agent..."); + // install grpc client + #[expect(clippy::expect_used)] + let ssh_connector = SshMasterConnector::new(settings.grpc.clone()) + .await + .inspect_err(|e| { + error!("Failed to create SSH Master Connector: {}", e); + exit(1); + }) + .expect("Failed to create SSH Master Connector"); + let mut master_connector = MasterConnector::new(Box::new(ssh_connector)); + + if let Err(e) = master_connector.connect(&settings).await { + error!("Failed to connect to master: {}", e); + exit(1); + } + + // send a dummy heartbeat to verify the connection is working + let client = master_connector.get_client(); + + let request = nxmesh_proto::HealthReport { + ..Default::default() + }; + + match client.lock().await.report_health(request).await { + Ok(_) => info!("Successfully sent health report to master."), + Err(e) => { + error!("Failed to send health report to master: {}", e); + exit(1); + } + } + + info!("Successfully connected to master. Agent is running."); +} + +fn install_tracing_subscriber() +-> reload::Handle + Send + Sync>, Registry> { + let filter = LevelFilter::INFO; + let (filter_layer, reload_handle) = + reload::Layer::new(Box::new(fmt::layer().with_filter(filter)) + as Box + Send + Sync>); + + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt::Layer::default()) + .init(); + + reload_handle +}