Skip to content

Commit

Permalink
Merge pull request #57 from CoLearn-Dev/storage-macro-fs
Browse files Browse the repository at this point in the history
Storage Macro: file system
  • Loading branch information
stneng authored Mar 1, 2023
2 parents 7339794 + 404dbb9 commit a7a3a94
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.3.4"
version = "0.3.5"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CoLink SDK helps both application and protocol developers access the functionali
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.3.4"
colink = "0.3.5"
```

## Getting Started
Expand Down
11 changes: 11 additions & 0 deletions src/extensions/storage_macro.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod append;
mod chunk;
mod fs;
mod redis;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -39,6 +40,10 @@ impl crate::application::CoLink {
self._create_entry_redis(&string_before, &string_after, payload)
.await
}
"fs" => {
self._create_entry_fs(&string_before, &string_after, payload)
.await
}
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -52,6 +57,7 @@ impl crate::application::CoLink {
match macro_type.as_str() {
"chunk" => self._read_entry_chunk(&string_before).await,
"redis" => self._read_entry_redis(&string_before, &string_after).await,
"fs" => self._read_entry_fs(&string_before, &string_after).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -72,6 +78,10 @@ impl crate::application::CoLink {
self._update_entry_redis(&string_before, &string_after, payload)
.await
}
"fs" => {
self._update_entry_fs(&string_before, &string_after, payload)
.await
}
"append" => self._update_entry_append(&string_before, payload).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
Expand All @@ -89,6 +99,7 @@ impl crate::application::CoLink {
self._delete_entry_redis(&string_before, &string_after)
.await
}
"fs" => self._delete_entry_fs(&string_before, &string_after).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand Down
5 changes: 5 additions & 0 deletions src/extensions/storage_macro/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ impl crate::application::CoLink {
"chunk" => {
return self._append_entry_chunk(&string_before, payload).await;
}
"fs" => {
return self
._append_entry_fs(&string_before, &string_after, payload)
.await;
}
_ => {}
}
}
Expand Down
100 changes: 100 additions & 0 deletions src/extensions/storage_macro/fs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use async_recursion::async_recursion;
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
async fn _sm_fs_get_path(
&self,
path_key_name: &str,
path_suffix: &str,
) -> Result<PathBuf, Error> {
let path_key = format!("{}:path", path_key_name);
let mut path = String::from_utf8(self.read_entry(&path_key).await?)?;
if !path_suffix.is_empty() {
let path_suffix = path_suffix.replace(':', "/");
path += "/";
path += &path_suffix;
}
println!("path: {}", path);
let path = PathBuf::from(path);
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
Ok(path)
}

#[async_recursion]
pub(crate) async fn _create_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
payload: &[u8],
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(path)
.await?;
file.write_all(payload).await?;
Ok("ok".to_string())
}

#[async_recursion]
pub(crate) async fn _read_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
) -> Result<Vec<u8>, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let data = tokio::fs::read(path).await?;
Ok(data)
}

#[async_recursion]
pub(crate) async fn _update_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
payload: &[u8],
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let mut file = tokio::fs::File::create(path).await?;
file.write_all(payload).await?;
Ok("ok".to_string())
}

#[async_recursion]
pub(crate) async fn _append_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
payload: &[u8],
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
let lock_token = self.lock(&path.to_string_lossy()).await?;
// use a closure to prevent locking forever caused by errors
let res = async {
let mut file = tokio::fs::OpenOptions::new()
.append(true)
.open(path)
.await?;
file.write_all(payload).await?;
Ok::<String, Error>("ok".to_string())
}
.await;
self.unlock(lock_token).await?;
res
}

#[async_recursion]
pub(crate) async fn _delete_entry_fs(
&self,
path_key_name: &str,
path_suffix: &str,
) -> Result<String, Error> {
let path = self._sm_fs_get_path(path_key_name, path_suffix).await?;
tokio::fs::remove_file(path).await?;
Ok("ok".to_string())
}
}
38 changes: 38 additions & 0 deletions tests/test_storage_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ async fn test_storage_macro_redis() -> Result<(), Box<dyn std::error::Error + Se
Ok(())
}

#[tokio::test]
async fn test_storage_macro_fs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let (_ir, _is, cl) = set_up_test_env_single_user().await?;

cl.create_entry("test_storage_macro_fs:path", b"/tmp/colink-sm-fs-test/test")
.await?;
let key_name = "test_storage_macro_fs:$fs";
test_crud(&cl, key_name).await?;

cl.create_entry(
"test_storage_macro_fs_dir:path",
b"/tmp/colink-sm-fs-test/test-dir",
)
.await?;
let key_name = "test_storage_macro_fs_dir:$fs:test-file";
test_crud(&cl, key_name).await?;
let key_name = "test_storage_macro_fs_dir:$fs:test-dir:test-file";
test_crud(&cl, key_name).await?;

Ok(())
}

async fn test_crud(
cl: &CoLink,
key_name: &str,
Expand Down Expand Up @@ -114,6 +136,22 @@ async fn test_storage_macro_chunk_append(
Ok(())
}

#[tokio::test]
async fn test_storage_macro_fs_append(
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let (_ir, _is, cl) = set_up_test_env_single_user().await?;

cl.create_entry(
"test_storage_macro_fs_append:path",
b"/tmp/colink-sm-fs-test/append-test",
)
.await?;
let key_name = "test_storage_macro_fs_append:$fs";
test_append(&cl, key_name, 5e6 as usize).await?;

Ok(())
}

#[ignore]
#[tokio::test]
async fn test_storage_macro_redis_chunk(
Expand Down

0 comments on commit a7a3a94

Please sign in to comment.