Skip to content

Commit

Permalink
Temporarily reinstate MigrateMembersToStableMemory job to fix upgra…
Browse files Browse the repository at this point in the history
…de (#7007)
  • Loading branch information
hpeebles authored Dec 6, 2024
1 parent 24e76af commit 17f0efe
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 9 deletions.
4 changes: 4 additions & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

### Fixed

- Temporarily reinstate `MigrateMembersToStableMemory` job to fix upgrade ([#7007](https://github.com/open-chat-labs/open-chat/pull/7007))

## [[2.0.1500](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1500-community)] - 2024-12-06

### Added
Expand Down
21 changes: 20 additions & 1 deletion backend/canisters/community/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use msgpack::serialize_then_unwrap;
use notifications_canister::c2c_push_notification;
use rand::rngs::StdRng;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use serde_bytes::ByteBuf;
use stable_memory_map::{ChatEventKeyPrefix, KeyPrefix};
use std::cell::RefCell;
Expand Down Expand Up @@ -300,6 +300,7 @@ impl RuntimeState {
instruction_counts: self.data.instruction_counts_log.iter().collect(),
event_store_client_info: self.data.event_store_client.info(),
timer_jobs: self.data.timer_jobs.len() as u32,
members_migrated_to_stable_memory: self.data.members_migrated_to_stable_memory,
stable_memory_sizes: memory::memory_sizes(),
canister_ids: CanisterIds {
user_index: self.data.user_index_canister_id,
Expand All @@ -322,14 +323,22 @@ fn init_instruction_counts_log() -> InstructionCountsLog {

#[derive(Serialize, Deserialize)]
struct Data {
#[serde(deserialize_with = "deserialize_to_timestamped")]
is_public: Timestamped<bool>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
name: Timestamped<String>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
description: Timestamped<String>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
rules: Timestamped<AccessRulesInternal>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
avatar: Timestamped<Option<Document>>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
banner: Timestamped<Option<Document>>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
permissions: Timestamped<CommunityPermissions>,
gate_config: Timestamped<Option<AccessGateConfigInternal>>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
primary_language: Timestamped<String>,
user_index_canister_id: CanisterId,
local_user_index_canister_id: CanisterId,
Expand All @@ -344,7 +353,9 @@ struct Data {
channels: Channels,
events: CommunityEvents,
invited_users: InvitedUsers,
#[serde(deserialize_with = "deserialize_to_timestamped")]
invite_code: Timestamped<Option<u64>>,
#[serde(deserialize_with = "deserialize_to_timestamped")]
invite_code_enabled: Timestamped<bool>,
frozen: Timestamped<Option<FrozenGroupInfo>>,
timer_jobs: TimerJobs<TimerJob>,
Expand All @@ -369,6 +380,7 @@ struct Data {
user_cache: UserCache,
user_event_sync_queue: GroupedTimerJobQueue<UserEventBatch>,
stable_memory_keys_to_garbage_collect: Vec<KeyPrefix>,
members_migrated_to_stable_memory: bool,
bot_permissions: BTreeMap<UserId, SlashCommandPermissions>,
}

Expand Down Expand Up @@ -472,6 +484,7 @@ impl Data {
user_cache: UserCache::default(),
user_event_sync_queue: GroupedTimerJobQueue::new(5, true),
stable_memory_keys_to_garbage_collect: Vec::new(),
members_migrated_to_stable_memory: true,
bot_permissions: BTreeMap::new(),
}
}
Expand Down Expand Up @@ -896,6 +909,7 @@ pub struct Metrics {
pub instruction_counts: Vec<InstructionCountEntry>,
pub event_store_client_info: EventStoreClientInfo,
pub timer_jobs: u32,
pub members_migrated_to_stable_memory: bool,
pub stable_memory_sizes: BTreeMap<u8, u64>,
pub canister_ids: CanisterIds,
}
Expand All @@ -921,3 +935,8 @@ pub struct AddUsersToChannelResult {
pub users_already_in_channel: Vec<UserId>,
pub users_limit_reached: Vec<UserId>,
}

fn deserialize_to_timestamped<'de, D: Deserializer<'de>, T: Deserialize<'de>>(d: D) -> Result<Timestamped<T>, D::Error> {
let value = T::deserialize(d)?;
Ok(Timestamped::new(value, canister_time::now_millis()))
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ fn post_upgrade(args: Args) {
let memory = get_upgrades_memory();
let reader = get_reader(&memory);

let (data, errors, logs, traces): (Data, Vec<LogEntry>, Vec<LogEntry>, Vec<LogEntry>) =
let (mut data, errors, logs, traces): (Data, Vec<LogEntry>, Vec<LogEntry>, Vec<LogEntry>) =
msgpack::deserialize(reader).unwrap();

assert!(data.members_migrated_to_stable_memory);

canister_logger::init_with_logs(data.test_mode, errors, logs, traces);

data.events.migrate_to_stable_memory();

let env = init_env(data.rng_seed);
init_state(env, data, args.wasm_version);

Expand Down
151 changes: 145 additions & 6 deletions backend/canisters/community/impl/src/model/events.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,82 @@
use crate::model::events::stable_memory::EventsStableStorage;
use chat_events::GroupGateUpdatedInternal;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing::info;
use types::{
AvatarChanged, BannerChanged, BotAdded, BotRemoved, ChannelDeleted, ChannelId, ChatId, CommunityMembersRemoved,
CommunityPermissionsChanged, CommunityRoleChanged, CommunityUsersBlocked, CommunityVisibilityChanged, EventIndex,
EventWrapperInternal, GroupCreated, GroupDescriptionChanged, GroupFrozen, GroupInviteCodeChanged, GroupNameChanged,
GroupRulesChanged, GroupUnfrozen, PrimaryLanguageChanged, TimestampMillis, UserId, UsersInvited, UsersUnblocked,
AvatarChanged, BannerChanged, BotAdded, BotRemoved, ChannelDeleted, ChannelId, ChatId, CommunityMemberLeftInternal,
CommunityMembersRemoved, CommunityPermissionsChanged, CommunityRoleChanged, CommunityUsersBlocked,
CommunityVisibilityChanged, DefaultChannelsChanged, EventIndex, EventWrapperInternal, GroupCreated,
GroupDescriptionChanged, GroupFrozen, GroupInviteCodeChanged, GroupNameChanged, GroupRulesChanged, GroupUnfrozen,
MemberJoinedInternal, PrimaryLanguageChanged, TimestampMillis, UserId, UsersInvited, UsersUnblocked,
};

mod stable_memory;

#[derive(Serialize, Deserialize)]
#[serde(from = "CommunityEventsPrevious")]
pub struct CommunityEvents {
events_map: BTreeMap<EventIndex, EventWrapperInternal<CommunityEventInternal>>,
stable_events_map: EventsStableStorage,
latest_event_index: EventIndex,
latest_event_timestamp: TimestampMillis,
}

#[derive(Serialize, Deserialize)]
pub struct CommunityEventsPrevious {
events_map: BTreeMap<EventIndex, EventWrapperInternal<CommunityEventInternalOld>>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum CommunityEventInternalOld {
#[serde(rename = "cr", alias = "Created")]
Created(Box<GroupCreated>),
#[serde(rename = "nc", alias = "NameChanged")]
NameChanged(Box<GroupNameChanged>),
#[serde(rename = "dc", alias = "DescriptionChanged")]
DescriptionChanged(Box<GroupDescriptionChanged>),
#[serde(rename = "rc", alias = "RulesChanged")]
RulesChanged(Box<GroupRulesChanged>),
#[serde(rename = "ac", alias = "AvatarChanged")]
AvatarChanged(Box<AvatarChanged>),
#[serde(rename = "bc", alias = "BannerChanged")]
BannerChanged(Box<BannerChanged>),
#[serde(rename = "ui", alias = "UsersInvited")]
UsersInvited(Box<UsersInvited>),
#[serde(rename = "mj", alias = "MemberJoined")]
MemberJoined(Box<MemberJoinedInternal>),
#[serde(rename = "mr", alias = "MembersRemoved")]
MembersRemoved(Box<CommunityMembersRemoved>),
#[serde(rename = "ml", alias = "MemberLeft")]
MemberLeft(Box<CommunityMemberLeftInternal>),
#[serde(rename = "rc", alias = "RoleChanged")]
RoleChanged(Box<CommunityRoleChanged>),
#[serde(rename = "ub", alias = "UsersBlocked")]
UsersBlocked(Box<CommunityUsersBlocked>),
#[serde(rename = "uu", alias = "UsersUnblocked")]
UsersUnblocked(Box<UsersUnblocked>),
#[serde(rename = "pc", alias = "PermissionsChanged")]
PermissionsChanged(Box<CommunityPermissionsChanged>),
#[serde(rename = "vc", alias = "VisibilityChanged")]
VisibilityChanged(Box<CommunityVisibilityChanged>),
#[serde(rename = "ic", alias = "InviteCodeChanged")]
InviteCodeChanged(Box<GroupInviteCodeChanged>),
#[serde(rename = "fr", alias = "Frozen")]
Frozen(Box<GroupFrozen>),
#[serde(rename = "uf", alias = "Unfrozen")]
Unfrozen(Box<GroupUnfrozen>),
#[serde(rename = "gu", alias = "GateUpdated")]
GateUpdated(Box<GroupGateUpdatedInternal>),
#[serde(rename = "cd", alias = "ChannelDeleted")]
ChannelDeleted(Box<ChannelDeleted>),
#[serde(rename = "dcc", alias = "DefaultChannelsChanged")]
DefaultChannelsChanged(Box<DefaultChannelsChanged>),
#[serde(rename = "pl", alias = "PrimaryLanguageChanged")]
PrimaryLanguageChanged(Box<PrimaryLanguageChanged>),
#[serde(rename = "gi", alias = "GroupImported")]
GroupImported(Box<GroupImportedInternal>),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum CommunityEventInternal {
#[serde(rename = "cr", alias = "Created")]
Expand Down Expand Up @@ -66,6 +126,14 @@ pub enum CommunityEventInternal {
}

impl CommunityEvents {
pub fn migrate_to_stable_memory(&mut self) {
for event in self.events_map.values() {
self.stable_events_map.insert(event.clone());
}
let count = self.events_map.len();
info!(count, "Community events migrated to stable memory");
}

pub fn new(name: String, description: String, created_by: UserId, now: TimestampMillis) -> CommunityEvents {
let event_index = EventIndex::default();
let event = EventWrapperInternal {
Expand All @@ -80,10 +148,13 @@ impl CommunityEvents {
})),
};

let mut events_map = BTreeMap::new();
events_map.insert(event_index, event.clone());
let mut stable_events_map = EventsStableStorage::default();
stable_events_map.insert(event);

CommunityEvents {
events_map,
stable_events_map,
latest_event_index: event_index,
latest_event_timestamp: now,
Expand All @@ -92,13 +163,16 @@ impl CommunityEvents {

pub(crate) fn push_event(&mut self, event: CommunityEventInternal, now: TimestampMillis) -> EventIndex {
let event_index = self.next_event_index();
self.stable_events_map.insert(EventWrapperInternal {
let event = EventWrapperInternal {
index: event_index,
timestamp: now,
correlation_id: 0,
expires_at: None,
event,
});
};

self.events_map.insert(event_index, event.clone());
self.stable_events_map.insert(event);

self.latest_event_index = event_index;
self.latest_event_timestamp = now;
Expand All @@ -125,3 +199,68 @@ pub struct GroupImportedInternal {
pub channel_id: ChannelId,
pub members_added: Vec<UserId>,
}

impl TryFrom<CommunityEventInternalOld> for CommunityEventInternal {
type Error = ();

fn try_from(value: CommunityEventInternalOld) -> Result<Self, Self::Error> {
match value {
CommunityEventInternalOld::Created(e) => Ok(CommunityEventInternal::Created(e)),
CommunityEventInternalOld::NameChanged(e) => Ok(CommunityEventInternal::NameChanged(e)),
CommunityEventInternalOld::DescriptionChanged(e) => Ok(CommunityEventInternal::DescriptionChanged(e)),
CommunityEventInternalOld::RulesChanged(e) => Ok(CommunityEventInternal::RulesChanged(e)),
CommunityEventInternalOld::AvatarChanged(e) => Ok(CommunityEventInternal::AvatarChanged(e)),
CommunityEventInternalOld::BannerChanged(e) => Ok(CommunityEventInternal::BannerChanged(e)),
CommunityEventInternalOld::UsersInvited(e) => Ok(CommunityEventInternal::UsersInvited(e)),
CommunityEventInternalOld::MembersRemoved(e) => Ok(CommunityEventInternal::MembersRemoved(e)),
CommunityEventInternalOld::RoleChanged(e) => Ok(CommunityEventInternal::RoleChanged(e)),
CommunityEventInternalOld::UsersBlocked(e) => Ok(CommunityEventInternal::UsersBlocked(e)),
CommunityEventInternalOld::UsersUnblocked(e) => Ok(CommunityEventInternal::UsersUnblocked(e)),
CommunityEventInternalOld::PermissionsChanged(e) => Ok(CommunityEventInternal::PermissionsChanged(e)),
CommunityEventInternalOld::VisibilityChanged(e) => Ok(CommunityEventInternal::VisibilityChanged(e)),
CommunityEventInternalOld::InviteCodeChanged(e) => Ok(CommunityEventInternal::InviteCodeChanged(e)),
CommunityEventInternalOld::Frozen(e) => Ok(CommunityEventInternal::Frozen(e)),
CommunityEventInternalOld::Unfrozen(e) => Ok(CommunityEventInternal::Unfrozen(e)),
CommunityEventInternalOld::GateUpdated(e) => Ok(CommunityEventInternal::GateUpdated(e)),
CommunityEventInternalOld::ChannelDeleted(e) => Ok(CommunityEventInternal::ChannelDeleted(e)),
CommunityEventInternalOld::PrimaryLanguageChanged(e) => Ok(CommunityEventInternal::PrimaryLanguageChanged(e)),
CommunityEventInternalOld::GroupImported(e) => Ok(CommunityEventInternal::GroupImported(e)),
CommunityEventInternalOld::MemberJoined(_)
| CommunityEventInternalOld::MemberLeft(_)
| CommunityEventInternalOld::DefaultChannelsChanged(_) => Err(()),
}
}
}

impl From<CommunityEventsPrevious> for CommunityEvents {
fn from(value: CommunityEventsPrevious) -> Self {
let mut events_map = BTreeMap::new();
let mut index = EventIndex::default();
for old_event in value.events_map.into_values() {
if let Ok(new_event) = CommunityEventInternal::try_from(old_event.event) {
events_map.insert(
index,
EventWrapperInternal {
index,
timestamp: old_event.timestamp,
correlation_id: 0,
expires_at: None,
event: new_event,
},
);
index = index.incr();
}
}

let last = events_map.values().last().unwrap();
let latest_event_index = last.index;
let latest_event_timestamp = last.timestamp;

CommunityEvents {
events_map,
stable_events_map: EventsStableStorage::default(),
latest_event_index,
latest_event_timestamp,
}
}
}
2 changes: 2 additions & 0 deletions backend/canisters/community/impl/src/model/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const MAX_MEMBERS_PER_COMMUNITY: u32 = 100_000;

#[derive(Serialize, Deserialize)]
pub struct CommunityMembers {
#[serde(alias = "stable_memory_members_map")]
members_map: MembersStableStorage,
member_channel_links: BTreeSet<(UserId, ChannelId)>,
member_channel_links_removed: BTreeMap<(UserId, ChannelId), TimestampMillis>,
Expand All @@ -35,6 +36,7 @@ pub struct CommunityMembers {
members_with_display_names: BTreeSet<UserId>,
members_with_referrals: BTreeSet<UserId>,
updates: BTreeSet<(TimestampMillis, UserId, MemberUpdate)>,
#[serde(default)]
latest_update_removed: TimestampMillis,
}

Expand Down
13 changes: 12 additions & 1 deletion backend/canisters/community/impl/src/timer_job_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chat_events::MessageContentInternal;
use constants::{DAY_IN_MS, MINUTE_IN_MS, NANOS_PER_MILLISECOND, SECOND_IN_MS};
use ledger_utils::process_transaction;
use serde::{Deserialize, Serialize};
use tracing::error;
use tracing::{error, info};
use types::{
BlobReference, CanisterId, ChannelId, ChatId, MessageId, MessageIndex, P2PSwapStatus, PendingCryptoTransaction, UserId,
};
Expand All @@ -27,6 +27,7 @@ pub enum TimerJob {
CancelP2PSwapInEscrowCanister(CancelP2PSwapInEscrowCanisterJob),
MarkP2PSwapExpired(MarkP2PSwapExpiredJob),
MarkVideoCallEnded(MarkVideoCallEndedJob),
MigrateMembersToStableMemory(MigrateMembersToStableMemoryJob),
}

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -137,6 +138,9 @@ pub struct MarkP2PSwapExpiredJob {
#[derive(Serialize, Deserialize, Clone)]
pub struct MarkVideoCallEndedJob(pub community_canister::end_video_call::Args);

#[derive(Serialize, Deserialize, Clone)]
pub struct MigrateMembersToStableMemoryJob;

impl Job for TimerJob {
fn execute(self) {
if can_borrow_state() {
Expand All @@ -157,6 +161,7 @@ impl Job for TimerJob {
TimerJob::CancelP2PSwapInEscrowCanister(job) => job.execute(),
TimerJob::MarkP2PSwapExpired(job) => job.execute(),
TimerJob::MarkVideoCallEnded(job) => job.execute(),
TimerJob::MigrateMembersToStableMemory(job) => job.execute(),
}
}
}
Expand Down Expand Up @@ -450,3 +455,9 @@ impl Job for MarkVideoCallEndedJob {
mutate_state(|state| end_video_call_impl(self.0, state));
}
}

impl Job for MigrateMembersToStableMemoryJob {
fn execute(self) {
info!("MigrateMembersToStableMemoryJob executed")
}
}
1 change: 1 addition & 0 deletions backend/libraries/group_chat_core/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const MAX_MEMBERS_PER_GROUP: u32 = 100_000;

#[derive(Serialize, Deserialize)]
pub struct GroupMembers {
#[serde(alias = "stable_memory_members_map")]
members_map: MembersStableStorage,
member_ids: BTreeSet<UserId>,
owners: BTreeSet<UserId>,
Expand Down

0 comments on commit 17f0efe

Please sign in to comment.