diff --git a/Cargo.lock b/Cargo.lock index 86a92e0..f583c2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,7 +7,7 @@ name = "agent_client" version = "0.1.0" dependencies = [ "async-trait", - "mockall", + "mockall 0.13.1", "reqwest", "serde", "serde_json", @@ -2247,7 +2247,21 @@ dependencies = [ "cfg-if", "downcast", "fragile", - "mockall_derive", + "mockall_derive 0.13.1", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f58d964098a5f9c6b63d0798e5372fd04708193510a7af313c22e9f29b7b620b" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive 0.14.0", "predicates", "predicates-tree", ] @@ -2264,6 +2278,18 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "mockall_derive" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca41ce716dda6a9be188b385aa78ee5260fc25cd3802cb2a8afdc6afbe6b6dbf" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -2465,17 +2491,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "optfield" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "969ccca8ffc4fb105bd131a228107d5c9dd89d9d627edf3295cbe979156f9712" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.111", -] - [[package]] name = "ordered-float" version = "4.6.0" @@ -5599,8 +5614,8 @@ dependencies = [ "jsonwebtoken", "migration", "mime_guess", + "mockall 0.14.0", "once_cell", - "optfield", "reqwest", "sea-orm", "serde", diff --git a/apps/api/Cargo.toml b/apps/api/Cargo.toml index b71f3a1..2563501 100644 --- a/apps/api/Cargo.toml +++ b/apps/api/Cargo.toml @@ -31,11 +31,12 @@ uuid = { version = "1.19.0", features = ["v4", "serde", "fast-rng"] } tower-http = { version = "0.6.8", features = ["cors"] } reqwest = { version = "^0.12", features = ["json", "multipart", "stream"] } serde_urlencoded = { version = "0.7.1" } -optfield = { version = "0.4.0" } [dev-dependencies] tempfile = "3" axum-test = "18.4.1" +agent_client = { path = "../../public/agent-client", features = ["mockall"] } +mockall = { version = "0.14.0", features = [] } [lints.clippy] unwrap_used = "deny" diff --git a/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream.rs b/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream.rs index 45b79ca..7ce3d9e 100644 --- a/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream.rs +++ b/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use axum::{Json, extract::State, response::Result as AxumResult}; +use sea_orm::TransactionTrait; use crate::{ errors::api_error::ApiError, @@ -104,7 +105,18 @@ pub async fn create_upstream( .collect(), }; - let upstream_info = upstream_service.create_upstream(create_info, None).await?; + let mut tx = state.database_connection.begin().await?; + let upstream_info = upstream_service + .create_upstream(create_info, Some(&mut tx)) + .await?; + + state + .service + .nginx + .regenerate_and_apply_config(state.service.agent_client.clone(), Some(&mut tx)) + .await?; + + tx.commit().await?; Ok(Json(upstream_info.into())) } @@ -126,12 +138,17 @@ mod tests { create_upstream::{CreateUpstreamRequestBody, UpstreamTargetInfo as ReqTarget}, get_upstream_router, }, - services::get_app_service, + services::{agent_client::MockAgentService, get_app_service, get_mock_app_service}, }; fn get_router_with_state(db: DatabaseConnection) -> axum::Router { let program_settings = ProgramSettings::mock(); - let app_service = get_app_service(&Arc::new(db.clone()), &program_settings); + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| Ok(())); + let mock_agent = Arc::new(mock); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); let state = Arc::new(crate::routes::AppState { database_connection: Arc::new(db), service: Arc::new(app_service), @@ -174,6 +191,10 @@ mod tests { let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(vec![vec![up_model.clone()]]) .append_query_results(vec![vec![target_model.clone()]]) + // additional query result for regenerate_and_apply_config -> generate_config + // `find_with_related` returns rows of `(upstream, Option)` which + // the mock DB expects as `(Model, Option)` per row. + .append_query_results(vec![vec![(up_model.clone(), Some(target_model.clone()))]]) .into_connection(); let router = get_router_with_state(db.clone()); @@ -218,6 +239,82 @@ mod tests { res.assert_status(StatusCode::UNPROCESSABLE_ENTITY); } + #[tokio::test] + async fn handler_create_upstream_agent_error_returns_internal() { + let up_id = uuid::Uuid::new_v4(); + + let up_model = upstream::Model { + id: up_id, + name: "new_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: Some(uuid::Uuid::new_v4()), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let target_id = uuid::Uuid::new_v4(); + let target_model = upstream_target::Model { + id: target_id, + upstream_id: up_id, + target_host: "127.0.0.1".to_string(), + target_port: 8080, + weight: 1, + is_backup: false, + enabled: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + // configure mock agent to error on apply + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| { + Err(crate::services::agent_client::AgentError::ApplicationFailed( + "internal".to_string(), + "Failed to communicate with the agent.".to_string(), + )) + }); + let mock_agent = Arc::new(mock); + + let db = MockDatabase::new(DatabaseBackend::Sqlite) + .append_query_results(vec![vec![up_model.clone()]]) + .append_query_results(vec![vec![target_model.clone()]]) + .append_query_results(vec![vec![(up_model.clone(), Some(target_model.clone()))]]) + .into_connection(); + + let program_settings = ProgramSettings::mock(); + let app_service = get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); + let state = Arc::new(crate::routes::AppState { + database_connection: Arc::new(db), + service: Arc::new(app_service), + config: Arc::new(program_settings), + }); + + let router = get_upstream_router(state).layer(axum::middleware::from_fn( + crate::middlewares::require_auth::mock::mock_require_auth, + )); + let server = TestServer::new(router).expect("failed to create test server"); + + let payload = CreateUpstreamRequestBody { + name: "new_upstream".to_string(), + protocol: "http".to_string(), + algorithm: None, + sticky_session: None, + upstream_targets: vec![ReqTarget { + host: "127.0.0.1".to_string(), + port: 8080, + weight: None, + is_backup: None, + enabled: None, + }], + }; + + let res = server.post("/upstreams").json(&payload).await; + res.assert_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + } + #[tokio::test] async fn handler_create_upstream_unauthenticated_returns_unauthorized() { let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection(); diff --git a/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream_target.rs b/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream_target.rs index 86cfd6b..c0b31df 100644 --- a/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream_target.rs +++ b/apps/api/src/routes/api/restricted/nginx/upstream/create_upstream_target.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use axum::{Json, extract::State, response::Result as AxumResult}; +use sea_orm::TransactionTrait; use crate::{ errors::api_error::ApiError, @@ -61,10 +62,19 @@ pub async fn add_upstream_target( upstream_id: concrete_payload.upstream_id, }; + let mut tx = state.database_connection.begin().await?; let upstream_info = upstream_service - .create_upstream_target(create_info, None) + .create_upstream_target(create_info, Some(&mut tx)) .await?; + state + .service + .nginx + .regenerate_and_apply_config(state.service.agent_client.clone(), Some(&mut tx)) + .await?; + + tx.commit().await?; + Ok(Json(upstream_info.into())) } @@ -76,7 +86,7 @@ mod tests { use axum_test::TestServer; use sea_orm::{DatabaseBackend, DatabaseConnection, MockDatabase}; - use database::generated::entities::upstream_target; + use database::generated::entities::{upstream, upstream_target}; use crate::{ configs::{FromConfig, ProgramSettings}, @@ -84,12 +94,17 @@ mod tests { routes::api::restricted::nginx::upstream::{ create_upstream_target::CreateUpstreamTargetInfo, get_upstream_router, }, - services::get_app_service, + services::{agent_client::MockAgentService, get_mock_app_service}, }; fn get_router_with_state(db: DatabaseConnection) -> axum::Router { let program_settings = ProgramSettings::mock(); - let app_service = get_app_service(&Arc::new(db.clone()), &program_settings); + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| Ok(())); + let mock_agent = Arc::new(mock); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); let state = Arc::new(crate::routes::AppState { database_connection: Arc::new(db), service: Arc::new(app_service), @@ -100,6 +115,83 @@ mod tests { )) } + #[tokio::test] + async fn handler_add_upstream_target_agent_error_returns_internal() { + let up_id = uuid::Uuid::new_v4(); + + let target_id = uuid::Uuid::new_v4(); + let target_model = upstream_target::Model { + id: target_id, + upstream_id: up_id, + target_host: "127.0.0.1".to_string(), + target_port: 8080, + weight: 1, + is_backup: false, + enabled: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let up_model = upstream::Model { + id: up_id, + name: "test_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + // configure mock agent to return an error on apply + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| { + Err( + crate::services::agent_client::AgentError::ApplicationFailed( + "internal".to_string(), + "Failed to communicate with the agent.".to_string(), + ), + ) + }); + let mock_agent = Arc::new(mock); + + let db = MockDatabase::new(DatabaseBackend::Sqlite) + .append_query_results(vec![vec![target_model.clone()]]) + .append_query_results(vec![vec![(up_model.clone(), Some(target_model.clone()))]]) + .into_connection(); + + let program_settings = ProgramSettings::mock(); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); + let state = Arc::new(crate::routes::AppState { + database_connection: Arc::new(db), + service: Arc::new(app_service), + config: Arc::new(program_settings), + }); + + let router = get_upstream_router(state).layer(axum::middleware::from_fn( + crate::middlewares::require_auth::mock::mock_require_auth, + )); + let server = TestServer::new(router).expect("failed to create test server"); + + let payload = CreateUpstreamTargetInfo { + upstream_id: up_id, + host: "127.0.0.1".to_string(), + port: 8080, + weight: None, + is_backup: None, + enabled: None, + }; + + let res = server + .post(&format!("/upstreams/{}/targets", up_id)) + .json(&payload) + .await; + + res.assert_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + } + #[tokio::test] async fn handler_add_upstream_target_succeeds_returns_created() { let up_id = uuid::Uuid::new_v4(); @@ -117,8 +209,21 @@ mod tests { updated_at: chrono::Utc::now(), }; + let up_model = upstream::Model { + id: up_id, + name: "test_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(vec![vec![target_model.clone()]]) + // additional query result for regenerate_and_apply_config -> generate_config + .append_query_results(vec![vec![(up_model.clone(), Some(target_model.clone()))]]) .into_connection(); let router = get_router_with_state(db.clone()); diff --git a/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream.rs b/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream.rs index 419d8f5..25de995 100644 --- a/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream.rs +++ b/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream.rs @@ -5,6 +5,7 @@ use axum::{ extract::{Path, State}, response::Result as AxumResult, }; +use sea_orm::TransactionTrait; use uuid::Uuid; use crate::{ @@ -19,7 +20,18 @@ pub async fn remove_upstream( ) -> AxumResult, ApiError> { let upstream_service = &state.service.nginx.get_upstream_service(); - upstream_service.delete_upstream(upstream_id, None).await?; + let mut tx = state.database_connection.begin().await?; + upstream_service + .delete_upstream(upstream_id, Some(&mut tx)) + .await?; + + state + .service + .nginx + .regenerate_and_apply_config(state.service.agent_client.clone(), Some(&mut tx)) + .await?; + + tx.commit().await?; Ok(Json(())) } @@ -32,18 +44,23 @@ mod tests { use axum_test::TestServer; use sea_orm::{DatabaseBackend, DatabaseConnection, MockDatabase, MockExecResult}; - use database::generated::entities::upstream; + use database::generated::entities::{upstream, upstream_target}; use crate::{ configs::{FromConfig, ProgramSettings}, middlewares::require_auth::mock::REQUEST_AUTH_USER_INVALID_HEADER, routes::api::restricted::nginx::upstream::get_upstream_router, - services::get_app_service, + services::{agent_client::MockAgentService, get_mock_app_service}, }; fn get_router_with_state(db: DatabaseConnection) -> axum::Router { let program_settings = ProgramSettings::mock(); - let app_service = get_app_service(&Arc::new(db.clone()), &program_settings); + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| Ok(())); + let mock_agent = Arc::new(mock); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); let state = Arc::new(crate::routes::AppState { database_connection: Arc::new(db), service: Arc::new(app_service), @@ -69,6 +86,18 @@ mod tests { updated_at: chrono::Utc::now(), }; + let target_model = upstream_target::Model { + id: uuid::Uuid::new_v4(), + upstream_id: up_id, + target_host: "127.0.0.1".to_string(), + target_port: 8080, + weight: 1, + is_backup: false, + enabled: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(vec![vec![existing.clone()]]) .append_exec_results(vec![ @@ -81,6 +110,8 @@ mod tests { last_insert_id: 0, }, ]) + // additional query result for regenerate_and_apply_config -> generate_config + .append_query_results(vec![vec![(existing.clone(), Some(target_model.clone()))]]) .into_connection(); let router = get_router_with_state(db.clone()); @@ -91,6 +122,78 @@ mod tests { res.assert_status_ok(); } + #[tokio::test] + async fn handler_remove_upstream_agent_error_returns_internal() { + let up_id = uuid::Uuid::new_v4(); + + let existing = upstream::Model { + id: up_id, + name: "todelete".to_string(), + protocol: "http".to_string(), + algorithm: "rr".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let target_model = upstream_target::Model { + id: uuid::Uuid::new_v4(), + upstream_id: up_id, + target_host: "127.0.0.1".to_string(), + target_port: 8080, + weight: 1, + is_backup: false, + enabled: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| { + Err( + crate::services::agent_client::AgentError::ApplicationFailed( + "internal".to_string(), + "Failed to communicate with the agent.".to_string(), + ), + ) + }); + let mock_agent = Arc::new(mock); + + let db = MockDatabase::new(DatabaseBackend::Sqlite) + .append_query_results(vec![vec![existing.clone()]]) + .append_exec_results(vec![ + MockExecResult { + rows_affected: 1, + last_insert_id: 0, + }, + MockExecResult { + rows_affected: 1, + last_insert_id: 0, + }, + ]) + .append_query_results(vec![vec![(existing.clone(), Some(target_model.clone()))]]) + .into_connection(); + + let program_settings = ProgramSettings::mock(); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); + let state = Arc::new(crate::routes::AppState { + database_connection: Arc::new(db), + service: Arc::new(app_service), + config: Arc::new(program_settings), + }); + + let router = get_upstream_router(state).layer(axum::middleware::from_fn( + crate::middlewares::require_auth::mock::mock_require_auth, + )); + let server = TestServer::new(router).expect("failed to create test server"); + + let res = server.delete(&format!("/upstreams/{}", up_id)).await; + res.assert_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + } + #[tokio::test] async fn handler_remove_upstream_unauthenticated_returns_unauthorized() { let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection(); diff --git a/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream_target.rs b/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream_target.rs index 3784352..f6bf323 100644 --- a/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream_target.rs +++ b/apps/api/src/routes/api/restricted/nginx/upstream/remove_upstream_target.rs @@ -5,6 +5,7 @@ use axum::{ extract::{Path, State}, response::Result as AxumResult, }; +use sea_orm::TransactionTrait; use uuid::Uuid; use crate::{ @@ -19,10 +20,19 @@ pub async fn remove_upstream_target( ) -> AxumResult, ApiError> { let upstream_service = &state.service.nginx.get_upstream_service(); + let mut tx = state.database_connection.begin().await?; upstream_service - .delete_upstream_target(upstream_target_id, None) + .delete_upstream_target(upstream_target_id, Some(&mut tx)) .await?; + state + .service + .nginx + .regenerate_and_apply_config(state.service.agent_client.clone(), Some(&mut tx)) + .await?; + + tx.commit().await?; + Ok(Json(())) } @@ -34,18 +44,23 @@ mod tests { use axum_test::TestServer; use sea_orm::{DatabaseBackend, DatabaseConnection, MockDatabase, MockExecResult}; - use database::generated::entities::upstream_target; + use database::generated::entities::{upstream, upstream_target}; use crate::{ configs::{FromConfig, ProgramSettings}, middlewares::require_auth::mock::REQUEST_AUTH_USER_INVALID_HEADER, routes::api::restricted::nginx::upstream::get_upstream_router, - services::get_app_service, + services::{agent_client::MockAgentService, get_mock_app_service}, }; fn get_router_with_state(db: DatabaseConnection) -> axum::Router { let program_settings = ProgramSettings::mock(); - let app_service = get_app_service(&Arc::new(db.clone()), &program_settings); + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| Ok(())); + let mock_agent = Arc::new(mock); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); let state = Arc::new(crate::routes::AppState { database_connection: Arc::new(db), service: Arc::new(app_service), @@ -73,6 +88,17 @@ mod tests { }; // first find_by_id, then delete (delete typically doesn't return models) + let up_model = upstream::Model { + id: current_model.upstream_id, + name: "test_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + let first: Vec> = vec![vec![current_model.clone()]]; let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(first) @@ -80,6 +106,8 @@ mod tests { rows_affected: 1, last_insert_id: 0, }]) + // additional query result for regenerate_and_apply_config -> generate_config + .append_query_results(vec![vec![(up_model.clone(), Some(current_model.clone()))]]) .into_connection(); let router = get_router_with_state(db.clone()); @@ -90,6 +118,73 @@ mod tests { res.assert_status_ok(); } + #[tokio::test] + async fn handler_remove_upstream_target_agent_error_returns_internal() { + let ut_id = uuid::Uuid::new_v4(); + + let current_model = upstream_target::Model { + id: ut_id, + upstream_id: uuid::Uuid::new_v4(), + target_host: "127.0.0.1".to_string(), + target_port: 8080, + weight: 1, + is_backup: false, + enabled: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let up_model = upstream::Model { + id: current_model.upstream_id, + name: "test_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| { + Err( + crate::services::agent_client::AgentError::ApplicationFailed( + "internal".to_string(), + "Failed to communicate with the agent.".to_string(), + ), + ) + }); + let mock_agent = Arc::new(mock); + + let first: Vec> = vec![vec![current_model.clone()]]; + let db = MockDatabase::new(DatabaseBackend::Sqlite) + .append_query_results(first) + .append_exec_results(vec![MockExecResult { + rows_affected: 1, + last_insert_id: 0, + }]) + .append_query_results(vec![vec![(up_model.clone(), Some(current_model.clone()))]]) + .into_connection(); + + let program_settings = ProgramSettings::mock(); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); + let state = Arc::new(crate::routes::AppState { + database_connection: Arc::new(db), + service: Arc::new(app_service), + config: Arc::new(program_settings), + }); + + let router = get_upstream_router(state).layer(axum::middleware::from_fn( + crate::middlewares::require_auth::mock::mock_require_auth, + )); + let server = TestServer::new(router).expect("failed to create test server"); + + let res = server.delete(&format!("/upstream_targets/{}", ut_id)).await; + res.assert_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + } + #[tokio::test] async fn handler_remove_upstream_target_unauthenticated_returns_unauthorized() { let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection(); diff --git a/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream.rs b/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream.rs index a287671..0f496cb 100644 --- a/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream.rs +++ b/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream.rs @@ -5,6 +5,7 @@ use axum::{ extract::{Path, State}, response::Result as AxumResult, }; +use sea_orm::TransactionTrait; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -55,14 +56,22 @@ pub async fn update_upstream( let upstream_service = &state.service.nginx.get_upstream_service(); let update_info: UpdateUpstreamInfo = payload.into(); + let mut tx = state.database_connection.begin().await?; let r = upstream_service - .update_upstream(upstream_id, update_info, None) + .update_upstream(upstream_id, update_info, Some(&mut tx)) .await?; + state + .service + .nginx + .regenerate_and_apply_config(state.service.agent_client.clone(), Some(&mut tx)) + .await?; + + tx.commit().await?; + Ok(Json(r.into())) } - #[cfg(test)] mod tests { use std::sync::Arc; @@ -71,19 +80,24 @@ mod tests { use axum_test::TestServer; use sea_orm::{DatabaseBackend, DatabaseConnection, MockDatabase}; - use database::generated::entities::upstream; + use database::generated::entities::{upstream, upstream_target}; + use super::UpdateUpstreamRequestBody; use crate::{ configs::{FromConfig, ProgramSettings}, middlewares::require_auth::mock::REQUEST_AUTH_USER_INVALID_HEADER, routes::api::restricted::nginx::upstream::get_upstream_router, - services::get_app_service, + services::{agent_client::MockAgentService, get_mock_app_service}, }; - use super::UpdateUpstreamRequestBody; fn get_router_with_state(db: DatabaseConnection) -> axum::Router { let program_settings = ProgramSettings::mock(); - let app_service = get_app_service(&Arc::new(db.clone()), &program_settings); + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| Ok(())); + let mock_agent = Arc::new(mock); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); let state = Arc::new(crate::routes::AppState { database_connection: Arc::new(db), service: Arc::new(app_service), @@ -121,11 +135,17 @@ mod tests { }; // first find_by_id, then update returns updated model + let up_model = current_model.clone(); let first: Vec> = vec![vec![current_model.clone()]]; let second: Vec> = vec![vec![updated_model.clone()]]; let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(first) .append_query_results(second) + // additional query result for regenerate_and_apply_config -> generate_config + .append_query_results(vec![vec![( + up_model.clone(), + Option::::None, + )]]) .into_connection(); let router = get_router_with_state(db.clone()); @@ -153,6 +173,87 @@ mod tests { assert_eq!(body.name, "updated_upstream"); } + #[tokio::test] + async fn handler_update_upstream_agent_error_returns_internal() { + let up_id = uuid::Uuid::new_v4(); + + let current_model = upstream::Model { + id: up_id, + name: "old_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: Some(uuid::Uuid::new_v4()), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let updated_model = upstream::Model { + id: up_id, + name: "updated_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: Some(uuid::Uuid::new_v4()), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let up_model = current_model.clone(); + + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| { + Err( + crate::services::agent_client::AgentError::ApplicationFailed( + "internal".to_string(), + "Failed to communicate with the agent.".to_string(), + ), + ) + }); + let mock_agent = Arc::new(mock); + + let first: Vec> = vec![vec![current_model.clone()]]; + let second: Vec> = vec![vec![updated_model.clone()]]; + let db = MockDatabase::new(DatabaseBackend::Sqlite) + .append_query_results(first) + .append_query_results(second) + .append_query_results(vec![vec![( + up_model.clone(), + Option::::None, + )]]) + .into_connection(); + + let program_settings = ProgramSettings::mock(); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); + let state = Arc::new(crate::routes::AppState { + database_connection: Arc::new(db), + service: Arc::new(app_service), + config: Arc::new(program_settings), + }); + + let router = get_upstream_router(state).layer(axum::middleware::from_fn( + crate::middlewares::require_auth::mock::mock_require_auth, + )); + let server = TestServer::new(router).expect("failed to create test server"); + + let payload = UpdateUpstreamRequestBody { + name: Some("updated_upstream".to_string()), + protocol: None, + algorithm: None, + sticky_session: None, + upstream_targets: None, + }; + + let res = server + .patch(&format!("/upstreams/{}", up_id)) + .json(&payload) + .await; + + res.assert_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + } + #[tokio::test] async fn handler_update_upstream_unauthenticated_returns_unauthorized() { let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection(); diff --git a/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream_target.rs b/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream_target.rs index 814ce69..0750475 100644 --- a/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream_target.rs +++ b/apps/api/src/routes/api/restricted/nginx/upstream/update_upstream_target.rs @@ -5,6 +5,7 @@ use axum::{ extract::{Path, State}, response::Result as AxumResult, }; +use sea_orm::TransactionTrait; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -48,10 +49,19 @@ pub async fn update_upstream_target( let upstream_service = &state.service.nginx.get_upstream_service(); let update_info: UpdateUpstreamTargetInfo = payload.into(); + let mut tx = state.database_connection.begin().await?; let r = upstream_service - .update_upstream_target(upstream_target_id, update_info, None) + .update_upstream_target(upstream_target_id, update_info, Some(&mut tx)) .await?; + state + .service + .nginx + .regenerate_and_apply_config(state.service.agent_client.clone(), Some(&mut tx)) + .await?; + + tx.commit().await?; + Ok(Json(r.into())) } @@ -64,18 +74,23 @@ mod tests { use axum_test::TestServer; use sea_orm::{DatabaseBackend, DatabaseConnection, MockDatabase}; - use database::generated::entities::upstream_target; + use database::generated::entities::{upstream, upstream_target}; use super::UpdateUpstreamTargetRequestBody; use crate::{ configs::{FromConfig, ProgramSettings}, middlewares::require_auth::mock::REQUEST_AUTH_USER_INVALID_HEADER, - services::get_app_service, + services::{agent_client::MockAgentService, get_mock_app_service}, }; fn get_router_with_state(db: DatabaseConnection) -> axum::Router { let program_settings = ProgramSettings::mock(); - let app_service = get_app_service(&Arc::new(db.clone()), &program_settings); + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| Ok(())); + let mock_agent = Arc::new(mock); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); let state = Arc::new(crate::routes::AppState { database_connection: Arc::new(db), service: Arc::new(app_service), @@ -121,12 +136,27 @@ mod tests { updated_at: chrono::Utc::now(), }; + let up_model = upstream::Model { + id: current_model.upstream_id, + name: "test_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + let first: Vec> = vec![vec![current_model.clone()]]; let second: Vec> = vec![vec![updated_model.clone()]]; + // additional query result for regenerate_and_apply_config -> generate_config + let third: Vec)>> = + vec![vec![(up_model.clone(), Some(updated_model.clone()))]]; let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(first) .append_query_results(second) + .append_query_results(third) .into_connection(); let router = get_router_with_state(db.clone()); @@ -203,4 +233,103 @@ mod tests { res.assert_status(StatusCode::NOT_FOUND); } + + #[tokio::test] + async fn handler_update_upstream_target_agent_error_returns_internal() { + let target_id = uuid::Uuid::new_v4(); + + let current_model = upstream_target::Model { + id: target_id, + upstream_id: uuid::Uuid::new_v4(), + target_host: "127.0.0.1".to_string(), + target_port: 8080, + weight: 1, + is_backup: false, + enabled: true, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let updated_model = upstream_target::Model { + id: target_id, + upstream_id: current_model.upstream_id, + target_host: "127.0.0.1".to_string(), + target_port: 8081, + weight: 2, + is_backup: false, + enabled: false, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let up_model = upstream::Model { + id: current_model.upstream_id, + name: "test_upstream".to_string(), + protocol: "http".to_string(), + algorithm: "round_robin".to_string(), + sticky_session: false, + created_by: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + }; + + let mut mock = MockAgentService::new(); + mock.expect_validate().returning(|_cfg| Ok(())); + mock.expect_apply().returning(|_cfg| { + Err( + crate::services::agent_client::AgentError::ApplicationFailed( + "internal".to_string(), + "Failed to communicate with the agent.".to_string(), + ), + ) + }); + let mock_agent = Arc::new(mock); + + let first: Vec> = vec![vec![current_model.clone()]]; + let second: Vec> = vec![vec![updated_model.clone()]]; + let third: Vec)>> = + vec![vec![(up_model.clone(), Some(updated_model.clone()))]]; + + let db = MockDatabase::new(DatabaseBackend::Sqlite) + .append_query_results(first) + .append_query_results(second) + .append_query_results(third) + .into_connection(); + + let program_settings = ProgramSettings::mock(); + let app_service = + get_mock_app_service(&Arc::new(db.clone()), &program_settings, mock_agent); + let state = Arc::new(crate::routes::AppState { + database_connection: Arc::new(db), + service: Arc::new(app_service), + config: Arc::new(program_settings), + }); + + let router = axum::Router::new() + .route( + "/upstream_targets/{upstream_target_id}", + axum::routing::patch(crate::routes::api::restricted::nginx::upstream::update_upstream_target::update_upstream_target), + ) + .with_state(state) + .layer(axum::middleware::from_fn( + crate::middlewares::require_auth::mock::mock_require_auth, + )); + + let server = TestServer::new(router).expect("failed to create test server"); + + let payload = UpdateUpstreamTargetRequestBody { + host: None, + port: Some(8081), + enabled: Some(false), + is_backup: None, + weight: Some(2), + }; + + let res = server + .patch(&format!("/upstream_targets/{}", target_id)) + .json(&payload) + .await; + + res.assert_status(axum::http::StatusCode::INTERNAL_SERVER_ERROR); + } } diff --git a/apps/api/src/services.rs b/apps/api/src/services.rs index aa61504..66fb373 100644 --- a/apps/api/src/services.rs +++ b/apps/api/src/services.rs @@ -8,10 +8,13 @@ use std::sync::Arc; use ::agent_client::apis::configuration::Configuration; +#[cfg(test)] +use crate::services::agent_client::MockAgentService; use crate::{ configs::ProgramSettings, routes::{self, AuthState}, services::{ + agent_client::{AgentService, AgentServiceImpl}, auth::{ authentication::{AuthenticationServiceImpl, strategies::password::PasswordStrategy}, user::{UserService, UserServiceImpl}, @@ -32,7 +35,7 @@ pub struct AppService { #[allow(dead_code)] pub nginx: ServiceState, #[allow(dead_code)] - pub agent_client: ServiceState, + pub agent_client: ServiceState, } pub fn get_app_service( @@ -52,8 +55,31 @@ pub fn get_app_service( }, user: Arc::new(UserServiceImpl::new(db_connection.clone())), nginx: Arc::new(NginxService::new(db_connection.clone())), - agent_client: Arc::new(agent_client::AgentService::new(Configuration::from( + agent_client: Arc::new(AgentServiceImpl::new(Configuration::from( settings.agent.clone(), ))), } } + +#[cfg(test)] +pub fn get_mock_app_service( + db_connection: &Arc, + settings: &ProgramSettings, + mock_agent: Arc, +) -> AppService { + AppService { + server_state: Arc::new(ServerStateService::new(db_connection.clone())), + settings: Arc::new(SettingsService::new(db_connection.clone())), + auth_state: routes::AuthState { + strategy: routes::AuthStrategy { + password: Arc::new(PasswordStrategy::new(db_connection.clone())), + }, + authentication: Arc::new(AuthenticationServiceImpl::new( + settings.auth.jwt_secret.clone(), + )), + }, + user: Arc::new(UserServiceImpl::new(db_connection.clone())), + nginx: Arc::new(NginxService::new(db_connection.clone())), + agent_client: mock_agent, + } +} diff --git a/apps/api/src/services/agent_client.rs b/apps/api/src/services/agent_client.rs index 8aa7d95..784d6ca 100644 --- a/apps/api/src/services/agent_client.rs +++ b/apps/api/src/services/agent_client.rs @@ -1,14 +1,104 @@ use std::sync::Arc; -use agent_client::apis::{ApiClient, configuration::Configuration}; +use agent_client::{ + apis::{ + Api, ApiClient, Error as ApiError, ResponseContent, + configuration::Configuration, + nginx_agent_api::{ValidateAndReloadParams, ValidateParams, WriteConfigParams}, + }, + models::{ValidateAndReloadBody, ValidateBody, WriteConfigBody}, +}; use tracing::warn; -use crate::configs::agent::AgentSettings; +use crate::{configs::agent::AgentSettings, errors::service_error::ServiceError}; -pub struct AgentService { +#[derive(Debug)] +pub enum AgentError { + // (internal messages, user-facing messages) + #[allow(dead_code)] + ValidationFailed(String, String), + // (internal messages, user-facing messages) + ApplicationFailed(String, String), +} + +impl From for ServiceError { + fn from(err: AgentError) -> Self { + match err { + AgentError::ValidationFailed(_internal, user) => ServiceError::InternalError(user), + AgentError::ApplicationFailed(_internal, user) => ServiceError::InternalError(user), + } + } +} + +impl From> for AgentError { + fn from(err: ResponseContent) -> Self { + let ResponseContent { + status, + content, + entity, + } = err; + { + let entity_str = entity + .map(|e| format!("{:?}", e)) + .unwrap_or_else(|| "".to_string()); + AgentError::ApplicationFailed( + format!( + "Agent responded with error status {}: {}, entity: {}", + status, content, entity_str + ), + "Agent reported an error during operation.".to_string(), + ) + } + } +} + +impl From> for AgentError { + fn from(err: ApiError) -> Self { + match err { + ApiError::ResponseError(resp) => AgentError::from(resp), + ApiError::Io(err) => AgentError::ApplicationFailed( + format!("IO error during agent communication: {}", err), + "Failed to communicate with the agent.".to_string(), + ), + ApiError::Reqwest(err) => AgentError::ApplicationFailed( + format!("Reqwest error during agent communication: {}", err), + "Failed to communicate with the agent.".to_string(), + ), + ApiError::Serde(err) => AgentError::ApplicationFailed( + format!("Serialization error during agent communication: {}", err), + "Failed to communicate with the agent.".to_string(), + ), + } + } +} + +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait AgentService: Send + Sync { + #[allow(dead_code)] + fn get_client(&self) -> Arc; + + // TODO: improve error handling and reporting, error reasons + // validate configurations that has been created/updated before the given timestamp + #[allow(dead_code)] + async fn validate(&self, config: &str) -> Result<(), AgentError>; + // validate and apply configurations that has been created/updated before the given timestamp + async fn apply(&self, config: &str) -> Result<(), AgentError>; +} + +pub struct AgentServiceImpl { client: Arc, } +impl AgentServiceImpl { + pub fn new(config: impl Into>) -> Self { + let client = ApiClient::new(config.into()); + AgentServiceImpl { + client: Arc::new(client), + } + } +} + impl From for Configuration { fn from(settings: AgentSettings) -> Self { let mut config = Configuration::default(); @@ -27,17 +117,73 @@ impl From for Configuration { } } -impl AgentService { - pub fn new(config: impl Into>) -> Self { - let client = ApiClient::new(config.into()); - AgentService { - client: Arc::new(client), - } +#[async_trait::async_trait] +impl AgentService for AgentServiceImpl { + fn get_client(&self) -> Arc { + Arc::clone(&self.client) } - #[allow(dead_code)] - pub fn get_client(&self) -> Arc { - Arc::clone(&self.client) + async fn validate(&self, config: &str) -> Result<(), AgentError> { + let timestamp = chrono::Utc::now().timestamp_millis(); + let name = Self::get_config_name(true); + self._validate(&name, timestamp, config).await + } + + async fn apply(&self, config: &str) -> Result<(), AgentError> { + let timestamp = chrono::Utc::now().timestamp_millis(); + let name = Self::get_config_name(false); + self._validate(&name, timestamp, config).await?; + self._apply(&name, timestamp).await + } +} + +impl AgentServiceImpl { + fn get_config_name(is_validate_only: bool) -> String { + format!( + "nginx_config_{}{}", + if is_validate_only { + "validation_" + } else { + "application_" + }, + uuid::Uuid::new_v4() + ) + } + + async fn _validate(&self, name: &str, timestamp: i64, config: &str) -> Result<(), AgentError> { + let api = self.client.nginx_agent_api(); + + api.write_config(WriteConfigParams { + write_config_body: WriteConfigBody { + config_name: name.to_string(), + content: config.to_string(), + timestamp, + }, + }) + .await?; + + api.validate(ValidateParams { + validate_body: ValidateBody { + config_name: name.to_string(), + timestamp, + }, + }) + .await?; + + Ok(()) + } + + async fn _apply(&self, name: &str, timestamp: i64) -> Result<(), AgentError> { + let api = self.client.nginx_agent_api(); + api.validate_and_reload(ValidateAndReloadParams { + validate_and_reload_body: ValidateAndReloadBody { + config_name: name.to_string(), + timestamp, + }, + }) + .await?; + + Ok(()) } } @@ -56,7 +202,7 @@ mod tests { #[test] fn test_agent_service_creation() { let config = Configuration::default(); - let service = AgentService::new(config); + let service = AgentServiceImpl::new(config); let client = service.get_client(); assert!(Arc::ptr_eq(&client, &service.client)); } diff --git a/apps/api/src/services/nginx.rs b/apps/api/src/services/nginx.rs index bb16164..fd56b07 100644 --- a/apps/api/src/services/nginx.rs +++ b/apps/api/src/services/nginx.rs @@ -6,11 +6,21 @@ pub mod upstream; use std::sync::Arc; -use sea_orm::DatabaseConnection; +use sea_orm::{DatabaseConnection, DatabaseTransaction}; -use crate::services::nginx::upstream::{UpstreamService, UpstreamServiceImpl}; +use crate::{ + errors::service_error::ServiceError, + services::{ + agent_client::AgentService, + nginx::{ + builder::{NginxConfigBuilder, NginxConfigProvider}, + upstream::{UpstreamService, UpstreamServiceImpl}, + }, + }, +}; pub struct NginxService { + #[allow(dead_code)] connection: Arc, // upstream_service: Arc, @@ -28,4 +38,49 @@ impl NginxService { pub fn get_upstream_service(&self) -> Arc { self.upstream_service.clone() } + + #[allow(dead_code)] + pub async fn validate_config( + &self, + agent: Arc, + config: &str, + ) -> Result<(), ServiceError> { + agent.validate(config).await?; + + Ok(()) + } + + pub async fn apply_changes( + &self, + agent: Arc, + config: &str, + ) -> Result<(), ServiceError> { + agent.apply(config).await?; + + Ok(()) + } + + pub async fn generate_config( + &self, + tx: Option<&mut DatabaseTransaction>, + ) -> Result { + let mut builder = NginxConfigBuilder::default(); + self.upstream_service + .generate_config(&mut builder, tx) + .await?; + + Ok(builder.to_nginx_config(None)) + } + + pub async fn regenerate_and_apply_config( + &self, + agent: Arc, + tx: Option<&mut DatabaseTransaction>, + ) -> Result<(), ServiceError> { + let config = self.generate_config(tx).await?; + + self.apply_changes(agent, &config).await?; + + Ok(()) + } } diff --git a/apps/api/src/services/nginx/builder.rs b/apps/api/src/services/nginx/builder.rs index 972d0b7..c042d05 100644 --- a/apps/api/src/services/nginx/builder.rs +++ b/apps/api/src/services/nginx/builder.rs @@ -6,17 +6,12 @@ pub trait NginxConfigProvider { fn to_nginx_config(&self, indent: Option) -> String; } +#[derive(Default)] pub struct NginxConfigBuilder { upstreams: Vec, } impl NginxConfigBuilder { - pub fn new() -> Self { - Self { - upstreams: Vec::new(), - } - } - pub fn add_upstream(&mut self, upstream: UpstreamInfo) { self.upstreams.push(upstream); } diff --git a/apps/api/src/services/nginx/info/upstream.rs b/apps/api/src/services/nginx/info/upstream.rs index 0353d0f..cd0f078 100644 --- a/apps/api/src/services/nginx/info/upstream.rs +++ b/apps/api/src/services/nginx/info/upstream.rs @@ -1,5 +1,4 @@ use chrono::{DateTime, Utc}; -use optfield::optfield; use database::generated::entities::{upstream, upstream_target}; use sea_orm::ActiveValue::{Set, Unchanged}; diff --git a/apps/api/src/services/nginx/upstream.rs b/apps/api/src/services/nginx/upstream.rs index df12161..088244f 100644 --- a/apps/api/src/services/nginx/upstream.rs +++ b/apps/api/src/services/nginx/upstream.rs @@ -10,9 +10,14 @@ use database::generated::entities::{upstream, upstream_target}; use crate::{ errors::service_error::ServiceError, helpers::database::PaginationFilter, - services::nginx::info::{ - upstream::{UpdateUpstreamInfo, UpstreamCreateInfo, UpstreamInfo}, - upstream_target::{UpdateUpstreamTargetInfo, UpstreamTargetCreateInfo, UpstreamTargetInfo}, + services::nginx::{ + builder::NginxConfigBuilder, + info::{ + upstream::{UpdateUpstreamInfo, UpstreamCreateInfo, UpstreamInfo}, + upstream_target::{ + UpdateUpstreamTargetInfo, UpstreamTargetCreateInfo, UpstreamTargetInfo, + }, + }, }, with_conn, }; @@ -57,6 +62,7 @@ pub trait UpstreamService: Send + Sync { options: Option, tx: Option<&mut DatabaseTransaction>, ) -> Result; + #[allow(dead_code)] async fn get_upstream_targets_by_upstream( &self, upstream_id: uuid::Uuid, @@ -73,6 +79,11 @@ pub trait UpstreamService: Send + Sync { target_id: uuid::Uuid, tx: Option<&mut DatabaseTransaction>, ) -> Result<(), ServiceError>; + async fn generate_config( + &self, + builder: &mut NginxConfigBuilder, + tx: Option<&mut DatabaseTransaction>, + ) -> Result<(), ServiceError>; } pub struct UpstreamServiceImpl { @@ -387,6 +398,26 @@ impl UpstreamService for UpstreamServiceImpl { Ok(()) }) } + + async fn generate_config( + &self, + builder: &mut NginxConfigBuilder, + tx: Option<&mut DatabaseTransaction>, + ) -> Result<(), ServiceError> { + // get all upstreams and their targets + let upstreams = with_conn!(&*self.connection, tx, conn, { + upstream::Entity::find() + .find_with_related(upstream_target::Entity) + .all(*conn) + .await? + }); + let upstreams_info = upstreams + .into_iter() + .map(|(up_model, target_models)| (up_model, target_models).into()) + .collect::>(); + builder.add_upstreams(upstreams_info); + Ok(()) + } } #[cfg(test)] @@ -649,10 +680,16 @@ mod tests { let db = MockDatabase::new(DatabaseBackend::Sqlite) .append_query_results(vec![vec![existing.clone()]]) - .append_exec_results(vec![MockExecResult { - rows_affected: 1, - last_insert_id: 0, - }]) + .append_exec_results(vec![ + MockExecResult { + rows_affected: 1, + last_insert_id: 0, + }, + MockExecResult { + rows_affected: 1, + last_insert_id: 0, + }, + ]) .into_connection(); let svc = UpstreamServiceImpl::new(Arc::new(db));