From 73c2978b6c4c1b9418e90804d1e6ba0a9c2b79fe Mon Sep 17 00:00:00 2001 From: stneng Date: Fri, 5 Aug 2022 06:41:13 +0000 Subject: [PATCH 1/5] registry --- Cargo.toml | 11 ++-- README.md | 3 + build.rs | 5 +- .../host_import_users_and_set_registry.rs | 60 +++++++++++++++++++ proto | 2 +- src/application.rs | 7 +++ src/extensions.rs | 6 +- src/extensions/lock.rs | 1 - .../{extension.rs => read_or_wait.rs} | 1 - src/extensions/registry.rs | 22 +++++++ src/lib.rs | 2 + 11 files changed, 110 insertions(+), 10 deletions(-) create mode 100644 examples/host_import_users_and_set_registry.rs rename src/extensions/{extension.rs => read_or_wait.rs} (96%) create mode 100644 src/extensions/registry.rs diff --git a/Cargo.toml b/Cargo.toml index 74fc6fe..c1df28e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "colink-sdk" -version = "0.1.11" +version = "0.1.12" edition = "2021" [dependencies] @@ -27,7 +27,10 @@ prost-build = "0.10" tonic-build = "0.7" [features] -default = ["extension"] -extension = ["lock", "variable_transfer"] +default = ["extensions"] +extensions = ["read_or_wait", "lock", "variable_transfer", "registry"] +read_or_wait = [] lock = [] -variable_transfer = [] +remote_storage = [] +variable_transfer = ["read_or_wait", "remote_storage"] +registry = [] diff --git a/README.md b/README.md index ce0f10f..de69195 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,9 @@ cargo run --example host_import_user
cargo run --example host_import_users_and_exchange_guest_jwts
# is optional ``` ``` +cargo run --example host_import_users_and_set_registry
# is optional +``` +``` cargo run --example user_confirm_task
# : approve(default)/reject/ignore ``` ``` diff --git a/build.rs b/build.rs index 853739c..badeae2 100644 --- a/build.rs +++ b/build.rs @@ -1,7 +1,10 @@ fn main() { tonic_build::compile_protos("proto/colink.proto") .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); - #[cfg(feature = "variable_transfer")] + #[cfg(feature = "remote_storage")] prost_build::compile_protos(&["proto/colink_remote_storage.proto"], &["proto/"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); + #[cfg(feature = "registry")] + prost_build::compile_protos(&["proto/colink_registry.proto"], &["proto/"]) + .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/examples/host_import_users_and_set_registry.rs b/examples/host_import_users_and_set_registry.rs new file mode 100644 index 0000000..febb2c6 --- /dev/null +++ b/examples/host_import_users_and_set_registry.rs @@ -0,0 +1,60 @@ +use colink_sdk::*; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = env::args().skip(1).collect::>(); + let addr = &args[0]; + let jwt = &args[1]; + let num = &args[2]; + let num: usize = num.parse().unwrap(); + let expiration_timestamp: i64 = if args.len() > 3 { + args[3].parse().unwrap() + } else { + // A default expiration timestamp at 31 days later + chrono::Utc::now().timestamp() + 86400 * 31 + }; + + let cl = CoLink::new(addr, jwt); + let mut users = vec![]; + let (pk, sk) = generate_user(); + let (_, core_pub_key) = cl.request_core_info().await?; + let (signature_timestamp, sig) = + prepare_import_user_signature(&pk, &sk, &core_pub_key, expiration_timestamp); + let registry_user = cl + .import_user(&pk, signature_timestamp, expiration_timestamp, &sig) + .await?; + println!("registry_user:"); + println!("{}", registry_user); + let clt = CoLink::new(addr, ®istry_user); + let registry_jwt = clt + .generate_token_with_expiration_time(expiration_timestamp, "guest") + .await?; + + let registry = Registry { + address: addr.clone(), + guest_jwt: registry_jwt, + }; + let registries = Registries { + registries: vec![registry], + }; + clt.set_registries(®istries).await?; + for i in 0..num { + let (pk, sk) = generate_user(); + let (_, core_pub_key) = cl.request_core_info().await?; + let (signature_timestamp, sig) = + prepare_import_user_signature(&pk, &sk, &core_pub_key, expiration_timestamp); + users.push( + cl.import_user(&pk, signature_timestamp, expiration_timestamp, &sig) + .await?, + ); + let cl = CoLink::new(addr, &users[i]); + cl.set_registries(®istries).await?; + } + println!("user:"); + for i in 0..num { + println!("{}", users[i]); + } + + Ok(()) +} diff --git a/proto b/proto index bf8e158..243f26a 160000 --- a/proto +++ b/proto @@ -1 +1 @@ -Subproject commit bf8e1584991984a045edd3f49fddf172021e30c3 +Subproject commit 243f26a245cb0bb58fa9a9d8ebad0d8254ec73b8 diff --git a/src/application.rs b/src/application.rs index c692dad..f2cc038 100644 --- a/src/application.rs +++ b/src/application.rs @@ -100,6 +100,13 @@ impl CoLink { Ok(auth_content.user_id) } + pub fn get_core_addr(&self) -> Result { + if self.core_addr.is_empty() { + return Err("core_addr not found".to_string()); + } + Ok(self.core_addr.clone()) + } + pub fn update_jwt(&mut self, new_jwt: &str) -> Result<(), String> { self.jwt = new_jwt.to_string(); Ok(()) diff --git a/src/extensions.rs b/src/extensions.rs index 9f0ce71..c35279a 100644 --- a/src/extensions.rs +++ b/src/extensions.rs @@ -1,6 +1,8 @@ -#[cfg(feature = "extension")] -mod extension; #[cfg(feature = "lock")] mod lock; +#[cfg(feature = "read_or_wait")] +mod read_or_wait; +#[cfg(feature = "registry")] +pub mod registry; #[cfg(feature = "variable_transfer")] mod variable_transfer; diff --git a/src/extensions/lock.rs b/src/extensions/lock.rs index ce57c58..89f5cc5 100644 --- a/src/extensions/lock.rs +++ b/src/extensions/lock.rs @@ -52,7 +52,6 @@ impl crate::application::CoLink { } } -#[cfg(feature = "lock")] pub struct CoLinkLockToken { key: String, rnd_num: i32, diff --git a/src/extensions/extension.rs b/src/extensions/read_or_wait.rs similarity index 96% rename from src/extensions/extension.rs rename to src/extensions/read_or_wait.rs index 0ea8637..8078162 100644 --- a/src/extensions/extension.rs +++ b/src/extensions/read_or_wait.rs @@ -4,7 +4,6 @@ use tracing::debug; type Error = Box; impl crate::application::CoLink { - #[cfg(feature = "extension")] pub async fn read_or_wait(&self, key: &str) -> Result, Error> { match self.read_entry(key).await { Ok(res) => Ok(res), diff --git a/src/extensions/registry.rs b/src/extensions/registry.rs new file mode 100644 index 0000000..32f14b1 --- /dev/null +++ b/src/extensions/registry.rs @@ -0,0 +1,22 @@ +use crate::colink_proto::*; +pub use colink_registry_proto::{Registries, Registry}; +use prost::Message; +mod colink_registry_proto { + include!(concat!(env!("OUT_DIR"), "/colink_registry.rs")); +} + +type Error = Box; + +impl crate::application::CoLink { + pub async fn set_registries(&self, registries: &Registries) -> Result<(), Error> { + let participants = vec![Participant { + user_id: self.get_user_id()?, + role: "set_registries".to_string(), + }]; + let mut payload = vec![]; + registries.encode(&mut payload).unwrap(); + self.run_task("registry", &payload, &participants, false) + .await?; + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 3c7336c..4f73831 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,3 +11,5 @@ pub use protocol::{ CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait, }; mod extensions; +#[cfg(feature = "registry")] +pub use extensions::registry::{Registries, Registry}; From 21afa58c1d1a94fc499deba95ae587bfaafe980f Mon Sep 17 00:00:00 2001 From: stneng Date: Mon, 8 Aug 2022 03:11:37 +0000 Subject: [PATCH 2/5] fixes --- examples/host_import_users_and_set_registry.rs | 4 ++-- src/extensions/registry.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/host_import_users_and_set_registry.rs b/examples/host_import_users_and_set_registry.rs index febb2c6..c6319e1 100644 --- a/examples/host_import_users_and_set_registry.rs +++ b/examples/host_import_users_and_set_registry.rs @@ -38,7 +38,7 @@ async fn main() -> Result<(), Box let registries = Registries { registries: vec![registry], }; - clt.set_registries(®istries).await?; + clt.update_registries(®istries).await?; for i in 0..num { let (pk, sk) = generate_user(); let (_, core_pub_key) = cl.request_core_info().await?; @@ -49,7 +49,7 @@ async fn main() -> Result<(), Box .await?, ); let cl = CoLink::new(addr, &users[i]); - cl.set_registries(®istries).await?; + cl.update_registries(®istries).await?; } println!("user:"); for i in 0..num { diff --git a/src/extensions/registry.rs b/src/extensions/registry.rs index 32f14b1..f8553ac 100644 --- a/src/extensions/registry.rs +++ b/src/extensions/registry.rs @@ -8,10 +8,10 @@ mod colink_registry_proto { type Error = Box; impl crate::application::CoLink { - pub async fn set_registries(&self, registries: &Registries) -> Result<(), Error> { + pub async fn update_registries(&self, registries: &Registries) -> Result<(), Error> { let participants = vec![Participant { user_id: self.get_user_id()?, - role: "set_registries".to_string(), + role: "update_registries".to_string(), }]; let mut payload = vec![]; registries.encode(&mut payload).unwrap(); From 2f4fae643ad7c2355e132ad37c843bebda99b366 Mon Sep 17 00:00:00 2001 From: stneng Date: Tue, 9 Aug 2022 03:55:34 +0000 Subject: [PATCH 3/5] remote storage --- Cargo.toml | 2 +- src/extensions.rs | 2 + src/extensions/remote_storage.rs | 136 +++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 src/extensions/remote_storage.rs diff --git a/Cargo.toml b/Cargo.toml index c1df28e..4f5e614 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,6 @@ default = ["extensions"] extensions = ["read_or_wait", "lock", "variable_transfer", "registry"] read_or_wait = [] lock = [] -remote_storage = [] +remote_storage = ["read_or_wait"] variable_transfer = ["read_or_wait", "remote_storage"] registry = [] diff --git a/src/extensions.rs b/src/extensions.rs index c35279a..64aa5d8 100644 --- a/src/extensions.rs +++ b/src/extensions.rs @@ -4,5 +4,7 @@ mod lock; mod read_or_wait; #[cfg(feature = "registry")] pub mod registry; +#[cfg(feature = "remote_storage")] +mod remote_storage; #[cfg(feature = "variable_transfer")] mod variable_transfer; diff --git a/src/extensions/remote_storage.rs b/src/extensions/remote_storage.rs new file mode 100644 index 0000000..ecec7ce --- /dev/null +++ b/src/extensions/remote_storage.rs @@ -0,0 +1,136 @@ +use crate::colink_proto::*; +use colink_remote_storage_proto::*; +use prost::Message; + +mod colink_remote_storage_proto { + include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs")); +} + +type Error = Box; + +impl crate::application::CoLink { + pub async fn remote_storage_create( + &self, + providers: &[String], + key: &str, + payload: &[u8], + is_public: bool, + ) -> Result<(), Error> { + let mut participants = vec![Participant { + user_id: self.get_user_id()?, + role: "requester".to_string(), + }]; + for provider in providers { + participants.push(Participant { + user_id: provider.to_string(), + role: "provider".to_string(), + }); + } + let params = CreateParams { + remote_key_name: key.to_string(), + payload: payload.to_vec(), + is_public, + }; + let mut payload = vec![]; + params.encode(&mut payload).unwrap(); + self.run_task("remote_storage.create", &payload, &participants, false) + .await?; + Ok(()) + } + + pub async fn remote_storage_read( + &self, + provider: &str, + key: &str, + is_public: bool, + holder_id: &str, + ) -> Result, Error> { + let participants = vec![ + Participant { + user_id: self.get_user_id()?, + role: "requester".to_string(), + }, + Participant { + user_id: provider.to_string(), + role: "provider".to_string(), + }, + ]; + let params = ReadParams { + remote_key_name: key.to_string(), + is_public, + holder_id: holder_id.to_string(), + }; + let mut payload = vec![]; + params.encode(&mut payload).unwrap(); + let task_id = self + .run_task("remote_storage.read", &payload, &participants, false) + .await?; + let status = self + .read_or_wait(&format!("tasks:{}:status", task_id)) + .await?; + if status[0] == 0 { + let data = self + .read_or_wait(&format!("tasks:{}:data", task_id)) + .await?; + Ok(data) + } else { + Err(format!("remote_storage.read: status_code: {}", status[0]))? + } + } + + pub async fn remote_storage_update( + &self, + providers: &[String], + key: &str, + payload: &[u8], + is_public: bool, + ) -> Result<(), Error> { + let mut participants = vec![Participant { + user_id: self.get_user_id()?, + role: "requester".to_string(), + }]; + for provider in providers { + participants.push(Participant { + user_id: provider.to_string(), + role: "provider".to_string(), + }); + } + let params = UpdateParams { + remote_key_name: key.to_string(), + payload: payload.to_vec(), + is_public, + }; + let mut payload = vec![]; + params.encode(&mut payload).unwrap(); + self.run_task("remote_storage.update", &payload, &participants, false) + .await?; + Ok(()) + } + + pub async fn remote_storage_delete( + &self, + provider: &str, + key: &str, + is_public: bool, + ) -> Result<(), Error> { + let participants = vec![ + Participant { + user_id: self.get_user_id()?, + role: "requester".to_string(), + }, + Participant { + user_id: provider.to_string(), + role: "provider".to_string(), + }, + ]; + let params = DeleteParams { + remote_key_name: key.to_string(), + is_public, + }; + let mut payload = vec![]; + params.encode(&mut payload).unwrap(); + self.run_task("remote_storage.delete", &payload, &participants, false) + .await?; + Ok(()) + } +} From 223d7935c98fa37f6ac45e71881204f64bf32476 Mon Sep 17 00:00:00 2001 From: stneng Date: Tue, 9 Aug 2022 04:10:59 +0000 Subject: [PATCH 4/5] remote storage --- examples/user_remote_storage.rs | 78 ++++++++++++++++++++++++++++++++ src/extensions/remote_storage.rs | 20 ++++---- 2 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 examples/user_remote_storage.rs diff --git a/examples/user_remote_storage.rs b/examples/user_remote_storage.rs new file mode 100644 index 0000000..d9e5523 --- /dev/null +++ b/examples/user_remote_storage.rs @@ -0,0 +1,78 @@ +use colink_sdk::{decode_jwt_without_validation, CoLink, SubscriptionMessage}; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = env::args().skip(1).collect::>(); + let addr = &args[0]; + let jwt_a = &args[1]; + let jwt_b = &args[2]; + let msg = if args.len() > 3 { &args[3] } else { "hello" }; + let user_id_a = decode_jwt_without_validation(jwt_a).unwrap().user_id; + let user_id_b = decode_jwt_without_validation(jwt_b).unwrap().user_id; + + let cl = CoLink::new(addr, jwt_a); + // create + cl.remote_storage_create( + &[user_id_b.clone()], + "remote_storage_demo", + msg.as_bytes(), + false, + ) + .await?; + + let clb = CoLink::new(addr, jwt_b); + let data = clb + .read_or_wait(&format!( + "_remote_storage:private:{}:remote_storage_demo", + user_id_a + )) + .await?; + println!("{}", String::from_utf8_lossy(&data)); + + // read + let data = cl + .remote_storage_read(&user_id_b, "remote_storage_demo", false, "") + .await?; + println!("{}", String::from_utf8_lossy(&data)); + + // update + let queue_name = clb + .subscribe( + &format!("_remote_storage:private:{}:remote_storage_demo", user_id_a), + None, + ) + .await?; + + cl.remote_storage_update( + &[user_id_b.clone()], + "remote_storage_demo", + format!("update {}", msg).as_bytes(), + false, + ) + .await?; + + let mut subscriber = clb.new_subscriber(&queue_name).await?; + let data = subscriber.get_next().await?; + let message: SubscriptionMessage = prost::Message::decode(&*data).unwrap(); + if message.change_type != "delete" { + println!("{}", String::from_utf8_lossy(&*message.payload)); + } else { + Err("Receive delete change_type.")? + } + + // delete + cl.remote_storage_delete(&[user_id_b.clone()], "remote_storage_demo", false) + .await?; + + let data = subscriber.get_next().await?; + clb.unsubscribe(&queue_name).await?; + let message: SubscriptionMessage = prost::Message::decode(&*data).unwrap(); + if message.change_type == "delete" { + println!("Deleted"); + } else { + Err("Receive non-delete change_type.")? + } + + Ok(()) +} diff --git a/src/extensions/remote_storage.rs b/src/extensions/remote_storage.rs index ecec7ce..5463249 100644 --- a/src/extensions/remote_storage.rs +++ b/src/extensions/remote_storage.rs @@ -70,7 +70,7 @@ impl crate::application::CoLink { .await?; if status[0] == 0 { let data = self - .read_or_wait(&format!("tasks:{}:data", task_id)) + .read_or_wait(&format!("tasks:{}:output", task_id)) .await?; Ok(data) } else { @@ -109,20 +109,20 @@ impl crate::application::CoLink { pub async fn remote_storage_delete( &self, - provider: &str, + providers: &[String], key: &str, is_public: bool, ) -> Result<(), Error> { - let participants = vec![ - Participant { - user_id: self.get_user_id()?, - role: "requester".to_string(), - }, - Participant { + let mut participants = vec![Participant { + user_id: self.get_user_id()?, + role: "requester".to_string(), + }]; + for provider in providers { + participants.push(Participant { user_id: provider.to_string(), role: "provider".to_string(), - }, - ]; + }); + } let params = DeleteParams { remote_key_name: key.to_string(), is_public, From 28e67e7bb87246fcc88547210323bf3f2324aed6 Mon Sep 17 00:00:00 2001 From: stneng Date: Tue, 9 Aug 2022 04:12:45 +0000 Subject: [PATCH 5/5] doc --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index de69195..9719e5d 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,9 @@ cargo run --example mtls_request_core_info
``` +``` +cargo run --example user_remote_storage
# is optional +``` ## Protocol ```