feat: Implement SSH master connector and CLI for certificate management
This commit is contained in:
@@ -29,9 +29,6 @@ tracing-subscriber.workspace = true
|
|||||||
# gRPC
|
# gRPC
|
||||||
tonic.workspace = true
|
tonic.workspace = true
|
||||||
|
|
||||||
# HTTP
|
|
||||||
reqwest.workspace = true
|
|
||||||
|
|
||||||
# Async
|
# Async
|
||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
@@ -54,6 +51,12 @@ uuid.workspace = true
|
|||||||
# Hostname
|
# Hostname
|
||||||
hostname = "0.4"
|
hostname = "0.4"
|
||||||
|
|
||||||
|
# Certificates
|
||||||
|
zip = { workspace = true }
|
||||||
|
|
||||||
|
# CLI
|
||||||
|
clap = { workspace = true, features = ["derive"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio-test.workspace = true
|
tokio-test.workspace = true
|
||||||
mockall.workspace = true
|
mockall.workspace = true
|
||||||
|
|||||||
81
apps/nxmesh-agent/src/cli/mod.rs
Normal file
81
apps/nxmesh-agent/src/cli/mod.rs
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
use clap::{Parser, Subcommand};
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
#[command(version, about, long_about = None)]
|
||||||
|
pub struct Cli {
|
||||||
|
/// Start the agent server
|
||||||
|
#[arg(short, long, group = "mode")]
|
||||||
|
pub serve: bool,
|
||||||
|
|
||||||
|
#[command(subcommand)]
|
||||||
|
pub command: Option<Commands>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Subcommand)]
|
||||||
|
pub enum Commands {
|
||||||
|
#[command(about = "Import certificates for agent from zip file or separate cert and key files")]
|
||||||
|
ImportCerts {
|
||||||
|
// Zip file input, mutually exclusive with separate cert and key file inputs
|
||||||
|
/// Zip file containing ca.pem cert.pem and key.pem
|
||||||
|
#[arg(value_name = "ZIP_FILE", group = "input_source")]
|
||||||
|
zip: Option<String>,
|
||||||
|
/// Certificate name in zip file, required if using zip input
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
group = "input_source",
|
||||||
|
requires = "zip",
|
||||||
|
default_value = "cert.pem",
|
||||||
|
value_name = "CERT_NAME"
|
||||||
|
)]
|
||||||
|
cert_name: Option<String>,
|
||||||
|
/// Key name in zip file, required if using zip input
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
group = "input_source",
|
||||||
|
requires = "zip",
|
||||||
|
default_value = "key.pem",
|
||||||
|
value_name = "KEY_NAME"
|
||||||
|
)]
|
||||||
|
key_name: Option<String>,
|
||||||
|
/// CA certificate name in zip file, required if using zip input
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
group = "input_source",
|
||||||
|
requires = "zip",
|
||||||
|
default_value = "ca.pem",
|
||||||
|
value_name = "CA_NAME"
|
||||||
|
)]
|
||||||
|
ca_name: Option<String>,
|
||||||
|
|
||||||
|
// Separate cert and key file inputs, required if not using zip input
|
||||||
|
/// Certificate file path
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
group = "input_source",
|
||||||
|
requires = "key",
|
||||||
|
conflicts_with = "zip",
|
||||||
|
value_name = "CERT_FILE"
|
||||||
|
)]
|
||||||
|
cert: Option<String>,
|
||||||
|
|
||||||
|
/// Key file path
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
group = "input_source",
|
||||||
|
requires = "cert",
|
||||||
|
conflicts_with = "zip",
|
||||||
|
value_name = "KEY_FILE"
|
||||||
|
)]
|
||||||
|
key: Option<String>,
|
||||||
|
|
||||||
|
/// Master CA certificate file path for verifying master identity, optional if the CA certificate is already trusted by the system
|
||||||
|
/// This is required if the master server uses a self-signed certificate that is not trusted by the system
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
group = "input_source",
|
||||||
|
conflicts_with = "zip",
|
||||||
|
value_name = "CA_CERT_FILE"
|
||||||
|
)]
|
||||||
|
ca_cert: Option<String>,
|
||||||
|
},
|
||||||
|
}
|
||||||
1
apps/nxmesh-agent/src/config/mod.rs
Normal file
1
apps/nxmesh-agent/src/config/mod.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
pub mod settings;
|
||||||
333
apps/nxmesh-agent/src/config/settings.rs
Normal file
333
apps/nxmesh-agent/src/config/settings.rs
Normal file
@@ -0,0 +1,333 @@
|
|||||||
|
use config::{Config, ConfigError, Environment, File};
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize};
|
||||||
|
use std::{os::unix::fs::PermissionsExt, str::FromStr};
|
||||||
|
use tracing::level_filters::LevelFilter;
|
||||||
|
|
||||||
|
const NGINX_BINARY_PATH_TEMPLATE: &str = "{{nginx_binary_path}}";
|
||||||
|
const NGINX_DEFAULT_BINARY: &str = "nginx";
|
||||||
|
|
||||||
|
type ValidationError = String;
|
||||||
|
|
||||||
|
trait Validate {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Agent settings
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Settings {
|
||||||
|
pub grpc: GrpcSettings,
|
||||||
|
#[serde(default)]
|
||||||
|
pub log: LogSettings,
|
||||||
|
pub nginx: Option<NginxSettings>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// gRPC client settings
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct GrpcSettings {
|
||||||
|
pub connection_string: String,
|
||||||
|
pub m_auth: MAuthSettings,
|
||||||
|
#[serde(default)]
|
||||||
|
pub cors: Option<CorsSettings>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum MAuthSettings {
|
||||||
|
Tls(TLSSettings),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TLS certificate settings
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum TLSSettings {
|
||||||
|
RawPath {
|
||||||
|
ca_path: String,
|
||||||
|
cert_path: String,
|
||||||
|
key_path: String,
|
||||||
|
},
|
||||||
|
ZipPath {
|
||||||
|
cert_zip_path: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// CORS settings
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
|
pub struct CorsSettings {
|
||||||
|
#[serde(default)]
|
||||||
|
pub allowed_origins: Vec<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub allowed_methods: Vec<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub allowed_headers: Vec<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub allow_credentials: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Logging settings
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LogSettings {
|
||||||
|
#[serde(
|
||||||
|
deserialize_with = "deserialize_level_filter",
|
||||||
|
serialize_with = "serialize_level_filter"
|
||||||
|
)]
|
||||||
|
pub level: LevelFilter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for LogSettings {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
level: default_log_level(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct NginxSettings {
|
||||||
|
#[serde(default = "default_nginx_config_path")]
|
||||||
|
pub nginx_config_path: String,
|
||||||
|
// #[serde(default = "default_nginx_binary_path")]
|
||||||
|
#[serde(default)]
|
||||||
|
pub nginx_binary_path: Option<String>,
|
||||||
|
// commands
|
||||||
|
#[serde(default = "default_nginx_reload_command")]
|
||||||
|
pub override_nginx_reload_command: Vec<String>,
|
||||||
|
#[serde(default = "default_nginx_test_command")]
|
||||||
|
pub override_nginx_test_command: Vec<String>,
|
||||||
|
// timeouts
|
||||||
|
#[serde(default = "default_nginx_reload_timeout_seconds")]
|
||||||
|
pub nginx_reload_timeout_seconds: u64,
|
||||||
|
#[serde(default = "default_nginx_test_timeout_seconds")]
|
||||||
|
pub nginx_test_timeout_seconds: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validate for Settings {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError> {
|
||||||
|
self.grpc.validate()?;
|
||||||
|
if let Some(nginx) = &self.nginx {
|
||||||
|
nginx.validate()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Settings {
|
||||||
|
/// Load settings from config files and environment
|
||||||
|
pub fn load() -> Result<Self, ConfigError> {
|
||||||
|
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());
|
||||||
|
|
||||||
|
let settings = Config::builder()
|
||||||
|
.add_source(File::with_name("config/default").required(false))
|
||||||
|
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
|
||||||
|
.add_source(File::with_name("config/agent/default").required(false))
|
||||||
|
.add_source(File::with_name(&format!("config/agent/{}", run_mode)).required(false))
|
||||||
|
.add_source(Environment::with_prefix("NXMESH").separator("__"))
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
let mut settings: Self = settings.try_deserialize()?;
|
||||||
|
|
||||||
|
settings.validate().map_err(ConfigError::Message)?;
|
||||||
|
|
||||||
|
if let Some(nginx) = &mut settings.nginx {
|
||||||
|
nginx.validate().map_err(ConfigError::Message)?;
|
||||||
|
|
||||||
|
// replace binary path template in commands with actual binary path, if the template is present
|
||||||
|
nginx
|
||||||
|
.override_nginx_reload_command
|
||||||
|
.iter_mut()
|
||||||
|
.for_each(|cmd| {
|
||||||
|
*cmd = cmd.replace(
|
||||||
|
NGINX_BINARY_PATH_TEMPLATE,
|
||||||
|
&nginx
|
||||||
|
.nginx_binary_path
|
||||||
|
.clone()
|
||||||
|
.unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
nginx
|
||||||
|
.override_nginx_test_command
|
||||||
|
.iter_mut()
|
||||||
|
.for_each(|cmd| {
|
||||||
|
*cmd = cmd.replace(
|
||||||
|
NGINX_BINARY_PATH_TEMPLATE,
|
||||||
|
&nginx
|
||||||
|
.nginx_binary_path
|
||||||
|
.clone()
|
||||||
|
.unwrap_or_else(|| NGINX_DEFAULT_BINARY.into()),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(settings)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validate for GrpcSettings {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError> {
|
||||||
|
if self.connection_string.is_empty() {
|
||||||
|
return Err("gRPC connection string cannot be empty".into());
|
||||||
|
}
|
||||||
|
self.m_auth.validate()?;
|
||||||
|
if let Some(cors) = &self.cors {
|
||||||
|
cors.validate()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validate for MAuthSettings {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError> {
|
||||||
|
match self {
|
||||||
|
MAuthSettings::Tls(tls_settings) => tls_settings.validate()?,
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validate for TLSSettings {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError> {
|
||||||
|
match self {
|
||||||
|
TLSSettings::RawPath {
|
||||||
|
ca_path,
|
||||||
|
cert_path,
|
||||||
|
key_path,
|
||||||
|
} => {
|
||||||
|
if !std::path::Path::new(ca_path).exists() {
|
||||||
|
return Err(format!("CA file not found: {}", ca_path));
|
||||||
|
}
|
||||||
|
if !std::path::Path::new(cert_path).exists() {
|
||||||
|
return Err(format!("Certificate file not found: {}", cert_path));
|
||||||
|
}
|
||||||
|
if !std::path::Path::new(key_path).exists() {
|
||||||
|
return Err(format!("Key file not found: {}", key_path));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TLSSettings::ZipPath { cert_zip_path } => {
|
||||||
|
if !std::path::Path::new(cert_zip_path).exists() {
|
||||||
|
return Err(format!("Certificate zip file not found: {}", cert_zip_path));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validate for CorsSettings {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Validate for NginxSettings {
|
||||||
|
fn validate(&self) -> Result<(), ValidationError> {
|
||||||
|
match &self.nginx_binary_path {
|
||||||
|
Some(path) if path.is_empty() => {
|
||||||
|
return Err("Nginx binary path cannot be empty".into());
|
||||||
|
}
|
||||||
|
Some(path) if !std::path::Path::new(path).exists() => {
|
||||||
|
return Err(format!("Nginx binary not found: {}", path));
|
||||||
|
}
|
||||||
|
Some(path)
|
||||||
|
if !std::fs::metadata(path)
|
||||||
|
.map_err(|e| format!("Failed to read nginx binary metadata: {}", e))?
|
||||||
|
.permissions()
|
||||||
|
.mode()
|
||||||
|
& 0o111
|
||||||
|
!= 0 =>
|
||||||
|
{
|
||||||
|
return Err(format!("Nginx binary is not executable: {}", path));
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
if self.nginx_config_path.is_empty() {
|
||||||
|
return Err("Nginx config path cannot be empty".into());
|
||||||
|
}
|
||||||
|
if !std::path::Path::new(&self.nginx_config_path).exists() {
|
||||||
|
return Err(format!(
|
||||||
|
"Nginx config file not found: {}",
|
||||||
|
self.nginx_config_path
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure reload and test commands contain the binary path template
|
||||||
|
if !&self
|
||||||
|
.override_nginx_reload_command
|
||||||
|
.join(" ")
|
||||||
|
.contains(NGINX_BINARY_PATH_TEMPLATE)
|
||||||
|
{
|
||||||
|
return Err(format!(
|
||||||
|
"Nginx reload command must contain the binary path template '{}': {}",
|
||||||
|
NGINX_BINARY_PATH_TEMPLATE,
|
||||||
|
self.override_nginx_reload_command.join(" ")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
if !&self
|
||||||
|
.override_nginx_test_command
|
||||||
|
.join(" ")
|
||||||
|
.contains(NGINX_BINARY_PATH_TEMPLATE)
|
||||||
|
{
|
||||||
|
return Err(format!(
|
||||||
|
"Nginx test command must contain the binary path template '{}': {}",
|
||||||
|
NGINX_BINARY_PATH_TEMPLATE,
|
||||||
|
self.override_nginx_test_command.join(" ")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_log_level() -> LevelFilter {
|
||||||
|
LevelFilter::INFO
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_nginx_config_path() -> String {
|
||||||
|
"/etc/nginx/nginx.conf".into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_nginx_reload_command() -> Vec<String> {
|
||||||
|
vec![
|
||||||
|
NGINX_BINARY_PATH_TEMPLATE.to_string(),
|
||||||
|
"-s".to_string(),
|
||||||
|
"reload".to_string(),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_nginx_test_command() -> Vec<String> {
|
||||||
|
vec![NGINX_BINARY_PATH_TEMPLATE.to_string(), "-t".to_string()]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_nginx_reload_timeout_seconds() -> u64 {
|
||||||
|
30
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_nginx_test_timeout_seconds() -> u64 {
|
||||||
|
30
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_level_filter<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let s = String::deserialize(deserializer)?;
|
||||||
|
LevelFilter::from_str(&s).map_err(serde::de::Error::custom)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn serialize_level_filter<S>(level: &LevelFilter, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: serde::Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(&level.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_esnure_send_and_sync() {
|
||||||
|
fn assert_send_sync<T: Send + Sync>() {}
|
||||||
|
assert_send_sync::<Settings>();
|
||||||
|
assert_send_sync::<GrpcSettings>();
|
||||||
|
assert_send_sync::<TLSSettings>();
|
||||||
|
assert_send_sync::<CorsSettings>();
|
||||||
|
assert_send_sync::<LogSettings>();
|
||||||
|
assert_send_sync::<NginxSettings>();
|
||||||
|
}
|
||||||
|
}
|
||||||
40
apps/nxmesh-agent/src/connector/master/mod.rs
Normal file
40
apps/nxmesh-agent/src/connector/master/mod.rs
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub mod ssh;
|
||||||
|
|
||||||
|
pub type AgentClient = nxmesh_proto::agent_service_client::AgentServiceClient<tonic::transport::Channel>;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait MasterConnectorTrait: Send + Sync {
|
||||||
|
async fn connect(
|
||||||
|
&mut self,
|
||||||
|
settings: &crate::config::settings::Settings,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
fn get_client(&self) -> Arc<Mutex<AgentClient>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MasterConnector {
|
||||||
|
connector: Box<dyn MasterConnectorTrait>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MasterConnector {
|
||||||
|
pub fn new(connector: Box<dyn MasterConnectorTrait>) -> Self {
|
||||||
|
Self { connector }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MasterConnectorTrait for MasterConnector {
|
||||||
|
async fn connect(
|
||||||
|
&mut self,
|
||||||
|
settings: &crate::config::settings::Settings,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
self.connector.connect(settings).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_client(&self) -> Arc<Mutex<AgentClient>> {
|
||||||
|
self.connector.get_client()
|
||||||
|
}
|
||||||
|
}
|
||||||
132
apps/nxmesh-agent/src/connector/master/ssh.rs
Normal file
132
apps/nxmesh-agent/src/connector/master/ssh.rs
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
use std::{fs::File, io::Read, sync::Arc};
|
||||||
|
|
||||||
|
use tokio::{fs::read, sync::Mutex};
|
||||||
|
|
||||||
|
use nxmesh_proto::agent_service_client::AgentServiceClient;
|
||||||
|
use tonic::transport::{Certificate, ClientTlsConfig, Identity};
|
||||||
|
use tracing::warn;
|
||||||
|
|
||||||
|
use crate::config::settings::{self, MAuthSettings, TLSSettings};
|
||||||
|
|
||||||
|
use super::{AgentClient, MasterConnectorTrait};
|
||||||
|
|
||||||
|
pub struct SshMasterConnector {
|
||||||
|
client: Arc<Mutex<AgentClient>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SshMasterConnector {
|
||||||
|
pub async fn new(
|
||||||
|
settings: crate::config::settings::GrpcSettings,
|
||||||
|
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let tls_config = Self::generate_tls_config(&settings.m_auth).await?;
|
||||||
|
// Create a gRPC channel
|
||||||
|
let endpoint = tonic::transport::Channel::from_shared(settings.connection_string.clone())
|
||||||
|
.map_err(|e| format!("Failed to create gRPC endpoint: {}", e))?
|
||||||
|
.tls_config(tls_config)
|
||||||
|
.map_err(|e| {
|
||||||
|
format!(
|
||||||
|
"Failed to set TLS config: {}. Ensure TLS settings and certificates are correct.",
|
||||||
|
e
|
||||||
|
)
|
||||||
|
})?
|
||||||
|
.connect_timeout(std::time::Duration::from_secs(5))
|
||||||
|
.timeout(std::time::Duration::from_secs(10))
|
||||||
|
.connect_lazy();
|
||||||
|
|
||||||
|
// Create the gRPC client
|
||||||
|
let client = Arc::new(Mutex::new(AgentServiceClient::new(endpoint)));
|
||||||
|
Ok(Self { client })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn generate_tls_config(
|
||||||
|
settings: &MAuthSettings,
|
||||||
|
) -> Result<ClientTlsConfig, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let tls_config = match &settings {
|
||||||
|
MAuthSettings::Tls(tls_settings) => {
|
||||||
|
let (ca, cert, key) = match tls_settings {
|
||||||
|
TLSSettings::RawPath {
|
||||||
|
ca_path,
|
||||||
|
cert_path,
|
||||||
|
key_path,
|
||||||
|
} => {
|
||||||
|
// Read the certificate and key from the specified file paths
|
||||||
|
let ca = read(ca_path).await?;
|
||||||
|
let cert = read(cert_path).await?;
|
||||||
|
let key = read(key_path).await?;
|
||||||
|
(ca, cert, key)
|
||||||
|
}
|
||||||
|
TLSSettings::ZipPath { cert_zip_path } => {
|
||||||
|
// Extract the certificate and key from the zip file
|
||||||
|
Self::extract_certificate(cert_zip_path).await?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: allow skipping SANs validation if specified in the settings
|
||||||
|
ClientTlsConfig::new()
|
||||||
|
.ca_certificate(Certificate::from_pem(&ca))
|
||||||
|
.identity(Identity::from_pem(&cert, &key))
|
||||||
|
}
|
||||||
|
#[allow(unreachable_patterns)]
|
||||||
|
_ => {
|
||||||
|
return Err("TLS settings are required for SSH connection".into());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(tls_config)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn extract_certificate(
|
||||||
|
cert_zip_path: &str,
|
||||||
|
) -> Result<(Vec<u8>, Vec<u8>, Vec<u8>), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
// unzip the file and extract the cert, ca and key
|
||||||
|
let file = File::open(cert_zip_path)?;
|
||||||
|
let mut archive = zip::ZipArchive::new(file)?;
|
||||||
|
let mut cert = Vec::new();
|
||||||
|
let mut key = Vec::new();
|
||||||
|
let mut ca = Vec::new();
|
||||||
|
|
||||||
|
for i in 0..archive.len() {
|
||||||
|
let mut file = archive.by_index(i)?;
|
||||||
|
let outpath = match file.enclosed_name() {
|
||||||
|
Some(path) => path.to_owned(),
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
let file_name = outpath
|
||||||
|
.file_name()
|
||||||
|
.and_then(|n| n.to_str())
|
||||||
|
.unwrap_or_default();
|
||||||
|
if file_name != "cert.pem" && file_name != "key.pem" && file_name != "ca.pem" {
|
||||||
|
warn!("Unexpected file in certificate zip: {}", file_name);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if file_name == "cert.pem" {
|
||||||
|
file.read_to_end(&mut cert)?;
|
||||||
|
} else if file_name == "key.pem" {
|
||||||
|
file.read_to_end(&mut key)?;
|
||||||
|
} else if file_name == "ca.pem" {
|
||||||
|
file.read_to_end(&mut ca)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cert.is_empty() || key.is_empty() || ca.is_empty() {
|
||||||
|
return Err("Certificate zip must contain cert.pem, key.pem and ca.pem".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((ca, cert, key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MasterConnectorTrait for SshMasterConnector {
|
||||||
|
async fn connect(
|
||||||
|
&mut self,
|
||||||
|
_settings: &crate::config::settings::Settings,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
// ensure connection if required
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_client(&self) -> Arc<Mutex<AgentClient>> {
|
||||||
|
self.client.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
1
apps/nxmesh-agent/src/connector/mod.rs
Normal file
1
apps/nxmesh-agent/src/connector/mod.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
pub mod master;
|
||||||
@@ -0,0 +1,93 @@
|
|||||||
|
#![forbid(unsafe_code)]
|
||||||
|
#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
|
||||||
|
|
||||||
|
use std::process::exit;
|
||||||
|
|
||||||
|
use tracing::{error, info};
|
||||||
|
use tracing_subscriber::{
|
||||||
|
Layer, filter::LevelFilter, fmt, layer::SubscriberExt, registry::Registry, reload,
|
||||||
|
util::SubscriberInitExt,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMasterConnector};
|
||||||
|
|
||||||
|
mod cli;
|
||||||
|
mod config;
|
||||||
|
mod connector;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
// install a global subscriber for logging
|
||||||
|
let reload_handle = install_tracing_subscriber();
|
||||||
|
// Load configuration settings
|
||||||
|
let settings = match config::settings::Settings::load() {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to load configuration: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
reload_handle
|
||||||
|
.modify(|filter| *filter = Box::new(settings.log.level))
|
||||||
|
.inspect_err(|e| {
|
||||||
|
error!(
|
||||||
|
"Failed to set log level: {}. Continuing with default level.",
|
||||||
|
e
|
||||||
|
)
|
||||||
|
})
|
||||||
|
// ignore errors here since we can still run with the default log level
|
||||||
|
.ok();
|
||||||
|
|
||||||
|
// print the loaded settings for debugging
|
||||||
|
// info!("Loaded settings: {:#?}", settings);
|
||||||
|
|
||||||
|
info!("Starting NxMesh Agent...");
|
||||||
|
// install grpc client
|
||||||
|
#[expect(clippy::expect_used)]
|
||||||
|
let ssh_connector = SshMasterConnector::new(settings.grpc.clone())
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| {
|
||||||
|
error!("Failed to create SSH Master Connector: {}", e);
|
||||||
|
exit(1);
|
||||||
|
})
|
||||||
|
.expect("Failed to create SSH Master Connector");
|
||||||
|
let mut master_connector = MasterConnector::new(Box::new(ssh_connector));
|
||||||
|
|
||||||
|
if let Err(e) = master_connector.connect(&settings).await {
|
||||||
|
error!("Failed to connect to master: {}", e);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// send a dummy heartbeat to verify the connection is working
|
||||||
|
let client = master_connector.get_client();
|
||||||
|
|
||||||
|
let request = nxmesh_proto::HealthReport {
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
match client.lock().await.report_health(request).await {
|
||||||
|
Ok(_) => info!("Successfully sent health report to master."),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to send health report to master: {}", e);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Successfully connected to master. Agent is running.");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn install_tracing_subscriber()
|
||||||
|
-> reload::Handle<Box<dyn tracing_subscriber::layer::Layer<Registry> + Send + Sync>, Registry> {
|
||||||
|
let filter = LevelFilter::INFO;
|
||||||
|
let (filter_layer, reload_handle) =
|
||||||
|
reload::Layer::new(Box::new(fmt::layer().with_filter(filter))
|
||||||
|
as Box<dyn tracing_subscriber::layer::Layer<Registry> + Send + Sync>);
|
||||||
|
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(filter_layer)
|
||||||
|
.with(fmt::Layer::default())
|
||||||
|
.init();
|
||||||
|
|
||||||
|
reload_handle
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user