Merge pull request 'feature/nginx-handler' (#6) from feature/nginx-handler into master
Some checks failed
Test / get-ci-image (push) Successful in 6s
Test / lint-frontend (push) Successful in 13s
Test / test-frontend (push) Successful in 23s
Test / frontend-build (push) Successful in 15s
Verify / get-ci-image (push) Successful in 5s
Test / test-crates (push) Failing after 1m27s
Test / lint-crates (push) Failing after 1m25s
Verify / verify-generated-db-entities (push) Successful in 1m53s

Reviewed-on: #6
This commit was merged in pull request #6.
This commit is contained in:
2026-06-19 19:13:15 +08:00
40 changed files with 3214 additions and 1576 deletions

View File

@@ -0,0 +1,39 @@
{
"features": {
"ghcr.io/devcontainers-extra/features/act": {
"version": "1.0.15",
"resolved": "ghcr.io/devcontainers-extra/features/act@sha256:db4a2194930d1f7ec62822d4f600dd2fa4aff3c33b98cdb0b578b64ffb10924c",
"integrity": "sha256:db4a2194930d1f7ec62822d4f600dd2fa4aff3c33b98cdb0b578b64ffb10924c"
},
"ghcr.io/devcontainers-extra/features/bun": {
"version": "1.1.0",
"resolved": "ghcr.io/devcontainers-extra/features/bun@sha256:0624284ecaead9dd4c6654616a7f939cfa4ebcbc60593700a74e35b1767befa5",
"integrity": "sha256:0624284ecaead9dd4c6654616a7f939cfa4ebcbc60593700a74e35b1767befa5"
},
"ghcr.io/devcontainers/features/common-utils:2": {
"version": "2.5.9",
"resolved": "ghcr.io/devcontainers/features/common-utils@sha256:cb0c4d3c276f157eed17935747e364178d75fee17f55c4e129966f64633deb3a",
"integrity": "sha256:cb0c4d3c276f157eed17935747e364178d75fee17f55c4e129966f64633deb3a"
},
"ghcr.io/devcontainers/features/docker-in-docker:2": {
"version": "2.17.0",
"resolved": "ghcr.io/devcontainers/features/docker-in-docker@sha256:25b9f05705ffba7dbe503230ac76081419306f8c8bc88e0ce78c4ecd99a0c78c",
"integrity": "sha256:25b9f05705ffba7dbe503230ac76081419306f8c8bc88e0ce78c4ecd99a0c78c"
},
"ghcr.io/devcontainers/features/node:1": {
"version": "1.7.1",
"resolved": "ghcr.io/devcontainers/features/node@sha256:8c0de46939b61958041700ee89e3493f3b2e4131a06dc46b4d9423427d06e5f6",
"integrity": "sha256:8c0de46939b61958041700ee89e3493f3b2e4131a06dc46b4d9423427d06e5f6"
},
"ghcr.io/devcontainers/features/rust:1": {
"version": "1.5.0",
"resolved": "ghcr.io/devcontainers/features/rust@sha256:0c55e65f2e3df736e478f26ee4d5ed41bae6b54dac1318c443e31444c8ed283c",
"integrity": "sha256:0c55e65f2e3df736e478f26ee4d5ed41bae6b54dac1318c443e31444c8ed283c"
},
"ghcr.io/guiyomh/features/just:0": {
"version": "0.1.0",
"resolved": "ghcr.io/guiyomh/features/just@sha256:8311dff976bd153a54a879021353a7e149963e580022b25af49c45cfc5f13bec",
"integrity": "sha256:8311dff976bd153a54a879021353a7e149963e580022b25af49c45cfc5f13bec"
}
}
}

View File

@@ -43,7 +43,14 @@
"esbenp.prettier-vscode",
"dbaeumer.vscode-eslint",
"ms-azuretools.vscode-docker",
"nefrob.vscode-just-syntax"
"nefrob.vscode-just-syntax",
"zxh404.vscode-proto3",
"mhutchie.git-graph",
"qwtel.sqlite-viewer",
"streetsidesoftware.code-spell-checker",
"christian-kohler.npm-intellisense",
"christian-kohler.path-intellisense",
"redhat.vscode-yaml"
],
"settings": {
"rust-analyzer.cargo.features": "all",

44
Cargo.lock generated
View File

@@ -1091,6 +1091,20 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "dashmap"
version = "6.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.10.0"
@@ -1443,6 +1457,17 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "fs4"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4"
dependencies = [
"rustix",
"tokio",
"windows-sys 0.59.0",
]
[[package]]
name = "funty"
version = "2.0.0"
@@ -2558,10 +2583,13 @@ dependencies = [
name = "nxmesh-agent"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"clap",
"config",
"dashmap",
"fs4",
"futures",
"hex",
"hostname",
@@ -2576,6 +2604,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-test",
"tokio-util",
"toml",
"tonic",
"tracing",
@@ -2600,6 +2629,7 @@ dependencies = [
name = "nxmesh-master"
version = "0.1.0"
dependencies = [
"anyhow",
"argon2",
"async-stream",
"async-trait",
@@ -2628,6 +2658,7 @@ dependencies = [
"thiserror",
"time",
"tokio",
"tokio-stream",
"tokio-test",
"toml",
"tonic",
@@ -2672,9 +2703,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.21.3"
version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
[[package]]
name = "once_cell_polyfill"
@@ -5380,6 +5411,15 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"

View File

@@ -56,9 +56,6 @@ futures = "0.3"
toml = "0.9"
config = "0.15"
# HTTP client
reqwest = { version = "0.13.2", default-features = false, features = ["json"] }
# Crypto
sha2 = "0.10"
hex = "0.4"

View File

@@ -33,6 +33,7 @@ tonic.workspace = true
async-trait.workspace = true
futures.workspace = true
tokio-stream.workspace = true
tokio-util = "0.7"
# Config
config.workspace = true
@@ -56,6 +57,9 @@ zip = { workspace = true }
# CLI
clap = { workspace = true, features = ["derive"] }
anyhow = { version = "1.0.102", features = ["backtrace"] }
fs4 = { version = "0.13.1", features = ["tokio"] }
dashmap = "6.2.1"
[dev-dependencies]
tokio-test.workspace = true

View File

@@ -0,0 +1,119 @@
use clap::Parser;
#[derive(Parser)]
#[command(about = "Import certificates for agent from zip file or separate cert and key files")]
pub struct ImportCertsCommand {
/// Zip file containing ca.pem cert.pem and key.pem
#[arg(value_name = "ZIP_FILE", group = "input_source")]
zip: Option<String>,
/// Certificate name in zip file, required if using zip input
#[arg(
long,
group = "input_source",
requires = "zip",
default_value = "cert.pem",
value_name = "CERT_NAME"
)]
cert_name: Option<String>,
/// Key name in zip file, required if using zip input
#[arg(
long,
group = "input_source",
requires = "zip",
default_value = "key.pem",
value_name = "KEY_NAME"
)]
key_name: Option<String>,
/// CA certificate name in zip file, required if using zip input
#[arg(
long,
group = "input_source",
requires = "zip",
default_value = "ca.pem",
value_name = "CA_NAME"
)]
ca_name: Option<String>,
// Separate cert and key file inputs, required if not using zip input
/// Certificate file path
#[arg(
long,
group = "input_source",
requires = "key",
conflicts_with = "zip",
value_name = "CERT_FILE"
)]
cert: Option<String>,
/// Key file path
#[arg(
long,
group = "input_source",
requires = "cert",
conflicts_with = "zip",
value_name = "KEY_FILE"
)]
key: Option<String>,
/// Master CA certificate file path for verifying master identity, optional if the CA certificate is already trusted by the system
/// This is required if the master server uses a self-signed certificate that is not trusted by the system
#[arg(
long,
group = "input_source",
conflicts_with = "zip",
value_name = "CA_CERT_FILE"
)]
ca_cert: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parses_import_certs_with_zip_defaults() {
let parsed = ImportCertsCommand::try_parse_from(["import-certs", "bundle.zip"]);
assert!(parsed.is_ok());
let parsed = parsed.ok();
assert!(parsed.is_some());
let parsed = parsed.unwrap_or_else(|| unreachable!());
assert_eq!(parsed.zip.as_deref(), Some("bundle.zip"));
assert_eq!(parsed.cert_name.as_deref(), Some("cert.pem"));
assert_eq!(parsed.key_name.as_deref(), Some("key.pem"));
assert_eq!(parsed.ca_name.as_deref(), Some("ca.pem"));
assert!(parsed.cert.is_none());
assert!(parsed.key.is_none());
assert!(parsed.ca_cert.is_none());
}
#[test]
fn rejects_import_certs_with_separate_files() {
let parsed = ImportCertsCommand::try_parse_from([
"import-certs",
"--cert",
"agent.crt",
"--key",
"agent.key",
"--ca-cert",
"ca.crt",
]);
assert!(parsed.is_err());
}
#[test]
fn rejects_conflicting_zip_and_separate_inputs() {
let parsed = ImportCertsCommand::try_parse_from([
"import-certs",
"bundle.zip",
"--cert",
"agent.crt",
"--key",
"agent.key",
]);
assert!(parsed.is_err());
}
}

View File

@@ -1,5 +1,7 @@
use clap::{Parser, Subcommand};
pub mod import_certs;
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Cli {
@@ -13,78 +15,14 @@ pub struct Cli {
#[derive(Subcommand)]
pub enum Commands {
#[command(about = "Import certificates for agent from zip file or separate cert and key files")]
ImportCerts {
// Zip file input, mutually exclusive with separate cert and key file inputs
/// Zip file containing ca.pem cert.pem and key.pem
#[arg(value_name = "ZIP_FILE", group = "input_source")]
zip: Option<String>,
/// Certificate name in zip file, required if using zip input
#[arg(
long,
group = "input_source",
requires = "zip",
default_value = "cert.pem",
value_name = "CERT_NAME"
)]
cert_name: Option<String>,
/// Key name in zip file, required if using zip input
#[arg(
long,
group = "input_source",
requires = "zip",
default_value = "key.pem",
value_name = "KEY_NAME"
)]
key_name: Option<String>,
/// CA certificate name in zip file, required if using zip input
#[arg(
long,
group = "input_source",
requires = "zip",
default_value = "ca.pem",
value_name = "CA_NAME"
)]
ca_name: Option<String>,
// Separate cert and key file inputs, required if not using zip input
/// Certificate file path
#[arg(
long,
group = "input_source",
requires = "key",
conflicts_with = "zip",
value_name = "CERT_FILE"
)]
cert: Option<String>,
/// Key file path
#[arg(
long,
group = "input_source",
requires = "cert",
conflicts_with = "zip",
value_name = "KEY_FILE"
)]
key: Option<String>,
/// Master CA certificate file path for verifying master identity, optional if the CA certificate is already trusted by the system
/// This is required if the master server uses a self-signed certificate that is not trusted by the system
#[arg(
long,
group = "input_source",
conflicts_with = "zip",
value_name = "CA_CERT_FILE"
)]
ca_cert: Option<String>,
},
ImportCerts(import_certs::ImportCertsCommand),
}
#[cfg(test)]
mod tests {
use clap::Parser;
use super::{Cli, Commands};
use super::*;
#[test]
fn parses_serve_flag_without_subcommand() {
@@ -98,65 +36,4 @@ mod tests {
assert!(parsed.serve);
assert!(parsed.command.is_none());
}
#[test]
fn parses_import_certs_with_zip_defaults() {
let parsed = Cli::try_parse_from(["nxmesh-agent", "import-certs", "bundle.zip"]);
assert!(parsed.is_ok());
let parsed = parsed.ok();
assert!(parsed.is_some());
let parsed = parsed.unwrap_or_else(|| unreachable!());
match parsed.command {
Some(Commands::ImportCerts {
zip,
cert_name,
key_name,
ca_name,
cert,
key,
ca_cert,
}) => {
assert_eq!(zip.as_deref(), Some("bundle.zip"));
assert_eq!(cert_name.as_deref(), Some("cert.pem"));
assert_eq!(key_name.as_deref(), Some("key.pem"));
assert_eq!(ca_name.as_deref(), Some("ca.pem"));
assert!(cert.is_none());
assert!(key.is_none());
assert!(ca_cert.is_none());
}
_ => unreachable!(),
}
}
#[test]
fn rejects_import_certs_with_separate_files() {
let parsed = Cli::try_parse_from([
"nxmesh-agent",
"import-certs",
"--cert",
"agent.crt",
"--key",
"agent.key",
"--ca-cert",
"ca.crt",
]);
assert!(parsed.is_err());
}
#[test]
fn rejects_conflicting_zip_and_separate_inputs() {
let parsed = Cli::try_parse_from([
"nxmesh-agent",
"import-certs",
"bundle.zip",
"--cert",
"agent.crt",
"--key",
"agent.key",
]);
assert!(parsed.is_err());
}
}

View File

@@ -1,560 +0,0 @@
use config::{Config, ConfigError, Environment, File};
use serde::{Deserialize, Deserializer, Serialize};
use std::{os::unix::fs::PermissionsExt, str::FromStr};
use tracing::level_filters::LevelFilter;
const NGINX_BINARY_PATH_TEMPLATE: &str = "{{nginx_binary_path}}";
const NGINX_DEFAULT_BINARY: &str = "nginx";
type ValidationError = String;
trait Validate {
fn validate(&self) -> Result<(), ValidationError>;
}
/// Agent settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Settings {
pub grpc: GrpcSettings,
#[serde(default)]
pub log: LogSettings,
pub nginx: Option<NginxSettings>,
}
/// gRPC client settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrpcSettings {
pub connection_string: String,
pub m_auth: MAuthSettings,
#[serde(default)]
pub cors: Option<CorsSettings>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MAuthSettings {
Tls(TLSSettings),
}
/// TLS certificate settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TLSSettings {
RawPath {
ca_path: String,
cert_path: String,
key_path: String,
},
ZipPath {
cert_zip_path: String,
},
}
/// CORS settings
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CorsSettings {
#[serde(default)]
pub allowed_origins: Vec<String>,
#[serde(default)]
pub allowed_methods: Vec<String>,
#[serde(default)]
pub allowed_headers: Vec<String>,
#[serde(default)]
pub allow_credentials: bool,
}
/// Logging settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogSettings {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
pub level: LevelFilter,
}
impl Default for LogSettings {
fn default() -> Self {
Self {
level: default_log_level(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NginxSettings {
#[serde(default = "default_nginx_config_path")]
pub nginx_config_path: String,
// #[serde(default = "default_nginx_binary_path")]
#[serde(default)]
pub nginx_binary_path: Option<String>,
// commands
#[serde(default = "default_nginx_reload_command")]
pub override_nginx_reload_command: Vec<String>,
#[serde(default = "default_nginx_test_command")]
pub override_nginx_test_command: Vec<String>,
// timeouts
#[serde(default = "default_nginx_reload_timeout_seconds")]
pub nginx_reload_timeout_seconds: u64,
#[serde(default = "default_nginx_test_timeout_seconds")]
pub nginx_test_timeout_seconds: u64,
}
impl Validate for Settings {
fn validate(&self) -> Result<(), ValidationError> {
self.grpc.validate()?;
if let Some(nginx) = &self.nginx {
nginx.validate()?;
}
Ok(())
}
}
impl Settings {
/// Load settings from config files and environment
pub fn load() -> Result<Self, ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
let settings = Config::builder()
.add_source(File::with_name("config/default").required(false))
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
.add_source(File::with_name("config/agent/default").required(false))
.add_source(File::with_name(&format!("config/agent/{}", run_mode)).required(false))
.add_source(Environment::with_prefix("NXMESH").separator("__"))
.build()?;
let mut settings: Self = settings.try_deserialize()?;
settings.validate().map_err(ConfigError::Message)?;
if let Some(nginx) = &mut settings.nginx {
nginx.validate().map_err(ConfigError::Message)?;
// replace binary path template in commands with actual binary path, if the template is present
nginx
.override_nginx_reload_command
.iter_mut()
.for_each(|cmd| {
*cmd = cmd.replace(
NGINX_BINARY_PATH_TEMPLATE,
&nginx
.nginx_binary_path
.clone()
.unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()),
);
});
nginx
.override_nginx_test_command
.iter_mut()
.for_each(|cmd| {
*cmd = cmd.replace(
NGINX_BINARY_PATH_TEMPLATE,
&nginx
.nginx_binary_path
.clone()
.unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()),
);
});
}
Ok(settings)
}
}
impl Validate for GrpcSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.connection_string.is_empty() {
return Err("gRPC connection string cannot be empty".into());
}
self.m_auth.validate()?;
if let Some(cors) = &self.cors {
cors.validate()?;
}
Ok(())
}
}
impl Validate for MAuthSettings {
fn validate(&self) -> Result<(), ValidationError> {
match self {
MAuthSettings::Tls(tls_settings) => tls_settings.validate()?,
}
Ok(())
}
}
impl Validate for TLSSettings {
fn validate(&self) -> Result<(), ValidationError> {
match self {
TLSSettings::RawPath {
ca_path,
cert_path,
key_path,
} => {
if !std::path::Path::new(ca_path).exists() {
return Err(format!("CA file not found: {}", ca_path));
}
if !std::path::Path::new(cert_path).exists() {
return Err(format!("Certificate file not found: {}", cert_path));
}
if !std::path::Path::new(key_path).exists() {
return Err(format!("Key file not found: {}", key_path));
}
}
TLSSettings::ZipPath { cert_zip_path } => {
if !std::path::Path::new(cert_zip_path).exists() {
return Err(format!("Certificate zip file not found: {}", cert_zip_path));
}
}
}
Ok(())
}
}
impl Validate for CorsSettings {
fn validate(&self) -> Result<(), ValidationError> {
Ok(())
}
}
impl Validate for NginxSettings {
fn validate(&self) -> Result<(), ValidationError> {
match &self.nginx_binary_path {
Some(path) if path.is_empty() => {
return Err("Nginx binary path cannot be empty".into());
}
Some(path) if !std::path::Path::new(path).exists() => {
return Err(format!("Nginx binary not found: {}", path));
}
Some(path)
if !std::fs::metadata(path)
.map_err(|e| format!("Failed to read nginx binary metadata: {}", e))?
.permissions()
.mode()
& 0o111
!= 0 =>
{
return Err(format!("Nginx binary is not executable: {}", path));
}
_ => {}
}
if self.nginx_config_path.is_empty() {
return Err("Nginx config path cannot be empty".into());
}
if !std::path::Path::new(&self.nginx_config_path).exists() {
return Err(format!(
"Nginx config file not found: {}",
self.nginx_config_path
));
}
// ensure reload and test commands contain the binary path template
if !&self
.override_nginx_reload_command
.join(" ")
.contains(NGINX_BINARY_PATH_TEMPLATE)
{
return Err(format!(
"Nginx reload command must contain the binary path template '{}': {}",
NGINX_BINARY_PATH_TEMPLATE,
self.override_nginx_reload_command.join(" ")
));
}
if !&self
.override_nginx_test_command
.join(" ")
.contains(NGINX_BINARY_PATH_TEMPLATE)
{
return Err(format!(
"Nginx test command must contain the binary path template '{}': {}",
NGINX_BINARY_PATH_TEMPLATE,
self.override_nginx_test_command.join(" ")
));
}
Ok(())
}
}
fn default_log_level() -> LevelFilter {
LevelFilter::INFO
}
fn default_nginx_config_path() -> String {
"/etc/nginx/nginx.conf".into()
}
fn default_nginx_reload_command() -> Vec<String> {
vec![
NGINX_BINARY_PATH_TEMPLATE.to_string(),
"-s".to_string(),
"reload".to_string(),
]
}
fn default_nginx_test_command() -> Vec<String> {
vec![NGINX_BINARY_PATH_TEMPLATE.to_string(), "-t".to_string()]
}
fn default_nginx_reload_timeout_seconds() -> u64 {
30
}
fn default_nginx_test_timeout_seconds() -> u64 {
30
}
fn deserialize_level_filter<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
LevelFilter::from_str(&s).map_err(serde::de::Error::custom)
}
fn serialize_level_filter<S>(level: &LevelFilter, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&level.to_string())
}
#[cfg(test)]
mod tests {
use std::{
fs,
os::unix::fs::PermissionsExt,
path::{Path, PathBuf},
};
use tempfile::TempDir;
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Settings>();
assert_send_sync::<GrpcSettings>();
assert_send_sync::<TLSSettings>();
assert_send_sync::<CorsSettings>();
assert_send_sync::<LogSettings>();
assert_send_sync::<NginxSettings>();
}
fn write_file(path: &Path) {
let result = fs::write(path, b"content");
assert!(result.is_ok());
}
fn create_exec_file(path: &Path) {
write_file(path);
let metadata = fs::metadata(path);
assert!(metadata.is_ok());
let metadata = metadata.ok();
assert!(metadata.is_some());
let metadata = metadata.unwrap_or_else(|| unreachable!());
let mut perms = metadata.permissions();
perms.set_mode(0o755);
let result = fs::set_permissions(path, perms);
assert!(result.is_ok());
}
fn create_non_exec_file(path: &Path) {
write_file(path);
let metadata = fs::metadata(path);
assert!(metadata.is_ok());
let metadata = metadata.ok();
assert!(metadata.is_some());
let metadata = metadata.unwrap_or_else(|| unreachable!());
let mut perms = metadata.permissions();
perms.set_mode(0o644);
let result = fs::set_permissions(path, perms);
assert!(result.is_ok());
}
fn valid_tls_raw_paths(temp_dir: &TempDir) -> (PathBuf, PathBuf, PathBuf) {
let ca_path = temp_dir.path().join("ca.pem");
let cert_path = temp_dir.path().join("cert.pem");
let key_path = temp_dir.path().join("key.pem");
write_file(&ca_path);
write_file(&cert_path);
write_file(&key_path);
(ca_path, cert_path, key_path)
}
#[test]
fn tls_raw_path_validate_succeeds_when_all_files_exist() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let (ca_path, cert_path, key_path) = valid_tls_raw_paths(&temp_dir);
let settings = TLSSettings::RawPath {
ca_path: ca_path.to_string_lossy().to_string(),
cert_path: cert_path.to_string_lossy().to_string(),
key_path: key_path.to_string_lossy().to_string(),
};
assert!(settings.validate().is_ok());
}
#[test]
fn tls_raw_path_validate_fails_when_ca_missing() {
let settings = TLSSettings::RawPath {
ca_path: "/tmp/does-not-exist-ca.pem".into(),
cert_path: "/tmp/does-not-exist-cert.pem".into(),
key_path: "/tmp/does-not-exist-key.pem".into(),
};
let result = settings.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("CA file not found"));
}
#[test]
fn tls_zip_path_validate_fails_when_zip_missing() {
let settings = TLSSettings::ZipPath {
cert_zip_path: "/tmp/missing-certs.zip".into(),
};
let result = settings.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("Certificate zip file not found"));
}
#[test]
fn grpc_validate_fails_when_connection_string_empty() {
let settings = GrpcSettings {
connection_string: "".into(),
m_auth: MAuthSettings::Tls(TLSSettings::ZipPath {
cert_zip_path: "/tmp/does-not-exist.zip".into(),
}),
cors: None,
};
let result = settings.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("gRPC connection string cannot be empty"));
}
#[test]
fn nginx_validate_succeeds_for_valid_paths_and_commands() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let nginx_binary = temp_dir.path().join("nginx");
let nginx_config = temp_dir.path().join("nginx.conf");
create_exec_file(&nginx_binary);
write_file(&nginx_config);
let nginx = NginxSettings {
nginx_config_path: nginx_config.to_string_lossy().to_string(),
nginx_binary_path: Some(nginx_binary.to_string_lossy().to_string()),
override_nginx_reload_command: default_nginx_reload_command(),
override_nginx_test_command: default_nginx_test_command(),
nginx_reload_timeout_seconds: 30,
nginx_test_timeout_seconds: 30,
};
assert!(nginx.validate().is_ok());
}
#[test]
fn nginx_validate_fails_for_non_executable_binary() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let nginx_binary = temp_dir.path().join("nginx");
let nginx_config = temp_dir.path().join("nginx.conf");
create_non_exec_file(&nginx_binary);
write_file(&nginx_config);
let nginx = NginxSettings {
nginx_config_path: nginx_config.to_string_lossy().to_string(),
nginx_binary_path: Some(nginx_binary.to_string_lossy().to_string()),
override_nginx_reload_command: default_nginx_reload_command(),
override_nginx_test_command: default_nginx_test_command(),
nginx_reload_timeout_seconds: 30,
nginx_test_timeout_seconds: 30,
};
let result = nginx.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("Nginx binary is not executable"));
}
#[test]
fn nginx_validate_fails_when_reload_command_lacks_template() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let nginx_binary = temp_dir.path().join("nginx");
let nginx_config = temp_dir.path().join("nginx.conf");
create_exec_file(&nginx_binary);
write_file(&nginx_config);
let nginx = NginxSettings {
nginx_config_path: nginx_config.to_string_lossy().to_string(),
nginx_binary_path: Some(nginx_binary.to_string_lossy().to_string()),
override_nginx_reload_command: vec!["nginx".into(), "-s".into(), "reload".into()],
override_nginx_test_command: default_nginx_test_command(),
nginx_reload_timeout_seconds: 30,
nginx_test_timeout_seconds: 30,
};
let result = nginx.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("Nginx reload command must contain the binary path template"));
}
#[test]
fn level_filter_round_trip_serialization() {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
level: LevelFilter,
}
let original = Wrapper {
level: LevelFilter::DEBUG,
};
let encoded = serde_json::to_string(&original);
assert!(encoded.is_ok());
let encoded = encoded.ok();
assert!(encoded.is_some());
let encoded = encoded.unwrap_or_else(|| unreachable!());
assert!(encoded.to_lowercase().contains("debug"));
let decoded = serde_json::from_str::<Wrapper>(&encoded);
assert!(decoded.is_ok());
let decoded = decoded.ok();
assert!(decoded.is_some());
let decoded = decoded.unwrap_or_else(|| unreachable!());
assert_eq!(decoded.level, LevelFilter::DEBUG);
}
}

View File

@@ -0,0 +1,166 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{Validate, ValidationError};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MAuthSettings {
Tls(TLSSettings),
}
/// TLS certificate settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TLSSettings {
RawPath {
ca_path: String,
cert_path: String,
key_path: String,
},
ZipPath {
cert_zip_path: String,
},
}
impl Validate for MAuthSettings {
fn validate(&self) -> Result<(), ValidationError> {
match self {
MAuthSettings::Tls(tls_settings) => tls_settings.validate()?,
}
Ok(())
}
}
impl Validate for TLSSettings {
fn validate(&self) -> Result<(), ValidationError> {
match self {
TLSSettings::RawPath {
ca_path,
cert_path,
key_path,
} => {
if !std::path::Path::new(ca_path).exists() {
return Err(format!("CA file not found: {}", ca_path));
}
if !std::path::Path::new(cert_path).exists() {
return Err(format!("Certificate file not found: {}", cert_path));
}
if !std::path::Path::new(key_path).exists() {
return Err(format!("Key file not found: {}", key_path));
}
}
TLSSettings::ZipPath { cert_zip_path } => {
if !std::path::Path::new(cert_zip_path).exists() {
return Err(format!("Certificate zip file not found: {}", cert_zip_path));
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::{
fs,
os::unix::fs::PermissionsExt,
path::{Path, PathBuf},
};
use tempfile::TempDir;
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<TLSSettings>();
}
fn write_file(path: &Path) {
let result = fs::write(path, b"content");
assert!(result.is_ok());
}
fn create_exec_file(path: &Path) {
write_file(path);
let metadata = fs::metadata(path);
assert!(metadata.is_ok());
let metadata = metadata.ok();
assert!(metadata.is_some());
let metadata = metadata.unwrap_or_else(|| unreachable!());
let mut perms = metadata.permissions();
perms.set_mode(0o755);
let result = fs::set_permissions(path, perms);
assert!(result.is_ok());
}
fn create_non_exec_file(path: &Path) {
write_file(path);
let metadata = fs::metadata(path);
assert!(metadata.is_ok());
let metadata = metadata.ok();
assert!(metadata.is_some());
let metadata = metadata.unwrap_or_else(|| unreachable!());
let mut perms = metadata.permissions();
perms.set_mode(0o644);
let result = fs::set_permissions(path, perms);
assert!(result.is_ok());
}
fn valid_tls_raw_paths(temp_dir: &TempDir) -> (PathBuf, PathBuf, PathBuf) {
let ca_path = temp_dir.path().join("ca.pem");
let cert_path = temp_dir.path().join("cert.pem");
let key_path = temp_dir.path().join("key.pem");
write_file(&ca_path);
write_file(&cert_path);
write_file(&key_path);
(ca_path, cert_path, key_path)
}
#[test]
fn tls_raw_path_validate_succeeds_when_all_files_exist() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let (ca_path, cert_path, key_path) = valid_tls_raw_paths(&temp_dir);
let settings = TLSSettings::RawPath {
ca_path: ca_path.to_string_lossy().to_string(),
cert_path: cert_path.to_string_lossy().to_string(),
key_path: key_path.to_string_lossy().to_string(),
};
assert!(settings.validate().is_ok());
}
#[test]
fn tls_raw_path_validate_fails_when_ca_missing() {
let settings = TLSSettings::RawPath {
ca_path: "/tmp/does-not-exist-ca.pem".into(),
cert_path: "/tmp/does-not-exist-cert.pem".into(),
key_path: "/tmp/does-not-exist-key.pem".into(),
};
let result = settings.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("CA file not found"));
}
#[test]
fn tls_zip_path_validate_fails_when_zip_missing() {
let settings = TLSSettings::ZipPath {
cert_zip_path: "/tmp/missing-certs.zip".into(),
};
let result = settings.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("Certificate zip file not found"));
}
}

View File

@@ -0,0 +1,34 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{Validate, ValidationError};
/// CORS settings
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CorsSettings {
#[serde(default)]
pub allowed_origins: Vec<String>,
#[serde(default)]
pub allowed_methods: Vec<String>,
#[serde(default)]
pub allowed_headers: Vec<String>,
#[serde(default)]
pub allow_credentials: bool,
}
impl Validate for CorsSettings {
fn validate(&self) -> Result<(), ValidationError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<CorsSettings>();
}
}

View File

@@ -0,0 +1,56 @@
use serde::{Deserialize, Serialize};
use super::super::settings::{Validate, ValidationError};
use super::{auth::MAuthSettings, cors::CorsSettings};
/// gRPC client settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrpcSettings {
pub connection_string: String,
pub m_auth: MAuthSettings,
#[serde(default)]
pub cors: Option<CorsSettings>,
}
impl Validate for GrpcSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.connection_string.is_empty() {
return Err("gRPC connection string cannot be empty".into());
}
self.m_auth.validate()?;
if let Some(cors) = &self.cors {
cors.validate()?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::config::settings::TLSSettings;
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<GrpcSettings>();
}
#[test]
fn grpc_validate_fails_when_connection_string_empty() {
let settings = GrpcSettings {
connection_string: "".into(),
m_auth: MAuthSettings::Tls(TLSSettings::ZipPath {
cert_zip_path: "/tmp/does-not-exist.zip".into(),
}),
cors: None,
};
let result = settings.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("gRPC connection string cannot be empty"));
}
}

View File

@@ -0,0 +1,82 @@
use std::str::FromStr;
use serde::{Deserialize, Deserializer, Serialize};
use tracing::level_filters::LevelFilter;
/// Logging settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogSettings {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
pub level: LevelFilter,
}
impl Default for LogSettings {
fn default() -> Self {
Self {
level: default_log_level(),
}
}
}
fn default_log_level() -> LevelFilter {
LevelFilter::INFO
}
fn deserialize_level_filter<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
LevelFilter::from_str(&s).map_err(serde::de::Error::custom)
}
fn serialize_level_filter<S>(level: &LevelFilter, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&level.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<LogSettings>();
}
#[test]
fn level_filter_round_trip_serialization() {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
level: LevelFilter,
}
let original = Wrapper {
level: LevelFilter::DEBUG,
};
let encoded = serde_json::to_string(&original);
assert!(encoded.is_ok());
let encoded = encoded.ok();
assert!(encoded.is_some());
let encoded = encoded.unwrap_or_else(|| unreachable!());
assert!(encoded.to_lowercase().contains("debug"));
let decoded = serde_json::from_str::<Wrapper>(&encoded);
assert!(decoded.is_ok());
let decoded = decoded.ok();
assert!(decoded.is_some());
let decoded = decoded.unwrap_or_else(|| unreachable!());
assert_eq!(decoded.level, LevelFilter::DEBUG);
}
}

View File

@@ -0,0 +1,76 @@
use config::{Config, ConfigError, Environment, File};
use serde::{Deserialize, Serialize};
mod auth;
mod cors;
mod grpc;
mod log;
mod nginx;
pub use auth::*;
pub use cors::*;
pub use grpc::*;
pub use log::*;
pub use nginx::*;
pub type ValidationError = String;
pub trait Validate {
fn validate(&self) -> Result<(), ValidationError>;
}
/// Agent settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Settings {
pub agent_id: String,
pub grpc: GrpcSettings,
#[serde(default)]
pub log: LogSettings,
#[serde(default)]
pub nginx: NginxSettings,
}
impl Validate for Settings {
fn validate(&self) -> Result<(), ValidationError> {
self.grpc.validate()?;
self.nginx.validate()?;
Ok(())
}
}
impl Settings {
/// Load settings from config files and environment
pub fn load() -> Result<Self, ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
let settings = Config::builder()
.add_source(File::with_name("config/default").required(false))
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
.add_source(File::with_name("config/agent/default").required(false))
.add_source(File::with_name(&format!("config/agent/{}", run_mode)).required(false))
.add_source(Environment::with_prefix("NXMESH").separator("__"))
.build()?;
let mut settings: Self = settings.try_deserialize()?;
settings.validate().map_err(ConfigError::Message)?;
settings.nginx.validate().map_err(ConfigError::Message)?;
settings.nginx.transform_commands();
Ok(settings)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ensure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Settings>();
}
}

View File

@@ -0,0 +1,280 @@
use std::os::unix::fs::PermissionsExt;
use serde::{Deserialize, Serialize};
use crate::config::settings::{Validate, ValidationError};
const NGINX_BINARY_PATH_TEMPLATE: &str = "{{nginx_binary_path}}";
const NGINX_DEFAULT_BINARY: &str = "nginx";
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NginxSettings {
#[serde(default = "default_nginx_config_path")]
pub nginx_config_path: String,
// #[serde(default = "default_nginx_binary_path")]
#[serde(default)]
pub nginx_binary_path: Option<String>,
// commands
#[serde(default = "default_nginx_reload_command")]
pub override_nginx_reload_command: Vec<String>,
#[serde(default = "default_nginx_test_command")]
pub override_nginx_test_command: Vec<String>,
// timeouts
#[serde(default = "default_nginx_reload_timeout_seconds")]
pub nginx_reload_timeout_seconds: u64,
#[serde(default = "default_nginx_test_timeout_seconds")]
pub nginx_test_timeout_seconds: u64,
}
impl NginxSettings {
/// Transforms the reload and test commands by replacing the binary path template with the actual binary path if provided.
/// This MUST be called after validation to ensure the binary path is valid and the commands contain the template.
pub fn transform_commands(&mut self) {
self.override_nginx_reload_command = self.transformed_reload_command();
self.override_nginx_test_command = self.transformed_test_command();
}
fn transformed_reload_command(&self) -> Vec<String> {
self.override_nginx_reload_command
.iter()
.map(|cmd| {
cmd.replace(
NGINX_BINARY_PATH_TEMPLATE,
&self
.nginx_binary_path
.clone()
.unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()),
)
})
.collect()
}
fn transformed_test_command(&self) -> Vec<String> {
self.override_nginx_test_command
.iter()
.map(|cmd| {
cmd.replace(
NGINX_BINARY_PATH_TEMPLATE,
&self
.nginx_binary_path
.clone()
.unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()),
)
})
.collect()
}
}
impl Validate for NginxSettings {
fn validate(&self) -> Result<(), ValidationError> {
match &self.nginx_binary_path {
Some(path) if path.is_empty() => {
return Err("Nginx binary path cannot be empty".into());
}
Some(path) if !std::path::Path::new(path).exists() => {
return Err(format!("Nginx binary not found: {}", path));
}
Some(path)
if !std::fs::metadata(path)
.map_err(|e| format!("Failed to read nginx binary metadata: {}", e))?
.permissions()
.mode()
& 0o111
!= 0 =>
{
return Err(format!("Nginx binary is not executable: {}", path));
}
_ => {}
}
if self.nginx_config_path.is_empty() {
return Err("Nginx config path cannot be empty".into());
}
if !std::path::Path::new(&self.nginx_config_path).exists() {
return Err(format!(
"Nginx config file not found: {}",
self.nginx_config_path
));
}
// ensure reload and test commands contain the binary path template
if !&self
.override_nginx_reload_command
.join(" ")
.contains(NGINX_BINARY_PATH_TEMPLATE)
{
return Err(format!(
"Nginx reload command must contain the binary path template '{}': {}",
NGINX_BINARY_PATH_TEMPLATE,
self.override_nginx_reload_command.join(" ")
));
}
if !&self
.override_nginx_test_command
.join(" ")
.contains(NGINX_BINARY_PATH_TEMPLATE)
{
return Err(format!(
"Nginx test command must contain the binary path template '{}': {}",
NGINX_BINARY_PATH_TEMPLATE,
self.override_nginx_test_command.join(" ")
));
}
Ok(())
}
}
fn default_nginx_config_path() -> String {
"/etc/nginx/nginx.conf".into()
}
fn default_nginx_reload_command() -> Vec<String> {
vec![
NGINX_BINARY_PATH_TEMPLATE.to_string(),
"-s".to_string(),
"reload".to_string(),
]
}
fn default_nginx_test_command() -> Vec<String> {
vec![NGINX_BINARY_PATH_TEMPLATE.to_string(), "-t".to_string()]
}
fn default_nginx_reload_timeout_seconds() -> u64 {
30
}
fn default_nginx_test_timeout_seconds() -> u64 {
30
}
#[cfg(test)]
mod tests {
use std::{fs, os::unix::fs::PermissionsExt, path::Path};
use tempfile::TempDir;
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<NginxSettings>();
}
fn write_file(path: &Path) {
let result = fs::write(path, b"content");
assert!(result.is_ok());
}
fn create_exec_file(path: &Path) {
write_file(path);
let metadata = fs::metadata(path);
assert!(metadata.is_ok());
let metadata = metadata.ok();
assert!(metadata.is_some());
let metadata = metadata.unwrap_or_else(|| unreachable!());
let mut perms = metadata.permissions();
perms.set_mode(0o755);
let result = fs::set_permissions(path, perms);
assert!(result.is_ok());
}
fn create_non_exec_file(path: &Path) {
write_file(path);
let metadata = fs::metadata(path);
assert!(metadata.is_ok());
let metadata = metadata.ok();
assert!(metadata.is_some());
let metadata = metadata.unwrap_or_else(|| unreachable!());
let mut perms = metadata.permissions();
perms.set_mode(0o644);
let result = fs::set_permissions(path, perms);
assert!(result.is_ok());
}
#[test]
fn nginx_validate_succeeds_for_valid_paths_and_commands() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let nginx_binary = temp_dir.path().join("nginx");
let nginx_config = temp_dir.path().join("nginx.conf");
create_exec_file(&nginx_binary);
write_file(&nginx_config);
let nginx = NginxSettings {
nginx_config_path: nginx_config.to_string_lossy().to_string(),
nginx_binary_path: Some(nginx_binary.to_string_lossy().to_string()),
override_nginx_reload_command: default_nginx_reload_command(),
override_nginx_test_command: default_nginx_test_command(),
nginx_reload_timeout_seconds: 30,
nginx_test_timeout_seconds: 30,
};
assert!(nginx.validate().is_ok());
}
#[test]
fn nginx_validate_fails_for_non_executable_binary() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let nginx_binary = temp_dir.path().join("nginx");
let nginx_config = temp_dir.path().join("nginx.conf");
create_non_exec_file(&nginx_binary);
write_file(&nginx_config);
let nginx = NginxSettings {
nginx_config_path: nginx_config.to_string_lossy().to_string(),
nginx_binary_path: Some(nginx_binary.to_string_lossy().to_string()),
override_nginx_reload_command: default_nginx_reload_command(),
override_nginx_test_command: default_nginx_test_command(),
nginx_reload_timeout_seconds: 30,
nginx_test_timeout_seconds: 30,
};
let result = nginx.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("Nginx binary is not executable"));
}
#[test]
fn nginx_validate_fails_when_reload_command_lacks_template() {
let temp_dir = TempDir::new();
assert!(temp_dir.is_ok());
let temp_dir = temp_dir.ok();
assert!(temp_dir.is_some());
let temp_dir = temp_dir.unwrap_or_else(|| unreachable!());
let nginx_binary = temp_dir.path().join("nginx");
let nginx_config = temp_dir.path().join("nginx.conf");
create_exec_file(&nginx_binary);
write_file(&nginx_config);
let nginx = NginxSettings {
nginx_config_path: nginx_config.to_string_lossy().to_string(),
nginx_binary_path: Some(nginx_binary.to_string_lossy().to_string()),
override_nginx_reload_command: vec!["nginx".into(), "-s".into(), "reload".into()],
override_nginx_test_command: default_nginx_test_command(),
nginx_reload_timeout_seconds: 30,
nginx_test_timeout_seconds: 30,
};
let result = nginx.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_else(|| unreachable!());
assert!(msg.contains("Nginx reload command must contain the binary path template"));
}
}

View File

@@ -13,7 +13,7 @@ pub trait MasterConnectorTrait: Send + Sync {
&mut self,
settings: &crate::config::settings::Settings,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
fn get_client(&self) -> Arc<Mutex<AgentClient>>;
fn get_client(&self) -> AgentClient;
}
pub struct MasterConnector {
@@ -35,7 +35,7 @@ impl MasterConnectorTrait for MasterConnector {
self.connector.connect(settings).await
}
fn get_client(&self) -> Arc<Mutex<AgentClient>> {
fn get_client(&self) -> AgentClient {
self.connector.get_client()
}
}
@@ -58,7 +58,7 @@ mod tests {
struct FakeConnector {
called: Arc<AtomicBool>,
fail: bool,
client: Arc<Mutex<AgentClient>>,
client: AgentClient,
}
#[async_trait::async_trait]
@@ -74,13 +74,14 @@ mod tests {
Ok(())
}
fn get_client(&self) -> Arc<Mutex<AgentClient>> {
fn get_client(&self) -> AgentClient {
self.client.clone()
}
}
fn test_settings() -> Settings {
Settings {
agent_id: "test-agent".to_string(),
grpc: GrpcSettings {
connection_string: "https://localhost:50051".to_string(),
m_auth: MAuthSettings::Tls(TLSSettings::ZipPath {
@@ -89,14 +90,14 @@ mod tests {
cors: None,
},
log: LogSettings::default(),
nginx: None,
nginx: Default::default(),
}
}
fn test_client() -> Arc<Mutex<AgentClient>> {
fn test_client() -> AgentClient {
let channel =
tonic::transport::Channel::from_static("http://127.0.0.1:50051").connect_lazy();
Arc::new(Mutex::new(AgentClient::new(channel)))
AgentClient::new(channel)
}
#[tokio::test]
@@ -126,18 +127,4 @@ mod tests {
let result = master.connect(&test_settings()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn master_connector_returns_underlying_client() {
let shared_client = test_client();
let fake = FakeConnector {
called: Arc::new(AtomicBool::new(false)),
fail: false,
client: shared_client.clone(),
};
let master = MasterConnector::new(Box::new(fake));
let client = master.get_client();
assert!(Arc::ptr_eq(&client, &shared_client));
}
}

View File

@@ -1,6 +1,6 @@
use std::{fs::File, io::Read, sync::Arc};
use std::{fs::File, io::Read};
use tokio::{fs::read, sync::Mutex};
use tokio::fs::read;
use nxmesh_proto::agent_service_client::AgentServiceClient;
use tonic::transport::{Certificate, ClientTlsConfig, Identity};
@@ -11,7 +11,7 @@ use crate::config::settings::{MAuthSettings, TLSSettings};
use super::{AgentClient, MasterConnectorTrait};
pub struct SshMasterConnector {
client: Arc<Mutex<AgentClient>>,
client: AgentClient,
}
impl SshMasterConnector {
@@ -34,7 +34,7 @@ impl SshMasterConnector {
.connect_lazy();
// Create the gRPC client
let client = Arc::new(Mutex::new(AgentServiceClient::new(endpoint)));
let client = AgentServiceClient::new(endpoint);
Ok(Self { client })
}
@@ -126,7 +126,7 @@ impl MasterConnectorTrait for SshMasterConnector {
Ok(())
}
fn get_client(&self) -> Arc<Mutex<AgentClient>> {
fn get_client(&self) -> AgentClient {
self.client.clone()
}
}

View File

@@ -1,7 +1,8 @@
#![recursion_limit = "128"]
#![forbid(unsafe_code)]
#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
use std::process::exit;
use std::{process::exit, sync::Arc};
use tracing::{error, info};
use tracing_subscriber::{
@@ -9,11 +10,12 @@ use tracing_subscriber::{
util::SubscriberInitExt,
};
use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMasterConnector};
use crate::service::get_services;
mod cli;
mod config;
mod connector;
mod service;
#[tokio::main]
async fn main() {
@@ -43,38 +45,26 @@ async fn main() {
// info!("Loaded settings: {:#?}", settings);
info!("Starting NxMesh Agent...");
// install grpc client
#[expect(clippy::expect_used)]
let ssh_connector = SshMasterConnector::new(settings.grpc.clone())
let services = get_services(Arc::new(settings))
.await
.inspect_err(|e| {
error!("Failed to create SSH Master Connector: {}", e);
exit(1);
.map_err(|e| {
error!("Failed to initialize services: {}", e);
e
})
.expect("Failed to create SSH Master Connector");
let mut master_connector = MasterConnector::new(Box::new(ssh_connector));
.unwrap_or_else(|_| {
std::process::exit(1);
});
if let Err(e) = master_connector.connect(&settings).await {
error!("Failed to connect to master: {}", e);
exit(1);
let master_handler = services.master_handler.clone();
// spawn the long-running handler so main can wait for shutdown signal
tokio::spawn(async move {
if let Err(e) = master_handler.start_handle_master_message().await {
error!("Master message handler exited with error: {:?}", e);
}
});
// send a dummy heartbeat to verify the connection is working
let client = master_connector.get_client();
let request = nxmesh_proto::HealthReport {
..Default::default()
};
match client.lock().await.report_health(request).await {
Ok(_) => info!("Successfully sent health report to master."),
Err(e) => {
error!("Failed to send health report to master: {}", e);
exit(1);
}
}
info!("Successfully connected to master. Agent is running.");
info!("Agent is running. Waiting for shutdown signal.");
shutdown_handler(services.master_handler.clone()).await;
}
fn install_tracing_subscriber()
@@ -91,3 +81,20 @@ fn install_tracing_subscriber()
reload_handle
}
async fn wait_for_shutdown_signal() {
#[expect(clippy::expect_used)]
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for ctrl_c");
info!("Shutdown signal received, stopping handler.");
}
async fn shutdown_handler(master_handler: Arc<dyn service::master_handler::MasterHandler>) {
wait_for_shutdown_signal().await;
//
let _ = master_handler.stop_handle_master_message().await;
//
info!("Agent stopped.");
exit(0);
}

View File

@@ -1,38 +0,0 @@
use std::sync::Arc;
use nxmesh_proto::ConfigUpdate;
use tracing::info;
use crate::connector::master::MasterConnector;
#[async_trait::async_trait]
pub trait MasterHandler {
async fn on_config_update(
&self,
config_info: ConfigUpdate,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
pub struct MasterHandlerImpl {
settings: Arc<crate::config::settings::Settings>,
}
impl MasterHandlerImpl {
pub fn new(settings: impl Into<Arc<crate::config::settings::Settings>>) -> Self {
Self {
settings: settings.into(),
}
}
}
#[async_trait::async_trait]
impl MasterHandler for MasterHandlerImpl {
async fn on_config_update(
&self,
config_info: ConfigUpdate,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("Received config update from master: {:?}", config_info);
Ok(())
}
}

View File

@@ -0,0 +1,106 @@
use std::sync::{Arc, Weak};
use nxmesh_proto::{
AgentMessage, ConfigUpdate, MasterMessage, command::Command, master_message::Payload,
};
use crate::service::master_handler::{MasterHandlerError, MessageResult};
#[async_trait::async_trait]
pub trait MasterMessageHandler: Send + Sync + 'static {
async fn handle_master_message(
&self,
agent_id: &str,
message: MasterMessage,
) -> MessageResult<()>;
}
#[async_trait::async_trait]
pub trait OnConfigUpdateHandler: Send + Sync + 'static {
// Handle the config update message from master, write the config content to files, validate the new config and reload nginx
async fn on_config_update(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
config_info: ConfigUpdate,
) -> MessageResult<()>;
}
#[async_trait::async_trait]
pub trait OnCommandHandler: Send + Sync + 'static {
// Handle the command message from master, execute the command and return the result
async fn on_command(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
command: Command,
) -> MessageResult<()>;
}
pub struct HandlerImpl<OCUH, OCH>
where
OCUH: OnConfigUpdateHandler + ?Sized,
OCH: OnCommandHandler + ?Sized,
{
on_config_update_handler: Weak<OCUH>,
on_command_handler: Weak<OCH>,
}
impl<OCUH, OCH> HandlerImpl<OCUH, OCH>
where
OCUH: OnConfigUpdateHandler + ?Sized,
OCH: OnCommandHandler + ?Sized,
{
pub fn new(on_config_update_handler: Weak<OCUH>, on_command_handler: Weak<OCH>) -> Self {
Self {
on_config_update_handler,
on_command_handler,
}
}
}
#[async_trait::async_trait]
impl<OCUH, OCH> MasterMessageHandler for HandlerImpl<OCUH, OCH>
where
OCUH: OnConfigUpdateHandler + ?Sized,
OCH: OnCommandHandler + ?Sized,
{
async fn handle_master_message(
&self,
agent_id: &str,
message: MasterMessage,
) -> MessageResult<()> {
match message.payload {
Some(Payload::ConfigUpdate(config_info)) => {
let on_config_update_handler =
self.on_config_update_handler.upgrade().ok_or_else(|| {
MasterHandlerError::MessageHandlingError(
"Failed to upgrade weak reference to config update handler".to_string(),
)
})?;
on_config_update_handler
.on_config_update(
agent_id,
message.timestamp,
&message.message_id,
config_info,
)
.await
}
Some(_) => {
// We should never receive other types of messages from the master, but we should handle it anyway
Err(MasterHandlerError::MessageHandlingError(
"Received unsupported master message type".to_string(),
))
}
None => {
// This should never happen as the master should always send a valid message, but we should handle it anyway
return Err(MasterHandlerError::MessageHandlingError(
"Received master message with empty payload".to_string(),
));
}
}
}
}

View File

@@ -0,0 +1,224 @@
use std::sync::Arc;
use nxmesh_proto::AgentMessage;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::{
connector::master::{MasterConnector, MasterConnectorTrait},
service::master_handler::handlers::MasterMessageHandler,
};
pub mod handlers;
#[derive(Debug)]
pub enum MasterHandlerError {
ConnectionError(String),
// TODO: should be protobuf error to transmit the error to master
MessageHandlingError(String),
RetryLimitExceeded(String),
SendMessageError(String),
}
pub type MessageResult<T> = std::result::Result<T, MasterHandlerError>;
#[async_trait::async_trait]
pub trait MasterHandler: Send + Sync + 'static {
// Create a new routine to handle incoming messages from the master
// This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down
async fn start_handle_master_message(&self) -> MessageResult<()>;
async fn stop_handle_master_message(&self) -> MessageResult<()>;
// Send a message to the master, response should be handled by the agent message handler registered
async fn send_message_to_master(&self, message: AgentMessage) -> MessageResult<()>;
}
struct MessageHandleInfo {
tx: mpsc::Sender<AgentMessage>,
// used to signal the running handler/connection to stop
cancel: CancellationToken,
}
pub struct MasterHandlerImpl<MMH>
where
MMH: MasterMessageHandler + ?Sized,
{
agent_id: String,
connector: Arc<MasterConnector>,
message_handler: Arc<MMH>,
message_handle_lock: tokio::sync::RwLock<Option<MessageHandleInfo>>,
}
impl<MMH> MasterHandlerImpl<MMH>
where
MMH: MasterMessageHandler + ?Sized,
{
pub fn new(agent_id: &str, connector: Arc<MasterConnector>, message_handler: Arc<MMH>) -> Self {
Self {
agent_id: agent_id.to_string(),
connector,
message_handler,
message_handle_lock: tokio::sync::RwLock::new(None),
}
}
}
#[async_trait::async_trait]
impl<MMH> MasterHandler for MasterHandlerImpl<MMH>
where
MMH: MasterMessageHandler + ?Sized,
{
async fn start_handle_master_message(&self) -> MessageResult<()> {
info!("Starting master message handler...");
let mut client = self.connector.get_client();
// ensure only one caller can start the handler
// create the cancel token for the lifetime of this handler invocation
let cancel_token = CancellationToken::new();
{
let mut guard = self.message_handle_lock.write().await;
if guard.is_some() {
warn!("Master message handler is already running");
return Ok(());
}
// placeholder tx; will be replaced per-connection
let (tx, _rx) = mpsc::channel(1);
*guard = Some(MessageHandleInfo {
tx,
cancel: cancel_token.clone(),
});
}
'connection_loop: loop {
// fresh outbound channel per connection
let (tx, rx) = mpsc::channel(32);
let outbound_stream = ReceiverStream::new(rx);
// try to connect
let mut stream = match client.stream(outbound_stream).await {
Ok(s) => s.into_inner(),
Err(e) => {
error!(
"Failed to connect to master: {}. Retrying in 5 seconds...",
e
);
// update stored sender so any callers see the current tx
{
let mut guard = self.message_handle_lock.write().await;
if let Some(info) = guard.as_mut() {
info.tx = tx.clone();
}
}
let conn_token = cancel_token.child_token();
tokio::select! {
_ = conn_token.cancelled() => break 'connection_loop,
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => continue 'connection_loop,
}
}
};
// store current tx so senders can use it
{
let mut guard = self.message_handle_lock.write().await;
if let Some(info) = guard.as_mut() {
info.tx = tx.clone();
}
}
// connection-level token to observe stop requests
let conn_token = cancel_token.child_token();
info!("Connected to master, starting to receive messages...");
// process messages inline so we can clear the slot on exit
'message_processing: loop {
tokio::select! {
_ = conn_token.cancelled() => {
info!("Stop requested for master handler");
break 'connection_loop;
}
message = stream.message() => {
match message {
Ok(Some(msg)) => {
if let Err(e) = self.message_handler.handle_master_message(&self.agent_id, msg).await {
error!("Failed to handle master message: {:?}", e);
}
continue;
}
Ok(None) => {
warn!("Master closed the connection");
break 'message_processing;
}
Err(e) => {
error!("Error receiving message from master: {:?}", e);
break 'message_processing;
}
}
}
}
}
// connection ended — clear stored info
{
let mut guard = self.message_handle_lock.write().await;
guard.take();
}
// if stop requested, exit
if cancel_token.is_cancelled() {
break 'connection_loop;
}
// otherwise reconnect after backoff
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
// final cleanup
let mut guard = self.message_handle_lock.write().await;
guard.take();
Ok(())
}
async fn stop_handle_master_message(&self) -> MessageResult<()> {
// Signal the running handler to stop and wait for it to clear
let mut maybe_cancel = None;
{
let mut guard = self.message_handle_lock.write().await;
if let Some(info) = guard.take() {
maybe_cancel = Some(info.cancel);
}
}
if let Some(cancel) = maybe_cancel {
cancel.cancel();
// wait for the handler to clear (with timeout)
for _ in 0..50 {
if self.message_handle_lock.read().await.is_none() {
info!("Master message handler task stopped successfully");
return Ok(());
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
warn!("Timed out waiting for master message handler to stop");
} else {
warn!("Master message handler is not running");
}
Ok(())
}
async fn send_message_to_master(&self, message: AgentMessage) -> MessageResult<()> {
let guard = self.message_handle_lock.read().await;
if let Some(handle_info) = guard.as_ref() {
handle_info.tx.send(message).await.map_err(|e| {
MasterHandlerError::SendMessageError(format!(
"Failed to send message to master: {}",
e
))
})?;
} else {
return Err(MasterHandlerError::SendMessageError(
"Master message handler is not running".to_string(),
));
}
Ok(())
}
}

View File

@@ -0,0 +1,63 @@
use std::sync::{Arc, Mutex};
use crate::{
config::settings::Settings,
connector::master::{MasterConnector, ssh::SshMasterConnector},
service::{
master_handler::{MasterHandler, MasterHandlerImpl, handlers::HandlerImpl},
nginx_handler::{NginxHandler, NginxHandlerImpl},
},
};
pub mod master_handler;
pub mod nginx_handler;
pub struct Services {
pub master_handler: Arc<dyn MasterHandler>,
pub nginx_handler: Arc<dyn NginxHandler>,
}
pub async fn get_services(settings: Arc<Settings>) -> anyhow::Result<Services> {
let master_connector = initialize_master_connector(settings.clone()).await?;
let master_connector = Arc::new(master_connector);
let master_handler_slot = Arc::new(Mutex::new(None));
let slot = master_handler_slot.clone();
#[expect(clippy::expect_used)]
let nginx_handler = Arc::new_cyclic(|nginx_handler_weak| {
let message_handler = Arc::new(HandlerImpl::new(
nginx_handler_weak.clone(),
nginx_handler_weak.clone(),
));
let master_handler = Arc::new(MasterHandlerImpl::new(
settings.agent_id.as_str(),
master_connector.clone(),
message_handler,
));
*slot.lock().expect("master handler slot lock poisoned") = Some(master_handler.clone());
NginxHandlerImpl::new(settings.nginx.clone().into(), master_handler)
});
#[expect(clippy::expect_used)]
let master_handler = master_handler_slot
.lock()
.expect("master handler slot lock poisoned")
.clone()
.ok_or_else(|| anyhow::anyhow!("Failed to initialize master handler"))?;
Ok(Services {
master_handler,
nginx_handler,
})
}
async fn initialize_master_connector(settings: Arc<Settings>) -> anyhow::Result<MasterConnector> {
let ssh_connector = SshMasterConnector::new(settings.grpc.clone())
.await
.map_err(|e| anyhow::anyhow!("Failed to initialize SSH connector: {}", e))?;
let master_connector = MasterConnector::new(Box::new(ssh_connector));
Ok(master_connector)
}

View File

@@ -0,0 +1,252 @@
use std::sync::Arc;
use thiserror::Error;
use tokio::process::Command;
use tracing::{debug, warn};
use crate::{config::settings::NginxSettings, service::master_handler::MasterHandlerError};
#[cfg(test)]
use mockall::predicate::*;
#[derive(Debug, Error)]
pub enum CommandHandlerError {
#[error("Failed to execute command: {0}")]
CommandExecutionError(#[from] std::io::Error),
#[error("Invalid config path: {0}")]
InvalidConfigPath(String),
#[error("Invalid output path: {0}")]
InvalidOutputPath(String),
#[error("Permission denied: {0}")]
PermissionDenied(String),
#[error("Other error: {0}")]
OtherError(String),
}
impl From<CommandHandlerError> for MasterHandlerError {
fn from(err: CommandHandlerError) -> Self {
MasterHandlerError::MessageHandlingError(err.to_string())
}
}
pub type CommandHandlerResult<T> = std::result::Result<T, CommandHandlerError>;
type Result<T> = CommandHandlerResult<T>;
#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait CommandHandler: Send + Sync + 'static {
// Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used.
async fn reload(&self, config_path: Option<&str>) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn validate(&self, config_path: Option<&str>) -> Result<()>;
async fn get_version(&self) -> Result<String>;
async fn get_status(&self) -> Result<String>;
}
pub struct CommandHandlerImpl {
settings: Arc<NginxSettings>,
}
impl CommandHandlerImpl {
pub fn new(settings: Arc<NginxSettings>) -> Self {
Self { settings }
}
fn get_nginx_command(&self) -> String {
// TODO: rename the setting for better clarity, it can be a binary path or a custom command
self.settings
.nginx_binary_path
.clone()
.unwrap_or_else(|| "nginx".to_string())
}
fn validate_config_path(config_path: &str) -> Result<()> {
if !std::path::Path::new(config_path).exists() {
return Err(CommandHandlerError::InvalidConfigPath(format!(
"Config file not found at path: {}",
config_path
)));
}
if !std::path::Path::new(config_path).is_file() {
return Err(CommandHandlerError::InvalidConfigPath(format!(
"Config path is not a file: {}",
config_path
)));
}
Ok(())
}
fn apply_config_path_to_command_vecs<'a>(
command: &'a mut Vec<String>,
config_path: &str,
) -> Result<&'a mut Vec<String>> {
// if given a config path, add it to the end of the command arguments to override the default config path used
Self::validate_config_path(config_path)?;
let parent_dir = match std::path::Path::new(config_path).parent() {
Some(dir) => dir,
// return root
None => std::path::Path::new("/"),
};
// set prefix path to the parent directory of the config file to ensure nginx can find all related files (e.g. certs, conf.d, etc.)
command.push("-p".to_string());
command.push(parent_dir.to_string_lossy().to_string());
// add the config file path to the command arguments to override the default config path used by nginx
command.push("-c".to_string());
command.push(config_path.to_string());
Ok(command)
}
}
#[async_trait::async_trait]
impl CommandHandler for CommandHandlerImpl {
async fn reload(&self, config_path: Option<&str>) -> Result<()> {
// TODO: add timeout for the command execution
let reload_command_str = self.settings.override_nginx_reload_command.clone();
let program = match reload_command_str.first() {
Some(cmd) => cmd,
None => &self.get_nginx_command(),
};
let mut reload_command_vec = reload_command_str[1..].to_vec();
// if given a config path, add it to the end of the command arguments to override the default config path used
if let Some(path) = config_path {
Self::apply_config_path_to_command_vecs(&mut reload_command_vec, path)?;
}
let output = Command::new(program)
.args(&reload_command_vec)
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
return Err(CommandHandlerError::CommandExecutionError(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to reload nginx: {}", error_info.trim()),
),
));
}
let success_info = String::from_utf8_lossy(&output.stdout);
debug!("Nginx reloaded successfully: {}", success_info.trim());
Ok(())
}
async fn stop(&self) -> Result<()> {
let output = Command::new(self.get_nginx_command())
.arg("-s")
.arg("stop")
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
return Err(CommandHandlerError::CommandExecutionError(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to stop nginx: {}", error_info.trim()),
),
));
}
let success_info = String::from_utf8_lossy(&output.stdout);
debug!("Nginx stopped successfully: {}", success_info.trim());
Ok(())
}
async fn validate(&self, config_path: Option<&str>) -> Result<()> {
// TODO: add timeout for the command execution
let validate_command_str = self.settings.override_nginx_test_command.clone();
let program = match validate_command_str.first() {
Some(cmd) => cmd,
None => &self.get_nginx_command(),
};
let mut validate_args = validate_command_str[1..].to_vec();
// if given a config path, add it to the end of the command arguments to override the default config path used
if let Some(path) = config_path {
Self::apply_config_path_to_command_vecs(&mut validate_args, path)?;
}
let output = Command::new(program).args(&validate_args).output().await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
return Err(CommandHandlerError::CommandExecutionError(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to validate nginx config: {}", error_info.trim()),
),
));
}
let success_info = String::from_utf8_lossy(&output.stdout);
debug!("Nginx config validation succeeded: {}", success_info.trim());
Ok(())
}
async fn get_version(&self) -> Result<String> {
let output = Command::new(self.get_nginx_command())
.arg("-v")
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
return Err(CommandHandlerError::CommandExecutionError(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to get nginx version: {}", error_info.trim()),
),
));
}
let version_info = String::from_utf8_lossy(&output.stderr);
Ok(version_info.trim().to_string())
}
async fn get_status(&self) -> Result<String> {
let output = Command::new(self.get_nginx_command())
.arg("-t")
.output()
.await?;
if !output.status.success() {
let error_info = String::from_utf8_lossy(&output.stderr);
return Err(CommandHandlerError::CommandExecutionError(
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to get nginx status: {}", error_info.trim()),
),
));
}
let status_info = String::from_utf8_lossy(&output.stderr);
Ok(status_info.trim().to_string())
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use tempfile::TempDir;
use super::*;
#[tokio::test]
async fn apply_config_path_to_command_vecs_appends_prefix_and_config() -> Result<()> {
let temp = TempDir::new()?;
let cfg_file = temp.path().join("nginx.conf");
tokio::fs::write(&cfg_file, b"data").await?;
let mut args: Vec<String> = vec!["base".to_string()];
let result = CommandHandlerImpl::apply_config_path_to_command_vecs(
&mut args,
&cfg_file.to_string_lossy(),
);
assert!(result.is_ok());
let args = result.expect("Failed to apply config path to command vecs");
// expect -p <parent_dir> -c <config>
assert!(args.contains(&"-p".to_string()));
assert!(args.contains(&"-c".to_string()));
assert!(args.contains(&cfg_file.to_string_lossy().to_string()));
Ok(())
}
}

View File

@@ -0,0 +1,448 @@
use std::sync::Arc;
use fs4::tokio::AsyncFileExt;
use thiserror::Error;
use tokio::{io::AsyncWriteExt, process::Command};
use tracing::warn;
use crate::{config::settings::NginxSettings, service::master_handler::MasterHandlerError};
#[cfg(test)]
use mockall::predicate::*;
// TODO: custom error type
#[derive(Debug, Error)]
pub enum FsHandlerError {
#[error("Invalid output path: {0}")]
InvalidOutputPath(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
}
impl From<FsHandlerError> for MasterHandlerError {
fn from(err: FsHandlerError) -> Self {
MasterHandlerError::MessageHandlingError(format!("File system handling error: {}", err))
}
}
pub type FsResult<T> = std::result::Result<T, FsHandlerError>;
type Result<T> = FsResult<T>;
#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait FsHandler: Send + Sync + 'static {
fn get_deployment_id(config_id: &str, version: &str) -> String
where
Self: Sized,
{
format!("{}-{}", config_id, version)
}
// Write a new config file for nginx.
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
async fn write_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<String>;
// Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed.
async fn append_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<String>;
// clean up old config files that are applied to nginx
// keep only latest n deployments.
async fn cleanup_config(&self, n: usize) -> Result<()>;
// Persist the root config path of the last successful deployment.
// Survives agent restarts so Reload/Test commands work without a new ConfigUpdate.
async fn save_last_deployment(&self, root_config_path: &str) -> Result<()>;
// Load the last persisted root config path, if any.
// Returns Ok(None) when no state file exists or it is empty/corrupt.
async fn load_last_deployment(&self) -> Result<Option<String>>;
}
pub struct FsHandlerImpl {
settings: Arc<NginxSettings>,
}
impl FsHandlerImpl {
pub fn new(settings: Arc<NginxSettings>) -> Self {
Self { settings }
}
fn validate_config_path(config_path: &str) -> Result<()> {
if !std::path::Path::new(config_path).exists() {
return Err(FsHandlerError::InvalidOutputPath(format!(
"Config file not found at path: {}",
config_path
)));
}
if !std::path::Path::new(config_path).is_file() {
return Err(FsHandlerError::InvalidOutputPath(format!(
"Config path is not a file: {}",
config_path
)));
}
Ok(())
}
fn get_deployment_dir(&self) -> std::path::PathBuf {
std::path::Path::new(&self.settings.nginx_config_path).join("deployments")
}
fn get_deployment_dir_path(&self, deployment_id: &str) -> std::path::PathBuf {
self.get_deployment_dir().join(deployment_id)
}
fn get_state_file_path(&self) -> std::path::PathBuf {
std::path::Path::new(&self.settings.nginx_config_path).join(".last_deployment")
}
async fn get_deployment_config_path(
&self,
deployment_id: &str,
output_path: &str,
create_dir_if_not_exists: bool,
) -> Result<std::path::PathBuf> {
let output_path_obj = std::path::Path::new(output_path);
if output_path_obj.is_absolute() {
return Err(FsHandlerError::InvalidOutputPath(
"Output path must be a relative path".into(),
));
}
if output_path_obj
.components()
.any(|comp| comp == std::path::Component::ParentDir)
{
return Err(FsHandlerError::InvalidOutputPath(
"Output path must not contain parent directory traversal".into(),
));
}
let deployment_config_dir = self.get_deployment_dir_path(deployment_id);
let full_path = deployment_config_dir.join(output_path);
if create_dir_if_not_exists {
if let Some(parent) = full_path.parent() {
tokio::fs::create_dir_all(parent).await?;
} else {
tokio::fs::create_dir_all(&deployment_config_dir).await?;
}
}
Ok(full_path)
}
}
#[async_trait::async_trait]
impl FsHandler for FsHandlerImpl {
async fn write_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<String> {
let full_output_path = self
.get_deployment_config_path(deployment_id, output_path, true)
.await?;
let parent_dir = full_output_path.parent().ok_or_else(|| {
FsHandlerError::InvalidOutputPath(format!(
"Failed to get parent directory of output path: {:?}",
full_output_path
))
})?;
// ensure the parent directory exists before creating the file
tokio::fs::create_dir_all(parent_dir).await?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(full_output_path.clone())
.await?;
// lock the file for writing to prevent concurrent write issue
file.lock_exclusive()?;
file.write_all(config_content.as_bytes()).await?;
file.unlock()?;
file.flush().await?;
Ok(full_output_path.to_string_lossy().to_string())
}
async fn append_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> Result<String> {
let full_output_path = self
.get_deployment_config_path(deployment_id, output_path, true)
.await?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(full_output_path.clone())
.await?;
// lock the file for writing to prevent concurrent write issue
file.lock_exclusive()?;
file.write_all(config_content.as_bytes()).await?;
file.unlock()?;
file.flush().await?;
Ok(full_output_path.to_string_lossy().to_string())
}
async fn save_last_deployment(&self, root_config_path: &str) -> Result<()> {
let state_path = self.get_state_file_path();
let tmp_path = state_path.with_extension("tmp");
tokio::fs::write(&tmp_path, format!("{}\n", root_config_path)).await?;
tokio::fs::rename(&tmp_path, &state_path).await?;
Ok(())
}
async fn load_last_deployment(&self) -> Result<Option<String>> {
// primary: try state file
let state_path = self.get_state_file_path();
if state_path.exists() {
let content = tokio::fs::read_to_string(&state_path).await?;
let path = content.trim().to_string();
if !path.is_empty() {
return Ok(Some(path));
}
}
// fallback: scan deployments directory for the newest deployment
let deployment_dir = self.get_deployment_dir();
if !deployment_dir.exists() {
return Ok(None);
}
let mut entries = tokio::fs::read_dir(&deployment_dir).await?;
let mut candidates: Vec<(std::path::PathBuf, std::time::SystemTime)> = Vec::new();
while let Some(entry) = entries.next_entry().await? {
if entry.file_type().await.map_or(false, |t| t.is_dir()) {
if let Ok(mtime) = entry.metadata().await.and_then(|m| m.modified()) {
candidates.push((entry.path(), mtime));
}
}
}
// sort descending by mtime (newest first)
candidates.sort_by(|a, b| b.1.cmp(&a.1));
for (dir, _) in &candidates {
let mut dir_entries = tokio::fs::read_dir(dir).await?;
while let Some(file) = dir_entries.next_entry().await? {
if file.file_type().await.map_or(false, |t| t.is_file()) {
let name = file.file_name().to_string_lossy().to_string();
if name == "nginx.conf" || name.ends_with(".conf") {
let path = file.path().to_string_lossy().to_string();
return Ok(Some(path));
}
}
}
}
Ok(None)
}
async fn cleanup_config(&self, n: usize) -> Result<()> {
let deployment_dir = self.get_deployment_dir();
// loop through all files in the deployment dir and delete them
let mut entries = tokio::fs::read_dir(&deployment_dir).await?;
let mut deployment_ids = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let file_type = entry.file_type().await?;
if file_type.is_dir()
&& let Some(deployment_id) = entry.file_name().to_str()
{
deployment_ids.push(deployment_id.to_string());
}
}
// sort the deployment ids by modified time in descending order and keep the latest n deployments, delete the rest
deployment_ids.sort_by_key(|id| {
let path = self.get_deployment_dir_path(id);
std::fs::metadata(path)
.and_then(|meta| meta.modified())
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
});
for deployment_id in deployment_ids.into_iter().skip(n) {
let path = self.get_deployment_dir_path(&deployment_id);
// ensure path is within the deplyment and nginx directory to prevent accidental deletion of other files
if !path.starts_with(&deployment_dir)
|| !path.starts_with(&self.settings.nginx_config_path)
{
warn!(
"Skipping deletion of path outside of deployment or nginx config directory: {:?}",
path
);
continue;
}
tokio::fs::remove_dir_all(path).await?;
}
Ok(())
}
}
#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
use super::*;
use anyhow::Result;
use std::sync::Arc;
use tempfile::TempDir;
#[tokio::test]
async fn write_and_append_config_roundtrip() -> Result<()> {
let temp = TempDir::new()?;
let settings = NginxSettings {
nginx_config_path: temp.path().to_string_lossy().to_string(),
nginx_binary_path: None,
override_nginx_reload_command: vec![],
override_nginx_test_command: vec![],
nginx_reload_timeout_seconds: 1,
nginx_test_timeout_seconds: 1,
};
let handler = FsHandlerImpl::new(Arc::new(settings));
handler
.write_config("deployment1", "hello", "conf/nginx.conf")
.await?;
let full_path = temp
.path()
.join("deployments")
.join("deployment1")
.join("conf/nginx.conf");
let content = tokio::fs::read_to_string(&full_path).await?;
assert_eq!(content, "hello");
handler
.append_config("deployment1", " world", "conf/nginx.conf")
.await?;
let content = tokio::fs::read_to_string(&full_path).await?;
assert_eq!(content, "hello world");
Ok(())
}
#[tokio::test]
async fn write_config_rejects_absolute_and_traversal_paths() -> Result<()> {
let temp = TempDir::new()?;
let settings = NginxSettings {
nginx_config_path: temp.path().to_string_lossy().to_string(),
nginx_binary_path: None,
override_nginx_reload_command: vec![],
override_nginx_test_command: vec![],
nginx_reload_timeout_seconds: 1,
nginx_test_timeout_seconds: 1,
};
let handler = FsHandlerImpl::new(Arc::new(settings));
let err = handler
.write_config("d", "x", "/absolute/path.conf")
.await
.err();
assert!(err.is_some());
let err = handler.write_config("d", "x", "../escape.conf").await.err();
assert!(err.is_some());
Ok(())
}
#[tokio::test]
async fn validate_config_path_checks_file_exists_and_is_file() {
// missing file
let res = FsHandlerImpl::validate_config_path("/this/path/does/not/exist.conf");
assert!(res.is_err());
// create a temp dir and ensure a directory is rejected
let temp = TempDir::new().expect("Failed to create temp dir");
let dir_path = temp.path();
let res = FsHandlerImpl::validate_config_path(dir_path.to_string_lossy().as_ref());
assert!(res.is_err());
}
#[tokio::test]
async fn get_deployment_config_path_create_flag_behaviour() -> Result<()> {
let temp = TempDir::new()?;
let settings = NginxSettings {
nginx_config_path: temp.path().to_string_lossy().to_string(),
nginx_binary_path: None,
override_nginx_reload_command: vec![],
override_nginx_test_command: vec![],
nginx_reload_timeout_seconds: 1,
nginx_test_timeout_seconds: 1,
};
let handler = FsHandlerImpl::new(Arc::new(settings));
// when create_dir_if_not_exists = false, directory shouldn't be created
let path = handler
.get_deployment_config_path("did", "conf/nginx.conf", false)
.await?;
assert!(
!path
.parent()
.expect("Failed to get parent directory of deployment config path")
.exists()
);
// when create_dir_if_not_exists = true, directory should be created
let path = handler
.get_deployment_config_path("did", "conf/nginx.conf", true)
.await?;
assert!(
path.parent()
.expect("Failed to get parent directory of deployment config path")
.exists()
);
Ok(())
}
#[tokio::test]
async fn cleanup_config_deletes_expected_deployments() -> Result<()> {
let temp = TempDir::new()?;
let settings = NginxSettings {
nginx_config_path: temp.path().to_string_lossy().to_string(),
nginx_binary_path: None,
override_nginx_reload_command: vec![],
override_nginx_test_command: vec![],
nginx_reload_timeout_seconds: 1,
nginx_test_timeout_seconds: 1,
};
let handler = FsHandlerImpl::new(Arc::new(settings));
let base = temp.path().join("deployments");
// create three deployments sequentially so mtimes differ
for id in &["d1", "d2", "d3"] {
let p = base.join(id);
std::fs::create_dir_all(&p)?;
std::fs::write(p.join("file"), b"x")?;
std::thread::sleep(std::time::Duration::from_millis(500));
}
// call cleanup keeping 1; current implementation keeps the oldest n, so expect only d1 remains
handler.cleanup_config(1).await?;
let mut exists = vec![];
for id in &["d1", "d2", "d3"] {
exists.push((id.to_string(), base.join(id).exists()));
}
// d1 should remain, others removed (matches current implementation behavior)
assert!(exists.iter().find(|(id, e)| id == "d1" && *e).is_some());
assert!(exists.iter().find(|(id, e)| id == "d2" && !*e).is_some());
assert!(exists.iter().find(|(id, e)| id == "d3" && !*e).is_some());
Ok(())
}
}

View File

@@ -0,0 +1,185 @@
use std::sync::Arc;
use dashmap::DashMap;
use nxmesh_proto::{
ConfigUpdate, ConfigUpdateResult,
agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload, command::Command,
command_result,
};
use tracing::{info, warn};
use crate::{
config::settings::NginxSettings,
service::{
master_handler::{
MasterHandler, MessageResult,
handlers::{OnCommandHandler, OnConfigUpdateHandler},
},
nginx_handler::{command_handler::CommandHandler, fs_handler::FsHandler},
},
};
const DEFAULT_CONFIG_PATH: &str = "nginx.conf";
const DEFAULT_NGINX_CONFIG_CONTENT: &str = r#"
events {}
"#;
pub trait NginxMasterMessageHandler: Send + Sync + 'static
//
+ OnConfigUpdateHandler
+ OnCommandHandler
{}
pub struct NginxMasterMessageHandlerImpl {
settings: Arc<NginxSettings>,
command_handler: Arc<dyn CommandHandler>,
fs_handler: Arc<dyn FsHandler>,
master_handler: Arc<dyn MasterHandler>,
//
// dash_map for for storing the on-going config updates, with the key as deployment_id, and the value as a tuple of (version_id, timestamp). On-going update must lock the deployment_id, and the new update with newer timestamp will wait until the lock is released. This is to ensure the config updates are applied in order.
// When the current timestamp is older than the timestamp in the map, the current update must be rejected, and the master should be informed to resend the update with the latest timestamp.
ongoing_updates: DashMap<String, (String, i64)>,
}
impl NginxMasterMessageHandlerImpl {
pub fn new(
settings: Arc<NginxSettings>,
command_handler: Arc<dyn CommandHandler>,
fs_handler: Arc<dyn FsHandler>,
master_handler: Arc<dyn MasterHandler>,
) -> Self {
Self {
settings,
command_handler,
fs_handler,
master_handler,
ongoing_updates: DashMap::new(),
}
}
}
impl NginxMasterMessageHandler for NginxMasterMessageHandlerImpl {}
#[async_trait::async_trait]
impl OnConfigUpdateHandler for NginxMasterMessageHandlerImpl {
async fn on_config_update(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
config_info: ConfigUpdate,
) -> MessageResult<()> {
// TODO: handle concurrency, expect only the latest version with latest timestamp is applied
// when a newer config update comes in, and the older config update is still being processed. The new config will wait until the old config is applied.
let deployment_id = format!("{}-{}", config_info.config_id, config_info.version);
// write the configs
let root_config_path = match config_info.root_config {
Some(config_content) => {
self.fs_handler
.write_config(
&deployment_id,
&config_content.content,
&config_content.path,
)
.await?
}
None => {
// If the config content is not provided, write a default config to ensure the deployment folder is created and can be used for later updates.
warn!(
"Config content is not provided for config update, writing a default minimal config for deployment_id: {}",
deployment_id
);
self.fs_handler
.write_config(
&deployment_id,
DEFAULT_NGINX_CONFIG_CONTENT,
DEFAULT_CONFIG_PATH,
)
.await?
}
};
//
for config in config_info.configs {
self.fs_handler
.write_config(&deployment_id, &config.content, &config.path)
.await?;
}
// apply reload on the root config
self.command_handler.reload(Some(&root_config_path)).await?;
// persist deployment path so Reload/Test commands survive agent restarts
self.fs_handler.save_last_deployment(&root_config_path).await?;
info!("Persisted last deployment path: {}", root_config_path);
// Reply the master to confirm the config update is successful
self.master_handler
.send_message_to_master(nxmesh_proto::AgentMessage {
agent_id: agent_id.to_string(),
timestamp,
message_id: message_id.to_string(),
payload: Some(ConfigUpdateResultPayload(ConfigUpdateResult {
success: true,
error_message: None,
config_id: config_info.config_id,
version: config_info.version,
})),
})
.await?;
//
Ok(())
}
}
#[async_trait::async_trait]
impl OnCommandHandler for NginxMasterMessageHandlerImpl {
async fn on_command(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
command: Command,
) -> MessageResult<()> {
// execute the command
let mut agent_message = nxmesh_proto::AgentMessage {
agent_id: agent_id.to_string(),
timestamp,
message_id: message_id.to_string(),
payload: None,
};
// load the last known deployment path for use with Reload/Test commands
let last_config_path = self.fs_handler.load_last_deployment().await?;
let result: command_result::Result = match command {
Command::Reload(_) => {
let result = self
.command_handler
.reload(last_config_path.as_deref())
.await;
command_result::Result::ReloadResult(nxmesh_proto::ReloadResult {
success: result.is_ok(),
error_message: result.err().map(|e| e.to_string()).unwrap_or_default(),
})
}
Command::Test(_) => {
let result = self
.command_handler
.validate(last_config_path.as_deref())
.await;
command_result::Result::TestResult(nxmesh_proto::TestResult {
success: result.is_ok(),
error_message: result.err().map(|e| e.to_string()).unwrap_or_default(),
})
}
};
// Reply the master to confirm the command execution is successful, and return the command output
agent_message.payload = Some(nxmesh_proto::agent_message::Payload::CommandResult(
nxmesh_proto::CommandResult {
result: Some(result),
},
));
self.master_handler
.send_message_to_master(agent_message)
.await?;
//
Ok(())
}
}

View File

@@ -0,0 +1,161 @@
use std::sync::Arc;
use nxmesh_proto::{ConfigUpdate, command::Command};
use crate::{
config::settings::NginxSettings,
service::{
master_handler::{
MasterHandler, MessageResult,
handlers::{OnCommandHandler, OnConfigUpdateHandler},
},
nginx_handler::{
command_handler::{CommandHandler, CommandHandlerImpl, CommandHandlerResult},
fs_handler::{FsHandler, FsHandlerImpl, FsResult},
message_handler::{NginxMasterMessageHandler, NginxMasterMessageHandlerImpl},
},
},
};
mod command_handler;
mod fs_handler;
mod message_handler;
#[cfg(test)]
use mockall::predicate::*;
// TODO: custom error type
#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait NginxHandler: Send + Sync + 'static {
// Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used.
async fn reload(&self, config_path: Option<&str>) -> CommandHandlerResult<()>;
async fn stop(&self) -> CommandHandlerResult<()>;
async fn validate(&self, config_path: Option<&str>) -> CommandHandlerResult<()>;
async fn get_version(&self) -> CommandHandlerResult<String>;
async fn get_status(&self) -> CommandHandlerResult<String>;
// Write a new config file for nginx.
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
async fn write_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> FsResult<String>;
// Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed.
async fn append_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> FsResult<String>;
// clean up old config files that are applied to nginx
// keep only latest n deployments.
async fn cleanup_config(&self, n: usize) -> FsResult<()>;
}
pub struct NginxHandlerImpl {
settings: Arc<NginxSettings>,
command_handler: Arc<dyn CommandHandler>,
fs_handler: Arc<dyn FsHandler>,
nginx_master_message_handler: Arc<dyn NginxMasterMessageHandler>,
}
impl NginxHandlerImpl {
pub fn new(settings: Arc<NginxSettings>, master_handler: Arc<dyn MasterHandler>) -> Self {
let command_handler: Arc<dyn CommandHandler> =
Arc::new(CommandHandlerImpl::new(settings.clone()));
let fs_handler: Arc<dyn FsHandler> = Arc::new(FsHandlerImpl::new(settings.clone()));
Self {
settings: settings.clone(),
command_handler: command_handler.clone(),
fs_handler: fs_handler.clone(),
nginx_master_message_handler: Arc::new(NginxMasterMessageHandlerImpl::new(
settings.clone(),
command_handler.clone(),
fs_handler.clone(),
master_handler,
)),
}
}
}
#[async_trait::async_trait]
impl NginxHandler for NginxHandlerImpl {
async fn reload(&self, config_path: Option<&str>) -> CommandHandlerResult<()> {
self.command_handler.reload(config_path).await
}
async fn stop(&self) -> CommandHandlerResult<()> {
self.command_handler.stop().await
}
async fn validate(&self, config_path: Option<&str>) -> CommandHandlerResult<()> {
self.command_handler.validate(config_path).await
}
async fn get_version(&self) -> CommandHandlerResult<String> {
self.command_handler.get_version().await
}
async fn get_status(&self) -> CommandHandlerResult<String> {
self.command_handler.get_status().await
}
async fn write_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> FsResult<String> {
self.fs_handler
.write_config(deployment_id, config_content, output_path)
.await
}
async fn append_config(
&self,
deployment_id: &str,
config_content: &str,
output_path: &str,
) -> FsResult<String> {
self.fs_handler
.append_config(deployment_id, config_content, output_path)
.await
}
async fn cleanup_config(&self, n: usize) -> FsResult<()> {
self.fs_handler.cleanup_config(n).await
}
}
#[async_trait::async_trait]
impl OnConfigUpdateHandler for NginxHandlerImpl {
async fn on_config_update(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
config_info: ConfigUpdate,
) -> MessageResult<()> {
self.nginx_master_message_handler
.on_config_update(agent_id, timestamp, message_id, config_info)
.await
}
}
#[async_trait::async_trait]
impl OnCommandHandler for NginxHandlerImpl {
async fn on_command(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
command: Command,
) -> MessageResult<()> {
self.nginx_master_message_handler
.on_command(agent_id, timestamp, message_id, command)
.await
}
}

View File

@@ -1,586 +0,0 @@
use config::{Config, ConfigError, Environment, File};
use rcgen::string::Ia5String;
use serde::{Deserialize, Deserializer, Serialize};
use std::{net::IpAddr, str::FromStr};
use tracing::level_filters::LevelFilter;
type ValidationError = String;
trait Validate {
fn validate(&self) -> Result<(), ValidationError>;
}
/// Master server settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Settings {
pub server: ServerSettings,
pub database: DatabaseSettings,
pub grpc: GrpcSettings,
pub auth: AuthSettings,
#[serde(default)]
pub log: LogSettings,
}
/// HTTP server settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSettings {
#[serde(default = "default_server_bind_address")]
pub bind_address: String,
#[serde(default = "default_server_port")]
pub port: u16,
#[serde(default)]
pub certificate: CertificateSettings,
#[serde(default)]
pub cors: Option<CorsSettings>,
}
/// Database connection settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseSettings {
pub url: String,
pub max_connections: Option<u32>,
}
/// gRPC server settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrpcSettings {
#[serde(default = "default_grpc_bind_address")]
pub bind_address: String,
#[serde(default = "default_grpc_port")]
pub port: u16,
#[serde(default)]
pub certificate: CertificateSettings,
#[serde(default)]
pub cors: Option<CorsSettings>,
}
/// Authentication settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthSettings {
pub jwt_secret: String,
#[serde(default = "default_jwt_expiration_hours")]
pub jwt_expiration_hours: u64,
}
/// TLS certificate settings
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CertificateSettings {
#[serde(default = "default_cert_folder")]
pub cert_dir: String,
#[serde(
default,
serialize_with = "serialize_ia5string_vec",
deserialize_with = "deserialize_ia5string_vec"
)]
pub san_dns: Vec<Ia5String>,
#[serde(default)]
pub san_ip: Vec<IpAddr>,
#[serde(default)]
cert_path: Option<String>,
#[serde(default)]
key_path: Option<String>,
}
impl CertificateSettings {
pub fn cert_path(&self) -> Option<String> {
self.cert_path
.as_ref()
.map(|p| format!("{}/{}", self.cert_dir, p))
}
pub fn key_path(&self) -> Option<String> {
self.key_path
.as_ref()
.map(|p| format!("{}/{}", self.cert_dir, p))
}
}
/// CORS settings
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CorsSettings {
#[serde(default)]
pub allowed_origins: Vec<String>,
#[serde(default)]
pub allowed_methods: Vec<String>,
#[serde(default)]
pub allowed_headers: Vec<String>,
#[serde(default)]
pub allow_credentials: bool,
}
/// Logging settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogSettings {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
pub level: LevelFilter,
}
impl Default for LogSettings {
fn default() -> Self {
Self {
level: default_log_level(),
}
}
}
impl Validate for Settings {
fn validate(&self) -> Result<(), ValidationError> {
self.server.validate()?;
self.grpc.validate()?;
self.database.validate()?;
self.auth.validate()?;
Ok(())
}
}
impl Settings {
/// Load settings from config files and environment
pub fn load() -> Result<Self, ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
let settings = Config::builder()
.add_source(File::with_name("config/default").required(false))
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
.add_source(File::with_name("config/master/default").required(false))
.add_source(File::with_name(&format!("config/master/{}", run_mode)).required(false))
.add_source(Environment::with_prefix("NXMESH").separator("__"))
.build()?;
let settings: Self = settings.try_deserialize()?;
settings.validate().map_err(ConfigError::Message)?;
Ok(settings)
}
}
impl Validate for ServerSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.bind_address.is_empty() {
return Err("Server bind address cannot be empty".into());
}
if self.port == 0 {
return Err("Server port must be greater than 0".into());
}
self.certificate.validate()?;
if let Some(cors) = &self.cors {
cors.validate()?;
}
Ok(())
}
}
impl Validate for GrpcSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.bind_address.is_empty() {
return Err("gRPC bind address cannot be empty".into());
}
if self.port == 0 {
return Err("gRPC port must be greater than 0".into());
}
self.certificate.validate()?;
if let Some(cors) = &self.cors {
cors.validate()?;
}
Ok(())
}
}
impl Validate for DatabaseSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.url.is_empty() {
return Err("Database URL cannot be empty".into());
}
if let Some(max_connections) = self.max_connections
&& max_connections == 0
{
return Err("Max database connections must be greater than 0".into());
}
Ok(())
}
}
impl Validate for AuthSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.jwt_secret.is_empty() {
return Err("JWT secret cannot be empty".into());
}
if self.jwt_expiration_hours == 0 {
return Err("JWT expiration hours must be greater than 0".into());
}
Ok(())
}
}
impl Validate for CertificateSettings {
fn validate(&self) -> Result<(), ValidationError> {
let base_path = std::path::Path::new(&self.cert_dir);
if !base_path.exists() {
// create the cert directory if it doesn't exist
std::fs::create_dir_all(base_path).map_err(|e| {
format!(
"Failed to create certificate directory {:?}: {}",
base_path, e
)
})?;
}
let cert_path = self.cert_path.as_ref().map(|p| base_path.join(p));
let key_path = self.key_path.as_ref().map(|p| base_path.join(p));
if (cert_path.is_some() && key_path.is_none())
|| (cert_path.is_none() && key_path.is_some())
{
return Err("Both certificate and key paths must be provided for TLS".into());
}
if let (Some(cert_path), Some(key_path)) = (&cert_path, &key_path) {
if !std::path::Path::new(cert_path).exists() {
return Err(format!("Certificate file not found: {:?}", cert_path));
}
if !std::path::Path::new(key_path).exists() {
return Err(format!("Key file not found: {:?}", key_path));
}
}
// validate for SAN entries - must be valid DNS names or IP addresses
for dns in &self.san_dns {
if dns.to_string().is_empty() {
return Err("SAN DNS entries cannot be empty".into());
}
}
for ip in &self.san_ip {
if ip.is_unspecified() {
return Err("SAN IP entries cannot be unspecified".into());
}
}
// require at least one SAN entry for the generated certificate
if self.san_dns.is_empty() && self.san_ip.is_empty() {
return Err(
"At least one SAN entry (DNS or IP) must be provided for the certificate".into(),
);
}
Ok(())
}
}
impl Validate for CorsSettings {
fn validate(&self) -> Result<(), ValidationError> {
Ok(())
}
}
fn default_jwt_expiration_hours() -> u64 {
24
}
fn default_server_bind_address() -> String {
"0.0.0.0".into()
}
fn default_server_port() -> u16 {
8080
}
fn default_grpc_bind_address() -> String {
"0.0.0.0".into()
}
fn default_grpc_port() -> u16 {
50051
}
fn default_log_level() -> LevelFilter {
LevelFilter::INFO
}
fn default_cert_folder() -> String {
"./certs".into()
}
fn deserialize_level_filter<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
LevelFilter::from_str(&s).map_err(serde::de::Error::custom)
}
fn serialize_level_filter<S>(level: &LevelFilter, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&level.to_string())
}
fn deserialize_ia5string_vec<'de, D>(deserializer: D) -> Result<Vec<Ia5String>, D::Error>
where
D: Deserializer<'de>,
{
let vec = Vec::<String>::deserialize(deserializer)?;
vec.into_iter()
.map(|s| Ia5String::try_from(s).map_err(serde::de::Error::custom))
.collect()
}
fn serialize_ia5string_vec<S>(vec: &Vec<Ia5String>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let string_vec: Vec<String> = vec.iter().map(|ia5| ia5.to_string()).collect();
string_vec.serialize(serializer)
}
#[cfg(test)]
mod tests {
use std::{
fs,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Settings>();
assert_send_sync::<ServerSettings>();
assert_send_sync::<DatabaseSettings>();
assert_send_sync::<GrpcSettings>();
assert_send_sync::<AuthSettings>();
assert_send_sync::<CertificateSettings>();
assert_send_sync::<CorsSettings>();
assert_send_sync::<LogSettings>();
}
fn make_temp_dir(prefix: &str) -> PathBuf {
let ts = SystemTime::now().duration_since(UNIX_EPOCH);
assert!(ts.is_ok());
let ts = ts.unwrap_or_default();
let path = std::env::temp_dir().join(format!(
"{}_{}_{}",
prefix,
std::process::id(),
ts.as_nanos()
));
let created = fs::create_dir_all(&path);
assert!(created.is_ok());
path
}
#[test]
fn certificate_paths_include_cert_dir() {
let cert = CertificateSettings {
cert_dir: "./certs".to_string(),
san_dns: Vec::new(),
san_ip: Vec::new(),
cert_path: Some("server.crt".to_string()),
key_path: Some("server.key".to_string()),
};
assert_eq!(cert.cert_path(), Some("./certs/server.crt".to_string()));
assert_eq!(cert.key_path(), Some("./certs/server.key".to_string()));
}
#[test]
fn certificate_validate_creates_directory_when_missing() {
let cert_dir = make_temp_dir("nxmesh-master-cert-create").join("nested");
let san = Ia5String::try_from("localhost".to_string());
assert!(san.is_ok());
let san = san.unwrap_or_else(|_| unreachable!());
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: vec![san],
san_ip: Vec::new(),
cert_path: None,
key_path: None,
};
let result = cert.validate();
assert!(result.is_ok());
assert!(cert_dir.exists());
let _ = fs::remove_dir_all(cert_dir.parent().unwrap_or(&cert_dir));
}
#[test]
fn certificate_validate_fails_when_only_cert_path_is_set() {
let cert_dir = make_temp_dir("nxmesh-master-cert-partial");
let san = Ia5String::try_from("localhost".to_string());
assert!(san.is_ok());
let san = san.unwrap_or_else(|_| unreachable!());
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: vec![san],
san_ip: Vec::new(),
cert_path: Some("server.crt".to_string()),
key_path: None,
};
let result = cert.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("Both certificate and key paths must be provided"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn certificate_validate_fails_with_unspecified_ip() {
let cert_dir = make_temp_dir("nxmesh-master-cert-unspecified-ip");
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: Vec::new(),
san_ip: vec![IpAddr::V4(Ipv4Addr::UNSPECIFIED)],
cert_path: None,
key_path: None,
};
let result = cert.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("SAN IP entries cannot be unspecified"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn certificate_validate_fails_without_any_san_entries() {
let cert_dir = make_temp_dir("nxmesh-master-cert-no-san");
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: Vec::new(),
san_ip: Vec::new(),
cert_path: None,
key_path: None,
};
let result = cert.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("At least one SAN entry"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn database_validate_fails_for_zero_max_connections() {
let db = DatabaseSettings {
url: "postgres://localhost/db".to_string(),
max_connections: Some(0),
};
let result = db.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("Max database connections must be greater than 0"));
}
#[test]
fn auth_validate_fails_for_empty_secret() {
let auth = AuthSettings {
jwt_secret: "".to_string(),
jwt_expiration_hours: 24,
};
let result = auth.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("JWT secret cannot be empty"));
}
#[test]
fn server_validate_fails_for_zero_port() {
let cert_dir = make_temp_dir("nxmesh-master-server-validate");
let san = Ia5String::try_from("localhost".to_string());
assert!(san.is_ok());
let san = san.unwrap_or_else(|_| unreachable!());
let server = ServerSettings {
bind_address: "0.0.0.0".to_string(),
port: 0,
certificate: CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: vec![san],
san_ip: Vec::new(),
cert_path: None,
key_path: None,
},
cors: None,
};
let result = server.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("Server port must be greater than 0"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn level_filter_round_trip_serialization() {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
level: LevelFilter,
}
let data = Wrapper {
level: LevelFilter::DEBUG,
};
let encoded = serde_json::to_string(&data);
assert!(encoded.is_ok());
let encoded = encoded.unwrap_or_default();
assert!(encoded.to_lowercase().contains("debug"));
let decoded: Result<Wrapper, _> = serde_json::from_str(&encoded);
assert!(decoded.is_ok());
let decoded = decoded.unwrap_or(Wrapper {
level: LevelFilter::ERROR,
});
assert_eq!(decoded.level, LevelFilter::DEBUG);
}
#[test]
fn ia5string_vec_round_trip_serialization() {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_ia5string_vec",
serialize_with = "serialize_ia5string_vec"
)]
san_dns: Vec<Ia5String>,
}
let first = Ia5String::try_from("localhost".to_string());
assert!(first.is_ok());
let second = Ia5String::try_from("example.com".to_string());
assert!(second.is_ok());
let first = first.unwrap_or_else(|_| unreachable!());
let second = second.unwrap_or_else(|_| unreachable!());
let data = Wrapper {
san_dns: vec![first, second],
};
let encoded = serde_json::to_string(&data);
assert!(encoded.is_ok());
let encoded = encoded.unwrap_or_default();
assert!(encoded.contains("localhost"));
assert!(encoded.contains("example.com"));
let decoded: Result<Wrapper, _> = serde_json::from_str(&encoded);
assert!(decoded.is_ok());
let decoded = decoded.unwrap_or(Wrapper {
san_dns: Vec::new(),
});
assert_eq!(decoded.san_dns.len(), 2);
}
}

View File

@@ -0,0 +1,58 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{Validate, ValidationError};
/// Authentication settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthSettings {
pub jwt_secret: String,
#[serde(default = "default_jwt_expiration_hours")]
pub jwt_expiration_hours: u64,
}
impl Validate for AuthSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.jwt_secret.is_empty() {
return Err("JWT secret cannot be empty".into());
}
if self.jwt_expiration_hours == 0 {
return Err("JWT expiration hours must be greater than 0".into());
}
Ok(())
}
}
fn default_jwt_expiration_hours() -> u64 {
24
}
#[cfg(test)]
mod tests {
use std::{
fs,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<AuthSettings>();
}
#[test]
fn auth_validate_fails_for_empty_secret() {
let auth = AuthSettings {
jwt_secret: "".to_string(),
jwt_expiration_hours: 24,
};
let result = auth.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("JWT secret cannot be empty"));
}
}

View File

@@ -0,0 +1,276 @@
use std::net::IpAddr;
use rcgen::string::Ia5String;
use serde::{Deserialize, Deserializer, Serialize};
use crate::config::settings::{Validate, ValidationError};
/// TLS certificate settings
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CertificateSettings {
#[serde(default = "default_cert_folder")]
pub cert_dir: String,
#[serde(
default,
serialize_with = "serialize_ia5string_vec",
deserialize_with = "deserialize_ia5string_vec"
)]
pub san_dns: Vec<Ia5String>,
#[serde(default)]
pub san_ip: Vec<IpAddr>,
#[serde(default)]
pub cert_path: Option<String>,
#[serde(default)]
pub key_path: Option<String>,
}
impl CertificateSettings {
pub fn cert_path(&self) -> Option<String> {
self.cert_path
.as_ref()
.map(|p| format!("{}/{}", self.cert_dir, p))
}
pub fn key_path(&self) -> Option<String> {
self.key_path
.as_ref()
.map(|p| format!("{}/{}", self.cert_dir, p))
}
}
impl Validate for CertificateSettings {
fn validate(&self) -> Result<(), ValidationError> {
let base_path = std::path::Path::new(&self.cert_dir);
if !base_path.exists() {
// create the cert directory if it doesn't exist
std::fs::create_dir_all(base_path).map_err(|e| {
format!(
"Failed to create certificate directory {:?}: {}",
base_path, e
)
})?;
}
let cert_path = self.cert_path.as_ref().map(|p| base_path.join(p));
let key_path = self.key_path.as_ref().map(|p| base_path.join(p));
if (cert_path.is_some() && key_path.is_none())
|| (cert_path.is_none() && key_path.is_some())
{
return Err("Both certificate and key paths must be provided for TLS".into());
}
if let (Some(cert_path), Some(key_path)) = (&cert_path, &key_path) {
if !std::path::Path::new(cert_path).exists() {
return Err(format!("Certificate file not found: {:?}", cert_path));
}
if !std::path::Path::new(key_path).exists() {
return Err(format!("Key file not found: {:?}", key_path));
}
}
// validate for SAN entries - must be valid DNS names or IP addresses
for dns in &self.san_dns {
if dns.to_string().is_empty() {
return Err("SAN DNS entries cannot be empty".into());
}
}
for ip in &self.san_ip {
if ip.is_unspecified() {
return Err("SAN IP entries cannot be unspecified".into());
}
}
// require at least one SAN entry for the generated certificate
if self.san_dns.is_empty() && self.san_ip.is_empty() {
return Err(
"At least one SAN entry (DNS or IP) must be provided for the certificate".into(),
);
}
Ok(())
}
}
fn default_cert_folder() -> String {
"./certs".into()
}
fn deserialize_ia5string_vec<'de, D>(deserializer: D) -> Result<Vec<Ia5String>, D::Error>
where
D: Deserializer<'de>,
{
let vec = Vec::<String>::deserialize(deserializer)?;
vec.into_iter()
.map(|s| Ia5String::try_from(s).map_err(serde::de::Error::custom))
.collect()
}
fn serialize_ia5string_vec<S>(vec: &Vec<Ia5String>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let string_vec: Vec<String> = vec.iter().map(|ia5| ia5.to_string()).collect();
string_vec.serialize(serializer)
}
#[cfg(test)]
mod tests {
use std::{
fs,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<CertificateSettings>();
}
fn make_temp_dir(prefix: &str) -> PathBuf {
let ts = SystemTime::now().duration_since(UNIX_EPOCH);
assert!(ts.is_ok());
let ts = ts.unwrap_or_default();
let path = std::env::temp_dir().join(format!(
"{}_{}_{}",
prefix,
std::process::id(),
ts.as_nanos()
));
let created = fs::create_dir_all(&path);
assert!(created.is_ok());
path
}
#[test]
fn certificate_paths_include_cert_dir() {
let cert = CertificateSettings {
cert_dir: "./certs".to_string(),
san_dns: Vec::new(),
san_ip: Vec::new(),
cert_path: Some("server.crt".to_string()),
key_path: Some("server.key".to_string()),
};
assert_eq!(cert.cert_path(), Some("./certs/server.crt".to_string()));
assert_eq!(cert.key_path(), Some("./certs/server.key".to_string()));
}
#[test]
fn certificate_validate_creates_directory_when_missing() {
let cert_dir = make_temp_dir("nxmesh-master-cert-create").join("nested");
let san = Ia5String::try_from("localhost".to_string());
assert!(san.is_ok());
let san = san.unwrap_or_else(|_| unreachable!());
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: vec![san],
san_ip: Vec::new(),
cert_path: None,
key_path: None,
};
let result = cert.validate();
assert!(result.is_ok());
assert!(cert_dir.exists());
let _ = fs::remove_dir_all(cert_dir.parent().unwrap_or(&cert_dir));
}
#[test]
fn certificate_validate_fails_when_only_cert_path_is_set() {
let cert_dir = make_temp_dir("nxmesh-master-cert-partial");
let san = Ia5String::try_from("localhost".to_string());
assert!(san.is_ok());
let san = san.unwrap_or_else(|_| unreachable!());
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: vec![san],
san_ip: Vec::new(),
cert_path: Some("server.crt".to_string()),
key_path: None,
};
let result = cert.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("Both certificate and key paths must be provided"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn certificate_validate_fails_with_unspecified_ip() {
let cert_dir = make_temp_dir("nxmesh-master-cert-unspecified-ip");
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: Vec::new(),
san_ip: vec![IpAddr::V4(Ipv4Addr::UNSPECIFIED)],
cert_path: None,
key_path: None,
};
let result = cert.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("SAN IP entries cannot be unspecified"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn certificate_validate_fails_without_any_san_entries() {
let cert_dir = make_temp_dir("nxmesh-master-cert-no-san");
let cert = CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: Vec::new(),
san_ip: Vec::new(),
cert_path: None,
key_path: None,
};
let result = cert.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("At least one SAN entry"));
let _ = fs::remove_dir_all(&cert_dir);
}
#[test]
fn ia5string_vec_round_trip_serialization() {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_ia5string_vec",
serialize_with = "serialize_ia5string_vec"
)]
san_dns: Vec<Ia5String>,
}
let first = Ia5String::try_from("localhost".to_string());
assert!(first.is_ok());
let second = Ia5String::try_from("example.com".to_string());
assert!(second.is_ok());
let first = first.unwrap_or_else(|_| unreachable!());
let second = second.unwrap_or_else(|_| unreachable!());
let data = Wrapper {
san_dns: vec![first, second],
};
let encoded = serde_json::to_string(&data);
assert!(encoded.is_ok());
let encoded = encoded.unwrap_or_default();
assert!(encoded.contains("localhost"));
assert!(encoded.contains("example.com"));
let decoded: Result<Wrapper, _> = serde_json::from_str(&encoded);
assert!(decoded.is_ok());
let decoded = decoded.unwrap_or(Wrapper {
san_dns: Vec::new(),
});
assert_eq!(decoded.san_dns.len(), 2);
}
}

View File

@@ -0,0 +1,34 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{Validate, ValidationError};
/// CORS settings
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CorsSettings {
#[serde(default)]
pub allowed_origins: Vec<String>,
#[serde(default)]
pub allowed_methods: Vec<String>,
#[serde(default)]
pub allowed_headers: Vec<String>,
#[serde(default)]
pub allow_credentials: bool,
}
impl Validate for CorsSettings {
fn validate(&self) -> Result<(), ValidationError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<CorsSettings>();
}
}

View File

@@ -0,0 +1,48 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{Validate, ValidationError};
/// Database connection settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseSettings {
pub url: String,
pub max_connections: Option<u32>,
}
impl Validate for DatabaseSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.url.is_empty() {
return Err("Database URL cannot be empty".into());
}
if let Some(max_connections) = self.max_connections
&& max_connections == 0
{
return Err("Max database connections must be greater than 0".into());
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<DatabaseSettings>();
}
#[test]
fn database_validate_fails_for_zero_max_connections() {
let db = DatabaseSettings {
url: "postgres://localhost/db".to_string(),
max_connections: Some(0),
};
let result = db.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("Max database connections must be greater than 0"));
}
}

View File

@@ -0,0 +1,53 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{
Validate, ValidationError, cert::CertificateSettings, cors::CorsSettings,
};
/// gRPC server settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrpcSettings {
#[serde(default = "default_grpc_bind_address")]
pub bind_address: String,
#[serde(default = "default_grpc_port")]
pub port: u16,
#[serde(default)]
pub certificate: CertificateSettings,
#[serde(default)]
pub cors: Option<CorsSettings>,
}
impl Validate for GrpcSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.bind_address.is_empty() {
return Err("gRPC bind address cannot be empty".into());
}
if self.port == 0 {
return Err("gRPC port must be greater than 0".into());
}
self.certificate.validate()?;
if let Some(cors) = &self.cors {
cors.validate()?;
}
Ok(())
}
}
fn default_grpc_bind_address() -> String {
"0.0.0.0".into()
}
fn default_grpc_port() -> u16 {
50051
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<GrpcSettings>();
}
}

View File

@@ -0,0 +1,81 @@
use std::str::FromStr;
use serde::{Deserialize, Deserializer, Serialize};
use tracing::level_filters::LevelFilter;
/// Logging settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogSettings {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
pub level: LevelFilter,
}
impl Default for LogSettings {
fn default() -> Self {
Self {
level: default_log_level(),
}
}
}
fn default_log_level() -> LevelFilter {
LevelFilter::INFO
}
fn deserialize_level_filter<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
LevelFilter::from_str(&s).map_err(serde::de::Error::custom)
}
fn serialize_level_filter<S>(level: &LevelFilter, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&level.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<LogSettings>();
}
#[test]
fn level_filter_round_trip_serialization() {
#[derive(Serialize, Deserialize)]
struct Wrapper {
#[serde(
deserialize_with = "deserialize_level_filter",
serialize_with = "serialize_level_filter"
)]
level: LevelFilter,
}
let data = Wrapper {
level: LevelFilter::DEBUG,
};
let encoded = serde_json::to_string(&data);
assert!(encoded.is_ok());
let encoded = encoded.unwrap_or_default();
assert!(encoded.to_lowercase().contains("debug"));
let decoded: Result<Wrapper, _> = serde_json::from_str(&encoded);
assert!(decoded.is_ok());
let decoded = decoded.unwrap_or(Wrapper {
level: LevelFilter::ERROR,
});
assert_eq!(decoded.level, LevelFilter::DEBUG);
}
}

View File

@@ -0,0 +1,75 @@
use config::{Config, ConfigError, Environment, File};
use serde::{Deserialize, Serialize};
pub type ValidationError = String;
pub mod auth;
pub mod cert;
pub mod cors;
pub mod database;
pub mod grpc;
pub mod log;
pub mod server;
use auth::AuthSettings;
use database::DatabaseSettings;
use grpc::GrpcSettings;
use log::LogSettings;
use server::ServerSettings;
pub trait Validate {
fn validate(&self) -> Result<(), ValidationError>;
}
/// Master server settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Settings {
pub server: ServerSettings,
pub database: DatabaseSettings,
pub grpc: GrpcSettings,
pub auth: AuthSettings,
#[serde(default)]
pub log: LogSettings,
}
impl Validate for Settings {
fn validate(&self) -> Result<(), ValidationError> {
self.server.validate()?;
self.grpc.validate()?;
self.database.validate()?;
self.auth.validate()?;
Ok(())
}
}
impl Settings {
/// Load settings from config files and environment
pub fn load() -> Result<Self, ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
let settings = Config::builder()
.add_source(File::with_name("config/default").required(false))
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
.add_source(File::with_name("config/master/default").required(false))
.add_source(File::with_name(&format!("config/master/{}", run_mode)).required(false))
.add_source(Environment::with_prefix("NXMESH").separator("__"))
.build()?;
let settings: Self = settings.try_deserialize()?;
settings.validate().map_err(ConfigError::Message)?;
Ok(settings)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<Settings>();
}
}

View File

@@ -0,0 +1,103 @@
use serde::{Deserialize, Serialize};
use crate::config::settings::{
Validate, ValidationError, cert::CertificateSettings, cors::CorsSettings,
};
/// HTTP server settings
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerSettings {
#[serde(default = "default_server_bind_address")]
pub bind_address: String,
#[serde(default = "default_server_port")]
pub port: u16,
#[serde(default)]
pub certificate: CertificateSettings,
#[serde(default)]
pub cors: Option<CorsSettings>,
}
impl Validate for ServerSettings {
fn validate(&self) -> Result<(), ValidationError> {
if self.bind_address.is_empty() {
return Err("Server bind address cannot be empty".into());
}
if self.port == 0 {
return Err("Server port must be greater than 0".into());
}
self.certificate.validate()?;
if let Some(cors) = &self.cors {
cors.validate()?;
}
Ok(())
}
}
fn default_server_bind_address() -> String {
"0.0.0.0".into()
}
fn default_server_port() -> u16 {
8080
}
#[cfg(test)]
mod tests {
use std::{
fs,
path::PathBuf,
time::{SystemTime, UNIX_EPOCH},
};
use rcgen::string::Ia5String;
use super::*;
#[test]
fn test_esnure_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<ServerSettings>();
}
fn make_temp_dir(prefix: &str) -> PathBuf {
let ts = SystemTime::now().duration_since(UNIX_EPOCH);
assert!(ts.is_ok());
let ts = ts.unwrap_or_default();
let path = std::env::temp_dir().join(format!(
"{}_{}_{}",
prefix,
std::process::id(),
ts.as_nanos()
));
let created = fs::create_dir_all(&path);
assert!(created.is_ok());
path
}
#[test]
fn server_validate_fails_for_zero_port() {
let cert_dir = make_temp_dir("nxmesh-master-server-validate");
let san = Ia5String::try_from("localhost".to_string());
assert!(san.is_ok());
let san = san.unwrap_or_else(|_| unreachable!());
let server = ServerSettings {
bind_address: "0.0.0.0".to_string(),
port: 0,
certificate: CertificateSettings {
cert_dir: cert_dir.to_string_lossy().to_string(),
san_dns: vec![san],
san_ip: Vec::new(),
cert_path: None,
key_path: None,
},
cors: None,
};
let result = server.validate();
assert!(result.is_err());
let msg = result.err().unwrap_or_default();
assert!(msg.contains("Server port must be greater than 0"));
let _ = fs::remove_dir_all(&cert_dir);
}
}

View File

@@ -37,7 +37,11 @@ pub async fn get_fallback_handler() -> Result<axum::response::Html<Vec<u8>>, axu
}
fn get_index_html() -> Option<Vec<u8>> {
FrontendAssets::get(INDEX_HTML).map(|asset| asset.data.as_ref().to_owned())
// Try root index.html first, then fall back to client/index.html when assets
// are packaged under the `client/` subfolder.
FrontendAssets::get(INDEX_HTML)
.or_else(|| FrontendAssets::get(&format!("client/{}", INDEX_HTML)))
.map(|asset| asset.data.as_ref().to_owned())
}
async fn get_file_handler(
@@ -49,7 +53,10 @@ async fn get_file_handler(
path
};
match FrontendAssets::get(&file_path) {
// Try direct lookup first, then fallback to the `client/` subfolder.
match FrontendAssets::get(&file_path)
.or_else(|| FrontendAssets::get(&format!("client/{}", file_path)))
{
Some(asset) => {
let content_type = mime_guess::from_path(&file_path).first_or_octet_stream();
let response = axum::response::Response::builder()

View File

@@ -1,8 +1,6 @@
use nxmesh_proto::{
Ack, AgentMessage, HealthReport, MasterMessage, MetricsBatch,
agent_service_server::AgentService,
};
use tracing::warn;
use nxmesh_proto::{AgentMessage, MasterMessage, agent_service_server::AgentService};
pub mod repo;
#[derive(Debug, Default)]
pub struct AgentServerService {}
@@ -25,30 +23,13 @@ impl AgentService for AgentServerService {
todo!()
}
#[doc = " ReportHealth sends a health report to the master"]
#[allow(
mismatched_lifetime_syntaxes,
clippy::type_complexity,
clippy::type_repetition_in_bounds
)]
async fn report_health(
async fn connection_test(
&self,
request: tonic::Request<HealthReport>,
) -> Result<tonic::Response<Ack>, tonic::Status> {
warn!("Received health report: {:?}", request.get_ref());
todo!()
}
#[doc = " ReportMetrics sends metrics batch to the master"]
#[allow(
mismatched_lifetime_syntaxes,
clippy::type_complexity,
clippy::type_repetition_in_bounds
)]
async fn report_metrics(
&self,
request: tonic::Request<MetricsBatch>,
) -> Result<tonic::Response<Ack>, tonic::Status> {
todo!()
_request: tonic::Request<nxmesh_proto::TestRequest>,
) -> Result<tonic::Response<nxmesh_proto::TestResponse>, tonic::Status> {
Ok(tonic::Response::new(nxmesh_proto::TestResponse {
success: true,
error_message: String::new(),
}))
}
}

View File

@@ -1,3 +1,5 @@
agent_id = "agent-id-01"
[grpc]
connection_string = "https://127.0.0.1:8443"

View File

@@ -4,6 +4,7 @@ fn main() -> Result<()> {
tonic_prost_build::configure()
.build_server(true)
.build_client(true)
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["proto/agent.proto"], &["proto"])?;
Ok(())
}

View File

@@ -11,103 +11,53 @@ service AgentService {
// Stream establishes a persistent connection for real-time communication
rpc Stream(stream AgentMessage) returns (stream MasterMessage);
// ReportHealth sends a health report to the master
rpc ReportHealth(HealthReport) returns (Ack);
rpc ConnectionTest(TestRequest) returns (TestResponse);
}
// ReportMetrics sends metrics batch to the master
rpc ReportMetrics(MetricsBatch) returns (Ack);
message TestRequest {
// no fields needed for test request
}
message TestResponse {
bool success = 1;
string error_message = 2; // if success is false, this field should contain the error message
}
// Messages sent from master to agent
message MasterMessage {
int64 timestamp = 1;
string message_id = 2;
oneof payload {
// requests
ConfigUpdate config_update = 3;
Command command = 4;
}
}
// Messages sent from agent to master
message AgentMessage {
string agent_id = 1;
int64 timestamp = 2;
string message_id = 3;
oneof payload {
RegistrationRequest registration = 3;
HealthReport health = 4;
ConfigStatus config_status = 5;
MetricsBatch metrics = 6;
LogBatch logs = 7;
Event event = 8;
// responses
ConfigUpdateResult config_update_result = 6;
CommandResult command_result = 7;
}
}
// Messages sent from master to agent
message MasterMessage {
int64 timestamp = 1;
oneof payload {
RegistrationResponse registration_response = 2;
ConfigUpdate config_update = 3;
Command command = 4;
Ack ack = 5;
Error error = 6;
}
}
//
//
//
// Registration
message RegistrationRequest {
string hostname = 1;
string ip_address = 2;
string version = 3;
repeated string capabilities = 4;
map<string, string> labels = 5;
DeploymentMode deployment_mode = 6;
}
message RegistrationResponse {
string agent_id = 1;
bool success = 2;
string error_message = 3;
int64 heartbeat_interval_seconds = 4;
}
enum DeploymentMode {
DEPLOYMENT_MODE_UNSPECIFIED = 0;
DOCKER_SIDECAR = 1;
KUBERNETES_SIDECAR = 2;
STANDALONE = 3;
}
// Health Reporting
message HealthReport {
NginxStatus nginx = 1;
SystemMetrics system = 2;
string config_checksum = 3;
int64 config_version = 4;
repeated Alert alerts = 5;
}
message NginxStatus {
bool is_running = 1;
uint32 pid = 2;
uint64 uptime_seconds = 3;
uint32 active_connections = 4;
uint64 total_requests = 5;
float requests_per_second = 6;
}
message SystemMetrics {
float cpu_percent = 1;
uint64 memory_used_bytes = 2;
uint64 memory_total_bytes = 3;
uint64 disk_used_bytes = 4;
uint64 disk_total_bytes = 5;
float load_average_1m = 6;
}
message Alert {
string id = 1;
string severity = 2; // info, warning, error, critical
string message = 3;
int64 timestamp = 4;
}
// Configuration
// ConfigUpdate represents a request from master to agent to update the configuration
message ConfigUpdate {
string config_id = 1;
int64 version = 2;
repeated ConfigContent configs = 3;
repeated CertificateContent certificates = 4;
string config_id = 1; // unique identifier for this config update
string version = 2;
// The root config is the main nginx.conf file, this file will be used as the entry point for nginx configuration. The content of this file should include references to other config files if needed. The agent will write this root config to the nginx config directory and use it to reload nginx.
ConfigContent root_config = 3;
// The other config files that are referenced by the root config, e.g. "site.conf", "private/example.com.conf". If the root config does not reference any other config files, this field can be left empty. The agent will write these config files to the nginx config directory and ensure they are included in the root config.
repeated ConfigContent configs = 4;
}
message ConfigContent {
@@ -116,113 +66,56 @@ message ConfigContent {
string content = 2;
}
message CertificateContent {
string id = 1;
// relative path from other config files, e.g. "certs/example.com.pem"
string path = 2;
string certificate_pem = 3;
string private_key_pem = 4;
message ConfigUpdateResult {
string config_id = 1; // should match the config_id in ConfigUpdate
string version = 2;
bool success = 3;
optional ConfigUpdateError error_message = 4; // if success is false, this field should contain the error message
}
message ConfigStatus {
string config_id = 1;
int64 version = 2;
ConfigApplyStatus status = 3;
string error_message = 4;
int64 applied_at = 5;
enum ConfigUpdateError {
UNKNOWN = 0;
INVALID_CONFIG = 1; // the config content is invalid, e.g. syntax error
WRITE_FAILED = 2; // failed to write the config file to disk
RELOAD_FAILED = 3; // failed to reload nginx with the new config
}
enum ConfigApplyStatus {
CONFIG_APPLY_STATUS_UNSPECIFIED = 0;
PENDING = 1;
VALIDATING = 2;
APPLYING = 3;
SUCCESS = 4;
FAILED = 5;
ROLLED_BACK = 6;
}
//
//
//
// Metrics
message MetricsBatch {
int64 timestamp = 1;
repeated Metric metrics = 2;
}
// TODO: allow setting the default fallback and the corresponding default nginx root config when nginx reload fails to re-use old config, "use default config", "stop nginx".
message Metric {
string name = 1;
double value = 2;
int64 timestamp = 3;
map<string, string> labels = 4;
MetricType type = 5;
}
enum MetricType {
METRIC_TYPE_UNSPECIFIED = 0;
GAUGE = 1;
COUNTER = 2;
HISTOGRAM = 3;
}
// Logs
message LogBatch {
repeated LogEntry entries = 1;
}
message LogEntry {
int64 timestamp = 1;
string level = 2;
string message = 3;
map<string, string> fields = 4;
}
// Commands
// Command represents a request from master to agent to execute a command, e.g. "reload", "test"
message Command {
string command_id = 1;
oneof command {
ReloadCommand reload = 2;
RestartCommand restart = 3;
StopCommand stop = 4;
GetStatusCommand get_status = 5;
ValidateConfigCommand validate_config = 6;
ReloadCommand reload = 1;
TestCommand test = 2;
}
}
message ReloadCommand {
bool graceful = 1;
// no additional fields needed for reload command
}
message RestartCommand {
bool force = 1;
message TestCommand {
// no additional fields needed for test command
}
message StopCommand {
bool graceful = 1;
uint32 timeout_seconds = 2;
message CommandResult {
oneof result {
ReloadResult reload_result = 1;
TestResult test_result = 2;
}
}
message GetStatusCommand {}
message ValidateConfigCommand {
string config_content = 1;
message ReloadResult {
bool success = 1;
string error_message = 2; // if success is false, this field should contain the error message
}
// Events
message Event {
string event_id = 1;
string event_type = 2;
int64 timestamp = 3;
map<string, string> data = 4;
message TestResult {
bool success = 1;
string error_message = 2; // if success is false, this field should contain the error message
}
// Common messages
message Ack {
string message_id = 1;
bool success = 2;
string error_message = 3;
}
message Error {
string code = 1;
string message = 2;
map<string, string> details = 3;
}

View File

@@ -38,7 +38,7 @@ setup-frontend:
act *ARGS:
# run act with custom secret-file
@echo "🎬 Running act with custom secrets file..."
act --env-file .github/.env --secret-file .github/.secrets.env --var-file .github/.var.env --network host {{ ARGS }}
act --env-file .github/.env --secret-file .github/.secrets.env --var-file .github/.var.env --network host --artifact-server-path ./.act/.artifacts {{ ARGS }}
# Start all services for development
dev: