From f7162f0c1779668c78060c1d84455b564f483de5 Mon Sep 17 00:00:00 2001 From: GW_MC <72297530+GWMCwing@users.noreply.github.com> Date: Mon, 1 Jun 2026 10:05:49 +0000 Subject: [PATCH] feat: implement service initialization and shutdown handling for NxMesh Agent --- apps/nxmesh-agent/src/main.rs | 63 ++++++++++--------- apps/nxmesh-agent/src/service/mod.rs | 45 +++++++++++++ .../src/service/nginx_handler/mod.rs | 21 ++++++- 3 files changed, 97 insertions(+), 32 deletions(-) diff --git a/apps/nxmesh-agent/src/main.rs b/apps/nxmesh-agent/src/main.rs index d18ba08..245f0f1 100644 --- a/apps/nxmesh-agent/src/main.rs +++ b/apps/nxmesh-agent/src/main.rs @@ -1,7 +1,7 @@ #![forbid(unsafe_code)] #![deny(clippy::unwrap_used, clippy::panic, clippy::expect_used)] -use std::process::exit; +use std::{process::exit, sync::Arc}; use tracing::{error, info}; use tracing_subscriber::{ @@ -9,7 +9,7 @@ use tracing_subscriber::{ util::SubscriberInitExt, }; -use crate::connector::master::{MasterConnector, MasterConnectorTrait, ssh::SshMasterConnector}; +use crate::service::get_services; mod cli; mod config; @@ -44,38 +44,26 @@ async fn main() { // info!("Loaded settings: {:#?}", settings); info!("Starting NxMesh Agent..."); - // install grpc client - #[expect(clippy::expect_used)] - let ssh_connector = SshMasterConnector::new(settings.grpc.clone()) + let services = get_services(Arc::new(settings)) .await - .inspect_err(|e| { - error!("Failed to create SSH Master Connector: {}", e); - exit(1); + .map_err(|e| { + error!("Failed to initialize services: {}", e); + e }) - .expect("Failed to create SSH Master Connector"); - let mut master_connector = MasterConnector::new(Box::new(ssh_connector)); + .unwrap_or_else(|_| { + std::process::exit(1); + }); - if let Err(e) = master_connector.connect(&settings).await { - error!("Failed to connect to master: {}", e); - exit(1); - } - - // send a dummy heartbeat to verify the connection is working - let mut client = master_connector.get_client().clone(); - - let request = nxmesh_proto::TestRequest { - ..Default::default() - }; - - match client.connection_test(request).await { - Ok(_) => info!("Successfully sent connection test to master."), - Err(e) => { - error!("Failed to send connection test to master: {}", e); - exit(1); + let master_handler = services.master_handler.clone(); + // spawn the long-running handler so main can wait for shutdown signal + tokio::spawn(async move { + if let Err(e) = master_handler.start_handle_master_message().await { + error!("Master message handler exited with error: {:?}", e); } - } + }); - info!("Successfully connected to master. Agent is running."); + info!("Agent is running. Waiting for shutdown signal."); + shutdown_handler(services.master_handler.clone()).await; } fn install_tracing_subscriber() @@ -92,3 +80,20 @@ fn install_tracing_subscriber() reload_handle } + +async fn wait_for_shutdown_signal() { + #[expect(clippy::expect_used)] + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for ctrl_c"); + info!("Shutdown signal received, stopping handler."); +} + +async fn shutdown_handler(master_handler: Arc) { + wait_for_shutdown_signal().await; + // + let _ = master_handler.stop_handle_master_message().await; + // + info!("Agent stopped."); + exit(0); +} diff --git a/apps/nxmesh-agent/src/service/mod.rs b/apps/nxmesh-agent/src/service/mod.rs index 973457b..35e3cfd 100644 --- a/apps/nxmesh-agent/src/service/mod.rs +++ b/apps/nxmesh-agent/src/service/mod.rs @@ -1,2 +1,47 @@ +use std::sync::Arc; + +use crate::{ + config::settings::Settings, + connector::master::{MasterConnector, ssh::SshMasterConnector}, + service::{ + master_handler::{MasterHandler, MasterHandlerImpl, handlers::HandlerImpl}, + nginx_handler::{NginxHandler, NginxHandlerImpl}, + }, +}; + pub mod master_handler; pub mod nginx_handler; + +pub struct Services { + pub master_handler: Arc, + pub nginx_handler: Arc, +} + +pub async fn get_services(settings: Arc) -> anyhow::Result { + let master_connector = initialize_master_connector(settings.clone()).await?; + + let nginx_handler = Arc::new(NginxHandlerImpl::new(settings.nginx.clone().into())); + + let message_handler = Arc::new(HandlerImpl::new(nginx_handler.clone())); + + // build the services + let master_handler = Arc::new(MasterHandlerImpl::new( + Arc::new(master_connector), + message_handler, + )); + + Ok(Services { + master_handler, + nginx_handler, + }) +} + +async fn initialize_master_connector(settings: Arc) -> anyhow::Result { + let ssh_connector = SshMasterConnector::new(settings.grpc.clone()) + .await + .map_err(|e| anyhow::anyhow!("Failed to initialize SSH connector: {}", e))?; + + let master_connector = MasterConnector::new(Box::new(ssh_connector)); + + Ok(master_connector) +} diff --git a/apps/nxmesh-agent/src/service/nginx_handler/mod.rs b/apps/nxmesh-agent/src/service/nginx_handler/mod.rs index 2983815..b88ddb7 100644 --- a/apps/nxmesh-agent/src/service/nginx_handler/mod.rs +++ b/apps/nxmesh-agent/src/service/nginx_handler/mod.rs @@ -1,12 +1,16 @@ use std::sync::Arc; use anyhow::Result; +use nxmesh_proto::ConfigUpdate; use crate::{ config::settings::NginxSettings, - service::nginx_handler::{ - command_handler::{CommandHandler, CommandHandlerImpl}, - fs_handler::{FsHandler, FsHandlerImpl}, + service::{ + master_handler::{MessageResult, handlers::OnConfigUpdateHandler}, + nginx_handler::{ + command_handler::{CommandHandler, CommandHandlerImpl}, + fs_handler::{FsHandler, FsHandlerImpl}, + }, }, }; @@ -119,3 +123,14 @@ where self.fs_handler.cleanup_config(n).await } } + +#[async_trait::async_trait] +impl OnConfigUpdateHandler for NginxHandlerImpl +where + CH: CommandHandler + ?Sized, + FSH: FsHandler + ?Sized, +{ + async fn on_config_update(&self, config_info: ConfigUpdate) -> MessageResult<()> { + todo!(); + } +}