Skip to content

Commit

Permalink
Reduce size by grouping member -> channel links per userId (#7025)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Dec 9, 2024
1 parent e679d2d commit 1b9bd62
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 20 deletions.
1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Include the ledger canister Id in transfer failed error logs ([#7011](https://github.com/open-chat-labs/open-chat/pull/7011))
- Ensure bot has permission to execute given action ([#7014](https://github.com/open-chat-labs/open-chat/pull/7014))
- Allow bots to send a subset of message types ([#7016](https://github.com/open-chat-labs/open-chat/pull/7016))
- Reduce size by grouping member -> channel links per userId ([#7025](https://github.com/open-chat-labs/open-chat/pull/7025))

### Removed

Expand Down
3 changes: 2 additions & 1 deletion backend/canisters/community/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ impl RuntimeState {
.data
.members
.channels_for_member(m.user_id)
.filter_map(|c| self.data.channels.get(&c))
.iter()
.filter_map(|c| self.data.channels.get(c))
.filter_map(|c| c.summary(Some(m.user_id), true, data.is_public.value, &data.members))
.collect();

Expand Down
64 changes: 47 additions & 17 deletions backend/canisters/community/impl/src/model/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use candid::Principal;
use constants::calculate_summary_updates_data_removal_cutoff;
use group_community_common::{Member, MemberUpdate, Members};
use rand::RngCore;
use serde::{Deserialize, Serialize};
use serde::de::{SeqAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Formatter;
use types::{
is_default, ChannelId, CommunityMember, CommunityPermissions, CommunityRole, TimestampMillis, Timestamped, UserId,
UserType, Version,
is_default, ChannelId, CommunityMember, CommunityPermissions, CommunityRole, PushIfNotContains, TimestampMillis,
Timestamped, UserId, UserType, Version,
};

#[cfg(test)]
Expand All @@ -20,7 +22,8 @@ const MAX_MEMBERS_PER_COMMUNITY: u32 = 100_000;
#[derive(Serialize, Deserialize)]
pub struct CommunityMembers {
members_map: MembersStableStorage,
member_channel_links: BTreeSet<(UserId, ChannelId)>,
#[serde(deserialize_with = "deserialize_member_channel_links")]
member_channel_links: BTreeMap<UserId, Vec<ChannelId>>,
member_channel_links_removed: BTreeMap<(UserId, ChannelId), TimestampMillis>,
user_groups: UserGroups,
// This includes the userIds of community members and also users invited to the community
Expand Down Expand Up @@ -62,7 +65,7 @@ impl CommunityMembers {

CommunityMembers {
members_map: MembersStableStorage::new(member),
member_channel_links: public_channels.into_iter().map(|c| (creator_user_id, c)).collect(),
member_channel_links: [(creator_user_id, public_channels)].into_iter().collect(),
member_channel_links_removed: BTreeMap::new(),
user_groups: UserGroups::default(),
principal_to_user_id_map: vec![(creator_principal, creator_user_id)].into_iter().collect(),
Expand Down Expand Up @@ -180,10 +183,7 @@ impl CommunityMembers {
self.members_with_referrals.remove(&referrer);
}
}
let channel_ids: Vec<_> = self.channels_for_member(user_id).collect();
for channel_id in channel_ids {
self.member_channel_links.remove(&(user_id, channel_id));
}
self.member_channel_links.remove(&user_id);
let channels_removed: Vec<_> = self.channels_removed_for_member(user_id).map(|(c, _)| c).collect();
for channel_id in channels_removed {
self.member_channel_links_removed.remove(&(user_id, channel_id));
Expand Down Expand Up @@ -351,7 +351,10 @@ impl CommunityMembers {

pub fn mark_member_joined_channel(&mut self, user_id: UserId, channel_id: ChannelId) {
if self.member_ids.contains(&user_id) {
self.member_channel_links.insert((user_id, channel_id));
self.member_channel_links
.entry(user_id)
.or_default()
.push_if_not_contains(channel_id);
self.member_channel_links_removed.remove(&(user_id, channel_id));
}
}
Expand All @@ -364,7 +367,9 @@ impl CommunityMembers {
now: TimestampMillis,
) {
if self.member_ids.contains(&user_id) {
self.member_channel_links.remove(&(user_id, channel_id));
if let Some(channel_ids) = self.member_channel_links.get_mut(&user_id) {
channel_ids.retain(|id| *id != channel_id);
}
if !channel_deleted {
self.member_channel_links_removed.insert((user_id, channel_id), now);
}
Expand Down Expand Up @@ -448,11 +453,11 @@ impl CommunityMembers {
&self.member_ids
}

pub fn channels_for_member(&self, user_id: UserId) -> impl Iterator<Item = ChannelId> + '_ {
pub fn channels_for_member(&self, user_id: UserId) -> &[ChannelId] {
self.member_channel_links
.range((user_id, ChannelId::from(0u32))..)
.take_while(move |(u, _)| *u == user_id)
.map(|(_, c)| *c)
.get(&user_id)
.map(|v| v.as_slice())
.unwrap_or_default()
}

pub fn channels_removed_for_member(&self, user_id: UserId) -> impl Iterator<Item = (ChannelId, TimestampMillis)> + '_ {
Expand Down Expand Up @@ -827,6 +832,31 @@ impl From<&CommunityMemberInternal> for CommunityMember {
}
}

struct MemberChannelLinksVisitor;

impl<'de> Visitor<'de> for MemberChannelLinksVisitor {
type Value = BTreeMap<UserId, Vec<ChannelId>>;

fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter.write_str("a sequence")
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut map: BTreeMap<UserId, Vec<ChannelId>> = BTreeMap::new();
while let Some((user_id, channel_id)) = seq.next_element()? {
map.entry(user_id).or_default().push(channel_id);
}
Ok(map)
}
}

fn deserialize_member_channel_links<'de, D: Deserializer<'de>>(d: D) -> Result<BTreeMap<UserId, Vec<ChannelId>>, D::Error> {
d.deserialize_seq(MemberChannelLinksVisitor)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -858,7 +888,7 @@ mod tests {
}
}

let channel_ids: Vec<_> = members.channels_for_member(user_id2).collect();
let channel_ids = members.channels_for_member(user_id2).to_vec();
assert_eq!(channel_ids, (25u32..100).map(ChannelId::from).collect::<Vec<_>>());

let removed: Vec<_> = members.channels_removed_for_member(user_id2).map(|(c, _)| c).collect();
Expand All @@ -869,7 +899,7 @@ mod tests {
}

members.remove(&user_id2, 0);
assert!(members.channels_for_member(user_id2).next().is_none());
assert!(members.channels_for_member(user_id2).is_empty());
assert!(members.channels_removed_for_member(user_id2).next().is_none());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ fn summary_updates_impl(
.data
.members
.channels_for_member(m.user_id)
.filter_map(|c| state.data.channels.get(&c))
.iter()
.filter_map(|c| state.data.channels.get(c))
.filter(|c| c.last_updated(Some(m.user_id)) > updates_since)
.collect();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn c2c_set_user_suspended_impl(args: Args, state: &mut RuntimeState) -> Response
let now = state.env.now();
if state.data.members.set_suspended(args.user_id, args.suspended, now).is_some() {
for channel_id in state.data.members.channels_for_member(args.user_id) {
if let Some(channel) = state.data.channels.get_mut(&channel_id) {
if let Some(channel) = state.data.channels.get_mut(channel_id) {
channel.chat.members.set_suspended(args.user_id, args.suspended, now);
}
}
Expand Down

0 comments on commit 1b9bd62

Please sign in to comment.