feat: implement update handlers for upstream and upstream target management

This commit is contained in:
GW_MC
2025-12-30 15:09:49 +08:00
parent b43f9fcb00
commit f4db47daf2
7 changed files with 555 additions and 35 deletions

View File

@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
use optfield::optfield;
use database::generated::entities::{upstream, upstream_target};
use sea_orm::ActiveValue::{Set, Unchanged};
use uuid::Uuid;
use crate::{
@@ -13,15 +14,14 @@ use crate::{
set_if_some,
};
#[optfield(pub UpdateUpstreamInfo)]
#[derive(Clone)]
pub struct UpstreamInfo {
pub id: uuid::Uuid,
pub id: Uuid,
pub name: String,
pub protocol: String,
pub algorithm: String,
pub sticky_session: bool,
pub created_by: Option<uuid::Uuid>,
pub created_by: Option<Uuid>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
//
@@ -33,11 +33,21 @@ pub struct UpstreamCreateInfo {
pub protocol: String,
pub algorithm: String,
pub sticky_session: bool,
pub created_by: Option<uuid::Uuid>,
pub created_by: Option<Uuid>,
//
pub upstream_targets: Vec<upstream_target_info::UpstreamTargetCreateInfo>,
}
#[derive(Clone)]
pub struct UpdateUpstreamInfo {
pub name: Option<String>,
pub protocol: Option<String>,
pub algorithm: Option<String>,
pub sticky_session: Option<bool>,
//
pub upstream_targets: Option<Vec<(Uuid, bool)>>,
}
impl NginxConfigProvider for UpstreamInfo {
fn to_nginx_config(&self, indent: Option<usize>) -> String {
let targets_config: Vec<String> = self
@@ -142,18 +152,14 @@ impl From<UpstreamInfo> for (upstream::ActiveModel, Vec<upstream_target::ActiveM
impl UpdateUpstreamInfo {
pub fn apply_to_model(self, current_model: upstream::Model) -> upstream::ActiveModel {
upstream::ActiveModel {
id: sea_orm::ActiveValue::Unchanged(current_model.id),
id: Unchanged(current_model.id),
name: set_if_some!(self.name),
protocol: set_if_some!(self.protocol),
algorithm: set_if_some!(self.algorithm),
sticky_session: set_if_some!(self.sticky_session),
created_by: set_if_some!(if self.created_by.is_some() {
Some(self.created_by)
} else {
None
}),
created_at: set_if_some!(self.created_at),
updated_at: set_if_some!(self.updated_at),
created_by: Unchanged(current_model.created_by),
created_at: Unchanged(current_model.created_at),
updated_at: Set(chrono::Utc::now()),
}
}
}

View File

@@ -1,5 +1,4 @@
use chrono::{DateTime, Utc};
use optfield::optfield;
use sea_orm::ActiveValue::{Set, Unchanged};
use uuid::Uuid;
@@ -11,7 +10,6 @@ use crate::{
set_if_some,
};
#[optfield(pub UpdateUpstreamTargetInfo)]
#[derive(Clone)]
pub struct UpstreamTargetInfo {
pub id: uuid::Uuid,
@@ -27,6 +25,15 @@ pub struct UpstreamTargetInfo {
pub upstream: Option<UpstreamBasicInfo>,
}
#[derive(Clone)]
pub struct UpdateUpstreamTargetInfo {
pub target_host: Option<String>,
pub target_port: Option<i64>,
pub weight: Option<i64>,
pub is_backup: Option<bool>,
pub enabled: Option<bool>,
}
#[derive(Clone)]
pub struct UpstreamBasicInfo {
pub id: uuid::Uuid,
@@ -146,9 +153,9 @@ impl UpdateUpstreamTargetInfo {
weight: set_if_some!(self.weight),
is_backup: set_if_some!(self.is_backup),
enabled: set_if_some!(self.enabled),
created_at: set_if_some!(self.created_at),
updated_at: set_if_some!(self.updated_at),
upstream_id: set_if_some!(self.upstream_id),
created_at: Unchanged(current_model.created_at),
updated_at: Set(chrono::Utc::now()),
upstream_id: Unchanged(current_model.upstream_id),
}
}
}

View File

@@ -142,7 +142,16 @@ impl UpstreamService {
upstream: UpdateUpstreamInfo,
tx: Option<&mut DatabaseTransaction>,
) -> Result<UpstreamInfo, ServiceError> {
let current_model = with_conn!(&*self.connection, tx, conn, {
// If a transaction was provided use it, otherwise create and own one here.
let mut maybe_owned_tx: Option<DatabaseTransaction> = None;
let tx_ref: Option<&mut DatabaseTransaction> = if let Some(tx) = tx {
Some(tx)
} else {
maybe_owned_tx = Some(self.connection.begin().await?);
maybe_owned_tx.as_mut()
};
let current_model = with_conn!(&*self.connection, tx_ref, conn, {
upstream::Entity::find_by_id(id)
.one(*conn)
.await?
@@ -151,9 +160,36 @@ impl UpstreamService {
id
)))?
});
let active_model = upstream.apply_to_model(current_model);
let upstream_active_model = upstream.clone().apply_to_model(current_model);
let r = active_model.update(&*self.connection).await?;
let r = with_conn!(&*self.connection, tx_ref, conn, {
let updated_upstream_model = upstream_active_model.update(*conn).await?;
// update upstream targets if any
if let Some(targets) = upstream.upstream_targets {
for (target_id, enabled) in targets.into_iter() {
let target_model = upstream_target::Entity::find_by_id(target_id)
.one(*conn)
.await?
.ok_or(ServiceError::NotFound(format!(
"Upstream target with id {} not found",
target_id
)))?;
let mut target_active_model: upstream_target::ActiveModel = target_model.into();
target_active_model.enabled = sea_orm::ActiveValue::Set(enabled);
target_active_model.update(*conn).await?;
Ok::<(), ServiceError>(())?;
}
}
updated_upstream_model
});
// Commit
if let Some(t) = maybe_owned_tx.take() {
t.commit().await?;
}
Ok(r.into())
}
@@ -494,14 +530,10 @@ mod tests {
let svc = UpstreamService::new(Arc::new(db));
let update_info = crate::services::nginx::info::upstream::UpdateUpstreamInfo {
id: None,
name: None,
protocol: None,
algorithm: None,
sticky_session: None,
created_by: None,
created_at: None,
updated_at: None,
upstream_targets: None,
};
let res = svc.update_upstream(id, update_info, None).await;
@@ -522,14 +554,11 @@ mod tests {
.update_upstream(
uuid::Uuid::new_v4(),
crate::services::nginx::info::upstream::UpdateUpstreamInfo {
id: None,
name: None,
protocol: None,
algorithm: None,
sticky_session: None,
created_by: None,
created_at: None,
updated_at: None,
upstream_targets: None,
},
None,
@@ -650,16 +679,11 @@ mod tests {
let svc = UpstreamService::new(Arc::new(db));
let update_info = crate::services::nginx::info::upstream_target::UpdateUpstreamTargetInfo {
id: None,
target_host: None,
target_port: None,
weight: None,
is_backup: None,
enabled: None,
created_at: None,
updated_at: None,
upstream_id: None,
upstream: None,
};
let res = svc.update_upstream_target(id, update_info, None).await;
assert!(res.is_ok());