Skip to content

Commit

Permalink
Avoid iterating all users when determining who to notify of new messa…
Browse files Browse the repository at this point in the history
…ge (#6877)
  • Loading branch information
hpeebles authored Nov 22, 2024
1 parent b3423e9 commit 2247e37
Show file tree
Hide file tree
Showing 33 changed files with 325 additions and 116 deletions.
1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Reduce the number of events stored on the heap in the `HybridMap` ([#6867](https://github.com/open-chat-labs/open-chat/pull/6867))
- Return `FailedToDeserialize` and log error if unable to read event ([#6873](https://github.com/open-chat-labs/open-chat/pull/6873))
- Extract stable memory map so it can store additional datasets ([#6876](https://github.com/open-chat-labs/open-chat/pull/6876))
- Avoid iterating all users when determining who to notify of new message ([#6877](https://github.com/open-chat-labs/open-chat/pull/6877))

### Removed

Expand Down
19 changes: 7 additions & 12 deletions backend/canisters/community/impl/src/model/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::{HashMap, HashSet};
use types::{
ChannelId, ChannelMatch, CommunityCanisterChannelSummary, CommunityCanisterChannelSummaryUpdates, CommunityId,
GroupMembership, GroupMembershipUpdates, GroupPermissionRole, GroupPermissions, MultiUserChat, Rules, TimestampMillis,
Timestamped, UserId, UserType, MAX_THREADS_IN_SUMMARY,
UserId, UserType, MAX_THREADS_IN_SUMMARY,
};

use super::members::CommunityMembers;
Expand Down Expand Up @@ -268,7 +268,7 @@ impl Channel {
joined: m.date_added(),
role: m.role().value.into(),
mentions: chat.most_recent_mentions(m, None),
notifications_muted: m.notifications_muted.value,
notifications_muted: m.notifications_muted().value,
my_metrics: chat
.events
.user_metrics(&m.user_id(), None)
Expand Down Expand Up @@ -356,7 +356,7 @@ impl Channel {
let membership = member.map(|m| GroupMembershipUpdates {
role: updates.role_changed.then_some(m.role().value.into()),
mentions: updates.mentions,
notifications_muted: m.notifications_muted.if_set_after(since).cloned(),
notifications_muted: m.notifications_muted().if_set_after(since).cloned(),
my_metrics: self.chat.events.user_metrics(&m.user_id(), Some(since)).map(|m| m.hydrate()),
latest_threads: m
.followed_threads
Expand Down Expand Up @@ -404,15 +404,10 @@ impl Channel {
pub fn mute_notifications(&mut self, mute: bool, user_id: UserId, now: TimestampMillis) -> MuteChannelResult {
use MuteChannelResult::*;

if let Some(channel_member) = self.chat.members.get_mut(&user_id) {
if channel_member.notifications_muted.value != mute {
channel_member.notifications_muted = Timestamped::new(mute, now);
Success
} else {
Unchanged
}
} else {
UserNotFound
match self.chat.members.toggle_notifications_muted(user_id, mute, now) {
Some(true) => Success,
Some(false) => Unchanged,
None => UserNotFound,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
.chat
.members
.get(&message.sender)
.map_or(true, |m| m.notifications_muted.value || m.suspended.value);
.map_or(true, |m| m.notifications_muted().value || m.suspended().value);

if !notifications_muted {
let notification = Notification::ChannelReactionAdded(ChannelReactionAddedNotification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ fn c2c_set_user_suspended(args: Args) -> Response {
}

fn c2c_set_user_suspended_impl(args: Args, state: &mut RuntimeState) -> Response {
if let Some(user) = state.data.members.get_by_user_id_mut(&args.user_id) {
if user.suspended.value != args.suspended {
if let Some(member) = state.data.members.get_by_user_id_mut(&args.user_id) {
if member.suspended.value != args.suspended {
let now = state.env.now();
user.suspended = Timestamped::new(args.suspended, now);
member.suspended = Timestamped::new(args.suspended, now);

for channel_id in member.channels.iter() {
if let Some(channel) = state.data.channels.get_mut(channel_id) {
channel.chat.members.set_suspended(member.user_id, args.suspended, now);
}
}
}
Success
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn build_c2c_args(args: &Args, state: &RuntimeState) -> Result<(c2c_report_messa
return Err(UserNotInChannel);
};

if channel_member.suspended.value {
if channel_member.suspended().value {
return Err(UserSuspended);
} else if channel_member.lapsed().value {
return Err(UserLapsed);
Expand Down
16 changes: 1 addition & 15 deletions backend/canisters/community/impl/src/updates/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,6 @@ fn process_send_message_result(
let message_index = message_event.event.message_index;
let message_id = message_event.event.message_id;
let expires_at = message_event.expires_at;

// Exclude suspended members and bots from notification
let users_to_notify: Vec<UserId> = result
.users_to_notify
.into_iter()
.filter(|u| {
state
.data
.members
.get_by_user_id(u)
.map_or(false, |m| !m.suspended.value && !m.user_type.is_bot())
})
.collect();

let content = &message_event.event.content;
let community_id = state.env.canister_id().into();
let sender_is_human = state
Expand Down Expand Up @@ -255,7 +241,7 @@ fn process_send_message_result(
channel_avatar_id,
crypto_transfer: content.notification_crypto_transfer_details(&users_mentioned.mentioned_directly),
});
state.push_notification(users_to_notify, notification);
state.push_notification(result.users_to_notify, notification);

register_timer_jobs(channel_id, thread_root_message_index, message_event, now, &mut state.data);

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Reduce the number of events stored on the heap in the `HybridMap` ([#6867](https://github.com/open-chat-labs/open-chat/pull/6867))
- Return `FailedToDeserialize` and log error if unable to read event ([#6873](https://github.com/open-chat-labs/open-chat/pull/6873))
- Extract stable memory map so it can store additional datasets ([#6876](https://github.com/open-chat-labs/open-chat/pull/6876))
- Avoid iterating all users when determining who to notify of new message ([#6877](https://github.com/open-chat-labs/open-chat/pull/6877))

### Removed

Expand Down
10 changes: 8 additions & 2 deletions backend/canisters/group/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl RuntimeState {
joined: member.date_added(),
role: member.role().value.into(),
mentions: chat.most_recent_mentions(member, None),
notifications_muted: member.notifications_muted.value,
notifications_muted: member.notifications_muted().value,
my_metrics: chat
.events
.user_metrics(&member.user_id(), None)
Expand Down Expand Up @@ -581,7 +581,13 @@ impl Data {
}

pub fn lookup_user_id(&self, user_id_or_principal: Principal) -> Option<UserId> {
self.get_member(user_id_or_principal).map(|m| m.user_id())
let user_id = self
.principal_to_user_id_map
.get(&user_id_or_principal)
.copied()
.unwrap_or(user_id_or_principal.into());

self.chat.members.contains(&user_id).then_some(user_id)
}

pub fn get_member(&self, user_id_or_principal: Principal) -> Option<&GroupMemberInternal> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn summary_updates_impl(updates_since: TimestampMillis, on_behalf_of: Option<Pri
let membership = GroupMembershipUpdates {
role: updates.role_changed.then_some(member.role().value.into()),
mentions: updates.mentions,
notifications_muted: member.notifications_muted.if_set_after(updates_since).cloned(),
notifications_muted: member.notifications_muted().if_set_after(updates_since).cloned(),
my_metrics: state
.data
.chat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn reserve_p2p_swap(args: Args, state: &mut RuntimeState) -> Result<ReserveP2PSw

let caller = state.env.caller();
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return Err(Box::new(UserSuspended));
} else if member.lapsed().value {
return Err(Box::new(UserLapsed));
Expand Down
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/add_reaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
.chat
.members
.get(&message.sender)
.map_or(true, |p| p.notifications_muted.value || p.suspended.value);
.map_or(true, |p| p.notifications_muted().value || p.suspended().value);

if !notifications_muted {
state.push_notification(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{mutate_state, run_regular_jobs, RuntimeState};
use canister_api_macros::update;
use canister_tracing_macros::trace;
use group_canister::c2c_set_user_suspended::{Response::*, *};
use types::Timestamped;

#[update(guard = "caller_is_user_index", msgpack = true)]
#[trace]
Expand All @@ -14,11 +13,14 @@ fn c2c_set_user_suspended(args: Args) -> Response {
}

fn c2c_set_user_suspended_impl(args: Args, state: &mut RuntimeState) -> Response {
if let Some(user) = state.data.chat.members.get_mut(&args.user_id) {
if user.suspended.value != args.suspended {
let now = state.env.now();
user.suspended = Timestamped::new(args.suspended, now);
}
let now = state.env.now();
if state
.data
.chat
.members
.set_suspended(args.user_id, args.suspended, now)
.is_some()
{
Success
} else {
UserNotInGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn c2c_start_import_into_community(args: Args) -> Response {
fn c2c_start_import_into_community_impl(args: Args, state: &mut RuntimeState) -> Response {
if args.user_id != state.data.proposals_bot_user_id {
if let Some(member) = state.data.chat.members.get(&args.user_id) {
if member.suspended.value {
if member.suspended().value {
return UserSuspended;
} else if member.lapsed().value {
return UserLapsed;
Expand Down
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/cancel_invites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn cancel_invites_impl(args: Args, state: &mut RuntimeState) -> Response {
return NotAuthorized;
};

if member.suspended.value {
if member.suspended().value {
return UserSuspended;
} else if member.lapsed().value {
return UserLapsed;
Expand Down
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/claim_prize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn prepare(args: &Args, state: &mut RuntimeState) -> Result<PrepareResult, Box<R
let caller = state.env.caller();

if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return Err(Box::new(UserSuspended));
} else if member.lapsed().value {
return Err(Box::new(UserLapsed));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn prepare(state: &mut RuntimeState) -> Result<PrepareResult, Response> {
let caller = state.env.caller();

if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
Err(UserSuspended)
} else if member.lapsed().value {
return Err(UserLapsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn disable_invite_code_impl(args: Args, state: &mut RuntimeState) -> Response {

let caller = state.env.caller();
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return UserSuspended;
}

Expand Down
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/edit_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn edit_message_impl(args: Args, state: &mut RuntimeState) -> Response {

let caller = state.env.caller();
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return UserSuspended;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn prepare(state: &RuntimeState) -> Result<PrepareResult, Response> {

let caller = state.env.caller();
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return Err(UserSuspended);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn register_poll_vote_impl(args: Args, state: &mut RuntimeState) -> Response {

let caller = state.env.caller();
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return UserSuspended;
} else if member.lapsed().value {
return UserLapsed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn prepare(args: &Args, state: &RuntimeState) -> Result<PrepareResult, Response>
None => return Err(CallerNotInGroup),
};

if member.suspended.value {
if member.suspended().value {
return Err(UserSuspended);
} else if member.lapsed().value {
return Err(UserLapsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn register_proposal_vote_impl(args: Args, state: &mut RuntimeState) -> Response
None => return CallerNotInGroup,
};

if member.suspended.value {
if member.suspended().value {
return UserSuspended;
} else if member.lapsed().value {
return UserLapsed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn prepare(user_to_remove: UserId, block: bool, state: &RuntimeState) -> Result<
let caller = state.env.caller();

if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
Err(UserSuspended)
} else if member.lapsed().value {
return Err(UserLapsed);
Expand Down
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/report_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn build_c2c_args(args: &Args, state: &RuntimeState) -> Result<(c2c_report_messa
if let Some(member) = state.data.get_member(caller) {
let chat = &state.data.chat;

if member.suspended.value {
if member.suspended().value {
return Err(UserSuspended);
} else if member.lapsed().value {
return Err(UserLapsed);
Expand Down
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn validate_caller(caller_override: Option<Principal>, state: &RuntimeState) ->

let caller = caller_override.unwrap_or_else(|| state.env.caller());
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
Err(UserSuspended)
} else {
Ok(Caller {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{mutate_state, run_regular_jobs, RuntimeState};
use canister_api_macros::update;
use canister_tracing_macros::trace;
use group_canister::toggle_mute_notifications::{Response::*, *};
use types::Timestamped;

#[update(candid = true, msgpack = true)]
#[trace]
Expand All @@ -15,13 +14,15 @@ fn toggle_mute_notifications(args: Args) -> Response {
fn toggle_mute_notifications_impl(args: Args, state: &mut RuntimeState) -> Response {
let caller = state.env.caller();
let now = state.env.now();
match state.data.get_member_mut(caller) {
Some(member) => {
member.notifications_muted = Timestamped::new(args.mute, now);
let user_id = member.user_id();
if let Some(user_id) = state.data.lookup_user_id(caller) {
if matches!(
state.data.chat.members.toggle_notifications_muted(user_id, args.mute, now),
Some(true)
) {
state.data.mark_group_updated_in_user_canister(user_id);
Success
}
None => CallerNotInGroup,
Success
} else {
CallerNotInGroup
}
}
2 changes: 1 addition & 1 deletion backend/canisters/group/impl/src/updates/unblock_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn unblock_user_impl(args: Args, state: &mut RuntimeState) -> Response {
if !state.data.chat.is_public.value {
GroupNotPublic
} else if let Some(caller_member) = state.data.get_member(caller) {
if caller_member.suspended.value {
if caller_member.suspended().value {
return UserSuspended;
} else if caller_member.lapsed().value {
return UserLapsed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn undelete_messages_impl(args: Args, state: &mut RuntimeState) -> Response {

let caller = state.env.caller();
if let Some(member) = state.data.get_member(caller) {
if member.suspended.value {
if member.suspended().value {
return UserSuspended;
}

Expand Down
Loading

0 comments on commit 2247e37

Please sign in to comment.