Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and clean up the SyncService #4543

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 65 additions & 74 deletions crates/matrix-sdk-ui/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! MUST observe. Whenever an error/termination is observed, the user MUST call
//! [`SyncService::start()`] again to restart the room list sync.

use std::sync::{Arc, Mutex};
use std::sync::Arc;

use eyeball::{SharedObservable, Subscriber};
use futures_core::Future;
Expand Down Expand Up @@ -67,48 +67,40 @@ pub enum State {
Error,
}

pub struct SyncService {
/// Room list service used to synchronize the rooms state.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove all the fields comments? :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, one of them was lost in the move. Or do you mean all those should be replicated in the Inner struct?

Anyways, I added the missing one back: 9ae0235.

struct SyncServiceInner {
room_list_service: Arc<RoomListService>,

/// Encryption sync taking care of e2ee events.
encryption_sync_service: Arc<EncryptionSyncService>,

/// What's the state of this sync service?
state: SharedObservable<State>,

    /// Use a mutex every time the `state` value is modified, otherwise it
/// possible to have race conditions when starting or pausing the
/// service multiple times really quickly.
modifying_state: AsyncMutex<()>,

/// Task running the room list service.
room_list_task: Arc<Mutex<Option<JoinHandle<()>>>>,

/// Task running the encryption sync.
encryption_sync_task: Arc<Mutex<Option<JoinHandle<()>>>>,

/// Global lock to allow using at most one `EncryptionSyncService` at all
/// times.
///
/// This ensures that there's only one ever existing in the application's
/// lifetime (under the assumption that there is at most one
/// `SyncService` per application).
encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,

/// Scheduler task ensuring proper termination.
///
/// This task is waiting for a `TerminationReport` from any of the other two
/// tasks, or from a user request via [`Self::stop()`]. It makes sure
    /// that the two services are properly shut down, and not just interrupted.
///
/// This is set at the same time as the other two tasks.
scheduler_task: Arc<Mutex<Option<JoinHandle<()>>>>,

scheduler_task: Option<JoinHandle<()>>,
/// `TerminationReport` sender for the [`Self::stop()`] function.
///
/// This is set at the same time as all the tasks in [`Self::start()`].
scheduler_sender: Mutex<Option<Sender<TerminationReport>>>,
scheduler_sender: Option<Sender<TerminationReport>>,
}

pub struct SyncService {
inner: Arc<AsyncMutex<SyncServiceInner>>,
room_list_service: Arc<RoomListService>,
/// What's the state of this sync service? This field is replicated from the
/// [`SyncServiceInner`] struct, but it should not be modified in this
/// struct. It's re-exposed here so we can subscribe to the state
/// without taking the lock on the `inner` field.
state: SharedObservable<State>,
/// Global lock to allow using at most one `EncryptionSyncService` at all
/// times.
///
/// This ensures that there's only one ever existing in the
/// application's lifetime (under the assumption that there is at most
/// one `SyncService` per application).
encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
poljar marked this conversation as resolved.
Show resolved Hide resolved
}

impl SyncService {
Expand All @@ -134,13 +126,14 @@ impl SyncService {
/// the other one too).
fn spawn_scheduler_task(
&self,
inner: &SyncServiceInner,
room_list_task: JoinHandle<()>,
encryption_sync_task: JoinHandle<()>,
mut receiver: Receiver<TerminationReport>,
) -> impl Future<Output = ()> {
let encryption_sync_task = self.encryption_sync_task.clone();
let encryption_sync = self.encryption_sync_service.clone();
let room_list_service = self.room_list_service.clone();
let room_list_task = self.room_list_task.clone();
let state = self.state.clone();
let encryption_sync = inner.encryption_sync_service.clone();
let room_list_service = inner.room_list_service.clone();
let state = inner.state.clone();

async move {
let Some(report) = receiver.recv().await else {
Expand All @@ -165,13 +158,8 @@ impl SyncService {
}
}

{
let task = room_list_task.lock().unwrap().take();
if let Some(task) = task {
if let Err(err) = task.await {
error!("when awaiting room list service: {err:#}");
}
}
if let Err(err) = room_list_task.await {
error!("when awaiting room list service: {err:#}");
}

if stop_encryption {
Expand All @@ -180,13 +168,8 @@ impl SyncService {
}
}

{
let task = encryption_sync_task.lock().unwrap().take();
if let Some(task) = task {
if let Err(err) = task.await {
error!("when awaiting encryption sync: {err:#}");
}
}
if let Err(err) = encryption_sync_task.await {
error!("when awaiting encryption sync: {err:#}");
}

if report.is_error {
Expand Down Expand Up @@ -306,7 +289,7 @@ impl SyncService {
/// - if the stream has been aborted before, it will be properly cleaned up
/// and restarted.
pub async fn start(&self) {
let _guard = self.modifying_state.lock().await;
let mut inner = self.inner.lock().await;

// Only (re)start the tasks if any was stopped.
if matches!(self.state.get(), State::Running) {
Expand All @@ -319,20 +302,25 @@ impl SyncService {
let (sender, receiver) = tokio::sync::mpsc::channel(16);

// First, take care of the room list.
*self.room_list_task.lock().unwrap() =
Some(spawn(Self::room_list_sync_task(self.room_list_service.clone(), sender.clone())));
let room_list_task =
spawn(Self::room_list_sync_task(self.room_list_service.clone(), sender.clone()));

// Then, take care of the encryption sync.
let sync_permit_guard = self.encryption_sync_permit.clone().lock_owned().await;
*self.encryption_sync_task.lock().unwrap() = Some(spawn(Self::encryption_sync_task(
self.encryption_sync_service.clone(),
let sync_permit_guard = inner.encryption_sync_permit.clone().lock_owned().await;
let encryption_sync_task = spawn(Self::encryption_sync_task(
inner.encryption_sync_service.clone(),
sender.clone(),
sync_permit_guard,
)));
));

// Spawn the scheduler task.
*self.scheduler_sender.lock().unwrap() = Some(sender);
*self.scheduler_task.lock().unwrap() = Some(spawn(self.spawn_scheduler_task(receiver)));
inner.scheduler_sender = Some(sender);
inner.scheduler_task = Some(spawn(self.spawn_scheduler_task(
&inner,
room_list_task,
encryption_sync_task,
receiver,
)));

self.state.set(State::Running);
}
Expand All @@ -344,7 +332,7 @@ impl SyncService {
/// necessary.
#[instrument(skip_all)]
pub async fn stop(&self) -> Result<(), Error> {
let _guard = self.modifying_state.lock().await;
let mut inner = self.inner.lock().await;

match self.state.get() {
State::Idle | State::Terminated | State::Error => {
Expand All @@ -360,7 +348,7 @@ impl SyncService {
// later, so that we're in a clean state independently of the request to
// stop.

let sender = self.scheduler_sender.lock().unwrap().clone();
let sender = inner.scheduler_sender.clone();
sender
.ok_or_else(|| {
error!("missing sender");
Expand All @@ -377,7 +365,7 @@ impl SyncService {
Error::InternalSchedulerError
})?;

let scheduler_task = self.scheduler_task.lock().unwrap().take();
let scheduler_task = inner.scheduler_task.take();
scheduler_task
.ok_or_else(|| {
error!("missing scheduler task");
Expand Down Expand Up @@ -420,11 +408,8 @@ struct TerminationReport {
#[doc(hidden)]
impl SyncService {
/// Return the existential states of internal tasks.
pub fn task_states(&self) -> (bool, bool) {
(
self.encryption_sync_task.lock().unwrap().is_some(),
self.room_list_task.lock().unwrap().is_some(),
)
pub async fn task_state(&self) -> bool {
self.inner.lock().await.scheduler_task.is_some()
}
}

Expand Down Expand Up @@ -476,16 +461,22 @@ impl SyncServiceBuilder {
.await?,
);

let room_list_service = Arc::new(room_list);
let state = SharedObservable::new(State::Idle);

Ok(SyncService {
room_list_service: Arc::new(room_list),
encryption_sync_service: encryption_sync,
encryption_sync_task: Arc::new(Mutex::new(None)),
room_list_task: Arc::new(Mutex::new(None)),
scheduler_task: Arc::new(Mutex::new(None)),
scheduler_sender: Mutex::new(None),
state: SharedObservable::new(State::Idle),
modifying_state: AsyncMutex::new(()),
encryption_sync_permit,
state: state.clone(),
room_list_service: room_list_service.clone(),
encryption_sync_permit: encryption_sync_permit.clone(),

inner: Arc::new(AsyncMutex::new(SyncServiceInner {
scheduler_task: None,
scheduler_sender: None,
room_list_service,
encryption_sync_service: encryption_sync,
state,
encryption_sync_permit,
})),
})
}
}
Expand Down
10 changes: 5 additions & 5 deletions crates/matrix-sdk-ui/tests/integration/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
// At first, the sync service is sleeping.
assert_eq!(state_stream.get(), State::Idle);
assert!(server.received_requests().await.unwrap().is_empty());
assert_eq!(sync_service.task_states(), (false, false));
assert_eq!(sync_service.task_state().await, false);
assert!(sync_service.try_get_encryption_sync_permit().is_some());

// After starting, the sync service is, well, running.
sync_service.start().await;
assert_next_matches!(state_stream, State::Running);
assert_eq!(sync_service.task_states(), (true, true));
assert_eq!(sync_service.task_state().await, true);
assert!(sync_service.try_get_encryption_sync_permit().is_none());

// Restarting while started doesn't change the current state.
sync_service.start().await;
assert_pending!(state_stream);
assert_eq!(sync_service.task_states(), (true, true));
assert_eq!(sync_service.task_state().await, true);
assert!(sync_service.try_get_encryption_sync_permit().is_none());

// Let the server respond a few times.
Expand All @@ -94,7 +94,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
// Pausing will stop both syncs, after a bit of delay.
sync_service.stop().await?;
assert_next_matches!(state_stream, State::Idle);
assert_eq!(sync_service.task_states(), (false, false));
assert_eq!(sync_service.task_state().await, false);
assert!(sync_service.try_get_encryption_sync_permit().is_some());

let mut num_encryption_sync_requests: i32 = 0;
Expand Down Expand Up @@ -149,7 +149,7 @@ async fn test_sync_service_state() -> anyhow::Result<()> {
// the same position as just before being stopped.
sync_service.start().await;
assert_next_matches!(state_stream, State::Running);
assert_eq!(sync_service.task_states(), (true, true));
assert_eq!(sync_service.task_state().await, true);
assert!(sync_service.try_get_encryption_sync_permit().is_none());

tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down