
fix(group): make MLS group thread safe #1349 #1404

Open
wants to merge 23 commits into main

Conversation

mchenani
Contributor

Changes:

  • Make adding and removing members in an MLS group thread-safe.
  • Add a Mutex<HashMap<Vec<u8>, Arc<Semaphore>>> so that each group has its own lock.

Issue:
In rare parallel scenarios, invoking Add or Remove members on an MLS group could result in a ForkedGroups issue, leaving some users in an outdated state. The problem arises when the group state is fetched and a commit is generated concurrently, potentially publishing two or more intents based on the same group state. If one of these intents is published and the commit it contains is merged, the second intent, even if republished, continues to reference the outdated group state.

As a result, the client is unable to decrypt the commit due to an AEAD error. Furthermore, all subsequent messages, which belong to future epochs, remain undecryptable.
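Roughly, the per-group lock registry described above could look like the following sketch (illustrative only, assuming tokio's Semaphore; GroupCommitLock and its method names are stand-ins, not necessarily the exact identifiers used in this PR):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// One single-permit semaphore per group, keyed by the group id bytes.
pub struct GroupCommitLock {
    locks: Mutex<HashMap<Vec<u8>, Arc<Semaphore>>>,
}

impl GroupCommitLock {
    pub fn new() -> Self {
        Self { locks: Mutex::new(HashMap::new()) }
    }

    // Async path: wait until no other commit-producing operation
    // (add/remove members, sync, ...) holds this group's permit.
    pub async fn get_lock_async(&self, group_id: Vec<u8>) -> OwnedSemaphorePermit {
        let semaphore = {
            let mut locks = self.locks.lock().expect("lock map poisoned");
            locks
                .entry(group_id)
                .or_insert_with(|| Arc::new(Semaphore::new(1)))
                .clone()
        };
        semaphore.acquire_owned().await.expect("semaphore is never closed")
    }
}

Holding the returned permit for the duration of a commit serializes add/remove/sync per group; a synchronous caller can use try_acquire_owned instead, which is the variant discussed further down in this review.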

@mchenani mchenani requested a review from a team as a code owner December 11, 2024 16:29
…hread-safe-groups

# Conflicts:
#	xmtp_mls/src/client.rs
#	xmtp_mls/src/groups/mls_sync.rs
#	xmtp_mls/src/groups/mod.rs
#	xmtp_mls/src/groups/subscriptions.rs
#	xmtp_mls/src/subscriptions.rs
operation: F,
) -> Result<R, GroupError>
where
F: FnOnce(OpenMlsGroup) -> Result<R, GroupError>,
Contributor

@codabrink codabrink Dec 11, 2024


Rather than taking in a closure, what would you think about creating a Lock struct like LockedOpenMlsGroup that wraps the mls group and returning that? Like a MutexGuard does.

Contributor Author


tbh I'm not sure I get your solution, could you please elaborate on it?

Contributor


Something like this:

@@ -358,6 +358,23 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
         operation(mls_group)
     }
 
+    pub(crate) fn lock(
+        self,
+        provider: &impl OpenMlsProvider,
+    ) -> Result<LockedMlsGroup, GroupError> {
+        // Get the group ID for locking
+        let group_id = self.group_id.clone();
+
+        // Acquire the lock synchronously using blocking_lock
+        let lock = MLS_COMMIT_LOCK.get_lock_sync(group_id.clone())?;
+        // Load the MLS group
+        let group = OpenMlsGroup::load(provider.storage(), &GroupId::from_slice(&self.group_id))
+            .map_err(|_| GroupError::GroupNotFound)?
+            .ok_or(GroupError::GroupNotFound)?;
+
+        Ok(LockedMlsGroup { group, lock })
+    }
+
     // Load the stored OpenMLS group from the OpenMLS provider's keystore
     #[tracing::instrument(level = "trace", skip_all)]
     pub(crate) async fn load_mls_group_with_lock_async<F, E, R, Fut>(
@@ -1643,6 +1660,18 @@ fn build_group_join_config() -> MlsGroupJoinConfig {
         .build()
 }
 
+struct LockedMlsGroup {
+    group: OpenMlsGroup,
+    lock: SemaphoreGuard,
+}
+
+impl Deref for LockedMlsGroup {
+    type Target = OpenMlsGroup;
+    fn deref(&self) -> &Self::Target {
+        &self.group
+    }
+}
+
 #[cfg(test)]
 pub(crate) mod tests {
     #[cfg(target_arch = "wasm32")]
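For reference, a call site under this suggestion might look roughly like the following (a hedged sketch, not code from the PR; it assumes a group: MlsGroup<_> and a provider in scope):

// Hypothetical caller: the semaphore permit lives as long as `locked`,
// and Deref exposes the wrapped OpenMlsGroup for read access.
let locked = group.lock(&provider)?;
let mls_group: &OpenMlsGroup = &locked;
// ... build, publish, and merge the commit while `locked` is alive ...
drop(locked); // permit released here; other callers may now commit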

Contributor Author


@codabrink I'd suggest we treat your solution as a refactor for later; tbh right now it would take more time to implement and adjust the code, wdyt?

Contributor


If you want, I can help you swap out the patterns. Shouldn't take long. The main reason I suggested the change is that this approach would result in far fewer line changes. Wrapping everything in a closure makes it hard to track what changed between this and what was there before, which is a little scary. If we end up keeping this pattern, it's not the end of the world.

Contributor Author


Perfect. Let's do it in an online pair session today and see if we can achieve it in a short time.

@mchenani mchenani requested review from insipx and a team December 12, 2024 21:38
Contributor

@insipx insipx left a comment


Nice! Huge, hope we run into fewer "drop everything, the world is forked" situations now. Left a comment about an additional test that could help us figure out possible next steps.

Would be good to maybe get @cameronvoell's and @neekolas's review? Otherwise good to go, hope we can get bindings out tomorrow morning.

};

// Synchronously acquire the permit
let permit = semaphore.clone().try_acquire_owned()?;
Contributor

@insipx insipx Dec 12, 2024


This is the part I'm most uncertain about; it's most problematic in specific multi-threaded circumstances, which I think would be worth creating a test for, probably in bindings_ffi since that's where multi-threading is most relevant (wasm will always be single-threaded):

  • Create a client
  • Create a reference to the client, and start a stream_all_messages with this reference in its own thread
  • Clone two references to the client
  • Spawn two threads, each holding a reference to this client
  • Do a few syncs in each thread with each client reference; on the main thread, send a bunch of messages with a different user's client
  • In addition to messages, try operations on the group like updating the name/picture/etc., since those predominantly use the sync variant

The aim here is to try to create a situation that races the two syncs to acquire a (synchronous) permit. The worst case is losing messages in a stream, since it can be cumbersome for integrators to restart streams. An error from calling sync is less bad and recoverable by integrators, but we may want to create a descriptive error message for this case (like "critical updates already ongoing" or something) so it doesn't cause too much turmoil in our support chats when it happens. An error because a sync is happening while trying to update the group photo or something else is probably the least bad.

Maybe this can be done in a follow-up?
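Along those lines, a minimal, self-contained illustration of the permit race itself (plain tokio, not the bindings_ffi test described above) might look like this:

use std::sync::Arc;
use tokio::sync::Semaphore;

// While one "sync" holds the single per-group permit, a second synchronous
// try_acquire fails immediately instead of blocking; this is the case where
// a descriptive "critical updates already ongoing" error would help integrators.
fn main() {
    let group_lock = Arc::new(Semaphore::new(1));

    // The first operation wins the permit and holds it while it commits.
    let permit = group_lock
        .clone()
        .try_acquire_owned()
        .expect("first acquire succeeds");

    // A concurrent operation racing for the same group fails immediately.
    assert!(group_lock.clone().try_acquire_owned().is_err());

    // Once the first operation finishes, the permit is released and a
    // retried sync succeeds.
    drop(permit);
    assert!(group_lock.clone().try_acquire_owned().is_ok());
}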

Contributor

Curious why we are using a Semaphore instead of a Mutex for each group, given that we only want one thread to be able to operate on a group at a time.

Contributor

@cameronvoell cameronvoell left a comment


See comment on the testing draft PR in xmtp-android - I'm seeing a crash when running group streaming tests, but still need to test whether it is caused by this PR or another recent change in libxmtp. Confirmed the latest libxmtp main works fine in xmtp-android, so something in this PR is causing a crash in the streaming tests in xmtp-android and xmtp-react-native.

xmtp-android test: xmtp/xmtp-android#350 (comment)
xmtp-react-native test: xmtp/xmtp-react-native#566 (comment)

@mchenani
Contributor Author

Curious why we are using a Semaphore instead of a Mutex for each group, given that we only want one thread to be able to operate on a group at a time.

In the beginning it was simply easier to shift to locking the group without touching a lot of code, and it's more reliable from my POV.
Reasons:

  1. I didn't want to touch the group object itself.
  2. I'm still not sure what the effects would be if I just made the group mut; in some places, obtaining the lock makes it easier to track the group reference.

But still, today I will go through what @codabrink suggested, and if it doesn't require a lot of time we'll shift to the mut solution.

@mchenani
Contributor Author

Just tested the mutex. I'm not sure, but it looks like more changes would be needed to use a mutex instead of a semaphore in our current code base. In the end, switching from Semaphore to Mutex shouldn't be that hard for us, since the lock is isolated in one place.

@mchenani
Contributor Author

Curious why we are using a Semaphore instead of a Mutex for each group, given that we only want one thread to be able to operate on a group at a time.

Hey again! Sorry for the noise. Today I tested Coda's solution (not directly related to your question), but it helped me try replacing the semaphore with a mutex. My conclusion: if we just use a HashMap of mutexes, we have to handle the blocking ourselves when the lock isn't acquired, and insert the entry manually; the Semaphore handles that automatically!
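For comparison, a per-group tokio Mutex variant might look roughly like the sketch below (illustrative names, not code from the PR); the map entry still has to be created on first use, and the caller has to decide how to handle the case where the lock is already held:

use std::collections::HashMap;
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::{Mutex, OwnedMutexGuard, TryLockError};

// One tokio Mutex per group; the () payload just carries the exclusivity.
struct GroupMutexes {
    locks: StdMutex<HashMap<Vec<u8>, Arc<Mutex<()>>>>,
}

impl GroupMutexes {
    fn try_lock(&self, group_id: Vec<u8>) -> Result<OwnedMutexGuard<()>, TryLockError> {
        let mutex = {
            let mut locks = self.locks.lock().expect("lock map poisoned");
            locks
                .entry(group_id)
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone()
        };
        // Fails immediately if another thread holds this group's lock,
        // analogous to Semaphore::try_acquire_owned with a single permit.
        mutex.try_lock_owned()
    }
}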

@cameronvoell cameronvoell self-requested a review December 13, 2024 20:28
Contributor

@cameronvoell cameronvoell left a comment


Tested the latest commit in xmtp-android and xmtp-react-native. The stream crash issues seem resolved, but unfortunately the original React Native fork reproduction is failing again about 4 out of 5 times, so I think these changes are not addressing the underlying issue they were originally meant to prevent.

Comment on the xmtp-react-native test draft PR here, with logs of both a passing and a failing run: xmtp/xmtp-react-native#566 (comment)
