feature/upstream-service #13

Merged
GW_MC merged 43 commits from feature/upstream-service into master 2026-01-01 10:49:32 +08:00
2 changed files with 40 additions and 6 deletions
Showing only changes of commit 75097a661b - Show all commits

View File

@@ -55,7 +55,14 @@ pub async fn get_upstream_list(
) -> AxumResult<Json<UpstreamListResponse>, ServiceError> { ) -> AxumResult<Json<UpstreamListResponse>, ServiceError> {
let upstream_service = &state.service.nginx.get_upstream_service(); let upstream_service = &state.service.nginx.get_upstream_service();
let upstreams = upstream_service let upstreams = upstream_service
.get_upstreams(Some(pagination.clone().into()), None) .get_upstreams(
Some(pagination.clone().into()),
Some(GetUpstreamOptions {
include_targets: true,
filter_by_enabled: false,
}),
None,
)
.await?; .await?;
// //
@@ -93,6 +100,7 @@ pub async fn get_upstream(
upstream_id, upstream_id,
Some(GetUpstreamOptions { Some(GetUpstreamOptions {
include_targets: true, include_targets: true,
filter_by_enabled: false,
}), }),
None, None,
) )

View File

@@ -1,8 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use sea_orm::{ use sea_orm::{
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, ExprTrait,
ModelTrait, QueryFilter, QuerySelect, TransactionTrait, ModelTrait, QueryFilter, QuerySelect, QueryTrait, TransactionTrait,
}; };
use database::generated::entities::{upstream, upstream_target}; use database::generated::entities::{upstream, upstream_target};
@@ -38,6 +38,7 @@ pub trait UpstreamService: Send + Sync {
async fn get_upstreams( async fn get_upstreams(
&self, &self,
pagination: Option<PaginationFilter>, pagination: Option<PaginationFilter>,
options: Option<GetUpstreamOptions>,
tx: Option<&mut DatabaseTransaction>, tx: Option<&mut DatabaseTransaction>,
) -> Result<Vec<UpstreamInfo>, ServiceError>; ) -> Result<Vec<UpstreamInfo>, ServiceError>;
async fn update_upstream( async fn update_upstream(
@@ -93,6 +94,7 @@ pub struct UpstreamServiceImpl {
#[derive(Default)] #[derive(Default)]
pub struct GetUpstreamOptions { pub struct GetUpstreamOptions {
pub include_targets: bool, pub include_targets: bool,
pub filter_by_enabled: bool,
} }
#[derive(Default)] #[derive(Default)]
@@ -168,6 +170,9 @@ impl UpstreamService for UpstreamServiceImpl {
)))?; )))?;
let targets = upstream_target::Entity::find() let targets = upstream_target::Entity::find()
.filter(upstream_target::Column::UpstreamId.eq(upstream_id)) .filter(upstream_target::Column::UpstreamId.eq(upstream_id))
.apply_if(Some(concrete_options.filter_by_enabled), |query, _v| {
query.filter(upstream_target::Column::Enabled.eq(true))
})
.all(*conn) .all(*conn)
.await?; .await?;
(up, targets) (up, targets)
@@ -191,6 +196,7 @@ impl UpstreamService for UpstreamServiceImpl {
async fn get_upstreams( async fn get_upstreams(
&self, &self,
pagination: Option<PaginationFilter>, pagination: Option<PaginationFilter>,
options: Option<GetUpstreamOptions>,
tx: Option<&mut DatabaseTransaction>, tx: Option<&mut DatabaseTransaction>,
) -> Result<Vec<UpstreamInfo>, ServiceError> { ) -> Result<Vec<UpstreamInfo>, ServiceError> {
let r = with_conn!(&*self.connection, tx, conn, { let r = with_conn!(&*self.connection, tx, conn, {
@@ -201,7 +207,24 @@ impl UpstreamService for UpstreamServiceImpl {
} else { } else {
find_query find_query
}; };
find_query.all(*conn).await? let find_query = match options {
Some(opts) => {
if opts.include_targets && opts.filter_by_enabled {
find_query.filter(
upstream_target::Column::Enabled
.eq(true)
.or(upstream_target::Column::Id.is_null()),
)
} else {
find_query
}
}
_ => find_query,
};
find_query
.find_with_related(upstream_target::Entity)
.all(*conn)
.await?
}); });
Ok(r.into_iter().map(|m| m.into()).collect()) Ok(r.into_iter().map(|m| m.into()).collect())
@@ -375,7 +398,9 @@ impl UpstreamService for UpstreamServiceImpl {
}); });
let active_model = target.apply_to_model(current_model); let active_model = target.apply_to_model(current_model);
let r = active_model.update(&*self.connection).await?; let r = with_conn!(&*self.connection, tx, conn, {
active_model.update(*conn).await?
});
Ok(r.into()) Ok(r.into())
} }
@@ -505,6 +530,7 @@ mod tests {
up_id, up_id,
Some(GetUpstreamOptions { Some(GetUpstreamOptions {
include_targets: true, include_targets: true,
filter_by_enabled: false,
}), }),
None, None,
) )
@@ -559,7 +585,7 @@ mod tests {
let svc = UpstreamServiceImpl::new(Arc::new(db)); let svc = UpstreamServiceImpl::new(Arc::new(db));
let res = svc.get_upstreams(None, None).await; let res = svc.get_upstreams(None, None, None).await;
assert!(res.is_ok()); assert!(res.is_ok());
let list = res.expect("Failed to get upstreams"); let list = res.expect("Failed to get upstreams");
assert_eq!(list.len(), 2); assert_eq!(list.len(), 2);