diff --git a/apps/nxmesh-agent/src/service/master_handler/handlers.rs b/apps/nxmesh-agent/src/service/master_handler/handlers.rs index b5a2799..9f96cf3 100644 --- a/apps/nxmesh-agent/src/service/master_handler/handlers.rs +++ b/apps/nxmesh-agent/src/service/master_handler/handlers.rs @@ -1,6 +1,8 @@ use std::sync::{Arc, Weak}; -use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload}; +use nxmesh_proto::{ + AgentMessage, ConfigUpdate, MasterMessage, command::Command, master_message::Payload, +}; use crate::service::master_handler::{MasterHandlerError, MessageResult}; @@ -25,28 +27,45 @@ pub trait OnConfigUpdateHandler: Send + Sync + 'static { ) -> MessageResult<()>; } -pub struct HandlerImpl -where - OCH: OnConfigUpdateHandler + ?Sized, -{ - on_config_update_handler: Weak, +#[async_trait::async_trait] +pub trait OnCommandHandler: Send + Sync + 'static { + // Handle the command message from master, execute the command and return the result + async fn on_command( + &self, + agent_id: &str, + timestamp: i64, + message_id: &str, + command: Command, + ) -> MessageResult<()>; } -impl HandlerImpl +pub struct HandlerImpl where - OCH: OnConfigUpdateHandler + ?Sized, + OCUH: OnConfigUpdateHandler + ?Sized, + OCH: OnCommandHandler + ?Sized, { - pub fn new(on_config_update_handler: Weak) -> Self { + on_config_update_handler: Weak, + on_command_handler: Weak, +} + +impl HandlerImpl +where + OCUH: OnConfigUpdateHandler + ?Sized, + OCH: OnCommandHandler + ?Sized, +{ + pub fn new(on_config_update_handler: Weak, on_command_handler: Weak) -> Self { Self { on_config_update_handler, + on_command_handler, } } } #[async_trait::async_trait] -impl MasterMessageHandler for HandlerImpl +impl MasterMessageHandler for HandlerImpl where - OCH: OnConfigUpdateHandler + ?Sized, + OCUH: OnConfigUpdateHandler + ?Sized, + OCH: OnCommandHandler + ?Sized, { async fn handle_master_message( &self, diff --git a/apps/nxmesh-agent/src/service/mod.rs b/apps/nxmesh-agent/src/service/mod.rs index d7b05ad..7fa9067 100644 --- a/apps/nxmesh-agent/src/service/mod.rs +++ b/apps/nxmesh-agent/src/service/mod.rs @@ -26,7 +26,10 @@ pub async fn get_services(settings: Arc) -> anyhow::Result { #[expect(clippy::expect_used)] let nginx_handler = Arc::new_cyclic(|nginx_handler_weak| { - let message_handler = Arc::new(HandlerImpl::new(nginx_handler_weak.clone())); + let message_handler = Arc::new(HandlerImpl::new( + nginx_handler_weak.clone(), + nginx_handler_weak.clone(), + )); let master_handler = Arc::new(MasterHandlerImpl::new( settings.agent_id.as_str(), master_connector.clone(), diff --git a/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs b/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs index 342e443..a0a994f 100644 --- a/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs +++ b/apps/nxmesh-agent/src/service/nginx_handler/message_handler.rs @@ -3,14 +3,18 @@ use std::sync::Arc; use dashmap::DashMap; use nxmesh_proto::{ ConfigUpdate, ConfigUpdateResult, - agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload, + agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload, command::Command, + command_result, }; use tracing::warn; use crate::{ config::settings::NginxSettings, service::{ - master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler}, + master_handler::{ + MasterHandler, MessageResult, + handlers::{OnCommandHandler, OnConfigUpdateHandler}, + }, nginx_handler::{command_handler::CommandHandler, fs_handler::FsHandler}, }, }; @@ -23,6 +27,7 @@ events {} pub trait NginxMasterMessageHandler: Send + Sync + 'static // + OnConfigUpdateHandler ++ OnCommandHandler {} pub struct NginxMasterMessageHandlerImpl { @@ -119,3 +124,51 @@ impl OnConfigUpdateHandler for NginxMasterMessageHandlerImpl { Ok(()) } } + +#[async_trait::async_trait] +impl OnCommandHandler for NginxMasterMessageHandlerImpl { + async fn on_command( + &self, + agent_id: &str, + timestamp: i64, + message_id: &str, + command: Command, + ) -> MessageResult<()> { + // execute the command + let mut agent_message = nxmesh_proto::AgentMessage { + agent_id: agent_id.to_string(), + timestamp, + message_id: message_id.to_string(), + payload: None, + }; + let result: command_result::Result = match command { + // TODO: should use the previous config path + Command::Reload(_) => { + let result = self.command_handler.reload(None).await; + command_result::Result::ReloadResult(nxmesh_proto::ReloadResult { + success: result.is_ok(), + error_message: result.err().map(|e| e.to_string()).unwrap_or_default(), + }) + } + Command::Test(_) => { + let result = self.command_handler.validate(None).await; + command_result::Result::TestResult(nxmesh_proto::TestResult { + success: result.is_ok(), + error_message: result.err().map(|e| e.to_string()).unwrap_or_default(), + }) + } + }; + // Reply the master to confirm the command execution is successful, and return the command output + agent_message.payload = Some(nxmesh_proto::agent_message::Payload::CommandResult( + nxmesh_proto::CommandResult { + result: Some(result), + }, + )); + + self.master_handler + .send_message_to_master(agent_message) + .await?; + // + Ok(()) + } +} diff --git a/apps/nxmesh-agent/src/service/nginx_handler/mod.rs b/apps/nxmesh-agent/src/service/nginx_handler/mod.rs index cba35fe..0f3a3e3 100644 --- a/apps/nxmesh-agent/src/service/nginx_handler/mod.rs +++ b/apps/nxmesh-agent/src/service/nginx_handler/mod.rs @@ -1,11 +1,14 @@ use std::sync::Arc; -use nxmesh_proto::ConfigUpdate; +use nxmesh_proto::{ConfigUpdate, command::Command}; use crate::{ config::settings::NginxSettings, service::{ - master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler}, + master_handler::{ + MasterHandler, MessageResult, + handlers::{OnCommandHandler, OnConfigUpdateHandler}, + }, nginx_handler::{ command_handler::{CommandHandler, CommandHandlerImpl, CommandHandlerResult}, fs_handler::{FsHandler, FsHandlerImpl, FsResult}, @@ -141,3 +144,18 @@ impl OnConfigUpdateHandler for NginxHandlerImpl { .await } } + +#[async_trait::async_trait] +impl OnCommandHandler for NginxHandlerImpl { + async fn on_command( + &self, + agent_id: &str, + timestamp: i64, + message_id: &str, + command: Command, + ) -> MessageResult<()> { + self.nginx_master_message_handler + .on_command(agent_id, timestamp, message_id, command) + .await + } +}