Skip to content

Commit

Permalink
refactor(send_queue): Introduce QueueWedgeError, move logic out of ffi
Browse files Browse the repository at this point in the history
  • Loading branch information
BillCarsonFr committed Oct 15, 2024
1 parent 2eca727 commit e769600
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 96 deletions.
71 changes: 4 additions & 67 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use crate::{
mod content;

pub use content::MessageContent;
use matrix_sdk::deserialized_responses::QueueWedgeError;

#[derive(uniffi::Object)]
#[repr(transparent)]
Expand Down Expand Up @@ -908,42 +909,11 @@ pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,

/// One or more verified users in the room has an unsigned device.
///
/// Happens only when the room key recipient strategy (as set by
/// [`ClientBuilder::room_key_recipient_strategy`]) has
/// [`error_on_verified_user_problem`](CollectStrategy::DeviceBasedStrategy::error_on_verified_user_problem) set.
VerifiedUserHasUnsignedDevice {
/// The unsigned devices belonging to verified users. A map from user ID
/// to a list of device IDs.
devices: HashMap<String, Vec<String>>,
},

/// One or more verified users in the room has changed identity since they
/// were verified.
///
/// Happens only when the room key recipient strategy (as set by
/// [`ClientBuilder::room_key_recipient_strategy`]) has
/// [`error_on_verified_user_problem`](CollectStrategy::DeviceBasedStrategy::error_on_verified_user_problem)
/// set, or when using [`CollectStrategy::IdentityBasedStrategy`].
VerifiedUserChangedIdentity {
/// The users that were previously verified, but are no longer
users: Vec<String>,
},

/// The user does not have cross-signing set up, but
/// [`CollectStrategy::IdentityBasedStrategy`] was used.
CrossSigningNotSetup,

/// The current device is not verified, but
/// [`CollectStrategy::IdentityBasedStrategy`] was used.
SendingFromUnverifiedDevice,

/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed {
/// Stringified error message.
error: String,
/// The error reason, with information for the user.
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand All @@ -962,46 +932,13 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
match value {
NotSentYet => Self::NotSentYet,
SendingFailed { error, is_recoverable } => {
event_send_state_from_sending_failed(error, *is_recoverable)
Self::SendingFailed { is_recoverable: *is_recoverable, error: error.clone() }
}
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
}
}
}

fn event_send_state_from_sending_failed(error: &Error, is_recoverable: bool) -> EventSendState {
use matrix_sdk::crypto::{OlmError, SessionRecipientCollectionError::*};

match error {
// Special-case the SessionRecipientCollectionErrors, to pass the information they contain
// back to the application.
Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error {
VerifiedUserHasUnsignedDevice(devices) => {
let devices = devices
.iter()
.map(|(user_id, devices)| {
(
user_id.to_string(),
devices.iter().map(|device_id| device_id.to_string()).collect(),
)
})
.collect();
EventSendState::VerifiedUserHasUnsignedDevice { devices }
}

VerifiedUserChangedIdentity(bad_users) => EventSendState::VerifiedUserChangedIdentity {
users: bad_users.iter().map(|user_id| user_id.to_string()).collect(),
},

CrossSigningNotSetup => EventSendState::CrossSigningNotSetup,

SendingFromUnverifiedDevice => EventSendState::SendingFromUnverifiedDevice,
},

_ => EventSendState::SendingFailed { error: error.to_string(), is_recoverable },
}
}

/// Recommended decorations for decrypted messages, representing the message's
/// authenticity properties.
#[derive(uniffi::Enum, Clone)]
Expand Down
49 changes: 48 additions & 1 deletion crates/matrix-sdk-common/src/deserialized_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeMap, fmt};
use std::{
collections::{BTreeMap, HashMap},
fmt,
};

use ruma::{
events::{AnyMessageLikeEvent, AnySyncTimelineEvent, AnyTimelineEvent},
Expand Down Expand Up @@ -703,6 +706,50 @@ impl From<SyncTimelineEventDeserializationHelperV0> for SyncTimelineEvent {
}
}

/// Represent a failed to send error of an event sent via the send_queue.
/// These errors are not automatically retrievable, but yet some manual action
/// can be taken before retry sending.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum QueueWedgeError {
/// This error occurs when there are some insecure devices in the room, and
/// the current encryption setting prohibit sharing with them.
InsecureDevices {
/// The insecure devices as a Map of userID to deviceID.
user_device_map: HashMap<String, Vec<String>>,
},
/// This error occurs when a previously verified user is not anymore, and
/// the current encryption setting prohibit sharing when it happens.
IdentityViolations {
/// The users that are expected to be verified but are not.
users: Vec<String>,
},
/// It is required to set up cross-signing and properly verify the current
/// session before sending.
CrossVerificationRequired,
/// Other errors.
GenericApiError { msg: String },
}

/// Simple display implementation that strips out userIds/DeviceIds to avoid
/// accidental logging.
impl fmt::Display for QueueWedgeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
QueueWedgeError::InsecureDevices { .. } => {
f.write_str("There are insecure devices in the room")
}
QueueWedgeError::IdentityViolations { .. } => {
f.write_str("Some users that were previously verified are not anymore")
}
QueueWedgeError::CrossVerificationRequired => {
f.write_str("Own verification is required")
}
QueueWedgeError::GenericApiError { msg } => f.write_str(msg),
}
}
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
Expand Down
12 changes: 6 additions & 6 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use matrix_sdk::{
},
Result, Room,
};
use matrix_sdk_base::deserialized_responses::QueueWedgeError;
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
Expand Down Expand Up @@ -1262,9 +1263,9 @@ impl<P: RoomDataProvider> TimelineController<P> {
EventSendState::SendingFailed {
// Put a dummy error in this case, since we're not persisting the errors
// that occurred in previous sessions.
error: Arc::new(matrix_sdk::Error::UnknownError(Box::new(
MissingLocalEchoFailError,
))),
error: QueueWedgeError::GenericApiError {
msg: MISSING_LOCAL_ECHO_FAIL_ERROR.into(),
},
is_recoverable: false,
},
)
Expand Down Expand Up @@ -1521,9 +1522,8 @@ impl TimelineController {
}
}

#[derive(Debug, Error)]
#[error("local echo failed to send in a previous session")]
struct MissingLocalEchoFailError;
const MISSING_LOCAL_ECHO_FAIL_ERROR: &'static str =
"local echo failed to send in a previous session";

#[derive(Debug, Default)]
pub(super) struct HandleManyEventsResult {
Expand Down
7 changes: 3 additions & 4 deletions crates/matrix-sdk-ui/src/timeline/event_item/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use as_variant::as_variant;
use matrix_sdk::{send_queue::SendHandle, Error};
use matrix_sdk::send_queue::SendHandle;
use matrix_sdk_base::deserialized_responses::QueueWedgeError;
use ruma::{EventId, OwnedEventId, OwnedTransactionId};

use super::TimelineEventItemId;
Expand Down Expand Up @@ -70,7 +69,7 @@ pub enum EventSendState {
/// sending has failed.
SendingFailed {
/// Details about how sending the event failed.
error: Arc<Error>,
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand Down
12 changes: 4 additions & 8 deletions crates/matrix-sdk-ui/src/timeline/tests/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{io, sync::Arc};

use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
use matrix_sdk::{
assert_next_matches_with_timeout, send_queue::RoomSendQueueUpdate,
test_utils::events::EventFactory, Error,
test_utils::events::EventFactory,
};
use matrix_sdk_base::deserialized_responses::QueueWedgeError;
use matrix_sdk_test::{async_test, ALICE, BOB};
use ruma::{
event_id,
Expand Down Expand Up @@ -66,15 +65,12 @@ async fn test_remote_echo_full_trip() {
// Scenario 2: The local event has not been sent to the server successfully, it
// has failed. In this case, there is no event ID.
{
let some_io_error = Error::Io(io::Error::new(io::ErrorKind::Other, "this is a test"));
let some_io_error = QueueWedgeError::GenericApiError { msg: "this is a test".to_owned() };
timeline
.controller
.update_event_send_state(
&txn_id,
EventSendState::SendingFailed {
error: Arc::new(some_io_error),
is_recoverable: true,
},
EventSendState::SendingFailed { error: some_io_error, is_recoverable: true },
)
.await;

Expand Down
52 changes: 44 additions & 8 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,23 @@ use matrix_sdk_base::{
},
RoomState, StoreError,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use matrix_sdk_common::{
deserialized_responses::QueueWedgeError,
executor::{spawn, JoinHandle},
};
use ruma::{
events::{
reaction::ReactionEventContent, relation::Annotation, AnyMessageLikeEventContent,
EventContent as _,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId, TransactionId,
};
use tokio::sync::{broadcast, Notify, RwLock};
use tracing::{debug, error, info, instrument, trace, warn};

#[cfg(feature = "e2e-encryption")]
use crate::crypto::{OlmError, SessionRecipientCollectionError};
use crate::{
client::WeakClient,
config::RequestConfig,
Expand Down Expand Up @@ -187,7 +192,7 @@ pub struct SendQueueRoomError {
pub room_id: OwnedRoomId,

/// The error the room has ran into, when trying to send an event.
pub error: Arc<crate::Error>,
pub error: QueueWedgeError,

/// Whether the error is considered recoverable or not.
///
Expand Down Expand Up @@ -494,6 +499,8 @@ impl RoomSendQueue {
_ => false,
};

let wedged_err = convert_error_to_wedged_reason(&err);

if is_recoverable {
warn!(txn_id = %queued_event.transaction_id, error = ?err, "Recoverable error when sending event: {err}, disabling send queue");

Expand All @@ -520,17 +527,15 @@ impl RoomSendQueue {
}
}

let error = Arc::new(err);

let _ = global_error_reporter.send(SendQueueRoomError {
room_id: room.room_id().to_owned(),
error: error.clone(),
error: wedged_err.clone(),
is_recoverable,
});

let _ = updates.send(RoomSendQueueUpdate::SendError {
transaction_id: queued_event.transaction_id,
error,
error: wedged_err,
is_recoverable,
});
}
Expand Down Expand Up @@ -577,6 +582,37 @@ impl RoomSendQueue {
}
}

fn convert_error_to_wedged_reason(value: &crate::Error) -> QueueWedgeError {
match value {
#[cfg(feature = "e2e-encryption")]
crate::Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error {
SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
QueueWedgeError::InsecureDevices {
user_device_map: user_map
.iter()
.map(|(user_id, devices)| {
(
user_id.to_string(),
devices.iter().map(|device_id| device_id.to_string()).collect(),
)
})
.collect(),
}
}
SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
QueueWedgeError::IdentityViolations {
users: users.iter().map(OwnedUserId::to_string).collect(),
}
}
SessionRecipientCollectionError::CrossSigningNotSetup
| SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
QueueWedgeError::CrossVerificationRequired
}
},
_ => QueueWedgeError::GenericApiError { msg: value.to_string() },
}
}

struct RoomSendQueueInner {
/// The room which this send queue relates to.
room: WeakRoom,
Expand Down Expand Up @@ -1193,7 +1229,7 @@ pub enum RoomSendQueueUpdate {
/// Transaction id used to identify this event.
transaction_id: OwnedTransactionId,
/// Error received while sending the event.
error: Arc<crate::Error>,
error: QueueWedgeError,
/// Whether the error is considered recoverable or not.
///
/// An error that's recoverable will disable the room's send queue,
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/tests/integration/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
use assert_matches2::{assert_let, assert_matches};
use matrix_sdk::{
config::{RequestConfig, StoreConfig},
deserialized_responses::QueueWedgeError,
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueueError, RoomSendQueueUpdate},
test_utils::{
events::EventFactory, logged_in_client, logged_in_client_with_server, set_client_session,
Expand Down Expand Up @@ -471,8 +472,7 @@ async fn test_error_then_locally_reenabling() {
// seconds).
// It's the same transaction id that's used to signal the send error.
let error = assert_update!(watch => error { recoverable=true, txn=txn1 });
let error = error.as_client_api_error().unwrap();
assert_eq!(error.status_code, 500);
assert_matches!(error, QueueWedgeError::GenericApiError { .. });

sleep(Duration::from_millis(50)).await;

Expand Down

0 comments on commit e769600

Please sign in to comment.