refactor: streamline master message handling by removing unnecessary Arc and Mutex usage, and reorganizing handler structure
This commit is contained in:
@@ -61,7 +61,7 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// send a dummy heartbeat to verify the connection is working
|
// send a dummy heartbeat to verify the connection is working
|
||||||
let mut client = master_connector.get_client().lock().await.clone();
|
let mut client = master_connector.get_client().clone();
|
||||||
|
|
||||||
let request = nxmesh_proto::TestRequest {
|
let request = nxmesh_proto::TestRequest {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
|
|||||||
@@ -1,275 +0,0 @@
|
|||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use anyhow::{Context, Result, bail};
|
|
||||||
use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage};
|
|
||||||
use tokio::time::sleep;
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
|
||||||
use tonic::Request;
|
|
||||||
use tracing::{error, info, warn};
|
|
||||||
|
|
||||||
use crate::connector::master::{MasterConnector, MasterConnectorTrait};
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait MasterHandler: Send + Sync {
|
|
||||||
// Create a new routine to handle incoming messages from the master
|
|
||||||
// This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down
|
|
||||||
async fn start_handle_master_message(self: Arc<Self>) -> Result<()>;
|
|
||||||
async fn stop_handle_master_message(self: Arc<Self>) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
trait MasterMessageHandler: Send + Sync {
|
|
||||||
async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MasterHandlerImpl {
|
|
||||||
settings: Arc<crate::config::settings::Settings>,
|
|
||||||
nginx_handler: Arc<dyn crate::service::nginx_handler::NginxHandler>,
|
|
||||||
master_connector: Arc<MasterConnector>,
|
|
||||||
handle_master_message_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MasterHandlerImpl {
|
|
||||||
pub fn new(
|
|
||||||
settings: impl Into<Arc<crate::config::settings::Settings>>,
|
|
||||||
nginx_handler: impl Into<Arc<dyn crate::service::nginx_handler::NginxHandler>>,
|
|
||||||
master_connector: impl Into<Arc<MasterConnector>>,
|
|
||||||
) -> Arc<Self> {
|
|
||||||
Arc::new(Self {
|
|
||||||
settings: settings.into(),
|
|
||||||
nginx_handler: nginx_handler.into(),
|
|
||||||
master_connector: master_connector.into(),
|
|
||||||
handle_master_message_task: tokio::sync::Mutex::new(None),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn retry_with_delay(
|
|
||||||
retry_count: &mut u32,
|
|
||||||
max_retries: u32,
|
|
||||||
base_delay: std::time::Duration,
|
|
||||||
) -> Result<()> {
|
|
||||||
if *retry_count < max_retries {
|
|
||||||
let backoff_delay = 2u32.pow(*retry_count) * base_delay;
|
|
||||||
warn!(
|
|
||||||
"Retrying connection to master in {:?} (attempt {}/{})...",
|
|
||||||
backoff_delay,
|
|
||||||
*retry_count + 1,
|
|
||||||
max_retries
|
|
||||||
);
|
|
||||||
sleep(backoff_delay).await;
|
|
||||||
*retry_count += 1;
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
error!("Exceeded maximum retry attempts to connect to master. Giving up.");
|
|
||||||
bail!("Failed to connect to master after maximum retry attempts.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_master_message_task(
|
|
||||||
handler_clone: Arc<dyn MasterMessageHandler>,
|
|
||||||
master_connector: Arc<MasterConnector>,
|
|
||||||
) {
|
|
||||||
let mut retry_count: u32 = 0;
|
|
||||||
let max_retries: u32 = 5;
|
|
||||||
let base_retry_delay = std::time::Duration::from_secs(5);
|
|
||||||
|
|
||||||
'connection_loop: loop {
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
|
||||||
let outbound = ReceiverStream::new(rx);
|
|
||||||
|
|
||||||
let request = AgentMessage {
|
|
||||||
// TODO: get agent ID from settings or generate a unique ID for this agent
|
|
||||||
agent_id: "TODO".to_string(),
|
|
||||||
timestamp: chrono::Utc::now().timestamp_millis(),
|
|
||||||
payload: None,
|
|
||||||
};
|
|
||||||
let response = master_connector
|
|
||||||
.get_client()
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.stream(Request::new(outbound))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if let Err(e) = tx.send(request).await {
|
|
||||||
error!("Failed to send initial message to master: {:?}", e);
|
|
||||||
match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay).await
|
|
||||||
{
|
|
||||||
Ok(()) => continue 'connection_loop,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to connect to master after retrying: {:?}", e);
|
|
||||||
break 'connection_loop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut inbound = match response {
|
|
||||||
Ok(res) => res.into_inner(),
|
|
||||||
Err(e) => {
|
|
||||||
info!("Failed to connect to master: {:?}", e);
|
|
||||||
match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(()) => continue 'connection_loop,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to connect to master after retrying: {:?}", e);
|
|
||||||
break 'connection_loop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
retry_count = 0; // reset retry count on successful connection
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match inbound.message().await {
|
|
||||||
Ok(Some(message)) => {
|
|
||||||
Self::handle_inbound_message(handler_clone.clone(), message).await;
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
warn!("Master closed the connection");
|
|
||||||
match Self::retry_with_delay(
|
|
||||||
&mut retry_count,
|
|
||||||
max_retries,
|
|
||||||
base_retry_delay,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(()) => continue 'connection_loop,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to connect to master after retrying: {:?}", e);
|
|
||||||
break 'connection_loop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to receive message from master: {:?}", e);
|
|
||||||
match Self::retry_with_delay(
|
|
||||||
&mut retry_count,
|
|
||||||
max_retries,
|
|
||||||
base_retry_delay,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(()) => continue 'connection_loop,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to connect to master after retrying: {:?}", e);
|
|
||||||
break 'connection_loop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_inbound_message(
|
|
||||||
handler_clone: Arc<dyn MasterMessageHandler>,
|
|
||||||
message: MasterMessage,
|
|
||||||
) -> () {
|
|
||||||
if let Some(payload) = message.payload {
|
|
||||||
match payload {
|
|
||||||
nxmesh_proto::master_message::Payload::ConfigUpdate(config_info) => {
|
|
||||||
handler_clone
|
|
||||||
.on_config_update(config_info)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|e| {
|
|
||||||
error!("Failed to handle config update from master: {:?}", e);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
warn!("Received unsupported message from master: {:?}", payload);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!("Received message from master with empty payload");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MasterHandler for MasterHandlerImpl {
|
|
||||||
async fn start_handle_master_message(self: Arc<Self>) -> Result<()> {
|
|
||||||
if let Some(handle) = self.handle_master_message_task.lock().await.as_ref() {
|
|
||||||
bail!(
|
|
||||||
"Master message handler is already running with task id: {:?}",
|
|
||||||
handle.id()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a clone of the Arc<Self> and upcast to the trait object
|
|
||||||
let handler_clone: Arc<dyn MasterMessageHandler> = self.clone();
|
|
||||||
let master_connector = self.master_connector.clone();
|
|
||||||
|
|
||||||
let handle = tokio::spawn(async move {
|
|
||||||
Self::handle_master_message_task(handler_clone, master_connector).await
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut guard = self.handle_master_message_task.lock().await;
|
|
||||||
*guard = Some(handle);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn stop_handle_master_message(self: Arc<Self>) -> Result<()> {
|
|
||||||
// 1. signal the task to stop (e.g. using a cancellation token or channel)
|
|
||||||
// 2. wait for the task to finish and handle any errors
|
|
||||||
// 3. set handle_master_message_task to None
|
|
||||||
|
|
||||||
let mut guard = self.handle_master_message_task.lock().await;
|
|
||||||
if let Some(handle) = guard.take() {
|
|
||||||
handle.abort();
|
|
||||||
match handle.await {
|
|
||||||
Ok(_) => info!("Master message handler task stopped successfully"),
|
|
||||||
Err(e) => error!("Failed to stop master message handler task: {:?}", e),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
warn!("Master message handler is not running");
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MasterMessageHandler for MasterHandlerImpl {
|
|
||||||
async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()> {
|
|
||||||
info!("Received config update from master: {:?}", config_info);
|
|
||||||
|
|
||||||
let deployment_id = format!("{}__{}", config_info.config_id, config_info.version);
|
|
||||||
|
|
||||||
let nginx_root_path = self
|
|
||||||
.nginx_handler
|
|
||||||
.write_config(
|
|
||||||
&deployment_id,
|
|
||||||
&config_info
|
|
||||||
.root_config
|
|
||||||
.as_ref()
|
|
||||||
.context("Root config is required in the config update")?
|
|
||||||
.content,
|
|
||||||
"nginx.conf",
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// create files and write the config content to the files
|
|
||||||
for config in config_info.configs {
|
|
||||||
self.nginx_handler
|
|
||||||
.write_config(&deployment_id, &config.content, &config.path)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: handle certificates
|
|
||||||
|
|
||||||
// test the new config before reloading nginx
|
|
||||||
self.nginx_handler
|
|
||||||
.validate(Some(nginx_root_path.as_str()))
|
|
||||||
.await
|
|
||||||
.context("Failed to validate the new nginx config")?;
|
|
||||||
|
|
||||||
// reload nginx to apply the new config
|
|
||||||
self.nginx_handler
|
|
||||||
.reload(Some(nginx_root_path.as_str()))
|
|
||||||
.await
|
|
||||||
.context("Failed to reload nginx with the new config")?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
62
apps/nxmesh-agent/src/service/master_handler/handlers.rs
Normal file
62
apps/nxmesh-agent/src/service/master_handler/handlers.rs
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage, master_message::Payload};
|
||||||
|
|
||||||
|
use crate::service::master_handler::{MasterHandlerError, MessageResult};
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait MasterMessageHandler: Send + Sync + 'static {
|
||||||
|
async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait OnConfigUpdateHandler: Send + Sync + 'static {
|
||||||
|
// Handle the config update message from master, write the config content to files, validate the new config and reload nginx
|
||||||
|
async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HandlerImpl<OCH>
|
||||||
|
where
|
||||||
|
OCH: OnConfigUpdateHandler,
|
||||||
|
{
|
||||||
|
on_config_update_handler: Arc<OCH>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<OCH> HandlerImpl<OCH>
|
||||||
|
where
|
||||||
|
OCH: OnConfigUpdateHandler,
|
||||||
|
{
|
||||||
|
pub fn new(on_config_update_handler: Arc<OCH>) -> Self {
|
||||||
|
Self {
|
||||||
|
on_config_update_handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<OCH> MasterMessageHandler for HandlerImpl<OCH>
|
||||||
|
where
|
||||||
|
OCH: OnConfigUpdateHandler,
|
||||||
|
{
|
||||||
|
async fn handle_master_message(&self, message: MasterMessage) -> MessageResult<()> {
|
||||||
|
match message.payload {
|
||||||
|
Some(Payload::ConfigUpdate(config_info)) => {
|
||||||
|
self.on_config_update_handler
|
||||||
|
.on_config_update(config_info)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
Some(_) => {
|
||||||
|
// We should never receive other types of messages from the master, but we should handle it anyway
|
||||||
|
Err(MasterHandlerError::MessageHandlingError(
|
||||||
|
"Received unsupported master message type".to_string(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// This should never happen as the master should always send a valid message, but we should handle it anyway
|
||||||
|
return Err(MasterHandlerError::MessageHandlingError(
|
||||||
|
"Received master message with empty payload".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
163
apps/nxmesh-agent/src/service/master_handler/mod.rs
Normal file
163
apps/nxmesh-agent/src/service/master_handler/mod.rs
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use nxmesh_proto::AgentMessage;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
connector::master::{MasterConnector, MasterConnectorTrait},
|
||||||
|
service::master_handler::handlers::MasterMessageHandler,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub mod handlers;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum MasterHandlerError {
|
||||||
|
ConnectionError(String),
|
||||||
|
// TODO: should be protobuf error to transmit the error to master
|
||||||
|
MessageHandlingError(String),
|
||||||
|
RetryLimitExceeded(String),
|
||||||
|
SendMessageError(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type MessageResult<T> = std::result::Result<T, MasterHandlerError>;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait MasterHandler: Send + Sync + 'static {
|
||||||
|
// Create a new routine to handle incoming messages from the master
|
||||||
|
// This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down
|
||||||
|
async fn start_handle_master_message(&self) -> MessageResult<()>;
|
||||||
|
async fn stop_handle_master_message(&self) -> MessageResult<()>;
|
||||||
|
|
||||||
|
// Send a message to the master, response should be handled by the agent message handler registered
|
||||||
|
async fn send_message_to_master(&self, message: AgentMessage) -> MessageResult<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MessageHandleInfo {
|
||||||
|
handle: tokio::task::JoinHandle<()>,
|
||||||
|
tx: mpsc::Sender<AgentMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct MasterHandlerImpl<MMH>
|
||||||
|
where
|
||||||
|
MMH: MasterMessageHandler,
|
||||||
|
{
|
||||||
|
connector: Arc<MasterConnector>,
|
||||||
|
message_handler: Arc<MMH>,
|
||||||
|
message_handle_lock: tokio::sync::RwLock<Option<MessageHandleInfo>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<MMH> MasterHandlerImpl<MMH>
|
||||||
|
where
|
||||||
|
MMH: MasterMessageHandler,
|
||||||
|
{
|
||||||
|
pub fn new(connector: Arc<MasterConnector>, message_handler: Arc<MMH>) -> Self {
|
||||||
|
Self {
|
||||||
|
connector,
|
||||||
|
message_handler,
|
||||||
|
message_handle_lock: tokio::sync::RwLock::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<MMH> MasterHandler for MasterHandlerImpl<MMH>
|
||||||
|
where
|
||||||
|
MMH: MasterMessageHandler,
|
||||||
|
{
|
||||||
|
async fn start_handle_master_message(&self) -> MessageResult<()> {
|
||||||
|
let mut client = self.connector.get_client();
|
||||||
|
|
||||||
|
'connection_loop: loop {
|
||||||
|
let guard_result = self.message_handle_lock.try_write();
|
||||||
|
let mut guard = match guard_result {
|
||||||
|
Ok(g) if g.is_none() => g,
|
||||||
|
Ok(_) => {
|
||||||
|
warn!("Master message handler is already running");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(MasterHandlerError::MessageHandlingError(format!(
|
||||||
|
"Failed to acquire lock for message handler: {}",
|
||||||
|
e
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let (tx, rx) = mpsc::channel(32);
|
||||||
|
let outbound_stream = ReceiverStream::new(rx);
|
||||||
|
// 2. Connect to the master and start the bi-directional streaming RPC
|
||||||
|
let mut stream = match client.stream(outbound_stream).await {
|
||||||
|
Ok(s) => s.into_inner(),
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
"Failed to connect to master: {}. Retrying in 5 seconds...",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||||
|
continue 'connection_loop;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 3. Spawn a task to handle incoming messages from the master
|
||||||
|
let message_handler = self.message_handler.clone();
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
'message_loop: loop {
|
||||||
|
let message = stream.message().await;
|
||||||
|
match message {
|
||||||
|
Ok(Some(msg)) => {
|
||||||
|
if let Err(e) = message_handler.handle_master_message(msg).await {
|
||||||
|
error!("Failed to handle master message: {:?}", e);
|
||||||
|
}
|
||||||
|
continue 'message_loop;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
warn!("Master closed the connection");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error receiving message from master: {:?}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
*guard = Some(MessageHandleInfo { handle, tx });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stop_handle_master_message(&self) -> MessageResult<()> {
|
||||||
|
// 1. signal the task to stop (e.g. using a cancellation token or channel)
|
||||||
|
// 2. wait for the task to finish and handle any errors
|
||||||
|
// 3. set handle_master_message_task to None
|
||||||
|
|
||||||
|
let mut guard = self.message_handle_lock.write().await;
|
||||||
|
if let Some(handle_info) = guard.take() {
|
||||||
|
handle_info.handle.abort();
|
||||||
|
match handle_info.handle.await {
|
||||||
|
Ok(_) => info!("Master message handler task stopped successfully"),
|
||||||
|
Err(e) => error!("Failed to stop master message handler task: {:?}", e),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Master message handler is not running");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_message_to_master(&self, message: AgentMessage) -> MessageResult<()> {
|
||||||
|
let guard = self.message_handle_lock.read().await;
|
||||||
|
if let Some(handle_info) = guard.as_ref() {
|
||||||
|
handle_info.tx.send(message).await.map_err(|e| {
|
||||||
|
MasterHandlerError::SendMessageError(format!(
|
||||||
|
"Failed to send message to master: {}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
} else {
|
||||||
|
return Err(MasterHandlerError::SendMessageError(
|
||||||
|
"Master message handler is not running".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user