From 860b0f077d1ae6f51206ee0a9112481c10ab1583 Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Tue, 3 Mar 2026 09:01:14 +0000 Subject: [PATCH] feat: Add tokio-stream dependency and update gRPC client for bidirectional streaming --- Cargo.lock | 1 + Cargo.toml | 1 + config/development.toml | 2 +- crates/nxmesh-agent/Cargo.toml | 1 + crates/nxmesh-agent/src/master/client.rs | 114 ++++++++++++++++++++--- 5 files changed, 106 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9e39f3..66d6d65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2437,6 +2437,7 @@ dependencies = [ "sha2", "thiserror 1.0.69", "tokio", + "tokio-stream", "tokio-test", "toml", "tonic", diff --git a/Cargo.toml b/Cargo.toml index b4a4980..3cd5042 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ sea-orm-migration = "2.0.0-rc" # Async async-trait = "0.1" futures = "0.3" +tokio-stream = "0.1" # Configuration toml = "0.8" diff --git a/config/development.toml b/config/development.toml index 26abdc4..d3ce842 100644 --- a/config/development.toml +++ b/config/development.toml @@ -19,5 +19,5 @@ name = "development-agent" data_dir = "./agent-runtime-data" [master] -url = "http://localhost:8080" +url = "http://localhost:8443" token = "token" diff --git a/crates/nxmesh-agent/Cargo.toml b/crates/nxmesh-agent/Cargo.toml index a845e61..ce13f78 100644 --- a/crates/nxmesh-agent/Cargo.toml +++ b/crates/nxmesh-agent/Cargo.toml @@ -35,6 +35,7 @@ reqwest.workspace = true # Async async-trait.workspace = true futures.workspace = true +tokio-stream.workspace = true # Config config.workspace = true diff --git a/crates/nxmesh-agent/src/master/client.rs b/crates/nxmesh-agent/src/master/client.rs index 6fc4f9a..a378a39 100644 --- a/crates/nxmesh-agent/src/master/client.rs +++ b/crates/nxmesh-agent/src/master/client.rs @@ -9,6 +9,7 @@ use uuid::Uuid; use nxmesh_proto::{ agent_service_client::AgentServiceClient, agent_message, + master_message, AgentMessage, ConfigUpdate, HealthReport, MasterMessage, RegistrationRequest, }; @@ -128,14 +129,13 @@ impl MasterClient { let agent_id = self.get_agent_id().await?; info!("Starting bidirectional stream for agent {}", agent_id); - // Create channels for the stream - let (outgoing_tx, _outgoing_rx) = mpsc::channel::(100); - let (_incoming_tx, _incoming_rx) = mpsc::channel::(100); + // Create channel for outgoing messages (Agent -> Master) + let (outgoing_tx, outgoing_rx) = mpsc::channel::(100); // Update the tx channel self.tx = outgoing_tx.clone(); - // Spawn task to handle incoming health reports + // Spawn task to handle incoming health reports and forward to outgoing channel let outgoing_tx_clone = outgoing_tx.clone(); let agent_id_clone = agent_id.clone(); tokio::spawn(async move { @@ -152,26 +152,64 @@ impl MasterClient { } }); - // For now, simulate the stream handling - // In production, this would use the actual gRPC streaming + // Send initial registration message to start the stream + let registration_request = RegistrationRequest { + token: self.settings.token.clone(), + hostname: self.agent_settings.name.clone(), + ip_address: get_local_ip().unwrap_or_else(|| "127.0.0.1".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + capabilities: vec![ + "nginx_management".to_string(), + "config_reload".to_string(), + "health_reporting".to_string(), + ], + labels: self.agent_settings.labels.clone(), + deployment_mode: match self.nginx_settings.deployment_mode.as_str() { + "docker_sidecar" => nxmesh_proto::DeploymentMode::DockerSidecar as i32, + "kubernetes_sidecar" => nxmesh_proto::DeploymentMode::KubernetesSidecar as i32, + "standalone" => nxmesh_proto::DeploymentMode::Standalone as i32, + _ => nxmesh_proto::DeploymentMode::DockerSidecar as i32, + }, + }; + + let initial_msg = AgentMessage { + agent_id: agent_id.clone(), + timestamp: chrono::Utc::now().timestamp(), + payload: Some(agent_message::Payload::Registration(registration_request)), + }; + + // Send the initial message to establish the stream + if let Err(e) = outgoing_tx.send(initial_msg).await { + return Err(format!("Failed to send initial message: {}", e).into()); + } + + // Convert the mpsc receiver into a stream for gRPC + let outgoing_stream = tokio_stream::wrappers::ReceiverStream::new(outgoing_rx); + + // Start the gRPC bidirectional stream + let mut stream = self.client.stream(outgoing_stream).await?.into_inner(); + info!("Stream started for agent {}", agent_id); - // Handle incoming messages in a separate task - let _config_handler = self.config_handler.clone(); + // Process incoming messages from master + let config_handler = self.config_handler.clone(); let state = self.state.clone(); + // Spawn heartbeat task + let heartbeat_tx = outgoing_tx.clone(); + let heartbeat_agent_id = agent_id.clone(); + let heartbeat_state = state.clone(); tokio::spawn(async move { - // Simulate receiving messages loop { tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; // Check if still connected - let current_state = state.read().await.clone(); + let current_state = heartbeat_state.read().await.clone(); match current_state { ConnectionState::Connected { .. } => { // Send periodic heartbeat let heartbeat = AgentMessage { - agent_id: agent_id.clone(), + agent_id: heartbeat_agent_id.clone(), timestamp: chrono::Utc::now().timestamp(), payload: Some(agent_message::Payload::Event( nxmesh_proto::Event { @@ -182,7 +220,7 @@ impl MasterClient { }, )), }; - if outgoing_tx.send(heartbeat).await.is_err() { + if heartbeat_tx.send(heartbeat).await.is_err() { break; } } @@ -191,6 +229,25 @@ impl MasterClient { } }); + // Main loop: process incoming messages from master + loop { + match stream.message().await { + Ok(Some(master_msg)) => { + if let Err(e) = handle_master_message(master_msg, &config_handler).await { + warn!("Failed to handle master message: {}", e); + } + } + Ok(None) => { + info!("Stream closed by master"); + break; + } + Err(e) => { + warn!("Stream error: {}", e); + break; + } + } + } + Ok(()) } @@ -229,6 +286,39 @@ fn get_local_ip() -> Option { Some("127.0.0.1".to_string()) } +/// Handle incoming message from master +async fn handle_master_message( + msg: MasterMessage, + config_handler: &Arc, +) -> Result<(), Box> { + use master_message::Payload; + + match msg.payload { + Some(Payload::ConfigUpdate(config)) => { + info!("Received config update from master: version={}", config.version); + config_handler(config); + } + Some(Payload::Command(cmd)) => { + info!("Received command from master: command_id={}", cmd.command_id); + // TODO: Handle commands (reload, restart, etc.) + } + Some(Payload::Ack(ack)) => { + tracing::debug!("Received ACK from master: message_id={}", ack.message_id); + } + Some(Payload::Error(err)) => { + warn!("Received error from master: {} - {}", err.code, err.message); + } + Some(Payload::RegistrationResponse(resp)) => { + info!("Received registration response: success={}", resp.success); + } + None => { + warn!("Received empty master message"); + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*;