Skip to content

Commit

Permalink
chore: delete imported task stream value (#1160)
Browse files Browse the repository at this point in the history
* chore: delete import task

* chore: delete requeue task
  • Loading branch information
appflowy authored Jan 14, 2025
1 parent 4ddb08e commit 5a2f54b
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions services/appflowy-worker/src/import_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ async fn handle_expired_task(
task.workspace_id, err
);
}
if let Err(err) = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await {
if let Err(err) = delete_task(&mut context.redis_client, stream_name, group_name, entry_id).await
{
error!(
"[Import] failed to acknowledge task:{} error:{:?}",
task.workspace_id, err
Expand Down Expand Up @@ -409,7 +410,7 @@ async fn process_and_ack_task(
entry_id: &str,
) -> Result<(), ImportError> {
let result = process_task(context.clone(), import_task).await;
xack_task(&mut context.redis_client, stream_name, group_name, entry_id)
delete_task(&mut context.redis_client, stream_name, group_name, entry_id)
.await
.ok();
result
Expand Down Expand Up @@ -471,7 +472,7 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> Resu
async fn push_task(
redis_client: &mut ConnectionManager,
stream_name: &str,
group_name: &str,
_group_name: &str,
task: ImportTask,
entry_id: &str,
) -> Result<(), ImportError> {
Expand All @@ -483,11 +484,10 @@ async fn push_task(
let mut pipeline = redis::pipe();
pipeline
.atomic() // Ensures the commands are executed atomically
.cmd("XACK") // Acknowledge the task
.cmd("XDEL") // delete the task
.arg(stream_name)
.arg(group_name)
.arg(entry_id)
.ignore() // Ignore the result of XACK
.ignore() // Ignore the result of XDEL
.cmd("XADD") // Re-add the task to the stream
.arg(stream_name)
.arg("*")
Expand All @@ -507,17 +507,17 @@ async fn push_task(
}
}

async fn xack_task(
async fn delete_task(
redis_client: &mut ConnectionManager,
stream_name: &str,
group_name: &str,
_group_name: &str,
entry_id: &str,
) -> Result<(), ImportError> {
let _: () = redis_client
.xack(stream_name, group_name, &[entry_id])
.xdel(stream_name, &[entry_id])
.await
.map_err(|e| {
error!("Failed to acknowledge task: {:?}", e);
error!("Failed to delete import task: {:?}", e);
ImportError::Internal(e.into())
})?;
Ok(())
Expand Down

0 comments on commit 5a2f54b

Please sign in to comment.