feat: Add tokio-stream dependency and update gRPC client for bidirectional streaming

This commit is contained in:
GW_MC
2026-03-03 09:01:14 +00:00
parent 08b28a2acf
commit 860b0f077d
5 changed files with 106 additions and 13 deletions

1
Cargo.lock generated
View File

@@ -2437,6 +2437,7 @@ dependencies = [
"sha2", "sha2",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio", "tokio",
"tokio-stream",
"tokio-test", "tokio-test",
"toml", "toml",
"tonic", "tonic",

View File

@@ -44,6 +44,7 @@ sea-orm-migration = "2.0.0-rc"
# Async # Async
async-trait = "0.1" async-trait = "0.1"
futures = "0.3" futures = "0.3"
tokio-stream = "0.1"
# Configuration # Configuration
toml = "0.8" toml = "0.8"

View File

@@ -19,5 +19,5 @@ name = "development-agent"
data_dir = "./agent-runtime-data" data_dir = "./agent-runtime-data"
[master] [master]
url = "http://localhost:8080" url = "http://localhost:8443"
token = "token" token = "token"

View File

@@ -35,6 +35,7 @@ reqwest.workspace = true
# Async # Async
async-trait.workspace = true async-trait.workspace = true
futures.workspace = true futures.workspace = true
tokio-stream.workspace = true
# Config # Config
config.workspace = true config.workspace = true

View File

@@ -9,6 +9,7 @@ use uuid::Uuid;
use nxmesh_proto::{ use nxmesh_proto::{
agent_service_client::AgentServiceClient, agent_service_client::AgentServiceClient,
agent_message, agent_message,
master_message,
AgentMessage, ConfigUpdate, HealthReport, MasterMessage, RegistrationRequest, AgentMessage, ConfigUpdate, HealthReport, MasterMessage, RegistrationRequest,
}; };
@@ -128,14 +129,13 @@ impl MasterClient {
let agent_id = self.get_agent_id().await?; let agent_id = self.get_agent_id().await?;
info!("Starting bidirectional stream for agent {}", agent_id); info!("Starting bidirectional stream for agent {}", agent_id);
// Create channels for the stream // Create channel for outgoing messages (Agent -> Master)
let (outgoing_tx, _outgoing_rx) = mpsc::channel::<AgentMessage>(100); let (outgoing_tx, outgoing_rx) = mpsc::channel::<AgentMessage>(100);
let (_incoming_tx, _incoming_rx) = mpsc::channel::<MasterMessage>(100);
// Update the tx channel // Update the tx channel
self.tx = outgoing_tx.clone(); self.tx = outgoing_tx.clone();
// Spawn task to handle incoming health reports // Spawn task to handle incoming health reports and forward to outgoing channel
let outgoing_tx_clone = outgoing_tx.clone(); let outgoing_tx_clone = outgoing_tx.clone();
let agent_id_clone = agent_id.clone(); let agent_id_clone = agent_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
@@ -152,26 +152,64 @@ impl MasterClient {
} }
}); });
// For now, simulate the stream handling // Send initial registration message to start the stream
// In production, this would use the actual gRPC streaming let registration_request = RegistrationRequest {
token: self.settings.token.clone(),
hostname: self.agent_settings.name.clone(),
ip_address: get_local_ip().unwrap_or_else(|| "127.0.0.1".to_string()),
version: env!("CARGO_PKG_VERSION").to_string(),
capabilities: vec![
"nginx_management".to_string(),
"config_reload".to_string(),
"health_reporting".to_string(),
],
labels: self.agent_settings.labels.clone(),
deployment_mode: match self.nginx_settings.deployment_mode.as_str() {
"docker_sidecar" => nxmesh_proto::DeploymentMode::DockerSidecar as i32,
"kubernetes_sidecar" => nxmesh_proto::DeploymentMode::KubernetesSidecar as i32,
"standalone" => nxmesh_proto::DeploymentMode::Standalone as i32,
_ => nxmesh_proto::DeploymentMode::DockerSidecar as i32,
},
};
let initial_msg = AgentMessage {
agent_id: agent_id.clone(),
timestamp: chrono::Utc::now().timestamp(),
payload: Some(agent_message::Payload::Registration(registration_request)),
};
// Send the initial message to establish the stream
if let Err(e) = outgoing_tx.send(initial_msg).await {
return Err(format!("Failed to send initial message: {}", e).into());
}
// Convert the mpsc receiver into a stream for gRPC
let outgoing_stream = tokio_stream::wrappers::ReceiverStream::new(outgoing_rx);
// Start the gRPC bidirectional stream
let mut stream = self.client.stream(outgoing_stream).await?.into_inner();
info!("Stream started for agent {}", agent_id); info!("Stream started for agent {}", agent_id);
// Handle incoming messages in a separate task // Process incoming messages from master
let _config_handler = self.config_handler.clone(); let config_handler = self.config_handler.clone();
let state = self.state.clone(); let state = self.state.clone();
// Spawn heartbeat task
let heartbeat_tx = outgoing_tx.clone();
let heartbeat_agent_id = agent_id.clone();
let heartbeat_state = state.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Simulate receiving messages
loop { loop {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
// Check if still connected // Check if still connected
let current_state = state.read().await.clone(); let current_state = heartbeat_state.read().await.clone();
match current_state { match current_state {
ConnectionState::Connected { .. } => { ConnectionState::Connected { .. } => {
// Send periodic heartbeat // Send periodic heartbeat
let heartbeat = AgentMessage { let heartbeat = AgentMessage {
agent_id: agent_id.clone(), agent_id: heartbeat_agent_id.clone(),
timestamp: chrono::Utc::now().timestamp(), timestamp: chrono::Utc::now().timestamp(),
payload: Some(agent_message::Payload::Event( payload: Some(agent_message::Payload::Event(
nxmesh_proto::Event { nxmesh_proto::Event {
@@ -182,7 +220,7 @@ impl MasterClient {
}, },
)), )),
}; };
if outgoing_tx.send(heartbeat).await.is_err() { if heartbeat_tx.send(heartbeat).await.is_err() {
break; break;
} }
} }
@@ -191,6 +229,25 @@ impl MasterClient {
} }
}); });
// Main loop: process incoming messages from master
loop {
match stream.message().await {
Ok(Some(master_msg)) => {
if let Err(e) = handle_master_message(master_msg, &config_handler).await {
warn!("Failed to handle master message: {}", e);
}
}
Ok(None) => {
info!("Stream closed by master");
break;
}
Err(e) => {
warn!("Stream error: {}", e);
break;
}
}
}
Ok(()) Ok(())
} }
@@ -229,6 +286,39 @@ fn get_local_ip() -> Option<String> {
Some("127.0.0.1".to_string()) Some("127.0.0.1".to_string())
} }
/// Handle incoming message from master
async fn handle_master_message(
msg: MasterMessage,
config_handler: &Arc<dyn Fn(ConfigUpdate) + Send + Sync>,
) -> Result<(), Box<dyn std::error::Error>> {
use master_message::Payload;
match msg.payload {
Some(Payload::ConfigUpdate(config)) => {
info!("Received config update from master: version={}", config.version);
config_handler(config);
}
Some(Payload::Command(cmd)) => {
info!("Received command from master: command_id={}", cmd.command_id);
// TODO: Handle commands (reload, restart, etc.)
}
Some(Payload::Ack(ack)) => {
tracing::debug!("Received ACK from master: message_id={}", ack.message_id);
}
Some(Payload::Error(err)) => {
warn!("Received error from master: {} - {}", err.code, err.message);
}
Some(Payload::RegistrationResponse(resp)) => {
info!("Received registration response: success={}", resp.success);
}
None => {
warn!("Received empty master message");
}
}
Ok(())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;