feat: implement MasterHandler for handling master messages and add retry logic
This commit is contained in:
@@ -1,38 +1,275 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use nxmesh_proto::ConfigUpdate;
|
||||
use tracing::info;
|
||||
use anyhow::{Context, Result, bail};
|
||||
use nxmesh_proto::{AgentMessage, ConfigUpdate, MasterMessage};
|
||||
use tokio::time::sleep;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::Request;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::connector::master::MasterConnector;
|
||||
use crate::connector::master::{MasterConnector, MasterConnectorTrait};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait MasterHandler {
|
||||
async fn on_config_update(
|
||||
&self,
|
||||
config_info: ConfigUpdate,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
|
||||
pub trait MasterHandler: Send + Sync {
|
||||
// Create a new routine to handle incoming messages from the master
|
||||
// This method will auto-reconnect if the connection is lost, so it should run indefinitely until the agent is shut down
|
||||
async fn start_handle_master_message(self: Arc<Self>) -> Result<()>;
|
||||
async fn stop_handle_master_message(self: Arc<Self>) -> Result<()>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
trait MasterMessageHandler: Send + Sync {
|
||||
async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()>;
|
||||
}
|
||||
|
||||
pub struct MasterHandlerImpl {
|
||||
settings: Arc<crate::config::settings::Settings>,
|
||||
nginx_handler: Arc<dyn crate::service::nginx_handler::NginxHandler>,
|
||||
master_connector: Arc<MasterConnector>,
|
||||
handle_master_message_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
|
||||
}
|
||||
|
||||
impl MasterHandlerImpl {
|
||||
pub fn new(settings: impl Into<Arc<crate::config::settings::Settings>>) -> Self {
|
||||
Self {
|
||||
pub fn new(
|
||||
settings: impl Into<Arc<crate::config::settings::Settings>>,
|
||||
nginx_handler: impl Into<Arc<dyn crate::service::nginx_handler::NginxHandler>>,
|
||||
master_connector: impl Into<Arc<MasterConnector>>,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
settings: settings.into(),
|
||||
nginx_handler: nginx_handler.into(),
|
||||
master_connector: master_connector.into(),
|
||||
handle_master_message_task: tokio::sync::Mutex::new(None),
|
||||
})
|
||||
}
|
||||
|
||||
async fn retry_with_delay(
|
||||
retry_count: &mut u32,
|
||||
max_retries: u32,
|
||||
base_delay: std::time::Duration,
|
||||
) -> Result<()> {
|
||||
if *retry_count < max_retries {
|
||||
let backoff_delay = 2u32.pow(*retry_count) * base_delay;
|
||||
warn!(
|
||||
"Retrying connection to master in {:?} (attempt {}/{})...",
|
||||
backoff_delay,
|
||||
*retry_count + 1,
|
||||
max_retries
|
||||
);
|
||||
sleep(backoff_delay).await;
|
||||
*retry_count += 1;
|
||||
Ok(())
|
||||
} else {
|
||||
error!("Exceeded maximum retry attempts to connect to master. Giving up.");
|
||||
bail!("Failed to connect to master after maximum retry attempts.")
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_master_message_task(
|
||||
handler_clone: Arc<dyn MasterMessageHandler>,
|
||||
master_connector: Arc<MasterConnector>,
|
||||
) {
|
||||
let mut retry_count: u32 = 0;
|
||||
let max_retries: u32 = 5;
|
||||
let base_retry_delay = std::time::Duration::from_secs(5);
|
||||
|
||||
'connection_loop: loop {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(128);
|
||||
let outbound = ReceiverStream::new(rx);
|
||||
|
||||
let request = AgentMessage {
|
||||
// TODO: get agent ID from settings or generate a unique ID for this agent
|
||||
agent_id: "TODO".to_string(),
|
||||
timestamp: chrono::Utc::now().timestamp_millis(),
|
||||
payload: None,
|
||||
};
|
||||
let response = master_connector
|
||||
.get_client()
|
||||
.lock()
|
||||
.await
|
||||
.stream(Request::new(outbound))
|
||||
.await;
|
||||
|
||||
if let Err(e) = tx.send(request).await {
|
||||
error!("Failed to send initial message to master: {:?}", e);
|
||||
match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay).await
|
||||
{
|
||||
Ok(()) => continue 'connection_loop,
|
||||
Err(e) => {
|
||||
error!("Failed to connect to master after retrying: {:?}", e);
|
||||
break 'connection_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut inbound = match response {
|
||||
Ok(res) => res.into_inner(),
|
||||
Err(e) => {
|
||||
info!("Failed to connect to master: {:?}", e);
|
||||
match Self::retry_with_delay(&mut retry_count, max_retries, base_retry_delay)
|
||||
.await
|
||||
{
|
||||
Ok(()) => continue 'connection_loop,
|
||||
Err(e) => {
|
||||
error!("Failed to connect to master after retrying: {:?}", e);
|
||||
break 'connection_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
retry_count = 0; // reset retry count on successful connection
|
||||
|
||||
loop {
|
||||
match inbound.message().await {
|
||||
Ok(Some(message)) => {
|
||||
Self::handle_inbound_message(handler_clone.clone(), message).await;
|
||||
}
|
||||
Ok(None) => {
|
||||
warn!("Master closed the connection");
|
||||
match Self::retry_with_delay(
|
||||
&mut retry_count,
|
||||
max_retries,
|
||||
base_retry_delay,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => continue 'connection_loop,
|
||||
Err(e) => {
|
||||
error!("Failed to connect to master after retrying: {:?}", e);
|
||||
break 'connection_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to receive message from master: {:?}", e);
|
||||
match Self::retry_with_delay(
|
||||
&mut retry_count,
|
||||
max_retries,
|
||||
base_retry_delay,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => continue 'connection_loop,
|
||||
Err(e) => {
|
||||
error!("Failed to connect to master after retrying: {:?}", e);
|
||||
break 'connection_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_inbound_message(
|
||||
handler_clone: Arc<dyn MasterMessageHandler>,
|
||||
message: MasterMessage,
|
||||
) -> () {
|
||||
if let Some(payload) = message.payload {
|
||||
match payload {
|
||||
nxmesh_proto::master_message::Payload::ConfigUpdate(config_info) => {
|
||||
handler_clone
|
||||
.on_config_update(config_info)
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
error!("Failed to handle config update from master: {:?}", e);
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
warn!("Received unsupported message from master: {:?}", payload);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Received message from master with empty payload");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MasterHandler for MasterHandlerImpl {
|
||||
async fn on_config_update(
|
||||
&self,
|
||||
config_info: ConfigUpdate,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
async fn start_handle_master_message(self: Arc<Self>) -> Result<()> {
|
||||
if let Some(handle) = self.handle_master_message_task.lock().await.as_ref() {
|
||||
bail!(
|
||||
"Master message handler is already running with task id: {:?}",
|
||||
handle.id()
|
||||
);
|
||||
}
|
||||
|
||||
// Create a clone of the Arc<Self> and upcast to the trait object
|
||||
let handler_clone: Arc<dyn MasterMessageHandler> = self.clone();
|
||||
let master_connector = self.master_connector.clone();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
Self::handle_master_message_task(handler_clone, master_connector).await
|
||||
});
|
||||
|
||||
let mut guard = self.handle_master_message_task.lock().await;
|
||||
*guard = Some(handle);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop_handle_master_message(self: Arc<Self>) -> Result<()> {
|
||||
// 1. signal the task to stop (e.g. using a cancellation token or channel)
|
||||
// 2. wait for the task to finish and handle any errors
|
||||
// 3. set handle_master_message_task to None
|
||||
|
||||
let mut guard = self.handle_master_message_task.lock().await;
|
||||
if let Some(handle) = guard.take() {
|
||||
handle.abort();
|
||||
match handle.await {
|
||||
Ok(_) => info!("Master message handler task stopped successfully"),
|
||||
Err(e) => error!("Failed to stop master message handler task: {:?}", e),
|
||||
}
|
||||
} else {
|
||||
warn!("Master message handler is not running");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MasterMessageHandler for MasterHandlerImpl {
|
||||
async fn on_config_update(&self, config_info: ConfigUpdate) -> Result<()> {
|
||||
info!("Received config update from master: {:?}", config_info);
|
||||
|
||||
let deployment_id = format!("{}__{}", config_info.config_id, config_info.version);
|
||||
|
||||
let nginx_root_path = self
|
||||
.nginx_handler
|
||||
.write_config(
|
||||
&deployment_id,
|
||||
&config_info
|
||||
.root_config
|
||||
.as_ref()
|
||||
.context("Root config is required in the config update")?
|
||||
.content,
|
||||
"nginx.conf",
|
||||
)
|
||||
.await?;
|
||||
|
||||
// create files and write the config content to the files
|
||||
for config in config_info.configs {
|
||||
self.nginx_handler
|
||||
.write_config(&deployment_id, &config.content, &config.path)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// TODO: handle certificates
|
||||
|
||||
// test the new config before reloading nginx
|
||||
self.nginx_handler
|
||||
.validate(Some(nginx_root_path.as_str()))
|
||||
.await
|
||||
.context("Failed to validate the new nginx config")?;
|
||||
|
||||
// reload nginx to apply the new config
|
||||
self.nginx_handler
|
||||
.reload(Some(nginx_root_path.as_str()))
|
||||
.await
|
||||
.context("Failed to reload nginx with the new config")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
pub mod master_handler;
|
||||
pub mod nginx_handler;
|
||||
|
||||
Reference in New Issue
Block a user