Skip to content

Commit

Permalink
chore: introduce collab size threshold to keep smaller collabs in postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Dec 2, 2024
1 parent 7c1dd1c commit a216a7e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
5 changes: 5 additions & 0 deletions libs/database-entity/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ use tracing::error;
use uuid::Uuid;
use validator::Validate;

/// The default compression level used for ZSTD-compressed collabs.
///
/// NOTE(review): the value 3 appears to match zstd's own default level —
/// confirm against the zstd documentation before relying on that.
pub const ZSTD_COMPRESSION_LEVEL: i32 = 3;

/// The size threshold used to decide whether a collab's encoded payload is
/// offloaded to S3 or kept in Postgres.
///
/// Callers compare the length of the encoded collab against this value with a
/// strict `>`: payloads longer than the threshold are handed to the S3 upload,
/// while payloads of exactly this length or shorter stay in Postgres.
///
/// NOTE(review): the unit is presumably bytes of the encoded payload — confirm
/// against the type of the field being measured.
pub const S3_COLLAB_THRESHOLD: usize = 2000;

#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct CreateCollabParams {
#[validate(custom = "validate_not_empty_str")]
Expand Down
18 changes: 11 additions & 7 deletions libs/database/src/collab/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::{anyhow, Context};
use bytes::Bytes;
use collab::entity::{EncodedCollab, EncoderVersion};
use collab_entity::CollabType;
use serde::de::IntoDeserializer;
use sqlx::{Error, PgPool, Transaction};
use std::collections::HashMap;
use std::ops::DerefMut;
Expand All @@ -22,7 +21,8 @@ use crate::file::{BucketClient, ResponseBlob};
use crate::index::upsert_collab_embeddings;
use app_error::AppError;
use database_entity::dto::{
CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, ZSTD_COMPRESSION_LEVEL,
CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, S3_COLLAB_THRESHOLD,
ZSTD_COMPRESSION_LEVEL,
};

#[derive(Clone)]
Expand Down Expand Up @@ -93,8 +93,10 @@ impl CollabDiskCache {
s3: AwsS3BucketClientImpl,
) -> AppResult<()> {
let key = collab_key(workspace_id, &params.object_id);
let encoded_collab = std::mem::take(&mut params.encoded_collab_v1);
tokio::spawn(Self::insert_blob_with_retries(s3, key, encoded_collab, 3));
if params.encoded_collab_v1.len() > S3_COLLAB_THRESHOLD {
let encoded_collab = std::mem::take(&mut params.encoded_collab_v1);
tokio::spawn(Self::insert_blob_with_retries(s3, key, encoded_collab, 3));
}

insert_into_af_collab(transaction, uid, workspace_id, &params).await?;
if let Some(em) = &params.embeddings {
Expand Down Expand Up @@ -197,9 +199,11 @@ impl CollabDiskCache {
) -> Result<(), AppError> {
let mut blobs = HashMap::new();
for param in params_list.iter_mut() {
let key = collab_key(workspace_id, &param.object_id);
let blob = std::mem::take(&mut param.encoded_collab_v1);
blobs.insert(key, blob);
if param.encoded_collab_v1.len() > S3_COLLAB_THRESHOLD {
let key = collab_key(workspace_id, &param.object_id);
let blob = std::mem::take(&mut param.encoded_collab_v1);
blobs.insert(key, blob);
}
}

let mut transaction = self.pg_pool.begin().await?;
Expand Down

0 comments on commit a216a7e

Please sign in to comment.