feat: implement transaction handling for upstream and target operations
- Added transaction support in `add_upstream_target`, `remove_upstream`, `remove_upstream_target`, `update_upstream`, and `update_upstream_target` functions to ensure atomicity of operations. - Updated the `NginxService` to include methods for validating and applying configurations using the agent service. - Enhanced error handling in agent service interactions, returning appropriate internal server errors when agent communication fails. - Introduced mock agent service for testing, allowing for simulation of agent interactions without actual network calls. - Refactored tests to cover scenarios where agent operations fail, ensuring that internal server errors are returned as expected.
This commit is contained in:
@@ -1,14 +1,104 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use agent_client::apis::{ApiClient, configuration::Configuration};
|
||||
use agent_client::{
|
||||
apis::{
|
||||
Api, ApiClient, Error as ApiError, ResponseContent,
|
||||
configuration::Configuration,
|
||||
nginx_agent_api::{ValidateAndReloadParams, ValidateParams, WriteConfigParams},
|
||||
},
|
||||
models::{ValidateAndReloadBody, ValidateBody, WriteConfigBody},
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::configs::agent::AgentSettings;
|
||||
use crate::{configs::agent::AgentSettings, errors::service_error::ServiceError};
|
||||
|
||||
pub struct AgentService {
|
||||
#[derive(Debug)]
|
||||
pub enum AgentError {
|
||||
// (internal messages, user-facing messages)
|
||||
#[allow(dead_code)]
|
||||
ValidationFailed(String, String),
|
||||
// (internal messages, user-facing messages)
|
||||
ApplicationFailed(String, String),
|
||||
}
|
||||
|
||||
impl From<AgentError> for ServiceError {
|
||||
fn from(err: AgentError) -> Self {
|
||||
match err {
|
||||
AgentError::ValidationFailed(_internal, user) => ServiceError::InternalError(user),
|
||||
AgentError::ApplicationFailed(_internal, user) => ServiceError::InternalError(user),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: std::fmt::Debug> From<ResponseContent<T>> for AgentError {
|
||||
fn from(err: ResponseContent<T>) -> Self {
|
||||
let ResponseContent {
|
||||
status,
|
||||
content,
|
||||
entity,
|
||||
} = err;
|
||||
{
|
||||
let entity_str = entity
|
||||
.map(|e| format!("{:?}", e))
|
||||
.unwrap_or_else(|| "<empty>".to_string());
|
||||
AgentError::ApplicationFailed(
|
||||
format!(
|
||||
"Agent responded with error status {}: {}, entity: {}",
|
||||
status, content, entity_str
|
||||
),
|
||||
"Agent reported an error during operation.".to_string(),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: std::fmt::Debug> From<ApiError<T>> for AgentError {
|
||||
fn from(err: ApiError<T>) -> Self {
|
||||
match err {
|
||||
ApiError::ResponseError(resp) => AgentError::from(resp),
|
||||
ApiError::Io(err) => AgentError::ApplicationFailed(
|
||||
format!("IO error during agent communication: {}", err),
|
||||
"Failed to communicate with the agent.".to_string(),
|
||||
),
|
||||
ApiError::Reqwest(err) => AgentError::ApplicationFailed(
|
||||
format!("Reqwest error during agent communication: {}", err),
|
||||
"Failed to communicate with the agent.".to_string(),
|
||||
),
|
||||
ApiError::Serde(err) => AgentError::ApplicationFailed(
|
||||
format!("Serialization error during agent communication: {}", err),
|
||||
"Failed to communicate with the agent.".to_string(),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
#[async_trait::async_trait]
|
||||
pub trait AgentService: Send + Sync {
|
||||
#[allow(dead_code)]
|
||||
fn get_client(&self) -> Arc<ApiClient>;
|
||||
|
||||
// TODO: improve error handling and reporting, error reasons
|
||||
// validate configurations that has been created/updated before the given timestamp
|
||||
#[allow(dead_code)]
|
||||
async fn validate(&self, config: &str) -> Result<(), AgentError>;
|
||||
// validate and apply configurations that has been created/updated before the given timestamp
|
||||
async fn apply(&self, config: &str) -> Result<(), AgentError>;
|
||||
}
|
||||
|
||||
pub struct AgentServiceImpl {
|
||||
client: Arc<ApiClient>,
|
||||
}
|
||||
|
||||
impl AgentServiceImpl {
|
||||
pub fn new(config: impl Into<Arc<Configuration>>) -> Self {
|
||||
let client = ApiClient::new(config.into());
|
||||
AgentServiceImpl {
|
||||
client: Arc::new(client),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<AgentSettings> for Configuration {
|
||||
fn from(settings: AgentSettings) -> Self {
|
||||
let mut config = Configuration::default();
|
||||
@@ -27,17 +117,73 @@ impl From<AgentSettings> for Configuration {
|
||||
}
|
||||
}
|
||||
|
||||
impl AgentService {
|
||||
pub fn new(config: impl Into<Arc<Configuration>>) -> Self {
|
||||
let client = ApiClient::new(config.into());
|
||||
AgentService {
|
||||
client: Arc::new(client),
|
||||
}
|
||||
#[async_trait::async_trait]
|
||||
impl AgentService for AgentServiceImpl {
|
||||
fn get_client(&self) -> Arc<ApiClient> {
|
||||
Arc::clone(&self.client)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn get_client(&self) -> Arc<ApiClient> {
|
||||
Arc::clone(&self.client)
|
||||
async fn validate(&self, config: &str) -> Result<(), AgentError> {
|
||||
let timestamp = chrono::Utc::now().timestamp_millis();
|
||||
let name = Self::get_config_name(true);
|
||||
self._validate(&name, timestamp, config).await
|
||||
}
|
||||
|
||||
async fn apply(&self, config: &str) -> Result<(), AgentError> {
|
||||
let timestamp = chrono::Utc::now().timestamp_millis();
|
||||
let name = Self::get_config_name(false);
|
||||
self._validate(&name, timestamp, config).await?;
|
||||
self._apply(&name, timestamp).await
|
||||
}
|
||||
}
|
||||
|
||||
impl AgentServiceImpl {
|
||||
fn get_config_name(is_validate_only: bool) -> String {
|
||||
format!(
|
||||
"nginx_config_{}{}",
|
||||
if is_validate_only {
|
||||
"validation_"
|
||||
} else {
|
||||
"application_"
|
||||
},
|
||||
uuid::Uuid::new_v4()
|
||||
)
|
||||
}
|
||||
|
||||
async fn _validate(&self, name: &str, timestamp: i64, config: &str) -> Result<(), AgentError> {
|
||||
let api = self.client.nginx_agent_api();
|
||||
|
||||
api.write_config(WriteConfigParams {
|
||||
write_config_body: WriteConfigBody {
|
||||
config_name: name.to_string(),
|
||||
content: config.to_string(),
|
||||
timestamp,
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
|
||||
api.validate(ValidateParams {
|
||||
validate_body: ValidateBody {
|
||||
config_name: name.to_string(),
|
||||
timestamp,
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn _apply(&self, name: &str, timestamp: i64) -> Result<(), AgentError> {
|
||||
let api = self.client.nginx_agent_api();
|
||||
api.validate_and_reload(ValidateAndReloadParams {
|
||||
validate_and_reload_body: ValidateAndReloadBody {
|
||||
config_name: name.to_string(),
|
||||
timestamp,
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,7 +202,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_agent_service_creation() {
|
||||
let config = Configuration::default();
|
||||
let service = AgentService::new(config);
|
||||
let service = AgentServiceImpl::new(config);
|
||||
let client = service.get_client();
|
||||
assert!(Arc::ptr_eq(&client, &service.client));
|
||||
}
|
||||
|
||||
@@ -6,11 +6,21 @@ pub mod upstream;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use sea_orm::DatabaseConnection;
|
||||
use sea_orm::{DatabaseConnection, DatabaseTransaction};
|
||||
|
||||
use crate::services::nginx::upstream::{UpstreamService, UpstreamServiceImpl};
|
||||
use crate::{
|
||||
errors::service_error::ServiceError,
|
||||
services::{
|
||||
agent_client::AgentService,
|
||||
nginx::{
|
||||
builder::{NginxConfigBuilder, NginxConfigProvider},
|
||||
upstream::{UpstreamService, UpstreamServiceImpl},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
pub struct NginxService {
|
||||
#[allow(dead_code)]
|
||||
connection: Arc<DatabaseConnection>,
|
||||
//
|
||||
upstream_service: Arc<dyn UpstreamService>,
|
||||
@@ -28,4 +38,49 @@ impl NginxService {
|
||||
pub fn get_upstream_service(&self) -> Arc<dyn UpstreamService> {
|
||||
self.upstream_service.clone()
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn validate_config(
|
||||
&self,
|
||||
agent: Arc<dyn AgentService>,
|
||||
config: &str,
|
||||
) -> Result<(), ServiceError> {
|
||||
agent.validate(config).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn apply_changes(
|
||||
&self,
|
||||
agent: Arc<dyn AgentService>,
|
||||
config: &str,
|
||||
) -> Result<(), ServiceError> {
|
||||
agent.apply(config).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn generate_config(
|
||||
&self,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<String, ServiceError> {
|
||||
let mut builder = NginxConfigBuilder::default();
|
||||
self.upstream_service
|
||||
.generate_config(&mut builder, tx)
|
||||
.await?;
|
||||
|
||||
Ok(builder.to_nginx_config(None))
|
||||
}
|
||||
|
||||
pub async fn regenerate_and_apply_config(
|
||||
&self,
|
||||
agent: Arc<dyn AgentService>,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<(), ServiceError> {
|
||||
let config = self.generate_config(tx).await?;
|
||||
|
||||
self.apply_changes(agent, &config).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,17 +6,12 @@ pub trait NginxConfigProvider {
|
||||
fn to_nginx_config(&self, indent: Option<usize>) -> String;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct NginxConfigBuilder {
|
||||
upstreams: Vec<UpstreamInfo>,
|
||||
}
|
||||
|
||||
impl NginxConfigBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
upstreams: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_upstream(&mut self, upstream: UpstreamInfo) {
|
||||
self.upstreams.push(upstream);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use optfield::optfield;
|
||||
|
||||
use database::generated::entities::{upstream, upstream_target};
|
||||
use sea_orm::ActiveValue::{Set, Unchanged};
|
||||
|
||||
@@ -10,9 +10,14 @@ use database::generated::entities::{upstream, upstream_target};
|
||||
use crate::{
|
||||
errors::service_error::ServiceError,
|
||||
helpers::database::PaginationFilter,
|
||||
services::nginx::info::{
|
||||
upstream::{UpdateUpstreamInfo, UpstreamCreateInfo, UpstreamInfo},
|
||||
upstream_target::{UpdateUpstreamTargetInfo, UpstreamTargetCreateInfo, UpstreamTargetInfo},
|
||||
services::nginx::{
|
||||
builder::NginxConfigBuilder,
|
||||
info::{
|
||||
upstream::{UpdateUpstreamInfo, UpstreamCreateInfo, UpstreamInfo},
|
||||
upstream_target::{
|
||||
UpdateUpstreamTargetInfo, UpstreamTargetCreateInfo, UpstreamTargetInfo,
|
||||
},
|
||||
},
|
||||
},
|
||||
with_conn,
|
||||
};
|
||||
@@ -57,6 +62,7 @@ pub trait UpstreamService: Send + Sync {
|
||||
options: Option<GetUpstreamTargetOptions>,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<UpstreamTargetInfo, ServiceError>;
|
||||
#[allow(dead_code)]
|
||||
async fn get_upstream_targets_by_upstream(
|
||||
&self,
|
||||
upstream_id: uuid::Uuid,
|
||||
@@ -73,6 +79,11 @@ pub trait UpstreamService: Send + Sync {
|
||||
target_id: uuid::Uuid,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<(), ServiceError>;
|
||||
async fn generate_config(
|
||||
&self,
|
||||
builder: &mut NginxConfigBuilder,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<(), ServiceError>;
|
||||
}
|
||||
|
||||
pub struct UpstreamServiceImpl {
|
||||
@@ -387,6 +398,26 @@ impl UpstreamService for UpstreamServiceImpl {
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
async fn generate_config(
|
||||
&self,
|
||||
builder: &mut NginxConfigBuilder,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<(), ServiceError> {
|
||||
// get all upstreams and their targets
|
||||
let upstreams = with_conn!(&*self.connection, tx, conn, {
|
||||
upstream::Entity::find()
|
||||
.find_with_related(upstream_target::Entity)
|
||||
.all(*conn)
|
||||
.await?
|
||||
});
|
||||
let upstreams_info = upstreams
|
||||
.into_iter()
|
||||
.map(|(up_model, target_models)| (up_model, target_models).into())
|
||||
.collect::<Vec<UpstreamInfo>>();
|
||||
builder.add_upstreams(upstreams_info);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -649,10 +680,16 @@ mod tests {
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![existing.clone()]])
|
||||
.append_exec_results(vec![MockExecResult {
|
||||
rows_affected: 1,
|
||||
last_insert_id: 0,
|
||||
}])
|
||||
.append_exec_results(vec![
|
||||
MockExecResult {
|
||||
rows_affected: 1,
|
||||
last_insert_id: 0,
|
||||
},
|
||||
MockExecResult {
|
||||
rows_affected: 1,
|
||||
last_insert_id: 0,
|
||||
},
|
||||
])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamServiceImpl::new(Arc::new(db));
|
||||
|
||||
Reference in New Issue
Block a user