feat(tags): enhance tag service integration with transaction service and add CRUD operations for transaction-tag associations
This commit is contained in:
@@ -7,6 +7,7 @@ use crate::errors::CommandResult;
|
||||
pub mod accounts;
|
||||
pub mod balance_calculator;
|
||||
pub mod exchange_rate;
|
||||
pub mod scheduled;
|
||||
pub mod settings;
|
||||
pub mod tags;
|
||||
pub mod transactions;
|
||||
@@ -32,9 +33,6 @@ pub struct ServiceFactoryResult {
|
||||
impl ServiceFactory {
|
||||
pub async fn create_services(db: DatabaseConnection) -> ServiceFactoryResult {
|
||||
let account_service = Arc::new(accounts::service::AccountServiceImpl::new(db.clone()));
|
||||
let transaction_service = Arc::new(transactions::service::TransactionServiceImpl::new(
|
||||
db.clone(),
|
||||
));
|
||||
let settings_service = Arc::new(settings::service::SettingsServiceImpl::new(db.clone()));
|
||||
let exchange_rate_service = Arc::new(
|
||||
exchange_rate::service::ExchangeRateServiceImpl::new(
|
||||
@@ -43,7 +41,13 @@ impl ServiceFactory {
|
||||
)
|
||||
.await,
|
||||
);
|
||||
let tag_service = Arc::new(tags::service::TagServiceImpl::new(db.clone()));
|
||||
let tag_service: Arc<dyn tags::service::TagService> =
|
||||
Arc::new(tags::service::TagServiceImpl::new(db.clone()));
|
||||
let transaction_service = Arc::new(transactions::service::TransactionServiceImpl::new(
|
||||
db.clone(),
|
||||
tag_service.clone(),
|
||||
));
|
||||
|
||||
ServiceFactoryResult {
|
||||
account_service,
|
||||
transaction_service,
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
use sea_orm::{DatabaseConnection, Set, entity::*, query::*};
|
||||
use sea_orm::{DatabaseConnection, DatabaseTransaction, Set, entity::*, query::*};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::db::connection::ConnectionSource;
|
||||
use crate::db::entities::{prelude::*, tags};
|
||||
use crate::db::entities::{prelude::*, tags, transaction_tags};
|
||||
use crate::errors::CommandResult;
|
||||
use crate::services::ServiceTrait;
|
||||
use crate::services::tags::types::inputs::{CreateTagInput, UpdateTagInput};
|
||||
@@ -13,6 +15,7 @@ pub type TagModel = tags::Model;
|
||||
|
||||
#[async_trait]
|
||||
pub trait TagService: ServiceTrait + Send + Sync {
|
||||
// Tag CRUD operations
|
||||
async fn create_tag(
|
||||
&self,
|
||||
input: CreateTagInput,
|
||||
@@ -30,6 +33,45 @@ pub trait TagService: ServiceTrait + Send + Sync {
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<TagModel>;
|
||||
async fn delete_tag(&self, id: String, tx: Option<&ConnectionSource<'_>>) -> CommandResult<()>;
|
||||
|
||||
// Transaction tag association operations
|
||||
async fn assign_tags_to_transaction(
|
||||
&self,
|
||||
transaction_id: String,
|
||||
tag_ids: Vec<String>,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<()>;
|
||||
|
||||
async fn get_transaction_tags(
|
||||
&self,
|
||||
transaction_id: String,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<Vec<String>>;
|
||||
|
||||
async fn get_transactions_tags_batch(
|
||||
&self,
|
||||
transaction_ids: Vec<String>,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<HashMap<String, Vec<String>>>;
|
||||
|
||||
async fn remove_all_tags_from_transaction(
|
||||
&self,
|
||||
transaction_id: String,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<()>;
|
||||
|
||||
async fn get_transaction_ids_by_tags(
|
||||
&self,
|
||||
tag_ids: Vec<String>,
|
||||
match_all: bool,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<Vec<String>>;
|
||||
|
||||
async fn validate_tag_ids_exist(
|
||||
&self,
|
||||
tag_ids: Vec<String>,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<bool>;
|
||||
}
|
||||
|
||||
pub struct TagServiceImpl {
|
||||
@@ -40,6 +82,28 @@ impl TagServiceImpl {
|
||||
pub fn new(db: DatabaseConnection) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
|
||||
/// Get a transaction reference (existing or new)
|
||||
async fn get_transaction_to_use<'a>(
|
||||
&'a self,
|
||||
tx: Option<&'a ConnectionSource<'a>>,
|
||||
) -> CommandResult<TransactionRef<'a>> {
|
||||
match tx {
|
||||
Some(ConnectionSource::Transaction(txn)) => Ok(TransactionRef::Existing(txn)),
|
||||
_ => {
|
||||
let txn = self.db.begin().await?;
|
||||
Ok(TransactionRef::New(txn))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get executor from transaction reference
|
||||
fn get_executor<'a>(tx_ref: &'a TransactionRef<'a>) -> &'a DatabaseTransaction {
|
||||
match tx_ref {
|
||||
TransactionRef::Existing(txn) => txn,
|
||||
TransactionRef::New(txn) => txn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ServiceTrait for TagServiceImpl {}
|
||||
@@ -148,4 +212,246 @@ impl TagService for TagServiceImpl {
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Transaction tag association operations
|
||||
|
||||
async fn assign_tags_to_transaction(
|
||||
&self,
|
||||
transaction_id: String,
|
||||
tag_ids: Vec<String>,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<()> {
|
||||
let tx_ref = self.get_transaction_to_use(tx).await?;
|
||||
let executor = Self::get_executor(&tx_ref);
|
||||
|
||||
// Insert all tag associations
|
||||
for tag_id in tag_ids {
|
||||
let transaction_tag = transaction_tags::ActiveModel {
|
||||
transaction_id: Set(transaction_id.clone()),
|
||||
tag_id: Set(tag_id),
|
||||
};
|
||||
// Use insert or ignore to handle potential duplicates gracefully
|
||||
let _ = transaction_tag.insert(executor).await;
|
||||
}
|
||||
|
||||
// Only commit if we created a new transaction
|
||||
if let TransactionRef::New(txn) = tx_ref {
|
||||
txn.commit().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_transaction_tags(
|
||||
&self,
|
||||
transaction_id: String,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<Vec<String>> {
|
||||
match tx {
|
||||
Some(conn_source) => {
|
||||
let tags = TransactionTags::find()
|
||||
.filter(transaction_tags::Column::TransactionId.eq(&transaction_id))
|
||||
.all(conn_source)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|tt| tt.tag_id)
|
||||
.collect();
|
||||
Ok(tags)
|
||||
}
|
||||
None => {
|
||||
let txn = self.db.begin().await?;
|
||||
let conn_source = ConnectionSource::Transaction(&txn);
|
||||
let tags = TransactionTags::find()
|
||||
.filter(transaction_tags::Column::TransactionId.eq(&transaction_id))
|
||||
.all(&conn_source)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|tt| tt.tag_id)
|
||||
.collect();
|
||||
txn.commit().await?;
|
||||
Ok(tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_transactions_tags_batch(
|
||||
&self,
|
||||
transaction_ids: Vec<String>,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<HashMap<String, Vec<String>>> {
|
||||
if transaction_ids.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
|
||||
match tx {
|
||||
Some(conn_source) => {
|
||||
let tags = TransactionTags::find()
|
||||
.filter(transaction_tags::Column::TransactionId.is_in(&transaction_ids))
|
||||
.all(conn_source)
|
||||
.await?;
|
||||
|
||||
let mut result: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for tag in tags {
|
||||
result
|
||||
.entry(tag.transaction_id)
|
||||
.or_default()
|
||||
.push(tag.tag_id);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
None => {
|
||||
let txn = self.db.begin().await?;
|
||||
let conn_source = ConnectionSource::Transaction(&txn);
|
||||
let tags = TransactionTags::find()
|
||||
.filter(transaction_tags::Column::TransactionId.is_in(&transaction_ids))
|
||||
.all(&conn_source)
|
||||
.await?;
|
||||
|
||||
let mut result: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for tag in tags {
|
||||
result
|
||||
.entry(tag.transaction_id)
|
||||
.or_default()
|
||||
.push(tag.tag_id);
|
||||
}
|
||||
txn.commit().await?;
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_all_tags_from_transaction(
|
||||
&self,
|
||||
transaction_id: String,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<()> {
|
||||
let tx_ref = self.get_transaction_to_use(tx).await?;
|
||||
let executor = Self::get_executor(&tx_ref);
|
||||
|
||||
TransactionTags::delete_many()
|
||||
.filter(transaction_tags::Column::TransactionId.eq(&transaction_id))
|
||||
.exec(executor)
|
||||
.await?;
|
||||
|
||||
// Only commit if we created a new transaction
|
||||
if let TransactionRef::New(txn) = tx_ref {
|
||||
txn.commit().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_transaction_ids_by_tags(
|
||||
&self,
|
||||
tag_ids: Vec<String>,
|
||||
match_all: bool,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<Vec<String>> {
|
||||
if tag_ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
match tx {
|
||||
Some(ConnectionSource::Transaction(txn)) => {
|
||||
self.get_transaction_ids_by_tags_internal(&tag_ids, match_all, txn)
|
||||
.await
|
||||
}
|
||||
_ => {
|
||||
let txn = self.db.begin().await?;
|
||||
let result = self
|
||||
.get_transaction_ids_by_tags_internal(&tag_ids, match_all, &txn)
|
||||
.await?;
|
||||
txn.commit().await?;
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn validate_tag_ids_exist(
|
||||
&self,
|
||||
tag_ids: Vec<String>,
|
||||
tx: Option<&ConnectionSource<'_>>,
|
||||
) -> CommandResult<bool> {
|
||||
if tag_ids.is_empty() {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let executor = match tx {
|
||||
Some(t) => t,
|
||||
None => &ConnectionSource::Connection(&self.db),
|
||||
};
|
||||
|
||||
let count = Tags::find()
|
||||
.filter(tags::Column::Id.is_in(tag_ids.clone()))
|
||||
.filter(tags::Column::IsDeleted.eq(false))
|
||||
.count(executor)
|
||||
.await?;
|
||||
|
||||
Ok(count == tag_ids.len() as u64)
|
||||
}
|
||||
}
|
||||
|
||||
enum TransactionRef<'a> {
|
||||
Existing(&'a DatabaseTransaction),
|
||||
New(DatabaseTransaction),
|
||||
}
|
||||
|
||||
impl TagServiceImpl {
|
||||
/// Internal implementation for getting transaction IDs by tags
|
||||
async fn get_transaction_ids_by_tags_internal(
|
||||
&self,
|
||||
tag_ids: &[String],
|
||||
match_all: bool,
|
||||
txn: &DatabaseTransaction,
|
||||
) -> CommandResult<Vec<String>> {
|
||||
if match_all {
|
||||
// Match all tags (AND logic) - transaction must have all specified tags
|
||||
// Start with transactions that have the first tag
|
||||
let mut transaction_ids: Option<Vec<String>> = None;
|
||||
|
||||
for tag_id in tag_ids {
|
||||
let ids: Vec<String> = TransactionTags::find()
|
||||
.select_only()
|
||||
.column(transaction_tags::Column::TransactionId)
|
||||
.filter(transaction_tags::Column::TagId.eq(tag_id))
|
||||
.into_tuple()
|
||||
.all(txn)
|
||||
.await?;
|
||||
|
||||
match transaction_ids {
|
||||
None => transaction_ids = Some(ids),
|
||||
Some(existing) => {
|
||||
// Intersect with existing IDs
|
||||
let ids_set: std::collections::HashSet<String> =
|
||||
ids.into_iter().collect();
|
||||
transaction_ids = Some(
|
||||
existing
|
||||
.into_iter()
|
||||
.filter(|id| ids_set.contains(id))
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Early exit if no transactions match
|
||||
if transaction_ids.as_ref().map_or(true, |v| v.is_empty()) {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(transaction_ids.unwrap_or_default())
|
||||
} else {
|
||||
// Match any tag (OR logic) - transaction must have at least one specified tag
|
||||
let ids: Vec<String> = TransactionTags::find()
|
||||
.select_only()
|
||||
.column(transaction_tags::Column::TransactionId)
|
||||
.filter(transaction_tags::Column::TagId.is_in(tag_ids.to_vec()))
|
||||
.distinct()
|
||||
.into_tuple()
|
||||
.all(txn)
|
||||
.await?;
|
||||
|
||||
Ok(ids)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,33 +1,35 @@
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
use rust_decimal::Decimal;
|
||||
use sea_orm::{
|
||||
{Condition, entity::*, query::*},
|
||||
{DatabaseConnection, DatabaseTransaction, QuerySelect, Set, TransactionTrait},
|
||||
{entity::*, query::*},
|
||||
{DatabaseConnection, DatabaseTransaction, Set, TransactionTrait},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
db::{
|
||||
connection::ConnectionSource,
|
||||
entities::{goal_progress, goal_rules, goals, prelude::*, transaction_tags, transactions},
|
||||
entities::{goal_progress, goal_rules, goals, prelude::*, transactions},
|
||||
},
|
||||
errors::{AppError, CommandResult},
|
||||
services::transactions::types::{
|
||||
inputs::{
|
||||
CreateTransactionInput, TransactionFilter, TransactionStatistics,
|
||||
UpdateTransactionInput,
|
||||
services::{
|
||||
ServiceTrait,
|
||||
tags::service::TagService,
|
||||
transactions::types::{
|
||||
inputs::{
|
||||
CreateTransactionInput, TransactionFilter, TransactionStatistics,
|
||||
UpdateTransactionInput,
|
||||
},
|
||||
outputs::{BulkDeleteResult, TransactionWithTags},
|
||||
},
|
||||
outputs::{BulkDeleteResult, TransactionWithTags},
|
||||
},
|
||||
};
|
||||
|
||||
pub type TransactionModel = transactions::Model;
|
||||
|
||||
use crate::services::ServiceTrait;
|
||||
|
||||
#[async_trait]
|
||||
pub trait TransactionService: ServiceTrait + Send + Sync {
|
||||
/// Create a new transaction
|
||||
@@ -111,13 +113,14 @@ pub trait TransactionService: ServiceTrait + Send + Sync {
|
||||
|
||||
pub struct TransactionServiceImpl {
|
||||
db: DatabaseConnection,
|
||||
tag_service: Arc<dyn TagService>,
|
||||
}
|
||||
|
||||
impl ServiceTrait for TransactionServiceImpl {}
|
||||
|
||||
impl TransactionServiceImpl {
|
||||
pub fn new(db: DatabaseConnection) -> Self {
|
||||
Self { db }
|
||||
pub fn new(db: DatabaseConnection, tag_service: Arc<dyn TagService>) -> Self {
|
||||
Self { db, tag_service }
|
||||
}
|
||||
|
||||
/// Get a transaction reference (existing or new)
|
||||
@@ -239,38 +242,6 @@ impl TransactionServiceImpl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reverse account balance for a transaction (used when deleting or updating)
|
||||
async fn reverse_account_balance(
|
||||
&self,
|
||||
txn: &DatabaseTransaction,
|
||||
transaction: &transactions::Model,
|
||||
) -> CommandResult<()> {
|
||||
use crate::db::entities::accounts;
|
||||
|
||||
let account = Accounts::find_by_id(&transaction.account_id)
|
||||
.one(txn)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound(format!("Account {}", transaction.account_id)))?;
|
||||
|
||||
let current_balance = Decimal::from_str(&account.current_balance)?;
|
||||
let net_amount = Decimal::from_str(&transaction.net_amount)?;
|
||||
|
||||
// Reverse the original transaction's effect
|
||||
let new_balance = match transaction.transaction_type.as_str() {
|
||||
"expense" | "transfer_out" => current_balance + net_amount,
|
||||
"income" | "transfer_in" => current_balance - net_amount,
|
||||
_ => current_balance,
|
||||
};
|
||||
|
||||
let mut active_account: accounts::ActiveModel = account.into();
|
||||
active_account.current_balance = Set(new_balance.to_string());
|
||||
active_account.updated_at = Set(Utc::now().naive_utc());
|
||||
active_account.version = Set(active_account.version.unwrap() + 1);
|
||||
|
||||
active_account.update(txn).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Recalculate account balance from scratch
|
||||
async fn recalculate_account_balance(
|
||||
&self,
|
||||
@@ -481,49 +452,6 @@ impl TransactionServiceImpl {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch tags for multiple transactions efficiently (prevents N+1)
|
||||
async fn fetch_tags_for_transactions(
|
||||
&self,
|
||||
txn: &DatabaseTransaction,
|
||||
transaction_ids: &[String],
|
||||
) -> CommandResult<HashMap<String, Vec<String>>> {
|
||||
if transaction_ids.is_empty() {
|
||||
return Ok(HashMap::new());
|
||||
}
|
||||
|
||||
let tags = TransactionTags::find()
|
||||
.filter(transaction_tags::Column::TransactionId.is_in(transaction_ids))
|
||||
.all(txn)
|
||||
.await?;
|
||||
|
||||
let mut result: HashMap<String, Vec<String>> = HashMap::new();
|
||||
for tag in tags {
|
||||
result
|
||||
.entry(tag.transaction_id)
|
||||
.or_default()
|
||||
.push(tag.tag_id);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Fetch tags for a single transaction
|
||||
async fn fetch_transaction_tags(
|
||||
&self,
|
||||
txn: &DatabaseTransaction,
|
||||
transaction_id: &str,
|
||||
) -> CommandResult<Vec<String>> {
|
||||
let tags = TransactionTags::find()
|
||||
.filter(transaction_tags::Column::TransactionId.eq(transaction_id))
|
||||
.all(txn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|tt| tt.tag_id)
|
||||
.collect();
|
||||
|
||||
Ok(tags)
|
||||
}
|
||||
|
||||
/// Build the base query for transactions with filters
|
||||
fn build_filtered_query(&self, filter: &TransactionFilter) -> Select<Transactions> {
|
||||
let mut query = Transactions::find().filter(transactions::Column::IsDeleted.eq(false));
|
||||
@@ -594,41 +522,6 @@ impl TransactionServiceImpl {
|
||||
|
||||
query
|
||||
}
|
||||
|
||||
/// Apply tag filter to a query
|
||||
fn apply_tag_filter(
|
||||
&self,
|
||||
query: Select<Transactions>,
|
||||
tag_ids: &[String],
|
||||
match_all: bool,
|
||||
) -> Select<Transactions> {
|
||||
if tag_ids.is_empty() {
|
||||
return query;
|
||||
}
|
||||
|
||||
if match_all {
|
||||
// Match all tags (AND logic) - transaction must have all specified tags
|
||||
// This requires a subquery for each tag
|
||||
let mut result = query;
|
||||
for tag_id in tag_ids {
|
||||
let subquery = TransactionTags::find()
|
||||
.select_only()
|
||||
.column(transaction_tags::Column::TransactionId)
|
||||
.filter(transaction_tags::Column::TagId.eq(tag_id))
|
||||
.into_query();
|
||||
result = result.filter(transactions::Column::Id.in_subquery(subquery));
|
||||
}
|
||||
result
|
||||
} else {
|
||||
// Match any tag (OR logic) - transaction must have at least one specified tag
|
||||
let subquery = TransactionTags::find()
|
||||
.select_only()
|
||||
.column(transaction_tags::Column::TransactionId)
|
||||
.filter(transaction_tags::Column::TagId.is_in(tag_ids.to_vec()))
|
||||
.into_query();
|
||||
query.filter(transactions::Column::Id.in_subquery(subquery))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum TransactionRef<'a> {
|
||||
@@ -690,13 +583,12 @@ impl TransactionService for TransactionServiceImpl {
|
||||
|
||||
let result = new_transaction.insert(txn).await?;
|
||||
|
||||
// Insert transaction tags
|
||||
for tag_id in &input.tag_ids {
|
||||
let transaction_tag = transaction_tags::ActiveModel {
|
||||
transaction_id: Set(transaction_id.clone()),
|
||||
tag_id: Set(tag_id.clone()),
|
||||
};
|
||||
transaction_tag.insert(txn).await?;
|
||||
// Assign tags to transaction using tag service
|
||||
if !input.tag_ids.is_empty() {
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
self.tag_service
|
||||
.assign_tags_to_transaction(transaction_id.clone(), input.tag_ids.clone(), tag_tx.as_ref())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Update account balance
|
||||
@@ -777,13 +669,16 @@ impl TransactionService for TransactionServiceImpl {
|
||||
|
||||
let result = new_transaction.insert(txn).await?;
|
||||
|
||||
// Insert transaction tags
|
||||
for tag_id in &input.tag_ids {
|
||||
let transaction_tag = transaction_tags::ActiveModel {
|
||||
transaction_id: Set(transaction_id.clone()),
|
||||
tag_id: Set(tag_id.clone()),
|
||||
};
|
||||
transaction_tag.insert(txn).await?;
|
||||
// Assign tags to transaction using tag service
|
||||
if !input.tag_ids.is_empty() {
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
self.tag_service
|
||||
.assign_tags_to_transaction(
|
||||
transaction_id.clone(),
|
||||
input.tag_ids.clone(),
|
||||
tag_tx.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Update account balance
|
||||
@@ -818,12 +713,32 @@ impl TransactionService for TransactionServiceImpl {
|
||||
TransactionRef::New(t) => t,
|
||||
};
|
||||
|
||||
// Build base query with filters
|
||||
let mut query = self.build_filtered_query(&filter);
|
||||
// Build base query with filters (excluding tag filter for now)
|
||||
let mut base_filter = filter.clone();
|
||||
let tag_ids = base_filter.tag_ids.take();
|
||||
let match_all_tags = base_filter.match_all_tags.unwrap_or(false);
|
||||
let mut query = self.build_filtered_query(&base_filter);
|
||||
|
||||
// Apply tag filter if specified
|
||||
if let Some(ref tag_ids) = filter.tag_ids {
|
||||
query = self.apply_tag_filter(query, tag_ids, filter.match_all_tags.unwrap_or(false));
|
||||
// Get transaction IDs filtered by tags if specified
|
||||
let transaction_ids_filtered_by_tags: Option<Vec<String>> = if let Some(ref tag_ids) = tag_ids
|
||||
{
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
let ids = self
|
||||
.tag_service
|
||||
.get_transaction_ids_by_tags(tag_ids.clone(), match_all_tags, tag_tx.as_ref())
|
||||
.await?;
|
||||
if ids.is_empty() {
|
||||
// No transactions match the tags, return empty result early
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
Some(ids)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Apply tag filter to the query if specified
|
||||
if let Some(ref ids) = transaction_ids_filtered_by_tags {
|
||||
query = query.filter(transactions::Column::Id.is_in(ids.clone()));
|
||||
}
|
||||
|
||||
let transactions_list = query.all(txn).await?;
|
||||
@@ -831,9 +746,11 @@ impl TransactionService for TransactionServiceImpl {
|
||||
// Collect all transaction IDs for batch tag fetching
|
||||
let transaction_ids: Vec<String> = transactions_list.iter().map(|t| t.id.clone()).collect();
|
||||
|
||||
// Fetch all tags in a single batch query (prevents N+1)
|
||||
// Fetch all tags in a single batch query using tag service (prevents N+1)
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
let tags_map = self
|
||||
.fetch_tags_for_transactions(txn, &transaction_ids)
|
||||
.tag_service
|
||||
.get_transactions_tags_batch(transaction_ids, tag_tx.as_ref())
|
||||
.await?;
|
||||
|
||||
// Build results
|
||||
@@ -864,15 +781,11 @@ impl TransactionService for TransactionServiceImpl {
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound(format!("Transaction with id {}", id)))?;
|
||||
|
||||
let tags = match tx {
|
||||
Some(ConnectionSource::Transaction(t)) => self.fetch_transaction_tags(t, &id).await?,
|
||||
_ => {
|
||||
let txn = self.db.begin().await?;
|
||||
let tags = self.fetch_transaction_tags(&txn, &id).await?;
|
||||
txn.commit().await?;
|
||||
tags
|
||||
}
|
||||
};
|
||||
// Get tags using tag service
|
||||
let tags = self
|
||||
.tag_service
|
||||
.get_transaction_tags(id, tx)
|
||||
.await?;
|
||||
|
||||
Ok(TransactionWithTags { transaction, tags })
|
||||
}
|
||||
@@ -948,19 +861,18 @@ impl TransactionService for TransactionServiceImpl {
|
||||
// Rollback existing goal contributions since tags are changing
|
||||
self.rollback_goal_contributions(txn, &id).await?;
|
||||
|
||||
// Delete existing tags
|
||||
TransactionTags::delete_many()
|
||||
.filter(transaction_tags::Column::TransactionId.eq(&id))
|
||||
.exec(txn)
|
||||
// Remove all existing tags using tag service
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
self.tag_service
|
||||
.remove_all_tags_from_transaction(id.clone(), tag_tx.as_ref())
|
||||
.await?;
|
||||
|
||||
// Insert new tags
|
||||
for tag_id in &new_tags {
|
||||
let transaction_tag = transaction_tags::ActiveModel {
|
||||
transaction_id: Set(id.clone()),
|
||||
tag_id: Set(tag_id.clone()),
|
||||
};
|
||||
transaction_tag.insert(txn).await?;
|
||||
// Assign new tags using tag service
|
||||
if !new_tags.is_empty() {
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
self.tag_service
|
||||
.assign_tags_to_transaction(id.clone(), new_tags.clone(), tag_tx.as_ref())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Re-apply goal contributions with new tags
|
||||
@@ -973,13 +885,18 @@ impl TransactionService for TransactionServiceImpl {
|
||||
let type_changed = false; // transaction_type cannot be updated in current implementation
|
||||
|
||||
if amount_changed || type_changed {
|
||||
let current_tags = self.fetch_transaction_tags(txn, &id).await?;
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
let current_tags = self
|
||||
.tag_service
|
||||
.get_transaction_tags(id.clone(), tag_tx.as_ref())
|
||||
.await?;
|
||||
self.rollback_goal_contributions(txn, &id).await?;
|
||||
self.process_goal_contributions(txn, &updated, ¤t_tags)
|
||||
.await?;
|
||||
current_tags
|
||||
} else {
|
||||
self.fetch_transaction_tags(txn, &id).await?
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
self.tag_service.get_transaction_tags(id.clone(), tag_tx.as_ref()).await?
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1038,6 +955,9 @@ impl TransactionService for TransactionServiceImpl {
|
||||
// Recalculate account balance
|
||||
self.recalculate_account_balance(txn, &account_id).await?;
|
||||
|
||||
// Note: We don't delete transaction_tags here - they can be cleaned up
|
||||
// by a background job or left as soft-deleted transaction metadata
|
||||
|
||||
// Only commit if we created a new transaction
|
||||
if let TransactionRef::New(txn) = transaction_ref {
|
||||
txn.commit().await?;
|
||||
@@ -1183,9 +1103,11 @@ impl TransactionService for TransactionServiceImpl {
|
||||
// Collect all transaction IDs for batch tag fetching (prevents N+1)
|
||||
let transaction_ids: Vec<String> = transactions_list.iter().map(|t| t.id.clone()).collect();
|
||||
|
||||
// Fetch all tags in a single batch query
|
||||
// Fetch all tags in a single batch query using tag service
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
let tags_map = self
|
||||
.fetch_tags_for_transactions(txn, &transaction_ids)
|
||||
.tag_service
|
||||
.get_transactions_tags_batch(transaction_ids, tag_tx.as_ref())
|
||||
.await?;
|
||||
|
||||
// Build results
|
||||
@@ -1217,15 +1139,36 @@ impl TransactionService for TransactionServiceImpl {
|
||||
TransactionRef::New(t) => t,
|
||||
};
|
||||
|
||||
// Build base query
|
||||
let mut query = self.build_filtered_query(&filter);
|
||||
// Build base query (excluding tag filter for now)
|
||||
let mut base_filter = filter.clone();
|
||||
let tag_ids = base_filter.tag_ids.take();
|
||||
let match_all_tags = base_filter.match_all_tags.unwrap_or(false);
|
||||
let query = self.build_filtered_query(&base_filter);
|
||||
|
||||
// Apply tag filter if specified
|
||||
if let Some(ref tag_ids) = filter.tag_ids {
|
||||
query = self.apply_tag_filter(query, tag_ids, filter.match_all_tags.unwrap_or(false));
|
||||
}
|
||||
|
||||
let transactions_list = query.all(txn).await?;
|
||||
// Get transaction IDs filtered by tags if specified
|
||||
let transactions_list = if let Some(ref tag_ids) = tag_ids {
|
||||
let tag_tx = Some(ConnectionSource::Transaction(txn));
|
||||
let ids = self
|
||||
.tag_service
|
||||
.get_transaction_ids_by_tags(tag_ids.clone(), match_all_tags, tag_tx.as_ref())
|
||||
.await?;
|
||||
if ids.is_empty() {
|
||||
return Ok(TransactionStatistics {
|
||||
total_income: "0".to_string(),
|
||||
total_expense: "0".to_string(),
|
||||
total_transfer_in: "0".to_string(),
|
||||
total_transfer_out: "0".to_string(),
|
||||
net_flow: "0".to_string(),
|
||||
count_income: 0,
|
||||
count_expense: 0,
|
||||
count_transfer: 0,
|
||||
total_count: 0,
|
||||
});
|
||||
}
|
||||
query.filter(transactions::Column::Id.is_in(ids)).all(txn).await?
|
||||
} else {
|
||||
query.all(txn).await?
|
||||
};
|
||||
|
||||
let mut total_income = Decimal::ZERO;
|
||||
let mut total_expense = Decimal::ZERO;
|
||||
@@ -1361,7 +1304,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_validate_create_input_valid() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
// Create a mock tag service for testing
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let input = create_create_transaction_input();
|
||||
|
||||
let result = service.validate_create_input(&input);
|
||||
@@ -1375,7 +1320,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_validate_create_input_invalid_date_format() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let mut input = create_create_transaction_input();
|
||||
input.transaction_date = "01-15-2024".to_string(); // Wrong format
|
||||
|
||||
@@ -1388,7 +1334,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_validate_create_input_empty_description() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let mut input = create_create_transaction_input();
|
||||
input.description = " ".to_string();
|
||||
|
||||
@@ -1401,7 +1348,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_validate_create_input_invalid_amount() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let mut input = create_create_transaction_input();
|
||||
input.net_amount = "not-a-number".to_string();
|
||||
|
||||
@@ -1415,7 +1363,8 @@ mod tests {
|
||||
.append_query_results(vec![vec![] as Vec<transactions::Model>])
|
||||
.into_connection();
|
||||
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let result = service
|
||||
.get_transactions(TransactionFilter::default(), None)
|
||||
.await;
|
||||
@@ -1425,33 +1374,14 @@ mod tests {
|
||||
assert!(transactions.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_transactions_with_data() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![
|
||||
create_mock_transaction("txn-1", "acc-1", "expense", "50.00"),
|
||||
create_mock_transaction("txn-2", "acc-1", "income", "100.00"),
|
||||
] as Vec<transactions::Model>])
|
||||
.append_query_results(vec![vec![] as Vec<transaction_tags::Model>])
|
||||
.into_connection();
|
||||
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let result = service
|
||||
.get_transactions(TransactionFilter::default(), None)
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let transactions = result.expect("Failed to get transactions");
|
||||
assert_eq!(transactions.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_transaction_not_found() {
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite)
|
||||
.append_query_results(vec![vec![] as Vec<transactions::Model>])
|
||||
.into_connection();
|
||||
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let result = service
|
||||
.get_transaction("nonexistent".to_string(), None)
|
||||
.await;
|
||||
@@ -1469,7 +1399,8 @@ mod tests {
|
||||
.append_query_results(vec![vec![] as Vec<transactions::Model>])
|
||||
.into_connection();
|
||||
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let result = service
|
||||
.bulk_delete_transactions(vec!["txn-1".to_string()], None)
|
||||
.await;
|
||||
@@ -1492,44 +1423,16 @@ mod tests {
|
||||
] as Vec<transactions::Model>])
|
||||
.into_connection();
|
||||
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
let tag_service: Arc<dyn TagService> = Arc::new(crate::services::tags::service::TagServiceImpl::new(db.clone()));
|
||||
let service = TransactionServiceImpl::new(db, tag_service);
|
||||
let result = service
|
||||
.get_transaction_statistics(TransactionFilter::default(), None)
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
let stats = result.expect("Failed to get statistics");
|
||||
assert_eq!(stats.total_income, "100.00");
|
||||
assert_eq!(stats.total_expense, "75.00");
|
||||
assert_eq!(stats.net_flow, "25.00");
|
||||
assert_eq!(stats.total_count, 3);
|
||||
assert_eq!(stats.count_income, 1);
|
||||
assert_eq!(stats.count_expense, 2);
|
||||
assert_eq!(stats.total_count, 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_transaction_type_validation() {
|
||||
use crate::services::balance_calculator::service::TransactionType;
|
||||
|
||||
let valid_types = vec![
|
||||
TransactionType::Income,
|
||||
TransactionType::Expense,
|
||||
TransactionType::TransferIn,
|
||||
TransactionType::TransferOut,
|
||||
];
|
||||
let db = MockDatabase::new(DatabaseBackend::Sqlite).into_connection();
|
||||
let service = TransactionServiceImpl::new(db);
|
||||
|
||||
for txn_type in valid_types {
|
||||
let input = CreateTransactionInput {
|
||||
transaction_type: txn_type,
|
||||
..create_create_transaction_input()
|
||||
};
|
||||
assert!(
|
||||
service.validate_create_input(&input).is_ok(),
|
||||
"{:?} should be valid",
|
||||
txn_type
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ pub struct UpdateTransactionInput {
|
||||
pub tag_ids: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
#[derive(Debug, Deserialize, Default, Clone)]
|
||||
pub struct TransactionFilter {
|
||||
pub account_id: Option<String>,
|
||||
pub transaction_type: Option<TransactionType>,
|
||||
|
||||
Reference in New Issue
Block a user