feat: add command handling support to Nginx message handler

This commit is contained in:
2026-06-16 10:55:13 +00:00
parent c0d243f661
commit 3c2cda88f1
4 changed files with 109 additions and 16 deletions

View File

@@ -1,6 +1,8 @@
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload}; use nxmesh_proto::{
AgentMessage, ConfigUpdate, MasterMessage, command::Command, master_message::Payload,
};
use crate::service::master_handler::{MasterHandlerError, MessageResult}; use crate::service::master_handler::{MasterHandlerError, MessageResult};
@@ -25,28 +27,45 @@ pub trait OnConfigUpdateHandler: Send + Sync + 'static {
) -> MessageResult<()>; ) -> MessageResult<()>;
} }
pub struct HandlerImpl<OCH> #[async_trait::async_trait]
where pub trait OnCommandHandler: Send + Sync + 'static {
OCH: OnConfigUpdateHandler + ?Sized, // Handle the command message from master, execute the command and return the result
{ async fn on_command(
on_config_update_handler: Weak<OCH>, &self,
agent_id: &str,
timestamp: i64,
message_id: &str,
command: Command,
) -> MessageResult<()>;
} }
impl<OCH> HandlerImpl<OCH> pub struct HandlerImpl<OCUH, OCH>
where where
OCH: OnConfigUpdateHandler + ?Sized, OCUH: OnConfigUpdateHandler + ?Sized,
OCH: OnCommandHandler + ?Sized,
{ {
pub fn new(on_config_update_handler: Weak<OCH>) -> Self { on_config_update_handler: Weak<OCUH>,
on_command_handler: Weak<OCH>,
}
impl<OCUH, OCH> HandlerImpl<OCUH, OCH>
where
OCUH: OnConfigUpdateHandler + ?Sized,
OCH: OnCommandHandler + ?Sized,
{
pub fn new(on_config_update_handler: Weak<OCUH>, on_command_handler: Weak<OCH>) -> Self {
Self { Self {
on_config_update_handler, on_config_update_handler,
on_command_handler,
} }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<OCH> MasterMessageHandler for HandlerImpl<OCH> impl<OCUH, OCH> MasterMessageHandler for HandlerImpl<OCUH, OCH>
where where
OCH: OnConfigUpdateHandler + ?Sized, OCUH: OnConfigUpdateHandler + ?Sized,
OCH: OnCommandHandler + ?Sized,
{ {
async fn handle_master_message( async fn handle_master_message(
&self, &self,

View File

@@ -26,7 +26,10 @@ pub async fn get_services(settings: Arc<Settings>) -> anyhow::Result<Services> {
#[expect(clippy::expect_used)] #[expect(clippy::expect_used)]
let nginx_handler = Arc::new_cyclic(|nginx_handler_weak| { let nginx_handler = Arc::new_cyclic(|nginx_handler_weak| {
let message_handler = Arc::new(HandlerImpl::new(nginx_handler_weak.clone())); let message_handler = Arc::new(HandlerImpl::new(
nginx_handler_weak.clone(),
nginx_handler_weak.clone(),
));
let master_handler = Arc::new(MasterHandlerImpl::new( let master_handler = Arc::new(MasterHandlerImpl::new(
settings.agent_id.as_str(), settings.agent_id.as_str(),
master_connector.clone(), master_connector.clone(),

View File

@@ -3,14 +3,18 @@ use std::sync::Arc;
use dashmap::DashMap; use dashmap::DashMap;
use nxmesh_proto::{ use nxmesh_proto::{
ConfigUpdate, ConfigUpdateResult, ConfigUpdate, ConfigUpdateResult,
agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload, agent_message::Payload::ConfigUpdateResult as ConfigUpdateResultPayload, command::Command,
command_result,
}; };
use tracing::warn; use tracing::warn;
use crate::{ use crate::{
config::settings::NginxSettings, config::settings::NginxSettings,
service::{ service::{
master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler}, master_handler::{
MasterHandler, MessageResult,
handlers::{OnCommandHandler, OnConfigUpdateHandler},
},
nginx_handler::{command_handler::CommandHandler, fs_handler::FsHandler}, nginx_handler::{command_handler::CommandHandler, fs_handler::FsHandler},
}, },
}; };
@@ -23,6 +27,7 @@ events {}
pub trait NginxMasterMessageHandler: Send + Sync + 'static pub trait NginxMasterMessageHandler: Send + Sync + 'static
// //
+ OnConfigUpdateHandler + OnConfigUpdateHandler
+ OnCommandHandler
{} {}
pub struct NginxMasterMessageHandlerImpl { pub struct NginxMasterMessageHandlerImpl {
@@ -119,3 +124,51 @@ impl OnConfigUpdateHandler for NginxMasterMessageHandlerImpl {
Ok(()) Ok(())
} }
} }
#[async_trait::async_trait]
impl OnCommandHandler for NginxMasterMessageHandlerImpl {
async fn on_command(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
command: Command,
) -> MessageResult<()> {
// execute the command
let mut agent_message = nxmesh_proto::AgentMessage {
agent_id: agent_id.to_string(),
timestamp,
message_id: message_id.to_string(),
payload: None,
};
let result: command_result::Result = match command {
// TODO: should use the previous config path
Command::Reload(_) => {
let result = self.command_handler.reload(None).await;
command_result::Result::ReloadResult(nxmesh_proto::ReloadResult {
success: result.is_ok(),
error_message: result.err().map(|e| e.to_string()).unwrap_or_default(),
})
}
Command::Test(_) => {
let result = self.command_handler.validate(None).await;
command_result::Result::TestResult(nxmesh_proto::TestResult {
success: result.is_ok(),
error_message: result.err().map(|e| e.to_string()).unwrap_or_default(),
})
}
};
// Reply the master to confirm the command execution is successful, and return the command output
agent_message.payload = Some(nxmesh_proto::agent_message::Payload::CommandResult(
nxmesh_proto::CommandResult {
result: Some(result),
},
));
self.master_handler
.send_message_to_master(agent_message)
.await?;
//
Ok(())
}
}

View File

@@ -1,11 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use nxmesh_proto::ConfigUpdate; use nxmesh_proto::{ConfigUpdate, command::Command};
use crate::{ use crate::{
config::settings::NginxSettings, config::settings::NginxSettings,
service::{ service::{
master_handler::{MasterHandler, MessageResult, handlers::OnConfigUpdateHandler}, master_handler::{
MasterHandler, MessageResult,
handlers::{OnCommandHandler, OnConfigUpdateHandler},
},
nginx_handler::{ nginx_handler::{
command_handler::{CommandHandler, CommandHandlerImpl, CommandHandlerResult}, command_handler::{CommandHandler, CommandHandlerImpl, CommandHandlerResult},
fs_handler::{FsHandler, FsHandlerImpl, FsResult}, fs_handler::{FsHandler, FsHandlerImpl, FsResult},
@@ -141,3 +144,18 @@ impl OnConfigUpdateHandler for NginxHandlerImpl {
.await .await
} }
} }
#[async_trait::async_trait]
impl OnCommandHandler for NginxHandlerImpl {
async fn on_command(
&self,
agent_id: &str,
timestamp: i64,
message_id: &str,
command: Command,
) -> MessageResult<()> {
self.nginx_master_message_handler
.on_command(agent_id, timestamp, message_id, command)
.await
}
}