Skip to content

Commit

Permalink
Add Key and KeyPrefix traits to simplify adding additional key ty…
Browse files Browse the repository at this point in the history
…pes (#7013)
  • Loading branch information
hpeebles authored Dec 9, 2024
1 parent 473be17 commit 003c5d3
Show file tree
Hide file tree
Showing 16 changed files with 743 additions and 635 deletions.
13 changes: 5 additions & 8 deletions backend/canisters/community/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use rand::rngs::StdRng;
use rand::RngCore;
use serde::{Deserialize, Deserializer, Serialize};
use serde_bytes::ByteBuf;
use stable_memory_map::{ChatEventKeyPrefix, KeyPrefix};
use stable_memory_map::{BaseKeyPrefix, ChatEventKeyPrefix};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::ops::Deref;
Expand Down Expand Up @@ -243,12 +243,9 @@ impl RuntimeState {
}
final_prize_payments.extend(result.final_prize_payments);
for thread in result.threads {
self.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(ChatEventKeyPrefix::new_from_channel(
channel.id,
Some(thread.root_message_index),
)));
self.data.stable_memory_keys_to_garbage_collect.push(BaseKeyPrefix::from(
ChatEventKeyPrefix::new_from_channel(channel.id, Some(thread.root_message_index)),
));
}
}
jobs::garbage_collect_stable_memory::start_job_if_required(self);
Expand Down Expand Up @@ -379,7 +376,7 @@ struct Data {
expiring_member_actions: ExpiringMemberActions,
user_cache: UserCache,
user_event_sync_queue: GroupedTimerJobQueue<UserEventBatch>,
stable_memory_keys_to_garbage_collect: Vec<KeyPrefix>,
stable_memory_keys_to_garbage_collect: Vec<BaseKeyPrefix>,
members_migrated_to_stable_memory: bool,
#[serde(default)]
bots: GroupBots,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::model::events::CommunityEventInternal;
use candid::Deserialize;
use serde::Serialize;
use stable_memory_map::{with_map_mut, CommunityEventKeyPrefix};
use stable_memory_map::{with_map_mut, CommunityEventKeyPrefix, KeyPrefix};
use types::EventWrapperInternal;

#[derive(Serialize, Deserialize)]
Expand All @@ -19,7 +19,7 @@ impl Default for EventsStableStorage {

impl EventsStableStorage {
pub fn insert(&mut self, event: EventWrapperInternal<CommunityEventInternal>) {
with_map_mut(|m| m.insert(self.prefix.create_key(event.index).into(), event_to_bytes(event)));
with_map_mut(|m| m.insert(self.prefix.create_key(&event.index), event_to_bytes(event)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::CommunityMemberInternal;
use candid::Deserialize;
use serde::Serialize;
use stable_memory_map::{with_map, with_map_mut, MemberKeyPrefix};
use stable_memory_map::{with_map, with_map_mut, KeyPrefix, MemberKeyPrefix};
use std::collections::BTreeSet;
use types::{is_default, CommunityRole, TimestampMillis, Timestamped, UserId, UserType, Version};

Expand All @@ -19,30 +19,29 @@ impl MembersStableStorage {

pub fn get(&self, user_id: &UserId) -> Option<CommunityMemberInternal> {
with_map(|m| {
m.get(&self.prefix.create_key(*user_id).into())
m.get(self.prefix.create_key(user_id))
.map(|v| bytes_to_member(&v).hydrate(*user_id))
})
}

pub fn insert(&mut self, member: CommunityMemberInternal) {
with_map_mut(|m| m.insert(self.prefix.create_key(member.user_id).into(), member_to_bytes(member.into())));
with_map_mut(|m| m.insert(self.prefix.create_key(&member.user_id), member_to_bytes(member.into())));
}

pub fn remove(&mut self, user_id: &UserId) -> Option<CommunityMemberInternal> {
with_map_mut(|m| {
m.remove(&self.prefix.create_key(*user_id).into())
m.remove(self.prefix.create_key(user_id))
.map(|v| bytes_to_member(&v).hydrate(*user_id))
})
}

#[cfg(test)]
pub fn all_members(&self) -> Vec<CommunityMemberInternal> {
use candid::Principal;
use stable_memory_map::{Key, MemberKey};
use stable_memory_map::Key;

with_map(|m| {
m.range(Key::from(self.prefix.create_key(Principal::from_slice(&[]).into()))..)
.map_while(|(k, v)| MemberKey::try_from(k).ok().map(|k| (k, v)))
m.range(self.prefix.create_key(&Principal::from_slice(&[]).into())..)
.take_while(|(k, _)| k.matches_prefix(&self.prefix))
.map(|(k, v)| bytes_to_member(&v).hydrate(k.user_id()))
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
use canister_api_macros::update;
use canister_tracing_macros::trace;
use community_canister::delete_channel::{Response::*, *};
use stable_memory_map::{ChatEventKeyPrefix, KeyPrefix, MemberKeyPrefix};
use stable_memory_map::{BaseKeyPrefix, ChatEventKeyPrefix, MemberKeyPrefix};
use types::{ChannelDeleted, ChannelId};

#[update(msgpack = true)]
Expand Down Expand Up @@ -54,13 +54,13 @@ fn delete_channel_impl(channel_id: ChannelId, state: &mut RuntimeState) -> Respo
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(ChatEventKeyPrefix::new_from_channel(channel_id, None)));
.push(BaseKeyPrefix::from(ChatEventKeyPrefix::new_from_channel(channel_id, None)));

for message_index in channel.chat.events.thread_keys() {
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(ChatEventKeyPrefix::new_from_channel(
.push(BaseKeyPrefix::from(ChatEventKeyPrefix::new_from_channel(
channel_id,
Some(message_index),
)));
Expand All @@ -69,7 +69,7 @@ fn delete_channel_impl(channel_id: ChannelId, state: &mut RuntimeState) -> Respo
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(MemberKeyPrefix::new_from_channel(channel_id)));
.push(BaseKeyPrefix::from(MemberKeyPrefix::new_from_channel(channel_id)));

crate::jobs::garbage_collect_stable_memory::start_job_if_required(state);

Expand Down
6 changes: 3 additions & 3 deletions backend/canisters/group/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use msgpack::serialize_then_unwrap;
use notifications_canister::c2c_push_notification;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use stable_memory_map::{ChatEventKeyPrefix, KeyPrefix};
use stable_memory_map::{BaseKeyPrefix, ChatEventKeyPrefix};
use std::cell::RefCell;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::{BTreeMap, HashMap, HashSet};
Expand Down Expand Up @@ -359,7 +359,7 @@ impl RuntimeState {
for thread in result.threads {
self.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(ChatEventKeyPrefix::new_from_group_chat(Some(
.push(BaseKeyPrefix::from(ChatEventKeyPrefix::new_from_group_chat(Some(
thread.root_message_index,
))));
}
Expand Down Expand Up @@ -470,7 +470,7 @@ struct Data {
expiring_member_actions: ExpiringMemberActions,
user_cache: UserCache,
user_event_sync_queue: GroupedTimerJobQueue<UserEventBatch>,
stable_memory_keys_to_garbage_collect: Vec<KeyPrefix>,
stable_memory_keys_to_garbage_collect: Vec<BaseKeyPrefix>,
}

fn init_instruction_counts_log() -> InstructionCountsLog {
Expand Down
4 changes: 2 additions & 2 deletions backend/canisters/user/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use model::streak::Streak;
use notifications_canister::c2c_push_notification;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use stable_memory_map::KeyPrefix;
use stable_memory_map::BaseKeyPrefix;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashSet};
use std::ops::Deref;
Expand Down Expand Up @@ -254,7 +254,7 @@ struct Data {
pub referred_by: Option<UserId>,
pub referrals: Referrals,
pub message_activity_events: MessageActivityEvents,
pub stable_memory_keys_to_garbage_collect: Vec<KeyPrefix>,
pub stable_memory_keys_to_garbage_collect: Vec<BaseKeyPrefix>,
}

impl Data {
Expand Down
17 changes: 8 additions & 9 deletions backend/canisters/user/impl/src/updates/delete_direct_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::guards::caller_is_owner;
use crate::{mutate_state, run_regular_jobs, RuntimeState};
use canister_api_macros::update;
use canister_tracing_macros::trace;
use stable_memory_map::{ChatEventKeyPrefix, KeyPrefix};
use stable_memory_map::{BaseKeyPrefix, ChatEventKeyPrefix};
use user_canister::delete_direct_chat::{Response::*, *};

#[update(guard = "caller_is_owner", msgpack = true)]
Expand All @@ -23,16 +23,15 @@ fn delete_direct_chat_impl(args: Args, state: &mut RuntimeState) -> Response {
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(ChatEventKeyPrefix::new_from_direct_chat(args.user_id, None)));
.push(BaseKeyPrefix::from(ChatEventKeyPrefix::new_from_direct_chat(
args.user_id,
None,
)));

for message_index in chat.events.thread_keys() {
state
.data
.stable_memory_keys_to_garbage_collect
.push(KeyPrefix::from(ChatEventKeyPrefix::new_from_direct_chat(
args.user_id,
Some(message_index),
)));
state.data.stable_memory_keys_to_garbage_collect.push(BaseKeyPrefix::from(
ChatEventKeyPrefix::new_from_direct_chat(args.user_id, Some(message_index)),
));
}

crate::jobs::garbage_collect_stable_memory::start_job_if_required(state);
Expand Down
36 changes: 15 additions & 21 deletions backend/libraries/chat_events/src/stable_memory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{ChatEventInternal, EventsMap};
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use stable_memory_map::{with_map, with_map_mut, ChatEventKey, ChatEventKeyPrefix, Key};
use stable_memory_map::{with_map, with_map_mut, ChatEventKey, ChatEventKeyPrefix, KeyPrefix};
use std::cmp::min;
use std::collections::VecDeque;
use std::ops::RangeBounds;
Expand All @@ -15,16 +15,15 @@ mod tests;
// Used to efficiently read all events from stable memory when migrating a group into a community
pub fn read_events_as_bytes(chat: Chat, after: Option<EventContext>, max_bytes: usize) -> Vec<(EventContext, ByteBuf)> {
let key = match after {
None => ChatEventKeyPrefix::new_from_chat(chat, None).create_key(EventIndex::default()),
None => ChatEventKeyPrefix::new_from_chat(chat, None).create_key(&EventIndex::default()),
Some(EventContext {
thread_root_message_index,
event_index,
}) => ChatEventKeyPrefix::new_from_chat(chat, thread_root_message_index).create_key(event_index.incr()),
}) => ChatEventKeyPrefix::new_from_chat(chat, thread_root_message_index).create_key(&event_index.incr()),
};
with_map(|m| {
let mut total_bytes = 0;
m.range(Key::from(key)..)
.map_while(|(k, v)| ChatEventKey::try_from(k).ok().map(|k| (k, v)))
m.range(key..)
.take_while(|(k, v)| {
if !k.matches_chat(&chat) {
return false;
Expand All @@ -42,11 +41,11 @@ pub fn write_events_as_bytes(chat: Chat, events: Vec<(EventContext, ByteBuf)>) {
with_map_mut(|m| {
for (context, bytes) in events {
let prefix = ChatEventKeyPrefix::new_from_chat(chat, context.thread_root_message_index);
let key = prefix.create_key(context.event_index);
let key = prefix.create_key(&context.event_index);
let value = bytes.into_vec();
// Check the event is valid. We could remove this once we're more confident
let _ = bytes_to_event(&value);
m.insert(key.into(), value);
m.insert(key, value);
}
});
}
Expand Down Expand Up @@ -83,11 +82,6 @@ impl ChatEventsStableStorage {
};
Iter::new(prefix, start, end)
}

fn get_internal(&self, event_index: EventIndex) -> Option<Vec<u8>> {
let key = self.prefix.create_key(event_index);
with_map(|m| m.get(&Key::from(key)))
}
}

impl EventsMap for ChatEventsStableStorage {
Expand All @@ -96,15 +90,15 @@ impl EventsMap for ChatEventsStableStorage {
}

fn get(&self, event_index: EventIndex) -> Option<EventWrapperInternal<ChatEventInternal>> {
self.get_internal(event_index).map(|v| bytes_to_event(&v))
with_map(|m| m.get(self.prefix.create_key(&event_index))).map(|v| bytes_to_event(&v))
}

fn insert(&mut self, event: EventWrapperInternal<ChatEventInternal>) {
with_map_mut(|m| m.insert(Key::from(self.prefix.create_key(event.index)), event_to_bytes(event)));
with_map_mut(|m| m.insert(self.prefix.create_key(&event.index), event_to_bytes(event)));
}

fn remove(&mut self, event_index: EventIndex) -> Option<EventWrapperInternal<ChatEventInternal>> {
with_map_mut(|m| m.remove(&Key::from(self.prefix.create_key(event_index)))).map(|v| bytes_to_event(&v))
with_map_mut(|m| m.remove(self.prefix.create_key(&event_index))).map(|v| bytes_to_event(&v))
}

fn range<R: RangeBounds<EventIndex>>(
Expand Down Expand Up @@ -183,11 +177,11 @@ impl Iter {
}

fn next_key(&self) -> ChatEventKey {
self.prefix.create_key(self.next)
self.prefix.create_key(&self.next)
}

fn next_back_key(&self) -> ChatEventKey {
self.prefix.create_key(self.next_back)
self.prefix.create_key(&self.next_back)
}

fn check_buffer_direction(&mut self, forward: bool) {
Expand Down Expand Up @@ -227,8 +221,8 @@ impl Iterator for Iter {
self.check_buffer_direction(true);
if self.buffer.is_empty() {
self.buffer = with_map(|m| {
m.range(Key::from(self.next_key())..=Key::from(self.next_back_key()))
.map_while(|(k, v)| ChatEventKey::try_from(k).ok().map(|k| (k.event_index(), v)))
m.range(self.next_key()..=self.next_back_key())
.map(|(k, v)| (k.event_index(), v))
.take(self.next_buffer_size)
.collect()
});
Expand All @@ -252,9 +246,9 @@ impl DoubleEndedIterator for Iter {
self.check_buffer_direction(false);
if self.buffer.is_empty() {
self.buffer = with_map(|m| {
m.range(Key::from(self.next_key())..=Key::from(self.next_back_key()))
m.range(self.next_key()..=self.next_back_key())
.rev()
.map_while(|(k, v)| ChatEventKey::try_from(k).ok().map(|k| (k.event_index(), v)))
.map(|(k, v)| (k.event_index(), v))
.take(self.next_buffer_size)
.collect()
});
Expand Down
20 changes: 9 additions & 11 deletions backend/libraries/group_chat_core/src/members/stable_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{GroupMemberInternal, GroupMemberStableStorage};
use candid::{Deserialize, Principal};
use serde::Serialize;
use serde_bytes::ByteBuf;
use stable_memory_map::{with_map, with_map_mut, Key, MemberKey, MemberKeyPrefix};
use stable_memory_map::{with_map, with_map_mut, Key, KeyPrefix, MemberKeyPrefix};
use types::{MultiUserChat, UserId};

#[derive(Serialize, Deserialize)]
Expand All @@ -21,18 +21,18 @@ impl MembersStableStorage {

pub fn get(&self, user_id: &UserId) -> Option<GroupMemberInternal> {
with_map(|m| {
m.get(&self.prefix.create_key(*user_id).into())
m.get(self.prefix.create_key(user_id))
.map(|v| bytes_to_member(&v).hydrate(*user_id))
})
}

pub fn insert(&mut self, member: GroupMemberInternal) {
with_map_mut(|m| m.insert(self.prefix.create_key(member.user_id).into(), member_to_bytes(member.into())));
with_map_mut(|m| m.insert(self.prefix.create_key(&member.user_id), member_to_bytes(member.into())));
}

pub fn remove(&mut self, user_id: &UserId) -> Option<GroupMemberInternal> {
with_map_mut(|m| {
m.remove(&self.prefix.create_key(*user_id).into())
m.remove(self.prefix.create_key(user_id))
.map(|v| bytes_to_member(&v).hydrate(*user_id))
})
}
Expand All @@ -44,14 +44,13 @@ impl MembersStableStorage {
// Used to efficiently read all members from stable memory when migrating a group into a community
pub fn read_members_as_bytes(&self, after: Option<UserId>, max_bytes: usize) -> Vec<(UserId, ByteBuf)> {
let start_key = match after {
None => self.prefix.create_key(Principal::from_slice(&[]).into()),
Some(user_id) => self.prefix.create_key(user_id),
None => self.prefix.create_key(&Principal::from_slice(&[]).into()),
Some(user_id) => self.prefix.create_key(&user_id),
};

with_map(|m| {
let mut total_bytes = 0;
m.range(Key::from(start_key.clone())..)
.map_while(|(k, v)| MemberKey::try_from(k).ok().map(|k| (k, v)))
m.range(start_key.clone()..)
.skip_while(|(k, _)| *k == start_key)
.take_while(|(k, v)| {
if !k.matches_prefix(&self.prefix) {
Expand All @@ -68,8 +67,7 @@ impl MembersStableStorage {
#[cfg(test)]
pub fn all_members(&self) -> Vec<GroupMemberInternal> {
with_map(|m| {
m.range(Key::from(self.prefix.create_key(Principal::from_slice(&[]).into()))..)
.map_while(|(k, v)| MemberKey::try_from(k).ok().map(|k| (k, v)))
m.range(self.prefix.create_key(&Principal::from_slice(&[]).into())..)
.take_while(|(k, _)| k.matches_prefix(&self.prefix))
.map(|(k, v)| bytes_to_member(&v).hydrate(k.user_id()))
.collect()
Expand All @@ -87,7 +85,7 @@ pub fn write_members_from_bytes(chat: MultiUserChat, members: Vec<(UserId, ByteB
// Check that the bytes are valid
let _ = bytes_to_member(&bytes);
latest = Some(user_id);
m.insert(prefix.create_key(user_id).into(), bytes);
m.insert(prefix.create_key(&user_id), bytes);
}
});
latest
Expand Down
Loading

0 comments on commit 003c5d3

Please sign in to comment.