From cb51d2e29865df4ca58ceb0999c13d94c3140119 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 2 Dec 2024 08:53:36 +0000 Subject: [PATCH] Migrate chat members to stable memory using timer job (#6933) --- Cargo.lock | 3 + backend/canisters/community/CHANGELOG.md | 1 + .../src/jobs/garbage_collect_stable_memory.rs | 10 +- backend/canisters/community/impl/src/lib.rs | 17 +-- .../impl/src/lifecycle/post_upgrade.rs | 10 ++ .../community/impl/src/timer_job_types.rs | 33 +++++- .../impl/src/updates/delete_channel.rs | 13 ++- backend/canisters/group/CHANGELOG.md | 1 + .../src/jobs/garbage_collect_stable_memory.rs | 10 +- backend/canisters/group/impl/src/lib.rs | 15 ++- .../group/impl/src/lifecycle/post_upgrade.rs | 6 + .../group/impl/src/timer_job_types.rs | 25 ++++- .../src/jobs/garbage_collect_stable_memory.rs | 10 +- backend/canisters/user/impl/src/lib.rs | 3 +- .../impl/src/updates/delete_direct_chat.rs | 7 +- .../integration_tests/src/client/community.rs | 2 +- .../src/communities/delete_channel_tests.rs | 25 +++-- .../communities/disappearing_message_tests.rs | 11 +- .../src/disappearing_message_tests.rs | 6 +- .../integration_tests/src/stable_memory.rs | 10 +- .../libraries/chat_events/src/chat_events.rs | 5 - .../chat_events/src/stable_memory/mod.rs | 27 ----- backend/libraries/group_chat_core/Cargo.toml | 3 + backend/libraries/group_chat_core/src/lib.rs | 2 +- .../libraries/group_chat_core/src/members.rs | 106 ++++++++++++++---- .../group_chat_core/src/members/proptests.rs | 17 ++- .../src/members/stable_memory.rs | 66 ++++++----- .../group_chat_core/src/members_map.rs | 13 ++- .../libraries/stable_memory_map/src/lib.rs | 28 +++++ 29 files changed, 324 insertions(+), 161 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b70b3903d..e7fb5edf09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2901,6 +2901,8 @@ dependencies = [ "constants", "event_store_producer", "group_community_common", + "ic-cdk 0.16.0", + "ic-stable-structures", "itertools 0.13.0", "lazy_static", "msgpack", @@ -2911,6 +2913,7 @@ dependencies = [ "serde", "stable_memory_map", "test-strategy", + "tracing", "types", "utils 0.1.0", ] diff --git a/backend/canisters/community/CHANGELOG.md b/backend/canisters/community/CHANGELOG.md index c1187d2041..e0ea016401 100644 --- a/backend/canisters/community/CHANGELOG.md +++ b/backend/canisters/community/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Move members to `MembersMap` in prep for stable memory ([#6927](https://github.com/open-chat-labs/open-chat/pull/6927)) - Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929)) - Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931)) +- Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933)) ## [[2.0.1479](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1479-community)] - 2024-11-28 diff --git a/backend/canisters/community/impl/src/jobs/garbage_collect_stable_memory.rs b/backend/canisters/community/impl/src/jobs/garbage_collect_stable_memory.rs index fa7ee8ce6e..a4153f09af 100644 --- a/backend/canisters/community/impl/src/jobs/garbage_collect_stable_memory.rs +++ b/backend/canisters/community/impl/src/jobs/garbage_collect_stable_memory.rs @@ -23,18 +23,12 @@ fn run() { TIMER_ID.set(None); mutate_state(|state| { while let Some(prefix) = state.data.stable_memory_keys_to_garbage_collect.pop() { - let result = chat_events::ChatEvents::garbage_collect_stable_memory(prefix.clone()); + let result = stable_memory_map::garbage_collect(prefix.clone()); let (count, complete) = match result { Ok(c) => (c, true), Err(c) => (c, false), }; - let thread_root_message_index = prefix.thread_root_message_index(); - info!( - count, - ?thread_root_message_index, - complete, - "Garbage collected keys from stable memory" - ); + info!(count, complete, "Garbage collected keys from stable memory"); if !complete { state.data.stable_memory_keys_to_garbage_collect.push(prefix); break; diff --git a/backend/canisters/community/impl/src/lib.rs b/backend/canisters/community/impl/src/lib.rs index 3a3c965ebf..696d5154a4 100644 --- a/backend/canisters/community/impl/src/lib.rs +++ b/backend/canisters/community/impl/src/lib.rs @@ -241,12 +241,9 @@ impl RuntimeState { } final_prize_payments.extend(result.final_prize_payments); for thread in result.threads { - self.data - .stable_memory_keys_to_garbage_collect - .push(KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new( - channel.id, - thread.root_message_index, - ))); + self.data.stable_memory_keys_to_garbage_collect.push( + KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new(channel.id, thread.root_message_index)).to_vec(), + ); } } jobs::garbage_collect_stable_memory::start_job_if_required(self); @@ -298,6 +295,7 @@ impl RuntimeState { instruction_counts: self.data.instruction_counts_log.iter().collect(), event_store_client_info: self.data.event_store_client.info(), timer_jobs: self.data.timer_jobs.len() as u32, + members_migrated_to_stable_memory: self.data.members_migrated_to_stable_memory, canister_ids: CanisterIds { user_index: self.data.user_index_canister_id, group_index: self.data.group_index_canister_id, @@ -367,7 +365,10 @@ struct Data { expiring_member_actions: ExpiringMemberActions, user_cache: UserCache, user_event_sync_queue: GroupedTimerJobQueue, - stable_memory_keys_to_garbage_collect: Vec, + #[serde(default)] + stable_memory_keys_to_garbage_collect: Vec>, + #[serde(default)] + members_migrated_to_stable_memory: bool, } impl Data { @@ -472,6 +473,7 @@ impl Data { user_cache: UserCache::default(), user_event_sync_queue: GroupedTimerJobQueue::new(5, true), stable_memory_keys_to_garbage_collect: Vec::new(), + members_migrated_to_stable_memory: true, } } @@ -713,6 +715,7 @@ pub struct Metrics { pub instruction_counts: Vec, pub event_store_client_info: EventStoreClientInfo, pub timer_jobs: u32, + pub members_migrated_to_stable_memory: bool, pub canister_ids: CanisterIds, } diff --git a/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs index 4d9ea34500..80b698ee4c 100644 --- a/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs @@ -1,14 +1,17 @@ use crate::jobs::import_groups::finalize_group_import; use crate::lifecycle::{init_env, init_state}; use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory}; +use crate::timer_job_types::MigrateMembersToStableMemoryJob; use crate::{read_state, Data}; use canister_logger::LogEntry; +use canister_timer_jobs::Job; use canister_tracing_macros::trace; use community_canister::post_upgrade::Args; use ic_cdk::post_upgrade; use instruction_counts_log::InstructionCountFunctionId; use stable_memory::get_reader; use tracing::info; +use types::MultiUserChat; #[post_upgrade] #[trace] @@ -21,8 +24,13 @@ fn post_upgrade(args: Args) { let (mut data, errors, logs, traces): (Data, Vec, Vec, Vec) = msgpack::deserialize(reader).unwrap(); + let community_id = ic_cdk::id().into(); for channel in data.channels.iter_mut() { channel.chat.members.set_member_default_timestamps(); + channel + .chat + .members + .set_chat(MultiUserChat::Channel(community_id, channel.id)); } canister_logger::init_with_logs(data.test_mode, errors, logs, traces); @@ -44,4 +52,6 @@ fn post_upgrade(args: Args) { .data .record_instructions_count(InstructionCountFunctionId::PostUpgrade, now) }); + + MigrateMembersToStableMemoryJob.execute(); } diff --git a/backend/canisters/community/impl/src/timer_job_types.rs b/backend/canisters/community/impl/src/timer_job_types.rs index 3562dcc392..e3625ef095 100644 --- a/backend/canisters/community/impl/src/timer_job_types.rs +++ b/backend/canisters/community/impl/src/timer_job_types.rs @@ -7,7 +7,7 @@ use chat_events::MessageContentInternal; use constants::{DAY_IN_MS, MINUTE_IN_MS, NANOS_PER_MILLISECOND, SECOND_IN_MS}; use ledger_utils::process_transaction; use serde::{Deserialize, Serialize}; -use tracing::error; +use tracing::{error, info}; use types::{ BlobReference, CanisterId, ChannelId, ChatId, MessageId, MessageIndex, P2PSwapStatus, PendingCryptoTransaction, UserId, }; @@ -21,14 +21,13 @@ pub enum TimerJob { FinalizeGroupImport(FinalizeGroupImportJob), ProcessGroupImportChannelMembers(ProcessGroupImportChannelMembersJob), MarkGroupImportComplete(MarkGroupImportCompleteJob), - // TODO: Remove this serde attribute post release - #[serde(alias = "RefundPrize")] FinalPrizePayments(FinalPrizePaymentsJob), MakeTransfer(MakeTransferJob), NotifyEscrowCanisterOfDeposit(NotifyEscrowCanisterOfDepositJob), CancelP2PSwapInEscrowCanister(CancelP2PSwapInEscrowCanisterJob), MarkP2PSwapExpired(MarkP2PSwapExpiredJob), MarkVideoCallEnded(MarkVideoCallEndedJob), + MigrateMembersToStableMemory(MigrateMembersToStableMemoryJob), } #[derive(Serialize, Deserialize, Clone)] @@ -141,6 +140,9 @@ pub struct MarkP2PSwapExpiredJob { #[derive(Serialize, Deserialize, Clone)] pub struct MarkVideoCallEndedJob(pub community_canister::end_video_call::Args); +#[derive(Serialize, Deserialize, Clone)] +pub struct MigrateMembersToStableMemoryJob; + impl Job for TimerJob { fn execute(self) { if can_borrow_state() { @@ -161,6 +163,7 @@ impl Job for TimerJob { TimerJob::CancelP2PSwapInEscrowCanister(job) => job.execute(), TimerJob::MarkP2PSwapExpired(job) => job.execute(), TimerJob::MarkVideoCallEnded(job) => job.execute(), + TimerJob::MigrateMembersToStableMemory(job) => job.execute(), } } } @@ -461,3 +464,27 @@ impl Job for MarkVideoCallEndedJob { mutate_state(|state| end_video_call_impl(self.0, state)); } } + +impl Job for MigrateMembersToStableMemoryJob { + fn execute(self) { + mutate_state(|state| { + let mut complete = true; + for channel in state.data.channels.iter_mut() { + if !channel.chat.members.migrate_next_batch_to_stable_memory() { + complete = false; + break; + } + } + if !complete { + let now = state.env.now(); + state + .data + .timer_jobs + .enqueue_job(TimerJob::MigrateMembersToStableMemory(self), now, now); + } else { + state.data.members_migrated_to_stable_memory = true; + info!("Finished migrating members to stable memory"); + } + }) + } +} diff --git a/backend/canisters/community/impl/src/updates/delete_channel.rs b/backend/canisters/community/impl/src/updates/delete_channel.rs index 4a7ce6b1a7..6b679872a3 100644 --- a/backend/canisters/community/impl/src/updates/delete_channel.rs +++ b/backend/canisters/community/impl/src/updates/delete_channel.rs @@ -54,17 +54,20 @@ fn delete_channel_impl(channel_id: ChannelId, state: &mut RuntimeState) -> Respo state .data .stable_memory_keys_to_garbage_collect - .push(KeyPrefix::Channel(ChannelKeyPrefix::new(channel_id))); + .push(KeyPrefix::Channel(ChannelKeyPrefix::new(channel_id)).to_vec()); for message_index in channel.chat.events.thread_keys() { state .data .stable_memory_keys_to_garbage_collect - .push(KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new( - channel_id, - message_index, - ))); + .push(KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new(channel_id, message_index)).to_vec()); } + + state + .data + .stable_memory_keys_to_garbage_collect + .push(group_chat_core::MembersKeyPrefix::Channel(channel_id.as_u32()).to_vec()); + crate::jobs::garbage_collect_stable_memory::start_job_if_required(state); state.data.events.push_event( diff --git a/backend/canisters/group/CHANGELOG.md b/backend/canisters/group/CHANGELOG.md index 2859ed4522..7126ab1156 100644 --- a/backend/canisters/group/CHANGELOG.md +++ b/backend/canisters/group/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Move members to `MembersMap` in prep for stable memory ([#6927](https://github.com/open-chat-labs/open-chat/pull/6927)) - Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929)) - Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931)) +- Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933)) ## [[2.0.1480](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1480-group)] - 2024-11-28 diff --git a/backend/canisters/group/impl/src/jobs/garbage_collect_stable_memory.rs b/backend/canisters/group/impl/src/jobs/garbage_collect_stable_memory.rs index fa7ee8ce6e..a4153f09af 100644 --- a/backend/canisters/group/impl/src/jobs/garbage_collect_stable_memory.rs +++ b/backend/canisters/group/impl/src/jobs/garbage_collect_stable_memory.rs @@ -23,18 +23,12 @@ fn run() { TIMER_ID.set(None); mutate_state(|state| { while let Some(prefix) = state.data.stable_memory_keys_to_garbage_collect.pop() { - let result = chat_events::ChatEvents::garbage_collect_stable_memory(prefix.clone()); + let result = stable_memory_map::garbage_collect(prefix.clone()); let (count, complete) = match result { Ok(c) => (c, true), Err(c) => (c, false), }; - let thread_root_message_index = prefix.thread_root_message_index(); - info!( - count, - ?thread_root_message_index, - complete, - "Garbage collected keys from stable memory" - ); + info!(count, complete, "Garbage collected keys from stable memory"); if !complete { state.data.stable_memory_keys_to_garbage_collect.push(prefix); break; diff --git a/backend/canisters/group/impl/src/lib.rs b/backend/canisters/group/impl/src/lib.rs index d76c038724..e91e06f580 100644 --- a/backend/canisters/group/impl/src/lib.rs +++ b/backend/canisters/group/impl/src/lib.rs @@ -361,9 +361,7 @@ impl RuntimeState { for thread in result.threads { self.data .stable_memory_keys_to_garbage_collect - .push(KeyPrefix::GroupChatThread(GroupChatThreadKeyPrefix::new( - thread.root_message_index, - ))); + .push(KeyPrefix::GroupChatThread(GroupChatThreadKeyPrefix::new(thread.root_message_index)).to_vec()); } jobs::garbage_collect_stable_memory::start_job_if_required(self); } @@ -420,7 +418,7 @@ impl RuntimeState { .unwrap_or_default(), event_store_client_info: self.data.event_store_client.info(), timer_jobs: self.data.timer_jobs.len() as u32, - stable_memory_event_migration_complete: self.data.stable_memory_event_migration_complete, + members_migrated_to_stable_memory: self.data.members_migrated_to_stable_memory, canister_ids: CanisterIds { user_index: self.data.user_index_canister_id, group_index: self.data.group_index_canister_id, @@ -474,8 +472,9 @@ struct Data { expiring_member_actions: ExpiringMemberActions, user_cache: UserCache, user_event_sync_queue: GroupedTimerJobQueue, - stable_memory_event_migration_complete: bool, - stable_memory_keys_to_garbage_collect: Vec, + #[serde(default)] + members_migrated_to_stable_memory: bool, + stable_memory_keys_to_garbage_collect: Vec>, } fn init_instruction_counts_log() -> InstructionCountsLog { @@ -573,8 +572,8 @@ impl Data { expiring_member_actions: ExpiringMemberActions::default(), user_cache: UserCache::default(), user_event_sync_queue: GroupedTimerJobQueue::new(5, true), - stable_memory_event_migration_complete: true, stable_memory_keys_to_garbage_collect: Vec::new(), + members_migrated_to_stable_memory: true, } } @@ -730,7 +729,7 @@ pub struct Metrics { pub serialized_chat_state_bytes: u64, pub event_store_client_info: EventStoreClientInfo, pub timer_jobs: u32, - pub stable_memory_event_migration_complete: bool, + pub members_migrated_to_stable_memory: bool, pub canister_ids: CanisterIds, } diff --git a/backend/canisters/group/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/group/impl/src/lifecycle/post_upgrade.rs index 0215b6fbc7..d87ffddc46 100644 --- a/backend/canisters/group/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/group/impl/src/lifecycle/post_upgrade.rs @@ -1,13 +1,16 @@ use crate::lifecycle::{init_env, init_state}; use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory}; +use crate::timer_job_types::MigrateMembersToStableMemoryJob; use crate::{read_state, Data}; use canister_logger::LogEntry; +use canister_timer_jobs::Job; use canister_tracing_macros::trace; use group_canister::post_upgrade::Args; use ic_cdk::post_upgrade; use instruction_counts_log::InstructionCountFunctionId; use stable_memory::get_reader; use tracing::info; +use types::MultiUserChat; #[post_upgrade] #[trace] @@ -21,6 +24,7 @@ fn post_upgrade(args: Args) { msgpack::deserialize(reader).unwrap(); data.chat.members.set_member_default_timestamps(); + data.chat.members.set_chat(MultiUserChat::Group(ic_cdk::id().into())); canister_logger::init_with_logs(data.test_mode, errors, logs, traces); @@ -35,4 +39,6 @@ fn post_upgrade(args: Args) { .data .record_instructions_count(InstructionCountFunctionId::PostUpgrade, now) }); + + MigrateMembersToStableMemoryJob.execute(); } diff --git a/backend/canisters/group/impl/src/timer_job_types.rs b/backend/canisters/group/impl/src/timer_job_types.rs index 842c1c67f1..0a4a67e427 100644 --- a/backend/canisters/group/impl/src/timer_job_types.rs +++ b/backend/canisters/group/impl/src/timer_job_types.rs @@ -7,7 +7,7 @@ use chat_events::MessageContentInternal; use constants::{DAY_IN_MS, MINUTE_IN_MS, NANOS_PER_MILLISECOND, SECOND_IN_MS}; use ledger_utils::process_transaction; use serde::{Deserialize, Serialize}; -use tracing::error; +use tracing::{error, info}; use types::{BlobReference, CanisterId, MessageId, MessageIndex, P2PSwapStatus, PendingCryptoTransaction, UserId}; #[derive(Serialize, Deserialize, Clone)] @@ -24,6 +24,7 @@ pub enum TimerJob { CancelP2PSwapInEscrowCanister(CancelP2PSwapInEscrowCanisterJob), MarkP2PSwapExpired(MarkP2PSwapExpiredJob), MarkVideoCallEnded(MarkVideoCallEndedJob), + MigrateMembersToStableMemory(MigrateMembersToStableMemoryJob), } #[derive(Serialize, Deserialize, Clone)] @@ -111,6 +112,9 @@ pub struct MarkP2PSwapExpiredJob { #[derive(Serialize, Deserialize, Clone)] pub struct MarkVideoCallEndedJob(pub group_canister::end_video_call::Args); +#[derive(Serialize, Deserialize, Clone)] +pub struct MigrateMembersToStableMemoryJob; + impl Job for TimerJob { fn execute(self) { if can_borrow_state() { @@ -128,6 +132,7 @@ impl Job for TimerJob { TimerJob::CancelP2PSwapInEscrowCanister(job) => job.execute(), TimerJob::MarkP2PSwapExpired(job) => job.execute(), TimerJob::MarkVideoCallEnded(job) => job.execute(), + TimerJob::MigrateMembersToStableMemory(job) => job.execute(), } } } @@ -394,3 +399,21 @@ impl Job for MarkVideoCallEndedJob { mutate_state(|state| end_video_call_impl(self.0, state)); } } + +impl Job for MigrateMembersToStableMemoryJob { + fn execute(self) { + mutate_state(|state| { + let complete = state.data.chat.members.migrate_next_batch_to_stable_memory(); + if !complete { + let now = state.env.now(); + state + .data + .timer_jobs + .enqueue_job(TimerJob::MigrateMembersToStableMemory(self), now, now); + } else { + state.data.members_migrated_to_stable_memory = true; + info!("Finished migrating members to stable memory"); + } + }) + } +} diff --git a/backend/canisters/user/impl/src/jobs/garbage_collect_stable_memory.rs b/backend/canisters/user/impl/src/jobs/garbage_collect_stable_memory.rs index fa7ee8ce6e..a4153f09af 100644 --- a/backend/canisters/user/impl/src/jobs/garbage_collect_stable_memory.rs +++ b/backend/canisters/user/impl/src/jobs/garbage_collect_stable_memory.rs @@ -23,18 +23,12 @@ fn run() { TIMER_ID.set(None); mutate_state(|state| { while let Some(prefix) = state.data.stable_memory_keys_to_garbage_collect.pop() { - let result = chat_events::ChatEvents::garbage_collect_stable_memory(prefix.clone()); + let result = stable_memory_map::garbage_collect(prefix.clone()); let (count, complete) = match result { Ok(c) => (c, true), Err(c) => (c, false), }; - let thread_root_message_index = prefix.thread_root_message_index(); - info!( - count, - ?thread_root_message_index, - complete, - "Garbage collected keys from stable memory" - ); + info!(count, complete, "Garbage collected keys from stable memory"); if !complete { state.data.stable_memory_keys_to_garbage_collect.push(prefix); break; diff --git a/backend/canisters/user/impl/src/lib.rs b/backend/canisters/user/impl/src/lib.rs index b6ab313742..eb15a9d6db 100644 --- a/backend/canisters/user/impl/src/lib.rs +++ b/backend/canisters/user/impl/src/lib.rs @@ -12,7 +12,6 @@ use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob}; use candid::Principal; use canister_state_macros::canister_state; use canister_timer_jobs::TimerJobs; -use chat_events::KeyPrefix; use constants::{DAY_IN_MS, MINUTE_IN_MS, OPENCHAT_BOT_USER_ID}; use event_store_producer::{EventStoreClient, EventStoreClientBuilder, EventStoreClientInfo}; use event_store_producer_cdk_runtime::CdkRuntime; @@ -256,7 +255,7 @@ struct Data { pub referred_by: Option, pub referrals: Referrals, pub message_activity_events: MessageActivityEvents, - pub stable_memory_keys_to_garbage_collect: Vec, + pub stable_memory_keys_to_garbage_collect: Vec>, } impl Data { diff --git a/backend/canisters/user/impl/src/updates/delete_direct_chat.rs b/backend/canisters/user/impl/src/updates/delete_direct_chat.rs index 2398074321..ed18fae7da 100644 --- a/backend/canisters/user/impl/src/updates/delete_direct_chat.rs +++ b/backend/canisters/user/impl/src/updates/delete_direct_chat.rs @@ -22,15 +22,12 @@ fn delete_direct_chat_impl(args: Args, state: &mut RuntimeState) -> Response { state .data .stable_memory_keys_to_garbage_collect - .push(KeyPrefix::DirectChat(DirectChatKeyPrefix::new(args.user_id))); + .push(KeyPrefix::DirectChat(DirectChatKeyPrefix::new(args.user_id)).to_vec()); for message_index in chat.events.thread_keys() { state .data .stable_memory_keys_to_garbage_collect - .push(KeyPrefix::DirectChatThread(DirectChatThreadKeyPrefix::new( - args.user_id, - message_index, - ))); + .push(KeyPrefix::DirectChatThread(DirectChatThreadKeyPrefix::new(args.user_id, message_index)).to_vec()); } crate::jobs::garbage_collect_stable_memory::start_job_if_required(state); Success diff --git a/backend/integration_tests/src/client/community.rs b/backend/integration_tests/src/client/community.rs index 7ed695af62..f85ffa523b 100644 --- a/backend/integration_tests/src/client/community.rs +++ b/backend/integration_tests/src/client/community.rs @@ -2,7 +2,7 @@ use crate::{generate_msgpack_query_call, generate_msgpack_update_call}; use community_canister::*; use ic_stable_structures::memory_manager::MemoryId; -pub const CHAT_EVENTS_MEMORY_ID: MemoryId = MemoryId::new(3); +pub const STABLE_MEMORY_MAP_MEMORY_ID: MemoryId = MemoryId::new(3); // Queries generate_msgpack_query_call!(channel_summary); diff --git a/backend/integration_tests/src/communities/delete_channel_tests.rs b/backend/integration_tests/src/communities/delete_channel_tests.rs index dfd09cb4a0..e8942609a1 100644 --- a/backend/integration_tests/src/communities/delete_channel_tests.rs +++ b/backend/integration_tests/src/communities/delete_channel_tests.rs @@ -1,6 +1,6 @@ -use crate::client::community::CHAT_EVENTS_MEMORY_ID; +use crate::client::community::STABLE_MEMORY_MAP_MEMORY_ID; use crate::env::ENV; -use crate::stable_memory::count_stable_memory_event_keys; +use crate::stable_memory::get_stable_memory_map; use crate::{client, CanisterIds, TestEnv, User}; use candid::Principal; use pocket_ic::PocketIc; @@ -65,33 +65,44 @@ fn stable_memory_garbage_collected_after_deleting_channel() { .. } = init_test_data(env, canister_ids, *controller); - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 4); + let stable_map = get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID); + // 8 keys = 2 events and 2 users per channel + assert_eq!(stable_map.len(), 8); for _ in 0..100 { client::community::happy_path::send_text_message(env, &user1, community_id, channel_id1, None, random_string(), None); } - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 104); + assert_eq!( + get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID).len(), + 108 + ); for _ in 0..80 { client::community::happy_path::send_text_message(env, &user1, community_id, channel_id2, None, random_string(), None); } - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 184); + assert_eq!( + get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID).len(), + 188 + ); client::community::happy_path::delete_channel(env, user1.principal, community_id, channel_id1); env.advance_time(Duration::from_secs(60)); env.tick(); - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 82); + assert_eq!( + get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID).len(), + 84 + ); client::community::happy_path::delete_channel(env, user1.principal, community_id, channel_id2); env.advance_time(Duration::from_secs(60)); env.tick(); - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 0); + assert_eq!(get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID).len(), 0); } fn init_test_data(env: &mut PocketIc, canister_ids: &CanisterIds, controller: Principal) -> TestData { diff --git a/backend/integration_tests/src/communities/disappearing_message_tests.rs b/backend/integration_tests/src/communities/disappearing_message_tests.rs index 454be29181..66732cd5ec 100644 --- a/backend/integration_tests/src/communities/disappearing_message_tests.rs +++ b/backend/integration_tests/src/communities/disappearing_message_tests.rs @@ -1,6 +1,6 @@ -use crate::client::community::CHAT_EVENTS_MEMORY_ID; +use crate::client::community::STABLE_MEMORY_MAP_MEMORY_ID; use crate::env::ENV; -use crate::stable_memory::count_stable_memory_event_keys; +use crate::stable_memory::get_stable_memory_map; use crate::{client, TestEnv}; use std::ops::Deref; use std::time::Duration; @@ -159,7 +159,10 @@ fn stable_memory_garbage_collected_after_messages_disappear() { } } - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 32); + assert_eq!( + get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID).len(), + 33 + ); // Tick once to expire the messages env.advance_time(Duration::from_secs(2)); @@ -169,5 +172,5 @@ fn stable_memory_garbage_collected_after_messages_disappear() { env.advance_time(Duration::from_secs(60)); env.tick(); - assert_eq!(count_stable_memory_event_keys(env, community_id, CHAT_EVENTS_MEMORY_ID), 2); + assert_eq!(get_stable_memory_map(env, community_id, STABLE_MEMORY_MAP_MEMORY_ID).len(), 3); } diff --git a/backend/integration_tests/src/disappearing_message_tests.rs b/backend/integration_tests/src/disappearing_message_tests.rs index b52189b7e5..e40a51e561 100644 --- a/backend/integration_tests/src/disappearing_message_tests.rs +++ b/backend/integration_tests/src/disappearing_message_tests.rs @@ -1,6 +1,6 @@ use crate::client::group::CHAT_EVENTS_MEMORY_ID; use crate::env::ENV; -use crate::stable_memory::count_stable_memory_event_keys; +use crate::stable_memory::get_stable_memory_map; use crate::{client, TestEnv}; use std::ops::Deref; use std::time::Duration; @@ -241,7 +241,7 @@ fn stable_memory_garbage_collected_after_messages_disappear() { } } - assert_eq!(count_stable_memory_event_keys(env, group_id, CHAT_EVENTS_MEMORY_ID), 32); + assert_eq!(get_stable_memory_map(env, group_id, CHAT_EVENTS_MEMORY_ID).len(), 33); // Tick once to expire the messages env.advance_time(Duration::from_secs(2)); @@ -251,5 +251,5 @@ fn stable_memory_garbage_collected_after_messages_disappear() { env.advance_time(Duration::from_secs(60)); env.tick(); - assert_eq!(count_stable_memory_event_keys(env, group_id, CHAT_EVENTS_MEMORY_ID), 2); + assert_eq!(get_stable_memory_map(env, group_id, CHAT_EVENTS_MEMORY_ID).len(), 3); } diff --git a/backend/integration_tests/src/stable_memory.rs b/backend/integration_tests/src/stable_memory.rs index ff48454bff..34f76fd147 100644 --- a/backend/integration_tests/src/stable_memory.rs +++ b/backend/integration_tests/src/stable_memory.rs @@ -4,10 +4,12 @@ use pocket_ic::PocketIc; use std::cell::RefCell; use types::CanisterId; -pub fn count_stable_memory_event_keys(env: &PocketIc, canister_id: impl Into, memory_id: MemoryId) -> u64 { +pub fn get_stable_memory_map( + env: &PocketIc, + canister_id: impl Into, + memory_id: MemoryId, +) -> StableBTreeMap, Vec, VirtualMemory> { let memory = VectorMemory::new(RefCell::new(env.get_stable_memory(canister_id.into()))); let memory_manager = MemoryManager::init(memory); - let chat_events_memory = memory_manager.get(memory_id); - let map: StableBTreeMap, Vec, VirtualMemory> = StableBTreeMap::load(chat_events_memory); - map.len() + StableBTreeMap::load(memory_manager.get(memory_id)) } diff --git a/backend/libraries/chat_events/src/chat_events.rs b/backend/libraries/chat_events/src/chat_events.rs index 375b06158c..6a8f7905ba 100644 --- a/backend/libraries/chat_events/src/chat_events.rs +++ b/backend/libraries/chat_events/src/chat_events.rs @@ -3,7 +3,6 @@ use crate::expiring_events::ExpiringEvents; use crate::last_updated_timestamps::LastUpdatedTimestamps; use crate::metrics::{ChatMetricsInternal, MetricKey}; use crate::search_index::SearchIndex; -use crate::stable_memory::key::KeyPrefix; use crate::*; use constants::{HOUR_IN_MS, OPENCHAT_BOT_USER_ID}; use event_store_producer::{EventBuilder, EventStoreClient, Runtime}; @@ -84,10 +83,6 @@ impl ChatEvents { stable_memory::write_events_as_bytes(chat, events); } - pub fn garbage_collect_stable_memory(prefix: KeyPrefix) -> Result { - stable_memory::garbage_collect(prefix) - } - pub fn set_stable_memory_key_prefixes(&mut self) { self.main.set_stable_memory_prefix(self.chat, None); for (message_index, events) in self.threads.iter_mut() { diff --git a/backend/libraries/chat_events/src/stable_memory/mod.rs b/backend/libraries/chat_events/src/stable_memory/mod.rs index 70fd1bc698..94f2da4a9f 100644 --- a/backend/libraries/chat_events/src/stable_memory/mod.rs +++ b/backend/libraries/chat_events/src/stable_memory/mod.rs @@ -15,33 +15,6 @@ pub mod key; #[cfg(test)] mod tests; -pub fn garbage_collect(prefix: KeyPrefix) -> Result { - let start = Key::new(prefix.clone(), EventIndex::default()); - let mut total_count = 0; - with_map_mut(|m| { - // If < 1B instructions have been used so far, delete another 100 keys, or exit if complete - while ic_cdk::api::instruction_counter() < 1_000_000_000 { - let keys: Vec<_> = m - .range(start.to_vec()..) - .map_while(|(k, _)| Key::try_from(k.as_slice()).ok()) - .take_while(|k| *k.prefix() == prefix) - .take(100) - .collect(); - - let batch_count = keys.len() as u32; - total_count += batch_count; - for key in keys { - m.remove(&key.to_vec()); - } - // If batch count < 100 then we are finished - if batch_count < 100 { - return Ok(total_count); - } - } - Err(total_count) - }) -} - // Used to efficiently read all events from stable memory when migrating a group into a community pub fn read_events_as_bytes(chat: Chat, after: Option, max_bytes: usize) -> Vec<(EventContext, ByteBuf)> { let key = match after { diff --git a/backend/libraries/group_chat_core/Cargo.toml b/backend/libraries/group_chat_core/Cargo.toml index a5e71ed359..ea417f8546 100644 --- a/backend/libraries/group_chat_core/Cargo.toml +++ b/backend/libraries/group_chat_core/Cargo.toml @@ -11,6 +11,7 @@ chat_events = { path = "../chat_events" } constants = { path = "../constants" } event_store_producer = { workspace = true } group_community_common = { path = "../group_community_common" } +ic-cdk = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } msgpack = { path = "../msgpack" } @@ -18,10 +19,12 @@ regex-lite = { workspace = true } search = { path = "../search" } serde = { workspace = true } stable_memory_map = { path = "../stable_memory_map" } +tracing = { workspace = true } types = { path = "../types" } utils = { path = "../utils" } [dev-dependencies] +ic-stable-structures = { workspace = true } msgpack = { path = "../msgpack" } proptest = { workspace = true } rand = { workspace = true } diff --git a/backend/libraries/group_chat_core/src/lib.rs b/backend/libraries/group_chat_core/src/lib.rs index 34f5292949..224879d2dd 100644 --- a/backend/libraries/group_chat_core/src/lib.rs +++ b/backend/libraries/group_chat_core/src/lib.rs @@ -87,7 +87,7 @@ impl GroupChatCore { external_url: Option, now: TimestampMillis, ) -> GroupChatCore { - let members = GroupMembers::new(created_by, created_by_user_type, now); + let members = GroupMembers::new(created_by, created_by_user_type, chat, now); let events = ChatEvents::new_group_chat( chat, name.clone(), diff --git a/backend/libraries/group_chat_core/src/members.rs b/backend/libraries/group_chat_core/src/members.rs index a759dd6408..f39e77faf6 100644 --- a/backend/libraries/group_chat_core/src/members.rs +++ b/backend/libraries/group_chat_core/src/members.rs @@ -1,3 +1,4 @@ +use crate::members::stable_memory::MembersStableStorage; use crate::members_map::{HeapMembersMap, MembersMap}; use crate::mentions::Mentions; use crate::roles::GroupRoleInternal; @@ -8,11 +9,12 @@ use group_community_common::{Member, MemberUpdate, Members}; use serde::{Deserialize, Serialize}; use std::cell::OnceCell; use std::cmp::max; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::ops::Deref; +use tracing::info; use types::{ - is_default, EventIndex, GroupMember, GroupPermissions, MessageIndex, TimestampMillis, Timestamped, UserId, UserType, - Version, + is_default, EventIndex, GroupMember, GroupPermissions, MessageIndex, MultiUserChat, TimestampMillis, Timestamped, UserId, + UserType, Version, }; use utils::timestamped_set::TimestampedSet; @@ -20,11 +22,15 @@ use utils::timestamped_set::TimestampedSet; mod proptests; mod stable_memory; +pub use stable_memory::KeyPrefix as MembersKeyPrefix; + const MAX_MEMBERS_PER_GROUP: u32 = 100_000; #[derive(Serialize, Deserialize)] pub struct GroupMembers { members: HeapMembersMap, + #[serde(default = "default_stable_memory_members_map")] + stable_memory_members_map: MembersStableStorage, member_ids: BTreeSet, owners: BTreeSet, admins: BTreeSet, @@ -36,25 +42,57 @@ pub struct GroupMembers { suspended: BTreeSet, updates: BTreeSet<(TimestampMillis, UserId, MemberUpdate)>, latest_update_removed: TimestampMillis, + #[serde(default)] + migrate_to_stable_memory_queue: VecDeque, + #[serde(default)] + migration_to_stable_memory_complete: bool, +} + +fn default_stable_memory_members_map() -> MembersStableStorage { + MembersStableStorage::new_empty() } #[allow(clippy::too_many_arguments)] impl GroupMembers { - pub fn set_member_default_timestamps(&mut self) { - for member in self.members.values_mut() { - member.set_default_timestamps(); + pub fn migrate_next_batch_to_stable_memory(&mut self) -> bool { + if self.migration_to_stable_memory_complete { + return true; + } + if self.migrate_to_stable_memory_queue.is_empty() { + // This is the first iteration, populate the queue + self.migrate_to_stable_memory_queue = self.member_ids.iter().copied().collect(); } - } - pub fn prune_proposal_votes(&mut self, now: TimestampMillis) -> u32 { + // Migrate 100 at a time and exit if we exceed 2B instructions let mut count = 0; + while !self.migrate_to_stable_memory_queue.is_empty() && ic_cdk::api::instruction_counter() < 2_000_000_000 { + for _ in 0..100 { + if let Some(next) = self.migrate_to_stable_memory_queue.pop_front() { + let member = self.members.get(&next).unwrap(); + self.stable_memory_members_map.insert(member); + count += 1 + } else { + break; + } + } + } + + info!(count, "Migrated users to stable memory"); + + let complete = self.migrate_to_stable_memory_queue.is_empty(); + if complete { + self.migration_to_stable_memory_complete = true; + } + complete + } + + pub fn set_member_default_timestamps(&mut self) { for member in self.members.values_mut() { - count += member.prune_proposal_votes(now); + member.set_default_timestamps(); } - count } - pub fn new(creator_user_id: UserId, user_type: UserType, now: TimestampMillis) -> GroupMembers { + pub fn new(creator_user_id: UserId, user_type: UserType, chat: MultiUserChat, now: TimestampMillis) -> GroupMembers { let member = GroupMemberInternal { user_id: creator_user_id, date_added: now, @@ -75,6 +113,7 @@ impl GroupMembers { GroupMembers { members: HeapMembersMap::new(member.clone()), + stable_memory_members_map: MembersStableStorage::new(chat, member), member_ids: [creator_user_id].into_iter().collect(), owners: [creator_user_id].into_iter().collect(), admins: BTreeSet::new(), @@ -90,9 +129,15 @@ impl GroupMembers { suspended: BTreeSet::new(), updates: BTreeSet::new(), latest_update_removed: 0, + migrate_to_stable_memory_queue: VecDeque::default(), + migration_to_stable_memory_complete: true, } } + pub fn set_chat(&mut self, chat: MultiUserChat) { + self.stable_memory_members_map.set_chat(chat); + } + pub fn add( &mut self, user_id: UserId, @@ -124,7 +169,7 @@ impl GroupMembers { user_type, lapsed: Timestamped::default(), }; - self.members.insert(member.clone()); + self.insert_internal(member.clone()); if user_type.is_bot() { self.bots.insert(user_id, user_type); } @@ -139,7 +184,7 @@ impl GroupMembers { } pub fn remove(&mut self, user_id: UserId, now: TimestampMillis) -> Option { - if let Some(member) = self.members.remove(&user_id) { + if let Some(member) = self.remove_internal(&user_id) { match member.role.value { GroupRoleInternal::Owner => self.owners.remove(&user_id), GroupRoleInternal::Admin => self.admins.remove(&user_id), @@ -209,7 +254,7 @@ impl GroupMembers { } pub fn get(&self, user_id: &UserId) -> Option { - self.members.get(user_id) + self.get_internal(user_id) } pub fn get_bot(&self, bot_user_id: &UserId) -> Option { @@ -225,11 +270,11 @@ impl GroupMembers { user_id: &UserId, update_fn: F, ) -> Option { - let mut member = self.members.get(user_id)?; + let mut member = self.get_internal(user_id)?; let updated = update_fn(&mut member); if updated { - self.members.insert(member); + self.insert_internal(member); } Some(updated) } @@ -285,7 +330,7 @@ impl GroupMembers { } } - let member = match self.members.get(&user_id) { + let member = match self.get_internal(&user_id) { Some(p) => p, None => return TargetUserNotInGroup, }; @@ -486,8 +531,26 @@ impl GroupMembers { removed.len() as u32 } + fn get_internal(&self, user_id: &UserId) -> Option { + // if self.migration_to_stable_memory_complete { + // self.stable_memory_members_map.get(user_id) + // } else { + self.members.get(user_id) + // } + } + + fn insert_internal(&mut self, member: GroupMemberInternal) { + self.members.insert(member.clone()); + self.stable_memory_members_map.insert(member); + } + + fn remove_internal(&mut self, user_id: &UserId) -> Option { + self.stable_memory_members_map.remove(user_id); + self.members.remove(user_id) + } + #[cfg(test)] - fn check_invariants(&self) { + fn check_invariants(&self, stable_map: bool) { let mut member_ids = BTreeSet::new(); let mut owners = BTreeSet::new(); let mut admins = BTreeSet::new(); @@ -496,7 +559,9 @@ impl GroupMembers { let mut lapsed = BTreeSet::new(); let mut suspended = BTreeSet::new(); - for member in self.members.values() { + let all_members = if stable_map { self.stable_memory_members_map.all_members() } else { self.members.all_members() }; + + for member in all_members { member_ids.insert(member.user_id); match member.role.value { @@ -792,8 +857,7 @@ impl VerifiedGroupMember<'_> { } fn resolve_member(&self) -> &GroupMemberInternal { - self.member - .get_or_init(|| self.members.members.get(&self.user_id).clone().unwrap()) + self.member.get_or_init(|| self.members.get_internal(&self.user_id).unwrap()) } } diff --git a/backend/libraries/group_chat_core/src/members/proptests.rs b/backend/libraries/group_chat_core/src/members/proptests.rs index aa65a44850..24623c2666 100644 --- a/backend/libraries/group_chat_core/src/members/proptests.rs +++ b/backend/libraries/group_chat_core/src/members/proptests.rs @@ -1,11 +1,13 @@ use crate::{GroupMembers, GroupRoleInternal}; use candid::Principal; +use ic_stable_structures::memory_manager::{MemoryId, MemoryManager}; +use ic_stable_structures::DefaultMemoryImpl; use proptest::collection::vec as pvec; use proptest::prelude::*; use proptest::prop_oneof; use std::collections::BTreeSet; use test_strategy::proptest; -use types::{EventIndex, GroupPermissions, MessageIndex, TimestampMillis, UserId, UserType}; +use types::{EventIndex, GroupPermissions, MessageIndex, MultiUserChat, TimestampMillis, UserId, UserType}; #[derive(Debug, Clone)] enum Operation { @@ -62,7 +64,15 @@ fn operation_strategy() -> impl Strategy { #[proptest(cases = 10)] fn comprehensive(#[strategy(pvec(operation_strategy(), 100..5_000))] ops: Vec) { - let mut members = GroupMembers::new(user_id(0), UserType::User, 0); + let memory = MemoryManager::init(DefaultMemoryImpl::default()); + stable_memory_map::init(memory.get(MemoryId::new(1))); + + let mut members = GroupMembers::new( + user_id(0), + UserType::User, + MultiUserChat::Group(Principal::anonymous().into()), + 0, + ); let mut timestamp = 1000; for op in ops.into_iter() { @@ -70,7 +80,8 @@ fn comprehensive(#[strategy(pvec(operation_strategy(), 100..5_000))] ops: Vec Self { + MembersStableStorage { + prefix: KeyPrefix::GroupChat, + } + } + #[allow(dead_code)] pub fn new(chat: MultiUserChat, member: GroupMemberInternal) -> Self { let mut map = MembersStableStorage { prefix: chat.into() }; @@ -20,6 +27,10 @@ impl MembersStableStorage { map } + pub fn set_chat(&mut self, chat: MultiUserChat) { + self.prefix = chat.into(); + } + fn key(&self, user_id: UserId) -> Key { Key::new(self.prefix, user_id) } @@ -37,6 +48,16 @@ impl MembersMap for MembersStableStorage { fn remove(&mut self, user_id: &UserId) -> Option { with_map_mut(|m| m.remove(&self.key(*user_id).to_vec()).map(bytes_to_member)) } + + #[cfg(test)] + fn all_members(&self) -> Vec { + with_map(|m| { + m.range(self.key(Principal::from_slice(&[]).into()).to_vec()..) + .take_while(|(k, _)| Key::try_from(k.as_slice()).ok().filter(|k| k.prefix == self.prefix).is_some()) + .map(|(_, v)| bytes_to_member(v)) + .collect() + }) + } } fn member_to_bytes(member: &GroupMemberInternal) -> Vec { @@ -62,8 +83,7 @@ impl Key { impl Key { fn to_vec(&self) -> Vec { let user_id_bytes = self.user_id.as_slice(); - let mut bytes = Vec::with_capacity(1 + self.prefix.byte_len() + user_id_bytes.len()); - bytes.push(KeyType::ChatMember as u8); + let mut bytes = Vec::with_capacity(self.prefix.byte_len() + user_id_bytes.len()); bytes.extend_from_slice(&self.prefix.to_vec()); bytes.extend_from_slice(user_id_bytes); bytes @@ -74,20 +94,11 @@ impl TryFrom<&[u8]> for Key { type Error = (); fn try_from(value: &[u8]) -> Result { - match value.split_first() { - Some((kt, tail)) if *kt == KeyType::ChatMember as u8 => { - let prefix_len = match tail.first() { - Some(1) => 1, - Some(2) => 5, - _ => return Err(()), - }; - let prefix = KeyPrefix::try_from(&tail[..prefix_len])?; - let user_id = Principal::from_slice(&tail[prefix_len..]).into(); - - Ok(Key::new(prefix, user_id)) - } - _ => Err(()), - } + let prefix = KeyPrefix::try_from(value)?; + let prefix_bytes_len = prefix.byte_len(); + let user_id = Principal::from_slice(&value[prefix_bytes_len..]).into(); + + Ok(Key::new(prefix, user_id)) } } @@ -98,11 +109,12 @@ pub enum KeyPrefix { } impl KeyPrefix { - fn to_vec(self) -> Vec { + pub fn to_vec(self) -> Vec { match self { - KeyPrefix::GroupChat => vec![1], + KeyPrefix::GroupChat => vec![KeyType::ChatMember as u8, 1], KeyPrefix::Channel(channel_id) => { - let mut vec = Vec::with_capacity(5); + let mut vec = Vec::with_capacity(6); + vec.push(KeyType::ChatMember as u8); vec.push(2); vec.extend_from_slice(&channel_id.to_be_bytes()); vec @@ -112,8 +124,8 @@ impl KeyPrefix { fn byte_len(&self) -> usize { match self { - KeyPrefix::GroupChat => 1, - KeyPrefix::Channel(_) => 5, + KeyPrefix::GroupChat => 2, + KeyPrefix::Channel(_) => 6, } } } @@ -121,10 +133,14 @@ impl KeyPrefix { impl TryFrom<&[u8]> for KeyPrefix { type Error = (); + // The slice may extend beyond the bytes of the prefix fn try_from(value: &[u8]) -> Result { match value.split_first() { - Some((1, _)) => Ok(KeyPrefix::GroupChat), - Some((2, bytes)) if bytes.len() >= 4 => Ok(KeyPrefix::Channel(u32::from_be_bytes(bytes[..4].try_into().unwrap()))), + Some((kt, bytes)) if *kt == KeyType::ChatMember as u8 => match bytes.split_first() { + Some((1, _)) => Ok(KeyPrefix::GroupChat), + Some((2, tail)) if tail.len() >= 4 => Ok(KeyPrefix::Channel(u32::from_be_bytes(tail[..4].try_into().unwrap()))), + _ => Err(()), + }, _ => Err(()), } } @@ -184,7 +200,7 @@ mod tests { let key_in = Key::new(KeyPrefix::GroupChat, user_id); let bytes = key_in.to_vec(); - let key_out = Key::try_from(&bytes[..]).unwrap(); + let key_out = Key::try_from(bytes.as_slice()).unwrap(); assert_eq!(key_in, key_out); } @@ -199,7 +215,7 @@ mod tests { let key_in = Key::new(KeyPrefix::Channel(channel_id), user_id); let bytes = key_in.to_vec(); - let key_out = Key::try_from(&bytes[..]).unwrap(); + let key_out = Key::try_from(bytes.as_slice()).unwrap(); assert_eq!(key_in, key_out); } diff --git a/backend/libraries/group_chat_core/src/members_map.rs b/backend/libraries/group_chat_core/src/members_map.rs index ecc18b6a71..ee425476a5 100644 --- a/backend/libraries/group_chat_core/src/members_map.rs +++ b/backend/libraries/group_chat_core/src/members_map.rs @@ -10,6 +10,9 @@ pub trait MembersMap { fn get(&self, user_id: &UserId) -> Option; fn insert(&mut self, member: GroupMemberInternal); fn remove(&mut self, user_id: &UserId) -> Option; + + #[cfg(test)] + fn all_members(&self) -> Vec; } pub struct HeapMembersMap { @@ -27,11 +30,6 @@ impl HeapMembersMap { pub fn values_mut(&mut self) -> impl Iterator + '_ { self.map.values_mut() } - - #[cfg(test)] - pub fn values(&self) -> impl Iterator + '_ { - self.map.values() - } } impl MembersMap for HeapMembersMap { @@ -46,6 +44,11 @@ impl MembersMap for HeapMembersMap { fn remove(&mut self, user_id: &UserId) -> Option { self.map.remove(user_id) } + + #[cfg(test)] + fn all_members(&self) -> Vec { + self.map.values().cloned().collect() + } } impl Serialize for HeapMembersMap { diff --git a/backend/libraries/stable_memory_map/src/lib.rs b/backend/libraries/stable_memory_map/src/lib.rs index 9e00400249..7da7897442 100644 --- a/backend/libraries/stable_memory_map/src/lib.rs +++ b/backend/libraries/stable_memory_map/src/lib.rs @@ -30,6 +30,34 @@ pub fn with_map_mut, Vec, Memory>) -> MAP.with_borrow_mut(|m| f(&mut m.as_mut().unwrap().map)) } +pub fn garbage_collect(prefix: Vec) -> Result { + assert!(!prefix.is_empty()); + + let mut total_count = 0; + with_map_mut(|m| { + // If < 2B instructions have been used so far, delete another 100 keys, or exit if complete + while ic_cdk::api::instruction_counter() < 2_000_000_000 { + let keys: Vec<_> = m + .range(prefix.clone()..) + .take_while(|(k, _)| k.starts_with(&prefix)) + .map(|(k, _)| k) + .take(100) + .collect(); + + let batch_count = keys.len() as u32; + total_count += batch_count; + for key in keys { + m.remove(&key); + } + // If batch count < 100 then we are finished + if batch_count < 100 { + return Ok(total_count); + } + } + Err(total_count) + }) +} + #[derive(Copy, Clone)] #[repr(u8)] pub enum KeyType {