diff --git a/apps/nxmesh-agent/src/main.rs b/apps/nxmesh-agent/src/main.rs index 9fce337..d18ba08 100644 --- a/apps/nxmesh-agent/src/main.rs +++ b/apps/nxmesh-agent/src/main.rs @@ -61,7 +61,7 @@ async fn main() { } // send a dummy heartbeat to verify the connection is working - let mut client = master_connector.get_client().lock().await.clone(); + let mut client = master_connector.get_client().clone(); let request = nxmesh_proto::TestRequest { ..Default::default() diff --git a/apps/nxmesh-agent/src/service/master_handler.rs b/apps/nxmesh-agent/src/service/master_handler.rs deleted file mode 100644 index 083c6a4..0000000 --- a/apps/nxmesh-agent/src/service/master_handler.rs +++ /dev/null @@ -1,275 +0,0 @@ -use std::sync::Arc; - -use anyhow::{Context, Result, bail}; -use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage}; -use tokio::time::sleep; -use tokio_stream::wrappers::ReceiverStream; -use tonic::Request; -use tracing::{error, info, warn}; - -use crate::connector::master::{MasterConnector, MasterConnectorTrait}; - -#[async_trait::async_trait] -pub trait MasterHandler: Send + Sync { - // Create a new routine to handle incoming messages from the master - // This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down - async fn start_handle_master_message(self: Arc) -> Result<()>; - async fn stop_handle_master_message(self: Arc) -> Result<()>; -} - -#[async_trait::async_trait] -trait MasterMessageHandler: Send + Sync { - async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()>; -} - -pub struct MasterHandlerImpl { - settings: Arc, - nginx_handler: Arc, - master_connector: Arc, - handle_master_message_task: tokio::sync::Mutex>>, -} - -impl MasterHandlerImpl { - pub fn new( - settings: impl Into>, - nginx_handler: impl Into>, - master_connector: impl Into>, - ) -> Arc { - Arc::new(Self { - settings: settings.into(), - nginx_handler: nginx_handler.into(), - master_connector: master_connector.into(), - handle_master_message_task: tokio::sync::Mutex::new(None), - }) - } - - async fn retry_with_delay( - retry_count: &mut u32, - max_retries: u32, - base_delay: std::time::Duration, - ) -> Result<()> { - if *retry_count < max_retries { - let backoff_delay = 2u32.pow(*retry_count) * base_delay; - warn!( - "Retrying connection to master in {:?} (attempt {}/{})...", - backoff_delay, - *retry_count + 1, - max_retries - ); - sleep(backoff_delay).await; - *retry_count += 1; - Ok(()) - } else { - error!("Exceeded maximum retry attempts to connect to master. Giving up."); - bail!("Failed to connect to master after maximum retry attempts.") - } - } - - async fn handle_master_message_task( - handler_clone: Arc, - master_connector: Arc, - ) { - let mut retry_count: u32 = 0; - let max_retries: u32 = 5; - let base_retry_delay = std::time::Duration::from_secs(5); - - 'connection_loop: loop { - let (tx, rx) = tokio::sync::mpsc::channel(128); - let outbound = ReceiverStream::new(rx); - - let request = AgentMessage { - // TODO: get agent ID from settings or generate a unique ID for this agent - agent_id: "TODO".to_string(), - timestamp: chrono::Utc::now().timestamp_millis(), - payload: None, - }; - let response = master_connector - .get_client() - .lock() - .await - .stream(Request::new(outbound)) - .await; - - if let Err(e) = tx.send(request).await { - error!("Failed to send initial message to master: {:?}", e); - match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay).await - { - Ok(()) => continue 'connection_loop, - Err(e) => { - error!("Failed to connect to master after retrying: {:?}", e); - break 'connection_loop; - } - } - } - - let mut inbound = match response { - Ok(res) => res.into_inner(), - Err(e) => { - info!("Failed to connect to master: {:?}", e); - match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay) - .await - { - Ok(()) => continue 'connection_loop, - Err(e) => { - error!("Failed to connect to master after retrying: {:?}", e); - break 'connection_loop; - } - } - } - }; - - retry_count = 0; // reset retry count on successful connection - - loop { - match inbound.message().await { - Ok(Some(message)) => { - Self::handle_inbound_message(handler_clone.clone(), message).await; - } - Ok(None) => { - warn!("Master closed the connection"); - match Self::retry_with_delay( - &mut retry_count, - max_retries, - base_retry_delay, - ) - .await - { - Ok(()) => continue 'connection_loop, - Err(e) => { - error!("Failed to connect to master after retrying: {:?}", e); - break 'connection_loop; - } - } - } - Err(e) => { - warn!("Failed to receive message from master: {:?}", e); - match Self::retry_with_delay( - &mut retry_count, - max_retries, - base_retry_delay, - ) - .await - { - Ok(()) => continue 'connection_loop, - Err(e) => { - error!("Failed to connect to master after retrying: {:?}", e); - break 'connection_loop; - } - } - } - } - } - } - } - - async fn handle_inbound_message( - handler_clone: Arc, - message: MasterMessage, - ) -> () { - if let Some(payload) = message.payload { - match payload { - nxmesh_proto::master_message::Payload::ConfigUpdate(config_info) => { - handler_clone - .on_config_update(config_info) - .await - .unwrap_or_else(|e| { - error!("Failed to handle config update from master: {:?}", e); - }); - } - _ => { - warn!("Received unsupported message from master: {:?}", payload); - } - } - } else { - warn!("Received message from master with empty payload"); - } - } -} - -#[async_trait::async_trait] -impl MasterHandler for MasterHandlerImpl { - async fn start_handle_master_message(self: Arc) -> Result<()> { - if let Some(handle) = self.handle_master_message_task.lock().await.as_ref() { - bail!( - "Master message handler is already running with task id: {:?}", - handle.id() - ); - } - - // Create a clone of the Arc and upcast to the trait object - let handler_clone: Arc = self.clone(); - let master_connector = self.master_connector.clone(); - - let handle = tokio::spawn(async move { - Self::handle_master_message_task(handler_clone, master_connector).await - }); - - let mut guard = self.handle_master_message_task.lock().await; - *guard = Some(handle); - - Ok(()) - } - - async fn stop_handle_master_message(self: Arc) -> Result<()> { - // 1. signal the task to stop (e.g. using a cancellation token or channel) - // 2. wait for the task to finish and handle any errors - // 3. set handle_master_message_task to None - - let mut guard = self.handle_master_message_task.lock().await; - if let Some(handle) = guard.take() { - handle.abort(); - match handle.await { - Ok(_) => info!("Master message handler task stopped successfully"), - Err(e) => error!("Failed to stop master message handler task: {:?}", e), - } - } else { - warn!("Master message handler is not running"); - } - Ok(()) - } -} - -#[async_trait::async_trait] -impl MasterMessageHandler for MasterHandlerImpl { - async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()> { - info!("Received config update from master: {:?}", config_info); - - let deployment_id = format!("{}__{}", config_info.config_id, config_info.version); - - let nginx_root_path = self - .nginx_handler - .write_config( - &deployment_id, - &config_info - .root_config - .as_ref() - .context("Root config is required in the config update")? - .content, - "nginx.conf", - ) - .await?; - - // create files and write the config content to the files - for config in config_info.configs { - self.nginx_handler - .write_config(&deployment_id, &config.content, &config.path) - .await?; - } - - // TODO: handle certificates - - // test the new config before reloading nginx - self.nginx_handler - .validate(Some(nginx_root_path.as_str())) - .await - .context("Failed to validate the new nginx config")?; - - // reload nginx to apply the new config - self.nginx_handler - .reload(Some(nginx_root_path.as_str())) - .await - .context("Failed to reload nginx with the new config")?; - - Ok(()) - } -} diff --git a/apps/nxmesh-agent/src/service/master_handler/handlers.rs b/apps/nxmesh-agent/src/service/master_handler/handlers.rs new file mode 100644 index 0000000..0e79409 --- /dev/null +++ b/apps/nxmesh-agent/src/service/master_handler/handlers.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload}; + +use crate::service::master_handler::{MasterHandlerError, MessageResult}; + +#[async_trait::async_trait] +pub trait MasterMessageHandler: Send + Sync + 'static { + async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()>; +} + +#[async_trait::async_trait] +pub trait OnConfigUpdateHandler: Send + Sync + 'static { + // Handle the config update message from master, write the config content to files, validate the new config and reload nginx + async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()>; +} + +pub struct HandlerImpl +where + OCH: OnConfigUpdateHandler, +{ + on_config_update_handler: Arc, +} + +impl HandlerImpl +where + OCH: OnConfigUpdateHandler, +{ + pub fn new(on_config_update_handler: Arc) -> Self { + Self { + on_config_update_handler, + } + } +} + +#[async_trait::async_trait] +impl MasterMessageHandler for HandlerImpl +where + OCH: OnConfigUpdateHandler, +{ + async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()> { + match message.payload { + Some(Payload::ConfigUpdate(config_info)) => { + self.on_config_update_handler + .on_config_update(config_info) + .await + } + Some(_) => { + // We should never receive other types of messages from the master, but we should handle it anyway + Err(MasterHandlerError::MessageHandlingError( + "Received unsupported master message type".to_string(), + )) + } + None => { + // This should never happen as the master should always send a valid message, but we should handle it anyway + return Err(MasterHandlerError::MessageHandlingError( + "Received master message with empty payload".to_string(), + )); + } + } + } +} diff --git a/apps/nxmesh-agent/src/service/master_handler/mod.rs b/apps/nxmesh-agent/src/service/master_handler/mod.rs new file mode 100644 index 0000000..cb27f89 --- /dev/null +++ b/apps/nxmesh-agent/src/service/master_handler/mod.rs @@ -0,0 +1,163 @@ +use std::sync::Arc; + +use nxmesh_proto::AgentMessage; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{error, info, warn}; + +use crate::{ + connector::master::{MasterConnector, MasterConnectorTrait}, + service::master_handler::handlers::MasterMessageHandler, +}; + +pub mod handlers; + +#[derive(Debug)] +pub enum MasterHandlerError { + ConnectionError(String), + // TODO: should be protobuf error to transmit the error to master + MessageHandlingError(String), + RetryLimitExceeded(String), + SendMessageError(String), +} + +pub type MessageResult = std::result::Result; + +#[async_trait::async_trait] +pub trait MasterHandler: Send + Sync + 'static { + // Create a new routine to handle incoming messages from the master + // This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down + async fn start_handle_master_message(&self) -> MessageResult<()>; + async fn stop_handle_master_message(&self) -> MessageResult<()>; + + // Send a message to the master, response should be handled by the agent message handler registered + async fn send_message_to_master(&self, message: AgentMessage) -> MessageResult<()>; +} + +struct MessageHandleInfo { + handle: tokio::task::JoinHandle<()>, + tx: mpsc::Sender, +} + +pub struct MasterHandlerImpl +where + MMH: MasterMessageHandler, +{ + connector: Arc, + message_handler: Arc, + message_handle_lock: tokio::sync::RwLock>, +} + +impl MasterHandlerImpl +where + MMH: MasterMessageHandler, +{ + pub fn new(connector: Arc, message_handler: Arc) -> Self { + Self { + connector, + message_handler, + message_handle_lock: tokio::sync::RwLock::new(None), + } + } +} + +#[async_trait::async_trait] +impl MasterHandler for MasterHandlerImpl +where + MMH: MasterMessageHandler, +{ + async fn start_handle_master_message(&self) -> MessageResult<()> { + let mut client = self.connector.get_client(); + + 'connection_loop: loop { + let guard_result = self.message_handle_lock.try_write(); + let mut guard = match guard_result { + Ok(g) if g.is_none() => g, + Ok(_) => { + warn!("Master message handler is already running"); + return Ok(()); + } + Err(e) => { + return Err(MasterHandlerError::MessageHandlingError(format!( + "Failed to acquire lock for message handler: {}", + e + ))); + } + }; + let (tx, rx) = mpsc::channel(32); + let outbound_stream = ReceiverStream::new(rx); + // 2. Connect to the master and start the bi-directional streaming RPC + let mut stream = match client.stream(outbound_stream).await { + Ok(s) => s.into_inner(), + Err(e) => { + error!( + "Failed to connect to master: {}. Retrying in 5 seconds...", + e + ); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue 'connection_loop; + } + }; + + // 3. Spawn a task to handle incoming messages from the master + let message_handler = self.message_handler.clone(); + let handle = tokio::spawn(async move { + 'message_loop: loop { + let message = stream.message().await; + match message { + Ok(Some(msg)) => { + if let Err(e) = message_handler.handle_master_message(msg).await { + error!("Failed to handle master message: {:?}", e); + } + continue 'message_loop; + } + Ok(None) => { + warn!("Master closed the connection"); + return; + } + Err(e) => { + error!("Error receiving message from master: {:?}", e); + return; + } + } + } + }); + *guard = Some(MessageHandleInfo { handle, tx }); + } + } + + async fn stop_handle_master_message(&self) -> MessageResult<()> { + // 1. signal the task to stop (e.g. using a cancellation token or channel) + // 2. wait for the task to finish and handle any errors + // 3. set handle_master_message_task to None + + let mut guard = self.message_handle_lock.write().await; + if let Some(handle_info) = guard.take() { + handle_info.handle.abort(); + match handle_info.handle.await { + Ok(_) => info!("Master message handler task stopped successfully"), + Err(e) => error!("Failed to stop master message handler task: {:?}", e), + } + } else { + warn!("Master message handler is not running"); + } + Ok(()) + } + + async fn send_message_to_master(&self, message: AgentMessage) -> MessageResult<()> { + let guard = self.message_handle_lock.read().await; + if let Some(handle_info) = guard.as_ref() { + handle_info.tx.send(message).await.map_err(|e| { + MasterHandlerError::SendMessageError(format!( + "Failed to send message to master: {}", + e + )) + })?; + } else { + return Err(MasterHandlerError::SendMessageError( + "Master message handler is not running".to_string(), + )); + } + Ok(()) + } +}