feat: added config update message handling
This commit is contained in:
@@ -59,6 +59,7 @@ zip = { workspace = true }
|
|||||||
clap = { workspace = true, features = ["derive"] }
|
clap = { workspace = true, features = ["derive"] }
|
||||||
anyhow = { version = "1.0.102", features = ["backtrace"] }
|
anyhow = { version = "1.0.102", features = ["backtrace"] }
|
||||||
fs4 = { version = "0.13.1", features = ["tokio"] }
|
fs4 = { version = "0.13.1", features = ["tokio"] }
|
||||||
|
dashmap = "6.2.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio-test.workspace = true
|
tokio-test.workspace = true
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ pub trait Validate {
|
|||||||
/// Agent settings
|
/// Agent settings
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct Settings {
|
pub struct Settings {
|
||||||
|
pub agent_id: String,
|
||||||
|
|
||||||
pub grpc: GrpcSettings,
|
pub grpc: GrpcSettings,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub log: LogSettings,
|
pub log: LogSettings,
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ mod tests {
|
|||||||
|
|
||||||
fn test_settings() -> Settings {
|
fn test_settings() -> Settings {
|
||||||
Settings {
|
Settings {
|
||||||
|
agent_id: "test-agent".to_string(),
|
||||||
grpc: GrpcSettings {
|
grpc: GrpcSettings {
|
||||||
connection_string: "https://localhost:50051".to_string(),
|
connection_string: "https://localhost:50051".to_string(),
|
||||||
m_auth: MAuthSettings::Tls(TLSSettings::ZipPath {
|
m_auth: MAuthSettings::Tls(TLSSettings::ZipPath {
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![recursion_limit = "128"]
|
||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
|
#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload};
|
use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload};
|
||||||
|
|
||||||
@@ -6,27 +6,37 @@ use crate::service::master_handler::{MasterHandlerError, MessageResult};
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait MasterMessageHandler: Send + Sync + 'static {
|
pub trait MasterMessageHandler: Send + Sync + 'static {
|
||||||
async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()>;
|
async fn handle_master_message(
|
||||||
|
&self,
|
||||||
|
agent_id: &str,
|
||||||
|
message: MasterMessage,
|
||||||
|
) -> MessageResult<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait OnConfigUpdateHandler: Send + Sync + 'static {
|
pub trait OnConfigUpdateHandler: Send + Sync + 'static {
|
||||||
// Handle the config update message from master, write the config content to files, validate the new config and reload nginx
|
// Handle the config update message from master, write the config content to files, validate the new config and reload nginx
|
||||||
async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()>;
|
async fn on_config_update(
|
||||||
|
&self,
|
||||||
|
agent_id: &str,
|
||||||
|
timestamp: i64,
|
||||||
|
message_id: &str,
|
||||||
|
config_info: ConfigUpdate,
|
||||||
|
) -> MessageResult<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct HandlerImpl<OCH>
|
pub struct HandlerImpl<OCH>
|
||||||
where
|
where
|
||||||
OCH: OnConfigUpdateHandler + ?Sized,
|
OCH: OnConfigUpdateHandler + ?Sized,
|
||||||
{
|
{
|
||||||
on_config_update_handler: Arc<OCH>,
|
on_config_update_handler: Weak<OCH>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<OCH> HandlerImpl<OCH>
|
impl<OCH> HandlerImpl<OCH>
|
||||||
where
|
where
|
||||||
OCH: OnConfigUpdateHandler + ?Sized,
|
OCH: OnConfigUpdateHandler + ?Sized,
|
||||||
{
|
{
|
||||||
pub fn new(on_config_update_handler: Arc<OCH>) -> Self {
|
pub fn new(on_config_update_handler: Weak<OCH>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
on_config_update_handler,
|
on_config_update_handler,
|
||||||
}
|
}
|
||||||
@@ -38,11 +48,26 @@ impl<OCH> MasterMessageHandler for HandlerImpl<OCH>
|
|||||||
where
|
where
|
||||||
OCH: OnConfigUpdateHandler + ?Sized,
|
OCH: OnConfigUpdateHandler + ?Sized,
|
||||||
{
|
{
|
||||||
async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()> {
|
async fn handle_master_message(
|
||||||
|
&self,
|
||||||
|
agent_id: &str,
|
||||||
|
message: MasterMessage,
|
||||||
|
) -> MessageResult<()> {
|
||||||
match message.payload {
|
match message.payload {
|
||||||
Some(Payload::ConfigUpdate(config_info)) => {
|
Some(Payload::ConfigUpdate(config_info)) => {
|
||||||
self.on_config_update_handler
|
let on_config_update_handler =
|
||||||
.on_config_update(config_info)
|
self.on_config_update_handler.upgrade().ok_or_else(|| {
|
||||||
|
MasterHandlerError::MessageHandlingError(
|
||||||
|
"Failed to upgrade weak reference to config update handler".to_string(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
on_config_update_handler
|
||||||
|
.on_config_update(
|
||||||
|
agent_id,
|
||||||
|
message.timestamp,
|
||||||
|
&message.message_id,
|
||||||
|
config_info,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Some(_) => {
|
Some(_) => {
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ pub struct MasterHandlerImpl<MMH>
|
|||||||
where
|
where
|
||||||
MMH: MasterMessageHandler + ?Sized,
|
MMH: MasterMessageHandler + ?Sized,
|
||||||
{
|
{
|
||||||
|
agent_id: String,
|
||||||
connector: Arc<MasterConnector>,
|
connector: Arc<MasterConnector>,
|
||||||
message_handler: Arc<MMH>,
|
message_handler: Arc<MMH>,
|
||||||
message_handle_lock: tokio::sync::RwLock<Option<MessageHandleInfo>>,
|
message_handle_lock: tokio::sync::RwLock<Option<MessageHandleInfo>>,
|
||||||
@@ -54,8 +55,9 @@ impl<MMH> MasterHandlerImpl<MMH>
|
|||||||
where
|
where
|
||||||
MMH: MasterMessageHandler + ?Sized,
|
MMH: MasterMessageHandler + ?Sized,
|
||||||
{
|
{
|
||||||
pub fn new(connector: Arc<MasterConnector>, message_handler: Arc<MMH>) -> Self {
|
pub fn new(agent_id: &str, connector: Arc<MasterConnector>, message_handler: Arc<MMH>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
agent_id: agent_id.to_string(),
|
||||||
connector,
|
connector,
|
||||||
message_handler,
|
message_handler,
|
||||||
message_handle_lock: tokio::sync::RwLock::new(None),
|
message_handle_lock: tokio::sync::RwLock::new(None),
|
||||||
@@ -136,7 +138,7 @@ where
|
|||||||
message = stream.message() => {
|
message = stream.message() => {
|
||||||
match message {
|
match message {
|
||||||
Ok(Some(msg)) => {
|
Ok(Some(msg)) => {
|
||||||
if let Err(e) = self.message_handler.handle_master_message(msg).await {
|
if let Err(e) = self.message_handler.handle_master_message(&self.agent_id, msg).await {
|
||||||
error!("Failed to handle master message: {:?}", e);
|
error!("Failed to handle master message: {:?}", e);
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::settings::Settings,
|
config::settings::Settings,
|
||||||
@@ -19,16 +19,29 @@ pub struct Services {
|
|||||||
|
|
||||||
pub async fn get_services(settings: Arc<Settings>) -> anyhow::Result<Services> {
|
pub async fn get_services(settings: Arc<Settings>) -> anyhow::Result<Services> {
|
||||||
let master_connector = initialize_master_connector(settings.clone()).await?;
|
let master_connector = initialize_master_connector(settings.clone()).await?;
|
||||||
|
let master_connector = Arc::new(master_connector);
|
||||||
|
|
||||||
let nginx_handler = Arc::new(NginxHandlerImpl::new(settings.nginx.clone().into()));
|
let master_handler_slot = Arc::new(Mutex::new(None));
|
||||||
|
let slot = master_handler_slot.clone();
|
||||||
|
|
||||||
let message_handler = Arc::new(HandlerImpl::new(nginx_handler.clone()));
|
#[expect(clippy::expect_used)]
|
||||||
|
let nginx_handler = Arc::new_cyclic(|nginx_handler_weak| {
|
||||||
// build the services
|
let message_handler = Arc::new(HandlerImpl::new(nginx_handler_weak.clone()));
|
||||||
let master_handler = Arc::new(MasterHandlerImpl::new(
|
let master_handler = Arc::new(MasterHandlerImpl::new(
|
||||||
Arc::new(master_connector),
|
settings.agent_id.as_str(),
|
||||||
|
master_connector.clone(),
|
||||||
message_handler,
|
message_handler,
|
||||||
));
|
));
|
||||||
|
*slot.lock().expect("master handler slot lock poisoned") = Some(master_handler.clone());
|
||||||
|
|
||||||
|
NginxHandlerImpl::new(settings.nginx.clone().into(), master_handler)
|
||||||
|
});
|
||||||
|
#[expect(clippy::expect_used)]
|
||||||
|
let master_handler = master_handler_slot
|
||||||
|
.lock()
|
||||||
|
.expect("master handler slot lock poisoned")
|
||||||
|
.clone()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("Failed to initialize master handler"))?;
|
||||||
|
|
||||||
Ok(Services {
|
Ok(Services {
|
||||||
master_handler,
|
master_handler,
|
||||||
|
|||||||
@@ -1,14 +1,36 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use thiserror::Error;
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
use crate::config::settings::NginxSettings;
|
use crate::{config::settings::NginxSettings, service::master_handler::MasterHandlerError};
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use mockall::predicate::*;
|
use mockall::predicate::*;
|
||||||
// TODO: custom error type
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum CommandHandlerError {
|
||||||
|
#[error("Failed to execute command: {0}")]
|
||||||
|
CommandExecutionError(#[from] std::io::Error),
|
||||||
|
#[error("Invalid config path: {0}")]
|
||||||
|
InvalidConfigPath(String),
|
||||||
|
#[error("Invalid output path: {0}")]
|
||||||
|
InvalidOutputPath(String),
|
||||||
|
#[error("Permission denied: {0}")]
|
||||||
|
PermissionDenied(String),
|
||||||
|
#[error("Other error: {0}")]
|
||||||
|
OtherError(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<CommandHandlerError> for MasterHandlerError {
|
||||||
|
fn from(err: CommandHandlerError) -> Self {
|
||||||
|
MasterHandlerError::MessageHandlingError(err.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type CommandHandlerResult<T> = std::result::Result<T, CommandHandlerError>;
|
||||||
|
type Result<T> = CommandHandlerResult<T>;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
#[cfg_attr(test, mockall::automock)]
|
#[cfg_attr(test, mockall::automock)]
|
||||||
@@ -40,10 +62,16 @@ impl CommandHandlerImpl {
|
|||||||
|
|
||||||
fn validate_config_path(config_path: &str) -> Result<()> {
|
fn validate_config_path(config_path: &str) -> Result<()> {
|
||||||
if !std::path::Path::new(config_path).exists() {
|
if !std::path::Path::new(config_path).exists() {
|
||||||
anyhow::bail!("Config file not found at path: {}", config_path);
|
return Err(CommandHandlerError::InvalidConfigPath(format!(
|
||||||
|
"Config file not found at path: {}",
|
||||||
|
config_path
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
if !std::path::Path::new(config_path).is_file() {
|
if !std::path::Path::new(config_path).is_file() {
|
||||||
anyhow::bail!("Config path is not a file: {}", config_path);
|
return Err(CommandHandlerError::InvalidConfigPath(format!(
|
||||||
|
"Config path is not a file: {}",
|
||||||
|
config_path
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -91,7 +119,12 @@ impl CommandHandler for CommandHandlerImpl {
|
|||||||
.await?;
|
.await?;
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
let error_info = String::from_utf8_lossy(&output.stderr);
|
let error_info = String::from_utf8_lossy(&output.stderr);
|
||||||
anyhow::bail!("Failed to reload nginx: {}", error_info.trim());
|
return Err(CommandHandlerError::CommandExecutionError(
|
||||||
|
std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
format!("Failed to reload nginx: {}", error_info.trim()),
|
||||||
|
),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
let success_info = String::from_utf8_lossy(&output.stdout);
|
let success_info = String::from_utf8_lossy(&output.stdout);
|
||||||
debug!("Nginx reloaded successfully: {}", success_info.trim());
|
debug!("Nginx reloaded successfully: {}", success_info.trim());
|
||||||
@@ -108,7 +141,12 @@ impl CommandHandler for CommandHandlerImpl {
|
|||||||
|
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
let error_info = String::from_utf8_lossy(&output.stderr);
|
let error_info = String::from_utf8_lossy(&output.stderr);
|
||||||
anyhow::bail!("Failed to stop nginx: {}", error_info.trim());
|
return Err(CommandHandlerError::CommandExecutionError(
|
||||||
|
std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
format!("Failed to stop nginx: {}", error_info.trim()),
|
||||||
|
),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
let success_info = String::from_utf8_lossy(&output.stdout);
|
let success_info = String::from_utf8_lossy(&output.stdout);
|
||||||
debug!("Nginx stopped successfully: {}", success_info.trim());
|
debug!("Nginx stopped successfully: {}", success_info.trim());
|
||||||
@@ -132,7 +170,12 @@ impl CommandHandler for CommandHandlerImpl {
|
|||||||
let output = Command::new(program).args(&validate_args).output().await?;
|
let output = Command::new(program).args(&validate_args).output().await?;
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
let error_info = String::from_utf8_lossy(&output.stderr);
|
let error_info = String::from_utf8_lossy(&output.stderr);
|
||||||
anyhow::bail!("Nginx config validation failed: {}", error_info.trim());
|
return Err(CommandHandlerError::CommandExecutionError(
|
||||||
|
std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
format!("Failed to validate nginx config: {}", error_info.trim()),
|
||||||
|
),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
let success_info = String::from_utf8_lossy(&output.stdout);
|
let success_info = String::from_utf8_lossy(&output.stdout);
|
||||||
debug!("Nginx config validation succeeded: {}", success_info.trim());
|
debug!("Nginx config validation succeeded: {}", success_info.trim());
|
||||||
@@ -147,7 +190,12 @@ impl CommandHandler for CommandHandlerImpl {
|
|||||||
|
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
let error_info = String::from_utf8_lossy(&output.stderr);
|
let error_info = String::from_utf8_lossy(&output.stderr);
|
||||||
anyhow::bail!("Failed to get nginx version: {}", error_info.trim());
|
return Err(CommandHandlerError::CommandExecutionError(
|
||||||
|
std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
format!("Failed to get nginx version: {}", error_info.trim()),
|
||||||
|
),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let version_info = String::from_utf8_lossy(&output.stderr);
|
let version_info = String::from_utf8_lossy(&output.stderr);
|
||||||
@@ -162,7 +210,12 @@ impl CommandHandler for CommandHandlerImpl {
|
|||||||
|
|
||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
let error_info = String::from_utf8_lossy(&output.stderr);
|
let error_info = String::from_utf8_lossy(&output.stderr);
|
||||||
anyhow::bail!("Failed to get nginx status: {}", error_info.trim());
|
return Err(CommandHandlerError::CommandExecutionError(
|
||||||
|
std::io::Error::new(
|
||||||
|
std::io::ErrorKind::Other,
|
||||||
|
format!("Failed to get nginx status: {}", error_info.trim()),
|
||||||
|
),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let status_info = String::from_utf8_lossy(&output.stderr);
|
let status_info = String::from_utf8_lossy(&output.stderr);
|
||||||
|
|||||||
@@ -1,19 +1,42 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
|
||||||
use fs4::tokio::AsyncFileExt;
|
use fs4::tokio::AsyncFileExt;
|
||||||
|
use thiserror::Error;
|
||||||
use tokio::{io::AsyncWriteExt, process::Command};
|
use tokio::{io::AsyncWriteExt, process::Command};
|
||||||
use tracing::{debug, warn};
|
use tracing::warn;
|
||||||
|
|
||||||
use crate::config::settings::NginxSettings;
|
use crate::{config::settings::NginxSettings, service::master_handler::MasterHandlerError};
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use mockall::predicate::*;
|
use mockall::predicate::*;
|
||||||
// TODO: custom error type
|
// TODO: custom error type
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum FsHandlerError {
|
||||||
|
#[error("Invalid output path: {0}")]
|
||||||
|
InvalidOutputPath(String),
|
||||||
|
#[error("IO error: {0}")]
|
||||||
|
IoError(#[from] std::io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<FsHandlerError> for MasterHandlerError {
|
||||||
|
fn from(err: FsHandlerError) -> Self {
|
||||||
|
MasterHandlerError::MessageHandlingError(format!("File system handling error: {}", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type FsResult<T> = std::result::Result<T, FsHandlerError>;
|
||||||
|
type Result<T> = FsResult<T>;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
#[cfg_attr(test, mockall::automock)]
|
#[cfg_attr(test, mockall::automock)]
|
||||||
pub trait FsHandler: Send + Sync + 'static {
|
pub trait FsHandler: Send + Sync + 'static {
|
||||||
|
fn get_deployment_id(config_id: &str, version: &str) -> String
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
format!("{}-{}", config_id, version)
|
||||||
|
}
|
||||||
// Write a new config file for nginx.
|
// Write a new config file for nginx.
|
||||||
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
|
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
|
||||||
async fn write_config(
|
async fn write_config(
|
||||||
@@ -46,10 +69,16 @@ impl FsHandlerImpl {
|
|||||||
|
|
||||||
fn validate_config_path(config_path: &str) -> Result<()> {
|
fn validate_config_path(config_path: &str) -> Result<()> {
|
||||||
if !std::path::Path::new(config_path).exists() {
|
if !std::path::Path::new(config_path).exists() {
|
||||||
anyhow::bail!("Config file not found at path: {}", config_path);
|
return Err(FsHandlerError::InvalidOutputPath(format!(
|
||||||
|
"Config file not found at path: {}",
|
||||||
|
config_path
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
if !std::path::Path::new(config_path).is_file() {
|
if !std::path::Path::new(config_path).is_file() {
|
||||||
anyhow::bail!("Config path is not a file: {}", config_path);
|
return Err(FsHandlerError::InvalidOutputPath(format!(
|
||||||
|
"Config path is not a file: {}",
|
||||||
|
config_path
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -70,13 +99,17 @@ impl FsHandlerImpl {
|
|||||||
) -> Result<std::path::PathBuf> {
|
) -> Result<std::path::PathBuf> {
|
||||||
let output_path_obj = std::path::Path::new(output_path);
|
let output_path_obj = std::path::Path::new(output_path);
|
||||||
if output_path_obj.is_absolute() {
|
if output_path_obj.is_absolute() {
|
||||||
anyhow::bail!("Output path must be a relative path");
|
return Err(FsHandlerError::InvalidOutputPath(
|
||||||
|
"Output path must be a relative path".into(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
if output_path_obj
|
if output_path_obj
|
||||||
.components()
|
.components()
|
||||||
.any(|comp| comp == std::path::Component::ParentDir)
|
.any(|comp| comp == std::path::Component::ParentDir)
|
||||||
{
|
{
|
||||||
anyhow::bail!("Output path must not contain parent directory traversal");
|
return Err(FsHandlerError::InvalidOutputPath(
|
||||||
|
"Output path must not contain parent directory traversal".into(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let deployment_config_dir = self.get_deployment_dir_path(deployment_id);
|
let deployment_config_dir = self.get_deployment_dir_path(deployment_id);
|
||||||
@@ -103,9 +136,12 @@ impl FsHandler for FsHandlerImpl {
|
|||||||
let full_output_path = self
|
let full_output_path = self
|
||||||
.get_deployment_config_path(deployment_id, output_path, true)
|
.get_deployment_config_path(deployment_id, output_path, true)
|
||||||
.await?;
|
.await?;
|
||||||
let parent_dir = full_output_path
|
let parent_dir = full_output_path.parent().ok_or_else(|| {
|
||||||
.parent()
|
FsHandlerError::InvalidOutputPath(format!(
|
||||||
.context("Failed to get parent directory of the config file")?;
|
"Failed to get parent directory of output path: {:?}",
|
||||||
|
full_output_path
|
||||||
|
))
|
||||||
|
})?;
|
||||||
// ensure the parent directory exists before creating the file
|
// ensure the parent directory exists before creating the file
|
||||||
tokio::fs::create_dir_all(parent_dir).await?;
|
tokio::fs::create_dir_all(parent_dir).await?;
|
||||||
let mut file = tokio::fs::OpenOptions::new()
|
let mut file = tokio::fs::OpenOptions::new()
|
||||||
|
|||||||
121
apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs
Normal file
121
apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use dashmap::DashMap;
|
||||||
|
use nxmesh_proto::{
|
||||||
|
ConfigUpdate, ConfigUpdateResult,
|
||||||
|
agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload,
|
||||||
|
};
|
||||||
|
use tracing::warn;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
config::settings::NginxSettings,
|
||||||
|
service::{
|
||||||
|
master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler},
|
||||||
|
nginx_handler::{command_handler::CommandHandler, fs_handler::FsHandler},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const DEFAULT_CONFIG_PATH: &str = "nginx.conf";
|
||||||
|
const DEFAULT_NGINX_CONFIG_CONTENT: &str = r#"
|
||||||
|
events {}
|
||||||
|
"#;
|
||||||
|
|
||||||
|
pub trait NginxMasterMessageHandler: Send + Sync + 'static
|
||||||
|
//
|
||||||
|
+ OnConfigUpdateHandler
|
||||||
|
{}
|
||||||
|
|
||||||
|
pub struct NginxMasterMessageHandlerImpl {
|
||||||
|
settings: Arc<NginxSettings>,
|
||||||
|
command_handler: Arc<dyn CommandHandler>,
|
||||||
|
fs_handler: Arc<dyn FsHandler>,
|
||||||
|
master_handler: Arc<dyn MasterHandler>,
|
||||||
|
//
|
||||||
|
// dash_map for for storing the on-going config updates, with the key as deployment_id, and the value as a tuple of (version_id, timestamp). On-going update must lock the deployment_id, and the new update with newer timestamp will wait until the lock is released. This is to ensure the config updates are applied in order.
|
||||||
|
// When the current timestamp is older than the timestamp in the map, the current update must be rejected, and the master should be informed to resend the update with the latest timestamp.
|
||||||
|
ongoing_updates: DashMap<String, (String, i64)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NginxMasterMessageHandlerImpl {
|
||||||
|
pub fn new(
|
||||||
|
settings: Arc<NginxSettings>,
|
||||||
|
command_handler: Arc<dyn CommandHandler>,
|
||||||
|
fs_handler: Arc<dyn FsHandler>,
|
||||||
|
master_handler: Arc<dyn MasterHandler>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
settings,
|
||||||
|
command_handler,
|
||||||
|
fs_handler,
|
||||||
|
master_handler,
|
||||||
|
ongoing_updates: DashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NginxMasterMessageHandler for NginxMasterMessageHandlerImpl {}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl OnConfigUpdateHandler for NginxMasterMessageHandlerImpl {
|
||||||
|
async fn on_config_update(
|
||||||
|
&self,
|
||||||
|
agent_id: &str,
|
||||||
|
timestamp: i64,
|
||||||
|
message_id: &str,
|
||||||
|
config_info: ConfigUpdate,
|
||||||
|
) -> MessageResult<()> {
|
||||||
|
// TODO: handle concurrency, expect only the latest version with latest timestamp is applied
|
||||||
|
// when a newer config update comes in, and the older config update is still being processed. The new config will wait until the old config is applied.
|
||||||
|
let deployment_id = format!("{}-{}", config_info.config_id, config_info.version);
|
||||||
|
// write the configs
|
||||||
|
let root_config_path = match config_info.root_config {
|
||||||
|
Some(config_content) => {
|
||||||
|
self.fs_handler
|
||||||
|
.write_config(
|
||||||
|
&deployment_id,
|
||||||
|
&config_content.content,
|
||||||
|
&config_content.path,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// If the config content is not provided, write a default config to ensure the deployment folder is created and can be used for later updates.
|
||||||
|
warn!(
|
||||||
|
"Config content is not provided for config update, writing a default minimal config for deployment_id: {}",
|
||||||
|
deployment_id
|
||||||
|
);
|
||||||
|
self.fs_handler
|
||||||
|
.write_config(
|
||||||
|
&deployment_id,
|
||||||
|
DEFAULT_NGINX_CONFIG_CONTENT,
|
||||||
|
DEFAULT_CONFIG_PATH,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
//
|
||||||
|
for config in config_info.configs {
|
||||||
|
self.fs_handler
|
||||||
|
.write_config(&deployment_id, &config.content, &config.path)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
// apply reload on the root config
|
||||||
|
self.command_handler.reload(Some(&root_config_path)).await?;
|
||||||
|
// Reply the master to confirm the config update is successful
|
||||||
|
self.master_handler
|
||||||
|
.send_message_to_master(nxmesh_proto::AgentMessage {
|
||||||
|
agent_id: agent_id.to_string(),
|
||||||
|
timestamp,
|
||||||
|
message_id: message_id.to_string(),
|
||||||
|
payload: Some(ConfigUpdateResultPayload(ConfigUpdateResult {
|
||||||
|
success: true,
|
||||||
|
error_message: None,
|
||||||
|
config_id: config_info.config_id,
|
||||||
|
version: config_info.version,
|
||||||
|
})),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
//
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,21 +1,22 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use nxmesh_proto::ConfigUpdate;
|
use nxmesh_proto::ConfigUpdate;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::settings::NginxSettings,
|
config::settings::NginxSettings,
|
||||||
service::{
|
service::{
|
||||||
master_handler::{MessageResult, handlers::OnConfigUpdateHandler},
|
master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler},
|
||||||
nginx_handler::{
|
nginx_handler::{
|
||||||
command_handler::{CommandHandler, CommandHandlerImpl},
|
command_handler::{CommandHandler, CommandHandlerImpl, CommandHandlerResult},
|
||||||
fs_handler::{FsHandler, FsHandlerImpl},
|
fs_handler::{FsHandler, FsHandlerImpl, FsResult},
|
||||||
|
message_handler::{NginxMasterMessageHandler, NginxMasterMessageHandlerImpl},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
mod command_handler;
|
mod command_handler;
|
||||||
mod fs_handler;
|
mod fs_handler;
|
||||||
|
mod message_handler;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use mockall::predicate::*;
|
use mockall::predicate::*;
|
||||||
@@ -25,11 +26,11 @@ use mockall::predicate::*;
|
|||||||
#[cfg_attr(test, mockall::automock)]
|
#[cfg_attr(test, mockall::automock)]
|
||||||
pub trait NginxHandler: Send + Sync + 'static {
|
pub trait NginxHandler: Send + Sync + 'static {
|
||||||
// Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used.
|
// Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used.
|
||||||
async fn reload(&self, config_path: Option<&str>) -> Result<()>;
|
async fn reload(&self, config_path: Option<&str>) -> CommandHandlerResult<()>;
|
||||||
async fn stop(&self) -> Result<()>;
|
async fn stop(&self) -> CommandHandlerResult<()>;
|
||||||
async fn validate(&self, config_path: Option<&str>) -> Result<()>;
|
async fn validate(&self, config_path: Option<&str>) -> CommandHandlerResult<()>;
|
||||||
async fn get_version(&self) -> Result<String>;
|
async fn get_version(&self) -> CommandHandlerResult<String>;
|
||||||
async fn get_status(&self) -> Result<String>;
|
async fn get_status(&self) -> CommandHandlerResult<String>;
|
||||||
// Write a new config file for nginx.
|
// Write a new config file for nginx.
|
||||||
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
|
// The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed.
|
||||||
async fn write_config(
|
async fn write_config(
|
||||||
@@ -37,63 +38,65 @@ pub trait NginxHandler: Send + Sync + 'static {
|
|||||||
deployment_id: &str,
|
deployment_id: &str,
|
||||||
config_content: &str,
|
config_content: &str,
|
||||||
output_path: &str,
|
output_path: &str,
|
||||||
) -> Result<String>;
|
) -> FsResult<String>;
|
||||||
// Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed.
|
// Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed.
|
||||||
async fn append_config(
|
async fn append_config(
|
||||||
&self,
|
&self,
|
||||||
deployment_id: &str,
|
deployment_id: &str,
|
||||||
config_content: &str,
|
config_content: &str,
|
||||||
output_path: &str,
|
output_path: &str,
|
||||||
) -> Result<String>;
|
) -> FsResult<String>;
|
||||||
|
|
||||||
// clean up old config files that are applied to nginx
|
// clean up old config files that are applied to nginx
|
||||||
// keep only latest n deployments.
|
// keep only latest n deployments.
|
||||||
async fn cleanup_config(&self, n: usize) -> Result<()>;
|
async fn cleanup_config(&self, n: usize) -> FsResult<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NginxHandlerImpl<CH, FSH>
|
pub struct NginxHandlerImpl {
|
||||||
where
|
|
||||||
CH: CommandHandler + ?Sized,
|
|
||||||
FSH: FsHandler + ?Sized,
|
|
||||||
{
|
|
||||||
settings: Arc<NginxSettings>,
|
settings: Arc<NginxSettings>,
|
||||||
command_handler: Arc<CH>,
|
command_handler: Arc<dyn CommandHandler>,
|
||||||
fs_handler: Arc<FSH>,
|
fs_handler: Arc<dyn FsHandler>,
|
||||||
|
nginx_master_message_handler: Arc<dyn NginxMasterMessageHandler>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NginxHandlerImpl<CommandHandlerImpl, FsHandlerImpl> {
|
impl NginxHandlerImpl {
|
||||||
pub fn new(settings: Arc<NginxSettings>) -> Self {
|
pub fn new(settings: Arc<NginxSettings>, master_handler: Arc<dyn MasterHandler>) -> Self {
|
||||||
|
let command_handler: Arc<dyn CommandHandler> =
|
||||||
|
Arc::new(CommandHandlerImpl::new(settings.clone()));
|
||||||
|
let fs_handler: Arc<dyn FsHandler> = Arc::new(FsHandlerImpl::new(settings.clone()));
|
||||||
Self {
|
Self {
|
||||||
settings: settings.clone(),
|
settings: settings.clone(),
|
||||||
command_handler: Arc::new(CommandHandlerImpl::new(settings.clone())),
|
command_handler: command_handler.clone(),
|
||||||
fs_handler: Arc::new(FsHandlerImpl::new(settings)),
|
fs_handler: fs_handler.clone(),
|
||||||
|
nginx_master_message_handler: Arc::new(NginxMasterMessageHandlerImpl::new(
|
||||||
|
settings.clone(),
|
||||||
|
command_handler.clone(),
|
||||||
|
fs_handler.clone(),
|
||||||
|
master_handler,
|
||||||
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<CH, FSH> NginxHandler for NginxHandlerImpl<CH, FSH>
|
impl NginxHandler for NginxHandlerImpl {
|
||||||
where
|
async fn reload(&self, config_path: Option<&str>) -> CommandHandlerResult<()> {
|
||||||
CH: CommandHandler + ?Sized,
|
|
||||||
FSH: FsHandler + ?Sized,
|
|
||||||
{
|
|
||||||
async fn reload(&self, config_path: Option<&str>) -> Result<()> {
|
|
||||||
self.command_handler.reload(config_path).await
|
self.command_handler.reload(config_path).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn stop(&self) -> Result<()> {
|
async fn stop(&self) -> CommandHandlerResult<()> {
|
||||||
self.command_handler.stop().await
|
self.command_handler.stop().await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn validate(&self, config_path: Option<&str>) -> Result<()> {
|
async fn validate(&self, config_path: Option<&str>) -> CommandHandlerResult<()> {
|
||||||
self.command_handler.validate(config_path).await
|
self.command_handler.validate(config_path).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_version(&self) -> Result<String> {
|
async fn get_version(&self) -> CommandHandlerResult<String> {
|
||||||
self.command_handler.get_version().await
|
self.command_handler.get_version().await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_status(&self) -> Result<String> {
|
async fn get_status(&self) -> CommandHandlerResult<String> {
|
||||||
self.command_handler.get_status().await
|
self.command_handler.get_status().await
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,7 +105,7 @@ where
|
|||||||
deployment_id: &str,
|
deployment_id: &str,
|
||||||
config_content: &str,
|
config_content: &str,
|
||||||
output_path: &str,
|
output_path: &str,
|
||||||
) -> Result<String> {
|
) -> FsResult<String> {
|
||||||
self.fs_handler
|
self.fs_handler
|
||||||
.write_config(deployment_id, config_content, output_path)
|
.write_config(deployment_id, config_content, output_path)
|
||||||
.await
|
.await
|
||||||
@@ -113,24 +116,28 @@ where
|
|||||||
deployment_id: &str,
|
deployment_id: &str,
|
||||||
config_content: &str,
|
config_content: &str,
|
||||||
output_path: &str,
|
output_path: &str,
|
||||||
) -> Result<String> {
|
) -> FsResult<String> {
|
||||||
self.fs_handler
|
self.fs_handler
|
||||||
.append_config(deployment_id, config_content, output_path)
|
.append_config(deployment_id, config_content, output_path)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn cleanup_config(&self, n: usize) -> Result<()> {
|
async fn cleanup_config(&self, n: usize) -> FsResult<()> {
|
||||||
self.fs_handler.cleanup_config(n).await
|
self.fs_handler.cleanup_config(n).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<CH, FSH> OnConfigUpdateHandler for NginxHandlerImpl<CH, FSH>
|
impl OnConfigUpdateHandler for NginxHandlerImpl {
|
||||||
where
|
async fn on_config_update(
|
||||||
CH: CommandHandler + ?Sized,
|
&self,
|
||||||
FSH: FsHandler + ?Sized,
|
agent_id: &str,
|
||||||
{
|
timestamp: i64,
|
||||||
async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()> {
|
message_id: &str,
|
||||||
todo!();
|
config_info: ConfigUpdate,
|
||||||
|
) -> MessageResult<()> {
|
||||||
|
self.nginx_master_message_handler
|
||||||
|
.on_config_update(agent_id, timestamp, message_id, config_info)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
agent_id = "agent-id-01"
|
||||||
|
|
||||||
[grpc]
|
[grpc]
|
||||||
connection_string = "https://127.0.0.1:8443"
|
connection_string = "https://127.0.0.1:8443"
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user