feat: implement service initialization and shutdown handling for NxMesh Agent
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
|
#![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
|
||||||
|
|
||||||
use std::process::exit;
|
use std::{process::exit, sync::Arc};
|
||||||
|
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
use tracing_subscriber::{
|
use tracing_subscriber::{
|
||||||
@@ -9,7 +9,7 @@ use tracing_subscriber::{
|
|||||||
util::SubscriberInitExt,
|
util::SubscriberInitExt,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMasterConnector};
|
use crate::service::get_services;
|
||||||
|
|
||||||
mod cli;
|
mod cli;
|
||||||
mod config;
|
mod config;
|
||||||
@@ -44,38 +44,26 @@ async fn main() {
|
|||||||
// info!("Loaded settings: {:#?}", settings);
|
// info!("Loaded settings: {:#?}", settings);
|
||||||
|
|
||||||
info!("Starting NxMesh Agent...");
|
info!("Starting NxMesh Agent...");
|
||||||
// install grpc client
|
let services = get_services(Arc::new(settings))
|
||||||
#[expect(clippy::expect_used)]
|
|
||||||
let ssh_connector = SshMasterConnector::new(settings.grpc.clone())
|
|
||||||
.await
|
.await
|
||||||
.inspect_err(|e| {
|
.map_err(|e| {
|
||||||
error!("Failed to create SSH Master Connector: {}", e);
|
error!("Failed to initialize services: {}", e);
|
||||||
exit(1);
|
e
|
||||||
})
|
})
|
||||||
.expect("Failed to create SSH Master Connector");
|
.unwrap_or_else(|_| {
|
||||||
let mut master_connector = MasterConnector::new(Box::new(ssh_connector));
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
if let Err(e) = master_connector.connect(&settings).await {
|
let master_handler = services.master_handler.clone();
|
||||||
error!("Failed to connect to master: {}", e);
|
// spawn the long-running handler so main can wait for shutdown signal
|
||||||
exit(1);
|
tokio::spawn(async move {
|
||||||
}
|
if let Err(e) = master_handler.start_handle_master_message().await {
|
||||||
|
error!("Master message handler exited with error: {:?}", e);
|
||||||
// send a dummy heartbeat to verify the connection is working
|
|
||||||
let mut client = master_connector.get_client().clone();
|
|
||||||
|
|
||||||
let request = nxmesh_proto::TestRequest {
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
match client.connection_test(request).await {
|
|
||||||
Ok(_) => info!("Successfully sent connection test to master."),
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to send connection test to master: {}", e);
|
|
||||||
exit(1);
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
info!("Successfully connected to master. Agent is running.");
|
info!("Agent is running. Waiting for shutdown signal.");
|
||||||
|
shutdown_handler(services.master_handler.clone()).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn install_tracing_subscriber()
|
fn install_tracing_subscriber()
|
||||||
@@ -92,3 +80,20 @@ fn install_tracing_subscriber()
|
|||||||
|
|
||||||
reload_handle
|
reload_handle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn wait_for_shutdown_signal() {
|
||||||
|
#[expect(clippy::expect_used)]
|
||||||
|
tokio::signal::ctrl_c()
|
||||||
|
.await
|
||||||
|
.expect("Failed to listen for ctrl_c");
|
||||||
|
info!("Shutdown signal received, stopping handler.");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn shutdown_handler(master_handler: Arc<dyn service::master_handler::MasterHandler>) {
|
||||||
|
wait_for_shutdown_signal().await;
|
||||||
|
//
|
||||||
|
let _ = master_handler.stop_handle_master_message().await;
|
||||||
|
//
|
||||||
|
info!("Agent stopped.");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,2 +1,47 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
config::settings::Settings,
|
||||||
|
connector::master::{MasterConnector, ssh::SshMasterConnector},
|
||||||
|
service::{
|
||||||
|
master_handler::{MasterHandler, MasterHandlerImpl, handlers::HandlerImpl},
|
||||||
|
nginx_handler::{NginxHandler, NginxHandlerImpl},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
pub mod master_handler;
|
pub mod master_handler;
|
||||||
pub mod nginx_handler;
|
pub mod nginx_handler;
|
||||||
|
|
||||||
|
pub struct Services {
|
||||||
|
pub master_handler: Arc<dyn MasterHandler>,
|
||||||
|
pub nginx_handler: Arc<dyn NginxHandler>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_services(settings: Arc<Settings>) -> anyhow::Result<Services> {
|
||||||
|
let master_connector = initialize_master_connector(settings.clone()).await?;
|
||||||
|
|
||||||
|
let nginx_handler = Arc::new(NginxHandlerImpl::new(settings.nginx.clone().into()));
|
||||||
|
|
||||||
|
let message_handler = Arc::new(HandlerImpl::new(nginx_handler.clone()));
|
||||||
|
|
||||||
|
// build the services
|
||||||
|
let master_handler = Arc::new(MasterHandlerImpl::new(
|
||||||
|
Arc::new(master_connector),
|
||||||
|
message_handler,
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok(Services {
|
||||||
|
master_handler,
|
||||||
|
nginx_handler,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn initialize_master_connector(settings: Arc<Settings>) -> anyhow::Result<MasterConnector> {
|
||||||
|
let ssh_connector = SshMasterConnector::new(settings.grpc.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to initialize SSH connector: {}", e))?;
|
||||||
|
|
||||||
|
let master_connector = MasterConnector::new(Box::new(ssh_connector));
|
||||||
|
|
||||||
|
Ok(master_connector)
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,12 +1,16 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
use nxmesh_proto::ConfigUpdate;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::settings::NginxSettings,
|
config::settings::NginxSettings,
|
||||||
service::nginx_handler::{
|
service::{
|
||||||
command_handler::{CommandHandler, CommandHandlerImpl},
|
master_handler::{MessageResult, handlers::OnConfigUpdateHandler},
|
||||||
fs_handler::{FsHandler, FsHandlerImpl},
|
nginx_handler::{
|
||||||
|
command_handler::{CommandHandler, CommandHandlerImpl},
|
||||||
|
fs_handler::{FsHandler, FsHandlerImpl},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -119,3 +123,14 @@ where
|
|||||||
self.fs_handler.cleanup_config(n).await
|
self.fs_handler.cleanup_config(n).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<CH, FSH> OnConfigUpdateHandler for NginxHandlerImpl<CH, FSH>
|
||||||
|
where
|
||||||
|
CH: CommandHandler + ?Sized,
|
||||||
|
FSH: FsHandler + ?Sized,
|
||||||
|
{
|
||||||
|
async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()> {
|
||||||
|
todo!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user