Skip to content

Commit

Permalink
Merge pull request #18 from CoLearn-Dev/policy_module
Browse files Browse the repository at this point in the history
- policy module extension
  • Loading branch information
stneng authored Oct 23, 2022
2 parents c014202 + 2ba76c2 commit 72e8f6f
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 11 deletions.
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.1.20"
version = "0.1.21"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand All @@ -16,25 +16,27 @@ chrono = "0.4"
ctrlc = { version = "3.2", features = ["termination"] }
futures-lite = "1.12"
lapin = "2.1"
sha2 = "0.10"
prost = "0.10"
rand = { version = "0.8.4", features = ["std_rng"] }
secp256k1 = { version = "0.21.2", features = ["rand-std"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
structopt = "0.3"
tokio = { version = "1.18", features = ["macros", "rt-multi-thread", "rt", "fs"] }
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
tracing = "0.1"
tracing-subscriber = "0.2"
uuid = { version = "0.8", features = ["v4"] }

[build-dependencies]
prost-build = "0.10"
tonic-build = "0.7"

[features]
default = ["extensions", "remote_storage", "variable_transfer", "registry"]
default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module"]
extensions = []
remote_storage = ["extensions"]
variable_transfer = ["extensions", "remote_storage"]
registry = []
policy_module = []
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.1.20"
colink = "0.1.21"
```

## Getting Started
Expand Down Expand Up @@ -74,6 +74,9 @@ cargo run --example mtls_request_core_info <address> <ca_certificate> <client_ce
cargo run --example user_lock <address> <user_jwt>
```
```
cargo run --example user_policy_module <address> <user_jwt>
```
```
cargo run --example user_remote_storage <address> <user_jwt A> <user_jwt B> <message> # <message> is optional
```
```
Expand Down
3 changes: 3 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ fn main() {
#[cfg(feature = "registry")]
prost_build::compile_protos(&["proto/colink_registry.proto"], &["proto/"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
#[cfg(feature = "policy_module")]
prost_build::compile_protos(&["proto/colink_policy_module.proto"], &["proto/"])
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e));
}
1 change: 1 addition & 0 deletions examples/host_import_users_and_set_registry.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use colink::extensions::registry::{Registries, Registry};
use colink::*;
use std::env;

Expand Down
35 changes: 35 additions & 0 deletions examples/user_policy_module.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use colink::extensions::policy_module::{Rule, TaskFilter};
use colink::CoLink;
use std::env;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let args = env::args().skip(1).collect::<Vec<_>>();
let addr = &args[0];
let jwt = &args[1];

let cl = CoLink::new(addr, jwt);
let res = cl.policy_module_get_rules().await?;
println!("{:?}", res);

let rule_id = cl
.policy_module_add_rule(&Rule {
task_filter: Some(TaskFilter {
protocol_name: "greetings".to_string(),
..Default::default()
}),
action: "approve".to_string(),
priority: 1,
..Default::default()
})
.await?;
println!("rule_id: {}", rule_id);
let res = cl.policy_module_get_rules().await?;
println!("{:?}", res);

cl.policy_module_remove_rule(&rule_id).await?;
let res = cl.policy_module_get_rules().await?;
println!("{:?}", res);

Ok(())
}
2 changes: 2 additions & 0 deletions src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
mod get_participant_id;
#[cfg(feature = "extensions")]
mod lock;
#[cfg(feature = "policy_module")]
pub mod policy_module;
#[cfg(feature = "extensions")]
mod read_or_wait;
#[cfg(feature = "registry")]
Expand Down
164 changes: 164 additions & 0 deletions src/extensions/policy_module.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use crate::colink_proto::*;
pub use colink_policy_module_proto::*;
use prost::Message;
mod colink_policy_module_proto {
include!(concat!(env!("OUT_DIR"), "/colink_policy_module.rs"));
}

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
pub async fn policy_module_start(&self) -> Result<(), Error> {
let lock = self.lock("_policy_module:settings").await?;
let (mut settings, timestamp): (Settings, i64) = match self
.read_entries(&[StorageEntry {
key_name: "_policy_module:settings".to_string(),
..Default::default()
}])
.await
{
Ok(res) => (
prost::Message::decode(&*res[0].payload)?,
get_timestamp(&res[0].key_path),
),
Err(_) => (Default::default(), 0),
};
if settings.enable {
self.unlock(lock).await?;
return self.wait_for_applying(timestamp).await; // Wait for the current timestamp to be applied.
}
settings.enable = true;
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
);
self.unlock(lock).await?;
let participants = vec![Participant {
user_id: self.get_user_id()?,
role: "local".to_string(),
}];
self.run_task("policy_module", Default::default(), &participants, false)
.await?;
self.wait_for_applying(timestamp).await
}

pub async fn policy_module_stop(&self) -> Result<(), Error> {
let lock = self.lock("_policy_module:settings").await?;
let mut settings: Settings = match self.read_entry("_policy_module:settings").await {
Ok(res) => prost::Message::decode(&*res)?,
Err(_) => Default::default(),
};
if !settings.enable {
self.unlock(lock).await?;
return Ok(()); // Return directly here because we only release the lock after the policy module truly stopped.
}
settings.enable = false;
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
);
let res = self.wait_for_applying(timestamp).await;
self.unlock(lock).await?; // Unlock after the policy module truly stopped.
res
}

pub async fn policy_module_get_rules(&self) -> Result<Vec<Rule>, Error> {
let settings: Settings = match self.read_entry("_policy_module:settings").await {
Ok(res) => prost::Message::decode(&*res)?,
Err(_) => Default::default(),
};
Ok(settings.rules)
}

pub async fn policy_module_add_rule(&self, rule: &Rule) -> Result<String, Error> {
let lock = self.lock("_policy_module:settings").await?;
let mut settings: Settings = match self.read_entry("_policy_module:settings").await {
Ok(res) => prost::Message::decode(&*res)?,
Err(_) => Default::default(),
};
let rule_id = uuid::Uuid::new_v4().to_string();
let mut rule = rule.clone();
rule.rule_id = rule_id.clone();
settings.rules.push(rule);
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
);
self.unlock(lock).await?;
if settings.enable {
self.wait_for_applying(timestamp).await?;
}
Ok(rule_id)
}

pub async fn policy_module_remove_rule(&self, rule_id: &str) -> Result<(), Error> {
let lock = self.lock("_policy_module:settings").await?;
let mut settings: Settings = match self.read_entry("_policy_module:settings").await {
Ok(res) => prost::Message::decode(&*res)?,
Err(_) => Default::default(),
};
settings.rules.retain(|x| x.rule_id != rule_id);
let mut payload = vec![];
settings.encode(&mut payload).unwrap();
let timestamp = get_timestamp(
&self
.update_entry("_policy_module:settings", &payload)
.await?,
);
self.unlock(lock).await?;
if settings.enable {
self.wait_for_applying(timestamp).await?;
}
Ok(())
}

async fn wait_for_applying(&self, timestamp: i64) -> Result<(), Error> {
let key = "_policy_module:applied_settings_timestamp";
let start_timestamp = match self
.read_entries(&[StorageEntry {
key_name: key.to_string(),
..Default::default()
}])
.await
{
Ok(res) => {
let applied_settings_timestamp =
i64::from_le_bytes(<[u8; 8]>::try_from(&*res[0].payload).unwrap());
if applied_settings_timestamp >= timestamp {
return Ok(());
}
get_timestamp(&res[0].key_path) + 1
}
Err(_) => 0,
};
let queue_name = self.subscribe(key, Some(start_timestamp)).await?;
let mut subscriber = self.new_subscriber(&queue_name).await?;
loop {
let data = subscriber.get_next().await?;
let message: SubscriptionMessage = Message::decode(&*data).unwrap();
if message.change_type != "delete" {
let applied_settings_timestamp =
i64::from_le_bytes(<[u8; 8]>::try_from(&*message.payload).unwrap());
if applied_settings_timestamp >= timestamp {
break;
}
}
}
self.unsubscribe(&queue_name).await?;
Ok(())
}
}

fn get_timestamp(key_path: &str) -> i64 {
let pos = key_path.rfind('@').unwrap();
key_path[pos + 1..].parse().unwrap()
}
1 change: 0 additions & 1 deletion src/extensions/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::colink_proto::*;
pub use colink_registry_proto::{Registries, Registry};
use prost::Message;
mod colink_registry_proto {
#![allow(clippy::derive_partial_eq_without_eq)]
include!(concat!(env!("OUT_DIR"), "/colink_registry.rs"));
}

Expand Down
1 change: 0 additions & 1 deletion src/extensions/remote_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use colink_remote_storage_proto::*;
use prost::Message;

mod colink_remote_storage_proto {
#![allow(clippy::derive_partial_eq_without_eq)]
include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs"));
}

Expand Down
1 change: 0 additions & 1 deletion src/extensions/variable_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::colink_proto::*;
use colink_remote_storage::*;
use prost::Message;
mod colink_remote_storage {
#![allow(clippy::derive_partial_eq_without_eq)]
include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs"));
}

Expand Down
6 changes: 2 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(clippy::derive_partial_eq_without_eq)]
mod application;
mod protocol;
mod colink_proto {
#![allow(clippy::derive_partial_eq_without_eq)]
tonic::include_proto!("colink");
}
pub use application::{
Expand All @@ -11,6 +11,4 @@ pub use colink_proto::*;
pub use protocol::{
CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait,
};
mod extensions;
#[cfg(feature = "registry")]
pub use extensions::registry::{Registries, Registry};
pub mod extensions;

0 comments on commit 72e8f6f

Please sign in to comment.