Skip to content

Commit

Permalink
Merge pull request #15 from CoLearn-Dev/wait-task
Browse files Browse the repository at this point in the history
- add wait_task
- fix read_or_wait
  • Loading branch information
stneng authored Oct 4, 2022
2 parents 520c5be + 110725d commit 87f9a5e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.1.18"
version = "0.1.19"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.1.18"
colink = "0.1.19"
```

## Getting Started
Expand Down
2 changes: 2 additions & 0 deletions src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ pub mod registry;
mod remote_storage;
#[cfg(feature = "variable_transfer")]
mod variable_transfer;
#[cfg(feature = "extensions")]
mod wait_task;
2 changes: 1 addition & 1 deletion src/extensions/read_or_wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ impl crate::application::CoLink {
match self.read_entry(key).await {
Ok(res) => Ok(res),
Err(e) => {
let queue_name = self.subscribe(key, None).await?;
let queue_name = self.subscribe(key, Some(0)).await?;
let mut subscriber = self.new_subscriber(&queue_name).await?;
let data = subscriber.get_next().await?;
debug!("Received [{}]", String::from_utf8_lossy(&data));
Expand Down
47 changes: 47 additions & 0 deletions src/extensions/wait_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::colink_proto::*;
use prost::Message;
use tracing::debug;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
pub async fn wait_task(&self, task_id: &str) -> Result<(), Error> {
let task_key = format!("_internal:tasks:{}", task_id);
let start_timestamp = match self
.read_entries(&[StorageEntry {
key_name: task_key.clone(),
..Default::default()
}])
.await
{
Ok(res) => {
let task: Task = Message::decode(&*res[0].payload).unwrap();
if task.status == "finished" {
return Ok(());
}
get_timestamp(&res[0].key_path) + 1
}
Err(_) => 0,
};
let queue_name = self.subscribe(&task_key, Some(start_timestamp)).await?;
let mut subscriber = self.new_subscriber(&queue_name).await?;
loop {
let data = subscriber.get_next().await?;
debug!("Received [{}]", String::from_utf8_lossy(&data));
let message: SubscriptionMessage = Message::decode(&*data).unwrap();
if message.change_type != "delete" {
let task: Task = Message::decode(&*message.payload).unwrap();
if task.status == "finished" {
break;
}
}
}
self.unsubscribe(&queue_name).await?;
Ok(())
}
}

fn get_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}

0 comments on commit 87f9a5e

Please sign in to comment.