From 1fcd5af526b256e65b2aa3afe79bd03a0da78ef9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 30 Nov 2023 11:01:20 +0000 Subject: [PATCH] indexeddb: Update storage for `inbound_group_sessions` (#2885) Currently, querying for inbound group sessions which need backing up is very inefficient: we have to search through the whole list. Here, we change the way they are stored so that we can maintain an index of the ones that need a backup. Fixes: https://github.com/vector-im/element-web/issues/26488 Fixes: https://github.com/matrix-org/matrix-rust-sdk/issues/2877 --- * indexeddb: Update storage for inbound_group_sessions Currently, querying for inbound group sessions which need backing up is very inefficient: we have to search through the whole list. Here, we change the way they are stored so that we can maintain an index of the ones that need a backup. * Rename functions for clarity * Remove spurious log line This was a bit verbose * Rename constants for i_g_s store names * improve log messages * add a warning * Rename `InboundGroupSessionIndexedDbObject.data` * formatting --- Cargo.lock | 1 + crates/matrix-sdk-indexeddb/Cargo.toml | 1 + .../src/crypto_store/migrations.rs | 330 ++++++++++++++++-- .../src/crypto_store/mod.rs | 187 +++++++--- 4 files changed, 454 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c194f5fc9a..444f8f944f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3259,6 +3259,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-subscriber", "uuid", "wasm-bindgen", "wasm-bindgen-test", diff --git a/crates/matrix-sdk-indexeddb/Cargo.toml b/crates/matrix-sdk-indexeddb/Cargo.toml index d378e8b4bfb..23eaf043fc7 100644 --- a/crates/matrix-sdk-indexeddb/Cargo.toml +++ b/crates/matrix-sdk-indexeddb/Cargo.toml @@ -49,5 +49,6 @@ matrix-sdk-base = { path = "../matrix-sdk-base", features = ["testing"] } matrix-sdk-common = { path = "../matrix-sdk-common", features = ["js"] } matrix-sdk-crypto = { path = "../matrix-sdk-crypto", features = ["js", "testing"] } matrix-sdk-test = { path = "../../testing/matrix-sdk-test" } +tracing-subscriber = { version = "0.3.18", default-features = false, features = ["registry", "tracing-log"] } uuid = "1.3.0" wasm-bindgen-test = "0.3.33" diff --git a/crates/matrix-sdk-indexeddb/src/crypto_store/migrations.rs b/crates/matrix-sdk-indexeddb/src/crypto_store/migrations.rs index ef04b630e67..cb743ef372d 100644 --- a/crates/matrix-sdk-indexeddb/src/crypto_store/migrations.rs +++ b/crates/matrix-sdk-indexeddb/src/crypto_store/migrations.rs @@ -13,15 +13,67 @@ // limitations under the License. use indexed_db_futures::{prelude::*, web_sys::DomException}; -use tracing::info; +use matrix_sdk_crypto::olm::InboundGroupSession; +use tracing::{debug, info}; use wasm_bindgen::JsValue; -use crate::crypto_store::{keys, Result}; +use crate::{ + crypto_store::{ + indexeddb_serializer::IndexeddbSerializer, keys, InboundGroupSessionIndexedDbObject, Result, + }, + IndexeddbCryptoStoreError, +}; + +mod old_keys { + /// Old format of the inbound_group_sessions store which lacked indexes or a + /// sensible structure + pub const INBOUND_GROUP_SESSIONS_V1: &str = "inbound_group_sessions"; +} /// Open the indexeddb with the given name, upgrading it to the latest version /// of the schema if necessary. -pub async fn open_and_upgrade_db(name: &str) -> Result { - let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 5)?; +pub async fn open_and_upgrade_db( + name: &str, + serializer: &IndexeddbSerializer, +) -> Result { + // This is all a bit of a hack. Some of the version migrations require a data + // migration, which has to be done via async APIs; however, the + // JS `upgrade_needed` mechanism does not allow for async calls. + // + // Start by finding out what the existing version is, if any. + let db = IdbDatabase::open(name)?.await?; + let old_version = db.version() as u32; + db.close(); + + // If we have yet to complete the migration to V7, migrate the schema to V6 + // (if necessary), and then migrate any remaining data. + if old_version <= 6 { + let db = migrate_schema_up_to_v6(name).await?; + migrate_data_for_v6(serializer, &db).await?; + db.close(); + } + + // Now we can safely complete the migration to V7 which will drop the old store. + let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 7)?; + db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> { + let old_version = evt.old_version() as u32; + let new_version = evt.new_version() as u32; + + info!(old_version, new_version, "Continuing IndexeddbCryptoStore upgrade"); + + if old_version < 7 { + migrate_stores_to_v7(evt.db())?; + } + + info!(old_version, new_version, "IndexeddbCryptoStore upgrade complete"); + Ok(()) + })); + + Ok(db_req.await?) +} + +async fn migrate_schema_up_to_v6(name: &str) -> Result { + let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 6)?; db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> { // Even if the web-sys bindings expose the version as a f64, the IndexedDB API @@ -30,40 +82,56 @@ pub async fn open_and_upgrade_db(name: &str) -> Result Result<(), DomException> { +fn migrate_stores_to_v1(db: &IdbDatabase) -> Result<(), DomException> { db.create_object_store(keys::CORE)?; db.create_object_store(keys::SESSION)?; - db.create_object_store(keys::INBOUND_GROUP_SESSIONS)?; + db.create_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?; db.create_object_store(keys::OUTBOUND_GROUP_SESSIONS)?; db.create_object_store(keys::TRACKED_USERS)?; db.create_object_store(keys::OLM_HASHES)?; @@ -75,21 +143,21 @@ fn create_stores_for_v1(db: &IdbDatabase) -> Result<(), DomException> { Ok(()) } -fn create_stores_for_v2(db: &IdbDatabase) -> Result<(), DomException> { +fn migrate_stores_to_v2(db: &IdbDatabase) -> Result<(), DomException> { // We changed how we store inbound group sessions, the key used to // be a tuple of `(room_id, sender_key, session_id)` now it's a // tuple of `(room_id, session_id)` // // Let's just drop the whole object store. - db.delete_object_store(keys::INBOUND_GROUP_SESSIONS)?; - db.create_object_store(keys::INBOUND_GROUP_SESSIONS)?; + db.delete_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?; + db.create_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?; db.create_object_store(keys::ROOM_SETTINGS)?; Ok(()) } -fn create_stores_for_v3(db: &IdbDatabase) -> Result<(), DomException> { +fn migrate_stores_to_v3(db: &IdbDatabase) -> Result<(), DomException> { // We changed the way we store outbound session. // ShareInfo changed from a struct to an enum with struct variant. // Let's just discard the existing outbounds @@ -102,12 +170,12 @@ fn create_stores_for_v3(db: &IdbDatabase) -> Result<(), DomException> { Ok(()) } -fn create_stores_for_v4(db: &IdbDatabase) -> Result<(), DomException> { +fn migrate_stores_to_v4(db: &IdbDatabase) -> Result<(), DomException> { db.create_object_store(keys::SECRETS_INBOX)?; Ok(()) } -fn create_stores_for_v5(db: &IdbDatabase) -> Result<(), DomException> { +fn migrate_stores_to_v5(db: &IdbDatabase) -> Result<(), DomException> { // Create a new store for outgoing secret requests let object_store = db.create_object_store(keys::GOSSIP_REQUESTS)?; @@ -136,3 +204,225 @@ fn create_stores_for_v5(db: &IdbDatabase) -> Result<(), DomException> { Ok(()) } + +fn migrate_stores_to_v6(db: &IdbDatabase) -> Result<(), DomException> { + // We want to change the shape of the inbound group sessions store. To do so, we + // first need to build a new store, then copy all the data over. + // + // But copying the data needs to happen outside the database upgrade process + // (because it needs async calls). So, here we create a new store for + // inbound group sessions. We don't populate it yet; that happens once we + // have done the upgrade to v6, in `migrate_data_for_v6`. Finally we drop the + // old store in create_stores_for_v7. + + let object_store = db.create_object_store(keys::INBOUND_GROUP_SESSIONS_V2)?; + + let mut params = IdbIndexParameters::new(); + params.unique(false); + object_store.create_index_with_params( + keys::INBOUND_GROUP_SESSIONS_BACKUP_INDEX, + &IdbKeyPath::str("needs_backup"), + ¶ms, + )?; + + Ok(()) +} + +async fn migrate_data_for_v6(serializer: &IndexeddbSerializer, db: &IdbDatabase) -> Result<()> { + // The new store has been made for inbound group sessions; time to populate it. + let txn = db.transaction_on_multi_with_mode( + &[old_keys::INBOUND_GROUP_SESSIONS_V1, keys::INBOUND_GROUP_SESSIONS_V2], + IdbTransactionMode::Readwrite, + )?; + + let old_store = txn.object_store(old_keys::INBOUND_GROUP_SESSIONS_V1)?; + let new_store = txn.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?; + + let row_count = old_store.count()?.await?; + info!(row_count, "Migrating inbound group session data from v1 to v2"); + + if let Some(cursor) = old_store.open_cursor()?.await? { + let mut idx = 0; + loop { + idx += 1; + let key = cursor.key().ok_or(matrix_sdk_crypto::CryptoStoreError::Backend( + "inbound_group_sessions v1 cursor has no key".into(), + ))?; + let value = cursor.value(); + + if idx % 100 == 0 { + debug!("Migrating session {idx} of {row_count}"); + } + + let igs = InboundGroupSession::from_pickle(serializer.deserialize_value(value)?) + .map_err(|e| IndexeddbCryptoStoreError::CryptoStoreError(e.into()))?; + + // This is much the same as `IndexeddbStore::serialize_inbound_group_session`. + let new_data = serde_wasm_bindgen::to_value(&InboundGroupSessionIndexedDbObject { + pickled_session: serializer.serialize_value_as_bytes(&igs.pickle().await)?, + needs_backup: !igs.backed_up(), + })?; + + new_store.add_key_val(&key, &new_data)?; + + // we are done with the original data, so delete it now. + cursor.delete()?; + + if !cursor.continue_cursor()?.await? { + break; + } + } + } + + Ok(txn.await.into_result()?) +} + +fn migrate_stores_to_v7(db: &IdbDatabase) -> Result<(), DomException> { + db.delete_object_store(old_keys::INBOUND_GROUP_SESSIONS_V1) +} + +#[cfg(all(test, target_arch = "wasm32"))] +mod tests { + use std::sync::Arc; + + use indexed_db_futures::prelude::*; + use matrix_sdk_common::js_tracing::make_tracing_subscriber; + use matrix_sdk_crypto::{ + olm::SessionKey, + store::CryptoStore, + types::EventEncryptionAlgorithm, + vodozemac::{Curve25519PublicKey, Curve25519SecretKey, Ed25519SecretKey}, + }; + use matrix_sdk_store_encryption::StoreCipher; + use matrix_sdk_test::async_test; + use ruma::room_id; + use tracing_subscriber::util::SubscriberInitExt; + + use crate::{crypto_store::migrations::*, IndexeddbCryptoStore}; + + wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + + /// Test migrating `inbound_group_session` data from store v5 to store v7, + /// on a store with encryption disabled. + #[async_test] + async fn test_v7_migration_unencrypted() { + test_v7_migration_with_cipher("test_v7_migration_unencrypted", None).await + } + + /// Test migrating `inbound_group_session` data from store v5 to store v7, + /// on a store with encryption enabled. + #[async_test] + async fn test_v7_migration_encrypted() { + let cipher = StoreCipher::new().unwrap(); + test_v7_migration_with_cipher("test_v7_migration_encrypted", Some(Arc::new(cipher))).await; + } + + /// Helper function for `test_v7_migration_{un,}encrypted`: test migrating + /// `inbound_group_session` data from store v5 to store v7. + async fn test_v7_migration_with_cipher( + db_prefix: &str, + store_cipher: Option>, + ) { + let _ = make_tracing_subscriber(None).try_init(); + let db_name = format!("{db_prefix:0}::matrix-sdk-crypto"); + + // delete the db in case it was used in a previous run + let _ = IdbDatabase::delete_by_name(&db_name); + + // Schema V7 migrated the inbound group sessions to a new format. + // To test, first create a database and populate it with the *old* style of + // entry. + let db = create_v5_db(&db_name).await.unwrap(); + + let room_id = room_id!("!test:localhost"); + let curve_key = Curve25519PublicKey::from(&Curve25519SecretKey::new()); + let ed_key = Ed25519SecretKey::new().public_key(); + + // a backed-up session + let session1 = InboundGroupSession::new( + curve_key, + ed_key, + room_id, + &SessionKey::from_base64( + "AgAAAABTyn3CR8mzAxhsHH88td5DrRqfipJCnNbZeMrfzhON6O1Cyr9ewx/sDFLO6\ + +NvyW92yGvMub7nuAEQb+SgnZLm7nwvuVvJgSZKpoJMVliwg8iY9TXKFT286oBtT2\ + /8idy6TcpKax4foSHdMYlZXu5zOsGDdd9eYnYHpUEyDT0utuiaakZM3XBMNLEVDj9\ + Ps929j1FGgne1bDeFVoty2UAOQK8s/0JJigbKSu6wQ/SzaCYpE/LD4Egk2Nxs1JE2\ + 33ii9J8RGPYOp7QWl0kTEc8mAlqZL7mKppo9AwgtmYweAg", + ) + .unwrap(), + EventEncryptionAlgorithm::MegolmV1AesSha2, + None, + ) + .unwrap(); + session1.mark_as_backed_up(); + + // an un-backed-up session + let session2 = InboundGroupSession::new( + curve_key, + ed_key, + room_id, + &SessionKey::from_base64( + "AgAAAACO1PjBdqucFUcNFU6JgXYAi7KMeeUqUibaLm6CkHJcMiDTFWq/K5SFAukJc\ + WjeyOpnZr4vpezRlbvNaQpNPMub2Cs2u14fHj9OpKFD7c4hFS4j94q4pTLZly3qEV\ + BIjWdOpcIVfN7QVGVIxYiI6KHEddCHrNCo9fc8GUdfzrMnmUooQr/m4ZAkRdErzUH\ + uUAlUBwOKcPi7Cs/KrMw/sHCRDkTntHZ3BOrzJsAVbHUgq+8/Sqy3YE+CX6uEnig+\ + 1NWjZD9f1vvXnSKKDdHj1927WFMFZ/yYc24607zEVUaODQ", + ) + .unwrap(), + EventEncryptionAlgorithm::MegolmV1AesSha2, + None, + ) + .unwrap(); + + let serializer = IndexeddbSerializer::new(store_cipher.clone()); + + let txn = db + .transaction_on_one_with_mode( + old_keys::INBOUND_GROUP_SESSIONS_V1, + IdbTransactionMode::Readwrite, + ) + .unwrap(); + let sessions = txn.object_store(old_keys::INBOUND_GROUP_SESSIONS_V1).unwrap(); + for session in vec![&session1, &session2] { + let room_id = session.room_id(); + let session_id = session.session_id(); + let key = serializer.encode_key(keys::INBOUND_GROUP_SESSIONS_V2, (room_id, session_id)); + let pickle = session.pickle().await; + + sessions.put_key_val(&key, &serializer.serialize_value(&pickle).unwrap()).unwrap(); + } + txn.await.into_result().unwrap(); + + // now close our DB, reopen it properly, and check that we can still read our + // data. + db.close(); + + let store = + IndexeddbCryptoStore::open_with_store_cipher(&db_prefix, store_cipher).await.unwrap(); + + let s = + store.get_inbound_group_session(room_id, session1.session_id()).await.unwrap().unwrap(); + assert_eq!(s.session_id(), session1.session_id()); + assert_eq!(s.backed_up(), true); + + let s = + store.get_inbound_group_session(room_id, session2.session_id()).await.unwrap().unwrap(); + assert_eq!(s.session_id(), session2.session_id()); + assert_eq!(s.backed_up(), false); + } + + async fn create_v5_db(name: &str) -> std::result::Result { + let mut db_req: OpenDbRequest = IdbDatabase::open_u32(name, 5)?; + db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> { + let db = evt.db(); + migrate_stores_to_v1(db)?; + migrate_stores_to_v2(db)?; + migrate_stores_to_v3(db)?; + migrate_stores_to_v4(db)?; + migrate_stores_to_v5(db)?; + Ok(()) + })); + db_req.await + } +} diff --git a/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs b/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs index ec1553c7d3b..79d7f1f7526 100644 --- a/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/crypto_store/mod.rs @@ -53,7 +53,9 @@ mod keys { pub const CORE: &str = "core"; pub const SESSION: &str = "session"; - pub const INBOUND_GROUP_SESSIONS: &str = "inbound_group_sessions"; + + pub const INBOUND_GROUP_SESSIONS_V2: &str = "inbound_group_sessions2"; + pub const INBOUND_GROUP_SESSIONS_BACKUP_INDEX: &str = "backup"; pub const OUTBOUND_GROUP_SESSIONS: &str = "outbound_group_sessions"; @@ -164,7 +166,7 @@ impl IndexeddbCryptoStore { let name = format!("{prefix:0}::matrix-sdk-crypto"); let serializer = IndexeddbSerializer::new(store_cipher); - let db = open_and_upgrade_db(&name).await?; + let db = open_and_upgrade_db(&name, &serializer).await?; let session_cache = SessionStore::new(); Ok(Self { @@ -251,6 +253,44 @@ impl IndexeddbCryptoStore { self.static_account.read().unwrap().clone() } + /// Transform an [`InboundGroupSession`] into a `JsValue` holding a + /// [`InboundGroupSessionIndexedDbObject`], ready for storing. + async fn serialize_inbound_group_session( + &self, + session: &InboundGroupSession, + ) -> Result { + let obj = InboundGroupSessionIndexedDbObject { + pickled_session: self.serializer.serialize_value_as_bytes(&session.pickle().await)?, + needs_backup: !session.backed_up(), + }; + Ok(serde_wasm_bindgen::to_value(&obj)?) + } + + /// Transform a JsValue holding a [`InboundGroupSessionIndexedDbObject`] + /// back into a [`InboundGroupSession`]. + fn deserialize_inbound_group_session( + &self, + stored_value: JsValue, + ) -> Result { + let idb_object: InboundGroupSessionIndexedDbObject = + serde_wasm_bindgen::from_value(stored_value)?; + let pickled_session = + self.serializer.deserialize_value_from_bytes(&idb_object.pickled_session)?; + let session = InboundGroupSession::from_pickle(pickled_session) + .map_err(|e| IndexeddbCryptoStoreError::CryptoStoreError(e.into()))?; + + // Although a "backed up" flag is stored inside `idb_object.pickled_session`, it + // is not maintained when backups are reset. Overwrite the flag with the + // needs_backup value from the IDB object. + if idb_object.needs_backup { + session.reset_backup_state(); + } else { + session.mark_as_backed_up(); + } + + Ok(session) + } + /// Transform a [`GossipRequest`] into a `JsValue` holding a /// [`GossipRequestIndexedDbObject`], ready for storing. fn serialize_gossip_request(&self, gossip_request: &GossipRequest) -> Result { @@ -369,7 +409,7 @@ impl_crypto_store! { keys::IDENTITIES, ), - (!changes.inbound_group_sessions.is_empty(), keys::INBOUND_GROUP_SESSIONS), + (!changes.inbound_group_sessions.is_empty(), keys::INBOUND_GROUP_SESSIONS_V2), (!changes.outbound_group_sessions.is_empty(), keys::OUTBOUND_GROUP_SESSIONS), (!changes.message_hashes.is_empty(), keys::OLM_HASHES), (!changes.withheld_session_info.is_empty(), keys::DIRECT_WITHHELD_INFO), @@ -439,15 +479,14 @@ impl_crypto_store! { } if !changes.inbound_group_sessions.is_empty() { - let sessions = tx.object_store(keys::INBOUND_GROUP_SESSIONS)?; + let sessions = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?; for session in changes.inbound_group_sessions { let room_id = session.room_id(); let session_id = session.session_id(); - let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS, (room_id, session_id)); - let pickle = session.pickle().await; - - sessions.put_key_val(&key, &self.serializer.serialize_value(&pickle)?)?; + let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS_V2, (room_id, session_id)); + let value = self.serialize_inbound_group_session(&session).await?; + sessions.put_key_val(&key, &value)?; } } @@ -717,19 +756,18 @@ impl_crypto_store! { room_id: &RoomId, session_id: &str, ) -> Result> { - let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS, (room_id, session_id)); - if let Some(pickle) = self + let key = self.serializer.encode_key(keys::INBOUND_GROUP_SESSIONS_V2, (room_id, session_id)); + if let Some(value) = self .inner .transaction_on_one_with_mode( - keys::INBOUND_GROUP_SESSIONS, + keys::INBOUND_GROUP_SESSIONS_V2, IdbTransactionMode::Readonly, )? - .object_store(keys::INBOUND_GROUP_SESSIONS)? + .object_store(keys::INBOUND_GROUP_SESSIONS_V2)? .get(&key)? .await? { - let pickle = self.serializer.deserialize_value(pickle)?; - Ok(Some(InboundGroupSession::from_pickle(pickle).map_err(CryptoStoreError::from)?)) + Ok(Some(self.deserialize_inbound_group_session(value)?)) } else { Ok(None) } @@ -739,57 +777,92 @@ impl_crypto_store! { Ok(self .inner .transaction_on_one_with_mode( - keys::INBOUND_GROUP_SESSIONS, + keys::INBOUND_GROUP_SESSIONS_V2, IdbTransactionMode::Readonly, )? - .object_store(keys::INBOUND_GROUP_SESSIONS)? + .object_store(keys::INBOUND_GROUP_SESSIONS_V2)? .get_all()? .await? .iter() - .filter_map(|i| self.serializer.deserialize_value(i).ok()) - .filter_map(|p| InboundGroupSession::from_pickle(p).ok()) + .filter_map(|v| self.deserialize_inbound_group_session(v).ok()) .collect()) } async fn inbound_group_session_counts(&self) -> Result { - let all = self.get_inbound_group_sessions().await?; - let backed_up = all.iter().filter(|s| s.backed_up()).count(); - - Ok(RoomKeyCounts { total: all.len(), backed_up }) + let tx = self + .inner + .transaction_on_one_with_mode( + keys::INBOUND_GROUP_SESSIONS_V2, + IdbTransactionMode::Readonly, + )?; + let store = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?; + let all = store.count()?.await? as usize; + let not_backed_up = store.index(keys::INBOUND_GROUP_SESSIONS_BACKUP_INDEX)?.count()?.await? as usize; + tx.await.into_result()?; + Ok(RoomKeyCounts { total: all, backed_up: all - not_backed_up }) } async fn inbound_group_sessions_for_backup( &self, limit: usize, ) -> Result> { - Ok(self - .get_inbound_group_sessions() - .await? - .into_iter() - .filter(|s| !s.backed_up()) - .take(limit) - .collect()) + let tx = self + .inner + .transaction_on_one_with_mode( + keys::INBOUND_GROUP_SESSIONS_V2, + IdbTransactionMode::Readonly, + )?; + + + let store = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?; + let idx = store.index(keys::INBOUND_GROUP_SESSIONS_BACKUP_INDEX)?; + + // XXX ideally we would use `get_all_with_key_and_limit`, but that doesn't appear to be + // exposed (https://github.com/Alorel/rust-indexed-db/issues/31). Instead we replicate + // the behaviour with a cursor. + let Some(cursor) = idx.open_cursor()?.await? else { + return Ok(vec![]); + }; + + let mut result = Vec::new(); + for _ in 0..limit { + result.push(self.deserialize_inbound_group_session(cursor.value())?); + if !cursor.continue_cursor()?.await? { + break; + } + } + + tx.await.into_result()?; + Ok(result) } async fn reset_backup_state(&self) -> Result<()> { - let inbound_group_sessions = self - .get_inbound_group_sessions() - .await? - .into_iter() - .filter(|s| { - if s.backed_up() { - s.reset_backup_state(); - true - } else { - false + let tx = self + .inner + .transaction_on_one_with_mode( + keys::INBOUND_GROUP_SESSIONS_V2, + IdbTransactionMode::Readwrite, + )?; + + if let Some(cursor) = tx.object_store(keys::INBOUND_GROUP_SESSIONS_V2)?.open_cursor()?.await? { + loop { + let mut idb_object: InboundGroupSessionIndexedDbObject = serde_wasm_bindgen::from_value(cursor.value())?; + if !idb_object.needs_backup { + idb_object.needs_backup = true; + // We don't bother to update the encrypted `InboundGroupSession` object stored + // inside `idb_object.data`, since that would require decryption and encryption. + // Instead, it will be patched up by `deserialize_inbound_group_session`. + let idb_object = serde_wasm_bindgen::to_value(&idb_object)?; + cursor.update(&idb_object)?.await?; } - }) - .collect::>(); - if !inbound_group_sessions.is_empty() { - let changes = Changes { inbound_group_sessions, ..Default::default() }; - self.save_changes(changes).await?; + + if !cursor.continue_cursor()?.await? { + break; + } + } } - Ok(()) + + Ok(tx.await.into_result()?) } async fn save_tracked_users(&self, users: &[(&UserId, bool)]) -> Result<()> { @@ -1110,6 +1183,30 @@ struct GossipRequestIndexedDbObject { unsent: bool, } +/// The objects we store in the inbound_group_sessions2 indexeddb object store +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct InboundGroupSessionIndexedDbObject { + /// (Possibly encrypted) serialisation of a + /// [`matrix_sdk_crypto::olm::group_sessions::PickledInboundGroupSession`] + /// structure. + pickled_session: Vec, + + /// Whether the session data has yet to be backed up. + /// + /// Since we only need to be able to find entries where this is `true`, we + /// skip serialization in cases where it is `false`. That has the effect + /// of omitting it from the indexeddb index. + /// + /// We also use a custom serializer because bools can't be used as keys in + /// indexeddb. + #[serde( + default, + skip_serializing_if = "std::ops::Not::not", + with = "crate::serialize_bool_for_indexeddb" + )] + needs_backup: bool, +} + #[cfg(all(test, target_arch = "wasm32"))] mod tests { use matrix_sdk_crypto::cryptostore_integration_tests;