diff --git a/Cargo.toml b/Cargo.toml index a011bff..ddb8b29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "colink" -version = "0.1.20" +version = "0.1.21" edition = "2021" description = "CoLink Rust SDK" license = "MIT" @@ -16,25 +16,27 @@ chrono = "0.4" ctrlc = { version = "3.2", features = ["termination"] } futures-lite = "1.12" lapin = "2.1" -sha2 = "0.10" prost = "0.10" rand = { version = "0.8.4", features = ["std_rng"] } secp256k1 = { version = "0.21.2", features = ["rand-std"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +sha2 = "0.10" structopt = "0.3" tokio = { version = "1.18", features = ["macros", "rt-multi-thread", "rt", "fs"] } tonic = { version = "0.7", features = ["tls", "tls-roots"] } tracing = "0.1" tracing-subscriber = "0.2" +uuid = { version = "0.8", features = ["v4"] } [build-dependencies] prost-build = "0.10" tonic-build = "0.7" [features] -default = ["extensions", "remote_storage", "variable_transfer", "registry"] +default = ["extensions", "remote_storage", "variable_transfer", "registry", "policy_module"] extensions = [] remote_storage = ["extensions"] variable_transfer = ["extensions", "remote_storage"] registry = [] +policy_module = [] diff --git a/README.md b/README.md index 323dda6..3a6b6b0 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ CoLink SDK helps both application adnd protocol developers access the functional Add this to your Cargo.toml: ```toml [dependencies] -colink = "0.1.20" +colink = "0.1.21" ``` ## Getting Started @@ -74,6 +74,9 @@ cargo run --example mtls_request_core_info
``` ``` +cargo run --example user_policy_module
+``` +``` cargo run --example user_remote_storage
# is optional ``` ``` diff --git a/build.rs b/build.rs index badeae2..81c8f43 100644 --- a/build.rs +++ b/build.rs @@ -7,4 +7,7 @@ fn main() { #[cfg(feature = "registry")] prost_build::compile_protos(&["proto/colink_registry.proto"], &["proto/"]) .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); + #[cfg(feature = "policy_module")] + prost_build::compile_protos(&["proto/colink_policy_module.proto"], &["proto/"]) + .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); } diff --git a/examples/host_import_users_and_set_registry.rs b/examples/host_import_users_and_set_registry.rs index 0d1d612..6b41dd9 100644 --- a/examples/host_import_users_and_set_registry.rs +++ b/examples/host_import_users_and_set_registry.rs @@ -1,3 +1,4 @@ +use colink::extensions::registry::{Registries, Registry}; use colink::*; use std::env; diff --git a/examples/user_policy_module.rs b/examples/user_policy_module.rs new file mode 100644 index 0000000..4c3ae73 --- /dev/null +++ b/examples/user_policy_module.rs @@ -0,0 +1,35 @@ +use colink::extensions::policy_module::{Rule, TaskFilter}; +use colink::CoLink; +use std::env; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = env::args().skip(1).collect::>(); + let addr = &args[0]; + let jwt = &args[1]; + + let cl = CoLink::new(addr, jwt); + let res = cl.policy_module_get_rules().await?; + println!("{:?}", res); + + let rule_id = cl + .policy_module_add_rule(&Rule { + task_filter: Some(TaskFilter { + protocol_name: "greetings".to_string(), + ..Default::default() + }), + action: "approve".to_string(), + priority: 1, + ..Default::default() + }) + .await?; + println!("rule_id: {}", rule_id); + let res = cl.policy_module_get_rules().await?; + println!("{:?}", res); + + cl.policy_module_remove_rule(&rule_id).await?; + let res = cl.policy_module_get_rules().await?; + println!("{:?}", res); + + Ok(()) +} diff --git a/src/extensions.rs b/src/extensions.rs index 14d7873..568d922 100644 --- a/src/extensions.rs +++ b/src/extensions.rs @@ -2,6 +2,8 @@ mod get_participant_id; #[cfg(feature = "extensions")] mod lock; +#[cfg(feature = "policy_module")] +pub mod policy_module; #[cfg(feature = "extensions")] mod read_or_wait; #[cfg(feature = "registry")] diff --git a/src/extensions/policy_module.rs b/src/extensions/policy_module.rs new file mode 100644 index 0000000..80f5468 --- /dev/null +++ b/src/extensions/policy_module.rs @@ -0,0 +1,164 @@ +use crate::colink_proto::*; +pub use colink_policy_module_proto::*; +use prost::Message; +mod colink_policy_module_proto { + include!(concat!(env!("OUT_DIR"), "/colink_policy_module.rs")); +} + +type Error = Box; + +impl crate::application::CoLink { + pub async fn policy_module_start(&self) -> Result<(), Error> { + let lock = self.lock("_policy_module:settings").await?; + let (mut settings, timestamp): (Settings, i64) = match self + .read_entries(&[StorageEntry { + key_name: "_policy_module:settings".to_string(), + ..Default::default() + }]) + .await + { + Ok(res) => ( + prost::Message::decode(&*res[0].payload)?, + get_timestamp(&res[0].key_path), + ), + Err(_) => (Default::default(), 0), + }; + if settings.enable { + self.unlock(lock).await?; + return self.wait_for_applying(timestamp).await; // Wait for the current timestamp to be applied. + } + settings.enable = true; + let mut payload = vec![]; + settings.encode(&mut payload).unwrap(); + let timestamp = get_timestamp( + &self + .update_entry("_policy_module:settings", &payload) + .await?, + ); + self.unlock(lock).await?; + let participants = vec![Participant { + user_id: self.get_user_id()?, + role: "local".to_string(), + }]; + self.run_task("policy_module", Default::default(), &participants, false) + .await?; + self.wait_for_applying(timestamp).await + } + + pub async fn policy_module_stop(&self) -> Result<(), Error> { + let lock = self.lock("_policy_module:settings").await?; + let mut settings: Settings = match self.read_entry("_policy_module:settings").await { + Ok(res) => prost::Message::decode(&*res)?, + Err(_) => Default::default(), + }; + if !settings.enable { + self.unlock(lock).await?; + return Ok(()); // Return directly here because we only release the lock after the policy module truly stopped. + } + settings.enable = false; + let mut payload = vec![]; + settings.encode(&mut payload).unwrap(); + let timestamp = get_timestamp( + &self + .update_entry("_policy_module:settings", &payload) + .await?, + ); + let res = self.wait_for_applying(timestamp).await; + self.unlock(lock).await?; // Unlock after the policy module truly stopped. + res + } + + pub async fn policy_module_get_rules(&self) -> Result, Error> { + let settings: Settings = match self.read_entry("_policy_module:settings").await { + Ok(res) => prost::Message::decode(&*res)?, + Err(_) => Default::default(), + }; + Ok(settings.rules) + } + + pub async fn policy_module_add_rule(&self, rule: &Rule) -> Result { + let lock = self.lock("_policy_module:settings").await?; + let mut settings: Settings = match self.read_entry("_policy_module:settings").await { + Ok(res) => prost::Message::decode(&*res)?, + Err(_) => Default::default(), + }; + let rule_id = uuid::Uuid::new_v4().to_string(); + let mut rule = rule.clone(); + rule.rule_id = rule_id.clone(); + settings.rules.push(rule); + let mut payload = vec![]; + settings.encode(&mut payload).unwrap(); + let timestamp = get_timestamp( + &self + .update_entry("_policy_module:settings", &payload) + .await?, + ); + self.unlock(lock).await?; + if settings.enable { + self.wait_for_applying(timestamp).await?; + } + Ok(rule_id) + } + + pub async fn policy_module_remove_rule(&self, rule_id: &str) -> Result<(), Error> { + let lock = self.lock("_policy_module:settings").await?; + let mut settings: Settings = match self.read_entry("_policy_module:settings").await { + Ok(res) => prost::Message::decode(&*res)?, + Err(_) => Default::default(), + }; + settings.rules.retain(|x| x.rule_id != rule_id); + let mut payload = vec![]; + settings.encode(&mut payload).unwrap(); + let timestamp = get_timestamp( + &self + .update_entry("_policy_module:settings", &payload) + .await?, + ); + self.unlock(lock).await?; + if settings.enable { + self.wait_for_applying(timestamp).await?; + } + Ok(()) + } + + async fn wait_for_applying(&self, timestamp: i64) -> Result<(), Error> { + let key = "_policy_module:applied_settings_timestamp"; + let start_timestamp = match self + .read_entries(&[StorageEntry { + key_name: key.to_string(), + ..Default::default() + }]) + .await + { + Ok(res) => { + let applied_settings_timestamp = + i64::from_le_bytes(<[u8; 8]>::try_from(&*res[0].payload).unwrap()); + if applied_settings_timestamp >= timestamp { + return Ok(()); + } + get_timestamp(&res[0].key_path) + 1 + } + Err(_) => 0, + }; + let queue_name = self.subscribe(key, Some(start_timestamp)).await?; + let mut subscriber = self.new_subscriber(&queue_name).await?; + loop { + let data = subscriber.get_next().await?; + let message: SubscriptionMessage = Message::decode(&*data).unwrap(); + if message.change_type != "delete" { + let applied_settings_timestamp = + i64::from_le_bytes(<[u8; 8]>::try_from(&*message.payload).unwrap()); + if applied_settings_timestamp >= timestamp { + break; + } + } + } + self.unsubscribe(&queue_name).await?; + Ok(()) + } +} + +fn get_timestamp(key_path: &str) -> i64 { + let pos = key_path.rfind('@').unwrap(); + key_path[pos + 1..].parse().unwrap() +} diff --git a/src/extensions/registry.rs b/src/extensions/registry.rs index a89b0f2..f8553ac 100644 --- a/src/extensions/registry.rs +++ b/src/extensions/registry.rs @@ -2,7 +2,6 @@ use crate::colink_proto::*; pub use colink_registry_proto::{Registries, Registry}; use prost::Message; mod colink_registry_proto { - #![allow(clippy::derive_partial_eq_without_eq)] include!(concat!(env!("OUT_DIR"), "/colink_registry.rs")); } diff --git a/src/extensions/remote_storage.rs b/src/extensions/remote_storage.rs index 88f7243..5463249 100644 --- a/src/extensions/remote_storage.rs +++ b/src/extensions/remote_storage.rs @@ -3,7 +3,6 @@ use colink_remote_storage_proto::*; use prost::Message; mod colink_remote_storage_proto { - #![allow(clippy::derive_partial_eq_without_eq)] include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs")); } diff --git a/src/extensions/variable_transfer.rs b/src/extensions/variable_transfer.rs index 2804c50..92805f8 100644 --- a/src/extensions/variable_transfer.rs +++ b/src/extensions/variable_transfer.rs @@ -2,7 +2,6 @@ use crate::colink_proto::*; use colink_remote_storage::*; use prost::Message; mod colink_remote_storage { - #![allow(clippy::derive_partial_eq_without_eq)] include!(concat!(env!("OUT_DIR"), "/colink_remote_storage.rs")); } diff --git a/src/lib.rs b/src/lib.rs index 484cfd9..dcd5f3a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ +#![allow(clippy::derive_partial_eq_without_eq)] mod application; mod protocol; mod colink_proto { - #![allow(clippy::derive_partial_eq_without_eq)] tonic::include_proto!("colink"); } pub use application::{ @@ -11,6 +11,4 @@ pub use colink_proto::*; pub use protocol::{ CoLinkProtocol, ProtocolEntry, _colink_parse_args, _protocol_start, async_trait, }; -mod extensions; -#[cfg(feature = "registry")] -pub use extensions::registry::{Registries, Registry}; +pub mod extensions;