diff --git a/backend/canisters/community/CHANGELOG.md b/backend/canisters/community/CHANGELOG.md index eed1981373..fee77ceca9 100644 --- a/backend/canisters/community/CHANGELOG.md +++ b/backend/canisters/community/CHANGELOG.md @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +### Fixed + +- Temporarily reinstate `MigrateMembersToStableMemory` job to fix upgrade ([#7007](https://github.com/open-chat-labs/open-chat/pull/7007)) + ## [[2.0.1500](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1500-community)] - 2024-12-06 ### Added diff --git a/backend/canisters/community/impl/src/lib.rs b/backend/canisters/community/impl/src/lib.rs index 2b2bd620ee..ac4f520409 100644 --- a/backend/canisters/community/impl/src/lib.rs +++ b/backend/canisters/community/impl/src/lib.rs @@ -28,7 +28,7 @@ use msgpack::serialize_then_unwrap; use notifications_canister::c2c_push_notification; use rand::rngs::StdRng; use rand::RngCore; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use serde_bytes::ByteBuf; use stable_memory_map::{ChatEventKeyPrefix, KeyPrefix}; use std::cell::RefCell; @@ -300,6 +300,7 @@ impl RuntimeState { instruction_counts: self.data.instruction_counts_log.iter().collect(), event_store_client_info: self.data.event_store_client.info(), timer_jobs: self.data.timer_jobs.len() as u32, + members_migrated_to_stable_memory: self.data.members_migrated_to_stable_memory, stable_memory_sizes: memory::memory_sizes(), canister_ids: CanisterIds { user_index: self.data.user_index_canister_id, @@ -322,14 +323,22 @@ fn init_instruction_counts_log() -> InstructionCountsLog { #[derive(Serialize, Deserialize)] struct Data { + #[serde(deserialize_with = "deserialize_to_timestamped")] is_public: Timestamped, + #[serde(deserialize_with = "deserialize_to_timestamped")] name: Timestamped, + #[serde(deserialize_with = "deserialize_to_timestamped")] description: Timestamped, + #[serde(deserialize_with = "deserialize_to_timestamped")] rules: Timestamped, + #[serde(deserialize_with = "deserialize_to_timestamped")] avatar: Timestamped>, + #[serde(deserialize_with = "deserialize_to_timestamped")] banner: Timestamped>, + #[serde(deserialize_with = "deserialize_to_timestamped")] permissions: Timestamped, gate_config: Timestamped>, + #[serde(deserialize_with = "deserialize_to_timestamped")] primary_language: Timestamped, user_index_canister_id: CanisterId, local_user_index_canister_id: CanisterId, @@ -344,7 +353,9 @@ struct Data { channels: Channels, events: CommunityEvents, invited_users: InvitedUsers, + #[serde(deserialize_with = "deserialize_to_timestamped")] invite_code: Timestamped>, + #[serde(deserialize_with = "deserialize_to_timestamped")] invite_code_enabled: Timestamped, frozen: Timestamped>, timer_jobs: TimerJobs, @@ -369,6 +380,7 @@ struct Data { user_cache: UserCache, user_event_sync_queue: GroupedTimerJobQueue, stable_memory_keys_to_garbage_collect: Vec, + members_migrated_to_stable_memory: bool, bot_permissions: BTreeMap, } @@ -472,6 +484,7 @@ impl Data { user_cache: UserCache::default(), user_event_sync_queue: GroupedTimerJobQueue::new(5, true), stable_memory_keys_to_garbage_collect: Vec::new(), + members_migrated_to_stable_memory: true, bot_permissions: BTreeMap::new(), } } @@ -896,6 +909,7 @@ pub struct Metrics { pub instruction_counts: Vec, pub event_store_client_info: EventStoreClientInfo, pub timer_jobs: u32, + pub members_migrated_to_stable_memory: bool, pub stable_memory_sizes: BTreeMap, pub canister_ids: CanisterIds, } @@ -921,3 +935,8 @@ pub struct AddUsersToChannelResult { pub users_already_in_channel: Vec, pub users_limit_reached: Vec, } + +fn deserialize_to_timestamped<'de, D: Deserializer<'de>, T: Deserialize<'de>>(d: D) -> Result, D::Error> { + let value = T::deserialize(d)?; + Ok(Timestamped::new(value, canister_time::now_millis())) +} diff --git a/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs index 3ea0940d6f..f0989508dc 100644 --- a/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/community/impl/src/lifecycle/post_upgrade.rs @@ -18,11 +18,15 @@ fn post_upgrade(args: Args) { let memory = get_upgrades_memory(); let reader = get_reader(&memory); - let (data, errors, logs, traces): (Data, Vec, Vec, Vec) = + let (mut data, errors, logs, traces): (Data, Vec, Vec, Vec) = msgpack::deserialize(reader).unwrap(); + assert!(data.members_migrated_to_stable_memory); + canister_logger::init_with_logs(data.test_mode, errors, logs, traces); + data.events.migrate_to_stable_memory(); + let env = init_env(data.rng_seed); init_state(env, data, args.wasm_version); diff --git a/backend/canisters/community/impl/src/model/events.rs b/backend/canisters/community/impl/src/model/events.rs index 7e924ee1a9..42cda69cb5 100644 --- a/backend/canisters/community/impl/src/model/events.rs +++ b/backend/canisters/community/impl/src/model/events.rs @@ -1,22 +1,82 @@ use crate::model::events::stable_memory::EventsStableStorage; use chat_events::GroupGateUpdatedInternal; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use tracing::info; use types::{ - AvatarChanged, BannerChanged, BotAdded, BotRemoved, ChannelDeleted, ChannelId, ChatId, CommunityMembersRemoved, - CommunityPermissionsChanged, CommunityRoleChanged, CommunityUsersBlocked, CommunityVisibilityChanged, EventIndex, - EventWrapperInternal, GroupCreated, GroupDescriptionChanged, GroupFrozen, GroupInviteCodeChanged, GroupNameChanged, - GroupRulesChanged, GroupUnfrozen, PrimaryLanguageChanged, TimestampMillis, UserId, UsersInvited, UsersUnblocked, + AvatarChanged, BannerChanged, BotAdded, BotRemoved, ChannelDeleted, ChannelId, ChatId, CommunityMemberLeftInternal, + CommunityMembersRemoved, CommunityPermissionsChanged, CommunityRoleChanged, CommunityUsersBlocked, + CommunityVisibilityChanged, DefaultChannelsChanged, EventIndex, EventWrapperInternal, GroupCreated, + GroupDescriptionChanged, GroupFrozen, GroupInviteCodeChanged, GroupNameChanged, GroupRulesChanged, GroupUnfrozen, + MemberJoinedInternal, PrimaryLanguageChanged, TimestampMillis, UserId, UsersInvited, UsersUnblocked, }; mod stable_memory; #[derive(Serialize, Deserialize)] +#[serde(from = "CommunityEventsPrevious")] pub struct CommunityEvents { + events_map: BTreeMap>, stable_events_map: EventsStableStorage, latest_event_index: EventIndex, latest_event_timestamp: TimestampMillis, } +#[derive(Serialize, Deserialize)] +pub struct CommunityEventsPrevious { + events_map: BTreeMap>, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum CommunityEventInternalOld { + #[serde(rename = "cr", alias = "Created")] + Created(Box), + #[serde(rename = "nc", alias = "NameChanged")] + NameChanged(Box), + #[serde(rename = "dc", alias = "DescriptionChanged")] + DescriptionChanged(Box), + #[serde(rename = "rc", alias = "RulesChanged")] + RulesChanged(Box), + #[serde(rename = "ac", alias = "AvatarChanged")] + AvatarChanged(Box), + #[serde(rename = "bc", alias = "BannerChanged")] + BannerChanged(Box), + #[serde(rename = "ui", alias = "UsersInvited")] + UsersInvited(Box), + #[serde(rename = "mj", alias = "MemberJoined")] + MemberJoined(Box), + #[serde(rename = "mr", alias = "MembersRemoved")] + MembersRemoved(Box), + #[serde(rename = "ml", alias = "MemberLeft")] + MemberLeft(Box), + #[serde(rename = "rc", alias = "RoleChanged")] + RoleChanged(Box), + #[serde(rename = "ub", alias = "UsersBlocked")] + UsersBlocked(Box), + #[serde(rename = "uu", alias = "UsersUnblocked")] + UsersUnblocked(Box), + #[serde(rename = "pc", alias = "PermissionsChanged")] + PermissionsChanged(Box), + #[serde(rename = "vc", alias = "VisibilityChanged")] + VisibilityChanged(Box), + #[serde(rename = "ic", alias = "InviteCodeChanged")] + InviteCodeChanged(Box), + #[serde(rename = "fr", alias = "Frozen")] + Frozen(Box), + #[serde(rename = "uf", alias = "Unfrozen")] + Unfrozen(Box), + #[serde(rename = "gu", alias = "GateUpdated")] + GateUpdated(Box), + #[serde(rename = "cd", alias = "ChannelDeleted")] + ChannelDeleted(Box), + #[serde(rename = "dcc", alias = "DefaultChannelsChanged")] + DefaultChannelsChanged(Box), + #[serde(rename = "pl", alias = "PrimaryLanguageChanged")] + PrimaryLanguageChanged(Box), + #[serde(rename = "gi", alias = "GroupImported")] + GroupImported(Box), +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub enum CommunityEventInternal { #[serde(rename = "cr", alias = "Created")] @@ -66,6 +126,14 @@ pub enum CommunityEventInternal { } impl CommunityEvents { + pub fn migrate_to_stable_memory(&mut self) { + for event in self.events_map.values() { + self.stable_events_map.insert(event.clone()); + } + let count = self.events_map.len(); + info!(count, "Community events migrated to stable memory"); + } + pub fn new(name: String, description: String, created_by: UserId, now: TimestampMillis) -> CommunityEvents { let event_index = EventIndex::default(); let event = EventWrapperInternal { @@ -80,10 +148,13 @@ impl CommunityEvents { })), }; + let mut events_map = BTreeMap::new(); + events_map.insert(event_index, event.clone()); let mut stable_events_map = EventsStableStorage::default(); stable_events_map.insert(event); CommunityEvents { + events_map, stable_events_map, latest_event_index: event_index, latest_event_timestamp: now, @@ -92,13 +163,16 @@ impl CommunityEvents { pub(crate) fn push_event(&mut self, event: CommunityEventInternal, now: TimestampMillis) -> EventIndex { let event_index = self.next_event_index(); - self.stable_events_map.insert(EventWrapperInternal { + let event = EventWrapperInternal { index: event_index, timestamp: now, correlation_id: 0, expires_at: None, event, - }); + }; + + self.events_map.insert(event_index, event.clone()); + self.stable_events_map.insert(event); self.latest_event_index = event_index; self.latest_event_timestamp = now; @@ -125,3 +199,68 @@ pub struct GroupImportedInternal { pub channel_id: ChannelId, pub members_added: Vec, } + +impl TryFrom for CommunityEventInternal { + type Error = (); + + fn try_from(value: CommunityEventInternalOld) -> Result { + match value { + CommunityEventInternalOld::Created(e) => Ok(CommunityEventInternal::Created(e)), + CommunityEventInternalOld::NameChanged(e) => Ok(CommunityEventInternal::NameChanged(e)), + CommunityEventInternalOld::DescriptionChanged(e) => Ok(CommunityEventInternal::DescriptionChanged(e)), + CommunityEventInternalOld::RulesChanged(e) => Ok(CommunityEventInternal::RulesChanged(e)), + CommunityEventInternalOld::AvatarChanged(e) => Ok(CommunityEventInternal::AvatarChanged(e)), + CommunityEventInternalOld::BannerChanged(e) => Ok(CommunityEventInternal::BannerChanged(e)), + CommunityEventInternalOld::UsersInvited(e) => Ok(CommunityEventInternal::UsersInvited(e)), + CommunityEventInternalOld::MembersRemoved(e) => Ok(CommunityEventInternal::MembersRemoved(e)), + CommunityEventInternalOld::RoleChanged(e) => Ok(CommunityEventInternal::RoleChanged(e)), + CommunityEventInternalOld::UsersBlocked(e) => Ok(CommunityEventInternal::UsersBlocked(e)), + CommunityEventInternalOld::UsersUnblocked(e) => Ok(CommunityEventInternal::UsersUnblocked(e)), + CommunityEventInternalOld::PermissionsChanged(e) => Ok(CommunityEventInternal::PermissionsChanged(e)), + CommunityEventInternalOld::VisibilityChanged(e) => Ok(CommunityEventInternal::VisibilityChanged(e)), + CommunityEventInternalOld::InviteCodeChanged(e) => Ok(CommunityEventInternal::InviteCodeChanged(e)), + CommunityEventInternalOld::Frozen(e) => Ok(CommunityEventInternal::Frozen(e)), + CommunityEventInternalOld::Unfrozen(e) => Ok(CommunityEventInternal::Unfrozen(e)), + CommunityEventInternalOld::GateUpdated(e) => Ok(CommunityEventInternal::GateUpdated(e)), + CommunityEventInternalOld::ChannelDeleted(e) => Ok(CommunityEventInternal::ChannelDeleted(e)), + CommunityEventInternalOld::PrimaryLanguageChanged(e) => Ok(CommunityEventInternal::PrimaryLanguageChanged(e)), + CommunityEventInternalOld::GroupImported(e) => Ok(CommunityEventInternal::GroupImported(e)), + CommunityEventInternalOld::MemberJoined(_) + | CommunityEventInternalOld::MemberLeft(_) + | CommunityEventInternalOld::DefaultChannelsChanged(_) => Err(()), + } + } +} + +impl From for CommunityEvents { + fn from(value: CommunityEventsPrevious) -> Self { + let mut events_map = BTreeMap::new(); + let mut index = EventIndex::default(); + for old_event in value.events_map.into_values() { + if let Ok(new_event) = CommunityEventInternal::try_from(old_event.event) { + events_map.insert( + index, + EventWrapperInternal { + index, + timestamp: old_event.timestamp, + correlation_id: 0, + expires_at: None, + event: new_event, + }, + ); + index = index.incr(); + } + } + + let last = events_map.values().last().unwrap(); + let latest_event_index = last.index; + let latest_event_timestamp = last.timestamp; + + CommunityEvents { + events_map, + stable_events_map: EventsStableStorage::default(), + latest_event_index, + latest_event_timestamp, + } + } +} diff --git a/backend/canisters/community/impl/src/model/members.rs b/backend/canisters/community/impl/src/model/members.rs index eb9c8dac40..fc60092b77 100644 --- a/backend/canisters/community/impl/src/model/members.rs +++ b/backend/canisters/community/impl/src/model/members.rs @@ -19,6 +19,7 @@ const MAX_MEMBERS_PER_COMMUNITY: u32 = 100_000; #[derive(Serialize, Deserialize)] pub struct CommunityMembers { + #[serde(alias = "stable_memory_members_map")] members_map: MembersStableStorage, member_channel_links: BTreeSet<(UserId, ChannelId)>, member_channel_links_removed: BTreeMap<(UserId, ChannelId), TimestampMillis>, @@ -35,6 +36,7 @@ pub struct CommunityMembers { members_with_display_names: BTreeSet, members_with_referrals: BTreeSet, updates: BTreeSet<(TimestampMillis, UserId, MemberUpdate)>, + #[serde(default)] latest_update_removed: TimestampMillis, } diff --git a/backend/canisters/community/impl/src/timer_job_types.rs b/backend/canisters/community/impl/src/timer_job_types.rs index 638e36d6c6..57b6f562d2 100644 --- a/backend/canisters/community/impl/src/timer_job_types.rs +++ b/backend/canisters/community/impl/src/timer_job_types.rs @@ -7,7 +7,7 @@ use chat_events::MessageContentInternal; use constants::{DAY_IN_MS, MINUTE_IN_MS, NANOS_PER_MILLISECOND, SECOND_IN_MS}; use ledger_utils::process_transaction; use serde::{Deserialize, Serialize}; -use tracing::error; +use tracing::{error, info}; use types::{ BlobReference, CanisterId, ChannelId, ChatId, MessageId, MessageIndex, P2PSwapStatus, PendingCryptoTransaction, UserId, }; @@ -27,6 +27,7 @@ pub enum TimerJob { CancelP2PSwapInEscrowCanister(CancelP2PSwapInEscrowCanisterJob), MarkP2PSwapExpired(MarkP2PSwapExpiredJob), MarkVideoCallEnded(MarkVideoCallEndedJob), + MigrateMembersToStableMemory(MigrateMembersToStableMemoryJob), } #[derive(Serialize, Deserialize, Clone)] @@ -137,6 +138,9 @@ pub struct MarkP2PSwapExpiredJob { #[derive(Serialize, Deserialize, Clone)] pub struct MarkVideoCallEndedJob(pub community_canister::end_video_call::Args); +#[derive(Serialize, Deserialize, Clone)] +pub struct MigrateMembersToStableMemoryJob; + impl Job for TimerJob { fn execute(self) { if can_borrow_state() { @@ -157,6 +161,7 @@ impl Job for TimerJob { TimerJob::CancelP2PSwapInEscrowCanister(job) => job.execute(), TimerJob::MarkP2PSwapExpired(job) => job.execute(), TimerJob::MarkVideoCallEnded(job) => job.execute(), + TimerJob::MigrateMembersToStableMemory(job) => job.execute(), } } } @@ -450,3 +455,9 @@ impl Job for MarkVideoCallEndedJob { mutate_state(|state| end_video_call_impl(self.0, state)); } } + +impl Job for MigrateMembersToStableMemoryJob { + fn execute(self) { + info!("MigrateMembersToStableMemoryJob executed") + } +} diff --git a/backend/libraries/group_chat_core/src/members.rs b/backend/libraries/group_chat_core/src/members.rs index 34fbfe183f..4a5dfce728 100644 --- a/backend/libraries/group_chat_core/src/members.rs +++ b/backend/libraries/group_chat_core/src/members.rs @@ -25,6 +25,7 @@ const MAX_MEMBERS_PER_GROUP: u32 = 100_000; #[derive(Serialize, Deserialize)] pub struct GroupMembers { + #[serde(alias = "stable_memory_members_map")] members_map: MembersStableStorage, member_ids: BTreeSet, owners: BTreeSet,