Skip to content

Commit

Permalink
Merge pull request #36 from nociza/storage-macro
Browse files Browse the repository at this point in the history
Storage Macro Redis Support
  • Loading branch information
stneng authored Jan 19, 2023
2 parents c2ed6c6 + 99ec02e commit 30dc314
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 38 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[env]
RUST_TEST_THREADS = "1"
6 changes: 5 additions & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ jobs:
rabbitmq:
image: rabbitmq:3.8-management
ports:
- 5672:5672
- 5672:5672
- 15672:15672
redis:
image: redis
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.2.8"
version = "0.2.9"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand All @@ -10,9 +10,9 @@ documentation = "https://docs.rs/colink"
repository = "https://github.com/CoLearn-Dev/colink-sdk-rust-dev"

[dependencies]
async-recursion = { version = "1.0.0", optional = true }
async-recursion = { version = "1.0", optional = true }
async-trait = "0.1"
base64 = "0.13.0"
base64 = "0.13"
chrono = "0.4"
clap = { version = "4.0", features = ["derive", "env"] }
futures-lite = "1.12"
Expand All @@ -21,8 +21,9 @@ hyper-rustls = { version = "0.23", optional = true }
jsonwebtoken = { version = "7.2", optional = true }
lapin = "2.1"
prost = "0.10"
rand = { version = "0.8.4", features = ["std_rng"] }
rand = { version = "0.8", features = ["std_rng"] }
rcgen = { version = "0.10", optional = true }
redis = { version = "0.22", optional = true }
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
secp256k1 = { version = "0.25", features = ["rand-std"] }
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -47,4 +48,4 @@ variable_transfer = ["extensions", "remote_storage", "hyper", "jsonwebtoken", "r
registry = []
policy_module = []
instant_server = ["reqwest"]
storage_macro = ["async-recursion"]
storage_macro = ["async-recursion", "redis"]
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CoLink SDK helps both application and protocol developers access the functional
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.2.8"
colink = "0.2.9"
```

## Getting Started
Expand Down
2 changes: 2 additions & 0 deletions src/extensions/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
impl crate::application::CoLink {
/// The default retry time cap is 100 ms. If you want to specify a retry time cap, use lock_with_retry_time instead.
pub async fn lock(&self, key: &str) -> Result<CoLinkLockToken, Error> {
#[cfg(feature = "storage_macro")]
let key = &key.replace('$', "_lock_dollar_");
self.lock_with_retry_time(key, 100).await
}

Expand Down
25 changes: 21 additions & 4 deletions src/extensions/storage_macro.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod chunk;
mod db_redis;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -30,12 +31,16 @@ impl crate::application::CoLink {
key_name: &str,
payload: &[u8],
) -> Result<String, Error> {
let (string_before, macro_type, _) = self._parse_macro(key_name);
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => {
self._create_entry_chunk(string_before.as_str(), payload)
.await
}
"redis" => {
self._create_entry_redis(string_before.as_str(), string_after.as_str(), payload)
.await
}
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -45,9 +50,13 @@ impl crate::application::CoLink {
}

pub(crate) async fn _sm_read_entry(&self, key_name: &str) -> Result<Vec<u8>, Error> {
let (string_before, macro_type, _) = self._parse_macro(key_name);
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => self._read_entry_chunk(string_before.as_str()).await,
"redis" => {
self._read_entry_redis(string_before.as_str(), string_after.as_str())
.await
}
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -61,12 +70,16 @@ impl crate::application::CoLink {
key_name: &str,
payload: &[u8],
) -> Result<String, Error> {
let (string_before, macro_type, _) = self._parse_macro(key_name);
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => {
self._update_entry_chunk(string_before.as_str(), payload)
.await
}
"redis" => {
self._update_entry_redis(string_before.as_str(), string_after.as_str(), payload)
.await
}
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -76,9 +89,13 @@ impl crate::application::CoLink {
}

pub(crate) async fn _sm_delete_entry(&self, key_name: &str) -> Result<String, Error> {
let (string_before, macro_type, _) = self._parse_macro(key_name);
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => self._delete_entry_chunk(string_before.as_str()).await,
"redis" => {
self._delete_entry_redis(string_before.as_str(), string_after.as_str())
.await
}
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand Down
64 changes: 37 additions & 27 deletions src/extensions/storage_macro/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,34 +50,39 @@ impl crate::application::CoLink {
) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
// lock the metadata entry to prevent simultaneous writes
let lock_token = self.lock(&metadata_key.clone()).await?;
// create the chunks and store them
let chunk_paths = self._store_chunks(payload, key_name).await?;
// make sure that the chunk paths are smaller than the maximum entry size
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
// store the chunk paths in the metadata entry and update metadata
let response = self
.create_entry(&metadata_key.clone(), &chunk_paths_string.into_bytes())
.await?;
let lock_token = self.lock(&metadata_key).await?;
// use a closure to prevent locking forever caused by errors
let res = async {
// create the chunks and store them
let chunk_paths = self._store_chunks(payload, key_name).await?;
// make sure that the chunk paths are smaller than the maximum entry size
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
// store the chunk paths in the metadata entry and update metadata
let response = self
.create_entry(&metadata_key, &chunk_paths_string.into_bytes())
.await?;
Ok::<String, Error>(response)
}
.await;
self.unlock(lock_token).await?;
Ok(response)
res
}

#[async_recursion]
pub(crate) async fn _read_entry_chunk(&self, key_name: &str) -> Result<Vec<u8>, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
let metadata_response = self.read_entry(&metadata_key.clone()).await?;
let payload_string = String::from_utf8(metadata_response.clone())?;
let metadata_response = self.read_entry(&metadata_key).await?;
let payload_string = String::from_utf8(metadata_response)?;
let user_id = decode_jwt_without_validation(&self.jwt).unwrap().user_id;

// read the chunks into a single vector
let chunks_paths = payload_string.split(';').collect::<Vec<&str>>();
let mut payload = Vec::new();
for (i, timestamp) in chunks_paths.iter().enumerate() {
let response = self
let mut response = self
.read_entry(&format!("{}::{}:{}@{}", user_id, key_name, i, timestamp))
.await?;
payload.append(&mut response.clone());
payload.append(&mut response);
}
Ok(payload)
}
Expand All @@ -90,25 +95,30 @@ impl crate::application::CoLink {
) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
// lock the metadata entry to prevent simultaneous writes
let lock_token = self.lock(&metadata_key.clone()).await?;
// split payload into chunks and update the chunks
let chunk_paths = self._store_chunks(payload, key_name).await?;
// make sure that the chunk paths are smaller than the maximum entry size
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
// update the metadata entry
let response = self
.update_entry(&metadata_key.clone(), &chunk_paths_string.into_bytes())
.await?;
let lock_token = self.lock(&metadata_key).await?;
// use a closure to prevent locking forever caused by errors
let res = async {
// split payload into chunks and update the chunks
let chunk_paths = self._store_chunks(payload, key_name).await?;
// make sure that the chunk paths are smaller than the maximum entry size
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
// update the metadata entry
let response = self
.update_entry(&metadata_key, &chunk_paths_string.into_bytes())
.await?;
Ok::<String, Error>(response)
}
.await;
self.unlock(lock_token).await?;
Ok(response)
res
}

#[async_recursion]
pub(crate) async fn _delete_entry_chunk(&self, key_name: &str) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
let lock_token = self.lock(&metadata_key.clone()).await?;
let response = self.delete_entry(&metadata_key.clone()).await?;
let lock_token = self.lock(&metadata_key).await?;
let res = self.delete_entry(&metadata_key).await;
self.unlock(lock_token).await?;
Ok(response)
res
}
}
77 changes: 77 additions & 0 deletions src/extensions/storage_macro/db_redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use async_recursion::async_recursion;
use redis::Commands;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

// Redis backend for the `$redis` storage macro.
//
// NOTE(review): these methods use the blocking `redis::Connection` inside
// async fns — confirm this is acceptable for the surrounding runtime, or
// consider the async redis client / `spawn_blocking`.
impl crate::application::CoLink {
    /// Opens a synchronous connection to the Redis instance at `redis_address`.
    fn _get_con_from_address(&self, redis_address: &str) -> Result<redis::Connection, Error> {
        let con = redis::Client::open(redis_address)?.get_connection()?;
        Ok(con)
    }

    /// Reads the Redis URL stored under `<key_path>:redis_url` in CoLink
    /// storage and connects to it.
    async fn _get_con_from_stored_credentials(
        &self,
        key_path: &str,
    ) -> Result<redis::Connection, Error> {
        let url_entry = self.read_entry(&format!("{}:redis_url", key_path)).await?;
        let url = String::from_utf8(url_entry)?;
        self._get_con_from_address(&url)
    }

    /// Creates `key_name` in Redis via SET NX; errors if the key already exists.
    #[async_recursion]
    pub(crate) async fn _create_entry_redis(
        &self,
        address: &str,
        key_name: &str,
        payload: &[u8],
    ) -> Result<String, Error> {
        let mut con = self._get_con_from_stored_credentials(address).await?;
        // SET NX reports 0 when the key was already present.
        let created: i32 = con.set_nx(key_name, payload)?;
        if created == 0 {
            return Err("key already exists.".into());
        }
        Ok(created.to_string())
    }

    /// Reads `key_name` from Redis; errors if the key does not exist.
    #[async_recursion]
    pub(crate) async fn _read_entry_redis(
        &self,
        address: &str,
        key_name: &str,
    ) -> Result<Vec<u8>, Error> {
        let mut con = self._get_con_from_stored_credentials(address).await?;
        let value: Option<Vec<u8>> = con.get(key_name)?;
        match value {
            Some(bytes) => Ok(bytes),
            None => Err("key does not exist.".into()),
        }
    }

    /// Unconditionally writes `key_name` in Redis (plain SET).
    #[async_recursion]
    pub(crate) async fn _update_entry_redis(
        &self,
        address: &str,
        key_name: &str,
        payload: &[u8],
    ) -> Result<String, Error> {
        let mut con = self._get_con_from_stored_credentials(address).await?;
        let status: String = con.set(key_name, payload)?;
        Ok(status)
    }

    /// Deletes `key_name` from Redis; errors if nothing was deleted.
    #[async_recursion]
    pub(crate) async fn _delete_entry_redis(
        &self,
        address: &str,
        key_name: &str,
    ) -> Result<String, Error> {
        let mut con = self._get_con_from_stored_credentials(address).await?;
        // DEL reports the number of keys removed; 0 means it was absent.
        let deleted: i32 = con.del(key_name)?;
        if deleted == 0 {
            return Err("key does not exist.".into());
        }
        Ok(deleted.to_string())
    }
}
77 changes: 77 additions & 0 deletions tests/test_storage_macro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use colink::{
extensions::instant_server::{InstantRegistry, InstantServer},
CoLink,
};
use rand::Rng;

#[tokio::test]
async fn test_storage_macro_chunk() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
{
    // Spin up a throwaway registry + server and act as a freshly generated user.
    let _registry = InstantRegistry::new();
    let server = InstantServer::new();
    let colink = server.get_colink().switch_to_generated_user().await?;

    // `$chunk` in the key name routes the entry through the chunking macro.
    test_crud(&colink, "storage_macro_test_chunk:$chunk").await?;

    Ok(())
}

#[tokio::test]
async fn test_storage_macro_redis() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>
{
    // Spin up a throwaway registry + server and act as a freshly generated user.
    let _registry = InstantRegistry::new();
    let server = InstantServer::new();
    let colink = server.get_colink().switch_to_generated_user().await?;

    // The `$redis` macro reads the server URL from `<prefix>:redis_url`,
    // so store it before exercising CRUD (requires a local Redis instance).
    colink
        .create_entry("storage_macro_test_redis:redis_url", b"redis://localhost")
        .await?;
    test_crud(&colink, "storage_macro_test_redis:$redis:redis_key").await?;

    Ok(())
}

#[ignore]
#[tokio::test]
async fn test_storage_macro_chunk_redis(
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // Spin up a throwaway registry + server and act as a freshly generated user.
    let _registry = InstantRegistry::new();
    let server = InstantServer::new();
    let colink = server.get_colink().switch_to_generated_user().await?;

    // Chained macros: `$redis` picks the backend, `$chunk` splits the payload.
    // The Redis URL must be stored under `<prefix>:redis_url` first.
    colink
        .create_entry(
            "test_storage_macro_chunk_redis:redis_url",
            b"redis://localhost",
        )
        .await?;
    test_crud(&colink, "test_storage_macro_chunk_redis:$redis:redis_chunk:$chunk").await?;

    Ok(())
}

/// Exercises the full create/read/update/delete cycle for a storage-macro key.
///
/// Uses ~5 MB and ~3 MB random payloads so chunking paths are genuinely hit,
/// and asserts that duplicate creates, reads after delete, and double deletes
/// all fail.
async fn test_crud(
    cl: &CoLink,
    key_name: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // 5 MB of random bytes (integer literal instead of `5e6 as usize` float cast).
    let payload: Vec<u8> = rand::thread_rng()
        .sample_iter(&rand::distributions::Standard)
        .take(5_000_000)
        .collect();
    // Borrow directly: the previous `&payload.clone()` allocated and copied a
    // throwaway 5 MB buffer for no benefit (clippy: redundant_clone).
    cl.create_entry(key_name, &payload).await?;
    // Creating an existing key must fail.
    assert!(cl.create_entry(key_name, b"").await.is_err());
    let data = cl.read_entry(key_name).await?;
    assert_eq!(data, payload);
    // 3 MB replacement payload.
    let new_payload: Vec<u8> = rand::thread_rng()
        .sample_iter(&rand::distributions::Standard)
        .take(3_000_000)
        .collect();
    cl.update_entry(key_name, &new_payload).await?;
    let data = cl.read_entry(key_name).await?;
    assert_eq!(data, new_payload);
    cl.delete_entry(key_name).await?;
    // After deletion, reads and further deletes must fail.
    assert!(cl.read_entry(key_name).await.is_err());
    assert!(cl.delete_entry(key_name).await.is_err());
    Ok(())
}

0 comments on commit 30dc314

Please sign in to comment.