Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(group): make MLS group thread safe #1349 #1404

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,7 +1396,7 @@ impl FfiConversation {

pub fn group_image_url_square(&self) -> Result<String, GenericError> {
let provider = self.inner.mls_provider()?;
Ok(self.inner.group_image_url_square(provider)?)
Ok(self.inner.group_image_url_square(&provider)?)
}

pub async fn update_group_description(
Expand All @@ -1412,7 +1412,7 @@ impl FfiConversation {

pub fn group_description(&self) -> Result<String, GenericError> {
let provider = self.inner.mls_provider()?;
Ok(self.inner.group_description(provider)?)
Ok(self.inner.group_description(&provider)?)
}

pub async fn update_group_pinned_frame_url(
Expand Down Expand Up @@ -1546,7 +1546,9 @@ impl FfiConversation {

pub fn group_metadata(&self) -> Result<Arc<FfiConversationMetadata>, GenericError> {
let provider = self.inner.mls_provider()?;
let metadata = self.inner.metadata(provider)?;
let metadata = tokio::task::block_in_place(|| {
futures::executor::block_on(self.inner.metadata(&provider))
})?;
Ok(Arc::new(FfiConversationMetadata {
inner: Arc::new(metadata),
}))
Expand All @@ -1558,7 +1560,9 @@ impl FfiConversation {

pub fn conversation_type(&self) -> Result<FfiConversationType, GenericError> {
let provider = self.inner.mls_provider()?;
let conversation_type = self.inner.conversation_type(&provider)?;
let conversation_type = tokio::task::block_in_place(|| {
futures::executor::block_on(self.inner.conversation_type(&provider))
})?;
Ok(conversation_type.into())
}
}
Expand Down
28 changes: 15 additions & 13 deletions bindings_node/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,10 @@ impl Conversation {
self.created_at_ns,
);
let provider = group.mls_provider().map_err(ErrorWrapper::from)?;
let conversation_type = group
.conversation_type(&provider)
.map_err(ErrorWrapper::from)?;
let conversation_type = tokio::task::block_in_place(|| {
futures::executor::block_on(group.conversation_type(&provider))
})
.map_err(ErrorWrapper::from)?;
let kind = match conversation_type {
ConversationType::Group => None,
ConversationType::Dm => Some(XmtpGroupMessageKind::Application),
Expand Down Expand Up @@ -248,7 +249,7 @@ impl Conversation {
);

let admin_list = group
.admin_list(group.mls_provider().map_err(ErrorWrapper::from)?)
.admin_list(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(admin_list)
Expand All @@ -263,7 +264,7 @@ impl Conversation {
);

let super_admin_list = group
.super_admin_list(group.mls_provider().map_err(ErrorWrapper::from)?)
.super_admin_list(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(super_admin_list)
Expand Down Expand Up @@ -449,7 +450,7 @@ impl Conversation {
);

let group_name = group
.group_name(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_name(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_name)
Expand Down Expand Up @@ -480,7 +481,7 @@ impl Conversation {
);

let group_image_url_square = group
.group_image_url_square(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_image_url_square(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_image_url_square)
Expand Down Expand Up @@ -511,7 +512,7 @@ impl Conversation {
);

let group_description = group
.group_description(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_description(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_description)
Expand Down Expand Up @@ -542,7 +543,7 @@ impl Conversation {
);

let group_pinned_frame_url = group
.group_pinned_frame_url(group.mls_provider().map_err(ErrorWrapper::from)?)
.group_pinned_frame_url(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;

Ok(group_pinned_frame_url)
Expand Down Expand Up @@ -585,7 +586,7 @@ impl Conversation {

Ok(
group
.is_active(group.mls_provider().map_err(ErrorWrapper::from)?)
.is_active(&group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?,
)
}
Expand All @@ -609,9 +610,10 @@ impl Conversation {
self.created_at_ns,
);

let metadata = group
.metadata(group.mls_provider().map_err(ErrorWrapper::from)?)
.map_err(ErrorWrapper::from)?;
let metadata = tokio::task::block_in_place(|| {
futures::executor::block_on(group.metadata(&group.mls_provider()?))
})
.map_err(ErrorWrapper::from)?;

Ok(GroupMetadata { inner: metadata })
}
Expand Down
25 changes: 15 additions & 10 deletions bindings_wasm/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,18 @@ impl Conversation {
}

#[wasm_bindgen(js_name = findMessages)]
pub fn find_messages(&self, opts: Option<ListMessagesOptions>) -> Result<Vec<Message>, JsError> {
pub async fn find_messages(
&self,
opts: Option<ListMessagesOptions>,
) -> Result<Vec<Message>, JsError> {
let opts = opts.unwrap_or_default();
let group = self.to_mls_group();
let provider = group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?;
let conversation_type = group
.conversation_type(&provider)
.await
.map_err(|e| JsError::new(&format!("{e}")))?;
let kind = match conversation_type {
ConversationType::Group => None,
Expand Down Expand Up @@ -238,7 +242,7 @@ impl Conversation {
let group = self.to_mls_group();
let admin_list = group
.admin_list(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -252,7 +256,7 @@ impl Conversation {
let group = self.to_mls_group();
let super_admin_list = group
.super_admin_list(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand Down Expand Up @@ -398,7 +402,7 @@ impl Conversation {

let group_name = group
.group_name(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand Down Expand Up @@ -428,7 +432,7 @@ impl Conversation {

let group_image_url_square = group
.group_image_url_square(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -455,7 +459,7 @@ impl Conversation {

let group_description = group
.group_description(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand Down Expand Up @@ -485,7 +489,7 @@ impl Conversation {

let group_pinned_frame_url = group
.group_pinned_frame_url(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -505,7 +509,7 @@ impl Conversation {

group
.is_active(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
Expand All @@ -522,14 +526,15 @@ impl Conversation {
}

#[wasm_bindgen(js_name = groupMetadata)]
pub fn group_metadata(&self) -> Result<GroupMetadata, JsError> {
pub async fn group_metadata(&self) -> Result<GroupMetadata, JsError> {
let group = self.to_mls_group();
let metadata = group
.metadata(
group
&group
.mls_provider()
.map_err(|e| JsError::new(&format!("{e}")))?,
)
.await
.map_err(|e| JsError::new(&format!("{e}")))?;

Ok(GroupMetadata { inner: metadata })
Expand Down
4 changes: 2 additions & 2 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn logger() {
.from_env_lossy()
};

tracing_subscriber::registry()
let _ = tracing_subscriber::registry()
// structured JSON logger only if STRUCTURED=true
.with(is_structured.then(|| {
tracing_subscriber::fmt::layer()
Expand All @@ -61,7 +61,7 @@ pub fn logger() {
})
.with_filter(filter())
}))
.init();
.try_init();
});
}

Expand Down
5 changes: 3 additions & 2 deletions examples/cli/serializable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ impl SerializableGroup {

let metadata = group
.metadata(
group
&group
.mls_provider()
.expect("MLS Provider could not be created"),
)
.expect("could not load metadata");
.await
.unwrap();
let permissions = group.permissions().expect("could not load permissions");

Self {
Expand Down
14 changes: 8 additions & 6 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,20 +862,22 @@ where
.map(|group| {
let active_group_count = Arc::clone(&active_group_count);
async move {
let mls_group = group.load_mls_group(provider)?;
tracing::info!(
inbox_id = self.inbox_id(),
"[{}] syncing group",
self.inbox_id()
);
tracing::info!(
inbox_id = self.inbox_id(),
group_epoch = mls_group.epoch().as_u64(),
"current epoch for [{}] in sync_all_groups() is Epoch: [{}]",
self.inbox_id(),
mls_group.epoch()
"[{}] syncing group",
self.inbox_id()
);
if mls_group.is_active() {
let is_active = group
.load_mls_group_with_lock_async(provider, |mls_group| async move {
Ok::<bool, GroupError>(mls_group.is_active())
})
.await?;
if is_active {
group.maybe_update_installations(provider, None).await?;

group.sync_with_conn(provider).await?;
Expand Down
7 changes: 4 additions & 3 deletions xmtp_mls/src/groups/intents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,9 +865,10 @@ pub(crate) mod tests {
};

let provider = group.client.mls_provider().unwrap();
let mut openmls_group = group.load_mls_group(&provider).unwrap();
let decrypted_message = openmls_group
.process_message(&provider, mls_message)
let decrypted_message = group
.load_mls_group_with_lock(&provider, |mut mls_group| {
Ok(mls_group.process_message(&provider, mls_message).unwrap())
})
.unwrap();

let staged_commit = match decrypted_message.into_content() {
Expand Down
6 changes: 3 additions & 3 deletions xmtp_mls/src/groups/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ where
&self,
provider: &XmtpOpenMlsProvider,
) -> Result<Vec<GroupMember>, GroupError> {
let openmls_group = self.load_mls_group(provider)?;
// TODO: Replace with try_into from extensions
let group_membership = extract_group_membership(openmls_group.extensions())?;
let group_membership = self.load_mls_group_with_lock(provider, |mls_group| {
Ok(extract_group_membership(mls_group.extensions())?)
})?;
let requests = group_membership
.members
.into_iter()
Expand Down
Loading
Loading