diff --git a/apps/nxmesh-agent/Cargo.toml b/apps/nxmesh-agent/Cargo.toml index 4387355..d86623f 100644 --- a/apps/nxmesh-agent/Cargo.toml +++ b/apps/nxmesh-agent/Cargo.toml @@ -59,6 +59,7 @@ zip = { workspace = true } clap = { workspace = true, features = ["derive"] } anyhow = { version = "1.0.102", features = ["backtrace"] } fs4 = { version = "0.13.1", features = ["tokio"] } +dashmap = "6.2.1" [dev-dependencies] tokio-test.workspace = true diff --git a/apps/nxmesh-agent/src/config/settings/mod.rs b/apps/nxmesh-agent/src/config/settings/mod.rs index 07f173f..e42a042 100644 --- a/apps/nxmesh-agent/src/config/settings/mod.rs +++ b/apps/nxmesh-agent/src/config/settings/mod.rs @@ -22,6 +22,8 @@ pub trait Validate { /// Agent settings #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Settings { + pub agent_id: String, + pub grpc: GrpcSettings, #[serde(default)] pub log: LogSettings, diff --git a/apps/nxmesh-agent/src/connector/master/mod.rs b/apps/nxmesh-agent/src/connector/master/mod.rs index cf31b9e..544d48f 100644 --- a/apps/nxmesh-agent/src/connector/master/mod.rs +++ b/apps/nxmesh-agent/src/connector/master/mod.rs @@ -81,6 +81,7 @@ mod tests { fn test_settings() -> Settings { Settings { + agent_id: "test-agent".to_string(), grpc: GrpcSettings { connection_string: "https://localhost:50051".to_string(), m_auth: MAuthSettings::Tls(TLSSettings::ZipPath { diff --git a/apps/nxmesh-agent/src/main.rs b/apps/nxmesh-agent/src/main.rs index 245f0f1..0fcf89a 100644 --- a/apps/nxmesh-agent/src/main.rs +++ b/apps/nxmesh-agent/src/main.rs @@ -1,3 +1,4 @@ +#![recursion_limit = "128"] #![forbid(unsafe_code)] #![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)] diff --git a/apps/nxmesh-agent/src/service/master_handler/handlers.rs b/apps/nxmesh-agent/src/service/master_handler/handlers.rs index 4144154..b5a2799 100644 --- a/apps/nxmesh-agent/src/service/master_handler/handlers.rs +++ b/apps/nxmesh-agent/src/service/master_handler/handlers.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload}; @@ -6,27 +6,37 @@ use crate::service::master_handler::{MasterHandlerError, MessageResult}; #[async_trait::async_trait] pub trait MasterMessageHandler: Send + Sync + 'static { - async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()>; + async fn handle_master_message( + &self, + agent_id: &str, + message: MasterMessage, + ) -> MessageResult<()>; } #[async_trait::async_trait] pub trait OnConfigUpdateHandler: Send + Sync + 'static { // Handle the config update message from master, write the config content to files, validate the new config and reload nginx - async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()>; + async fn on_config_update( + &self, + agent_id: &str, + timestamp: i64, + message_id: &str, + config_info: ConfigUpdate, + ) -> MessageResult<()>; } pub struct HandlerImpl where OCH: OnConfigUpdateHandler + ?Sized, { - on_config_update_handler: Arc, + on_config_update_handler: Weak, } impl HandlerImpl where OCH: OnConfigUpdateHandler + ?Sized, { - pub fn new(on_config_update_handler: Arc) -> Self { + pub fn new(on_config_update_handler: Weak) -> Self { Self { on_config_update_handler, } @@ -38,11 +48,26 @@ impl MasterMessageHandler for HandlerImpl where OCH: OnConfigUpdateHandler + ?Sized, { - async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()> { + async fn handle_master_message( + &self, + agent_id: &str, + message: MasterMessage, + ) -> MessageResult<()> { match message.payload { Some(Payload::ConfigUpdate(config_info)) => { - self.on_config_update_handler - .on_config_update(config_info) + let on_config_update_handler = + self.on_config_update_handler.upgrade().ok_or_else(|| { + MasterHandlerError::MessageHandlingError( + "Failed to upgrade weak reference to config update handler".to_string(), + ) + })?; + on_config_update_handler + .on_config_update( + agent_id, + message.timestamp, + &message.message_id, + config_info, + ) .await } Some(_) => { diff --git a/apps/nxmesh-agent/src/service/master_handler/mod.rs b/apps/nxmesh-agent/src/service/master_handler/mod.rs index 178d2ab..e280a77 100644 --- a/apps/nxmesh-agent/src/service/master_handler/mod.rs +++ b/apps/nxmesh-agent/src/service/master_handler/mod.rs @@ -45,6 +45,7 @@ pub struct MasterHandlerImpl where MMH: MasterMessageHandler + ?Sized, { + agent_id: String, connector: Arc, message_handler: Arc, message_handle_lock: tokio::sync::RwLock>, @@ -54,8 +55,9 @@ impl MasterHandlerImpl where MMH: MasterMessageHandler + ?Sized, { - pub fn new(connector: Arc, message_handler: Arc) -> Self { + pub fn new(agent_id: &str, connector: Arc, message_handler: Arc) -> Self { Self { + agent_id: agent_id.to_string(), connector, message_handler, message_handle_lock: tokio::sync::RwLock::new(None), @@ -136,7 +138,7 @@ where message = stream.message() => { match message { Ok(Some(msg)) => { - if let Err(e) = self.message_handler.handle_master_message(msg).await { + if let Err(e) = self.message_handler.handle_master_message(&self.agent_id, msg).await { error!("Failed to handle master message: {:?}", e); } continue; diff --git a/apps/nxmesh-agent/src/service/mod.rs b/apps/nxmesh-agent/src/service/mod.rs index 35e3cfd..d7b05ad 100644 --- a/apps/nxmesh-agent/src/service/mod.rs +++ b/apps/nxmesh-agent/src/service/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::{ config::settings::Settings, @@ -19,16 +19,29 @@ pub struct Services { pub async fn get_services(settings: Arc) -> anyhow::Result { let master_connector = initialize_master_connector(settings.clone()).await?; + let master_connector = Arc::new(master_connector); - let nginx_handler = Arc::new(NginxHandlerImpl::new(settings.nginx.clone().into())); + let master_handler_slot = Arc::new(Mutex::new(None)); + let slot = master_handler_slot.clone(); - let message_handler = Arc::new(HandlerImpl::new(nginx_handler.clone())); + #[expect(clippy::expect_used)] + let nginx_handler = Arc::new_cyclic(|nginx_handler_weak| { + let message_handler = Arc::new(HandlerImpl::new(nginx_handler_weak.clone())); + let master_handler = Arc::new(MasterHandlerImpl::new( + settings.agent_id.as_str(), + master_connector.clone(), + message_handler, + )); + *slot.lock().expect("master handler slot lock poisoned") = Some(master_handler.clone()); - // build the services - let master_handler = Arc::new(MasterHandlerImpl::new( - Arc::new(master_connector), - message_handler, - )); + NginxHandlerImpl::new(settings.nginx.clone().into(), master_handler) + }); + #[expect(clippy::expect_used)] + let master_handler = master_handler_slot + .lock() + .expect("master handler slot lock poisoned") + .clone() + .ok_or_else(|| anyhow::anyhow!("Failed to initialize master handler"))?; Ok(Services { master_handler, diff --git a/apps/nxmesh-agent/src/service/nginx_handler/command_handler.rs b/apps/nxmesh-agent/src/service/nginx_handler/command_handler.rs index d21a150..6c45f14 100644 --- a/apps/nxmesh-agent/src/service/nginx_handler/command_handler.rs +++ b/apps/nxmesh-agent/src/service/nginx_handler/command_handler.rs @@ -1,14 +1,36 @@ use std::sync::Arc; -use anyhow::Result; +use thiserror::Error; use tokio::process::Command; use tracing::{debug, warn}; -use crate::config::settings::NginxSettings; +use crate::{config::settings::NginxSettings, service::master_handler::MasterHandlerError}; #[cfg(test)] use mockall::predicate::*; -// TODO: custom error type + +#[derive(Debug, Error)] +pub enum CommandHandlerError { + #[error("Failed to execute command: {0}")] + CommandExecutionError(#[from] std::io::Error), + #[error("Invalid config path: {0}")] + InvalidConfigPath(String), + #[error("Invalid output path: {0}")] + InvalidOutputPath(String), + #[error("Permission denied: {0}")] + PermissionDenied(String), + #[error("Other error: {0}")] + OtherError(String), +} + +impl From for MasterHandlerError { + fn from(err: CommandHandlerError) -> Self { + MasterHandlerError::MessageHandlingError(err.to_string()) + } +} + +pub type CommandHandlerResult = std::result::Result; +type Result = CommandHandlerResult; #[async_trait::async_trait] #[cfg_attr(test, mockall::automock)] @@ -40,10 +62,16 @@ impl CommandHandlerImpl { fn validate_config_path(config_path: &str) -> Result<()> { if !std::path::Path::new(config_path).exists() { - anyhow::bail!("Config file not found at path: {}", config_path); + return Err(CommandHandlerError::InvalidConfigPath(format!( + "Config file not found at path: {}", + config_path + ))); } if !std::path::Path::new(config_path).is_file() { - anyhow::bail!("Config path is not a file: {}", config_path); + return Err(CommandHandlerError::InvalidConfigPath(format!( + "Config path is not a file: {}", + config_path + ))); } Ok(()) } @@ -91,7 +119,12 @@ impl CommandHandler for CommandHandlerImpl { .await?; if !output.status.success() { let error_info = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("Failed to reload nginx: {}", error_info.trim()); + return Err(CommandHandlerError::CommandExecutionError( + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to reload nginx: {}", error_info.trim()), + ), + )); } let success_info = String::from_utf8_lossy(&output.stdout); debug!("Nginx reloaded successfully: {}", success_info.trim()); @@ -108,7 +141,12 @@ impl CommandHandler for CommandHandlerImpl { if !output.status.success() { let error_info = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("Failed to stop nginx: {}", error_info.trim()); + return Err(CommandHandlerError::CommandExecutionError( + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to stop nginx: {}", error_info.trim()), + ), + )); } let success_info = String::from_utf8_lossy(&output.stdout); debug!("Nginx stopped successfully: {}", success_info.trim()); @@ -132,7 +170,12 @@ impl CommandHandler for CommandHandlerImpl { let output = Command::new(program).args(&validate_args).output().await?; if !output.status.success() { let error_info = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("Nginx config validation failed: {}", error_info.trim()); + return Err(CommandHandlerError::CommandExecutionError( + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to validate nginx config: {}", error_info.trim()), + ), + )); } let success_info = String::from_utf8_lossy(&output.stdout); debug!("Nginx config validation succeeded: {}", success_info.trim()); @@ -147,7 +190,12 @@ impl CommandHandler for CommandHandlerImpl { if !output.status.success() { let error_info = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("Failed to get nginx version: {}", error_info.trim()); + return Err(CommandHandlerError::CommandExecutionError( + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to get nginx version: {}", error_info.trim()), + ), + )); } let version_info = String::from_utf8_lossy(&output.stderr); @@ -162,7 +210,12 @@ impl CommandHandler for CommandHandlerImpl { if !output.status.success() { let error_info = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("Failed to get nginx status: {}", error_info.trim()); + return Err(CommandHandlerError::CommandExecutionError( + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to get nginx status: {}", error_info.trim()), + ), + )); } let status_info = String::from_utf8_lossy(&output.stderr); diff --git a/apps/nxmesh-agent/src/service/nginx_handler/fs_handler.rs b/apps/nxmesh-agent/src/service/nginx_handler/fs_handler.rs index a36e0cd..053b17b 100644 --- a/apps/nxmesh-agent/src/service/nginx_handler/fs_handler.rs +++ b/apps/nxmesh-agent/src/service/nginx_handler/fs_handler.rs @@ -1,19 +1,42 @@ use std::sync::Arc; -use anyhow::{Context, Result}; use fs4::tokio::AsyncFileExt; +use thiserror::Error; use tokio::{io::AsyncWriteExt, process::Command}; -use tracing::{debug, warn}; +use tracing::warn; -use crate::config::settings::NginxSettings; +use crate::{config::settings::NginxSettings, service::master_handler::MasterHandlerError}; #[cfg(test)] use mockall::predicate::*; // TODO: custom error type +#[derive(Debug, Error)] +pub enum FsHandlerError { + #[error("Invalid output path: {0}")] + InvalidOutputPath(String), + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), +} + +impl From for MasterHandlerError { + fn from(err: FsHandlerError) -> Self { + MasterHandlerError::MessageHandlingError(format!("File system handling error: {}", err)) + } +} + +pub type FsResult = std::result::Result; +type Result = FsResult; + #[async_trait::async_trait] #[cfg_attr(test, mockall::automock)] pub trait FsHandler: Send + Sync + 'static { + fn get_deployment_id(config_id: &str, version: &str) -> String + where + Self: Sized, + { + format!("{}-{}", config_id, version) + } // Write a new config file for nginx. // The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed. async fn write_config( @@ -46,10 +69,16 @@ impl FsHandlerImpl { fn validate_config_path(config_path: &str) -> Result<()> { if !std::path::Path::new(config_path).exists() { - anyhow::bail!("Config file not found at path: {}", config_path); + return Err(FsHandlerError::InvalidOutputPath(format!( + "Config file not found at path: {}", + config_path + ))); } if !std::path::Path::new(config_path).is_file() { - anyhow::bail!("Config path is not a file: {}", config_path); + return Err(FsHandlerError::InvalidOutputPath(format!( + "Config path is not a file: {}", + config_path + ))); } Ok(()) } @@ -70,13 +99,17 @@ impl FsHandlerImpl { ) -> Result { let output_path_obj = std::path::Path::new(output_path); if output_path_obj.is_absolute() { - anyhow::bail!("Output path must be a relative path"); + return Err(FsHandlerError::InvalidOutputPath( + "Output path must be a relative path".into(), + )); } if output_path_obj .components() .any(|comp| comp == std::path::Component::ParentDir) { - anyhow::bail!("Output path must not contain parent directory traversal"); + return Err(FsHandlerError::InvalidOutputPath( + "Output path must not contain parent directory traversal".into(), + )); } let deployment_config_dir = self.get_deployment_dir_path(deployment_id); @@ -103,9 +136,12 @@ impl FsHandler for FsHandlerImpl { let full_output_path = self .get_deployment_config_path(deployment_id, output_path, true) .await?; - let parent_dir = full_output_path - .parent() - .context("Failed to get parent directory of the config file")?; + let parent_dir = full_output_path.parent().ok_or_else(|| { + FsHandlerError::InvalidOutputPath(format!( + "Failed to get parent directory of output path: {:?}", + full_output_path + )) + })?; // ensure the parent directory exists before creating the file tokio::fs::create_dir_all(parent_dir).await?; let mut file = tokio::fs::OpenOptions::new() diff --git a/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs b/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs new file mode 100644 index 0000000..342e443 --- /dev/null +++ b/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs @@ -0,0 +1,121 @@ +use std::sync::Arc; + +use dashmap::DashMap; +use nxmesh_proto::{ + ConfigUpdate, ConfigUpdateResult, + agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload, +}; +use tracing::warn; + +use crate::{ + config::settings::NginxSettings, + service::{ + master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler}, + nginx_handler::{command_handler::CommandHandler, fs_handler::FsHandler}, + }, +}; + +const DEFAULT_CONFIG_PATH: &str = "nginx.conf"; +const DEFAULT_NGINX_CONFIG_CONTENT: &str = r#" +events {} +"#; + +pub trait NginxMasterMessageHandler: Send + Sync + 'static +// ++ OnConfigUpdateHandler +{} + +pub struct NginxMasterMessageHandlerImpl { + settings: Arc, + command_handler: Arc, + fs_handler: Arc, + master_handler: Arc, + // + // dash_map for for storing the on-going config updates, with the key as deployment_id, and the value as a tuple of (version_id, timestamp). On-going update must lock the deployment_id, and the new update with newer timestamp will wait until the lock is released. This is to ensure the config updates are applied in order. + // When the current timestamp is older than the timestamp in the map, the current update must be rejected, and the master should be informed to resend the update with the latest timestamp. + ongoing_updates: DashMap, +} + +impl NginxMasterMessageHandlerImpl { + pub fn new( + settings: Arc, + command_handler: Arc, + fs_handler: Arc, + master_handler: Arc, + ) -> Self { + Self { + settings, + command_handler, + fs_handler, + master_handler, + ongoing_updates: DashMap::new(), + } + } +} + +impl NginxMasterMessageHandler for NginxMasterMessageHandlerImpl {} + +#[async_trait::async_trait] +impl OnConfigUpdateHandler for NginxMasterMessageHandlerImpl { + async fn on_config_update( + &self, + agent_id: &str, + timestamp: i64, + message_id: &str, + config_info: ConfigUpdate, + ) -> MessageResult<()> { + // TODO: handle concurrency, expect only the latest version with latest timestamp is applied + // when a newer config update comes in, and the older config update is still being processed. The new config will wait until the old config is applied. + let deployment_id = format!("{}-{}", config_info.config_id, config_info.version); + // write the configs + let root_config_path = match config_info.root_config { + Some(config_content) => { + self.fs_handler + .write_config( + &deployment_id, + &config_content.content, + &config_content.path, + ) + .await? + } + None => { + // If the config content is not provided, write a default config to ensure the deployment folder is created and can be used for later updates. + warn!( + "Config content is not provided for config update, writing a default minimal config for deployment_id: {}", + deployment_id + ); + self.fs_handler + .write_config( + &deployment_id, + DEFAULT_NGINX_CONFIG_CONTENT, + DEFAULT_CONFIG_PATH, + ) + .await? + } + }; + // + for config in config_info.configs { + self.fs_handler + .write_config(&deployment_id, &config.content, &config.path) + .await?; + } + // apply reload on the root config + self.command_handler.reload(Some(&root_config_path)).await?; + // Reply the master to confirm the config update is successful + self.master_handler + .send_message_to_master(nxmesh_proto::AgentMessage { + agent_id: agent_id.to_string(), + timestamp, + message_id: message_id.to_string(), + payload: Some(ConfigUpdateResultPayload(ConfigUpdateResult { + success: true, + error_message: None, + config_id: config_info.config_id, + version: config_info.version, + })), + }) + .await?; + // + Ok(()) + } +} diff --git a/apps/nxmesh-agent/src/service/nginx_handler/mod.rs b/apps/nxmesh-agent/src/service/nginx_handler/mod.rs index b88ddb7..cba35fe 100644 --- a/apps/nxmesh-agent/src/service/nginx_handler/mod.rs +++ b/apps/nxmesh-agent/src/service/nginx_handler/mod.rs @@ -1,21 +1,22 @@ use std::sync::Arc; -use anyhow::Result; use nxmesh_proto::ConfigUpdate; use crate::{ config::settings::NginxSettings, service::{ - master_handler::{MessageResult, handlers::OnConfigUpdateHandler}, + master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler}, nginx_handler::{ - command_handler::{CommandHandler, CommandHandlerImpl}, - fs_handler::{FsHandler, FsHandlerImpl}, + command_handler::{CommandHandler, CommandHandlerImpl, CommandHandlerResult}, + fs_handler::{FsHandler, FsHandlerImpl, FsResult}, + message_handler::{NginxMasterMessageHandler, NginxMasterMessageHandlerImpl}, }, }, }; mod command_handler; mod fs_handler; +mod message_handler; #[cfg(test)] use mockall::predicate::*; @@ -25,11 +26,11 @@ use mockall::predicate::*; #[cfg_attr(test, mockall::automock)] pub trait NginxHandler: Send + Sync + 'static { // Reload nginx to apply new config. The config_path is an optional parameter that specifies the path to the nginx config file to be used for this reload operation. If not provided, the default config path will be used. - async fn reload(&self, config_path: Option<&str>) -> Result<()>; - async fn stop(&self) -> Result<()>; - async fn validate(&self, config_path: Option<&str>) -> Result<()>; - async fn get_version(&self) -> Result; - async fn get_status(&self) -> Result; + async fn reload(&self, config_path: Option<&str>) -> CommandHandlerResult<()>; + async fn stop(&self) -> CommandHandlerResult<()>; + async fn validate(&self, config_path: Option<&str>) -> CommandHandlerResult<()>; + async fn get_version(&self) -> CommandHandlerResult; + async fn get_status(&self) -> CommandHandlerResult; // Write a new config file for nginx. // The output_path is a relative path to the nginx config directory of the deployment folder. The actual path to the config should not be assumed by the caller, as it can be different in different environments, but will be promised to be relative to the deployment folder for each the corresponding deployment_id. Path traversal is not allowed. async fn write_config( @@ -37,63 +38,65 @@ pub trait NginxHandler: Send + Sync + 'static { deployment_id: &str, config_content: &str, output_path: &str, - ) -> Result; + ) -> FsResult; // Append a new config content to an existing config file for nginx. This is useful for some use cases where we want to keep the existing config and just add some new config content to it. The output_path is a relative path to the nginx config directory of the deployment folder, which should be the same as the one used in write_config function. Path traversal is not allowed. async fn append_config( &self, deployment_id: &str, config_content: &str, output_path: &str, - ) -> Result; + ) -> FsResult; // clean up old config files that are applied to nginx // keep only latest n deployments. - async fn cleanup_config(&self, n: usize) -> Result<()>; + async fn cleanup_config(&self, n: usize) -> FsResult<()>; } -pub struct NginxHandlerImpl -where - CH: CommandHandler + ?Sized, - FSH: FsHandler + ?Sized, -{ +pub struct NginxHandlerImpl { settings: Arc, - command_handler: Arc, - fs_handler: Arc, + command_handler: Arc, + fs_handler: Arc, + nginx_master_message_handler: Arc, } -impl NginxHandlerImpl { - pub fn new(settings: Arc) -> Self { +impl NginxHandlerImpl { + pub fn new(settings: Arc, master_handler: Arc) -> Self { + let command_handler: Arc = + Arc::new(CommandHandlerImpl::new(settings.clone())); + let fs_handler: Arc = Arc::new(FsHandlerImpl::new(settings.clone())); Self { settings: settings.clone(), - command_handler: Arc::new(CommandHandlerImpl::new(settings.clone())), - fs_handler: Arc::new(FsHandlerImpl::new(settings)), + command_handler: command_handler.clone(), + fs_handler: fs_handler.clone(), + nginx_master_message_handler: Arc::new(NginxMasterMessageHandlerImpl::new( + settings.clone(), + command_handler.clone(), + fs_handler.clone(), + master_handler, + )), } } } #[async_trait::async_trait] -impl NginxHandler for NginxHandlerImpl -where - CH: CommandHandler + ?Sized, - FSH: FsHandler + ?Sized, -{ - async fn reload(&self, config_path: Option<&str>) -> Result<()> { +impl NginxHandler for NginxHandlerImpl { + async fn reload(&self, config_path: Option<&str>) -> CommandHandlerResult<()> { self.command_handler.reload(config_path).await } - async fn stop(&self) -> Result<()> { + async fn stop(&self) -> CommandHandlerResult<()> { self.command_handler.stop().await } - async fn validate(&self, config_path: Option<&str>) -> Result<()> { + async fn validate(&self, config_path: Option<&str>) -> CommandHandlerResult<()> { self.command_handler.validate(config_path).await } - async fn get_version(&self) -> Result { + async fn get_version(&self) -> CommandHandlerResult { self.command_handler.get_version().await } - async fn get_status(&self) -> Result { + async fn get_status(&self) -> CommandHandlerResult { self.command_handler.get_status().await } @@ -102,7 +105,7 @@ where deployment_id: &str, config_content: &str, output_path: &str, - ) -> Result { + ) -> FsResult { self.fs_handler .write_config(deployment_id, config_content, output_path) .await @@ -113,24 +116,28 @@ where deployment_id: &str, config_content: &str, output_path: &str, - ) -> Result { + ) -> FsResult { self.fs_handler .append_config(deployment_id, config_content, output_path) .await } - async fn cleanup_config(&self, n: usize) -> Result<()> { + async fn cleanup_config(&self, n: usize) -> FsResult<()> { self.fs_handler.cleanup_config(n).await } } #[async_trait::async_trait] -impl OnConfigUpdateHandler for NginxHandlerImpl -where - CH: CommandHandler + ?Sized, - FSH: FsHandler + ?Sized, -{ - async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()> { - todo!(); +impl OnConfigUpdateHandler for NginxHandlerImpl { + async fn on_config_update( + &self, + agent_id: &str, + timestamp: i64, + message_id: &str, + config_info: ConfigUpdate, + ) -> MessageResult<()> { + self.nginx_master_message_handler + .on_config_update(agent_id, timestamp, message_id, config_info) + .await } } diff --git a/config/agent/development.toml b/config/agent/development.toml index 0859b25..5fa3099 100644 --- a/config/agent/development.toml +++ b/config/agent/development.toml @@ -1,3 +1,5 @@ +agent_id = "agent-id-01" + [grpc] connection_string = "https://127.0.0.1:8443"