feat: Implement SSH Agent Connector and gRPC server

- Added `AgentConnectorTrait` and `AgentConnector` for managing agent connections.
- Introduced `SshAgentConnector` to handle SSH-related functionalities and start a gRPC server.
- Created database entities for `agents`, `certificates`, `organizations`, `public_key_revocations`, `setup_tokens`, `upstreams`, `users`, `virtual_hosts`, and `workspaces` using SeaORM.
- Developed `CertificateService` for managing certificate generation and retrieval.
- Implemented the main server logic to initialize the database connection and start the agent server.
- Configured development settings in `development.toml` for server and database connections.
This commit is contained in:
GW_MC
2026-03-21 03:09:39 +00:00
parent 9640f03d69
commit f5eb25993b
27 changed files with 1581 additions and 2 deletions

View File

@@ -0,0 +1,54 @@
use nxmesh_proto::{
Ack, AgentMessage, HealthReport, MasterMessage, MetricsBatch,
agent_service_server::AgentService,
};
use tracing::warn;
#[derive(Debug, Default)]
pub struct AgentServerService {}
#[async_trait::async_trait]
impl AgentService for AgentServerService {
#[doc = " Server streaming response type for the Stream method."]
type StreamStream = tonic::codec::Streaming<MasterMessage>;
#[doc = " Stream establishes a persistent connection for real-time communication"]
#[allow(
mismatched_lifetime_syntaxes,
clippy::type_complexity,
clippy::type_repetition_in_bounds
)]
async fn stream(
&self,
request: tonic::Request<tonic::Streaming<AgentMessage>>,
) -> Result<tonic::Response<Self::StreamStream>, tonic::Status> {
todo!()
}
#[doc = " ReportHealth sends a health report to the master"]
#[allow(
mismatched_lifetime_syntaxes,
clippy::type_complexity,
clippy::type_repetition_in_bounds
)]
async fn report_health(
&self,
request: tonic::Request<HealthReport>,
) -> Result<tonic::Response<Ack>, tonic::Status> {
warn!("Received health report: {:?}", request.get_ref());
todo!()
}
#[doc = " ReportMetrics sends metrics batch to the master"]
#[allow(
mismatched_lifetime_syntaxes,
clippy::type_complexity,
clippy::type_repetition_in_bounds
)]
async fn report_metrics(
&self,
request: tonic::Request<MetricsBatch>,
) -> Result<tonic::Response<Ack>, tonic::Status> {
todo!()
}
}

View File

@@ -0,0 +1,349 @@
use std::{io::Write, os::unix::fs::PermissionsExt, path::Path, sync::Arc};
use rcgen::{
BasicConstraints, CertificateParams, DnType, ExtendedKeyUsagePurpose, IsCa, Issuer, KeyPair,
KeyUsagePurpose, SanType, string::Ia5String,
};
use sea_orm::DatabaseConnection;
use time::{Duration, OffsetDateTime};
use tracing::debug;
// TODO: cert rotation, revocation, and CRL support
pub enum ConnectionType {
GRPC,
HTTP,
}
#[async_trait::async_trait]
pub trait CertificateService: Sync + Send {
/// Get the CA certificate path, if the CA certificate does not exist, return an error
async fn get_ca_cert(
&self,
) -> Result<(String, String), Box<dyn std::error::Error + Send + Sync>>;
/// Generate an in memory public and private key pair, sign it with the CA certificate and key, and return the signed public and private key as PEM string, if the CA certificate does not exist, return an error, if the CA certificate already exists, return an error
async fn generate_pub_cert_pair(
&self,
san_ips: Vec<std::net::IpAddr>,
san_dns: Vec<Ia5String>,
) -> Result<(String, String), Box<dyn std::error::Error + Send + Sync>>;
/// Generate a new CA certificate and save it to the specified path, if the CA certificate already exists, return an error
async fn generate_ca_cert(
&self,
) -> Result<CertPathInfo, Box<dyn std::error::Error + Send + Sync>>;
/// Generate certificates for agent and save them to the specified output directory, the output directory should be created if it does not exist
async fn generate_agent_certs(
&self,
agent_id: &str,
output_dir: &str,
) -> Result<AgentCertPathInfo, Box<dyn std::error::Error + Send + Sync>>;
/// Zip the generated agent certificates, the input should be the cert path and key path, the output should be a zip file containing the cert and key
async fn zip_certificates(
&self,
cert_path: &str,
key_path: &str,
ca_cert_path: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
/// Get the sans to be included in the generated certificates, this is used to support IP-based connections to the agent, the SANs will be included in both the CA certificate and the agent certificates, if the SANs are not specified, some common local addresses will be included by default
fn get_sans(&self, connection_type: ConnectionType) -> (Vec<std::net::IpAddr>, Vec<Ia5String>);
}
pub struct CertificateServiceImpl {
db: DatabaseConnection,
/// The path to the CA certificate, the CA certificate and private key will be saved to this path when generating a new CA certificate
cert_folder_path: String,
settings: Arc<crate::config::settings::Settings>,
}
impl CertificateServiceImpl {
pub fn new(
db: DatabaseConnection,
cert_folder_path: String,
settings: Arc<crate::config::settings::Settings>,
) -> Self {
Self {
db,
cert_folder_path,
settings,
}
}
}
#[derive(Debug, Clone)]
pub struct CertPathInfo {
pub private_key: String,
pub cert_pem: String,
pub public_key: String,
}
#[derive(Debug, Clone)]
pub struct AgentCertPathInfo {
pub cert_path: String,
pub key_path: String,
pub ca_cert_path: String,
}
#[async_trait::async_trait]
impl CertificateService for CertificateServiceImpl {
async fn get_ca_cert(
&self,
) -> Result<(String, String), Box<dyn std::error::Error + Send + Sync>> {
if Path::new(&self.cert_folder_path).exists() {
let cert_path = Path::new(&self.cert_folder_path).join("ca.crt");
let key_path = Path::new(&self.cert_folder_path).join("ca.key");
if cert_path.exists() && key_path.exists() {
Ok((
cert_path.to_string_lossy().to_string(),
key_path.to_string_lossy().to_string(),
))
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
"CA certificate or key not found",
)))
}
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
"CA certificate folder not found",
)))
}
}
async fn generate_pub_cert_pair(
&self,
san_ips: Vec<std::net::IpAddr>,
san_dns: Vec<Ia5String>,
) -> Result<(String, String), Box<dyn std::error::Error + Send + Sync>> {
let (ca_cert_path, ca_key_path) = self.get_ca_cert().await?;
let ca_cert_pem = std::fs::read_to_string(ca_cert_path)?;
let ca_key_pem = std::fs::read_to_string(ca_key_path)?;
let ca_key = KeyPair::from_pem(&ca_key_pem)?;
let issuer = Issuer::from_ca_cert_pem(&ca_cert_pem, ca_key)?;
// TODO: require input to set the SANs for the generated certificate, for now we will include some common local addresses to support IP-based connections to the agent, but in the future we should allow users to specify the SANs for the generated certificates
// Include SANs for common local addresses to support IP-based connections
let subject_alt_names: Vec<SanType> = [
san_ips
.into_iter()
.map(SanType::IpAddress)
.collect::<Vec<SanType>>(),
san_dns
.into_iter()
.map(|dns| SanType::DnsName(dns))
.collect::<Vec<SanType>>(),
]
.concat();
let mut params = CertificateParams::default();
params.subject_alt_names = subject_alt_names;
params.is_ca = IsCa::NoCa;
params.key_usages.push(KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(ExtendedKeyUsagePurpose::ServerAuth);
params
.extended_key_usages
.push(ExtendedKeyUsagePurpose::ClientAuth);
params.serial_number = Some(rand::random::<u64>().into()); // Unique serial
let (not_before, not_after) = validity_period();
params.not_before = not_before;
params.not_after = not_after;
let key_pair = KeyPair::generate_for(&rcgen::PKCS_ED25519)?;
let cert = params.signed_by(&key_pair, &issuer)?;
Ok((cert.pem(), key_pair.serialize_pem()))
}
async fn generate_ca_cert(
&self,
) -> Result<CertPathInfo, Box<dyn std::error::Error + Send + Sync>> {
// check if the CA certificate already exists in the folder
let cert_folder_path = Path::new(&self.cert_folder_path);
let cert_path = cert_folder_path.join("ca.crt");
let key_path = cert_folder_path.join("ca.key");
let pub_path = cert_folder_path.join("ca.pub");
if !cert_folder_path.exists() {
std::fs::create_dir_all(cert_folder_path)?;
}
if cert_path.exists() || key_path.exists() || pub_path.exists() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
"CA certificate already exists",
)));
}
let kp = KeyPair::generate_for(&rcgen::PKCS_ED25519)?;
let mut params = CertificateParams::new(Vec::default())?;
params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained);
params
.distinguished_name
.push(DnType::OrganizationName, "MasterCA");
params.key_usages.push(KeyUsagePurpose::DigitalSignature);
params.key_usages.push(KeyUsagePurpose::KeyCertSign);
params.key_usages.push(KeyUsagePurpose::CrlSign);
let (not_before, not_after) = validity_period();
params.not_before = not_before;
params.not_after = not_after;
let ca_cert = params.self_signed(&kp)?;
let cert_pem = ca_cert.pem();
let private_key = kp.serialize_pem();
let public_key = kp.public_key_pem();
// save the CA certificate and private key to the specified path
std::fs::write(&cert_path, cert_pem.as_bytes())?;
std::fs::set_permissions(cert_path, std::fs::Permissions::from_mode(0o600))?;
std::fs::write(&key_path, private_key.as_bytes())?;
std::fs::set_permissions(key_path, std::fs::Permissions::from_mode(0o600))?;
std::fs::write(&pub_path, public_key.as_bytes())?;
std::fs::set_permissions(pub_path, std::fs::Permissions::from_mode(0o600))?;
Ok(CertPathInfo {
private_key,
cert_pem,
public_key,
})
}
async fn generate_agent_certs(
&self,
agent_id: &str,
output_dir: &str,
) -> Result<AgentCertPathInfo, Box<dyn std::error::Error + Send + Sync>> {
debug!(
"Generating agent certificates for agent_id: {}, output_dir: {}",
agent_id, output_dir
);
let output_path_dir = Path::new(output_dir).join(agent_id);
let cert_path = output_path_dir.join("cert.pem");
let key_path = output_path_dir.join("key.pem");
// validate output parent directory exists
if !std::path::Path::new(output_dir).exists() {
// TODO: custom error type
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
"Output parent directory does not exist",
)));
}
// create output directory if it does not exist
if !output_path_dir.exists() {
std::fs::create_dir_all(&output_path_dir)?;
}
// Check if CA certificate exists
let (ca_cert_path, ca_key_path) = self.get_ca_cert().await?;
// Read CA certificate and key from disk
debug!("Reading CA certificate from path: {:?}", ca_cert_path);
let ca_cert_pem = std::fs::read_to_string(ca_cert_path.clone())?;
let ca_key_pem = std::fs::read_to_string(ca_key_path)?;
// Parse CA key and create issuer
debug!("Parsing CA key and creating issuer");
let ca_key = KeyPair::from_pem(&ca_key_pem)?;
let issuer = Issuer::from_ca_cert_pem(&ca_cert_pem, ca_key)?;
// Generate agent keypair
let agent_keypair = KeyPair::generate_for(&rcgen::PKCS_ED25519)?;
// Params for agent leaf cert
let mut params = CertificateParams::new(vec![agent_id.to_string()])?;
params
.distinguished_name
.push(DnType::CommonName, agent_id.to_string());
params.use_authority_key_identifier_extension = true;
params.key_usages.push(KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(ExtendedKeyUsagePurpose::ServerAuth);
params
.extended_key_usages
.push(ExtendedKeyUsagePurpose::ClientAuth);
params.serial_number = Some(rand::random::<u64>().into()); // Unique serial
let (not_before, not_after) = validity_period();
params.not_before = not_before;
params.not_after = not_after;
// Sign with CA
let agent_cert = params.signed_by(&agent_keypair, &issuer)?;
let agent_cert_pem = agent_cert.pem();
let agent_key_pem = agent_keypair.serialize_pem();
// Save agent certificate and private key to output directory
debug!(
"Saving agent certificate and key to output directory: {:?}",
output_path_dir
);
std::fs::write(&cert_path, agent_cert_pem.as_bytes())?;
std::fs::set_permissions(&cert_path, std::fs::Permissions::from_mode(0o600))?;
std::fs::write(&key_path, agent_key_pem.as_bytes())?;
std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
Ok(AgentCertPathInfo {
cert_path: cert_path.to_string_lossy().to_string(),
key_path: key_path.to_string_lossy().to_string(),
ca_cert_path: ca_cert_path.to_string(),
})
}
async fn zip_certificates(
&self,
cert_path: &str,
key_path: &str,
ca_cert_path: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let zip_path = format!("{}.zip", cert_path.trim_end_matches(".pem"));
let file = std::fs::File::create(&zip_path)?;
let mut zip = zip::ZipWriter::new(file);
let options = zip::write::SimpleFileOptions::default()
.compression_method(zip::CompressionMethod::Deflated)
.unix_permissions(0o600);
zip.start_file("cert.pem", options)?;
let cert_data = std::fs::read(cert_path)?;
zip.write_all(&cert_data)?;
zip.start_file("key.pem", options)?;
let key_data = std::fs::read(key_path)?;
zip.write_all(&key_data)?;
zip.start_file("ca.pem", options)?;
let ca_cert_data = std::fs::read(ca_cert_path)?;
zip.write_all(&ca_cert_data)?;
zip.finish()?;
Ok(zip_path)
}
fn get_sans(&self, connection_type: ConnectionType) -> (Vec<std::net::IpAddr>, Vec<Ia5String>) {
let cert_settings = match connection_type {
ConnectionType::GRPC => &self.settings.grpc.certificate,
ConnectionType::HTTP => &self.settings.server.certificate,
};
(cert_settings.san_ip.clone(), cert_settings.san_dns.clone())
}
}
fn validity_period() -> (OffsetDateTime, OffsetDateTime) {
let year = Duration::new(365 * 86400, 0);
let not_before = OffsetDateTime::now_utc();
let not_after = match not_before.checked_add(year) {
Some(v) => v,
None => not_before,
};
(not_before, not_after)
}

View File

@@ -0,0 +1,40 @@
use std::sync::Arc;
use crate::{connector::agent::AgentConnectorTrait, service::certificate::CertificateService};
pub mod agent;
pub mod certificate;
pub async fn start_master_server(
settings: crate::config::settings::Settings,
cli: crate::cli::Cli,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize database connection
let db_connection = crate::db::establish_connection(&settings.database.url).await?;
// Initialize certificate service with default cert folder path
let cert_service = Arc::new(crate::service::certificate::CertificateServiceImpl::new(
db_connection.clone(),
settings.grpc.certificate.cert_dir.clone(),
Arc::new(settings.clone()),
));
// if generate_ca is set, generate a new certificate and exit
if cli.generate_ca {
// TODO: check the error type and return a more specific error message
cert_service.generate_ca_cert().await.ok();
println!("Certificate generated and stored successfully.");
}
// Initialize agent connector
let mut agent_connector = crate::connector::agent::AgentConnector::new(Box::new(
crate::connector::agent::ssh::SshAgentConnector::new(settings.clone())?,
));
// Start the agent server
agent_connector
.start_server(&settings, cert_service, db_connection)
.await?;
Ok(())
}