Skip to content

Commit

Permalink
Merge pull request #2 from CoLearn-Dev/registry
Browse files Browse the repository at this point in the history
- registry extension
- remote storage extension
  • Loading branch information
stneng authored Aug 11, 2022
2 parents 284fa08 + 28e67e7 commit e55b5ff
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 10 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink-sdk"
version = "0.1.11"
version = "0.1.12"
edition = "2021"

[dependencies]
Expand All @@ -27,7 +27,10 @@ prost-build = "0.10"
tonic-build = "0.7"

[features]
default = ["extension"]
extension = ["lock", "variable_transfer"]
default = ["extensions"]
extensions = ["read_or_wait", "lock", "variable_transfer", "registry"]
read_or_wait = []
lock = []
variable_transfer = []
remote_storage = ["read_or_wait"]
variable_transfer = ["read_or_wait", "remote_storage"]
registry = []
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ cargo run --example host_import_user <address> <host_jwt> <expiration_timestamp>
cargo run --example host_import_users_and_exchange_guest_jwts <address> <host_jwt> <number> <expiration_timestamp> # <expiration_timestamp> is optional
```
```
cargo run --example host_import_users_and_set_registry <address> <host_jwt> <number> <expiration_timestamp> # <expiration_timestamp> is optional
```
```
cargo run --example user_confirm_task <address> <user_jwt> <task_id> <action> # <action>: approve(default)/reject/ignore
```
```
Expand Down Expand Up @@ -41,6 +44,9 @@ cargo run --example mtls_request_core_info <address> <ca_certificate> <client_ce
```
cargo run --example user_lock <address> <user_jwt>
```
```
cargo run --example user_remote_storage <address> <user_jwt A> <user_jwt B> <message> # <message> is optional
```

## Protocol
```
Expand Down
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
fn main() {
tonic_build::compile_protos("proto/colink.proto")
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
#[cfg(feature = "variable_transfer")]
#[cfg(feature = "remote_storage")]
prost_build::compile_protos(&["proto/colink_remote_storage.proto"], &["proto/"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
#[cfg(feature = "registry")]
prost_build::compile_protos(&["proto/colink_registry.proto"], &["proto/"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}
60 changes: 60 additions & 0 deletions examples/host_import_users_and_set_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use colink_sdk::*;
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let args = env::args().skip(1).collect::<Vec<_>>();
let addr = &args[0];
let jwt = &args[1];
let num = &args[2];
let num: usize = num.parse().unwrap();
let expiration_timestamp: i64 = if args.len() > 3 {
args[3].parse().unwrap()
} else {
// A default expiration timestamp at 31 days later
chrono::Utc::now().timestamp() + 86400 * 31
};

let cl = CoLink::new(addr, jwt);
let mut users = vec![];
let (pk, sk) = generate_user();
let (_, core_pub_key) = cl.request_core_info().await?;
let (signature_timestamp, sig) =
prepare_import_user_signature(&pk, &sk, &core_pub_key, expiration_timestamp);
let registry_user = cl
.import_user(&pk, signature_timestamp, expiration_timestamp, &sig)
.await?;
println!("registry_user:");
println!("{}", registry_user);
let clt = CoLink::new(addr, &registry_user);
let registry_jwt = clt
.generate_token_with_expiration_time(expiration_timestamp, "guest")
.await?;

let registry = Registry {
address: addr.clone(),
guest_jwt: registry_jwt,
};
let registries = Registries {
registries: vec![registry],
};
clt.update_registries(&registries).await?;
for i in 0..num {
let (pk, sk) = generate_user();
let (_, core_pub_key) = cl.request_core_info().await?;
let (signature_timestamp, sig) =
prepare_import_user_signature(&pk, &sk, &core_pub_key, expiration_timestamp);
users.push(
cl.import_user(&pk, signature_timestamp, expiration_timestamp, &sig)
.await?,
);
let cl = CoLink::new(addr, &users[i]);
cl.update_registries(&registries).await?;
}
println!("user:");
for i in 0..num {
println!("{}", users[i]);
}

Ok(())
}
78 changes: 78 additions & 0 deletions examples/user_remote_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use colink_sdk::{decode_jwt_without_validation, CoLink, SubscriptionMessage};
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let args = env::args().skip(1).collect::<Vec<_>>();
let addr = &args[0];
let jwt_a = &args[1];
let jwt_b = &args[2];
let msg = if args.len() > 3 { &args[3] } else { "hello" };
let user_id_a = decode_jwt_without_validation(jwt_a).unwrap().user_id;
let user_id_b = decode_jwt_without_validation(jwt_b).unwrap().user_id;

let cl = CoLink::new(addr, jwt_a);
// create
cl.remote_storage_create(
&[user_id_b.clone()],
"remote_storage_demo",
msg.as_bytes(),
false,
)
.await?;

let clb = CoLink::new(addr, jwt_b);
let data = clb
.read_or_wait(&format!(
"_remote_storage:private:{}:remote_storage_demo",
user_id_a
))
.await?;
println!("{}", String::from_utf8_lossy(&data));

// read
let data = cl
.remote_storage_read(&user_id_b, "remote_storage_demo", false, "")
.await?;
println!("{}", String::from_utf8_lossy(&data));

// update
let queue_name = clb
.subscribe(
&format!("_remote_storage:private:{}:remote_storage_demo", user_id_a),
None,
)
.await?;

cl.remote_storage_update(
&[user_id_b.clone()],
"remote_storage_demo",
format!("update {}", msg).as_bytes(),
false,
)
.await?;

let mut subscriber = clb.new_subscriber(&queue_name).await?;
let data = subscriber.get_next().await?;
let message: SubscriptionMessage = prost::Message::decode(&*data).unwrap();
if message.change_type != "delete" {
println!("{}", String::from_utf8_lossy(&*message.payload));
} else {
Err("Receive delete change_type.")?
}

// delete
cl.remote_storage_delete(&[user_id_b.clone()], "remote_storage_demo", false)
.await?;

let data = subscriber.get_next().await?;
clb.unsubscribe(&queue_name).await?;
let message: SubscriptionMessage = prost::Message::decode(&*data).unwrap();
if message.change_type == "delete" {
println!("Deleted");
} else {
Err("Receive non-delete change_type.")?
}

Ok(())
}
2 changes: 1 addition & 1 deletion proto
Submodule proto updated 1 files
+17 −0 colink_registry.proto
7 changes: 7 additions & 0 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ impl CoLink {
Ok(auth_content.user_id)
}

pub fn get_core_addr(&self) -> Result<String, String> {
if self.core_addr.is_empty() {
return Err("core_addr not found".to_string());
}
Ok(self.core_addr.clone())
}

pub fn update_jwt(&mut self, new_jwt: &str) -> Result<(), String> {
self.jwt = new_jwt.to_string();
Ok(())
Expand Down
8 changes: 6 additions & 2 deletions src/extensions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#[cfg(feature = "extension")]
mod extension;
#[cfg(feature = "lock")]
mod lock;
#[cfg(feature = "read_or_wait")]
mod read_or_wait;
#[cfg(feature = "registry")]
pub mod registry;
#[cfg(feature = "remote_storage")]
mod remote_storage;
#[cfg(feature = "variable_transfer")]
mod variable_transfer;
1 change: 0 additions & 1 deletion src/extensions/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl crate::application::CoLink {
}
}

#[cfg(feature = "lock")]
pub struct CoLinkLockToken {
key: String,
rnd_num: i32,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use tracing::debug;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
#[cfg(feature = "extension")]
pub async fn read_or_wait(&self, key: &str) -> Result<Vec<u8>, Error> {
match self.read_entry(key).await {
Ok(res) => Ok(res),
Expand Down
22 changes: 22 additions & 0 deletions src/extensions/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::colink_proto::*;
pub use colink_registry_proto::{Registries, Registry};
use prost::Message;
mod colink_registry_proto {
include!(concat!(env!("OUT_DIR"), "/colink_registry.rs"));
}

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
pub async fn update_registries(&self, registries: &Registries) -> Result<(), Error> {
let participants = vec![Participant {
user_id: self.get_user_id()?,
role: "update_registries".to_string(),
}];
let mut payload = vec![];
registries.encode(&mut payload).unwrap();
self.run_task("registry", &payload, &participants, false)
.await?;
Ok(())
}
}
136 changes: 136 additions & 0 deletions src/extensions/remote_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use crate::colink_proto::*;
use colink_remote_storage_proto::*;
use prost::Message;

mod colink_remote_storage_proto {
include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs"));
}

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
pub async fn remote_storage_create(
&self,
providers: &[String],
key: &str,
payload: &[u8],
is_public: bool,
) -> Result<(), Error> {
let mut participants = vec![Participant {
user_id: self.get_user_id()?,
role: "requester".to_string(),
}];
for provider in providers {
participants.push(Participant {
user_id: provider.to_string(),
role: "provider".to_string(),
});
}
let params = CreateParams {
remote_key_name: key.to_string(),
payload: payload.to_vec(),
is_public,
};
let mut payload = vec![];
params.encode(&mut payload).unwrap();
self.run_task("remote_storage.create", &payload, &participants, false)
.await?;
Ok(())
}

pub async fn remote_storage_read(
&self,
provider: &str,
key: &str,
is_public: bool,
holder_id: &str,
) -> Result<Vec<u8>, Error> {
let participants = vec![
Participant {
user_id: self.get_user_id()?,
role: "requester".to_string(),
},
Participant {
user_id: provider.to_string(),
role: "provider".to_string(),
},
];
let params = ReadParams {
remote_key_name: key.to_string(),
is_public,
holder_id: holder_id.to_string(),
};
let mut payload = vec![];
params.encode(&mut payload).unwrap();
let task_id = self
.run_task("remote_storage.read", &payload, &participants, false)
.await?;
let status = self
.read_or_wait(&format!("tasks:{}:status", task_id))
.await?;
if status[0] == 0 {
let data = self
.read_or_wait(&format!("tasks:{}:output", task_id))
.await?;
Ok(data)
} else {
Err(format!("remote_storage.read: status_code: {}", status[0]))?
}
}

pub async fn remote_storage_update(
&self,
providers: &[String],
key: &str,
payload: &[u8],
is_public: bool,
) -> Result<(), Error> {
let mut participants = vec![Participant {
user_id: self.get_user_id()?,
role: "requester".to_string(),
}];
for provider in providers {
participants.push(Participant {
user_id: provider.to_string(),
role: "provider".to_string(),
});
}
let params = UpdateParams {
remote_key_name: key.to_string(),
payload: payload.to_vec(),
is_public,
};
let mut payload = vec![];
params.encode(&mut payload).unwrap();
self.run_task("remote_storage.update", &payload, &participants, false)
.await?;
Ok(())
}

pub async fn remote_storage_delete(
&self,
providers: &[String],
key: &str,
is_public: bool,
) -> Result<(), Error> {
let mut participants = vec![Participant {
user_id: self.get_user_id()?,
role: "requester".to_string(),
}];
for provider in providers {
participants.push(Participant {
user_id: provider.to_string(),
role: "provider".to_string(),
});
}
let params = DeleteParams {
remote_key_name: key.to_string(),
is_public,
};
let mut payload = vec![];
params.encode(&mut payload).unwrap();
self.run_task("remote_storage.delete", &payload, &participants, false)
.await?;
Ok(())
}
}
Loading

0 comments on commit e55b5ff

Please sign in to comment.