Skip to content

Commit

Permalink
Avoid iterating all events when migrating private replies after group…
Browse files Browse the repository at this point in the history
… import (#6827)
  • Loading branch information
hpeebles authored Nov 15, 2024
1 parent 6a21f27 commit 16284df
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 40 deletions.
1 change: 1 addition & 0 deletions backend/canisters/user/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Reduce size of metrics in memory ([#6765](https://github.com/open-chat-labs/open-chat/pull/6765))
- Run cycles check (+ other background tasks) when executing timer jobs ([#6815](https://github.com/open-chat-labs/open-chat/pull/6815))
- Add cycles balance check to more timer jobs ([#6822](https://github.com/open-chat-labs/open-chat/pull/6822))
- Avoid iterating all events when migrating private replies after group import ([#6827](https://github.com/open-chat-labs/open-chat/pull/6827))

### Removed

Expand Down
27 changes: 23 additions & 4 deletions backend/canisters/user/impl/src/model/direct_chats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ use crate::model::direct_chat::DirectChat;
use chat_events::{ChatInternal, ChatMetricsInternal};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry::Vacant;
use std::collections::{BTreeSet, HashMap};
use types::{ChatId, TimestampMillis, Timestamped, UserId, UserType};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use types::{ChatId, MessageIndex, TimestampMillis, Timestamped, UserId, UserType};

#[derive(Serialize, Deserialize, Default)]
pub struct DirectChats {
direct_chats: HashMap<ChatId, DirectChat>,
pinned: Timestamped<Vec<ChatId>>,
metrics: ChatMetricsInternal,
chats_removed: BTreeSet<(TimestampMillis, ChatId)>,
// This is needed so that when a group is imported into a community we can quickly update the
// replies to point to the community
#[serde(default)]
private_replies_to_groups: BTreeMap<ChatId, Vec<(UserId, MessageIndex)>>,
}

impl DirectChats {
Expand Down Expand Up @@ -75,9 +79,24 @@ impl DirectChats {
self.direct_chats.len()
}

pub fn mark_private_reply(&mut self, user_id: UserId, chat: ChatInternal, message_index: MessageIndex) {
if let ChatInternal::Group(chat_id) = chat {
self.private_replies_to_groups
.entry(chat_id)
.or_default()
.push((user_id, message_index));
}
}

pub fn migrate_replies(&mut self, old: ChatInternal, new: ChatInternal, now: TimestampMillis) {
for chat in self.direct_chats.values_mut() {
chat.events.migrate_replies(old, new, now);
if let ChatInternal::Group(chat_id) = old {
if let Some(replies) = self.private_replies_to_groups.remove(&chat_id) {
for (user_id, message_index) in replies {
if let Some(chat) = self.direct_chats.get_mut(&user_id.into()) {
chat.events.migrate_reply(message_index, old, new, now);
}
}
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions backend/canisters/user/impl/src/updates/c2c_send_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ pub(crate) fn handle_message_impl(args: HandleMessageArgs, state: &mut RuntimeSt

let thread_root_message_index = args.thread_root_message_id.map(|id| chat.main_message_id_to_index(id));

let chat_private_replying_to = if let Some((chat, None)) = replies_to.as_ref().and_then(|r| r.chat_if_other) {
Some(chat)
} else {
None
};

let push_message_args = PushMessageArgs {
thread_root_message_index,
message_id: args.message_id.unwrap_or_else(|| state.env.rng().gen()),
Expand Down Expand Up @@ -207,6 +213,13 @@ pub(crate) fn handle_message_impl(args: HandleMessageArgs, state: &mut RuntimeSt
&mut state.data,
);

if let Some(chat) = chat_private_replying_to {
state
.data
.direct_chats
.mark_private_reply(args.sender, chat, message_event.event.message_index);
}

message_event
}

Expand Down
18 changes: 16 additions & 2 deletions backend/canisters/user/impl/src/updates/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{mutate_state, read_state, run_regular_jobs, Data, RuntimeState, Time
use candid::Principal;
use canister_api_macros::update;
use canister_tracing_macros::trace;
use chat_events::{MessageContentInternal, PushMessageArgs, Reader};
use chat_events::{MessageContentInternal, PushMessageArgs, Reader, ReplyContextInternal};
use rand::Rng;
use types::{
BlobReference, CanisterId, Chat, ChatId, CompletedCryptoTransaction, ContentValidationError, CryptoTransaction,
Expand Down Expand Up @@ -217,19 +217,26 @@ fn send_message_impl(
let this_canister_id = state.env.canister_id();
let sender: UserId = this_canister_id.into();
let recipient = args.recipient;
let replies_to = args.replies_to.as_ref().map(ReplyContextInternal::from);
let content = if let Some(transfer) = completed_transfer.clone() {
MessageContentInternal::new_with_transfer(args.content.clone(), transfer.into(), p2p_swap_id, now)
} else {
args.content.into()
};

let chat_private_replying_to = if let Some((chat, None)) = replies_to.as_ref().and_then(|r| r.chat_if_other) {
Some(chat)
} else {
None
};

let push_message_args = PushMessageArgs {
thread_root_message_index: args.thread_root_message_index,
message_id: args.message_id,
sender,
content: content.clone(),
mentioned: Vec::new(),
replies_to: args.replies_to.as_ref().map(|r| r.into()),
replies_to,
forwarded: args.forwarding,
sender_is_bot: false,
block_level_markdown: args.block_level_markdown,
Expand Down Expand Up @@ -306,6 +313,13 @@ fn send_message_impl(
&mut state.data,
);

if let Some(chat) = chat_private_replying_to {
state
.data
.direct_chats
.mark_private_reply(args.recipient, chat, message_event.event.message_index);
}

if let Some(transfer) = completed_transfer {
TransferSuccessV2(TransferSuccessV2Result {
chat_id: recipient.into(),
Expand Down
101 changes: 99 additions & 2 deletions backend/integration_tests/src/communities/import_group_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use pocket_ic::PocketIc;
use std::ops::Deref;
use testing::rng::{random_from_u128, random_string};
use types::{
icrc1, ChatId, CommunityId, CryptoTransaction, Cryptocurrency, EventIndex, MessageContentInitial, PendingCryptoTransaction,
PrizeContentInitial,
icrc1, Chat, ChatEvent, ChatId, CommunityId, CryptoTransaction, Cryptocurrency, EventIndex, MessageContentInitial,
PendingCryptoTransaction, PrizeContentInitial, ReplyContext, TextContent,
};
use user_canister::mark_read::ChatMessagesRead;
use utils::time::HOUR_IN_MS;
Expand Down Expand Up @@ -245,6 +245,103 @@ fn pending_prizes_transferred_to_community() {
assert_eq!(community_balance, 0);
}

#[test]
fn private_replies_to_group_updated_to_community() {
let mut wrapper = ENV.deref().get();
let TestEnv {
env,
canister_ids,
controller,
..
} = wrapper.env();

let TestData {
user1,
user2,
group_id,
community_id,
..
} = init_test_data(env, canister_ids, *controller);

let send_message_response =
client::group::happy_path::send_text_message(env, &user1, group_id, None, random_string(), None);

client::user::send_message_v2(
env,
user2.principal,
user2.canister(),
&user_canister::send_message_v2::Args {
recipient: user1.user_id,
thread_root_message_index: None,
message_id: random_from_u128(),
content: MessageContentInitial::Text(TextContent { text: random_string() }),
replies_to: Some(ReplyContext {
chat_if_other: Some((Chat::Group(group_id), None)),
event_index: send_message_response.event_index,
}),
forwarding: false,
block_level_markdown: false,
message_filter_failed: None,
pin: None,
correlation_id: 0,
},
);

let import_group_response = client::community::import_group(
env,
user1.principal,
community_id.into(),
&community_canister::import_group::Args { group_id },
);

let channel_id = match import_group_response {
community_canister::import_group::Response::Success(result) => result.channel_id,
response => panic!("{response:?}"),
};

tick_many(env, 10);

let user1_event = client::user::happy_path::events(env, &user1, user2.user_id, EventIndex::default(), true, 10, 10)
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user1_event {
if let Some(replies_to) = m.replies_to {
assert_eq!(
replies_to.chat_if_other,
Some((Chat::Channel(community_id, channel_id), None))
);
assert_eq!(replies_to.event_index, send_message_response.event_index);
} else {
panic!();
}
} else {
panic!();
}

let user2_event = client::user::happy_path::events(env, &user2, user1.user_id, EventIndex::default(), true, 10, 10)
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user2_event {
if let Some(replies_to) = m.replies_to {
assert_eq!(
replies_to.chat_if_other,
Some((Chat::Channel(community_id, channel_id), None))
);
assert_eq!(replies_to.event_index, send_message_response.event_index);
} else {
panic!();
}
} else {
panic!();
}
}

fn init_test_data(env: &mut PocketIc, canister_ids: &CanisterIds, controller: Principal) -> TestData {
let user1 = client::register_diamond_user(env, canister_ids, controller);
let user2 = client::register_user(env, canister_ids);
Expand Down
25 changes: 16 additions & 9 deletions backend/libraries/chat_events/src/chat_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem;
use std::ops::DerefMut;
use tracing::info;
use tracing::{error, info};
use types::{
AcceptP2PSwapResult, CallParticipant, CancelP2PSwapResult, CanisterId, Chat, ChatType, CompleteP2PSwapResult,
CompletedCryptoTransaction, Cryptocurrency, DirectChatCreated, EventContext, EventIndex, EventWrapper,
Expand Down Expand Up @@ -1559,15 +1559,22 @@ impl ChatEvents {
}

// Used when a group is imported into a community
pub fn migrate_replies(&mut self, old: ChatInternal, new: ChatInternal, now: TimestampMillis) {
for (thread_root_message_index, events_list) in [(None, &mut self.main)]
.into_iter()
.chain(self.threads.iter_mut().map(|(t, e)| (Some(*t), e)))
pub fn migrate_reply(&mut self, message_index: MessageIndex, old: ChatInternal, new: ChatInternal, now: TimestampMillis) {
if self
.update_message(None, message_index.into(), EventIndex::default(), Some(now), |message, _| {
if let Some(r) = message.replies_to.as_mut() {
if let Some((chat, _)) = r.chat_if_other.as_mut() {
if *chat == old {
*chat = new;
return Ok(());
}
}
}
Err(UpdateEventError::NoChange(()))
})
.is_err()
{
for event_index in events_list.migrate_replies(old, new) {
self.last_updated_timestamps
.mark_updated(thread_root_message_index, event_index, now);
}
error!("Failed to migrate reply. This should never happen")
}
}

Expand Down
24 changes: 1 addition & 23 deletions backend/libraries/chat_events/src/chat_events_list.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::hybrid_map::HybridMap;
use crate::last_updated_timestamps::LastUpdatedTimestamps;
use crate::stable_storage::ChatEventsStableStorage;
use crate::{ChatEventInternal, ChatInternal, EventKey, EventOrExpiredRangeInternal, EventsMap, MessageInternal};
use crate::{ChatEventInternal, EventKey, EventOrExpiredRangeInternal, EventsMap, MessageInternal};
use candid::Principal;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -211,28 +211,6 @@ impl ChatEventsList {
}
}

pub fn migrate_replies(&mut self, old: ChatInternal, new: ChatInternal) -> Vec<EventIndex> {
let mut updated = Vec::new();
for mut event in self.iter_internal() {
if let Some(message) = event.event.as_message_mut() {
if let Some(r) = message.replies_to.as_mut() {
if let Some((chat, _)) = r.chat_if_other.as_mut() {
if *chat == old {
*chat = new;
updated.push(event);
}
}
}
}
}

let updated_indexes = updated.iter().map(|e| e.index).collect();
for event in updated {
self.stable_events_map.insert(event);
}
updated_indexes
}

pub(crate) fn event_count_since<F: Fn(&ChatEventInternal) -> bool>(&self, since: TimestampMillis, filter: &F) -> usize {
self.iter_internal()
.rev()
Expand Down

0 comments on commit 16284df

Please sign in to comment.