feature/upstream-service #13
@@ -1,4 +1,4 @@
|
||||
use std::{option, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use sea_orm::{
|
||||
ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait,
|
||||
@@ -26,6 +26,11 @@ pub struct GetUpstreamOptions {
|
||||
pub include_targets: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct GetUpstreamTargetOptions {
|
||||
pub include_upstream: bool,
|
||||
}
|
||||
|
||||
impl UpstreamService {
|
||||
pub fn new(connection: Arc<DatabaseConnection>) -> Self {
|
||||
Self { connection }
|
||||
@@ -155,18 +160,44 @@ impl UpstreamService {
|
||||
pub async fn get_upstream_target(
|
||||
&self,
|
||||
target_id: uuid::Uuid,
|
||||
options: Option<GetUpstreamTargetOptions>,
|
||||
tx: Option<&mut DatabaseTransaction>,
|
||||
) -> Result<UpstreamTargetInfo, ServiceError> {
|
||||
let r = with_conn!(&*self.connection, tx, conn, {
|
||||
upstream_target::Entity::find_by_id(target_id)
|
||||
.one(*conn)
|
||||
.await?
|
||||
.ok_or(ServiceError::NotFound(format!(
|
||||
"Upstream target with id {} not found",
|
||||
target_id
|
||||
)))?
|
||||
});
|
||||
Ok(r.into())
|
||||
let concrete_options = options.unwrap_or_default();
|
||||
let info: UpstreamTargetInfo = if concrete_options.include_upstream {
|
||||
match with_conn!(&*self.connection, tx, conn, {
|
||||
upstream_target::Entity::find_by_id(target_id)
|
||||
.find_also_related(upstream::Entity)
|
||||
.one(*conn)
|
||||
.await?
|
||||
}) {
|
||||
Some((target_model, Some(upstream_model))) => (target_model, upstream_model).into(),
|
||||
Some((_target_model, None)) => {
|
||||
return Err(ServiceError::InternalError(format!(
|
||||
"Inconsistent data: Upstream target with id {} has no associated upstream",
|
||||
target_id
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
return Err(ServiceError::NotFound(format!(
|
||||
"Upstream target with id {} not found",
|
||||
target_id
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
with_conn!(&*self.connection, tx, conn, {
|
||||
upstream_target::Entity::find_by_id(target_id)
|
||||
.one(*conn)
|
||||
.await?
|
||||
.ok_or(ServiceError::NotFound(format!(
|
||||
"Upstream target with id {} not found",
|
||||
target_id
|
||||
)))?
|
||||
})
|
||||
.into()
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
pub async fn get_upstream_targets_by_upstream(
|
||||
@@ -224,3 +255,411 @@ impl UpstreamService {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
|
||||
use sea_orm::MockExecResult;
|
||||
use sea_orm::{DatabaseBackend, MockDatabase};
|
||||
|
||||
use database::generated::entities::{upstream, upstream_target};
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_upstream_returns_info() {
|
||||
let up_model = upstream::Model {
|
||||
id: uuid::Uuid::new_v4(),
|
||||
name: "test_upstream".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "round_robin".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![up_model.clone()]])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let create_info = crate::services::nginx::info::upstream::UpstreamCreateInfo {
|
||||
name: "test_upstream".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "round_robin".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
upstream_targets: Vec::new(),
|
||||
};
|
||||
|
||||
let res = svc.create_upstream(create_info, None).await;
|
||||
assert!(res.is_ok());
|
||||
let info = res.expect("Failed to create upstream");
|
||||
assert_eq!(info.name, "test_upstream");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_upstream_with_targets_returns_targets() {
|
||||
let up_id = uuid::Uuid::new_v4();
|
||||
|
||||
let up_model = upstream::Model {
|
||||
id: up_id,
|
||||
name: "with_targets".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "least_conn".to_string(),
|
||||
sticky_session: true,
|
||||
created_by: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let target_model = upstream_target::Model {
|
||||
id: uuid::Uuid::new_v4(),
|
||||
upstream_id: up_id,
|
||||
target_host: "127.0.0.1".to_string(),
|
||||
target_port: 8080,
|
||||
weight: 1,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
// find_by_id -> returns upstream model
|
||||
.append_query_results(vec![vec![up_model.clone()]])
|
||||
// find targets -> returns the target(s)
|
||||
.append_query_results(vec![vec![target_model.clone()]])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc
|
||||
.get_upstream(
|
||||
up_id,
|
||||
Some(GetUpstreamOptions {
|
||||
include_targets: true,
|
||||
}),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(res.is_ok());
|
||||
let info = res.expect("Failed to get upstream with targets");
|
||||
assert_eq!(info.id, up_id);
|
||||
assert_eq!(info.upstream_targets.len(), 1);
|
||||
assert_eq!(info.upstream_targets[0].target_host, "127.0.0.1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_upstream_not_found_returns_not_found() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![Vec::<sea_orm::MockRow>::new()])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc.get_upstream(uuid::Uuid::new_v4(), None, None).await;
|
||||
|
||||
assert!(matches!(res, Err(ServiceError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_upstreams_returns_list() {
|
||||
let u1 = upstream::Model {
|
||||
id: uuid::Uuid::new_v4(),
|
||||
name: "u1".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "rr".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
let u2 = upstream::Model {
|
||||
id: uuid::Uuid::new_v4(),
|
||||
name: "u2".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "rr".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![u1.clone(), u2.clone()]])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc.get_upstreams(None, None).await;
|
||||
assert!(res.is_ok());
|
||||
let list = res.expect("Failed to get upstreams");
|
||||
assert_eq!(list.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_upstream_targets_by_upstream_returns_targets() {
|
||||
let up_id = uuid::Uuid::new_v4();
|
||||
|
||||
let t = upstream_target::Model {
|
||||
id: uuid::Uuid::new_v4(),
|
||||
upstream_id: up_id,
|
||||
target_host: "10.0.0.1".to_string(),
|
||||
target_port: 80,
|
||||
weight: 10,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![t.clone()]])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc.get_upstream_targets_by_upstream(up_id, None).await;
|
||||
assert!(res.is_ok());
|
||||
let targets = res.expect("Failed to get upstream targets");
|
||||
assert_eq!(targets.len(), 1);
|
||||
assert_eq!(targets[0].target_host, "10.0.0.1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn update_upstream_success() {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
let existing = upstream::Model {
|
||||
id,
|
||||
name: "old".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "rr".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
let updated = upstream::Model {
|
||||
id,
|
||||
name: "new".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "rr".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
created_at: existing.created_at,
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![existing.clone()]]) // find_by_id
|
||||
.append_query_results(vec![vec![updated.clone()]]) // update result
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let update_info = crate::services::nginx::info::upstream::UpdateUpstreamInfo {
|
||||
id: None,
|
||||
name: None,
|
||||
protocol: None,
|
||||
algorithm: None,
|
||||
sticky_session: None,
|
||||
created_by: None,
|
||||
created_at: None,
|
||||
updated_at: None,
|
||||
upstream_targets: None,
|
||||
};
|
||||
let res = svc.update_upstream(id, update_info, None).await;
|
||||
assert!(res.is_ok());
|
||||
let got = res.expect("Failed to update upstream");
|
||||
assert_eq!(got.name, "new");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn update_upstream_not_found() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![Vec::<sea_orm::MockRow>::new()])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc
|
||||
.update_upstream(
|
||||
uuid::Uuid::new_v4(),
|
||||
crate::services::nginx::info::upstream::UpdateUpstreamInfo {
|
||||
id: None,
|
||||
name: None,
|
||||
protocol: None,
|
||||
algorithm: None,
|
||||
sticky_session: None,
|
||||
created_by: None,
|
||||
created_at: None,
|
||||
updated_at: None,
|
||||
upstream_targets: None,
|
||||
},
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(matches!(res, Err(ServiceError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_upstream_success() {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
let existing = upstream::Model {
|
||||
id,
|
||||
name: "todelete".to_string(),
|
||||
protocol: "http".to_string(),
|
||||
algorithm: "rr".to_string(),
|
||||
sticky_session: false,
|
||||
created_by: None,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![existing.clone()]])
|
||||
.append_exec_results(vec![MockExecResult {
|
||||
rows_affected: 1,
|
||||
last_insert_id: 0,
|
||||
}])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc.delete_upstream(id, None).await;
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_upstream_not_found() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![Vec::<sea_orm::MockRow>::new()])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let res = svc.delete_upstream(uuid::Uuid::new_v4(), None).await;
|
||||
assert!(matches!(res, Err(ServiceError::NotFound(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_upstream_target_success() {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
let upstream_id = uuid::Uuid::new_v4();
|
||||
let created = upstream_target::Model {
|
||||
id,
|
||||
upstream_id,
|
||||
target_host: "1.2.3.4".to_string(),
|
||||
target_port: 8080,
|
||||
weight: 5,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![created.clone()]])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let create_info = crate::services::nginx::info::upstream_target::UpstreamTargetCreateInfo {
|
||||
target_host: "1.2.3.4".to_string(),
|
||||
target_port: 8080,
|
||||
weight: 5,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
upstream_id,
|
||||
};
|
||||
|
||||
let res = svc.create_upstream_target(create_info, None).await;
|
||||
assert!(res.is_ok());
|
||||
let t = res.expect("Failed to create target");
|
||||
assert_eq!(t.target_host, "1.2.3.4");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn update_upstream_target_success() {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
let existing = upstream_target::Model {
|
||||
id,
|
||||
upstream_id: uuid::Uuid::new_v4(),
|
||||
target_host: "old".to_string(),
|
||||
target_port: 80,
|
||||
weight: 1,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
let updated = upstream_target::Model {
|
||||
id,
|
||||
upstream_id: existing.upstream_id,
|
||||
target_host: "new".to_string(),
|
||||
target_port: 80,
|
||||
weight: 1,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
created_at: existing.created_at,
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![existing.clone()]])
|
||||
.append_query_results(vec![vec![updated.clone()]])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
|
||||
let update_info = crate::services::nginx::info::upstream_target::UpdateUpstreamTargetInfo {
|
||||
id: None,
|
||||
target_host: None,
|
||||
target_port: None,
|
||||
weight: None,
|
||||
is_backup: None,
|
||||
enabled: None,
|
||||
created_at: None,
|
||||
updated_at: None,
|
||||
upstream_id: None,
|
||||
upstream: None,
|
||||
};
|
||||
let res = svc.update_upstream_target(id, update_info, None).await;
|
||||
assert!(res.is_ok());
|
||||
let got = res.expect("Failed to update target");
|
||||
assert_eq!(got.target_host, "new");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_upstream_target_success() {
|
||||
let id = uuid::Uuid::new_v4();
|
||||
let existing = upstream_target::Model {
|
||||
id,
|
||||
upstream_id: uuid::Uuid::new_v4(),
|
||||
target_host: "del".to_string(),
|
||||
target_port: 80,
|
||||
weight: 1,
|
||||
is_backup: false,
|
||||
enabled: true,
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
};
|
||||
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![existing.clone()]])
|
||||
.append_exec_results(vec![MockExecResult {
|
||||
rows_affected: 1,
|
||||
last_insert_id: 0,
|
||||
}])
|
||||
.into_connection();
|
||||
|
||||
let svc = UpstreamService::new(Arc::new(db));
|
||||
let res = svc.delete_upstream_target(id, None).await;
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user