From a216a7eccf03c61e8e013c585e4c05c8b493df2d Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 2 Dec 2024 13:11:35 +0100 Subject: [PATCH] chore: introduce collab size threshold to keep smaller collabs in postgres --- libs/database-entity/src/dto.rs | 5 +++++ libs/database/src/collab/disk_cache.rs | 18 +++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index f13eddb26..d53791e8f 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -19,8 +19,13 @@ use tracing::error; use uuid::Uuid; use validator::Validate; +/// The default compression level of ZSTD-compressed collabs. pub const ZSTD_COMPRESSION_LEVEL: i32 = 3; +/// The threshold used to determine whether collab data should land +/// in S3 or Postgres. Collabs with size below this value will land into Postgres. +pub const S3_COLLAB_THRESHOLD: usize = 2000; + #[derive(Debug, Clone, Validate, Serialize, Deserialize)] pub struct CreateCollabParams { #[validate(custom = "validate_not_empty_str")] diff --git a/libs/database/src/collab/disk_cache.rs b/libs/database/src/collab/disk_cache.rs index b378f5de4..c7c20f7d9 100644 --- a/libs/database/src/collab/disk_cache.rs +++ b/libs/database/src/collab/disk_cache.rs @@ -2,7 +2,6 @@ use anyhow::{anyhow, Context}; use bytes::Bytes; use collab::entity::{EncodedCollab, EncoderVersion}; use collab_entity::CollabType; -use serde::de::IntoDeserializer; use sqlx::{Error, PgPool, Transaction}; use std::collections::HashMap; use std::ops::DerefMut; @@ -22,7 +21,8 @@ use crate::file::{BucketClient, ResponseBlob}; use crate::index::upsert_collab_embeddings; use app_error::AppError; use database_entity::dto::{ - CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, ZSTD_COMPRESSION_LEVEL, + CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, S3_COLLAB_THRESHOLD, + ZSTD_COMPRESSION_LEVEL, }; #[derive(Clone)] @@ -93,8 +93,10 
@@ impl CollabDiskCache { s3: AwsS3BucketClientImpl, ) -> AppResult<()> { let key = collab_key(workspace_id, &params.object_id); - let encoded_collab = std::mem::take(&mut params.encoded_collab_v1); - tokio::spawn(Self::insert_blob_with_retries(s3, key, encoded_collab, 3)); + if params.encoded_collab_v1.len() > S3_COLLAB_THRESHOLD { + let encoded_collab = std::mem::take(&mut params.encoded_collab_v1); + tokio::spawn(Self::insert_blob_with_retries(s3, key, encoded_collab, 3)); + } insert_into_af_collab(transaction, uid, workspace_id, &params).await?; if let Some(em) = &params.embeddings { @@ -197,9 +199,11 @@ impl CollabDiskCache { ) -> Result<(), AppError> { let mut blobs = HashMap::new(); for param in params_list.iter_mut() { - let key = collab_key(workspace_id, &param.object_id); - let blob = std::mem::take(&mut param.encoded_collab_v1); - blobs.insert(key, blob); + if param.encoded_collab_v1.len() > S3_COLLAB_THRESHOLD { + let key = collab_key(workspace_id, &param.object_id); + let blob = std::mem::take(&mut param.encoded_collab_v1); + blobs.insert(key, blob); + } } let mut transaction = self.pg_pool.begin().await?;