Skip to content

Commit

Permalink
Export members from stable memory when importing group into community (
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Dec 2, 2024
1 parent cb51d2e commit bed22e1
Show file tree
Hide file tree
Showing 16 changed files with 185 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929))
- Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931))
- Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933))
- Write group members to stable memory when importing group into community ([#6935](https://github.com/open-chat-labs/open-chat/pull/6935))

## [[2.0.1479](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1479-community)] - 2024-11-28

Expand Down
47 changes: 43 additions & 4 deletions backend/canisters/community/impl/src/jobs/import_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ use crate::{mutate_state, read_state, RuntimeState};
use chat_events::ChatEvents;
use constants::OPENCHAT_BOT_USER_ID;
use group_canister::c2c_export_group::{Args, Response};
use group_chat_core::GroupChatCore;
use group_chat_core::{GroupChatCore, GroupMembers};
use ic_cdk_timers::TimerId;
use std::cell::Cell;
use std::collections::HashMap;
use std::time::Duration;
use tracing::{info, trace};
use types::{ChannelId, ChannelLatestMessageIndex, Chat, ChatId, CommunityUsersBlocked, Empty, UserId, UserType};
use types::{
ChannelId, ChannelLatestMessageIndex, Chat, ChatId, CommunityUsersBlocked, Empty, MultiUserChat, UserId, UserType,
};

const PAGE_SIZE: u32 = 19 * 102 * 1024; // Roughly 1.9MB (1.9 * 1024 * 1024)

Expand Down Expand Up @@ -135,6 +137,41 @@ async fn import_group(group: GroupToImport) {
}
}
}
GroupToImportAction::Members(channel_id, after) => {
match group_canister_c2c_client::c2c_export_group_members(
group_id.into(),
&group_canister::c2c_export_group_members::Args { after },
)
.await
{
Ok(group_canister::c2c_export_group_members::Response::Success(result)) => {
mutate_state(|state| {
let up_to = GroupMembers::write_members_from_bytes_to_stable_memory(
MultiUserChat::Channel(state.env.canister_id().into(), channel_id),
result.members,
);
if let Some(user_id) = up_to {
state
.data
.groups_being_imported
.mark_members_batch_complete(&group_id, user_id);
}
if result.finished {
state.data.groups_being_imported.mark_members_import_complete(&group_id);
}
info!(%group_id, "Group members imported");
});
}
Err(error) => {
mutate_state(|state| {
state
.data
.groups_being_imported
.mark_batch_failed(&group_id, format!("{error:?}"));
});
}
}
}
}
}

Expand All @@ -145,10 +182,12 @@ pub(crate) fn finalize_group_import(group_id: ChatId) {
mutate_state(|state| {
if let Some(group) = state.data.groups_being_imported.take(&group_id) {
let now = state.env.now();
let community_id = state.env.canister_id().into();
let channel_id = group.channel_id();

let mut chat: GroupChatCore = msgpack::deserialize_then_unwrap(group.bytes());
chat.events
.set_chat(Chat::Channel(state.env.canister_id().into(), channel_id));
chat.events.set_chat(Chat::Channel(community_id, channel_id));
chat.members.set_chat(MultiUserChat::Channel(community_id, channel_id));

let blocked: Vec<_> = chat.members.blocked();
if !blocked.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct GroupToImport {
#[derive(Debug)]
pub enum GroupToImportAction {
Events(ChannelId, Option<EventContext>),
Members(ChannelId, Option<UserId>),
Core(u64),
}

Expand Down Expand Up @@ -47,10 +48,12 @@ impl GroupsBeingImported {
for (chat_id, group) in self.groups.iter_mut().filter(|(_, g)| !g.is_complete()) {
if group.current_batch_started.is_none() {
group.current_batch_started = Some(now);
let action = if group.events_imported {
GroupToImportAction::Core(group.bytes.len() as u64)
} else {
let action = if !group.events_imported {
GroupToImportAction::Events(group.channel_id, group.events_imported_up_to.clone())
} else if !group.members_imported {
GroupToImportAction::Members(group.channel_id, group.members_imported_up_to)
} else {
GroupToImportAction::Core(group.bytes.len() as u64)
};
batch.push(GroupToImport {
group_id: *chat_id,
Expand Down Expand Up @@ -89,6 +92,22 @@ impl GroupsBeingImported {
}
}

pub fn mark_members_batch_complete(&mut self, group_id: &ChatId, up_to: UserId) {
if let Some(group) = self.groups.get_mut(group_id) {
group.current_batch_started = None;
group.error_message = None;
group.members_imported_up_to = Some(up_to);
}
}

pub fn mark_members_import_complete(&mut self, group_id: &ChatId) {
if let Some(group) = self.groups.get_mut(group_id) {
group.current_batch_started = None;
group.error_message = None;
group.members_imported = true;
}
}

pub fn mark_batch_failed(&mut self, group_id: &ChatId, error_message: String) {
if let Some(group) = self.groups.get_mut(group_id) {
group.current_batch_started = None;
Expand Down Expand Up @@ -122,6 +141,9 @@ pub struct GroupBeingImported {
#[serde(default)]
events_imported: bool,
events_imported_up_to: Option<EventContext>,
#[serde(default)]
members_imported: bool,
members_imported_up_to: Option<UserId>,
total_bytes: u64,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
Expand All @@ -144,6 +166,8 @@ impl GroupBeingImported {
current_batch_started: None,
events_imported: false,
events_imported_up_to: None,
members_imported: false,
members_imported_up_to: None,
total_bytes,
bytes: Vec::with_capacity(total_bytes as usize),
error_message: None,
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929))
- Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931))
- Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933))
- Export members from stable memory when importing group into community ([#6935](https://github.com/open-chat-labs/open-chat/pull/6935))

## [[2.0.1480](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1480-group)] - 2024-11-28

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use std::fmt::{Debug, Formatter};
use types::UserId;

#[derive(Serialize, Deserialize, Debug)]
pub struct Args {
pub after: Option<UserId>,
}

#[derive(Serialize, Deserialize, Debug)]
pub enum Response {
Success(SuccessResult),
}

#[derive(Serialize, Deserialize)]
pub struct SuccessResult {
pub members: Vec<ByteBuf>,
pub finished: bool,
}

impl Debug for SuccessResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("SuccessResult");
s.field("members", &self.members.len());
s.field("finished", &self.finished);
s.finish()
}
}
1 change: 1 addition & 0 deletions backend/canisters/group/api/src/updates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod block_user;
pub mod c2c_delete_group;
pub mod c2c_export_group;
pub mod c2c_export_group_events;
pub mod c2c_export_group_members;
pub mod c2c_freeze_group;
pub mod c2c_handle_bot_action;
pub mod c2c_invite_users;
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/group/c2c_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ generate_candid_c2c_call!(selected_initial);
generate_c2c_call!(c2c_delete_group);
generate_c2c_call!(c2c_export_group);
generate_c2c_call!(c2c_export_group_events);
generate_c2c_call!(c2c_export_group_members);
generate_c2c_call!(c2c_freeze_group);
generate_c2c_call!(c2c_invite_users);
generate_c2c_call!(c2c_join_group);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::guards::caller_is_community_being_imported_into;
use crate::RuntimeState;
use crate::{read_state, run_regular_jobs};
use canister_api_macros::update;
use group_canister::c2c_export_group_members::{Response::*, *};

#[update(guard = "caller_is_community_being_imported_into", msgpack = true)]
fn c2c_export_group_members(args: Args) -> Response {
run_regular_jobs();

read_state(|state| c2c_export_group_members_impl(args, state))
}

fn c2c_export_group_members_impl(args: Args, state: &RuntimeState) -> Response {
let members = state.data.chat.members.read_members_as_bytes_from_stable_memory(args.after);

Success(SuccessResult {
finished: members.is_empty(),
members,
})
}
1 change: 1 addition & 0 deletions backend/canisters/group/impl/src/updates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod add_reaction;
pub mod c2c_delete_group;
pub mod c2c_export_group;
pub mod c2c_export_group_events;
pub mod c2c_export_group_members;
pub mod c2c_freeze_group;
pub mod c2c_handle_bot_action;
pub mod c2c_invite_users;
Expand Down
4 changes: 2 additions & 2 deletions backend/libraries/chat_events/src/chat_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::last_updated_timestamps::LastUpdatedTimestamps;
use crate::metrics::{ChatMetricsInternal, MetricKey};
use crate::search_index::SearchIndex;
use crate::*;
use constants::{HOUR_IN_MS, OPENCHAT_BOT_USER_ID};
use constants::{HOUR_IN_MS, ONE_MB, OPENCHAT_BOT_USER_ID};
use event_store_producer::{EventBuilder, EventStoreClient, Runtime};
use rand::rngs::StdRng;
use rand::Rng;
Expand Down Expand Up @@ -165,7 +165,7 @@ impl ChatEvents {
}

pub fn read_events_as_bytes_from_stable_memory(&self, after: Option<EventContext>) -> Vec<(EventContext, ByteBuf)> {
stable_memory::read_events_as_bytes(self.chat, after, 1_000_000)
stable_memory::read_events_as_bytes(self.chat, after, 2 * ONE_MB as usize)
}

pub fn iter_recently_updated_events(
Expand Down
1 change: 1 addition & 0 deletions backend/libraries/chat_events/src/stable_memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub fn read_events_as_bytes(chat: Chat, after: Option<EventContext>, max_bytes:
})
}

// Used to efficiently write all events to stable memory when migrating a group into a community
pub fn write_events_as_bytes(chat: Chat, events: Vec<(EventContext, ByteBuf)>) {
with_map_mut(|m| {
for (context, bytes) in events {
Expand Down
1 change: 1 addition & 0 deletions backend/libraries/constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub const HOUR_IN_MS: Milliseconds = MINUTE_IN_MS * 60;
pub const DAY_IN_MS: Milliseconds = HOUR_IN_MS * 24;
pub const WEEK_IN_MS: Milliseconds = DAY_IN_MS * 7;
pub const NANOS_PER_MILLISECOND: u64 = 1_000_000;
pub const ONE_MB: u32 = 1024 * 1024;

// This only applies to the 'top level' canisters (ie. not user + group canisters)
pub fn min_cycles_balance(test_mode: bool) -> Cycles {
Expand Down
1 change: 1 addition & 0 deletions backend/libraries/group_chat_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ msgpack = { path = "../msgpack" }
regex-lite = { workspace = true }
search = { path = "../search" }
serde = { workspace = true }
serde_bytes = { workspace = true }
stable_memory_map = { path = "../stable_memory_map" }
tracing = { workspace = true }
types = { path = "../types" }
Expand Down
12 changes: 11 additions & 1 deletion backend/libraries/group_chat_core/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use crate::mentions::Mentions;
use crate::roles::GroupRoleInternal;
use crate::AccessRulesInternal;
use candid::Principal;
use constants::calculate_summary_updates_data_removal_cutoff;
use constants::{calculate_summary_updates_data_removal_cutoff, ONE_MB};
use group_community_common::{Member, MemberUpdate, Members};
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use std::cell::OnceCell;
use std::cmp::max;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
Expand Down Expand Up @@ -138,6 +139,15 @@ impl GroupMembers {
self.stable_memory_members_map.set_chat(chat);
}

pub fn read_members_as_bytes_from_stable_memory(&self, after: Option<UserId>) -> Vec<ByteBuf> {
self.stable_memory_members_map
.read_members_as_bytes(after, 2 * ONE_MB as usize)
}

pub fn write_members_from_bytes_to_stable_memory(chat: MultiUserChat, members: Vec<ByteBuf>) -> Option<UserId> {
stable_memory::write_members_from_bytes(chat, members)
}

pub fn add(
&mut self,
user_id: UserId,
Expand Down
Loading

0 comments on commit bed22e1

Please sign in to comment.