Skip to content

Commit

Permalink
Merge pull request #35 from CoLearn-Dev/vt-inbox
Browse files Browse the repository at this point in the history
VT inbox
  • Loading branch information
stneng authored Jan 3, 2023
2 parents 81247be + bf6a7a7 commit 4c65290
Show file tree
Hide file tree
Showing 9 changed files with 776 additions and 100 deletions.
15 changes: 10 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.2.5"
version = "0.2.6"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand All @@ -10,20 +10,25 @@ documentation = "https://docs.rs/colink"
repository = "https://github.com/CoLearn-Dev/colink-sdk-rust-dev"

[dependencies]
async-recursion = "1.0.0"
async-recursion = { version = "1.0.0", optional = true }
async-trait = "0.1"
base64 = "0.13.0"
chrono = "0.4"
clap = { version = "4.0", features = ["derive", "env"] }
futures-lite = "1.12"
hyper = { version = "0.14", optional = true }
hyper-rustls = { version = "0.23", optional = true }
jsonwebtoken = { version = "7.2", optional = true }
lapin = "2.1"
prost = "0.10"
rand = { version = "0.8.4", features = ["std_rng"] }
secp256k1 = { version = "0.21.2", features = ["rand-std"] }
rcgen = { version = "0.10", optional = true }
secp256k1 = { version = "0.25", features = ["rand-std"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
tokio = { version = "1.18", features = ["macros", "rt-multi-thread", "rt", "fs"] }
tokio-rustls = { version = "0.23", optional = true }
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
tracing = "0.1"
tracing-subscriber = "0.2"
Expand All @@ -37,8 +42,8 @@ tonic-build = "0.7"
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module", "instant_server", "storage_macro"]
extensions = []
remote_storage = ["extensions"]
variable_transfer = ["extensions", "remote_storage"]
variable_transfer = ["extensions", "remote_storage", "hyper", "jsonwebtoken", "rcgen", "tokio-rustls", "hyper-rustls"]
registry = []
policy_module = []
instant_server = []
storage_macro = []
storage_macro = ["async-recursion"]
12 changes: 12 additions & 0 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use lapin::{
use secp256k1::Secp256k1;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Expand All @@ -30,6 +31,8 @@ pub struct CoLink {
pub(crate) task_id: String,
pub(crate) ca_certificate: Option<Certificate>,
pub(crate) identity: Option<Identity>,
#[cfg(feature = "variable_transfer")]
pub(crate) vt_p2p_ctx: Arc<crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx>,
}

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand All @@ -42,6 +45,10 @@ impl CoLink {
task_id: "".to_string(),
ca_certificate: None,
identity: None,
#[cfg(feature = "variable_transfer")]
vt_p2p_ctx: Arc::new(
crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx::default(),
),
}
}

Expand Down Expand Up @@ -82,6 +89,11 @@ impl CoLink {

pub fn set_task_id(&mut self, task_id: &str) {
self.task_id = task_id.to_string();
#[cfg(feature = "variable_transfer")]
{
self.vt_p2p_ctx =
Arc::new(crate::extensions::variable_transfer::p2p_inbox::VtP2pCtx::default());
}
}

pub fn get_task_id(&self) -> Result<String, String> {
Expand Down
2 changes: 1 addition & 1 deletion src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod storage_macro;
#[cfg(feature = "extensions")]
mod switch_to_generated_user;
#[cfg(feature = "variable_transfer")]
mod variable_transfer;
pub(crate) mod variable_transfer;
#[cfg(feature = "extensions")]
mod wait_task;
#[cfg(feature = "extensions")]
Expand Down
68 changes: 25 additions & 43 deletions src/extensions/variable_transfer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::colink_proto::*;
use colink_remote_storage::*;
use prost::Message;
mod colink_remote_storage {
include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs"));
}
use std::sync::Arc;
pub(crate) mod p2p_inbox;
mod remote_storage;
mod tls_utils;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand All @@ -17,52 +16,35 @@ impl crate::application::CoLink {
if self.task_id.is_empty() {
Err("task_id not found".to_string())?;
}
let mut new_participants = vec![Participant {
user_id: self.get_user_id()?,
role: "requester".to_string(),
}];
for p in receivers {
if p.user_id == self.get_user_id()? {
self.create_entry(
&format!(
"_remote_storage:private:{}:_variable_transfer:{}:{}",
p.user_id,
self.get_task_id()?,
key
),
payload,
)
.await?;
} else {
new_participants.push(Participant {
user_id: p.user_id.clone(),
role: "provider".to_string(),
});
}
let payload = Arc::new(payload.to_vec());
for receiver in receivers {
let cl = self.clone();
let key = key.to_string();
let payload = payload.clone();
let receiver = receiver.clone();
tokio::spawn(async move {
if cl
._set_variable_p2p(&key, &payload, &receiver)
.await
.is_err()
{
cl.set_variable_with_remote_storage(&key, &payload, &[receiver.clone()])
.await?;
}
Ok::<(), Box<dyn std::error::Error + Send + Sync + 'static>>(())
});
}
let params = CreateParams {
remote_key_name: format!("_variable_transfer:{}:{}", self.get_task_id()?, key),
payload: payload.to_vec(),
..Default::default()
};
let mut payload = vec![];
params.encode(&mut payload).unwrap();
self.run_task("remote_storage.create", &payload, &new_participants, false)
.await?;
Ok(())
}

pub async fn get_variable(&self, key: &str, sender: &Participant) -> Result<Vec<u8>, Error> {
if self.task_id.is_empty() {
Err("task_id not found".to_string())?;
}
let key = format!(
"_remote_storage:private:{}:_variable_transfer:{}:{}",
sender.user_id,
self.get_task_id()?,
key
);
let res = self.read_or_wait(&key).await?;
if let Ok(res) = self._get_variable_p2p(key, sender).await {
return Ok(res);
}
let res = self.get_variable_with_remote_storage(key, sender).await?;
Ok(res)
}
}
Loading

0 comments on commit 4c65290

Please sign in to comment.