From bed22e1b0a49a8571d0152f961aa1b632523c506 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 2 Dec 2024 11:30:03 +0000 Subject: [PATCH] Export members from stable memory when importing group into community (#6935) --- Cargo.lock | 1 + backend/canisters/community/CHANGELOG.md | 1 + .../community/impl/src/jobs/import_groups.rs | 47 ++++++++++++++++-- .../impl/src/model/groups_being_imported.rs | 30 ++++++++++-- backend/canisters/group/CHANGELOG.md | 1 + .../src/updates/c2c_export_group_members.rs | 29 +++++++++++ .../canisters/group/api/src/updates/mod.rs | 1 + backend/canisters/group/c2c_client/src/lib.rs | 1 + .../src/updates/c2c_export_group_members.rs | 21 ++++++++ .../canisters/group/impl/src/updates/mod.rs | 1 + .../libraries/chat_events/src/chat_events.rs | 4 +- .../chat_events/src/stable_memory/mod.rs | 1 + backend/libraries/constants/src/lib.rs | 1 + backend/libraries/group_chat_core/Cargo.toml | 1 + .../libraries/group_chat_core/src/members.rs | 12 ++++- .../src/members/stable_memory.rs | 48 +++++++++++++++++-- 16 files changed, 185 insertions(+), 15 deletions(-) create mode 100644 backend/canisters/group/api/src/updates/c2c_export_group_members.rs create mode 100644 backend/canisters/group/impl/src/updates/c2c_export_group_members.rs diff --git a/Cargo.lock b/Cargo.lock index e7fb5edf09..fe78fc22be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2911,6 +2911,7 @@ dependencies = [ "regex-lite", "search", "serde", + "serde_bytes", "stable_memory_map", "test-strategy", "tracing", diff --git a/backend/canisters/community/CHANGELOG.md b/backend/canisters/community/CHANGELOG.md index e0ea016401..19a1e61e65 100644 --- a/backend/canisters/community/CHANGELOG.md +++ b/backend/canisters/community/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929)) - Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931)) - Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933)) +- Write group members to stable memory when importing group into community ([#6935](https://github.com/open-chat-labs/open-chat/pull/6935)) ## [[2.0.1479](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1479-community)] - 2024-11-28 diff --git a/backend/canisters/community/impl/src/jobs/import_groups.rs b/backend/canisters/community/impl/src/jobs/import_groups.rs index 3c114205ef..8d202754ee 100644 --- a/backend/canisters/community/impl/src/jobs/import_groups.rs +++ b/backend/canisters/community/impl/src/jobs/import_groups.rs @@ -9,13 +9,15 @@ use crate::{mutate_state, read_state, RuntimeState}; use chat_events::ChatEvents; use constants::OPENCHAT_BOT_USER_ID; use group_canister::c2c_export_group::{Args, Response}; -use group_chat_core::GroupChatCore; +use group_chat_core::{GroupChatCore, GroupMembers}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::collections::HashMap; use std::time::Duration; use tracing::{info, trace}; -use types::{ChannelId, ChannelLatestMessageIndex, Chat, ChatId, CommunityUsersBlocked, Empty, UserId, UserType}; +use types::{ + ChannelId, ChannelLatestMessageIndex, Chat, ChatId, CommunityUsersBlocked, Empty, MultiUserChat, UserId, UserType, +}; const PAGE_SIZE: u32 = 19 * 102 * 1024; // Roughly 1.9MB (1.9 * 1024 * 1024) @@ -135,6 +137,41 @@ async fn import_group(group: GroupToImport) { } } } + GroupToImportAction::Members(channel_id, after) => { + match group_canister_c2c_client::c2c_export_group_members( + group_id.into(), + &group_canister::c2c_export_group_members::Args { after }, + ) + .await + { + Ok(group_canister::c2c_export_group_members::Response::Success(result)) => { + mutate_state(|state| { + let up_to = GroupMembers::write_members_from_bytes_to_stable_memory( + MultiUserChat::Channel(state.env.canister_id().into(), channel_id), + result.members, + ); + if let Some(user_id) = up_to { + state + .data + .groups_being_imported + .mark_members_batch_complete(&group_id, user_id); + } + if result.finished { + state.data.groups_being_imported.mark_members_import_complete(&group_id); + } + info!(%group_id, "Group members imported"); + }); + } + Err(error) => { + mutate_state(|state| { + state + .data + .groups_being_imported + .mark_batch_failed(&group_id, format!("{error:?}")); + }); + } + } + } } } @@ -145,10 +182,12 @@ pub(crate) fn finalize_group_import(group_id: ChatId) { mutate_state(|state| { if let Some(group) = state.data.groups_being_imported.take(&group_id) { let now = state.env.now(); + let community_id = state.env.canister_id().into(); let channel_id = group.channel_id(); + let mut chat: GroupChatCore = msgpack::deserialize_then_unwrap(group.bytes()); - chat.events - .set_chat(Chat::Channel(state.env.canister_id().into(), channel_id)); + chat.events.set_chat(Chat::Channel(community_id, channel_id)); + chat.members.set_chat(MultiUserChat::Channel(community_id, channel_id)); let blocked: Vec<_> = chat.members.blocked(); if !blocked.is_empty() { diff --git a/backend/canisters/community/impl/src/model/groups_being_imported.rs b/backend/canisters/community/impl/src/model/groups_being_imported.rs index a471348908..412a3bf16e 100644 --- a/backend/canisters/community/impl/src/model/groups_being_imported.rs +++ b/backend/canisters/community/impl/src/model/groups_being_imported.rs @@ -16,6 +16,7 @@ pub struct GroupToImport { #[derive(Debug)] pub enum GroupToImportAction { Events(ChannelId, Option), + Members(ChannelId, Option), Core(u64), } @@ -47,10 +48,12 @@ impl GroupsBeingImported { for (chat_id, group) in self.groups.iter_mut().filter(|(_, g)| !g.is_complete()) { if group.current_batch_started.is_none() { group.current_batch_started = Some(now); - let action = if group.events_imported { - GroupToImportAction::Core(group.bytes.len() as u64) - } else { + let action = if !group.events_imported { GroupToImportAction::Events(group.channel_id, group.events_imported_up_to.clone()) + } else if !group.members_imported { + GroupToImportAction::Members(group.channel_id, group.members_imported_up_to) + } else { + GroupToImportAction::Core(group.bytes.len() as u64) }; batch.push(GroupToImport { group_id: *chat_id, @@ -89,6 +92,22 @@ impl GroupsBeingImported { } } + pub fn mark_members_batch_complete(&mut self, group_id: &ChatId, up_to: UserId) { + if let Some(group) = self.groups.get_mut(group_id) { + group.current_batch_started = None; + group.error_message = None; + group.members_imported_up_to = Some(up_to); + } + } + + pub fn mark_members_import_complete(&mut self, group_id: &ChatId) { + if let Some(group) = self.groups.get_mut(group_id) { + group.current_batch_started = None; + group.error_message = None; + group.members_imported = true; + } + } + pub fn mark_batch_failed(&mut self, group_id: &ChatId, error_message: String) { if let Some(group) = self.groups.get_mut(group_id) { group.current_batch_started = None; @@ -122,6 +141,9 @@ pub struct GroupBeingImported { #[serde(default)] events_imported: bool, events_imported_up_to: Option, + #[serde(default)] + members_imported: bool, + members_imported_up_to: Option, total_bytes: u64, #[serde(with = "serde_bytes")] bytes: Vec, @@ -144,6 +166,8 @@ impl GroupBeingImported { current_batch_started: None, events_imported: false, events_imported_up_to: None, + members_imported: false, + members_imported_up_to: None, total_bytes, bytes: Vec::with_capacity(total_bytes as usize), error_message: None, diff --git a/backend/canisters/group/CHANGELOG.md b/backend/canisters/group/CHANGELOG.md index 7126ab1156..07c02ba99e 100644 --- a/backend/canisters/group/CHANGELOG.md +++ b/backend/canisters/group/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929)) - Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931)) - Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933)) +- Export members from stable memory when importing group into community ([#6935](https://github.com/open-chat-labs/open-chat/pull/6935)) ## [[2.0.1480](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1480-group)] - 2024-11-28 diff --git a/backend/canisters/group/api/src/updates/c2c_export_group_members.rs b/backend/canisters/group/api/src/updates/c2c_export_group_members.rs new file mode 100644 index 0000000000..cf22f95b31 --- /dev/null +++ b/backend/canisters/group/api/src/updates/c2c_export_group_members.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use std::fmt::{Debug, Formatter}; +use types::UserId; + +#[derive(Serialize, Deserialize, Debug)] +pub struct Args { + pub after: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Response { + Success(SuccessResult), +} + +#[derive(Serialize, Deserialize)] +pub struct SuccessResult { + pub members: Vec, + pub finished: bool, +} + +impl Debug for SuccessResult { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut s = f.debug_struct("SuccessResult"); + s.field("members", &self.members.len()); + s.field("finished", &self.finished); + s.finish() + } +} diff --git a/backend/canisters/group/api/src/updates/mod.rs b/backend/canisters/group/api/src/updates/mod.rs index 827a1d7069..deb32d8f81 100644 --- a/backend/canisters/group/api/src/updates/mod.rs +++ b/backend/canisters/group/api/src/updates/mod.rs @@ -4,6 +4,7 @@ pub mod block_user; pub mod c2c_delete_group; pub mod c2c_export_group; pub mod c2c_export_group_events; +pub mod c2c_export_group_members; pub mod c2c_freeze_group; pub mod c2c_handle_bot_action; pub mod c2c_invite_users; diff --git a/backend/canisters/group/c2c_client/src/lib.rs b/backend/canisters/group/c2c_client/src/lib.rs index 79706015fc..b61ddab514 100644 --- a/backend/canisters/group/c2c_client/src/lib.rs +++ b/backend/canisters/group/c2c_client/src/lib.rs @@ -20,6 +20,7 @@ generate_candid_c2c_call!(selected_initial); generate_c2c_call!(c2c_delete_group); generate_c2c_call!(c2c_export_group); generate_c2c_call!(c2c_export_group_events); +generate_c2c_call!(c2c_export_group_members); generate_c2c_call!(c2c_freeze_group); generate_c2c_call!(c2c_invite_users); generate_c2c_call!(c2c_join_group); diff --git a/backend/canisters/group/impl/src/updates/c2c_export_group_members.rs b/backend/canisters/group/impl/src/updates/c2c_export_group_members.rs new file mode 100644 index 0000000000..998cadb75d --- /dev/null +++ b/backend/canisters/group/impl/src/updates/c2c_export_group_members.rs @@ -0,0 +1,21 @@ +use crate::guards::caller_is_community_being_imported_into; +use crate::RuntimeState; +use crate::{read_state, run_regular_jobs}; +use canister_api_macros::update; +use group_canister::c2c_export_group_members::{Response::*, *}; + +#[update(guard = "caller_is_community_being_imported_into", msgpack = true)] +fn c2c_export_group_members(args: Args) -> Response { + run_regular_jobs(); + + read_state(|state| c2c_export_group_members_impl(args, state)) +} + +fn c2c_export_group_members_impl(args: Args, state: &RuntimeState) -> Response { + let members = state.data.chat.members.read_members_as_bytes_from_stable_memory(args.after); + + Success(SuccessResult { + finished: members.is_empty(), + members, + }) +} diff --git a/backend/canisters/group/impl/src/updates/mod.rs b/backend/canisters/group/impl/src/updates/mod.rs index 5406e77a69..bf41b5a522 100644 --- a/backend/canisters/group/impl/src/updates/mod.rs +++ b/backend/canisters/group/impl/src/updates/mod.rs @@ -3,6 +3,7 @@ pub mod add_reaction; pub mod c2c_delete_group; pub mod c2c_export_group; pub mod c2c_export_group_events; +pub mod c2c_export_group_members; pub mod c2c_freeze_group; pub mod c2c_handle_bot_action; pub mod c2c_invite_users; diff --git a/backend/libraries/chat_events/src/chat_events.rs b/backend/libraries/chat_events/src/chat_events.rs index 6a8f7905ba..a63fdc73b7 100644 --- a/backend/libraries/chat_events/src/chat_events.rs +++ b/backend/libraries/chat_events/src/chat_events.rs @@ -4,7 +4,7 @@ use crate::last_updated_timestamps::LastUpdatedTimestamps; use crate::metrics::{ChatMetricsInternal, MetricKey}; use crate::search_index::SearchIndex; use crate::*; -use constants::{HOUR_IN_MS, OPENCHAT_BOT_USER_ID}; +use constants::{HOUR_IN_MS, ONE_MB, OPENCHAT_BOT_USER_ID}; use event_store_producer::{EventBuilder, EventStoreClient, Runtime}; use rand::rngs::StdRng; use rand::Rng; @@ -165,7 +165,7 @@ impl ChatEvents { } pub fn read_events_as_bytes_from_stable_memory(&self, after: Option) -> Vec<(EventContext, ByteBuf)> { - stable_memory::read_events_as_bytes(self.chat, after, 1_000_000) + stable_memory::read_events_as_bytes(self.chat, after, 2 * ONE_MB as usize) } pub fn iter_recently_updated_events( diff --git a/backend/libraries/chat_events/src/stable_memory/mod.rs b/backend/libraries/chat_events/src/stable_memory/mod.rs index 94f2da4a9f..6aa1b846fb 100644 --- a/backend/libraries/chat_events/src/stable_memory/mod.rs +++ b/backend/libraries/chat_events/src/stable_memory/mod.rs @@ -40,6 +40,7 @@ pub fn read_events_as_bytes(chat: Chat, after: Option, max_bytes: }) } +// Used to efficiently write all events to stable memory when migrating a group into a community pub fn write_events_as_bytes(chat: Chat, events: Vec<(EventContext, ByteBuf)>) { with_map_mut(|m| { for (context, bytes) in events { diff --git a/backend/libraries/constants/src/lib.rs b/backend/libraries/constants/src/lib.rs index 15a85c8b5c..26e53c3e96 100644 --- a/backend/libraries/constants/src/lib.rs +++ b/backend/libraries/constants/src/lib.rs @@ -7,6 +7,7 @@ pub const HOUR_IN_MS: Milliseconds = MINUTE_IN_MS * 60; pub const DAY_IN_MS: Milliseconds = HOUR_IN_MS * 24; pub const WEEK_IN_MS: Milliseconds = DAY_IN_MS * 7; pub const NANOS_PER_MILLISECOND: u64 = 1_000_000; +pub const ONE_MB: u32 = 1024 * 1024; // This only applies to the 'top level' canisters (ie. not user + group canisters) pub fn min_cycles_balance(test_mode: bool) -> Cycles { diff --git a/backend/libraries/group_chat_core/Cargo.toml b/backend/libraries/group_chat_core/Cargo.toml index ea417f8546..ec847213c2 100644 --- a/backend/libraries/group_chat_core/Cargo.toml +++ b/backend/libraries/group_chat_core/Cargo.toml @@ -18,6 +18,7 @@ msgpack = { path = "../msgpack" } regex-lite = { workspace = true } search = { path = "../search" } serde = { workspace = true } +serde_bytes = { workspace = true } stable_memory_map = { path = "../stable_memory_map" } tracing = { workspace = true } types = { path = "../types" } diff --git a/backend/libraries/group_chat_core/src/members.rs b/backend/libraries/group_chat_core/src/members.rs index f39e77faf6..b5b83ee859 100644 --- a/backend/libraries/group_chat_core/src/members.rs +++ b/backend/libraries/group_chat_core/src/members.rs @@ -4,9 +4,10 @@ use crate::mentions::Mentions; use crate::roles::GroupRoleInternal; use crate::AccessRulesInternal; use candid::Principal; -use constants::calculate_summary_updates_data_removal_cutoff; +use constants::{calculate_summary_updates_data_removal_cutoff, ONE_MB}; use group_community_common::{Member, MemberUpdate, Members}; use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; use std::cell::OnceCell; use std::cmp::max; use std::collections::{BTreeMap, BTreeSet, VecDeque}; @@ -138,6 +139,15 @@ impl GroupMembers { self.stable_memory_members_map.set_chat(chat); } + pub fn read_members_as_bytes_from_stable_memory(&self, after: Option) -> Vec { + self.stable_memory_members_map + .read_members_as_bytes(after, 2 * ONE_MB as usize) + } + + pub fn write_members_from_bytes_to_stable_memory(chat: MultiUserChat, members: Vec) -> Option { + stable_memory::write_members_from_bytes(chat, members) + } + pub fn add( &mut self, user_id: UserId, diff --git a/backend/libraries/group_chat_core/src/members/stable_memory.rs b/backend/libraries/group_chat_core/src/members/stable_memory.rs index 8a3f84dab1..43a660d584 100644 --- a/backend/libraries/group_chat_core/src/members/stable_memory.rs +++ b/backend/libraries/group_chat_core/src/members/stable_memory.rs @@ -3,6 +3,7 @@ use crate::GroupMemberInternal; use candid::{Deserialize, Principal}; use serde::de::{Error, Visitor}; use serde::{Deserializer, Serialize, Serializer}; +use serde_bytes::ByteBuf; use stable_memory_map::{with_map, with_map_mut, KeyType}; use std::fmt::Formatter; use types::{MultiUserChat, UserId}; @@ -31,6 +32,28 @@ impl MembersStableStorage { self.prefix = chat.into(); } + // Used to efficiently read all members from stable memory when migrating a group into a community + pub fn read_members_as_bytes(&self, after: Option, max_bytes: usize) -> Vec { + let start_key = match after { + None => self.key(Principal::from_slice(&[]).into()), + Some(user_id) => self.key(user_id), + } + .to_vec(); + + with_map(|m| { + let mut total_bytes = 0; + m.range(start_key.clone()..) + .skip_while(|(k, _)| *k == start_key) + .take_while(|(k, _)| KeyPrefix::try_from(k.as_slice()).is_ok_and(|p| p == self.prefix)) + .map(|(_, v)| ByteBuf::from(v)) + .take_while(|v| { + total_bytes += v.len(); + total_bytes < max_bytes + }) + .collect() + }) + } + fn key(&self, user_id: UserId) -> Key { Key::new(self.prefix, user_id) } @@ -38,7 +61,7 @@ impl MembersStableStorage { impl MembersMap for MembersStableStorage { fn get(&self, user_id: &UserId) -> Option { - with_map(|m| m.get(&self.key(*user_id).to_vec()).map(bytes_to_member)) + with_map(|m| m.get(&self.key(*user_id).to_vec()).map(|v| bytes_to_member(&v))) } fn insert(&mut self, member: GroupMemberInternal) { @@ -46,7 +69,7 @@ impl MembersMap for MembersStableStorage { } fn remove(&mut self, user_id: &UserId) -> Option { - with_map_mut(|m| m.remove(&self.key(*user_id).to_vec()).map(bytes_to_member)) + with_map_mut(|m| m.remove(&self.key(*user_id).to_vec()).map(|v| bytes_to_member(&v))) } #[cfg(test)] @@ -54,18 +77,33 @@ impl MembersMap for MembersStableStorage { with_map(|m| { m.range(self.key(Principal::from_slice(&[]).into()).to_vec()..) .take_while(|(k, _)| Key::try_from(k.as_slice()).ok().filter(|k| k.prefix == self.prefix).is_some()) - .map(|(_, v)| bytes_to_member(v)) + .map(|(_, v)| bytes_to_member(&v)) .collect() }) } } +// Used to write all members to stable memory when migrating a group into a community +pub fn write_members_from_bytes(chat: MultiUserChat, members: Vec) -> Option { + let prefix = chat.into(); + let mut latest = None; + with_map_mut(|m| { + for byte_buf in members { + let bytes = byte_buf.into_vec(); + let member = bytes_to_member(&bytes); + latest = Some(member.user_id); + m.insert(Key::new(prefix, member.user_id).to_vec(), bytes); + } + }); + latest +} + fn member_to_bytes(member: &GroupMemberInternal) -> Vec { msgpack::serialize_then_unwrap(member) } -fn bytes_to_member(bytes: Vec) -> GroupMemberInternal { - msgpack::deserialize_then_unwrap(&bytes) +fn bytes_to_member(bytes: &[u8]) -> GroupMemberInternal { + msgpack::deserialize_then_unwrap(bytes) } #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]