From 49671c0506e9d10f4dc9e3b19736787762aa4551 Mon Sep 17 00:00:00 2001 From: weidong fu Date: Tue, 14 Jan 2025 12:14:17 +0800 Subject: [PATCH] chore: notify user file size --- .../src/import_worker/worker.rs | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index 80c9e700a..c8b3d79f5 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -288,7 +288,23 @@ async fn consume_task( Some(file_size) => { if file_size > context.maximum_import_file_size as i64 { let file_size_in_mb = file_size as f64 / 1_048_576.0; - let max_size_in_mb = context.maximum_import_file_size as f64 / 1_048_576.0; + let max_size_in_mb = (context.maximum_import_file_size as f64 / 1_048_576.0).ceil(); + if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await { + handle_failed_task( + &mut context, + &import_record, + task, + stream_name, + group_name, + &entry_id, + ImportError::UploadFileTooLarge { + file_size_in_mb, + max_size_in_mb, + }, + ImportTaskState::Failed, + ) + .await?; + } return Err(ImportError::UploadFileTooLarge { file_size_in_mb, @@ -299,16 +315,18 @@ async fn consume_task( } // Check if the task is expired - if let Err(err) = is_task_expired(task.created_at.unwrap(), task.last_process_at) { + if let Err(reason) = is_task_expired(task.created_at.unwrap(), task.last_process_at) { if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await { - handle_expired_task( + error!("[Import] {} task is expired: {}", task.workspace_id, reason); + handle_failed_task( &mut context, &import_record, task, stream_name, group_name, &entry_id, - &err, + ImportError::UploadFileExpire, + ImportTaskState::Expire, ) .await?; } @@ -342,30 +360,28 @@ async fn consume_task( } } -async fn handle_expired_task( +#[allow(clippy::too_many_arguments)] +async fn handle_failed_task( context: &mut TaskContext, import_record: &AFImportTask, task: &NotionImportTask, stream_name: &str, group_name: &str, entry_id: &str, - reason: &str, + error: ImportError, + task_state: ImportTaskState, ) -> Result<(), ImportError> { info!( - "[Import]: {} import is expired with reason:{}", - task.workspace_id, reason + "[Import]: {} import was failed with reason:{}", + task.workspace_id, error ); - update_import_task_status( - &import_record.task_id, - ImportTaskState::Expire, - &context.pg_pool, - ) - .await - .map_err(|e| { - error!("Failed to update import task status: {:?}", e); - ImportError::Internal(e.into()) - })?; + update_import_task_status(&import_record.task_id, task_state, &context.pg_pool) + .await + .map_err(|e| { + error!("Failed to update import task status: {:?}", e); + ImportError::Internal(e.into()) + })?; remove_workspace(&import_record.workspace_id, &context.pg_pool).await; info!("[Import]: deleted workspace {}", task.workspace_id); @@ -382,13 +398,7 @@ async fn handle_expired_task( task.workspace_id, err ); } - notify_user( - task, - Err(ImportError::UploadFileExpire), - context.notifier.clone(), - &context.metrics, - ) - .await?; + notify_user(task, Err(error), context.notifier.clone(), &context.metrics).await?; Ok(()) }