feat: stub master file structures
This commit is contained in:
194
crates/nxmesh-master/src/grpc/agent_service.rs
Normal file
194
crates/nxmesh-master/src/grpc/agent_service.rs
Normal file
@@ -0,0 +1,194 @@
|
||||
//! Agent gRPC service
|
||||
|
||||
use chrono::Utc;
|
||||
use nxmesh_proto::{
|
||||
agent_service_server::AgentService, agent::AgentMessage, Ack, ConfigUpdate, HealthReport,
|
||||
MasterMessage, MetricsBatch, RegistrationRequest, RegistrationResponse,
|
||||
};
|
||||
use sea_orm::ActiveModelTrait;
|
||||
use sea_orm::Set;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
use futures::Stream;
|
||||
use std::pin::Pin;
|
||||
use tracing::{error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::db::entities::agent;
|
||||
use crate::db::Database;
|
||||
|
||||
/// Agent service implementation
|
||||
#[derive(Debug)]
|
||||
pub struct AgentServiceImpl {
|
||||
db: Database,
|
||||
}
|
||||
|
||||
impl AgentServiceImpl {
|
||||
pub fn new(db: Database) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
|
||||
async fn handle_registration(
|
||||
&self,
|
||||
request: RegistrationRequest,
|
||||
) -> Result<RegistrationResponse, Status> {
|
||||
info!("Agent registration request: hostname={}", request.hostname);
|
||||
|
||||
// TODO: Validate token properly
|
||||
// For now, create a new agent record
|
||||
let agent_id = Uuid::new_v4();
|
||||
|
||||
let now = Utc::now();
|
||||
let agent = agent::ActiveModel {
|
||||
id: Set(agent_id),
|
||||
workspace_id: Set(Uuid::nil()), // TODO: Get from token
|
||||
name: Set(request.hostname.clone()),
|
||||
hostname: Set(request.hostname),
|
||||
ip_address: Set(Some(request.ip_address)),
|
||||
version: Set(Some(request.version)),
|
||||
state: Set("online".to_string()),
|
||||
deployment_mode: Set(Some(format!("{:?}", request.deployment_mode))),
|
||||
last_seen_at: Set(Some(now.into())),
|
||||
capabilities: Set(Some(serde_json::json!(request.capabilities))),
|
||||
labels: Set(Some(serde_json::json!(request.labels))),
|
||||
token_hash: Set(None),
|
||||
created_at: Set(now.into()),
|
||||
updated_at: Set(now.into()),
|
||||
};
|
||||
|
||||
match agent.insert(self.db.conn()).await {
|
||||
Ok(_) => {
|
||||
info!("Agent registered successfully: {}", agent_id);
|
||||
Ok(RegistrationResponse {
|
||||
agent_id: agent_id.to_string(),
|
||||
success: true,
|
||||
error_message: String::new(),
|
||||
heartbeat_interval_seconds: 30,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to register agent: {}", e);
|
||||
Err(Status::internal(format!("Database error: {}", e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_health_report(&self, report: HealthReport, agent_id: &str) -> Result<(), Status> {
|
||||
let agent_uuid = Uuid::parse_str(agent_id)
|
||||
.map_err(|_| Status::invalid_argument("Invalid agent ID"))?;
|
||||
|
||||
// Update agent's last_seen_at
|
||||
let now = Utc::now();
|
||||
let agent = agent::ActiveModel {
|
||||
id: Set(agent_uuid),
|
||||
last_seen_at: Set(Some(now.into())),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Err(e) = agent.update(self.db.conn()).await {
|
||||
warn!("Failed to update agent last_seen: {}", e);
|
||||
}
|
||||
|
||||
// TODO: Store health report in time-series database
|
||||
if let Some(nginx) = report.nginx {
|
||||
info!(
|
||||
"Health report from {}: nginx_running={}",
|
||||
agent_id, nginx.is_running
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl AgentService for AgentServiceImpl {
|
||||
type StreamStream = Pin<Box<dyn Stream<Item = Result<MasterMessage, Status>> + Send>>;
|
||||
|
||||
/// Bidirectional streaming RPC
|
||||
async fn stream(
|
||||
&self,
|
||||
request: Request<Streaming<AgentMessage>>,
|
||||
) -> Result<Response<Self::StreamStream>, Status> {
|
||||
let mut stream = request.into_inner();
|
||||
let db = self.db.clone();
|
||||
|
||||
let output_stream = async_stream::try_stream! {
|
||||
while let Some(result) = stream.message().await? {
|
||||
let msg = result;
|
||||
|
||||
// Handle different message types via payload
|
||||
if let Some(payload) = msg.payload {
|
||||
use nxmesh_proto::agent_message::Payload;
|
||||
match payload {
|
||||
Payload::Registration(_reg) => {
|
||||
info!("Received registration in stream from agent {}", msg.agent_id);
|
||||
}
|
||||
Payload::Health(health) => {
|
||||
if let Err(e) = Self::handle_health_report(&Self { db: db.clone() }, health, &msg.agent_id).await {
|
||||
warn!("Failed to handle health report: {}", e);
|
||||
}
|
||||
}
|
||||
Payload::Metrics(_metrics) => {
|
||||
info!("Received metrics from agent {}", msg.agent_id);
|
||||
}
|
||||
Payload::ConfigStatus(_status) => {
|
||||
info!("Received config status from agent {}", msg.agent_id);
|
||||
}
|
||||
Payload::Logs(_logs) => {
|
||||
info!("Received logs from agent {}", msg.agent_id);
|
||||
}
|
||||
Payload::Event(_event) => {
|
||||
info!("Received event from agent {}", msg.agent_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Echo back a response
|
||||
yield MasterMessage {
|
||||
timestamp: Utc::now().timestamp(),
|
||||
..Default::default()
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Response::new(Box::pin(output_stream)))
|
||||
}
|
||||
|
||||
/// Report health status
|
||||
async fn report_health(
|
||||
&self,
|
||||
request: Request<HealthReport>,
|
||||
) -> Result<Response<Ack>, Status> {
|
||||
// Extract agent ID from metadata before consuming request
|
||||
let agent_id = request.metadata()
|
||||
.get("x-agent-id")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
let report = request.into_inner();
|
||||
|
||||
self.handle_health_report(report, &agent_id).await?;
|
||||
|
||||
Ok(Response::new(Ack {
|
||||
message_id: "health".to_string(),
|
||||
success: true,
|
||||
error_message: String::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Report metrics
|
||||
async fn report_metrics(
|
||||
&self,
|
||||
request: Request<MetricsBatch>,
|
||||
) -> Result<Response<Ack>, Status> {
|
||||
let metrics = request.into_inner();
|
||||
info!("Metrics batch received with {} metrics", metrics.metrics.len());
|
||||
|
||||
Ok(Response::new(Ack {
|
||||
message_id: "metrics".to_string(),
|
||||
success: true,
|
||||
error_message: String::new(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
9
crates/nxmesh-master/src/grpc/interceptor.rs
Normal file
9
crates/nxmesh-master/src/grpc/interceptor.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
//! gRPC interceptors
|
||||
|
||||
use tonic::{Request, Status};
|
||||
|
||||
/// Authentication interceptor
|
||||
pub fn auth_interceptor(req: Request<()>) -> Result<Request<()>, Status> {
|
||||
// TODO: Implement authentication
|
||||
Ok(req)
|
||||
}
|
||||
5
crates/nxmesh-master/src/grpc/mod.rs
Normal file
5
crates/nxmesh-master/src/grpc/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
//! gRPC service
|
||||
|
||||
pub mod agent_service;
|
||||
pub mod interceptor;
|
||||
pub mod server;
|
||||
23
crates/nxmesh-master/src/grpc/server.rs
Normal file
23
crates/nxmesh-master/src/grpc/server.rs
Normal file
@@ -0,0 +1,23 @@
|
||||
//! gRPC server
|
||||
|
||||
use tonic::transport::Server;
|
||||
|
||||
use crate::db::Database;
|
||||
|
||||
use super::agent_service::AgentServiceImpl;
|
||||
|
||||
/// Start the gRPC server
|
||||
pub async fn start(bind_address: &str, db: Database) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let addr = bind_address.parse()?;
|
||||
|
||||
let agent_service = AgentServiceImpl::new(db);
|
||||
|
||||
Server::builder()
|
||||
.add_service(nxmesh_proto::agent_service_server::AgentServiceServer::new(
|
||||
agent_service,
|
||||
))
|
||||
.serve(addr)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user