-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(group): make MLS group thread safe #1349 #1404
base: main
Are you sure you want to change the base?
Conversation
…hread-safe-groups # Conflicts: # xmtp_mls/src/client.rs # xmtp_mls/src/groups/mls_sync.rs # xmtp_mls/src/groups/mod.rs # xmtp_mls/src/groups/subscriptions.rs # xmtp_mls/src/subscriptions.rs
operation: F, | ||
) -> Result<R, GroupError> | ||
where | ||
F: FnOnce(OpenMlsGroup) -> Result<R, GroupError>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than taking in a closure, what would you think about creating a Lock struct like LockedOpenMlsGroup
that wraps the mls group and returning that? Like a MutexGuard does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tbh not sure if I get your solution, could you please elaborate on it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this
@@ -358,6 +358,23 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
operation(mls_group)
}
+ pub(crate) fn lock(
+ self,
+ provider: &impl OpenMlsProvider,
+ ) -> Result<LockedMlsGroup, GroupError> {
+ // Get the group ID for locking
+ let group_id = self.group_id.clone();
+
+ // Acquire the lock synchronously using blocking_lock
+ let lock = MLS_COMMIT_LOCK.get_lock_sync(group_id.clone())?;
+ // Load the MLS group
+ let group = OpenMlsGroup::load(provider.storage(), &GroupId::from_slice(&self.group_id))
+ .map_err(|_| GroupError::GroupNotFound)?
+ .ok_or(GroupError::GroupNotFound)?;
+
+ Ok(LockedMlsGroup { group, lock })
+ }
+
// Load the stored OpenMLS group from the OpenMLS provider's keystore
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) async fn load_mls_group_with_lock_async<F, E, R, Fut>(
@@ -1643,6 +1660,18 @@ fn build_group_join_config() -> MlsGroupJoinConfig {
.build()
}
+struct LockedMlsGroup {
+ group: OpenMlsGroup,
+ lock: SemaphoreGuard,
+}
+
+impl Deref for LockedMlsGroup {
+ type Target = OpenMlsGroup;
+ fn deref(&self) -> &Self::Target {
+ &self.group
+ }
+}
+
#[cfg(test)]
pub(crate) mod tests {
#[cfg(target_arch = "wasm32")]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codabrink I'll suggest that lets consider your solution as a refactor for later, tbh for now kinda takes more time to implement and adjust the code, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want, I can help you swap out the patterns. Shouldn't take long. The main reason I suggested the change is this approach would result in much less line changes. Wrapping everything in a closure makes it hard to track what changed between this and what was before it, which is a little scary. If we end up keeping this pattern, it's not the end of the world.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect. Let's do it in an online pair session today and see if we can achieve it in a short time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice ! Huge, hope we run into less "drop everything the world is forked" now. Left a comment about an additional test that could help us figure out possible next steps
would be good to maybe get @cameronvoell and @neekolas review? otherwise good to go, hope we can get bindings out tomorrow morning
xmtp_mls/src/lib.rs
Outdated
}; | ||
|
||
// Synchronously acquire the permit | ||
let permit = semaphore.clone().try_acquire_owned()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the part that I'm most uncertain about, it is most problematic in specific multi-threaded circumstances, which I think would be worth creating a test for, probably in bindings_ffi
since that's where multi-threading is most relevant. wasm will always be single-threaded
- Create client
- create a reference to the client, and start a
stream_all_messages
with this reference in its own thread. - clone two references to client
- spawn two threads each holding a reference to this client
- do a few syncs in each thread with each client reference. on the main thread, send a bunch of messages with a different user-client
- in addition to messages, can try to do operations on the group like updating the name/picture/etc, since those predominantly use the sync variant
the aim here is to try to create a situation that races the two syncs to acquire a (syncronous) permit. The worst case is if we lose messages in a stream, since it can be cumbersome for integrators to restart streams. An error from calling sync is less bad and recoverable by integrators, but we may want to create a descriptive error message for this case (like, "critical updates already ongoing" or something) so it doesn't cause too much turmoil in our support chats when it happens. An error because a sync is happening when trying to update the group photo or something else is probably the least bad
Maybe can be done in a follow up?
Curious why we are using a Semaphore instead of a Mutex for each group, given that we only want one thread to be able to operate on a group at a time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment on testing draft pr in xmtp-android - Im seeing a crash when running group streaming tests, but still need to test to see if it is caused by this PR, or another recent change in libxmtp Confirmed the latest libxmtp main
works fine in xmtp-android, so something in this PR is causing a crash in streaming tests in xmtp-android and xmtp-react-native
xmtp-android test: xmtp/xmtp-android#350 (comment)
xmtp-react-native test: xmtp/xmtp-react-native#566 (comment)
In the beginning it was so easy to shift to locking the group without touching a lot and it's more reliable from POV.
But still, today will go through what @codabrink suggested and if it doesn't require a lot of time we shift to mut solution. |
Just tested the mutex, not sure but I see more changes needed to use mutex instead of semaphore based on our current code base. at the end changing from Semaphore to Mutex shouldn't be that hard for us since the lock is isolated in one place. |
Hey again! sorry for the noises, today I tested the Coda's solutions (not directly related to your questions) but that helped to replace the semaphore with the mutex; so my conclusion is: If we want just to use a mutex hashset, we need to take care of the blocking if the lock is not acquired or not inserted manually! Semaphore handles that automatically! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tested that latest commit in xmtp-android and xmtp-react-native. The stream crash issues seem resolved, but unfortunately the original React Native fork reproduction is failing again about 4/5 times, so I think these changes are not addressing the underlying issue they were initially trying to prevent.
comment on xmtp-react-native test draft pr here with logs of both a passing and failing result xmtp/xmtp-react-native#566 (comment)
Changes:
Make adding and removing members in an MLS group thread-safe.
Add
Mutex<HashMap<Vec<u8>, Arc<Semaphore>>>
to have a lock per groupIssue:
In rare parallel scenarios, invoking Add or Remove members in an MLS group could result in a ForkedGroups issue, leaving some users in an outdated state. This problem arises when the group state is fetched, and a commit is generated, potentially publishing two or more intents with the same group state. If one of these intents gets published and its within commit is merged, the second intent, even if republished, continues to reference the outdated group state.
As a result, the client is unable to decrypt the commit due to an AEAD error. Furthermore, all upcoming messages, belong to future epoch, remain undecryptable.