Skip to content

Commit

Permalink
Merge pull request #44 from CoLearn-Dev/sm-append
Browse files Browse the repository at this point in the history
Storage Macro Append
  • Loading branch information
stneng authored Feb 6, 2023
2 parents d5ee1e6 + 9755647 commit 08119c2
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 118 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink"
version = "0.2.9"
version = "0.2.10"
edition = "2021"
description = "CoLink Rust SDK"
license = "MIT"
Expand All @@ -23,13 +23,13 @@ lapin = "2.1"
prost = "0.10"
rand = { version = "0.8", features = ["std_rng"] }
rcgen = { version = "0.10", optional = true }
redis = { version = "0.22", optional = true }
redis = { version = "0.22", features = ["tokio-comp"], optional = true }
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-native-roots"], optional = true }
secp256k1 = { version = "0.25", features = ["rand-std"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
tokio = { version = "1.18", features = ["macros", "rt-multi-thread", "rt", "fs"] }
tokio = { version = "1.24", features = ["macros", "rt-multi-thread", "rt", "fs"] }
tokio-rustls = { version = "0.23", optional = true }
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
tracing = "0.1"
Expand Down
23 changes: 5 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# CoLink Rust SDK

CoLink SDK helps both application adnd protocol developers access the functionalities provided by [the CoLink server](https://github.com/CoLearn-Dev/colink-server-dev).
CoLink SDK helps both application and protocol developers access the functionalities provided by [the CoLink server](https://github.com/CoLearn-Dev/colink-server-dev).

- For *application developers*, CoLink SDK allows them to update storage, manage computation requests, and monitor the CoLink server status.
- For *protocol developers*, CoLink SDK allows them to write CoLink Extensions that extend the functionality of CoLink to support new protocols.
Expand All @@ -9,28 +9,15 @@ CoLink SDK helps both application adnd protocol developers access the functional
Add this to your Cargo.toml:
```toml
[dependencies]
colink = "0.2.9"
colink = "0.2.10"
```

## Getting Started
You can use this SDK to run protocols, update storage, developing protocol operators. Here is a tutorial for you about how to start a greetings task between two users.
- Set up CoLink server.
Please refer to [colinkctl](https://github.com/CoLearn-Dev/colinkctl), and run the command below. For the following steps, we assume you are using the default settings in colinkctl.
```bash
colinkctl enable_dev_env
```
- Create two new terminals and start protocol operator for two users separately.
```bash
cargo run --example protocol_greetings -- --addr http://localhost:8080 --jwt $(sed -n "1,1p" ~/.colink/user_token.txt)
```
```bash
cargo run --example protocol_greetings -- --addr http://localhost:8080 --jwt $(sed -n "2,2p" ~/.colink/user_token.txt)
```
- Run task
```bash
cargo run --example user_run_task http://localhost:8080 $(sed -n "1,2p" ~/.colink/user_token.txt)
```
- Check the output in protocol operators' terminals
Please refer to [CoLink Server Setup](https://co-learn.notion.site/CoLink-Server-Setup-aa58e481e36e40cba83a002c1f3bd158)
- Use Rust SDK.
Please refer to [CoLink SDK Examples in Rust](https://co-learn.notion.site/CoLink-SDK-Examples-in-Rust-a9b583ac5d764390aeba7293aa63f39d)

## More examples
### Application
Expand Down
31 changes: 12 additions & 19 deletions src/extensions/storage_macro.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod append;
mod chunk;
mod db_redis;
mod redis;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
fn _parse_macro(&self, key_name: &str) -> (String, String, String) {
pub(crate) fn _parse_macro(&self, key_name: &str) -> (String, String, String) {
let split_key = key_name.split(':').collect::<Vec<&str>>();
let mut macro_type = String::new();
for s in split_key.iter().rev() {
Expand Down Expand Up @@ -33,12 +34,9 @@ impl crate::application::CoLink {
) -> Result<String, Error> {
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => {
self._create_entry_chunk(string_before.as_str(), payload)
.await
}
"chunk" => self._create_entry_chunk(&string_before, payload).await,
"redis" => {
self._create_entry_redis(string_before.as_str(), string_after.as_str(), payload)
self._create_entry_redis(&string_before, &string_after, payload)
.await
}
_ => Err(format!(
Expand All @@ -52,11 +50,8 @@ impl crate::application::CoLink {
pub(crate) async fn _sm_read_entry(&self, key_name: &str) -> Result<Vec<u8>, Error> {
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => self._read_entry_chunk(string_before.as_str()).await,
"redis" => {
self._read_entry_redis(string_before.as_str(), string_after.as_str())
.await
}
"chunk" => self._read_entry_chunk(&string_before).await,
"redis" => self._read_entry_redis(&string_before, &string_after).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -72,14 +67,12 @@ impl crate::application::CoLink {
) -> Result<String, Error> {
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => {
self._update_entry_chunk(string_before.as_str(), payload)
.await
}
"chunk" => self._update_entry_chunk(&string_before, payload).await,
"redis" => {
self._update_entry_redis(string_before.as_str(), string_after.as_str(), payload)
self._update_entry_redis(&string_before, &string_after, payload)
.await
}
"append" => self._update_entry_append(&string_before, payload).await,
_ => Err(format!(
"invalid storage macro, found {} in key name {}",
macro_type, key_name
Expand All @@ -91,9 +84,9 @@ impl crate::application::CoLink {
pub(crate) async fn _sm_delete_entry(&self, key_name: &str) -> Result<String, Error> {
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"chunk" => self._delete_entry_chunk(string_before.as_str()).await,
"chunk" => self._delete_entry_chunk(&string_before).await,
"redis" => {
self._delete_entry_redis(string_before.as_str(), string_after.as_str())
self._delete_entry_redis(&string_before, &string_after)
.await
}
_ => Err(format!(
Expand Down
38 changes: 38 additions & 0 deletions src/extensions/storage_macro/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use async_recursion::async_recursion;

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
#[async_recursion]
pub(crate) async fn _update_entry_append(
&self,
key_name: &str,
payload: &[u8],
) -> Result<String, Error> {
if key_name.contains('$') {
let (string_before, macro_type, string_after) = self._parse_macro(key_name);
match macro_type.as_str() {
"redis" => {
return self
._append_entry_redis(&string_before, &string_after, payload)
.await;
}
"chunk" => {
return self._append_entry_chunk(&string_before, payload).await;
}
_ => {}
}
}
let lock = self.lock(key_name).await?;
// use a closure to prevent locking forever caused by errors
let res = async {
let mut data = self.read_entry(key_name).await?;
data.append(&mut payload.to_vec());
let res = self.update_entry(key_name, &data).await?;
Ok::<String, Error>(res)
}
.await;
self.unlock(lock).await?;
res
}
}
89 changes: 87 additions & 2 deletions src/extensions/storage_macro/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::decode_jwt_without_validation;
use async_recursion::async_recursion;

const CHUNK_SIZE: usize = 1024 * 1024; // use 1MB chunks
Expand Down Expand Up @@ -30,6 +29,62 @@ impl crate::application::CoLink {
Ok(chunk_paths)
}

#[async_recursion]
async fn _append_chunks(
&self,
chunk_paths: &str,
payload: &[u8],
key_name: &str,
) -> Result<Vec<String>, Error> {
let mut chunk_paths = chunk_paths
.split(';')
.map(|x| x.to_string())
.collect::<Vec<String>>();
let last_chunk_id = chunk_paths.len() - 1;
let last_chunk_timestamp = chunk_paths[last_chunk_id].clone();
let mut last_chunk = self
.read_entry(&format!(
"{}::{}:{}@{}",
self.get_user_id()?,
key_name,
last_chunk_id,
last_chunk_timestamp
))
.await?;
let mut offset = 0;
let mut chunk_id = chunk_paths.len();
if last_chunk.len() < CHUNK_SIZE {
let chunk_size = if payload.len() < CHUNK_SIZE - last_chunk.len() {
payload.len()
} else {
CHUNK_SIZE - last_chunk.len()
};
last_chunk.append(&mut payload[..chunk_size].to_vec());
let response = self
.update_entry(&format!("{}:{}", key_name, last_chunk_id), &last_chunk)
.await?;
chunk_paths[last_chunk_id] = response.split('@').last().unwrap().to_string();
offset = chunk_size;
}
while offset < payload.len() {
let chunk_size = if offset + CHUNK_SIZE > payload.len() {
payload.len() - offset
} else {
CHUNK_SIZE
};
let response = self
.update_entry(
&format!("{}:{}", key_name, chunk_id),
&payload[offset..offset + chunk_size],
)
.await?;
chunk_paths.push(response.split('@').last().unwrap().to_string()); // only store the timestamps
offset += chunk_size;
chunk_id += 1;
}
Ok(chunk_paths)
}

fn _check_chunk_paths_size(&self, chunk_paths: Vec<String>) -> Result<String, Error> {
let chunk_paths_string = chunk_paths.join(";");
if chunk_paths_string.len() > CHUNK_SIZE {
Expand Down Expand Up @@ -73,7 +128,7 @@ impl crate::application::CoLink {
let metadata_key = format!("{}:chunk_metadata", key_name);
let metadata_response = self.read_entry(&metadata_key).await?;
let payload_string = String::from_utf8(metadata_response)?;
let user_id = decode_jwt_without_validation(&self.jwt).unwrap().user_id;
let user_id = self.get_user_id()?;

// read the chunks into a single vector
let chunks_paths = payload_string.split(';').collect::<Vec<&str>>();
Expand Down Expand Up @@ -113,6 +168,36 @@ impl crate::application::CoLink {
res
}

#[async_recursion]
pub(crate) async fn _append_entry_chunk(
&self,
key_name: &str,
payload: &[u8],
) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
// lock the metadata entry to prevent simultaneous writes
let lock_token = self.lock(&metadata_key).await?;
// use a closure to prevent locking forever caused by errors
let res = async {
// split payload into chunks and update the chunks
let metadata_response = self.read_entry(&metadata_key).await?;
let payload_string = String::from_utf8(metadata_response)?;
let chunk_paths = self
._append_chunks(&payload_string, payload, key_name)
.await?;
// make sure that the chunk paths are smaller than the maximum entry size
let chunk_paths_string = self._check_chunk_paths_size(chunk_paths)?;
// update the metadata entry
let response = self
.update_entry(&metadata_key, &chunk_paths_string.into_bytes())
.await?;
Ok::<String, Error>(response)
}
.await;
self.unlock(lock_token).await?;
res
}

#[async_recursion]
pub(crate) async fn _delete_entry_chunk(&self, key_name: &str) -> Result<String, Error> {
let metadata_key = format!("{}:chunk_metadata", key_name);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
use async_recursion::async_recursion;
use redis::Commands;
use redis::{aio::Connection, AsyncCommands};

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

impl crate::application::CoLink {
fn _get_con_from_address(&self, redis_address: &str) -> Result<redis::Connection, Error> {
let client = redis::Client::open(redis_address)?;
let con = client.get_connection()?;
Ok(con)
}

async fn _get_con_from_stored_credentials(
&self,
key_path: &str,
) -> Result<redis::Connection, Error> {
async fn _get_con_from_stored_credentials(&self, key_path: &str) -> Result<Connection, Error> {
let redis_url_key = format!("{}:redis_url", key_path);
let redis_url = self.read_entry(redis_url_key.as_str()).await?;
let redis_url_string = String::from_utf8(redis_url)?;
self._get_con_from_address(redis_url_string.as_str())
let client = redis::Client::open(redis_url_string)?;
let con = client.get_async_connection().await?;
Ok(con)
}

#[async_recursion]
Expand All @@ -28,7 +21,7 @@ impl crate::application::CoLink {
payload: &[u8],
) -> Result<String, Error> {
let mut con = self._get_con_from_stored_credentials(address).await?;
let response: i32 = con.set_nx(key_name, payload)?;
let response: i32 = con.set_nx(key_name, payload).await?;
if response == 0 {
Err("key already exists.")?
}
Expand All @@ -42,7 +35,7 @@ impl crate::application::CoLink {
key_name: &str,
) -> Result<Vec<u8>, Error> {
let mut con = self._get_con_from_stored_credentials(address).await?;
let response: Option<Vec<u8>> = con.get(key_name)?;
let response: Option<Vec<u8>> = con.get(key_name).await?;
match response {
Some(response) => Ok(response),
None => Err("key does not exist.")?,
Expand All @@ -57,7 +50,22 @@ impl crate::application::CoLink {
payload: &[u8],
) -> Result<String, Error> {
let mut con = self._get_con_from_stored_credentials(address).await?;
let response = con.set(key_name, payload)?;
let response = con.set(key_name, payload).await?;
Ok(response)
}

#[async_recursion]
pub(crate) async fn _append_entry_redis(
&self,
address: &str,
key_name: &str,
payload: &[u8],
) -> Result<String, Error> {
let mut con = self._get_con_from_stored_credentials(address).await?;
let response = con
.append::<&str, &[u8], i32>(key_name, payload)
.await?
.to_string();
Ok(response)
}

Expand All @@ -68,7 +76,7 @@ impl crate::application::CoLink {
key_name: &str,
) -> Result<String, Error> {
let mut con = self._get_con_from_stored_credentials(address).await?;
let response: i32 = con.del(key_name)?;
let response: i32 = con.del(key_name).await?;
if response == 0 {
Err("key does not exist.")?
}
Expand Down
Loading

0 comments on commit 08119c2

Please sign in to comment.