s3 uploads #543 #659

Open · wants to merge 5 commits into base: develop
802 changes: 652 additions & 150 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions README.md
@@ -41,6 +41,8 @@ _Status: alpha. [Breaking changes](CHANGELOG.md) are expected until 1.0._
- 📖 **Pagination, sorting and filtering** queries using [Atomic Collections](https://docs.atomicdata.dev/schema/collections.html).
- 🔐 **Authorization** (read / write permissions) and Hierarchical structures powered by [Atomic Hierarchy](https://docs.atomicdata.dev/hierarchy.html)
- 📲 **Invite and sharing system** with [Atomic Invites](https://docs.atomicdata.dev/invitations.html)
- 📂 **File management**: Upload, download and preview attachments, with support for using S3 as a file storage backend.
- 🖥️ **Desktop app**: Easy desktop installation, with status bar icon, powered by [tauri](https://github.com/tauri-apps/tauri/).
- 🌐 **Embedded server** with support for HTTP / HTTPS / HTTP2.0 (TLS) and Built-in LetsEncrypt handshake.
- 📚 **Libraries**: [Javascript / Typescript](https://www.npmjs.com/package/@tomic/lib), [React](https://www.npmjs.com/package/@tomic/react), [Svelte](https://www.npmjs.com/package/@tomic/svelte), [Rust](https://crates.io/crates/atomic-lib)

@@ -50,6 +52,7 @@ https://user-images.githubusercontent.com/2183313/139728539-d69b899f-6f9b-44cb-a

Check out the [documentation] for installation instructions, API docs, and more.

### Configuring S3 for File Storage
## Contribute

Issues and PRs are welcome!
15 changes: 15 additions & 0 deletions docs/src/atomicserver/installation.md
@@ -106,6 +106,21 @@ ATOMIC_HTTPS=false
ATOMIC_SERVER_URL=https://example.com
```

### Configuring S3 for File Storage

You can configure atomic-server to use S3 (and S3-compatible services) for file storage via environment variables or command-line arguments when starting atomic-server.

Credentials are read either from the standard AWS credentials location for your OS (e.g. `~/.aws/credentials` on UNIX systems) or from the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
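For reference, a minimal shared AWS credentials file might look like this (the profile name and key values below are placeholders, not real credentials):

```ini
# ~/.aws/credentials
[default]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>
```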

Available configuration values:

- bucket: `--s3-bucket="my-bucket-name"` or env var `ATOMIC_S3_BUCKET` (required)
- region: `--s3-region="us-east-2"` or env var `ATOMIC_S3_REGION`
- endpoint: `--s3-endpoint="https://s3.us-east-2.amazonaws.com"` or env var `ATOMIC_S3_ENDPOINT`
- path: `--s3-path="atomic_uploads"` or env var `ATOMIC_S3_PATH`

For example, the above configuration would upload files to `s3://my-bucket-name/atomic_uploads/` in the `us-east-2` region.
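Expressed as environment variables, that same example configuration would be (credentials shown as placeholders):

```
ATOMIC_S3_BUCKET=my-bucket-name
ATOMIC_S3_REGION=us-east-2
ATOMIC_S3_ENDPOINT=https://s3.us-east-2.amazonaws.com
ATOMIC_S3_PATH=atomic_uploads
AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
```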

## Using `systemd` to run Atomic-Server as a service

In Linux operating systems, you can use `systemd` to manage running processes.
1 change: 1 addition & 0 deletions lib/src/db.rs
@@ -496,6 +496,7 @@ impl Storelike for Db {

let dynamic_span =
tracing::span!(tracing::Level::TRACE, "get_resource_extended (dynamic)").entered();
tracing::debug!("get_resource_extended: {}", &removed_query_params);
let mut resource = self.get_resource(&removed_query_params)?;

let _explanation = crate::hierarchy::check_read(self, &resource, for_agent)?;
1 change: 1 addition & 0 deletions lib/src/storelike.rs
@@ -209,6 +209,7 @@ pub trait Storelike: Sized {
) -> AtomicResult<Resource> {
if let Some(self_url) = self.get_self_url() {
if subject.starts_with(&self_url) {
tracing::debug!("Custom backtrace: {}", std::backtrace::Backtrace::force_capture());
return Err(AtomicError::not_found(format!(
"Failed to retrieve locally: '{}'",
subject
1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -33,6 +33,7 @@ dialoguer = "0.11"
directories = ">= 2, < 5"
dotenv = "0.15"
futures = "0.3"
opendal = "0.45.1"
percent-encoding = "2.2.0"
regex = "1"
rio_api = "0.8"
14 changes: 13 additions & 1 deletion server/src/appstate.rs
@@ -1,6 +1,7 @@
//! App state, which is accessible from handlers
use crate::{
commit_monitor::CommitMonitor, config::Config, errors::AtomicServerResult, search::SearchState,
commit_monitor::CommitMonitor, config::Config, errors::AtomicServerResult, files::FileStore,
search::SearchState,
};
use atomic_lib::{
agents::{generate_public_key, Agent},
@@ -23,6 +24,10 @@ pub struct AppState {
/// The Actix Address of the CommitMonitor, which should receive updates when a commit is applied
pub commit_monitor: actix::Addr<CommitMonitor>,
pub search_state: SearchState,
/// Stores config values and the active FileStore type, e.g. FS or S3
pub file_store: FileStore,
/// Stores config values for the FS file store; kept as a fallback regardless of the active file store
pub fs_file_store: FileStore,
}

/// Creates the AppState (the server's context available in Handlers).
@@ -93,11 +98,18 @@ pub fn init(config: Config) -> AtomicServerResult<AppState> {
crate::search::add_all_resources(&search_state, &store)?
}

tracing::info!("Initializing file stores");
// Initialize file stores
let fs_file_store = FileStore::init_fs_from_config(&config);
let file_store = FileStore::init_from_config(&config, fs_file_store.clone());

Ok(AppState {
store,
config,
commit_monitor,
search_state,
file_store,
fs_file_store,
})
}

1 change: 1 addition & 0 deletions server/src/bin.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ mod commit_monitor;
pub mod config;
mod content_types;
mod errors;
mod files;
mod handlers;
mod helpers;
#[cfg(feature = "https")]
16 changes: 16 additions & 0 deletions server/src/config.rs
@@ -74,6 +74,22 @@ pub struct Opts {
#[clap(long, env = "ATOMIC_DATA_DIR")]
pub data_dir: Option<PathBuf>,

/// bucket name from s3-compatible storage service
#[clap(long, env = "ATOMIC_S3_BUCKET")]
pub s3_bucket: Option<String>,

/// region for s3-compatible storage service, defaults to "us-east-1"
#[clap(long, env = "ATOMIC_S3_REGION")]
pub s3_region: Option<String>,

/// endpoint for s3-compatible storage service, defaults to "https://s3.amazonaws.com"
#[clap(long, env = "ATOMIC_S3_ENDPOINT")]
pub s3_endpoint: Option<String>,

/// path where s3 uploads will be stored
#[clap(long, env = "ATOMIC_S3_PATH")]
pub s3_path: Option<String>,
joepio marked this conversation as resolved.
Show resolved Hide resolved

/// CAUTION: Skip authentication checks, making all data publicly readable. Improves performance.
#[clap(long, env = "ATOMIC_PUBLIC_MODE")]
pub public_mode: bool,
10 changes: 10 additions & 0 deletions server/src/errors.rs
@@ -179,3 +179,13 @@ impl From<actix_web::Error> for AtomicServerError {
}
}
}

impl From<opendal::Error> for AtomicServerError {
fn from(error: opendal::Error) -> Self {
AtomicServerError {
message: error.to_string(),
error_type: AppErrorType::Other,
error_resource: None,
}
}
}
175 changes: 175 additions & 0 deletions server/src/files.rs
@@ -0,0 +1,175 @@
use std::{fmt, fs, io::Write, path::PathBuf, time::Duration};

use actix_multipart::Field;
use futures::StreamExt;
use opendal::{services::S3, Operator};

use crate::{appstate::AppState, config::Config, errors::AtomicServerResult};

#[derive(Clone, Debug, PartialEq)]
pub enum FileStore {
S3(S3Config),
FS(FSConfig),
}

#[derive(Clone, Debug, PartialEq)]
pub struct S3Config {
pub bucket: String,
pub path: String,
pub endpoint: Option<String>,
pub region: Option<String>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct FSConfig {
pub path: PathBuf,
}

impl FileStore {
const S3_PREFIX: &'static str = "s3:";
const FS_PREFIX: &'static str = "fs:";

pub fn init_fs_from_config(config: &Config) -> FileStore {
FileStore::FS(FSConfig {
path: config.uploads_path.clone(),
})
}

pub fn init_from_config(config: &Config, fs_file_store: FileStore) -> FileStore {
let opts = &config.opts;
if let Some(bucket) = &opts.s3_bucket {
let config = S3Config {
bucket: bucket.clone(),
endpoint: opts.s3_endpoint.clone(),
region: opts.s3_region.clone(),
path: opts.s3_path.clone().unwrap_or_else(|| "uploads".to_string()),
};
FileStore::S3(config)
} else {
fs_file_store
}
}

pub fn get_subject_file_store<'a>(appstate: &'a AppState, subject: &str) -> &'a FileStore {
if subject.contains(Self::S3_PREFIX) {
&appstate.file_store
} else {
&appstate.fs_file_store
}
}

pub fn get_fs_file_path(&self, file_id: &str) -> AtomicServerResult<PathBuf> {
tracing::info!("fs_file_path: {}", file_id);
if let FileStore::FS(config) = self {
let fs_file_id = file_id.strip_prefix(Self::FS_PREFIX).unwrap_or(file_id);
let mut file_path = config.path.clone();
file_path.push(fs_file_id);
Ok(file_path)
} else {
Err("Wrong FileStore passed to get_fs_file_path".into())
}
}

pub fn prefix(&self) -> &str {
match self {
Self::S3(_) => Self::S3_PREFIX,
Self::FS(_) => Self::FS_PREFIX,
}
}

pub fn encoded(&self) -> String {
urlencoding::encode(self.prefix()).into_owned()
}

pub async fn upload_file(&self, file_id: &str, field: Field) -> AtomicServerResult<i64> {
match self {
FileStore::S3(_) => s3_upload(self, file_id, field).await,
FileStore::FS(config) => fs_upload(self, config, file_id, field).await,
}
}
}

impl fmt::Display for FileStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.prefix())
}
}

async fn fs_upload(
file_store: &FileStore,
config: &FSConfig,
file_id: &str,
mut field: Field,
) -> AtomicServerResult<i64> {
std::fs::create_dir_all(config.path.clone())?;

let mut file = fs::File::create(file_store.get_fs_file_path(file_id)?)?;

// Field is a stream of *Bytes* chunks
let mut len: usize = 0;
while let Some(chunk) = field.next().await {
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
// TODO: Update a SHA256 hash here for checksum
file.write_all(&data)?;
len += data.len();
}

// Count bytes as they are written; reading the file's metadata before writing would always yield 0
let byte_count: i64 = len.try_into().map_err(|_e| "Too large")?;
Ok(byte_count)
}

async fn s3_upload(
file_store: &FileStore,
file_id: &str,
mut field: Field,
) -> AtomicServerResult<i64> {
let mut builder = S3::default();

if let FileStore::S3(config) = file_store {
builder.bucket(&config.bucket);
builder.root(&config.path);
if let Some(region) = &config.region {
builder.region(region);
}
if let Some(endpoint) = &config.endpoint {
builder.endpoint(endpoint);
}
} else {
return Err("Uploading to S3 but no S3 config provided".into());
}

let op: Operator = Operator::new(builder)?.finish();
let mut w = op.writer(file_id).await?;
let mut len = 0;
while let Some(chunk) = field.next().await {
let data = chunk.map_err(|e| format!("Error while reading multipart data. {}", e))?;
len += data.len();
w.write(data).await?;
}

let byte_length: i64 = len.try_into().map_err(|_e| "Too large")?;
w.close().await?;
Ok(byte_length)
}

pub async fn get_s3_signed_url(
file_store: &FileStore,
duration: Duration,
file_id: &str,
) -> AtomicServerResult<String> {
let mut builder = S3::default();

if let FileStore::S3(config) = file_store {
builder.bucket(&config.bucket);
builder.root(&config.path);
if let Some(region) = &config.region {
builder.region(region);
}
if let Some(endpoint) = &config.endpoint {
builder.endpoint(endpoint);
}
} else {
return Err("Downloading from S3 but no S3 config provided".into());
}

let op: Operator = Operator::new(builder)?.finish();

let uri = op.presign_read(file_id, duration).await?.uri().to_string();

Ok(uri)
}
48 changes: 37 additions & 11 deletions server/src/handlers/download.rs
@@ -1,8 +1,15 @@
use std::time::Duration;

use actix_files::NamedFile;
use actix_web::{web, HttpRequest, HttpResponse};
use atomic_lib::{urls, Resource, Storelike};
use actix_web::{web, HttpRequest, HttpResponse, Responder};
use atomic_lib::{urls, Storelike};

use crate::{appstate::AppState, errors::AtomicServerResult, helpers::get_client_agent};
use crate::{
appstate::AppState,
errors::AtomicServerResult,
files::{self, FileStore},
helpers::get_client_agent,
};

/// Downloads the File of the Resource that matches the same URL minus the `/download` path.
#[tracing::instrument(skip(appstate, req))]
@@ -26,20 +33,39 @@ pub async fn handle_download(

let for_agent = get_client_agent(headers, &appstate, subject.clone())?;
tracing::info!("handle_download: {}", subject);
let resource = store.get_resource_extended(&subject, false, &for_agent)?;
download_file_handler_partial(&resource, &req, &appstate)
let file_store = FileStore::get_subject_file_store(&appstate, &subject);
let encoded = subject.replace(file_store.prefix(), &file_store.encoded());
let resource = store.get_resource_extended(&encoded, false, &for_agent)?;
let file_id = resource
.get(urls::INTERNAL_ID)
.map_err(|e| format!("Internal ID of file could not be resolved. {}", e))?
.to_string();

if let FileStore::S3(_) = file_store {
signed_url_redirect_handler(file_id.as_str(), &req, &appstate).await
} else {
download_file_handler_partial(file_id.as_str(), &req, &appstate)
}
}

pub fn download_file_handler_partial(
resource: &Resource,
file_id: &str,
req: &HttpRequest,
appstate: &AppState,
) -> AtomicServerResult<HttpResponse> {
let file_name = resource
.get(urls::INTERNAL_ID)
.map_err(|e| format!("Internal ID of file could not be resolved. {}", e))?;
let mut file_path = appstate.config.uploads_path.clone();
file_path.push(file_name.to_string());
let file_path = appstate.fs_file_store.get_fs_file_path(file_id)?;
let file = NamedFile::open(file_path)?;
Ok(file.into_response(req))
}

async fn signed_url_redirect_handler(
file_id: &str,
req: &HttpRequest,
appstate: &AppState,
) -> AtomicServerResult<HttpResponse> {
let signed_url =
files::get_s3_signed_url(&appstate.file_store, Duration::from_secs(3600), file_id).await?;
Ok(web::Redirect::to(signed_url)
.respond_to(req)
.map_into_boxed_body())
}