Files
YANPM/apps/api/src/services/agent_client.rs

263 lines
8.3 KiB
Rust

use std::{os::unix::fs::FileTypeExt, sync::Arc};
use agent_client::{
apis::{
Api, ApiClient, Error as ApiError, ResponseContent,
configuration::Configuration,
nginx_agent_api::{ValidateAndReloadParams, ValidateParams, WriteConfigParams},
},
models::{ValidateAndReloadBody, ValidateBody, WriteConfigBody},
};
use tracing::warn;
use crate::{configs::agent::AgentSettings, errors::service_error::ServiceError};
/// Failure produced while talking to the agent.
///
/// Every variant carries two messages, in this order: `(internal, user_facing)`.
/// The first is a detailed diagnostic; the second is the only part kept when the
/// error is converted into a `ServiceError`.
#[derive(Debug)]
pub enum AgentError {
/// (internal message, user-facing message)
///
/// NOTE(review): nothing visible in this file constructs this variant, which is
/// presumably why `dead_code` is allowed — confirm before removing the attribute.
#[allow(dead_code)]
ValidationFailed(String, String),
/// (internal message, user-facing message)
///
/// Built by the `From` conversions for agent API errors in this file.
ApplicationFailed(String, String),
}
impl From<AgentError> for ServiceError {
    /// Converts any agent failure into `ServiceError::InternalError`.
    ///
    /// Only the user-facing message (second field) is carried over; the internal
    /// diagnostic (first field) is dropped by this conversion, so code that needs
    /// it has to read it from the `AgentError` before converting.
    fn from(err: AgentError) -> Self {
        let user_message = match err {
            AgentError::ValidationFailed(_, user) | AgentError::ApplicationFailed(_, user) => user,
        };
        ServiceError::InternalError(user_message)
    }
}
impl<T: std::fmt::Debug> From<ResponseContent<T>> for AgentError {
    /// Wraps an error response returned by the agent in
    /// `AgentError::ApplicationFailed`.
    ///
    /// The internal message records the status, the raw content and the
    /// `Debug` rendering of the decoded entity (`<empty>` when there is none);
    /// the user-facing message is a fixed generic sentence.
    fn from(err: ResponseContent<T>) -> Self {
        let entity_repr = match err.entity {
            Some(entity) => format!("{:?}", entity),
            None => "<empty>".to_string(),
        };
        let internal = format!(
            "Agent responded with error status {}: {}, entity: {}",
            err.status, err.content, entity_repr
        );
        let user_facing = "Agent reported an error during operation.".to_string();
        AgentError::ApplicationFailed(internal, user_facing)
    }
}
impl<T: std::fmt::Debug> From<ApiError<T>> for AgentError {
    /// Converts a generated-client error into an `AgentError`.
    ///
    /// Error responses are delegated to the `ResponseContent` conversion. IO,
    /// reqwest and serde failures all become `ApplicationFailed` with a
    /// kind-specific internal message and the same user-facing text.
    fn from(err: ApiError<T>) -> Self {
        let internal = match err {
            // The agent answered, but with an error status: handled separately.
            ApiError::ResponseError(resp) => return AgentError::from(resp),
            ApiError::Io(cause) => {
                format!("IO error during agent communication: {}", cause)
            }
            ApiError::Reqwest(cause) => {
                format!("Reqwest error during agent communication: {}", cause)
            }
            ApiError::Serde(cause) => {
                format!("Serialization error during agent communication: {}", cause)
            }
        };
        AgentError::ApplicationFailed(
            internal,
            "Failed to communicate with the agent.".to_string(),
        )
    }
}
/// Operations the API performs against the agent.
///
/// Under `cfg(test)` a mock implementation is generated via `mockall::automock`.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait AgentService: Send + Sync {
/// Returns a shared handle to the underlying generated API client.
#[allow(dead_code)]
fn get_client(&self) -> Arc<ApiClient>;
// TODO: improve error handling and reporting, error reasons
/// Validates the given configuration text without applying it.
///
/// NOTE(review): an earlier comment described validating configurations
/// "created/updated before the given timestamp", but no timestamp is passed in;
/// `AgentServiceImpl` stamps each request with the current time itself.
#[allow(dead_code)]
async fn validate(&self, config: &str) -> Result<(), AgentError>;
/// Validates the given configuration text and then applies it.
///
/// In `AgentServiceImpl` the apply step only runs when validation succeeded.
async fn apply(&self, config: &str) -> Result<(), AgentError>;
}
/// `AgentService` implementation backed by the generated `ApiClient`.
pub struct AgentServiceImpl {
/// Shared client handle; `get_client` returns clones of this `Arc`.
client: Arc<ApiClient>,
}
impl AgentServiceImpl {
    /// Creates a service whose API client is built from `config`.
    ///
    /// Accepts anything convertible into `Arc<Configuration>`, so both an owned
    /// `Configuration` and an already shared one can be passed.
    pub fn new(config: impl Into<Arc<Configuration>>) -> Self {
        let shared_config: Arc<Configuration> = config.into();
        Self {
            client: Arc::new(ApiClient::new(shared_config)),
        }
    }
}
impl From<AgentSettings> for Configuration {
    /// Builds the agent client configuration from `AgentSettings::socket_path`.
    ///
    /// * A value that starts with `unix://`, or that names an existing Unix
    ///   socket file, produces a `reqwest` client that connects through that
    ///   socket; `base_path` keeps its default value.
    /// * Any other value is stored as `base_path` and a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics if the `reqwest` client cannot be built.
    fn from(settings: AgentSettings) -> Self {
        const UNIX_SCHEME: &str = "unix://";

        let mut config = Configuration::default();
        let url = settings.socket_path;

        // The scheme only marks the value as a socket; the socket itself is a
        // plain filesystem path, so the prefix must be removed before the path
        // is stat'ed or handed to reqwest.
        let scheme_stripped = url.strip_prefix(UNIX_SCHEME);
        let socket_path = scheme_stripped.unwrap_or(url.as_str());

        let is_socket = scheme_stripped.is_some()
            || std::fs::metadata(socket_path).is_ok_and(|m| m.file_type().is_socket());

        if is_socket {
            config.client = reqwest::Client::builder()
                .unix_socket(socket_path.to_owned())
                .build()
                .expect("Failed to build reqwest client");
        } else {
            warn!("AgentSettings contains a non-unix socket path: {}", url);
            config.base_path = url;
        }
        config
    }
}
#[async_trait::async_trait]
impl AgentService for AgentServiceImpl {
    /// Returns another handle to the shared API client.
    fn get_client(&self) -> Arc<ApiClient> {
        Arc::clone(&self.client)
    }

    /// Uploads `config` under a fresh validation-only name, stamped with the
    /// current time in milliseconds, and asks the agent to validate it.
    async fn validate(&self, config: &str) -> Result<(), AgentError> {
        let now_ms = chrono::Utc::now().timestamp_millis();
        let config_name = Self::get_config_name(true);
        self._validate(&config_name, now_ms, config).await
    }

    /// Uploads and validates `config` under a fresh application name, then asks
    /// the agent to validate and reload it. The reload request is only sent when
    /// the upload/validation step succeeded; both steps share one timestamp.
    async fn apply(&self, config: &str) -> Result<(), AgentError> {
        let now_ms = chrono::Utc::now().timestamp_millis();
        let config_name = Self::get_config_name(false);
        self._validate(&config_name, now_ms, config).await?;
        self._apply(&config_name, now_ms).await
    }
}
impl AgentServiceImpl {
    /// Generates a unique configuration name of the form
    /// `nginx_config_<purpose>_<uuid-v4>`, where the purpose is `validation` or
    /// `application` depending on `is_validate_only`.
    fn get_config_name(is_validate_only: bool) -> String {
        let purpose = if is_validate_only {
            "validation_"
        } else {
            "application_"
        };
        let unique_id = uuid::Uuid::new_v4();
        format!("nginx_config_{}{}", purpose, unique_id)
    }

    /// Writes `config` to the agent under `name`, then requests validation of
    /// that same name/timestamp pair. Stops at the first failing request.
    async fn _validate(&self, name: &str, timestamp: i64, config: &str) -> Result<(), AgentError> {
        let agent = self.client.nginx_agent_api();

        let write_config_body = WriteConfigBody {
            config_name: name.to_string(),
            content: config.to_string(),
            timestamp,
        };
        agent
            .write_config(WriteConfigParams { write_config_body })
            .await?;

        let validate_body = ValidateBody {
            config_name: name.to_string(),
            timestamp,
        };
        agent.validate(ValidateParams { validate_body }).await?;

        Ok(())
    }

    /// Requests the agent to validate and reload the configuration previously
    /// written under `name` with the given `timestamp`.
    async fn _apply(&self, name: &str, timestamp: i64) -> Result<(), AgentError> {
        let validate_and_reload_body = ValidateAndReloadBody {
            config_name: name.to_string(),
            timestamp,
        };
        self.client
            .nginx_agent_api()
            .validate_and_reload(ValidateAndReloadParams {
                validate_and_reload_body,
            })
            .await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use agent_client::{
        apis::{Api, nginx_agent_api::StatusSuccess},
        models::StatusResp,
    };
    use axum::{http::StatusCode, response::Json};
    use std::time::Duration;
    use tempfile::tempdir;
    use tokio::time::sleep;

    /// `get_client` must hand out the very same `Arc` the service stores.
    #[test]
    fn test_agent_service_creation() {
        let service = AgentServiceImpl::new(Configuration::default());
        let handle = service.get_client();
        assert!(Arc::ptr_eq(&handle, &service.client));
    }

    /// End-to-end check that the generated client can reach an HTTP server
    /// listening on a Unix domain socket.
    #[tokio::test]
    async fn test_agent_socket_support() {
        /// Minimal `/status` handler: always answers 200 with `ok: true`.
        async fn status_ok() -> (StatusCode, Json<StatusResp>) {
            (StatusCode::OK, Json(StatusResp { ok: true }))
        }

        // Socket file lives in a temp dir that is removed when `dir` drops.
        let dir = tempdir().expect("Failed to create temp dir");
        let socket_path = dir.path().join("agent.sock");

        let app = axum::Router::new().route("/status", axum::routing::get(status_ok));

        // Bind before spawning so that a bind failure fails the test directly.
        let listener =
            tokio::net::UnixListener::bind(&socket_path).expect("Failed to bind to unix socket");
        let _srv = tokio::spawn(async move {
            let _ = axum::serve::serve(listener, app).await;
        });

        // give server a moment to start
        sleep(Duration::from_millis(50)).await;

        let http_client = reqwest::Client::builder()
            .unix_socket(socket_path.clone())
            .build()
            .expect("Failed to build reqwest client");
        let configuration = Configuration {
            base_path: "http://localhost".to_string(),
            client: http_client,
            ..Default::default()
        };
        let client: ApiClient = ApiClient::new(Arc::new(configuration));

        let res = client
            .nginx_agent_api()
            .status()
            .await
            .expect("Failed to get status");
        let body = res.entity.expect("Response entity is missing");
        assert!(res.status.is_success());
        match body {
            StatusSuccess::Status200(status) => assert!(status.ok),
            _ => panic!("Unexpected response body"),
        }
    }
}