From f428f18f8711917653e3c0e911e02fedb665249f Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Wed, 6 May 2026 09:23:51 +0000 Subject: [PATCH] feat: implement MasterHandler for handling master messages and add retry logic --- .../src/service/master_handler.rs | 265 +++++++++++++++++- apps/nxmesh-agent/src/service/mod.rs | 1 + 2 files changed, 252 insertions(+), 14 deletions(-) diff --git a/apps/nxmesh-agent/src/service/master_handler.rs b/apps/nxmesh-agent/src/service/master_handler.rs index 52ace69..083c6a4 100644 --- a/apps/nxmesh-agent/src/service/master_handler.rs +++ b/apps/nxmesh-agent/src/service/master_handler.rs @@ -1,38 +1,275 @@ use std::sync::Arc; -use nxmesh_proto::ConfigUpdate; -use tracing::info; +use anyhow::{Context, Result, bail}; +use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage}; +use tokio::time::sleep; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; +use tracing::{error, info, warn}; -use crate::connector::master::MasterConnector; +use crate::connector::master::{MasterConnector, MasterConnectorTrait}; #[async_trait::async_trait] -pub trait MasterHandler { - async fn on_config_update( - &self, - config_info: ConfigUpdate, - ) -> Result<(), Box>; +pub trait MasterHandler: Send + Sync { + // Create a new routine to handle incoming messages from the master + // This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down + async fn start_handle_master_message(self: Arc) -> Result<()>; + async fn stop_handle_master_message(self: Arc) -> Result<()>; +} + +#[async_trait::async_trait] +trait MasterMessageHandler: Send + Sync { + async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()>; } pub struct MasterHandlerImpl { settings: Arc, + nginx_handler: Arc, + master_connector: Arc, + handle_master_message_task: tokio::sync::Mutex>>, } impl MasterHandlerImpl { - pub fn new(settings: impl Into>) -> Self { - Self { + pub fn new( + settings: impl Into>, + nginx_handler: impl Into>, + master_connector: impl Into>, + ) -> Arc { + Arc::new(Self { settings: settings.into(), + nginx_handler: nginx_handler.into(), + master_connector: master_connector.into(), + handle_master_message_task: tokio::sync::Mutex::new(None), + }) + } + + async fn retry_with_delay( + retry_count: &mut u32, + max_retries: u32, + base_delay: std::time::Duration, + ) -> Result<()> { + if *retry_count < max_retries { + let backoff_delay = 2u32.pow(*retry_count) * base_delay; + warn!( + "Retrying connection to master in {:?} (attempt {}/{})...", + backoff_delay, + *retry_count + 1, + max_retries + ); + sleep(backoff_delay).await; + *retry_count += 1; + Ok(()) + } else { + error!("Exceeded maximum retry attempts to connect to master. Giving up."); + bail!("Failed to connect to master after maximum retry attempts.") + } + } + + async fn handle_master_message_task( + handler_clone: Arc, + master_connector: Arc, + ) { + let mut retry_count: u32 = 0; + let max_retries: u32 = 5; + let base_retry_delay = std::time::Duration::from_secs(5); + + 'connection_loop: loop { + let (tx, rx) = tokio::sync::mpsc::channel(128); + let outbound = ReceiverStream::new(rx); + + let request = AgentMessage { + // TODO: get agent ID from settings or generate a unique ID for this agent + agent_id: "TODO".to_string(), + timestamp: chrono::Utc::now().timestamp_millis(), + payload: None, + }; + let response = master_connector + .get_client() + .lock() + .await + .stream(Request::new(outbound)) + .await; + + if let Err(e) = tx.send(request).await { + error!("Failed to send initial message to master: {:?}", e); + match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay).await + { + Ok(()) => continue 'connection_loop, + Err(e) => { + error!("Failed to connect to master after retrying: {:?}", e); + break 'connection_loop; + } + } + } + + let mut inbound = match response { + Ok(res) => res.into_inner(), + Err(e) => { + info!("Failed to connect to master: {:?}", e); + match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay) + .await + { + Ok(()) => continue 'connection_loop, + Err(e) => { + error!("Failed to connect to master after retrying: {:?}", e); + break 'connection_loop; + } + } + } + }; + + retry_count = 0; // reset retry count on successful connection + + loop { + match inbound.message().await { + Ok(Some(message)) => { + Self::handle_inbound_message(handler_clone.clone(), message).await; + } + Ok(None) => { + warn!("Master closed the connection"); + match Self::retry_with_delay( + &mut retry_count, + max_retries, + base_retry_delay, + ) + .await + { + Ok(()) => continue 'connection_loop, + Err(e) => { + error!("Failed to connect to master after retrying: {:?}", e); + break 'connection_loop; + } + } + } + Err(e) => { + warn!("Failed to receive message from master: {:?}", e); + match Self::retry_with_delay( + &mut retry_count, + max_retries, + base_retry_delay, + ) + .await + { + Ok(()) => continue 'connection_loop, + Err(e) => { + error!("Failed to connect to master after retrying: {:?}", e); + break 'connection_loop; + } + } + } + } + } + } + } + + async fn handle_inbound_message( + handler_clone: Arc, + message: MasterMessage, + ) -> () { + if let Some(payload) = message.payload { + match payload { + nxmesh_proto::master_message::Payload::ConfigUpdate(config_info) => { + handler_clone + .on_config_update(config_info) + .await + .unwrap_or_else(|e| { + error!("Failed to handle config update from master: {:?}", e); + }); + } + _ => { + warn!("Received unsupported message from master: {:?}", payload); + } + } + } else { + warn!("Received message from master with empty payload"); } } } #[async_trait::async_trait] impl MasterHandler for MasterHandlerImpl { - async fn on_config_update( - &self, - config_info: ConfigUpdate, - ) -> Result<(), Box> { + async fn start_handle_master_message(self: Arc) -> Result<()> { + if let Some(handle) = self.handle_master_message_task.lock().await.as_ref() { + bail!( + "Master message handler is already running with task id: {:?}", + handle.id() + ); + } + + // Create a clone of the Arc and upcast to the trait object + let handler_clone: Arc = self.clone(); + let master_connector = self.master_connector.clone(); + + let handle = tokio::spawn(async move { + Self::handle_master_message_task(handler_clone, master_connector).await + }); + + let mut guard = self.handle_master_message_task.lock().await; + *guard = Some(handle); + + Ok(()) + } + + async fn stop_handle_master_message(self: Arc) -> Result<()> { + // 1. signal the task to stop (e.g. using a cancellation token or channel) + // 2. wait for the task to finish and handle any errors + // 3. set handle_master_message_task to None + + let mut guard = self.handle_master_message_task.lock().await; + if let Some(handle) = guard.take() { + handle.abort(); + match handle.await { + Ok(_) => info!("Master message handler task stopped successfully"), + Err(e) => error!("Failed to stop master message handler task: {:?}", e), + } + } else { + warn!("Master message handler is not running"); + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl MasterMessageHandler for MasterHandlerImpl { + async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()> { info!("Received config update from master: {:?}", config_info); + let deployment_id = format!("{}__{}", config_info.config_id, config_info.version); + + let nginx_root_path = self + .nginx_handler + .write_config( + &deployment_id, + &config_info + .root_config + .as_ref() + .context("Root config is required in the config update")? + .content, + "nginx.conf", + ) + .await?; + + // create files and write the config content to the files + for config in config_info.configs { + self.nginx_handler + .write_config(&deployment_id, &config.content, &config.path) + .await?; + } + + // TODO: handle certificates + + // test the new config before reloading nginx + self.nginx_handler + .validate(Some(nginx_root_path.as_str())) + .await + .context("Failed to validate the new nginx config")?; + + // reload nginx to apply the new config + self.nginx_handler + .reload(Some(nginx_root_path.as_str())) + .await + .context("Failed to reload nginx with the new config")?; + Ok(()) } } diff --git a/apps/nxmesh-agent/src/service/mod.rs b/apps/nxmesh-agent/src/service/mod.rs index 87747d2..973457b 100644 --- a/apps/nxmesh-agent/src/service/mod.rs +++ b/apps/nxmesh-agent/src/service/mod.rs @@ -1 +1,2 @@ +pub mod master_handler; pub mod nginx_handler;