Skip to content

Commit

Permalink
Migrate chat members to stable memory using timer job (#6933)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Dec 2, 2024
1 parent 7f6bce7 commit cb51d2e
Show file tree
Hide file tree
Showing 29 changed files with 324 additions and 161 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Move members to `MembersMap` in prep for stable memory ([#6927](https://github.com/open-chat-labs/open-chat/pull/6927))
- Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929))
- Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931))
- Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933))

## [[2.0.1479](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1479-community)] - 2024-11-28

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,12 @@ fn run() {
TIMER_ID.set(None);
mutate_state(|state| {
while let Some(prefix) = state.data.stable_memory_keys_to_garbage_collect.pop() {
let result = chat_events::ChatEvents::garbage_collect_stable_memory(prefix.clone());
let result = stable_memory_map::garbage_collect(prefix.clone());
let (count, complete) = match result {
Ok(c) => (c, true),
Err(c) => (c, false),
};
let thread_root_message_index = prefix.thread_root_message_index();
info!(
count,
?thread_root_message_index,
complete,
"Garbage collected keys from stable memory"
);
info!(count, complete, "Garbage collected keys from stable memory");
if !complete {
state.data.stable_memory_keys_to_garbage_collect.push(prefix);
break;
Expand Down
17 changes: 10 additions & 7 deletions backend/canisters/community/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,9 @@ impl RuntimeState {
}
final_prize_payments.extend(result.final_prize_payments);
for thread in result.threads {
self.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new(
channel.id,
thread.root_message_index,
)));
self.data.stable_memory_keys_to_garbage_collect.push(
KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new(channel.id, thread.root_message_index)).to_vec(),
);
}
}
jobs::garbage_collect_stable_memory::start_job_if_required(self);
Expand Down Expand Up @@ -298,6 +295,7 @@ impl RuntimeState {
instruction_counts: self.data.instruction_counts_log.iter().collect(),
event_store_client_info: self.data.event_store_client.info(),
timer_jobs: self.data.timer_jobs.len() as u32,
members_migrated_to_stable_memory: self.data.members_migrated_to_stable_memory,
canister_ids: CanisterIds {
user_index: self.data.user_index_canister_id,
group_index: self.data.group_index_canister_id,
Expand Down Expand Up @@ -367,7 +365,10 @@ struct Data {
expiring_member_actions: ExpiringMemberActions,
user_cache: UserCache,
user_event_sync_queue: GroupedTimerJobQueue<UserEventBatch>,
stable_memory_keys_to_garbage_collect: Vec<KeyPrefix>,
#[serde(default)]
stable_memory_keys_to_garbage_collect: Vec<Vec<u8>>,
#[serde(default)]
members_migrated_to_stable_memory: bool,
}

impl Data {
Expand Down Expand Up @@ -472,6 +473,7 @@ impl Data {
user_cache: UserCache::default(),
user_event_sync_queue: GroupedTimerJobQueue::new(5, true),
stable_memory_keys_to_garbage_collect: Vec::new(),
members_migrated_to_stable_memory: true,
}
}

Expand Down Expand Up @@ -713,6 +715,7 @@ pub struct Metrics {
pub instruction_counts: Vec<InstructionCountEntry>,
pub event_store_client_info: EventStoreClientInfo,
pub timer_jobs: u32,
pub members_migrated_to_stable_memory: bool,
pub canister_ids: CanisterIds,
}

Expand Down
10 changes: 10 additions & 0 deletions backend/canisters/community/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use crate::jobs::import_groups::finalize_group_import;
use crate::lifecycle::{init_env, init_state};
use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory};
use crate::timer_job_types::MigrateMembersToStableMemoryJob;
use crate::{read_state, Data};
use canister_logger::LogEntry;
use canister_timer_jobs::Job;
use canister_tracing_macros::trace;
use community_canister::post_upgrade::Args;
use ic_cdk::post_upgrade;
use instruction_counts_log::InstructionCountFunctionId;
use stable_memory::get_reader;
use tracing::info;
use types::MultiUserChat;

#[post_upgrade]
#[trace]
Expand All @@ -21,8 +24,13 @@ fn post_upgrade(args: Args) {
let (mut data, errors, logs, traces): (Data, Vec<LogEntry>, Vec<LogEntry>, Vec<LogEntry>) =
msgpack::deserialize(reader).unwrap();

let community_id = ic_cdk::id().into();
for channel in data.channels.iter_mut() {
channel.chat.members.set_member_default_timestamps();
channel
.chat
.members
.set_chat(MultiUserChat::Channel(community_id, channel.id));
}

canister_logger::init_with_logs(data.test_mode, errors, logs, traces);
Expand All @@ -44,4 +52,6 @@ fn post_upgrade(args: Args) {
.data
.record_instructions_count(InstructionCountFunctionId::PostUpgrade, now)
});

MigrateMembersToStableMemoryJob.execute();
}
33 changes: 30 additions & 3 deletions backend/canisters/community/impl/src/timer_job_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chat_events::MessageContentInternal;
use constants::{DAY_IN_MS, MINUTE_IN_MS, NANOS_PER_MILLISECOND, SECOND_IN_MS};
use ledger_utils::process_transaction;
use serde::{Deserialize, Serialize};
use tracing::error;
use tracing::{error, info};
use types::{
BlobReference, CanisterId, ChannelId, ChatId, MessageId, MessageIndex, P2PSwapStatus, PendingCryptoTransaction, UserId,
};
Expand All @@ -21,14 +21,13 @@ pub enum TimerJob {
FinalizeGroupImport(FinalizeGroupImportJob),
ProcessGroupImportChannelMembers(ProcessGroupImportChannelMembersJob),
MarkGroupImportComplete(MarkGroupImportCompleteJob),
// TODO: Remove this serde attribute post release
#[serde(alias = "RefundPrize")]
FinalPrizePayments(FinalPrizePaymentsJob),
MakeTransfer(MakeTransferJob),
NotifyEscrowCanisterOfDeposit(NotifyEscrowCanisterOfDepositJob),
CancelP2PSwapInEscrowCanister(CancelP2PSwapInEscrowCanisterJob),
MarkP2PSwapExpired(MarkP2PSwapExpiredJob),
MarkVideoCallEnded(MarkVideoCallEndedJob),
MigrateMembersToStableMemory(MigrateMembersToStableMemoryJob),
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -141,6 +140,9 @@ pub struct MarkP2PSwapExpiredJob {
#[derive(Serialize, Deserialize, Clone)]
pub struct MarkVideoCallEndedJob(pub community_canister::end_video_call::Args);

#[derive(Serialize, Deserialize, Clone)]
pub struct MigrateMembersToStableMemoryJob;

impl Job for TimerJob {
fn execute(self) {
if can_borrow_state() {
Expand All @@ -161,6 +163,7 @@ impl Job for TimerJob {
TimerJob::CancelP2PSwapInEscrowCanister(job) => job.execute(),
TimerJob::MarkP2PSwapExpired(job) => job.execute(),
TimerJob::MarkVideoCallEnded(job) => job.execute(),
TimerJob::MigrateMembersToStableMemory(job) => job.execute(),
}
}
}
Expand Down Expand Up @@ -461,3 +464,27 @@ impl Job for MarkVideoCallEndedJob {
mutate_state(|state| end_video_call_impl(self.0, state));
}
}

impl Job for MigrateMembersToStableMemoryJob {
fn execute(self) {
mutate_state(|state| {
let mut complete = true;
for channel in state.data.channels.iter_mut() {
if !channel.chat.members.migrate_next_batch_to_stable_memory() {
complete = false;
break;
}
}
if !complete {
let now = state.env.now();
state
.data
.timer_jobs
.enqueue_job(TimerJob::MigrateMembersToStableMemory(self), now, now);
} else {
state.data.members_migrated_to_stable_memory = true;
info!("Finished migrating members to stable memory");
}
})
}
}
13 changes: 8 additions & 5 deletions backend/canisters/community/impl/src/updates/delete_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,20 @@ fn delete_channel_impl(channel_id: ChannelId, state: &mut RuntimeState) -> Respo
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::Channel(ChannelKeyPrefix::new(channel_id)));
.push(KeyPrefix::Channel(ChannelKeyPrefix::new(channel_id)).to_vec());

for message_index in channel.chat.events.thread_keys() {
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new(
channel_id,
message_index,
)));
.push(KeyPrefix::ChannelThread(ChannelThreadKeyPrefix::new(channel_id, message_index)).to_vec());
}

state
.data
.stable_memory_keys_to_garbage_collect
.push(group_chat_core::MembersKeyPrefix::Channel(channel_id.as_u32()).to_vec());

crate::jobs::garbage_collect_stable_memory::start_job_if_required(state);

state.data.events.push_event(
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Move members to `MembersMap` in prep for stable memory ([#6927](https://github.com/open-chat-labs/open-chat/pull/6927))
- Only handle a single bot action ([#6929](https://github.com/open-chat-labs/open-chat/pull/6929))
- Implement `MembersStableStorage` which stores members in stable memory ([#6931](https://github.com/open-chat-labs/open-chat/pull/6931))
- Migrate chat members to stable memory using timer job ([#6933](https://github.com/open-chat-labs/open-chat/pull/6933))

## [[2.0.1480](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1480-group)] - 2024-11-28

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,12 @@ fn run() {
TIMER_ID.set(None);
mutate_state(|state| {
while let Some(prefix) = state.data.stable_memory_keys_to_garbage_collect.pop() {
let result = chat_events::ChatEvents::garbage_collect_stable_memory(prefix.clone());
let result = stable_memory_map::garbage_collect(prefix.clone());
let (count, complete) = match result {
Ok(c) => (c, true),
Err(c) => (c, false),
};
let thread_root_message_index = prefix.thread_root_message_index();
info!(
count,
?thread_root_message_index,
complete,
"Garbage collected keys from stable memory"
);
info!(count, complete, "Garbage collected keys from stable memory");
if !complete {
state.data.stable_memory_keys_to_garbage_collect.push(prefix);
break;
Expand Down
15 changes: 7 additions & 8 deletions backend/canisters/group/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,7 @@ impl RuntimeState {
for thread in result.threads {
self.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::GroupChatThread(GroupChatThreadKeyPrefix::new(
thread.root_message_index,
)));
.push(KeyPrefix::GroupChatThread(GroupChatThreadKeyPrefix::new(thread.root_message_index)).to_vec());
}
jobs::garbage_collect_stable_memory::start_job_if_required(self);
}
Expand Down Expand Up @@ -420,7 +418,7 @@ impl RuntimeState {
.unwrap_or_default(),
event_store_client_info: self.data.event_store_client.info(),
timer_jobs: self.data.timer_jobs.len() as u32,
stable_memory_event_migration_complete: self.data.stable_memory_event_migration_complete,
members_migrated_to_stable_memory: self.data.members_migrated_to_stable_memory,
canister_ids: CanisterIds {
user_index: self.data.user_index_canister_id,
group_index: self.data.group_index_canister_id,
Expand Down Expand Up @@ -474,8 +472,9 @@ struct Data {
expiring_member_actions: ExpiringMemberActions,
user_cache: UserCache,
user_event_sync_queue: GroupedTimerJobQueue<UserEventBatch>,
stable_memory_event_migration_complete: bool,
stable_memory_keys_to_garbage_collect: Vec<KeyPrefix>,
#[serde(default)]
members_migrated_to_stable_memory: bool,
stable_memory_keys_to_garbage_collect: Vec<Vec<u8>>,
}

fn init_instruction_counts_log() -> InstructionCountsLog {
Expand Down Expand Up @@ -573,8 +572,8 @@ impl Data {
expiring_member_actions: ExpiringMemberActions::default(),
user_cache: UserCache::default(),
user_event_sync_queue: GroupedTimerJobQueue::new(5, true),
stable_memory_event_migration_complete: true,
stable_memory_keys_to_garbage_collect: Vec::new(),
members_migrated_to_stable_memory: true,
}
}

Expand Down Expand Up @@ -730,7 +729,7 @@ pub struct Metrics {
pub serialized_chat_state_bytes: u64,
pub event_store_client_info: EventStoreClientInfo,
pub timer_jobs: u32,
pub stable_memory_event_migration_complete: bool,
pub members_migrated_to_stable_memory: bool,
pub canister_ids: CanisterIds,
}

Expand Down
6 changes: 6 additions & 0 deletions backend/canisters/group/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::lifecycle::{init_env, init_state};
use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory};
use crate::timer_job_types::MigrateMembersToStableMemoryJob;
use crate::{read_state, Data};
use canister_logger::LogEntry;
use canister_timer_jobs::Job;
use canister_tracing_macros::trace;
use group_canister::post_upgrade::Args;
use ic_cdk::post_upgrade;
use instruction_counts_log::InstructionCountFunctionId;
use stable_memory::get_reader;
use tracing::info;
use types::MultiUserChat;

#[post_upgrade]
#[trace]
Expand All @@ -21,6 +24,7 @@ fn post_upgrade(args: Args) {
msgpack::deserialize(reader).unwrap();

data.chat.members.set_member_default_timestamps();
data.chat.members.set_chat(MultiUserChat::Group(ic_cdk::id().into()));

canister_logger::init_with_logs(data.test_mode, errors, logs, traces);

Expand All @@ -35,4 +39,6 @@ fn post_upgrade(args: Args) {
.data
.record_instructions_count(InstructionCountFunctionId::PostUpgrade, now)
});

MigrateMembersToStableMemoryJob.execute();
}
Loading

0 comments on commit cb51d2e

Please sign in to comment.