From d6a829c52941bc702ef8b2630652bd0c89077354 Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Mon, 1 Jun 2026 10:04:09 +0000 Subject: [PATCH] refactor: enhance handler traits to support dynamic sizing and improve cancellation handling --- Cargo.lock | 3 + apps/nxmesh-agent/Cargo.toml | 1 + .../src/service/master_handler/handlers.rs | 6 +- .../src/service/master_handler/mod.rs | 163 ++++++++++++------ 4 files changed, 118 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 56998a6..511fc4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2589,6 +2589,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-test", + "tokio-util", "toml", "tonic", "tracing", @@ -2613,6 +2614,7 @@ dependencies = [ name = "nxmesh-master" version = "0.1.0" dependencies = [ + "anyhow", "argon2", "async-stream", "async-trait", @@ -2641,6 +2643,7 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-stream", "tokio-test", "toml", "tonic", diff --git a/apps/nxmesh-agent/Cargo.toml b/apps/nxmesh-agent/Cargo.toml index 5d04197..4387355 100644 --- a/apps/nxmesh-agent/Cargo.toml +++ b/apps/nxmesh-agent/Cargo.toml @@ -33,6 +33,7 @@ tonic.workspace = true async-trait.workspace = true futures.workspace = true tokio-stream.workspace = true +tokio-util = "0.7" # Config config.workspace = true diff --git a/apps/nxmesh-agent/src/service/master_handler/handlers.rs b/apps/nxmesh-agent/src/service/master_handler/handlers.rs index 0e79409..4144154 100644 --- a/apps/nxmesh-agent/src/service/master_handler/handlers.rs +++ b/apps/nxmesh-agent/src/service/master_handler/handlers.rs @@ -17,14 +17,14 @@ pub trait OnConfigUpdateHandler: Send + Sync + 'static { pub struct HandlerImpl where - OCH: OnConfigUpdateHandler, + OCH: OnConfigUpdateHandler + ?Sized, { on_config_update_handler: Arc, } impl HandlerImpl where - OCH: OnConfigUpdateHandler, + OCH: OnConfigUpdateHandler + ?Sized, { pub fn new(on_config_update_handler: Arc) -> Self { Self { @@ -36,7 +36,7 @@ where #[async_trait::async_trait] impl MasterMessageHandler for HandlerImpl where - OCH: OnConfigUpdateHandler, + OCH: OnConfigUpdateHandler + ?Sized, { async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()> { match message.payload { diff --git a/apps/nxmesh-agent/src/service/master_handler/mod.rs b/apps/nxmesh-agent/src/service/master_handler/mod.rs index cb27f89..178d2ab 100644 --- a/apps/nxmesh-agent/src/service/master_handler/mod.rs +++ b/apps/nxmesh-agent/src/service/master_handler/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use nxmesh_proto::AgentMessage; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; use crate::{ @@ -35,13 +36,14 @@ pub trait MasterHandler: Send + Sync + 'static { } struct MessageHandleInfo { - handle: tokio::task::JoinHandle<()>, tx: mpsc::Sender, + // used to signal the running handler/connection to stop + cancel: CancellationToken, } pub struct MasterHandlerImpl where - MMH: MasterMessageHandler, + MMH: MasterMessageHandler + ?Sized, { connector: Arc, message_handler: Arc, @@ -50,7 +52,7 @@ where impl MasterHandlerImpl where - MMH: MasterMessageHandler, + MMH: MasterMessageHandler + ?Sized, { pub fn new(connector: Arc, message_handler: Arc) -> Self { Self { @@ -64,29 +66,35 @@ where #[async_trait::async_trait] impl MasterHandler for MasterHandlerImpl where - MMH: MasterMessageHandler, + MMH: MasterMessageHandler + ?Sized, { async fn start_handle_master_message(&self) -> MessageResult<()> { + info!("Starting master message handler..."); let mut client = self.connector.get_client(); + // ensure only one caller can start the handler + // create the cancel token for the lifetime of this handler invocation + let cancel_token = CancellationToken::new(); + { + let mut guard = self.message_handle_lock.write().await; + if guard.is_some() { + warn!("Master message handler is already running"); + return Ok(()); + } + // placeholder tx; will be replaced per-connection + let (tx, _rx) = mpsc::channel(1); + *guard = Some(MessageHandleInfo { + tx, + cancel: cancel_token.clone(), + }); + } + 'connection_loop: loop { - let guard_result = self.message_handle_lock.try_write(); - let mut guard = match guard_result { - Ok(g) if g.is_none() => g, - Ok(_) => { - warn!("Master message handler is already running"); - return Ok(()); - } - Err(e) => { - return Err(MasterHandlerError::MessageHandlingError(format!( - "Failed to acquire lock for message handler: {}", - e - ))); - } - }; + // fresh outbound channel per connection let (tx, rx) = mpsc::channel(32); let outbound_stream = ReceiverStream::new(rx); - // 2. Connect to the master and start the bi-directional streaming RPC + + // try to connect let mut stream = match client.stream(outbound_stream).await { Ok(s) => s.into_inner(), Err(e) => { @@ -94,50 +102,101 @@ where "Failed to connect to master: {}. Retrying in 5 seconds...", e ); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - continue 'connection_loop; + // update stored sender so any callers see the current tx + { + let mut guard = self.message_handle_lock.write().await; + if let Some(info) = guard.as_mut() { + info.tx = tx.clone(); + } + } + let conn_token = cancel_token.child_token(); + tokio::select! { + _ = conn_token.cancelled() => break 'connection_loop, + _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => continue 'connection_loop, + } } }; - - // 3. Spawn a task to handle incoming messages from the master - let message_handler = self.message_handler.clone(); - let handle = tokio::spawn(async move { - 'message_loop: loop { - let message = stream.message().await; - match message { - Ok(Some(msg)) => { - if let Err(e) = message_handler.handle_master_message(msg).await { - error!("Failed to handle master message: {:?}", e); + // store current tx so senders can use it + { + let mut guard = self.message_handle_lock.write().await; + if let Some(info) = guard.as_mut() { + info.tx = tx.clone(); + } + } + // connection-level token to observe stop requests + let conn_token = cancel_token.child_token(); + info!("Connected to master, starting to receive messages..."); + // process messages inline so we can clear the slot on exit + 'message_processing: loop { + tokio::select! { + _ = conn_token.cancelled() => { + info!("Stop requested for master handler"); + break 'connection_loop; + } + message = stream.message() => { + match message { + Ok(Some(msg)) => { + if let Err(e) = self.message_handler.handle_master_message(msg).await { + error!("Failed to handle master message: {:?}", e); + } + continue; + } + Ok(None) => { + warn!("Master closed the connection"); + break 'message_processing; + } + Err(e) => { + error!("Error receiving message from master: {:?}", e); + break 'message_processing; } - continue 'message_loop; - } - Ok(None) => { - warn!("Master closed the connection"); - return; - } - Err(e) => { - error!("Error receiving message from master: {:?}", e); - return; } } } - }); - *guard = Some(MessageHandleInfo { handle, tx }); + } + + // connection ended — clear stored info + { + let mut guard = self.message_handle_lock.write().await; + guard.take(); + } + + // if stop requested, exit + if cancel_token.is_cancelled() { + break 'connection_loop; + } + + // otherwise reconnect after backoff + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } + + // final cleanup + let mut guard = self.message_handle_lock.write().await; + guard.take(); + Ok(()) } async fn stop_handle_master_message(&self) -> MessageResult<()> { - // 1. signal the task to stop (e.g. using a cancellation token or channel) - // 2. wait for the task to finish and handle any errors - // 3. set handle_master_message_task to None - - let mut guard = self.message_handle_lock.write().await; - if let Some(handle_info) = guard.take() { - handle_info.handle.abort(); - match handle_info.handle.await { - Ok(_) => info!("Master message handler task stopped successfully"), - Err(e) => error!("Failed to stop master message handler task: {:?}", e), + // Signal the running handler to stop and wait for it to clear + let mut maybe_cancel = None; + { + let mut guard = self.message_handle_lock.write().await; + if let Some(info) = guard.take() { + maybe_cancel = Some(info.cancel); } + } + + if let Some(cancel) = maybe_cancel { + cancel.cancel(); + + // wait for the handler to clear (with timeout) + for _ in 0..50 { + if self.message_handle_lock.read().await.is_none() { + info!("Master message handler task stopped successfully"); + return Ok(()); + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + warn!("Timed out waiting for master message handler to stop"); } else { warn!("Master message handler is not running"); }