From 1a1f650521611b57f4740e414c3787a01a545286 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Tue, 3 Oct 2023 15:26:10 +0100 Subject: [PATCH] Use canister timers to remove expired events (#4447) --- backend/canisters/community/CHANGELOG.md | 1 + backend/canisters/community/api/can.did | 1 + .../api/src/updates/update_channel.rs | 5 +- backend/canisters/community/impl/src/lib.rs | 25 ++- .../community/impl/src/model/channels.rs | 10 +- .../impl/src/queries/deleted_message.rs | 2 +- .../community/impl/src/timer_job_types.rs | 19 ++- .../src/updates/add_members_to_channel.rs | 6 +- .../impl/src/updates/add_reaction.rs | 15 +- .../impl/src/updates/c2c_join_channel.rs | 2 +- .../impl/src/updates/c2c_tip_message.rs | 2 +- .../src/updates/register_proposal_vote.rs | 8 +- .../src/updates/register_proposal_vote_v2.rs | 5 +- .../impl/src/updates/send_message.rs | 17 +- .../impl/src/updates/update_channel.rs | 3 +- backend/canisters/group/CHANGELOG.md | 1 + .../group/impl/src/activity_notifications.rs | 2 +- backend/canisters/group/impl/src/lib.rs | 38 +++-- .../impl/src/queries/c2c_events_internal.rs | 13 +- .../group/impl/src/queries/deleted_message.rs | 2 +- .../group/impl/src/queries/public_summary.rs | 3 +- .../impl/src/queries/selected_initial.rs | 2 +- .../group/impl/src/queries/summary_updates.rs | 6 +- .../group/impl/src/timer_job_types.rs | 24 ++- .../group/impl/src/updates/add_reaction.rs | 7 +- .../group/impl/src/updates/c2c_tip_message.rs | 2 +- .../src/updates/register_proposal_vote.rs | 5 +- .../src/updates/register_proposal_vote_v2.rs | 5 +- .../group/impl/src/updates/send_message.rs | 17 +- backend/canisters/user/CHANGELOG.md | 4 + backend/canisters/user/impl/src/lib.rs | 25 ++- .../user/impl/src/model/direct_chat.rs | 15 +- .../user/impl/src/model/direct_chats.rs | 4 + .../user/impl/src/queries/deleted_message.rs | 3 +- .../canisters/user/impl/src/queries/events.rs | 4 +- .../user/impl/src/queries/events_by_index.rs | 4 +- .../user/impl/src/queries/events_window.rs | 4 +- .../user/impl/src/queries/initial_state.rs | 9 +- .../src/queries/messages_by_message_index.rs | 3 +- .../user/impl/src/queries/updates.rs | 4 +- .../user/impl/src/timer_job_types.rs | 14 +- .../impl/src/updates/c2c_send_messages.rs | 47 +++--- .../user/impl/src/updates/c2c_tip_message.rs | 2 +- .../impl/src/updates/c2c_toggle_reaction.rs | 7 +- .../user/impl/src/updates/mark_read.rs | 2 +- .../user/impl/src/updates/send_message.rs | 48 +++++- .../impl/src/updates/undelete_messages.rs | 2 +- .../integration_tests/src/client/community.rs | 4 +- .../communities/disappearing_message_tests.rs | 100 +++++++++++ .../integration_tests/src/communities/mod.rs | 1 + .../src/communities/send_message_tests.rs | 57 ++++--- .../src/communities/update_channel_tests.rs | 3 +- .../src/disappearing_message_tests.rs | 109 +----------- .../chat_events/src/chat_event_internal.rs | 13 +- .../libraries/chat_events/src/chat_events.rs | 155 +++++++----------- .../chat_events/src/chat_events_list.rs | 101 +++--------- .../chat_events/src/expiring_events.rs | 79 ++------- backend/libraries/group_chat_core/src/lib.rs | 94 +++++------ .../libraries/group_chat_core/src/members.rs | 9 +- backend/libraries/types/can.did | 10 -- .../libraries/types/src/channel_summary.rs | 4 +- backend/libraries/types/src/chat_summary.rs | 12 +- .../src/services/community/candid/idl.js | 1 + .../src/services/community/candid/types.d.ts | 1 + .../services/community/community.client.ts | 11 +- .../src/services/openchatAgent.ts | 1 + upgrade_order.md | 5 +- 67 files changed, 593 insertions(+), 626 deletions(-) create mode 100644 backend/integration_tests/src/communities/disappearing_message_tests.rs diff --git a/backend/canisters/community/CHANGELOG.md b/backend/canisters/community/CHANGELOG.md index 2e42d6a0a2..d679ec5331 100644 --- a/backend/canisters/community/CHANGELOG.md +++ b/backend/canisters/community/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed - Disable mentions for messages sent by the ProposalsBot ([#4424](https://github.com/open-chat-labs/open-chat/pull/4424)) +- Use canister timers to remove expired events ([#4447](https://github.com/open-chat-labs/open-chat/pull/4447)) ### Fixed diff --git a/backend/canisters/community/api/can.did b/backend/canisters/community/api/can.did index aa14aeb46f..8a5face2ae 100644 --- a/backend/canisters/community/api/can.did +++ b/backend/canisters/community/api/can.did @@ -812,6 +812,7 @@ type UpdateChannelArgs = record { rules : opt UpdatedRules; avatar : DocumentUpdate; permissions : opt OptionalGroupPermissions; + events_ttl : EventsTimeToLiveUpdate; gate : AccessGateUpdate; public : opt bool; }; diff --git a/backend/canisters/community/api/src/updates/update_channel.rs b/backend/canisters/community/api/src/updates/update_channel.rs index 9a3782cca7..696bb09d63 100644 --- a/backend/canisters/community/api/src/updates/update_channel.rs +++ b/backend/canisters/community/api/src/updates/update_channel.rs @@ -1,8 +1,8 @@ use candid::CandidType; use serde::{Deserialize, Serialize}; use types::{ - AccessGate, ChannelId, Document, FieldTooLongResult, FieldTooShortResult, OptionUpdate, OptionalGroupPermissions, - UpdatedRules, Version, + AccessGate, ChannelId, Document, FieldTooLongResult, FieldTooShortResult, Milliseconds, OptionUpdate, + OptionalGroupPermissions, UpdatedRules, Version, }; #[derive(CandidType, Serialize, Deserialize, Debug)] @@ -13,6 +13,7 @@ pub struct Args { pub rules: Option, pub avatar: OptionUpdate, pub permissions: Option, + pub events_ttl: OptionUpdate, pub gate: OptionUpdate, pub public: Option, } diff --git a/backend/canisters/community/impl/src/lib.rs b/backend/canisters/community/impl/src/lib.rs index dc1f0831a9..da593523e8 100644 --- a/backend/canisters/community/impl/src/lib.rs +++ b/backend/canisters/community/impl/src/lib.rs @@ -2,7 +2,7 @@ use crate::memory::{get_instruction_counts_data_memory, get_instruction_counts_i use crate::model::channels::Channels; use crate::model::groups_being_imported::{GroupBeingImportedSummary, GroupsBeingImported}; use crate::model::members::CommunityMembers; -use crate::timer_job_types::TimerJob; +use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob}; use activity_notification_state::ActivityNotificationState; use candid::Principal; use canister_state_macros::canister_state; @@ -146,6 +146,26 @@ impl RuntimeState { } } + pub fn run_event_expiry_job(&mut self) { + let now = self.env.now(); + let mut next_event_expiry = None; + for channel in self.data.channels.iter_mut() { + channel.chat.remove_expired_events(now); + if let Some(expiry) = channel.chat.events.next_event_expiry() { + if next_event_expiry.map_or(true, |current| expiry < current) { + next_event_expiry = Some(expiry); + } + } + } + + self.data.next_event_expiry = next_event_expiry; + if let Some(expiry) = self.data.next_event_expiry { + self.data + .timer_jobs + .enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now); + } + } + pub fn metrics(&self) -> Metrics { Metrics { memory_used: utils::memory::used(), @@ -211,6 +231,8 @@ struct Data { groups_being_imported: GroupsBeingImported, #[serde(skip, default = "init_instruction_counts_log")] instruction_counts_log: InstructionCountsLog, + #[serde(default)] + next_event_expiry: Option, test_mode: bool, cached_chat_metrics: Timestamped, } @@ -274,6 +296,7 @@ impl Data { activity_notification_state: ActivityNotificationState::new(now, mark_active_duration), groups_being_imported: GroupsBeingImported::default(), instruction_counts_log: init_instruction_counts_log(), + next_event_expiry: None, test_mode, cached_chat_metrics: Timestamped::default(), } diff --git a/backend/canisters/community/impl/src/model/channels.rs b/backend/canisters/community/impl/src/model/channels.rs index bdeb7b2007..43a6cb7a57 100644 --- a/backend/canisters/community/impl/src/model/channels.rs +++ b/backend/canisters/community/impl/src/model/channels.rs @@ -207,7 +207,7 @@ impl Channel { let can_view_latest_message = self.can_view_latest_message(member.is_some(), is_community_member, is_public_community); - let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index, now); + let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index); let latest_event_index = main_events_reader.latest_event_index().unwrap_or_default(); let latest_message = if can_view_latest_message { main_events_reader.latest_message_event(user_id) } else { None }; @@ -219,7 +219,7 @@ impl Channel { let membership = member.map(|m| ChannelMembership { joined: m.date_added, role: m.role.into(), - mentions: m.most_recent_mentions(None, &chat.events, now), + mentions: m.most_recent_mentions(None, &chat.events), notifications_muted: m.notifications_muted.value, my_metrics: chat .events @@ -232,7 +232,6 @@ impl Channel { None, MAX_THREADS_IN_SUMMARY, m.user_id, - now, ), rules_accepted: m .rules_accepted @@ -259,8 +258,6 @@ impl Channel { metrics: chat.events.metrics().hydrate(), date_last_pinned: chat.date_last_pinned, events_ttl: chat.events.get_events_time_to_live().value, - expired_messages: chat.events.expired_messages(now), - next_message_expiry: chat.events.next_message_expiry(now), gate: chat.gate.value.clone(), membership, }) @@ -292,7 +289,7 @@ impl Channel { } let can_view_latest_message = self.can_view_latest_message(member.is_some(), is_community_member, is_public_community); - let updates_from_events = chat.summary_updates_from_events(since, user_id, now); + let updates_from_events = chat.summary_updates_from_events(since, user_id); let latest_message = can_view_latest_message .then_some(updates_from_events.latest_message) @@ -314,7 +311,6 @@ impl Channel { Some(since), MAX_THREADS_IN_SUMMARY, m.user_id, - now, ), unfollowed_threads: self .chat diff --git a/backend/canisters/community/impl/src/queries/deleted_message.rs b/backend/canisters/community/impl/src/queries/deleted_message.rs index a75e74d83b..b6a4d4cb1d 100644 --- a/backend/canisters/community/impl/src/queries/deleted_message.rs +++ b/backend/canisters/community/impl/src/queries/deleted_message.rs @@ -16,7 +16,7 @@ fn deleted_message_impl(args: Args, state: &RuntimeState) -> Response { if let Some(channel) = state.data.channels.get(&args.channel_id) { match channel .chat - .deleted_message(user_id, args.thread_root_message_index, args.message_id, state.env.now()) + .deleted_message(user_id, args.thread_root_message_index, args.message_id) { DeletedMessageResult::Success(content) => Success(SuccessResult { content: *content }), DeletedMessageResult::UserNotInGroup => UserNotInChannel, diff --git a/backend/canisters/community/impl/src/timer_job_types.rs b/backend/canisters/community/impl/src/timer_job_types.rs index fe34ee9186..ec92eadbdc 100644 --- a/backend/canisters/community/impl/src/timer_job_types.rs +++ b/backend/canisters/community/impl/src/timer_job_types.rs @@ -1,3 +1,4 @@ +use crate::activity_notifications::handle_activity_notification; use crate::jobs::import_groups::{finalize_group_import, mark_import_complete, process_channel_members}; use crate::{mutate_state, read_state}; use canister_timer_jobs::Job; @@ -12,6 +13,7 @@ pub enum TimerJob { HardDeleteMessageContent(HardDeleteMessageContentJob), DeleteFileReferences(DeleteFileReferencesJob), EndPoll(EndPollJob), + RemoveExpiredEvents(RemoveExpiredEventsJob), FinalizeGroupImport(FinalizeGroupImportJob), ProcessGroupImportChannelMembers(ProcessGroupImportChannelMembersJob), MarkGroupImportComplete(MarkGroupImportCompleteJob), @@ -38,6 +40,9 @@ pub struct EndPollJob { pub message_index: MessageIndex, } +#[derive(Serialize, Deserialize, Clone)] +pub struct RemoveExpiredEventsJob; + #[derive(Serialize, Deserialize, Clone)] pub struct FinalizeGroupImportJob { pub group_id: ChatId, @@ -74,6 +79,7 @@ impl Job for TimerJob { TimerJob::HardDeleteMessageContent(job) => job.execute(), TimerJob::DeleteFileReferences(job) => job.execute(), TimerJob::EndPoll(job) => job.execute(), + TimerJob::RemoveExpiredEvents(job) => job.execute(), TimerJob::FinalizeGroupImport(job) => job.execute(), TimerJob::ProcessGroupImportChannelMembers(job) => job.execute(), TimerJob::MarkGroupImportComplete(job) => job.execute(), @@ -86,13 +92,11 @@ impl Job for TimerJob { impl Job for HardDeleteMessageContentJob { fn execute(&self) { mutate_state(|state| { - let now = state.env.now(); - if let Some(content) = state.data.channels.get_mut(&self.channel_id).and_then(|channel| { channel .chat .events - .remove_deleted_message_content(self.thread_root_message_index, self.message_id, now) + .remove_deleted_message_content(self.thread_root_message_index, self.message_id) }) { let files_to_delete = content.blob_references(); if !files_to_delete.is_empty() { @@ -126,12 +130,19 @@ impl Job for EndPollJob { .chat .events .end_poll(self.thread_root_message_index, self.message_index, now); - // handle_activity_notification(state); + + handle_activity_notification(state); } }); } } +impl Job for RemoveExpiredEventsJob { + fn execute(&self) { + mutate_state(|state| state.run_event_expiry_job()); + } +} + impl Job for FinalizeGroupImportJob { fn execute(&self) { finalize_group_import(self.group_id); diff --git a/backend/canisters/community/impl/src/updates/add_members_to_channel.rs b/backend/canisters/community/impl/src/updates/add_members_to_channel.rs index e6d097756e..cace90b2f3 100644 --- a/backend/canisters/community/impl/src/updates/add_members_to_channel.rs +++ b/backend/canisters/community/impl/src/updates/add_members_to_channel.rs @@ -137,18 +137,18 @@ fn commit( is_bot: bool, state: &mut RuntimeState, ) -> Response { - let now = state.env.now(); - if let Some(channel) = state.data.channels.get_mut(&channel_id) { let mut min_visible_event_index = EventIndex::default(); let mut min_visible_message_index = MessageIndex::default(); if !channel.chat.history_visible_to_new_joiners { - let events_reader = channel.chat.events.main_events_reader(now); + let events_reader = channel.chat.events.main_events_reader(); min_visible_event_index = events_reader.next_event_index(); min_visible_message_index = events_reader.next_message_index(); } + let now = state.env.now(); + let mut users_added: Vec = Vec::new(); let mut users_limit_reached: Vec = Vec::new(); diff --git a/backend/canisters/community/impl/src/updates/add_reaction.rs b/backend/canisters/community/impl/src/updates/add_reaction.rs index ddd4601652..d1eae66917 100644 --- a/backend/canisters/community/impl/src/updates/add_reaction.rs +++ b/backend/canisters/community/impl/src/updates/add_reaction.rs @@ -5,7 +5,7 @@ use canister_tracing_macros::trace; use chat_events::Reader; use community_canister::add_reaction::{Response::*, *}; use group_chat_core::{AddRemoveReactionResult, GroupChatCore}; -use types::{ChannelReactionAddedNotification, EventIndex, EventWrapper, Message, Notification, TimestampMillis, UserId}; +use types::{ChannelReactionAddedNotification, EventIndex, EventWrapper, Message, Notification, UserId}; #[update_candid_and_msgpack] #[trace] @@ -27,9 +27,9 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response { } let user_id = member.user_id; - let now = state.env.now(); if let Some(channel) = state.data.channels.get_mut(&args.channel_id) { + let now = state.env.now(); match channel.chat.add_reaction( user_id, args.thread_root_message_index, @@ -38,7 +38,7 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response { now, ) { AddRemoveReactionResult::Success => { - if let Some(message) = should_push_notification(&args, user_id, &channel.chat, now) { + if let Some(message) = should_push_notification(&args, user_id, &channel.chat) { push_notification( args, user_id, @@ -67,15 +67,10 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response { } } -fn should_push_notification( - args: &Args, - user_id: UserId, - chat: &GroupChatCore, - now: TimestampMillis, -) -> Option> { +fn should_push_notification(args: &Args, user_id: UserId, chat: &GroupChatCore) -> Option> { let message = chat .events - .events_reader(EventIndex::default(), args.thread_root_message_index, now) + .events_reader(EventIndex::default(), args.thread_root_message_index) // We pass in `None` in place of `my_user_id` because we don't want to hydrate // the notification with data for the current user (eg. their poll votes). .and_then(|events_reader| events_reader.message_event(args.message_id.into(), None))?; diff --git a/backend/canisters/community/impl/src/updates/c2c_join_channel.rs b/backend/canisters/community/impl/src/updates/c2c_join_channel.rs index 74cb161c48..2a8c256d3a 100644 --- a/backend/canisters/community/impl/src/updates/c2c_join_channel.rs +++ b/backend/canisters/community/impl/src/updates/c2c_join_channel.rs @@ -160,7 +160,7 @@ pub(crate) fn join_channel_unchecked( min_visible_event_index = e; min_visible_message_index = m; } else { - let events_reader = channel.chat.events.main_events_reader(now); + let events_reader = channel.chat.events.main_events_reader(); min_visible_event_index = events_reader.next_event_index(); min_visible_message_index = events_reader.next_message_index(); }; diff --git a/backend/canisters/community/impl/src/updates/c2c_tip_message.rs b/backend/canisters/community/impl/src/updates/c2c_tip_message.rs index 71d616e66c..3bf71b8677 100644 --- a/backend/canisters/community/impl/src/updates/c2c_tip_message.rs +++ b/backend/canisters/community/impl/src/updates/c2c_tip_message.rs @@ -46,7 +46,7 @@ fn c2c_tip_message_impl(args: Args, state: &mut RuntimeState) -> Response { if let Some((message_index, message_event_index)) = channel .chat .events - .events_reader(EventIndex::default(), args.thread_root_message_index, now) + .events_reader(EventIndex::default(), args.thread_root_message_index) .and_then(|r| { r.message_event_internal(args.message_id.into()) .map(|e| (e.event.message_index, e.index)) diff --git a/backend/canisters/community/impl/src/updates/register_proposal_vote.rs b/backend/canisters/community/impl/src/updates/register_proposal_vote.rs index 5fa2311b27..97dcc6475d 100644 --- a/backend/canisters/community/impl/src/updates/register_proposal_vote.rs +++ b/backend/canisters/community/impl/src/updates/register_proposal_vote.rs @@ -76,12 +76,11 @@ fn prepare(args: &Args, state: &RuntimeState) -> Result }; let min_visible_event_index = channel_member.min_visible_event_index(); - let now = state.env.now(); if let Some(proposal) = channel .chat .events - .visible_main_events_reader(min_visible_event_index, now) + .visible_main_events_reader(min_visible_event_index) .message_internal(args.message_index.into()) .and_then(|m| if let MessageContentInternal::GovernanceProposal(p) = &m.content { Some(p) } else { None }) { @@ -111,15 +110,16 @@ fn commit(channel_id: ChannelId, user_id: UserId, args: Args, state: &mut Runtim None => return UserNotInChannel, }; - let now = state.env.now(); let min_visible_event_index = member.min_visible_event_index(); match channel .chat .events - .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now) + .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt) { RecordProposalVoteResult::Success => { + let now = state.env.now(); + let votes = member.proposal_votes.entry(now).or_default(); if !votes.contains(&args.message_index) { votes.push(args.message_index); diff --git a/backend/canisters/community/impl/src/updates/register_proposal_vote_v2.rs b/backend/canisters/community/impl/src/updates/register_proposal_vote_v2.rs index ff010b7aac..861ec0da10 100644 --- a/backend/canisters/community/impl/src/updates/register_proposal_vote_v2.rs +++ b/backend/canisters/community/impl/src/updates/register_proposal_vote_v2.rs @@ -39,16 +39,17 @@ fn register_proposal_vote_impl(args: Args, state: &mut RuntimeState) -> Response None => return UserNotInChannel, }; - let now = state.env.now(); let min_visible_event_index = channel_member.min_visible_event_index(); let user_id = member.user_id; match channel .chat .events - .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now) + .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt) { RecordProposalVoteResult::Success => { + let now = state.env.now(); + channel .chat .members diff --git a/backend/canisters/community/impl/src/updates/send_message.rs b/backend/canisters/community/impl/src/updates/send_message.rs index f520b04f3b..c1dcbd0e04 100644 --- a/backend/canisters/community/impl/src/updates/send_message.rs +++ b/backend/canisters/community/impl/src/updates/send_message.rs @@ -1,7 +1,7 @@ use crate::activity_notifications::handle_activity_notification; use crate::model::members::CommunityMembers; use crate::model::user_groups::UserGroup; -use crate::timer_job_types::{DeleteFileReferencesJob, EndPollJob, RefundPrizeJob, TimerJob}; +use crate::timer_job_types::{DeleteFileReferencesJob, EndPollJob, RefundPrizeJob, RemoveExpiredEventsJob, TimerJob}; use crate::{mutate_state, run_regular_jobs, RuntimeState}; use canister_api_macros::update_candid_and_msgpack; use canister_timer_jobs::TimerJobs; @@ -80,10 +80,19 @@ fn send_message_impl(args: Args, state: &mut RuntimeState) -> Response { let message_index = result.message_event.event.message_index; let expires_at = result.message_event.expires_at; + let mut is_next_event_to_expire = false; + if let Some(expiry) = expires_at { + is_next_event_to_expire = state.data.next_event_expiry.map_or(true, |ex| expiry < ex); + if is_next_event_to_expire { + state.data.next_event_expiry = expires_at; + } + } + register_timer_jobs( args.channel_id, args.thread_root_message_index, &result.message_event, + is_next_event_to_expire, now, &mut state.data.timer_jobs, ); @@ -150,6 +159,7 @@ fn register_timer_jobs( channel_id: ChannelId, thread_root_message_index: Option, message_event: &EventWrapper, + is_next_event_to_expire: bool, now: TimestampMillis, timer_jobs: &mut TimerJobs, ) { @@ -185,6 +195,11 @@ fn register_timer_jobs( now, ); } + + if let Some(expiry) = message_event.expires_at.filter(|_| is_next_event_to_expire) { + timer_jobs.cancel_jobs(|j| matches!(j, TimerJob::RemoveExpiredEvents(_))); + timer_jobs.enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now); + } } lazy_static! { diff --git a/backend/canisters/community/impl/src/updates/update_channel.rs b/backend/canisters/community/impl/src/updates/update_channel.rs index 855bf0dfaf..b3e37353c3 100644 --- a/backend/canisters/community/impl/src/updates/update_channel.rs +++ b/backend/canisters/community/impl/src/updates/update_channel.rs @@ -4,7 +4,6 @@ use canister_tracing_macros::trace; use community_canister::update_channel::{Response::*, *}; use group_chat_core::UpdateResult; use ic_cdk_macros::update; -use types::OptionUpdate; #[update] #[trace] @@ -42,7 +41,7 @@ fn update_channel_impl(mut args: Args, state: &mut RuntimeState) -> Response { args.permissions, args.gate, args.public, - OptionUpdate::NoChange, + args.events_ttl, now, ) { UpdateResult::Success(result) => { diff --git a/backend/canisters/group/CHANGELOG.md b/backend/canisters/group/CHANGELOG.md index ba685bfbb3..4e0328e8e1 100644 --- a/backend/canisters/group/CHANGELOG.md +++ b/backend/canisters/group/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Disable mentions for messages sent by the ProposalsBot ([#4424](https://github.com/open-chat-labs/open-chat/pull/4424)) - Simplify `inspect_message` ([#4436](https://github.com/open-chat-labs/open-chat/pull/4436)) +- Use canister timers to remove expired events ([#4447](https://github.com/open-chat-labs/open-chat/pull/4447)) ### Fixed diff --git a/backend/canisters/group/impl/src/activity_notifications.rs b/backend/canisters/group/impl/src/activity_notifications.rs index f184159d1e..4946b73b76 100644 --- a/backend/canisters/group/impl/src/activity_notifications.rs +++ b/backend/canisters/group/impl/src/activity_notifications.rs @@ -38,7 +38,7 @@ pub(crate) fn handle_activity_notification(state: &mut RuntimeState) { for event in data .chat .events - .main_events_reader(now) + .main_events_reader() .iter(None, false) .take_while(|e| e.timestamp >= one_day_ago) { diff --git a/backend/canisters/group/impl/src/lib.rs b/backend/canisters/group/impl/src/lib.rs index c132d60e9b..8b19ff3b74 100644 --- a/backend/canisters/group/impl/src/lib.rs +++ b/backend/canisters/group/impl/src/lib.rs @@ -1,7 +1,7 @@ use crate::memory::{get_instruction_counts_data_memory, get_instruction_counts_index_memory}; use crate::model::new_joiner_rewards::{NewJoinerRewardMetrics, NewJoinerRewardStatus, NewJoinerRewards}; use crate::new_joiner_rewards::process_new_joiner_reward; -use crate::timer_job_types::TimerJob; +use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob}; use crate::updates::c2c_freeze_group::freeze_group_impl; use activity_notification_state::ActivityNotificationState; use candid::Principal; @@ -104,7 +104,7 @@ impl RuntimeState { let chat = &self.data.chat; let min_visible_event_index = member.min_visible_event_index(); let min_visible_message_index = member.min_visible_message_index(); - let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index, now); + let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index); let latest_event_index = main_events_reader.latest_event_index().unwrap_or_default(); GroupCanisterGroupChatSummary { @@ -123,7 +123,7 @@ impl RuntimeState { joined: member.date_added, participant_count: chat.members.len(), role: member.role.into(), - mentions: member.most_recent_mentions(None, &chat.events, now), + mentions: member.most_recent_mentions(None, &chat.events), permissions: chat.permissions.clone(), notifications_muted: member.notifications_muted.value, metrics: chat.events.metrics().hydrate(), @@ -138,14 +138,11 @@ impl RuntimeState { None, MAX_THREADS_IN_SUMMARY, member.user_id, - now, ), frozen: self.data.frozen.value.clone(), wasm_version: BuildVersion::default(), date_last_pinned: chat.date_last_pinned, events_ttl: chat.events.get_events_time_to_live().value, - expired_messages: chat.events.expired_messages(now), - next_message_expiry: chat.events.next_message_expiry(now), gate: chat.gate.value.clone(), rules_accepted: member .rules_accepted @@ -206,25 +203,35 @@ impl RuntimeState { } } + pub fn run_event_expiry_job(&mut self) { + let now = self.env.now(); + self.data.chat.remove_expired_events(now); + + self.data.next_event_expiry = self.data.chat.events.next_event_expiry(); + if let Some(expiry) = self.data.next_event_expiry { + self.data + .timer_jobs + .enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now); + } + } + pub fn metrics(&self) -> Metrics { let group_chat_core = &self.data.chat; let now = self.env.now(); let messages_in_last_hour = group_chat_core .events - .event_count_since(now.saturating_sub(HOUR_IN_MS), now, |e| { - matches!(e, ChatEventInternal::Message(_)) - }) as u64; + .event_count_since(now.saturating_sub(HOUR_IN_MS), |e| matches!(e, ChatEventInternal::Message(_))) + as u64; let messages_in_last_day = group_chat_core .events - .event_count_since(now.saturating_sub(DAY_IN_MS), now, |e| { - matches!(e, ChatEventInternal::Message(_)) - }) as u64; + .event_count_since(now.saturating_sub(DAY_IN_MS), |e| matches!(e, ChatEventInternal::Message(_))) + as u64; let events_in_last_hour = group_chat_core .events - .event_count_since(now.saturating_sub(HOUR_IN_MS), now, |_| true) as u64; + .event_count_since(now.saturating_sub(HOUR_IN_MS), |_| true) as u64; let events_in_last_day = group_chat_core .events - .event_count_since(now.saturating_sub(DAY_IN_MS), now, |_| true) as u64; + .event_count_since(now.saturating_sub(DAY_IN_MS), |_| true) as u64; Metrics { memory_used: utils::memory::used(), @@ -294,6 +301,8 @@ struct Data { pub test_mode: bool, pub community_being_imported_into: Option, pub serialized_chat_state: Option, + #[serde(default)] + pub next_event_expiry: Option, } fn init_instruction_counts_log() -> InstructionCountsLog { @@ -361,6 +370,7 @@ impl Data { instruction_counts_log: init_instruction_counts_log(), community_being_imported_into: None, serialized_chat_state: None, + next_event_expiry: None, } } diff --git a/backend/canisters/group/impl/src/queries/c2c_events_internal.rs b/backend/canisters/group/impl/src/queries/c2c_events_internal.rs index ba172b08f9..18e1d2c573 100644 --- a/backend/canisters/group/impl/src/queries/c2c_events_internal.rs +++ b/backend/canisters/group/impl/src/queries/c2c_events_internal.rs @@ -11,14 +11,11 @@ fn c2c_events_internal(args: Args) -> Response { } fn c2c_events_internal_impl(args: Args, state: &RuntimeState) -> Response { - let now = state.env.now(); - - if let Some(events_reader) = - state - .data - .chat - .events - .events_reader(EventIndex::default(), args.thread_root_message_index, now) + if let Some(events_reader) = state + .data + .chat + .events + .events_reader(EventIndex::default(), args.thread_root_message_index) { let latest_event_index = events_reader.latest_event_index().unwrap(); let events = events_reader diff --git a/backend/canisters/group/impl/src/queries/deleted_message.rs b/backend/canisters/group/impl/src/queries/deleted_message.rs index b2d615a020..f76d912652 100644 --- a/backend/canisters/group/impl/src/queries/deleted_message.rs +++ b/backend/canisters/group/impl/src/queries/deleted_message.rs @@ -15,7 +15,7 @@ fn deleted_message_impl(args: Args, state: &RuntimeState) -> Response { match state .data .chat - .deleted_message(user_id, args.thread_root_message_index, args.message_id, state.env.now()) + .deleted_message(user_id, args.thread_root_message_index, args.message_id) { DeletedMessageResult::Success(content) => Success(SuccessResult { content: *content }), DeletedMessageResult::UserNotInGroup => CallerNotInGroup, diff --git a/backend/canisters/group/impl/src/queries/public_summary.rs b/backend/canisters/group/impl/src/queries/public_summary.rs index 25e4d62f53..8996182d47 100644 --- a/backend/canisters/group/impl/src/queries/public_summary.rs +++ b/backend/canisters/group/impl/src/queries/public_summary.rs @@ -18,9 +18,8 @@ fn public_summary_impl(args: Args, state: &RuntimeState) -> Response { } let is_public = state.data.chat.is_public; - let now = state.env.now(); let data = &state.data; - let events_reader = data.chat.events.main_events_reader(now); + let events_reader = data.chat.events.main_events_reader(); let latest_event_timestamp = events_reader.latest_event_timestamp().unwrap_or_default(); let latest_event_index = events_reader.latest_event_index().unwrap_or_default(); diff --git a/backend/canisters/group/impl/src/queries/selected_initial.rs b/backend/canisters/group/impl/src/queries/selected_initial.rs index e6215ca762..016ca9e777 100644 --- a/backend/canisters/group/impl/src/queries/selected_initial.rs +++ b/backend/canisters/group/impl/src/queries/selected_initial.rs @@ -16,7 +16,7 @@ fn selected_initial_impl(state: &RuntimeState) -> Response { Success(SuccessResult { timestamp: now, - latest_event_index: chat.events.main_events_reader(now).latest_event_index().unwrap_or_default(), + latest_event_index: chat.events.main_events_reader().latest_event_index().unwrap_or_default(), participants: chat.members.iter().map(|p| p.into()).collect(), blocked_users: chat.members.blocked(), invited_users: chat.invited_users.users(), diff --git a/backend/canisters/group/impl/src/queries/summary_updates.rs b/backend/canisters/group/impl/src/queries/summary_updates.rs index 2fe47f54b0..db729f6f5a 100644 --- a/backend/canisters/group/impl/src/queries/summary_updates.rs +++ b/backend/canisters/group/impl/src/queries/summary_updates.rs @@ -33,8 +33,7 @@ fn summary_updates_impl(args: Args, state: &RuntimeState) -> Response { } let now = state.env.now(); - let newly_expired_messages = chat.events.expired_messages_since(updates_since, now); - let updates_from_events = chat.summary_updates_from_events(updates_since, Some(member.user_id), now); + let updates_from_events = chat.summary_updates_from_events(updates_since, Some(member.user_id)); let updates = GroupCanisterGroupChatSummaryUpdates { chat_id: state.env.canister_id().into(), @@ -64,7 +63,6 @@ fn summary_updates_impl(args: Args, state: &RuntimeState) -> Response { Some(args.updates_since), MAX_THREADS_IN_SUMMARY, member.user_id, - now, ), unfollowed_threads: chat.events.unfollowed_threads_since( member.unfollowed_threads.iter(), @@ -81,8 +79,6 @@ fn summary_updates_impl(args: Args, state: &RuntimeState) -> Response { wasm_version: None, date_last_pinned: updates_from_events.date_last_pinned, events_ttl: updates_from_events.events_ttl, - newly_expired_messages, - next_message_expiry: OptionUpdate::from_update(chat.events.next_message_expiry(now)), gate: updates_from_events.gate, rules_accepted: member .rules_accepted diff --git a/backend/canisters/group/impl/src/timer_job_types.rs b/backend/canisters/group/impl/src/timer_job_types.rs index abef4e0fa6..bda08ae70d 100644 --- a/backend/canisters/group/impl/src/timer_job_types.rs +++ b/backend/canisters/group/impl/src/timer_job_types.rs @@ -13,6 +13,7 @@ pub enum TimerJob { EndPoll(EndPollJob), RefundPrize(RefundPrizeJob), MakeTransfer(MakeTransferJob), + RemoveExpiredEvents(RemoveExpiredEventsJob), } #[derive(Serialize, Deserialize, Clone)] @@ -43,6 +44,9 @@ pub struct MakeTransferJob { pub pending_transaction: PendingCryptoTransaction, } +#[derive(Serialize, Deserialize, Clone)] +pub struct RemoveExpiredEventsJob; + impl Job for TimerJob { fn execute(&self) { match self { @@ -51,6 +55,7 @@ impl Job for TimerJob { TimerJob::EndPoll(job) => job.execute(), TimerJob::RefundPrize(job) => job.execute(), TimerJob::MakeTransfer(job) => job.execute(), + TimerJob::RemoveExpiredEvents(job) => job.execute(), } } } @@ -58,14 +63,11 @@ impl Job for TimerJob { impl Job for HardDeleteMessageContentJob { fn execute(&self) { mutate_state(|state| { - let now = state.env.now(); - - if let Some(content) = - state - .data - .chat - .events - .remove_deleted_message_content(self.thread_root_message_index, self.message_id, now) + if let Some(content) = state + .data + .chat + .events + .remove_deleted_message_content(self.thread_root_message_index, self.message_id) { let files_to_delete = content.blob_references(); if !files_to_delete.is_empty() { @@ -141,3 +143,9 @@ impl Job for MakeTransferJob { } } } + +impl Job for RemoveExpiredEventsJob { + fn execute(&self) { + mutate_state(|state| state.run_event_expiry_job()); + } +} diff --git a/backend/canisters/group/impl/src/updates/add_reaction.rs b/backend/canisters/group/impl/src/updates/add_reaction.rs index c322399697..14ad281879 100644 --- a/backend/canisters/group/impl/src/updates/add_reaction.rs +++ b/backend/canisters/group/impl/src/updates/add_reaction.rs @@ -5,7 +5,7 @@ use chat_events::Reader; use group_canister::add_reaction::{Response::*, *}; use group_chat_core::AddRemoveReactionResult; use ic_cdk_macros::update; -use types::{EventIndex, GroupReactionAddedNotification, Notification, TimestampMillis, UserId}; +use types::{EventIndex, GroupReactionAddedNotification, Notification, UserId}; #[update] #[trace] @@ -33,7 +33,7 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response { ) { AddRemoveReactionResult::Success => { handle_activity_notification(state); - handle_notification(args, user_id, now, state); + handle_notification(args, user_id, state); Success } AddRemoveReactionResult::NoChange => NoChange, @@ -58,14 +58,13 @@ fn handle_notification( .. }: Args, user_id: UserId, - now: TimestampMillis, state: &mut RuntimeState, ) { if let Some(message_event) = state .data .chat .events - .events_reader(EventIndex::default(), thread_root_message_index, now) + .events_reader(EventIndex::default(), thread_root_message_index) // We pass in `None` in place of `my_user_id` because we don't want to hydrate // the notification with data for the current user (eg. their poll votes). .and_then(|events_reader| events_reader.message_event(message_id.into(), None)) diff --git a/backend/canisters/group/impl/src/updates/c2c_tip_message.rs b/backend/canisters/group/impl/src/updates/c2c_tip_message.rs index 129cc4b337..32e5a3e221 100644 --- a/backend/canisters/group/impl/src/updates/c2c_tip_message.rs +++ b/backend/canisters/group/impl/src/updates/c2c_tip_message.rs @@ -41,7 +41,7 @@ fn c2c_tip_message_impl(args: Args, state: &mut RuntimeState) -> Response { .data .chat .events - .events_reader(EventIndex::default(), args.thread_root_message_index, now) + .events_reader(EventIndex::default(), args.thread_root_message_index) .and_then(|r| { r.message_event_internal(args.message_id.into()) .map(|e| (e.event.message_index, e.index)) diff --git a/backend/canisters/group/impl/src/updates/register_proposal_vote.rs b/backend/canisters/group/impl/src/updates/register_proposal_vote.rs index 1440bfcb3b..d07ca0ef48 100644 --- a/backend/canisters/group/impl/src/updates/register_proposal_vote.rs +++ b/backend/canisters/group/impl/src/updates/register_proposal_vote.rs @@ -65,14 +65,13 @@ fn prepare(args: &Args, state: &RuntimeState) -> Result return Err(UserSuspended); } - let now = state.env.now(); let min_visible_event_index = member.min_visible_event_index(); if let Some(proposal) = state .data .chat .events - .visible_main_events_reader(min_visible_event_index, now) + .visible_main_events_reader(min_visible_event_index) .message_internal(args.message_index.into()) .and_then(|m| if let MessageContentInternal::GovernanceProposal(p) = &m.content { Some(p) } else { None }) { @@ -104,7 +103,7 @@ fn commit(user_id: UserId, args: Args, state: &mut RuntimeState) -> Response { .data .chat .events - .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now) + .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt) { RecordProposalVoteResult::Success => { let votes = member.proposal_votes.entry(now).or_default(); diff --git a/backend/canisters/group/impl/src/updates/register_proposal_vote_v2.rs b/backend/canisters/group/impl/src/updates/register_proposal_vote_v2.rs index 177c88186a..c23ef2b6ab 100644 --- a/backend/canisters/group/impl/src/updates/register_proposal_vote_v2.rs +++ b/backend/canisters/group/impl/src/updates/register_proposal_vote_v2.rs @@ -29,7 +29,6 @@ fn register_proposal_vote_impl(args: Args, state: &mut RuntimeState) -> Response return UserSuspended; } - let now = state.env.now(); let min_visible_event_index = member.min_visible_event_index(); let user_id = member.user_id; @@ -37,9 +36,11 @@ fn register_proposal_vote_impl(args: Args, state: &mut RuntimeState) -> Response .data .chat .events - .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now) + .record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt) { RecordProposalVoteResult::Success => { + let now = state.env.now(); + state .data .chat diff --git a/backend/canisters/group/impl/src/updates/send_message.rs b/backend/canisters/group/impl/src/updates/send_message.rs index 386c05e228..9650fc29d0 100644 --- a/backend/canisters/group/impl/src/updates/send_message.rs +++ b/backend/canisters/group/impl/src/updates/send_message.rs @@ -1,5 +1,5 @@ use crate::activity_notifications::handle_activity_notification; -use crate::timer_job_types::{DeleteFileReferencesJob, EndPollJob, RefundPrizeJob}; +use crate::timer_job_types::{DeleteFileReferencesJob, EndPollJob, RefundPrizeJob, RemoveExpiredEventsJob}; use crate::{mutate_state, run_regular_jobs, RuntimeState, TimerJob}; use canister_api_macros::update_candid_and_msgpack; use canister_timer_jobs::TimerJobs; @@ -42,9 +42,18 @@ fn send_message_impl(args: Args, state: &mut RuntimeState) -> Response { let message_index = result.message_event.event.message_index; let expires_at = result.message_event.expires_at; + let mut is_next_event_to_expire = false; + if let Some(expiry) = expires_at { + is_next_event_to_expire = state.data.next_event_expiry.map_or(true, |ex| expiry < ex); + if is_next_event_to_expire { + state.data.next_event_expiry = expires_at; + } + } + register_timer_jobs( args.thread_root_message_index, &result.message_event, + is_next_event_to_expire, now, &mut state.data.timer_jobs, ); @@ -94,6 +103,7 @@ fn send_message_impl(args: Args, state: &mut RuntimeState) -> Response { fn register_timer_jobs( thread_root_message_index: Option, message_event: &EventWrapper, + is_next_event_to_expire: bool, now: TimestampMillis, timer_jobs: &mut TimerJobs, ) { @@ -127,4 +137,9 @@ fn register_timer_jobs( now, ); } + + if let Some(expiry) = message_event.expires_at.filter(|_| is_next_event_to_expire) { + timer_jobs.cancel_jobs(|j| matches!(j, TimerJob::RemoveExpiredEvents(_))); + timer_jobs.enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now); + } } diff --git a/backend/canisters/user/CHANGELOG.md b/backend/canisters/user/CHANGELOG.md index 5a2202443b..b3bbf797fa 100644 --- a/backend/canisters/user/CHANGELOG.md +++ b/backend/canisters/user/CHANGELOG.md @@ -22,6 +22,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Add `followed_by_me` to the thread summary returned in GroupChatSummary ([#4431](https://github.com/open-chat-labs/open-chat/pull/4431)) - Allow users to save named cryptocurrency accounts ([#4434](https://github.com/open-chat-labs/open-chat/pull/4434)) +### Changed + +- Use canister timers to remove expired events ([#4447](https://github.com/open-chat-labs/open-chat/pull/4447)) + ## [[2.0.852](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.852-user)] - 2023-09-18 ### Added diff --git a/backend/canisters/user/impl/src/lib.rs b/backend/canisters/user/impl/src/lib.rs index 5c432ec974..53aa78e454 100644 --- a/backend/canisters/user/impl/src/lib.rs +++ b/backend/canisters/user/impl/src/lib.rs @@ -5,7 +5,7 @@ use crate::model::direct_chats::DirectChats; use crate::model::group_chat::GroupChat; use crate::model::group_chats::GroupChats; use crate::model::hot_group_exclusions::HotGroupExclusions; -use crate::timer_job_types::TimerJob; +use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob}; use candid::Principal; use canister_state_macros::canister_state; use canister_timer_jobs::TimerJobs; @@ -100,6 +100,26 @@ impl RuntimeState { } } + pub fn run_event_expiry_job(&mut self) { + let now = self.env.now(); + let mut next_event_expiry = None; + for chat in self.data.direct_chats.iter_mut() { + chat.events.remove_expired_events(now); + if let Some(expiry) = chat.events.next_event_expiry() { + if next_event_expiry.map_or(true, |current| expiry < current) { + next_event_expiry = Some(expiry); + } + } + } + + self.data.next_event_expiry = next_event_expiry; + if let Some(expiry) = self.data.next_event_expiry { + self.data + .timer_jobs + .enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now); + } + } + pub fn metrics(&self) -> Metrics { Metrics { memory_used: utils::memory::used(), @@ -155,6 +175,8 @@ struct Data { pub fire_and_forget_handler: FireAndForgetHandler, #[serde(default)] pub saved_crypto_accounts: Vec, + #[serde(default)] + pub next_event_expiry: Option, } impl Data { @@ -199,6 +221,7 @@ impl Data { diamond_membership_expires_at: None, fire_and_forget_handler: FireAndForgetHandler::default(), saved_crypto_accounts: Vec::new(), + next_event_expiry: None, } } diff --git a/backend/canisters/user/impl/src/model/direct_chat.rs b/backend/canisters/user/impl/src/model/direct_chat.rs index e2d118dccc..2ae7faf7dc 100644 --- a/backend/canisters/user/impl/src/model/direct_chat.rs +++ b/backend/canisters/user/impl/src/model/direct_chat.rs @@ -78,8 +78,8 @@ impl DirectChat { self.unconfirmed_v2.retain(|m| m.message_id != message_id); } - pub fn to_summary(&self, my_user_id: UserId, now: TimestampMillis) -> DirectChatSummary { - let events_reader = self.events.main_events_reader(now); + pub fn to_summary(&self, my_user_id: UserId) -> DirectChatSummary { + let events_reader = self.events.main_events_reader(); DirectChatSummary { them: self.them, @@ -98,17 +98,11 @@ impl DirectChat { .unwrap_or_default(), archived: self.archived.value, events_ttl: self.events.get_events_time_to_live().value, - expired_messages: self.events.expired_messages(now), } } - pub fn to_summary_updates( - &self, - updates_since: TimestampMillis, - my_user_id: UserId, - now: TimestampMillis, - ) -> DirectChatSummaryUpdates { - let events_reader = self.events.main_events_reader(now); + pub fn to_summary_updates(&self, updates_since: TimestampMillis, my_user_id: UserId) -> DirectChatSummaryUpdates { + let events_reader = self.events.main_events_reader(); let has_new_events = events_reader.latest_event_timestamp().map_or(false, |ts| ts > updates_since); let latest_message = events_reader.latest_message_event_if_updated(updates_since, Some(my_user_id)); @@ -143,7 +137,6 @@ impl DirectChat { .if_set_after(updates_since) .copied() .map_or(OptionUpdate::NoChange, OptionUpdate::from_update), - newly_expired_messages: self.events.expired_messages_since(updates_since, now), } } } diff --git a/backend/canisters/user/impl/src/model/direct_chats.rs b/backend/canisters/user/impl/src/model/direct_chats.rs index 02f2375db6..6a09fd863e 100644 --- a/backend/canisters/user/impl/src/model/direct_chats.rs +++ b/backend/canisters/user/impl/src/model/direct_chats.rs @@ -41,6 +41,10 @@ impl DirectChats { self.direct_chats.values() } + pub fn iter_mut(&mut self) -> impl Iterator { + self.direct_chats.values_mut() + } + pub fn len(&self) -> usize { self.direct_chats.len() } diff --git a/backend/canisters/user/impl/src/queries/deleted_message.rs b/backend/canisters/user/impl/src/queries/deleted_message.rs index 6aadb69031..a1f6919c92 100644 --- a/backend/canisters/user/impl/src/queries/deleted_message.rs +++ b/backend/canisters/user/impl/src/queries/deleted_message.rs @@ -13,8 +13,7 @@ fn deleted_message_impl(args: Args, state: &RuntimeState) -> Response { let my_user_id = state.env.canister_id().into(); if let Some(chat) = state.data.direct_chats.get(&args.user_id.into()) { - let now = state.env.now(); - let events_reader = chat.events.main_events_reader(now); + let events_reader = chat.events.main_events_reader(); if let Some(message) = events_reader.message_internal(args.message_id.into()) { let deleted_by = message.deleted_by.as_ref().map(|d| d.deleted_by); diff --git a/backend/canisters/user/impl/src/queries/events.rs b/backend/canisters/user/impl/src/queries/events.rs index 24f06115d5..ee682a54f3 100644 --- a/backend/canisters/user/impl/src/queries/events.rs +++ b/backend/canisters/user/impl/src/queries/events.rs @@ -12,14 +12,14 @@ fn events(args: Args) -> Response { fn events_impl(args: Args, state: &RuntimeState) -> Response { if let Some(chat) = state.data.direct_chats.get(&args.user_id.into()) { - let now = state.env.now(); - let events_reader = chat.events.main_events_reader(now); + let events_reader = chat.events.main_events_reader(); let latest_event_index = events_reader.latest_event_index().unwrap(); if args.latest_client_event_index.map_or(false, |e| latest_event_index < e) { return ReplicaNotUpToDate(latest_event_index); } + let now = state.env.now(); let my_user_id = state.env.canister_id().into(); let events: Vec<_> = events_reader.scan( diff --git a/backend/canisters/user/impl/src/queries/events_by_index.rs b/backend/canisters/user/impl/src/queries/events_by_index.rs index 36760c3d88..c2e4aa9258 100644 --- a/backend/canisters/user/impl/src/queries/events_by_index.rs +++ b/backend/canisters/user/impl/src/queries/events_by_index.rs @@ -12,14 +12,14 @@ fn events_by_index(args: Args) -> Response { fn events_by_index_impl(args: Args, state: &RuntimeState) -> Response { if let Some(chat) = state.data.direct_chats.get(&args.user_id.into()) { - let now = state.env.now(); - let events_reader = chat.events.main_events_reader(now); + let events_reader = chat.events.main_events_reader(); let latest_event_index = events_reader.latest_event_index().unwrap(); if args.latest_client_event_index.map_or(false, |e| latest_event_index < e) { return ReplicaNotUpToDate(latest_event_index); } + let now = state.env.now(); let my_user_id = state.env.canister_id().into(); let events = events_reader.get_by_indexes(&args.events, Some(my_user_id)); diff --git a/backend/canisters/user/impl/src/queries/events_window.rs b/backend/canisters/user/impl/src/queries/events_window.rs index 2119fb560d..d9e0699ddd 100644 --- a/backend/canisters/user/impl/src/queries/events_window.rs +++ b/backend/canisters/user/impl/src/queries/events_window.rs @@ -12,14 +12,14 @@ fn events_window(args: Args) -> Response { fn events_window_impl(args: Args, state: &RuntimeState) -> Response { if let Some(chat) = state.data.direct_chats.get(&args.user_id.into()) { - let now = state.env.now(); - let events_reader = chat.events.main_events_reader(now); + let events_reader = chat.events.main_events_reader(); let latest_event_index = events_reader.latest_event_index().unwrap(); if args.latest_client_event_index.map_or(false, |e| latest_event_index < e) { return ReplicaNotUpToDate(latest_event_index); } + let now = state.env.now(); let my_user_id = state.env.canister_id().into(); let events = events_reader.window( args.mid_point.into(), diff --git a/backend/canisters/user/impl/src/queries/initial_state.rs b/backend/canisters/user/impl/src/queries/initial_state.rs index 794155465f..d9b56d3f84 100644 --- a/backend/canisters/user/impl/src/queries/initial_state.rs +++ b/backend/canisters/user/impl/src/queries/initial_state.rs @@ -18,12 +18,7 @@ fn initial_state_impl(args: Args, state: &RuntimeState) -> Response { let blocked_users = state.data.blocked_users.value.iter().copied().collect(); let direct_chats = DirectChatsInitial { - summaries: state - .data - .direct_chats - .iter() - .map(|d| d.to_summary(my_user_id, now)) - .collect(), + summaries: state.data.direct_chats.iter().map(|d| d.to_summary(my_user_id)).collect(), pinned: state.data.direct_chats.pinned().to_vec(), }; @@ -132,8 +127,6 @@ fn hydrate_cached_summary(cached: &GroupCanisterGroupChatSummary, user_details: date_last_pinned: cached.date_last_pinned, date_read_pinned: user_details.messages_read.date_read_pinned.value, events_ttl: cached.events_ttl, - expired_messages: cached.expired_messages.clone(), - next_message_expiry: cached.next_message_expiry, gate: cached.gate.clone(), rules_accepted: cached.rules_accepted, } diff --git a/backend/canisters/user/impl/src/queries/messages_by_message_index.rs b/backend/canisters/user/impl/src/queries/messages_by_message_index.rs index bff5612462..0a685d51ee 100644 --- a/backend/canisters/user/impl/src/queries/messages_by_message_index.rs +++ b/backend/canisters/user/impl/src/queries/messages_by_message_index.rs @@ -12,9 +12,8 @@ fn messages_by_message_index(args: Args) -> Response { fn messages_by_message_index_impl(args: Args, state: &RuntimeState) -> Response { if let Some(chat) = state.data.direct_chats.get(&args.user_id.into()) { let my_user_id = state.env.canister_id().into(); - let now = state.env.now(); - let events_reader = chat.events.main_events_reader(now); + let events_reader = chat.events.main_events_reader(); let latest_event_index = events_reader.latest_event_index().unwrap(); let messages: Vec<_> = args diff --git a/backend/canisters/user/impl/src/queries/updates.rs b/backend/canisters/user/impl/src/queries/updates.rs index 9659465b6b..ef8886225f 100644 --- a/backend/canisters/user/impl/src/queries/updates.rs +++ b/backend/canisters/user/impl/src/queries/updates.rs @@ -54,9 +54,9 @@ fn updates_impl(updates_since: TimestampMillis, state: &RuntimeState) -> Respons for direct_chat in state.data.direct_chats.updated_since(updates_since) { if direct_chat.date_created > updates_since { - direct_chats_added.push(direct_chat.to_summary(my_user_id, now)); + direct_chats_added.push(direct_chat.to_summary(my_user_id)); } else { - direct_chats_updated.push(direct_chat.to_summary_updates(updates_since, my_user_id, now)); + direct_chats_updated.push(direct_chat.to_summary_updates(updates_since, my_user_id)); } } diff --git a/backend/canisters/user/impl/src/timer_job_types.rs b/backend/canisters/user/impl/src/timer_job_types.rs index 775fbee63f..8fb45a53f3 100644 --- a/backend/canisters/user/impl/src/timer_job_types.rs +++ b/backend/canisters/user/impl/src/timer_job_types.rs @@ -13,6 +13,7 @@ pub enum TimerJob { HardDeleteMessageContent(Box), DeleteFileReferences(DeleteFileReferencesJob), MessageReminder(MessageReminderJob), + RemoveExpiredEvents(RemoveExpiredEventsJob), } #[derive(Serialize, Deserialize, Clone)] @@ -44,6 +45,9 @@ pub struct MessageReminderJob { pub reminder_created_message_index: MessageIndex, } +#[derive(Serialize, Deserialize, Clone)] +pub struct RemoveExpiredEventsJob; + impl Job for TimerJob { fn execute(&self) { match self { @@ -51,6 +55,7 @@ impl Job for TimerJob { TimerJob::HardDeleteMessageContent(job) => job.execute(), TimerJob::DeleteFileReferences(job) => job.execute(), TimerJob::MessageReminder(job) => job.execute(), + TimerJob::RemoveExpiredEvents(job) => job.execute(), } } } @@ -82,9 +87,8 @@ impl Job for HardDeleteMessageContentJob { fn execute(&self) { mutate_state(|state| { if let Some(content) = state.data.direct_chats.get_mut(&self.chat_id).and_then(|chat| { - let now = state.env.now(); chat.events - .remove_deleted_message_content(self.thread_root_message_index, self.message_id, now) + .remove_deleted_message_content(self.thread_root_message_index, self.message_id) }) { if self.delete_files { let files_to_delete = content.blob_references(); @@ -129,3 +133,9 @@ impl Job for MessageReminderJob { }); } } + +impl Job for RemoveExpiredEventsJob { + fn execute(&self) { + mutate_state(|state| state.run_event_expiry_job()); + } +} diff --git a/backend/canisters/user/impl/src/updates/c2c_send_messages.rs b/backend/canisters/user/impl/src/updates/c2c_send_messages.rs index 0f4e17a2cb..ef7d37c415 100644 --- a/backend/canisters/user/impl/src/updates/c2c_send_messages.rs +++ b/backend/canisters/user/impl/src/updates/c2c_send_messages.rs @@ -1,14 +1,13 @@ -use crate::timer_job_types::{DeleteFileReferencesJob, TimerJob}; +use crate::updates::send_message::register_timer_jobs; use crate::{mutate_state, read_state, run_regular_jobs, RuntimeState}; use canister_api_macros::update_msgpack; -use canister_timer_jobs::TimerJobs; use canister_tracing_macros::trace; use chat_events::{MessageContentInternal, PushMessageArgs, Reader, ReplyContextInternal}; use ic_cdk_macros::update; use rand::Rng; use types::{ - BlobReference, CanisterId, DirectMessageNotification, EventWrapper, Message, MessageContent, MessageContentInitial, - MessageId, MessageIndex, Notification, TimestampMillis, UserId, + CanisterId, DirectMessageNotification, EventWrapper, Message, MessageContent, MessageContentInitial, MessageId, + MessageIndex, Notification, TimestampMillis, UserId, }; use user_canister::c2c_send_messages::{Response::*, *}; @@ -40,7 +39,7 @@ async fn c2c_send_messages_impl(args: Args) -> Response { if let Some(chat) = state.data.direct_chats.get(&sender_user_id.into()) { if chat .events - .main_events_reader(now) + .main_events_reader() .message_internal(message.message_id.into()) .is_some() { @@ -175,7 +174,7 @@ pub(crate) fn handle_message_impl( mute_notification: bool, state: &mut RuntimeState, ) -> EventWrapper { - let replies_to = convert_reply_context(args.replies_to, sender, args.now, state); + let replies_to = convert_reply_context(args.replies_to, sender, state); let initial_content: MessageContentInitial = args.content.into(); let content = MessageContentInternal::from(initial_content); let files = content.blob_references(); @@ -198,7 +197,21 @@ pub(crate) fn handle_message_impl( .direct_chats .push_message(false, sender, args.sender_message_index, push_message_args, args.is_bot); - register_timer_jobs(&message_event, files, args.now, &mut state.data.timer_jobs); + let mut is_next_event_to_expire = false; + if let Some(expiry) = message_event.expires_at { + is_next_event_to_expire = state.data.next_event_expiry.map_or(true, |ex| expiry < ex); + if is_next_event_to_expire { + state.data.next_event_expiry = Some(expiry); + } + } + + register_timer_jobs( + &message_event, + files, + is_next_event_to_expire, + args.now, + &mut state.data.timer_jobs, + ); if let Some(chat) = state.data.direct_chats.get_mut(&sender.into()) { if args.is_bot { @@ -232,7 +245,6 @@ pub(crate) fn handle_message_impl( fn convert_reply_context( replies_to: Option, sender: UserId, - now: TimestampMillis, state: &RuntimeState, ) -> Option { match replies_to? { @@ -242,7 +254,7 @@ fn convert_reply_context( .data .direct_chats .get(&chat_id) - .and_then(|chat| chat.events.main_events_reader(now).event_index(message_id.into())) + .and_then(|chat| chat.events.main_events_reader().event_index(message_id.into())) .map(|event_index| ReplyContextInternal { chat_if_other: None, event_index, @@ -254,20 +266,3 @@ fn convert_reply_context( }), } } - -fn register_timer_jobs( - message_event: &EventWrapper, - file_references: Vec, - now: TimestampMillis, - timer_jobs: &mut TimerJobs, -) { - if !file_references.is_empty() { - if let Some(expiry) = message_event.expires_at { - timer_jobs.enqueue_job( - TimerJob::DeleteFileReferences(DeleteFileReferencesJob { files: file_references }), - expiry, - now, - ); - } - } -} diff --git a/backend/canisters/user/impl/src/updates/c2c_tip_message.rs b/backend/canisters/user/impl/src/updates/c2c_tip_message.rs index 4a514e7954..079b585949 100644 --- a/backend/canisters/user/impl/src/updates/c2c_tip_message.rs +++ b/backend/canisters/user/impl/src/updates/c2c_tip_message.rs @@ -37,7 +37,7 @@ fn c2c_tip_message_impl(args: Args, state: &mut RuntimeState) -> Response { ) { if let Some(event) = chat .events - .main_events_reader(now) + .main_events_reader() .message_event_internal(args.message_id.into()) { let notification = Notification::DirectMessageTipped(DirectMessageTipped { diff --git a/backend/canisters/user/impl/src/updates/c2c_toggle_reaction.rs b/backend/canisters/user/impl/src/updates/c2c_toggle_reaction.rs index d52aa1160f..afbacee54a 100644 --- a/backend/canisters/user/impl/src/updates/c2c_toggle_reaction.rs +++ b/backend/canisters/user/impl/src/updates/c2c_toggle_reaction.rs @@ -3,7 +3,7 @@ use crate::{mutate_state, run_regular_jobs, RuntimeState}; use canister_api_macros::update_msgpack; use canister_tracing_macros::trace; use chat_events::{AddRemoveReactionArgs, AddRemoveReactionResult, Reader}; -use types::{DirectReactionAddedNotification, EventIndex, Notification, TimestampMillis, UserId}; +use types::{DirectReactionAddedNotification, EventIndex, Notification, UserId}; use user_canister::c2c_toggle_reaction::{Response::*, *}; #[update_msgpack] @@ -40,7 +40,7 @@ fn c2c_toggle_reaction_impl(args: Args, state: &mut RuntimeState) -> Response { match chat.events.add_reaction(add_remove_reaction_args) { AddRemoveReactionResult::Success => { if !state.data.suspended.value { - if let Some((recipient, notification)) = build_notification(args, chat, now) { + if let Some((recipient, notification)) = build_notification(args, chat) { state.push_notification(recipient, notification); } } @@ -70,7 +70,6 @@ fn build_notification( .. }: Args, chat: &DirectChat, - now: TimestampMillis, ) -> Option<(UserId, Notification)> { if username.is_empty() || chat.notifications_muted.value { return None; @@ -78,7 +77,7 @@ fn build_notification( let message_event = chat .events - .main_events_reader(now) + .main_events_reader() .message_event(message_id.into(), None) .filter(|m| m.event.sender != chat.them)?; diff --git a/backend/canisters/user/impl/src/updates/mark_read.rs b/backend/canisters/user/impl/src/updates/mark_read.rs index 2ad7fc0283..8318b7d732 100644 --- a/backend/canisters/user/impl/src/updates/mark_read.rs +++ b/backend/canisters/user/impl/src/updates/mark_read.rs @@ -32,7 +32,7 @@ fn mark_read_impl(args: Args, state: &mut RuntimeState) -> Response { if read_up_to <= direct_chat .events - .main_events_reader(now) + .main_events_reader() .latest_message_index() .unwrap_or_default() && direct_chat.mark_read_up_to(read_up_to, true, now) diff --git a/backend/canisters/user/impl/src/updates/send_message.rs b/backend/canisters/user/impl/src/updates/send_message.rs index da6a0e4f8a..1f01654ef6 100644 --- a/backend/canisters/user/impl/src/updates/send_message.rs +++ b/backend/canisters/user/impl/src/updates/send_message.rs @@ -1,16 +1,17 @@ use crate::crypto::process_transaction_without_caller_check; use crate::guards::caller_is_owner; -use crate::timer_job_types::RetrySendingFailedMessagesJob; +use crate::timer_job_types::{DeleteFileReferencesJob, RemoveExpiredEventsJob, RetrySendingFailedMessagesJob}; use crate::{mutate_state, read_state, run_regular_jobs, RuntimeState, TimerJob}; use candid::Principal; +use canister_timer_jobs::TimerJobs; use canister_tracing_macros::trace; use chat_events::{PushMessageArgs, Reader}; use ic_cdk_macros::update; use rand::Rng; use tracing::error; use types::{ - CanisterId, CompletedCryptoTransaction, ContentValidationError, CryptoTransaction, MessageContentInitial, MessageIndex, - UserId, + BlobReference, CanisterId, CompletedCryptoTransaction, ContentValidationError, CryptoTransaction, EventWrapper, Message, + MessageContentInitial, MessageIndex, TimestampMillis, UserId, }; use user_canister::c2c_send_messages; use user_canister::c2c_send_messages::{C2CReplyContext, SendMessageArgs}; @@ -178,6 +179,22 @@ fn send_message_impl( .direct_chats .push_message(true, recipient, None, push_message_args, user_type.is_bot()); + let mut is_next_event_to_expire = false; + if let Some(expiry) = message_event.expires_at { + is_next_event_to_expire = state.data.next_event_expiry.map_or(true, |ex| expiry < ex); + if is_next_event_to_expire { + state.data.next_event_expiry = Some(expiry); + } + } + + register_timer_jobs( + &message_event, + Vec::new(), + is_next_event_to_expire, + now, + &mut state.data.timer_jobs, + ); + if !user_type.is_self() { let send_message_args = SendMessageArgs { message_id: args.message_id, @@ -193,7 +210,7 @@ fn send_message_impl( .get(&args.recipient.into()) .and_then(|chat| { chat.events - .main_events_reader(now) + .main_events_reader() .message_internal(r.event_index.into()) .map(|m| m.message_id) }) @@ -327,3 +344,26 @@ async fn send_to_bot_canister(recipient: UserId, message_index: MessageIndex, ar } } } + +pub(crate) fn register_timer_jobs( + message_event: &EventWrapper, + file_references: Vec, + is_next_event_to_expire: bool, + now: TimestampMillis, + timer_jobs: &mut TimerJobs, +) { + if !file_references.is_empty() { + if let Some(expiry) = message_event.expires_at { + timer_jobs.enqueue_job( + TimerJob::DeleteFileReferences(DeleteFileReferencesJob { files: file_references }), + expiry, + now, + ); + } + } + + if let Some(expiry) = message_event.expires_at.filter(|_| is_next_event_to_expire) { + timer_jobs.cancel_jobs(|j| matches!(j, TimerJob::RemoveExpiredEvents(_))); + timer_jobs.enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now); + } +} diff --git a/backend/canisters/user/impl/src/updates/undelete_messages.rs b/backend/canisters/user/impl/src/updates/undelete_messages.rs index 160008ae56..8497e87046 100644 --- a/backend/canisters/user/impl/src/updates/undelete_messages.rs +++ b/backend/canisters/user/impl/src/updates/undelete_messages.rs @@ -39,7 +39,7 @@ fn undelete_messages_impl(args: Args, state: &mut RuntimeState) -> Response { .filter_map(|(message_id, result)| matches!(result, UndeleteMessageResult::Success).then_some(message_id)) .collect(); - let events_reader = chat.events.main_events_reader(now); + let events_reader = chat.events.main_events_reader(); let messages: Vec<_> = deleted .iter() diff --git a/backend/integration_tests/src/client/community.rs b/backend/integration_tests/src/client/community.rs index e1939b35a4..48b03333e8 100644 --- a/backend/integration_tests/src/client/community.rs +++ b/backend/integration_tests/src/client/community.rs @@ -139,11 +139,11 @@ pub mod happy_path { pub fn update_channel( env: &mut StateMachine, - sender: &User, + sender: Principal, community_id: CommunityId, args: &community_canister::update_channel::Args, ) { - let response = super::update_channel(env, sender.principal, community_id.into(), args); + let response = super::update_channel(env, sender, community_id.into(), args); match response { community_canister::update_channel::Response::SuccessV2(_) => {} diff --git a/backend/integration_tests/src/communities/disappearing_message_tests.rs b/backend/integration_tests/src/communities/disappearing_message_tests.rs new file mode 100644 index 0000000000..9bdd5d8648 --- /dev/null +++ b/backend/integration_tests/src/communities/disappearing_message_tests.rs @@ -0,0 +1,100 @@ +use crate::env::ENV; +use crate::rng::random_string; +use crate::{client, TestEnv}; +use std::ops::Deref; +use std::time::Duration; +use types::OptionUpdate; + +#[test] +fn disappearing_messages_in_channel() { + let mut wrapper = ENV.deref().get(); + let TestEnv { + env, + canister_ids, + controller, + .. + } = wrapper.env(); + + let user = client::register_diamond_user(env, canister_ids, *controller); + let community_id = client::user::happy_path::create_community(env, &user, &random_string(), true, vec![random_string()]); + let channel_id = client::community::happy_path::create_channel(env, user.principal, community_id, true, random_string()); + + client::community::happy_path::update_channel( + env, + user.principal, + community_id, + &community_canister::update_channel::Args { + channel_id, + name: None, + description: None, + rules: None, + avatar: OptionUpdate::NoChange, + permissions: None, + events_ttl: OptionUpdate::SetToSome(1000), + gate: OptionUpdate::NoChange, + public: None, + }, + ); + + let send_message_response1 = + client::community::happy_path::send_text_message(env, &user, community_id, channel_id, None, "abc", None); + + assert!(client::community::happy_path::events_by_index( + env, + &user, + community_id, + channel_id, + vec![send_message_response1.event_index] + ) + .events + .first() + .is_some()); + + env.advance_time(Duration::from_millis(2000)); + env.tick(); + + assert!(client::community::happy_path::events_by_index( + env, + &user, + community_id, + channel_id, + vec![send_message_response1.event_index] + ) + .events + .first() + .is_none()); + + client::community::happy_path::update_channel( + env, + user.principal, + community_id, + &community_canister::update_channel::Args { + channel_id, + name: None, + description: None, + rules: None, + avatar: OptionUpdate::NoChange, + permissions: None, + events_ttl: OptionUpdate::SetToNone, + gate: OptionUpdate::NoChange, + public: None, + }, + ); + + let send_message_response2 = + client::community::happy_path::send_text_message(env, &user, community_id, channel_id, None, "xyz", None); + + env.advance_time(Duration::from_secs(100000)); + env.tick(); + + assert!(client::community::happy_path::events_by_index( + env, + &user, + community_id, + channel_id, + vec![send_message_response2.event_index] + ) + .events + .first() + .is_some()); +} diff --git a/backend/integration_tests/src/communities/mod.rs b/backend/integration_tests/src/communities/mod.rs index c547d50296..668f1c9312 100644 --- a/backend/integration_tests/src/communities/mod.rs +++ b/backend/integration_tests/src/communities/mod.rs @@ -1,5 +1,6 @@ mod convert_group_into_community_tests; mod create_channel_tests; +mod disappearing_message_tests; mod import_group_tests; mod join_channel_tests; mod join_community_tests; diff --git a/backend/integration_tests/src/communities/send_message_tests.rs b/backend/integration_tests/src/communities/send_message_tests.rs index f92b721ea8..640546a4a4 100644 --- a/backend/integration_tests/src/communities/send_message_tests.rs +++ b/backend/integration_tests/src/communities/send_message_tests.rs @@ -201,7 +201,7 @@ fn send_message_with_community_rules_not_accepted_fails() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, None, None); @@ -231,7 +231,7 @@ fn send_message_with_channel_rules_not_accepted_fails() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, None, None); @@ -258,7 +258,7 @@ fn send_message_with_community_rules_accepted_succeeds() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, Some(Version::from(1)), None); @@ -285,7 +285,7 @@ fn send_message_with_channel_rules_accepted_succeeds() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, None, Some(Version::from(1))); @@ -312,8 +312,8 @@ fn send_message_with_community_rules_but_not_channel_rules_accepted_fails() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, Some(Version::from(1)), None); @@ -340,8 +340,8 @@ fn send_message_with_channel_rules_but_not_community_rules_accepted_fails() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, None, Some(Version::from(1))); @@ -371,8 +371,8 @@ fn send_message_with_community_rules_and_channel_rules_accepted_succeeds() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); let response = send_dummy_message_with_rules( env, @@ -406,8 +406,8 @@ fn send_message_with_previously_accepted_rules_succeeds() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); send_dummy_message_with_rules( env, @@ -443,8 +443,8 @@ fn send_message_with_old_community_rules_accepted_fails() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_community_rules(env, &user1, community_id, "No heavy petting".to_string()); - set_community_rules(env, &user1, community_id, "No heavy petting or pets".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting".to_string()); + set_community_rules(env, user1.principal, community_id, "No heavy petting or pets".to_string()); let response = send_dummy_message_with_rules( env, @@ -481,8 +481,14 @@ fn send_message_with_old_channel_rules_accepted_fails() { channel_id, } = init_test_data(env, canister_ids, *controller); - set_channel_rules(env, &user1, community_id, channel_id, "No running".to_string()); - set_channel_rules(env, &user1, community_id, channel_id, "No running or jumping".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No running".to_string()); + set_channel_rules( + env, + user1.principal, + community_id, + channel_id, + "No running or jumping".to_string(), + ); let response = send_dummy_message_with_rules(env, &user2, community_id, channel_id, None, Some(Version::from(1))); @@ -532,8 +538,8 @@ fn send_message_with_rules_leads_to_expected_summary_and_selected_states() { } ); - set_community_rules(env, &user1, community_id, "No running".to_string()); - set_channel_rules(env, &user1, community_id, channel_id, "No jumping".to_string()); + set_community_rules(env, user1.principal, community_id, "No running".to_string()); + set_channel_rules(env, user1.principal, community_id, channel_id, "No jumping".to_string()); let community_rules = get_community_rules(env, &user2, community_id); let channel_rules = get_channel_rules(env, &user2, community_id, channel_id); @@ -671,7 +677,7 @@ fn init_test_data(env: &mut StateMachine, canister_ids: &CanisterIds, controller } } -fn set_community_rules(env: &mut StateMachine, user: &User, community_id: CommunityId, text: String) { +fn set_community_rules(env: &mut StateMachine, sender: Principal, community_id: CommunityId, text: String) { let args = community_canister::update_community::Args { name: None, description: None, @@ -688,10 +694,16 @@ fn set_community_rules(env: &mut StateMachine, user: &User, community_id: Commun primary_language: None, }; - client::community::happy_path::update_community(env, user.principal, community_id, &args); + client::community::happy_path::update_community(env, sender, community_id, &args); } -fn set_channel_rules(env: &mut StateMachine, user: &User, community_id: CommunityId, channel_id: ChannelId, text: String) { +fn set_channel_rules( + env: &mut StateMachine, + sender: Principal, + community_id: CommunityId, + channel_id: ChannelId, + text: String, +) { let args = community_canister::update_channel::Args { name: None, description: None, @@ -702,12 +714,13 @@ fn set_channel_rules(env: &mut StateMachine, user: &User, community_id: Communit }), avatar: OptionUpdate::NoChange, permissions: None, + events_ttl: OptionUpdate::NoChange, gate: OptionUpdate::NoChange, public: None, channel_id, }; - client::community::happy_path::update_channel(env, user, community_id, &args); + client::community::happy_path::update_channel(env, sender, community_id, &args); } struct TestData { diff --git a/backend/integration_tests/src/communities/update_channel_tests.rs b/backend/integration_tests/src/communities/update_channel_tests.rs index d57b0883ac..8c4e473901 100644 --- a/backend/integration_tests/src/communities/update_channel_tests.rs +++ b/backend/integration_tests/src/communities/update_channel_tests.rs @@ -28,7 +28,7 @@ fn make_private_channel_public_succeeds() { client::local_user_index::happy_path::join_community(env, user2.principal, canister_ids.local_user_index, community_id); client::community::happy_path::update_channel( env, - &user1, + user1.principal, community_id, &community_canister::update_channel::Args { channel_id, @@ -37,6 +37,7 @@ fn make_private_channel_public_succeeds() { rules: None, avatar: OptionUpdate::NoChange, permissions: None, + events_ttl: OptionUpdate::NoChange, gate: OptionUpdate::NoChange, public: Some(true), }, diff --git a/backend/integration_tests/src/disappearing_message_tests.rs b/backend/integration_tests/src/disappearing_message_tests.rs index 1bf4d526e7..59284beb13 100644 --- a/backend/integration_tests/src/disappearing_message_tests.rs +++ b/backend/integration_tests/src/disappearing_message_tests.rs @@ -1,24 +1,17 @@ use crate::env::ENV; use crate::rng::random_string; -use crate::{client, CanisterIds, TestEnv, User}; -use candid::Principal; -use ic_test_state_machine_client::StateMachine; -use itertools::Itertools; +use crate::{client, TestEnv}; use std::ops::Deref; use std::time::Duration; -use types::{ChatId, OptionUpdate}; +use types::OptionUpdate; #[test] fn disappearing_messages_in_group_chats() { let mut wrapper = ENV.deref().get(); - let TestEnv { - env, - canister_ids, - controller, - .. - } = wrapper.env(); + let TestEnv { env, canister_ids, .. } = wrapper.env(); - let TestData { user, group_id } = init_test_data(env, canister_ids, *controller, true); + let user = client::local_user_index::happy_path::register_user(env, canister_ids.local_user_index); + let group_id = client::user::happy_path::create_group(env, &user, &random_string(), false, true); client::group::update_group_v2( env, @@ -71,95 +64,3 @@ fn disappearing_messages_in_group_chats() { .is_some() ); } - -#[test] -fn group_chat_summary_contains_expired_messages() { - let mut wrapper = ENV.deref().get(); - let TestEnv { - env, - canister_ids, - controller, - .. - } = wrapper.env(); - - let TestData { user, group_id } = init_test_data(env, canister_ids, *controller, true); - - client::group::update_group_v2( - env, - user.principal, - group_id.into(), - &group_canister::update_group_v2::Args { - events_ttl: OptionUpdate::SetToSome(1000), - ..Default::default() - }, - ); - - let send_message_response1 = client::group::happy_path::send_text_message(env, &user, group_id, None, "abc", None); - env.advance_time(Duration::from_millis(400)); - let send_message_response2 = client::group::happy_path::send_text_message(env, &user, group_id, None, "def", None); - env.advance_time(Duration::from_millis(400)); - let send_message_response3 = client::group::happy_path::send_text_message(env, &user, group_id, None, "ghi", None); - env.advance_time(Duration::from_millis(400)); - let send_message_response4 = client::group::happy_path::send_text_message(env, &user, group_id, None, "jkl", None); - env.advance_time(Duration::from_millis(400)); - let send_message_response5 = client::group::happy_path::send_text_message(env, &user, group_id, None, "mno", None); - env.advance_time(Duration::from_millis(400)); - let send_message_response6 = client::group::happy_path::send_text_message(env, &user, group_id, None, "mno", None); - - let summary = client::group::happy_path::summary(env, &user, group_id); - let summary_timestamp = send_message_response6.timestamp; - - assert_eq!(summary.events_ttl, Some(1000)); - assert_eq!( - summary.expired_messages.iter().collect_vec(), - vec![ - send_message_response1.message_index, - send_message_response2.message_index, - send_message_response3.message_index - ] - ); - assert_eq!(summary.next_message_expiry, send_message_response4.expires_at); - - env.advance_time(Duration::from_millis(2000)); - - client::group::update_group_v2( - env, - user.principal, - group_id.into(), - &group_canister::update_group_v2::Args { - events_ttl: OptionUpdate::SetToSome(2000), - ..Default::default() - }, - ); - let send_message_response7 = client::group::happy_path::send_text_message(env, &user, group_id, None, "pqr", None); - let summary_updates = client::group::happy_path::summary_updates(env, &user, group_id, summary_timestamp).unwrap(); - - assert_eq!(summary_updates.events_ttl.expand(), Some(Some(2000))); - assert_eq!( - summary_updates.newly_expired_messages.iter().collect_vec(), - vec![ - send_message_response4.message_index, - send_message_response5.message_index, - send_message_response6.message_index - ] - ); - assert_eq!( - summary_updates.next_message_expiry.expand(), - Some(send_message_response7.expires_at) - ); -} - -fn init_test_data(env: &mut StateMachine, canister_ids: &CanisterIds, controller: Principal, public: bool) -> TestData { - let user = client::register_diamond_user(env, canister_ids, controller); - - let group_name = random_string(); - - let group_id = client::user::happy_path::create_group(env, &user, &group_name, public, true); - - TestData { user, group_id } -} - -struct TestData { - user: User, - group_id: ChatId, -} diff --git a/backend/libraries/chat_events/src/chat_event_internal.rs b/backend/libraries/chat_events/src/chat_event_internal.rs index a9cb8d0cd7..6d8af7a082 100644 --- a/backend/libraries/chat_events/src/chat_event_internal.rs +++ b/backend/libraries/chat_events/src/chat_event_internal.rs @@ -486,8 +486,17 @@ impl ThreadSummaryInternal { } } - pub fn followers(&self) -> HashSet { - HashSet::from_iter(self.follower_ids.iter().filter(|(_, t)| t.value).map(|(user_id, _)| *user_id)) + pub fn participants_and_followers(&self, include_unfollowed: bool) -> Vec { + self.participant_ids + .iter() + .copied() + .chain( + self.follower_ids + .iter() + .filter(|(_, t)| include_unfollowed || t.value) + .map(|(user_id, _)| *user_id), + ) + .collect() } pub fn get_follower(&self, user_id: UserId) -> Option> { diff --git a/backend/libraries/chat_events/src/chat_events.rs b/backend/libraries/chat_events/src/chat_events.rs index 3a68b2e42b..331ca9e08a 100644 --- a/backend/libraries/chat_events/src/chat_events.rs +++ b/backend/libraries/chat_events/src/chat_events.rs @@ -18,8 +18,8 @@ use types::{ EventWrapper, EventsTimeToLiveUpdated, GroupCanisterThreadDetails, GroupCreated, GroupFrozen, GroupUnfrozen, Hash, HydratedMention, Mention, Message, MessageContentInitial, MessageId, MessageIndex, MessageMatch, MessageReport, Milliseconds, MultiUserChat, PendingCryptoTransaction, PollVotes, PrizeWinnerContent, ProposalUpdate, PushEventResult, - PushIfNotContains, RangeSet, Reaction, RegisterVoteResult, ReportedMessageInternal, TimestampMillis, TimestampNanos, - Timestamped, Tips, UserId, VoteOperation, + PushIfNotContains, Reaction, RegisterVoteResult, ReportedMessageInternal, TimestampMillis, TimestampNanos, Timestamped, + Tips, UserId, VoteOperation, }; pub const OPENCHAT_BOT_USER_ID: UserId = UserId::new(Principal::from_slice(&[228, 104, 142, 9, 133, 211, 135, 217, 129, 1])); @@ -159,7 +159,6 @@ impl ChatEvents { args.min_visible_event_index, args.thread_root_message_index, args.message_id.into(), - args.now, ) { if message.sender == args.sender { if !matches!(message.content, MessageContentInternal::Deleted(_)) { @@ -215,7 +214,6 @@ impl ChatEvents { args.min_visible_event_index, args.thread_root_message_index, args.message_id.into(), - args.now, ) { if message.sender == args.caller || args.is_admin { if message.deleted_by.is_some() { @@ -267,7 +265,6 @@ impl ChatEvents { args.min_visible_event_index, args.thread_root_message_index, args.message_id.into(), - args.now, ) { if let Some(deleted_by) = message.deleted_by.as_ref().map(|db| db.deleted_by) { if deleted_by == args.caller || (args.is_admin && message.sender != deleted_by) { @@ -316,10 +313,8 @@ impl ChatEvents { &mut self, thread_root_message_index: Option, message_id: MessageId, - now: TimestampMillis, ) -> Option { - let (message, _) = - self.message_internal_mut(EventIndex::default(), thread_root_message_index, message_id.into(), now)?; + let (message, _) = self.message_internal_mut(EventIndex::default(), thread_root_message_index, message_id.into())?; let deleted_by = message.deleted_by.clone()?; @@ -334,7 +329,6 @@ impl ChatEvents { args.min_visible_event_index, args.thread_root_message_index, args.message_index.into(), - args.now, ) { if let MessageContentInternal::Poll(p) = &mut message.content { return match p.register_vote(args.user_id, args.option_index, args.operation) { @@ -389,7 +383,7 @@ impl ChatEvents { now: TimestampMillis, ) -> EndPollResult { if let Some((message, event_index)) = - self.message_internal_mut(EventIndex::default(), thread_root_message_index, message_index.into(), now) + self.message_internal_mut(EventIndex::default(), thread_root_message_index, message_index.into()) { if let MessageContentInternal::Poll(p) = &mut message.content { return if p.ended || p.config.end_date.is_none() { @@ -414,10 +408,7 @@ impl ChatEvents { message_index: MessageIndex, now_nanos: TimestampNanos, ) -> Option { - let now_ms = now_nanos / 1_000_000; - if let Some(message) = - self.message_internal(EventIndex::default(), thread_root_message_index, message_index.into(), now_ms) - { + if let Some(message) = self.message_internal(EventIndex::default(), thread_root_message_index, message_index.into()) { if let MessageContentInternal::Prize(p) = &message.content { if let CryptoTransaction::Completed(t) = &p.transaction { let fee = t.fee(); @@ -445,10 +436,9 @@ impl ChatEvents { min_visible_event_index: EventIndex, message_index: MessageIndex, adopt: bool, - now: TimestampMillis, ) -> RecordProposalVoteResult { if let Some(proposal) = self - .message_internal_mut(min_visible_event_index, None, message_index.into(), now) + .message_internal_mut(min_visible_event_index, None, message_index.into()) .and_then( |(m, _)| { if let MessageContentInternal::GovernanceProposal(p) = &mut m.content { @@ -477,7 +467,7 @@ impl ChatEvents { pub fn update_proposals(&mut self, user_id: UserId, updates: Vec, now: TimestampMillis) { for update in updates { if let Some((message, event_index)) = - self.message_internal_mut(EventIndex::default(), None, update.message_id.into(), now) + self.message_internal_mut(EventIndex::default(), None, update.message_id.into()) { if message.sender == user_id { if let MessageContentInternal::GovernanceProposal(p) = &mut message.content { @@ -500,7 +490,6 @@ impl ChatEvents { args.min_visible_event_index, args.thread_root_message_index, args.message_id.into(), - args.now, ) { let added = if let Some((_, users)) = message.reactions.iter_mut().find(|(r, _)| *r == args.reaction) { users.insert(args.user_id) @@ -538,7 +527,6 @@ impl ChatEvents { args.min_visible_event_index, args.thread_root_message_index, args.message_id.into(), - args.now, ) { let (removed, is_empty) = message .reactions @@ -580,7 +568,6 @@ impl ChatEvents { min_visible_event_index, args.thread_root_message_index, args.message_id.into(), - args.now, ) { if message.sender == args.user_id { return CannotTipSelf; @@ -615,7 +602,7 @@ impl ChatEvents { user_id: UserId, now: TimestampMillis, ) -> ReservePrizeResult { - if let Some((message, event_index)) = self.message_internal_mut(min_visible_event_index, None, message_id.into(), now) { + if let Some((message, event_index)) = self.message_internal_mut(min_visible_event_index, None, message_id.into()) { if let MessageContentInternal::Prize(content) = &mut message.content { if content.end_date < now { return ReservePrizeResult::PrizeEnded; @@ -654,9 +641,7 @@ impl ChatEvents { rng: &mut StdRng, now: TimestampMillis, ) -> ClaimPrizeResult { - if let Some((message, event_index)) = - self.message_internal_mut(EventIndex::default(), None, message_id.into(), TimestampMillis::default()) - { + if let Some((message, event_index)) = self.message_internal_mut(EventIndex::default(), None, message_id.into()) { if let MessageContentInternal::Prize(content) = &mut message.content { // Remove the reservation if content.reservations.remove(&winner) { @@ -700,7 +685,7 @@ impl ChatEvents { amount: Tokens, now: TimestampMillis, ) -> UnreservePrizeResult { - if let Some((message, event_index)) = self.message_internal_mut(EventIndex::default(), None, message_id.into(), now) { + if let Some((message, event_index)) = self.message_internal_mut(EventIndex::default(), None, message_id.into()) { if let MessageContentInternal::Prize(content) = &mut message.content { // Remove the reservation if content.reservations.remove(&user_id) { @@ -763,7 +748,7 @@ impl ChatEvents { let message_id_bytes: [u8; 16] = hash[..16].try_into().unwrap(); let message_id: MessageId = u128::from_be_bytes(message_id_bytes).into(); - if let Some((message, index)) = self.message_internal_mut(EventIndex::default(), None, message_id.into(), now) { + if let Some((message, index)) = self.message_internal_mut(EventIndex::default(), None, message_id.into()) { if let MessageContentInternal::ReportedMessage(r) = &mut message.content { r.reports.retain(|x| x.reported_by != user_id); r.reports.push(MessageReport { @@ -825,7 +810,7 @@ impl ChatEvents { use FollowThreadResult::*; if let Some((root_message, event_index)) = - self.message_internal_mut(min_visible_event_index, None, thread_root_message_index.into(), now) + self.message_internal_mut(min_visible_event_index, None, thread_root_message_index.into()) { if let Some(summary) = &mut root_message.thread_summary { if !summary.participant_ids.contains(&user_id) && summary.set_follow(user_id, now, true) { @@ -851,7 +836,7 @@ impl ChatEvents { use UnfollowThreadResult::*; if let Some((root_message, event_index)) = - self.message_internal_mut(min_visible_event_index, None, thread_root_message_index.into(), now) + self.message_internal_mut(min_visible_event_index, None, thread_root_message_index.into()) { if let Some(summary) = &mut root_message.thread_summary { if summary.set_follow(user_id, now, false) { @@ -876,7 +861,7 @@ impl ChatEvents { now: TimestampMillis, ) { let (root_message, event_index) = self - .message_internal_mut(EventIndex::default(), None, thread_root_message_index.into(), now) + .message_internal_mut(EventIndex::default(), None, thread_root_message_index.into()) .unwrap_or_else(|| panic!("Root thread message not found with message index {thread_root_message_index:?}")); root_message.last_updated = Some(now); @@ -933,15 +918,12 @@ impl ChatEvents { &mut self.main }; - let maybe_message_index = event.as_message().map(|m| m.message_index); let event_index = events_list.push_event(event, correlation_id, expires_at, now); if let Some(timestamp) = expires_at { - self.expiring_events.insert(event_index, maybe_message_index, timestamp); + self.expiring_events.insert(event_index, timestamp); } - self.remove_expired_events(now); - PushEventResult { index: event_index, timestamp: now, @@ -975,7 +957,7 @@ impl ChatEvents { max_results: u8, my_user_id: UserId, ) -> Vec { - self.visible_main_events_reader(min_visible_event_index, now) + self.visible_main_events_reader(min_visible_event_index) .iter(None, true) .filter_map(|e| e.event.as_message().filter(|m| m.deleted_by.is_none()).map(|m| (e, m))) .filter(|(_, m)| if query.users.is_empty() { true } else { query.users.contains(&m.sender) }) @@ -1019,8 +1001,7 @@ impl ChatEvents { } pub fn mark_message_reminder_created_message_hidden(&mut self, message_index: MessageIndex, now: TimestampMillis) -> bool { - if let Some((message, event_index)) = self.message_internal_mut(EventIndex::default(), None, message_index.into(), now) - { + if let Some((message, event_index)) = self.message_internal_mut(EventIndex::default(), None, message_index.into()) { if let MessageContentInternal::MessageReminderCreated(r) = &mut message.content { r.hidden = true; message.last_updated = Some(now); @@ -1031,13 +1012,8 @@ impl ChatEvents { false } - pub fn hydrate_mention( - &self, - min_visible_event_index: EventIndex, - mention: &Mention, - now: TimestampMillis, - ) -> Option { - let events_reader = self.events_reader(min_visible_event_index, mention.thread_root_message_index, now)?; + pub fn hydrate_mention(&self, min_visible_event_index: EventIndex, mention: &Mention) -> Option { + let events_reader = self.events_reader(min_visible_event_index, mention.thread_root_message_index)?; events_reader.hydrate_mention(mention) } @@ -1051,17 +1027,12 @@ impl ChatEvents { .filter(|m| if let Some(since) = if_updated_since { m.last_active > since } else { true }) } - pub fn event_count_since bool>( - &self, - since: TimestampMillis, - now: TimestampMillis, - filter: F, - ) -> usize { - self.main.event_count_since(since, now, &filter) + pub fn event_count_since bool>(&self, since: TimestampMillis, filter: F) -> usize { + self.main.event_count_since(since, &filter) + self .threads .values() - .map(|e| e.event_count_since(since, now, &filter)) + .map(|e| e.event_count_since(since, &filter)) .sum::() } @@ -1070,10 +1041,9 @@ impl ChatEvents { min_visible_event_index: EventIndex, thread_root_message_index: Option, event_key: EventKey, - now: TimestampMillis, ) -> bool { - if let Some(events_list) = self.events_reader(min_visible_event_index, thread_root_message_index, now) { - events_list.is_accessible(event_key, min_visible_event_index, now) + if let Some(events_list) = self.events_reader(min_visible_event_index, thread_root_message_index) { + events_list.is_accessible(event_key, min_visible_event_index) } else { false } @@ -1086,7 +1056,6 @@ impl ChatEvents { updated_since: Option, max_threads: usize, my_user_id: UserId, - now: TimestampMillis, ) -> Vec { root_message_indexes .filter_map(|root_message_index| { @@ -1095,7 +1064,7 @@ impl ChatEvents { let latest_event = thread_events.latest_event_index()?; let follower = self .main - .get((*root_message_index).into(), min_visible_event_index, now)? + .get((*root_message_index).into(), min_visible_event_index)? .event .as_message()? .thread_summary @@ -1132,7 +1101,7 @@ impl ChatEvents { let mut unfollowed = Vec::new(); for message_index in root_message_indexes.rev() { - if let Some(wrapped_event) = self.main.get((*message_index).into(), EventIndex::default(), 0) { + if let Some(wrapped_event) = self.main.get((*message_index).into(), EventIndex::default()) { if let Some(message) = wrapped_event.event.as_message() { if let Some(thread_summary) = message.thread_summary.as_ref() { if let Some(follower) = thread_summary.get_follower(my_user_id) { @@ -1197,55 +1166,51 @@ impl ChatEvents { ); } - pub fn next_message_expiry(&self, now: TimestampMillis) -> Option { - self.expiring_events.next_message_expiry(now) + pub fn next_event_expiry(&self) -> Option { + self.expiring_events.next_event_expiry() } - pub fn expired_messages(&self, now: TimestampMillis) -> RangeSet { - self.expiring_events.expired_messages(now) - } + pub fn remove_expired_events(&mut self, now: TimestampMillis) -> RemoveExpiredEventsResult { + let mut result = RemoveExpiredEventsResult::default(); - pub fn expired_messages_since(&self, since: TimestampMillis, now: TimestampMillis) -> RangeSet { - self.expiring_events.expired_messages_since(since, now) - } - - fn remove_expired_events(&mut self, now: TimestampMillis) { - for event_index in self.expiring_events.process_expired_events(now) { + while let Some(event_index) = self.expiring_events.take_next_expired_event(now) { if let Some(event) = self.main.remove_expired_event(event_index) { + result.events.push(event_index); if let ChatEventInternal::Message(m) = event.event { - self.threads.remove(&m.message_index); + if let Some(thread) = m.thread_summary { + self.threads.remove(&m.message_index); + result + .threads + .push((m.message_index, thread.participants_and_followers(true))); + } } } } + + result } - pub fn main_events_reader(&self, now: TimestampMillis) -> ChatEventsListReader { - ChatEventsListReader::new(&self.main, now) + pub fn main_events_reader(&self) -> ChatEventsListReader { + ChatEventsListReader::new(&self.main) } - pub fn visible_main_events_reader( - &self, - min_visible_event_index: EventIndex, - now: TimestampMillis, - ) -> ChatEventsListReader { - ChatEventsListReader::with_min_visible_event_index(&self.main, min_visible_event_index, now) + pub fn visible_main_events_reader(&self, min_visible_event_index: EventIndex) -> ChatEventsListReader { + ChatEventsListReader::with_min_visible_event_index(&self.main, min_visible_event_index) } pub fn events_reader( &self, min_visible_event_index: EventIndex, thread_root_message_index: Option, - now: TimestampMillis, ) -> Option { - let events_list = self.events_list(min_visible_event_index, thread_root_message_index, now)?; + let events_list = self.events_list(min_visible_event_index, thread_root_message_index)?; if thread_root_message_index.is_some() { - Some(ChatEventsListReader::new(events_list, now)) + Some(ChatEventsListReader::new(events_list)) } else { Some(ChatEventsListReader::with_min_visible_event_index( events_list, min_visible_event_index, - now, )) } } @@ -1266,13 +1231,9 @@ impl ChatEvents { &self, min_visible_event_index: EventIndex, thread_root_message_index: Option, - now: TimestampMillis, ) -> Option<&ChatEventsList> { if let Some(root_message_index) = thread_root_message_index { - if self - .main - .is_accessible(root_message_index.into(), min_visible_event_index, now) - { + if self.main.is_accessible(root_message_index.into(), min_visible_event_index) { self.threads.get(&root_message_index) } else { None @@ -1286,13 +1247,9 @@ impl ChatEvents { &mut self, min_visible_event_index: EventIndex, thread_root_message_index: Option, - now: TimestampMillis, ) -> Option<&mut ChatEventsList> { if let Some(root_message_index) = thread_root_message_index { - if self - .main - .is_accessible(root_message_index.into(), min_visible_event_index, now) - { + if self.main.is_accessible(root_message_index.into(), min_visible_event_index) { self.threads.get_mut(&root_message_index) } else { None @@ -1307,10 +1264,9 @@ impl ChatEvents { min_visible_event_index: EventIndex, thread_root_message_index: Option, event_key: EventKey, - now: TimestampMillis, ) -> Option<(&mut MessageInternal, EventIndex)> { - self.events_list_mut(min_visible_event_index, thread_root_message_index, now) - .and_then(|l| l.get_mut(event_key, min_visible_event_index, now)) + self.events_list_mut(min_visible_event_index, thread_root_message_index) + .and_then(|l| l.get_mut(event_key, min_visible_event_index)) .and_then(|e| e.event.as_message_mut().map(|m| (m, e.index))) } @@ -1319,10 +1275,9 @@ impl ChatEvents { min_visible_event_index: EventIndex, thread_root_message_index: Option, event_key: EventKey, - now: TimestampMillis, ) -> Option<&MessageInternal> { - self.events_list(min_visible_event_index, thread_root_message_index, now) - .and_then(|l| l.get(event_key, min_visible_event_index, now)) + self.events_list(min_visible_event_index, thread_root_message_index) + .and_then(|l| l.get(event_key, min_visible_event_index)) .and_then(|e| e.event.as_message()) } @@ -1529,6 +1484,12 @@ pub enum UnfollowThreadResult { ThreadNotFound, } +#[derive(Default)] +pub struct RemoveExpiredEventsResult { + pub events: Vec, + pub threads: Vec<(MessageIndex, Vec)>, +} + #[derive(Copy, Clone)] pub enum EventKey { EventIndex(EventIndex), diff --git a/backend/libraries/chat_events/src/chat_events_list.rs b/backend/libraries/chat_events/src/chat_events_list.rs index 623757735f..8dee85598b 100644 --- a/backend/libraries/chat_events/src/chat_events_list.rs +++ b/backend/libraries/chat_events/src/chat_events_list.rs @@ -57,28 +57,24 @@ impl ChatEventsList { &self, event_key: EventKey, min_visible_event_index: EventIndex, - now: TimestampMillis, ) -> Option<&EventWrapperInternal> { self.event_index(event_key) .filter(|e| *e >= min_visible_event_index) .and_then(|e| self.events_map.get(&e)) - .filter(|e| !e.is_expired(now)) } pub(crate) fn get_mut( &mut self, event_key: EventKey, min_visible_event_index: EventIndex, - now: TimestampMillis, ) -> Option<&mut EventWrapperInternal> { self.event_index(event_key) .filter(|e| *e >= min_visible_event_index) .and_then(|e| self.events_map.get_mut(&e)) - .filter(|e| !e.is_expired(now)) } - pub(crate) fn is_accessible(&self, event_key: EventKey, min_visible_event_index: EventIndex, now: TimestampMillis) -> bool { - self.get(event_key, min_visible_event_index, now).is_some() + pub(crate) fn is_accessible(&self, event_key: EventKey, min_visible_event_index: EventIndex) -> bool { + self.get(event_key, min_visible_event_index).is_some() } pub(crate) fn iter( @@ -86,10 +82,9 @@ impl ChatEventsList { start: Option, ascending: bool, min_visible_event_index: EventIndex, - now: TimestampMillis, ) -> Box> + '_> { let range = if let Some(start) = start { - if let Some(event_index) = self.get(start, min_visible_event_index, now).map(|e| e.index) { + if let Some(event_index) = self.get(start, min_visible_event_index).map(|e| e.index) { if ascending { self.events_map.range(event_index..) } else { @@ -102,7 +97,7 @@ impl ChatEventsList { self.events_map.range(min_visible_event_index..) }; - let iter = range.map(|(_, e)| e).filter(move |e| !e.is_expired(now)); + let iter = range.map(|(_, e)| e); if ascending { Box::new(iter) @@ -132,17 +127,12 @@ impl ChatEventsList { updated } - pub(crate) fn event_count_since bool>( - &self, - since: TimestampMillis, - now: TimestampMillis, - filter: &F, - ) -> usize { + pub(crate) fn event_count_since bool>(&self, since: TimestampMillis, filter: &F) -> usize { self.events_map .values() .rev() .take_while(|e| e.timestamp > since) - .filter(|e| !e.is_expired(now) && filter(&e.event)) + .filter(|e| filter(&e.event)) .count() } @@ -205,7 +195,6 @@ impl ChatEventsList { pub struct ChatEventsListReader<'r> { events_list: &'r ChatEventsList, min_visible_event_index: EventIndex, - now: TimestampMillis, } impl<'r> Deref for ChatEventsListReader<'r> { @@ -217,19 +206,17 @@ impl<'r> Deref for ChatEventsListReader<'r> { } impl<'r> ChatEventsListReader<'r> { - pub(crate) fn new(events_list: &ChatEventsList, now: TimestampMillis) -> ChatEventsListReader { - Self::with_min_visible_event_index(events_list, EventIndex::default(), now) + pub(crate) fn new(events_list: &ChatEventsList) -> ChatEventsListReader { + Self::with_min_visible_event_index(events_list, EventIndex::default()) } pub(crate) fn with_min_visible_event_index( events_list: &ChatEventsList, min_visible_event_index: EventIndex, - now: TimestampMillis, ) -> ChatEventsListReader { ChatEventsListReader { events_list, min_visible_event_index, - now, } } } @@ -413,7 +400,7 @@ pub trait Reader { impl<'r> Reader for ChatEventsListReader<'r> { fn get(&self, event_key: EventKey) -> Option<&EventWrapperInternal> { - self.events_list.get(event_key, self.min_visible_event_index, self.now) + self.events_list.get(event_key, self.min_visible_event_index) } fn iter( @@ -421,8 +408,7 @@ impl<'r> Reader for ChatEventsListReader<'r> { start: Option, ascending: bool, ) -> Box> + '_> { - self.events_list - .iter(start, ascending, self.min_visible_event_index, self.now) + self.events_list.iter(start, ascending, self.min_visible_event_index) } fn iter_latest_messages(&self, my_user_id: Option) -> Box> + '_> { @@ -432,7 +418,7 @@ impl<'r> Reader for ChatEventsListReader<'r> { .values() .copied() .rev() - .map_while(|e| self.events_list.get(e.into(), self.min_visible_event_index, self.now)) + .map_while(|e| self.events_list.get(e.into(), self.min_visible_event_index)) .filter_map(move |e| try_into_message_event(e, my_user_id)), ) } @@ -470,7 +456,7 @@ mod tests { #[test] fn get() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let event_by_message_index = events_reader.get(EventKey::MessageIndex(10.into())).unwrap(); let event_by_event_index = events_reader.get(event_by_message_index.index.into()).unwrap(); @@ -485,33 +471,16 @@ mod tests { #[test] fn get_before_min_visible_returns_none() { let events = setup_events(None); - let events_reader = events.visible_main_events_reader(10.into(), 0); + let events_reader = events.visible_main_events_reader(10.into()); assert!(events_reader.get(EventKey::EventIndex(10.into())).is_some()); assert!(events_reader.get(EventKey::EventIndex(9.into())).is_none()); } - #[test] - fn get_excludes_expired_events() { - let events = setup_events(Some(100)); - let events_reader1 = events.main_events_reader(100); - let expires_at = events_reader1 - .get(EventKey::EventIndex(20.into())) - .unwrap() - .expires_at - .unwrap(); - - let events_reader2 = events.main_events_reader(expires_at); - assert!(events_reader2.get(EventKey::EventIndex(20.into())).is_some()); - - let events_reader3 = events.main_events_reader(expires_at + 1); - assert!(events_reader3.get(EventKey::EventIndex(20.into())).is_none()); - } - #[test] fn scan_ascending_from_start() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let results = events_reader.scan(None, true, usize::MAX, usize::MAX, None); @@ -523,7 +492,7 @@ mod tests { #[test] fn scan_descending_from_end() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let results = events_reader.scan(None, false, usize::MAX, usize::MAX, None); @@ -535,7 +504,7 @@ mod tests { #[test] fn scan_ascending() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let start: MessageIndex = 20.into(); @@ -557,7 +526,7 @@ mod tests { #[test] fn scan_descending() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let start = 30.into(); @@ -576,40 +545,10 @@ mod tests { assert_eq!(event_indexes, (0..=first.index.into()).rev().collect_vec()); } - #[test] - fn iter_skips_expired() { - let mut events = setup_events(Some(2000)); // These will expire at 2000 - let user_id = Principal::from_slice(&[1]).into(); - - events.set_events_time_to_live(user_id, Some(1000), 500); - push_events(&mut events, 500); // These will expire at 1500 - events.set_events_time_to_live(user_id, Some(1500), 1000); - push_events(&mut events, 1000); // These will expire at 2500 - - let group1 = (0u32..=100).map(EventIndex::from).collect_vec(); - let group2 = (101u32..=201).map(EventIndex::from).collect_vec(); - let group3 = (202u32..=302).map(EventIndex::from).collect_vec(); - - let events_reader1 = events.main_events_reader(1250); - let expected1 = group1.iter().chain(group2.iter()).chain(group3.iter()).copied().collect_vec(); - assert_eq!(events_reader1.iter(None, true).map(|e| e.index).collect_vec(), expected1); - - let events_reader2 = events.main_events_reader(1750); - let expected2 = group1.iter().chain(group3.iter()).copied().collect_vec(); - assert_eq!(events_reader2.iter(None, true).map(|e| e.index).collect_vec(), expected2); - - let events_reader3 = events.main_events_reader(2250); - let expected3 = group3; - assert_eq!(events_reader3.iter(None, true).map(|e| e.index).collect_vec(), expected3); - - let events_reader4 = events.main_events_reader(2750); - assert_eq!(events_reader4.iter(None, true).map(|e| e.index).collect_vec(), vec![]); - } - #[test] fn window_message_limit() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let start = 30.into(); @@ -626,7 +565,7 @@ mod tests { #[test] fn window_event_limit() { let events = setup_events(None); - let events_reader = events.main_events_reader(0); + let events_reader = events.main_events_reader(); let start = 40.into(); @@ -640,7 +579,7 @@ mod tests { #[test] fn window_min_visible_event_index() { let events = setup_events(None); - let events_reader = events.visible_main_events_reader(46.into(), 0); + let events_reader = events.visible_main_events_reader(46.into()); let start = 50.into(); diff --git a/backend/libraries/chat_events/src/expiring_events.rs b/backend/libraries/chat_events/src/expiring_events.rs index 83a7b4ca20..9ca2c45331 100644 --- a/backend/libraries/chat_events/src/expiring_events.rs +++ b/backend/libraries/chat_events/src/expiring_events.rs @@ -1,81 +1,26 @@ use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use types::{EventIndex, MessageIndex, RangeSet, TimestampMillis, Timestamped}; +use std::collections::BTreeSet; +use types::{EventIndex, TimestampMillis}; #[derive(Serialize, Deserialize, Default)] pub struct ExpiringEvents { - event_expiry_dates: BTreeMap<(TimestampMillis, EventIndex), ()>, - // We don't remove message indexes from this map so that we can send expired message ranges - // incrementally to the frontend - message_expiry_dates: BTreeMap<(TimestampMillis, MessageIndex), ()>, - expired_message_ranges: Timestamped>, + event_expiry_dates: BTreeSet<(TimestampMillis, EventIndex)>, } impl ExpiringEvents { - pub fn insert(&mut self, event_index: EventIndex, message_index: Option, expires_at: TimestampMillis) { - self.event_expiry_dates.insert((expires_at, event_index), ()); - - message_index.map(|m| self.message_expiry_dates.insert((expires_at, m), ())); - } - - pub fn expired_messages(&self, now: TimestampMillis) -> RangeSet { - let mut ranges = self.expired_message_ranges.value.clone(); - for message_index in self.messages_expired_since(self.expired_message_ranges.timestamp, now) { - ranges.insert(message_index); - } - ranges + pub fn insert(&mut self, event_index: EventIndex, expires_at: TimestampMillis) { + self.event_expiry_dates.insert((expires_at, event_index)); } - pub fn expired_messages_since(&self, since: TimestampMillis, now: TimestampMillis) -> RangeSet { - let mut ranges = RangeSet::default(); - for message_index in self.messages_expired_since(since, now) { - ranges.insert(message_index); - } - ranges - } - - pub fn next_message_expiry(&self, now: TimestampMillis) -> Option { - self.message_expiry_dates - .range((now, MessageIndex::default())..) - .next() - .map(|((ts, _), _)| *ts) + pub fn next_event_expiry(&self) -> Option { + self.event_expiry_dates.first().map(|(ts, _)| *ts) } - pub fn process_expired_events(&mut self, now: TimestampMillis) -> Vec { - let mut expired_events = Vec::new(); - while let Some(next) = self.take_next_expired_event(now) { - expired_events.push(next); + pub fn take_next_expired_event(&mut self, now: TimestampMillis) -> Option { + if self.next_event_expiry().map_or(false, |ts| ts <= now) { + self.event_expiry_dates.pop_first().map(|(_, i)| i) + } else { + None } - - let last_updated = self.expired_message_ranges.timestamp; - let expired_messages: Vec<_> = self.messages_expired_since(last_updated, now).collect(); - - if !expired_messages.is_empty() { - self.expired_message_ranges.update( - |ranges| { - for message_index in expired_messages { - ranges.insert(message_index); - } - true - }, - now, - ); - } - - expired_events - } - - fn take_next_expired_event(&mut self, now: TimestampMillis) -> Option { - self.event_expiry_dates - .first_entry() - .filter(|e| e.key().0 < now) - .map(|e| e.remove_entry()) - .map(|((_, event_index), _)| event_index) - } - - fn messages_expired_since(&self, since: TimestampMillis, now: TimestampMillis) -> impl Iterator + '_ { - self.message_expiry_dates - .range((since, MessageIndex::default())..(now, MessageIndex::default())) - .map(|((_, message_index), _)| *message_index) } } diff --git a/backend/libraries/group_chat_core/src/lib.rs b/backend/libraries/group_chat_core/src/lib.rs index fedaaec999..4044484c63 100644 --- a/backend/libraries/group_chat_core/src/lib.rs +++ b/backend/libraries/group_chat_core/src/lib.rs @@ -129,12 +129,7 @@ impl GroupChatCore { self.events.has_updates_since(since) || self.invited_users.last_updated() > since } - pub fn summary_updates_from_events( - &self, - since: TimestampMillis, - user_id: Option, - now: TimestampMillis, - ) -> SummaryUpdatesFromEvents { + pub fn summary_updates_from_events(&self, since: TimestampMillis, user_id: Option) -> SummaryUpdatesFromEvents { let member = user_id.and_then(|user_id| self.members.get(&user_id)); let min_visible_event_index = if let Some(member) = member { @@ -147,10 +142,10 @@ impl GroupChatCore { panic!("Cannot get private summary updates if user is not a member"); }; - let events_reader = self.events.visible_main_events_reader(min_visible_event_index, now); + let events_reader = self.events.visible_main_events_reader(min_visible_event_index); let latest_message = events_reader.latest_message_event_if_updated(since, user_id); let mentions = member - .map(|m| m.most_recent_mentions(Some(since), &self.events, now)) + .map(|m| m.most_recent_mentions(Some(since), &self.events)) .unwrap_or_default(); let mut updates = SummaryUpdatesFromEvents { @@ -302,13 +297,10 @@ impl GroupChatCore { return None; }; - let latest_event_timestamp = self.events.latest_event_timestamp().unwrap_or_default(); - - let events_reader = self - .events - .visible_main_events_reader(min_visible_event_index, latest_event_timestamp); + let events_reader = self.events.visible_main_events_reader(min_visible_event_index); let latest_event_index = events_reader.latest_event_index().unwrap(); + let latest_event_timestamp = self.events.latest_event_timestamp().unwrap_or_default(); let invited_users = if self.invited_users.last_updated() > since { Some(self.invited_users.users()) } else { None }; @@ -402,7 +394,7 @@ impl GroupChatCore { ) -> EventsResult { use EventsResult::*; - match self.events_reader(user_id, thread_root_message_index, now) { + match self.events_reader(user_id, thread_root_message_index) { EventsReaderResult::Success(reader) => { let latest_event_index = reader.latest_event_index().unwrap(); if latest_client_event_index.map_or(false, |e| latest_event_index < e) { @@ -438,7 +430,7 @@ impl GroupChatCore { ) -> EventsResult { use EventsResult::*; - match self.events_reader(user_id, thread_root_message_index, now) { + match self.events_reader(user_id, thread_root_message_index) { EventsReaderResult::Success(reader) => { let latest_event_index = reader.latest_event_index().unwrap(); if latest_client_event_index.map_or(false, |e| latest_event_index < e) { @@ -470,7 +462,7 @@ impl GroupChatCore { ) -> EventsResult { use EventsResult::*; - match self.events_reader(user_id, thread_root_message_index, now) { + match self.events_reader(user_id, thread_root_message_index) { EventsReaderResult::Success(reader) => { let latest_event_index = reader.latest_event_index().unwrap(); if latest_client_event_index.map_or(false, |e| latest_event_index < e) { @@ -500,7 +492,7 @@ impl GroupChatCore { ) -> MessagesResult { use MessagesResult::*; - match self.events_reader(user_id, thread_root_message_index, now) { + match self.events_reader(user_id, thread_root_message_index) { EventsReaderResult::Success(reader) => { let latest_event_index = reader.latest_event_index().unwrap(); if latest_client_event_index.map_or(false, |e| latest_event_index < e) { @@ -528,17 +520,13 @@ impl GroupChatCore { user_id: UserId, thread_root_message_index: Option, message_id: MessageId, - now: TimestampMillis, ) -> DeletedMessageResult { use DeletedMessageResult::*; if let Some(member) = self.members.get(&user_id) { let min_visible_event_index = member.min_visible_event_index(); - if let Some(events_reader) = self - .events - .events_reader(min_visible_event_index, thread_root_message_index, now) - { + if let Some(events_reader) = self.events.events_reader(min_visible_event_index, thread_root_message_index) { if let Some(message) = events_reader.message_internal(message_id.into()) { return if let Some(deleted_by) = &message.deleted_by { if matches!(message.content, MessageContentInternal::Deleted(_)) { @@ -580,7 +568,7 @@ impl GroupChatCore { threads .into_iter() .filter_map(|root_message_index| { - self.build_thread_preview(member.user_id, member.min_visible_event_index(), root_message_index, now) + self.build_thread_preview(member.user_id, member.min_visible_event_index(), root_message_index) }) .collect(), ) @@ -714,7 +702,7 @@ impl GroupChatCore { if let Some(root_message_index) = thread_root_message_index { if !self .events - .is_accessible(member.min_visible_event_index(), None, root_message_index.into(), now) + .is_accessible(member.min_visible_event_index(), None, root_message_index.into()) { return ThreadMessageNotFound; } @@ -723,7 +711,7 @@ impl GroupChatCore { let min_visible_event_index = member.min_visible_event_index(); let user_being_replied_to = replies_to .as_ref() - .and_then(|r| self.get_user_being_replied_to(r, min_visible_event_index, thread_root_message_index, now)); + .and_then(|r| self.get_user_being_replied_to(r, min_visible_event_index, thread_root_message_index)); let everyone_mentioned = member.role.can_mention_everyone(permissions) && is_everyone_mentioned(&content); @@ -745,11 +733,11 @@ impl GroupChatCore { let mut mentions: HashSet<_> = mentioned.into_iter().chain(user_being_replied_to).collect(); let mut users_to_notify = HashSet::new(); - let mut thread_followers: Option> = None; + let mut thread_followers: Option> = None; if let Some(thread_root_message) = thread_root_message_index.and_then(|root_message_index| { self.events - .visible_main_events_reader(min_visible_event_index, now) + .visible_main_events_reader(min_visible_event_index) .message_internal(root_message_index.into()) .cloned() }) { @@ -758,9 +746,7 @@ impl GroupChatCore { } if let Some(thread_summary) = thread_root_message.thread_summary { - let followers = thread_summary.followers(); - let participants = HashSet::from_iter(thread_summary.participant_ids); - thread_followers = Some(followers.union(&participants).copied().collect()); + thread_followers = Some(thread_summary.participants_and_followers(false)); let is_first_reply = thread_summary.reply_count == 1; if is_first_reply { @@ -922,7 +908,7 @@ impl GroupChatCore { { if let Some(message_index) = self .events - .visible_main_events_reader(min_visible_event_index, now) + .visible_main_events_reader(min_visible_event_index) .message_internal(message_id.into()) .map(|m| m.message_index) { @@ -977,7 +963,7 @@ impl GroupChatCore { let events_reader = self .events - .events_reader(min_visible_event_index, thread_root_message_index, now) + .events_reader(min_visible_event_index, thread_root_message_index) .unwrap(); let messages = results @@ -1044,10 +1030,7 @@ impl GroupChatCore { let min_visible_event_index = member.min_visible_event_index(); let user_id = member.user_id; - if !self - .events - .is_accessible(min_visible_event_index, None, message_index.into(), now) - { + if !self.events.is_accessible(min_visible_event_index, None, message_index.into()) { return MessageNotFound; } @@ -1091,7 +1074,7 @@ impl GroupChatCore { if !self .events - .is_accessible(member.min_visible_event_index(), None, message_index.into(), now) + .is_accessible(member.min_visible_event_index(), None, message_index.into()) { return MessageNotFound; } @@ -1163,7 +1146,7 @@ impl GroupChatCore { } else { // If there is only an initial "group created" event then allow these users // to see the "group created" event by starting min_visible_* at zero - let events_reader = self.events.main_events_reader(now); + let events_reader = self.events.main_events_reader(); if events_reader.len() > 1 { min_visible_event_index = events_reader.next_event_index(); min_visible_message_index = events_reader.next_message_index(); @@ -1593,19 +1576,24 @@ impl GroupChatCore { } } - fn events_reader( - &self, - user_id: Option, - thread_root_message_index: Option, - now: TimestampMillis, - ) -> EventsReaderResult { + pub fn remove_expired_events(&mut self, now: TimestampMillis) { + let result = self.events.remove_expired_events(now); + + for (thread_root_message_index, users) in result.threads { + for user_id in users { + if let Some(member) = self.members.get_mut(&user_id) { + member.threads.remove(&thread_root_message_index); + member.unfollowed_threads.retain(|&m| m != thread_root_message_index); + } + } + } + } + + fn events_reader(&self, user_id: Option, thread_root_message_index: Option) -> EventsReaderResult { use EventsReaderResult::*; if let Some(min_visible_event_index) = self.min_visible_event_index(user_id) { - if let Some(events_reader) = self - .events - .events_reader(min_visible_event_index, thread_root_message_index, now) - { + if let Some(events_reader) = self.events.events_reader(min_visible_event_index, thread_root_message_index) { Success(events_reader) } else { ThreadNotFound @@ -1620,11 +1608,10 @@ impl GroupChatCore { replies_to: &GroupReplyContext, min_visible_event_index: EventIndex, thread_root_message_index: Option, - now: TimestampMillis, ) -> Option { let events_reader = self .events - .events_reader(min_visible_event_index, thread_root_message_index, now)?; + .events_reader(min_visible_event_index, thread_root_message_index)?; events_reader .message_internal(replies_to.event_index.into()) @@ -1656,17 +1643,14 @@ impl GroupChatCore { caller_user_id: UserId, min_visible_event_index: EventIndex, root_message_index: MessageIndex, - now: TimestampMillis, ) -> Option { const MAX_PREVIEWED_REPLY_COUNT: usize = 2; - let events_reader = self.events.visible_main_events_reader(min_visible_event_index, now); + let events_reader = self.events.visible_main_events_reader(min_visible_event_index); let root_message = events_reader.message_event(root_message_index.into(), Some(caller_user_id))?; - let thread_events_reader = self - .events - .events_reader(min_visible_event_index, Some(root_message_index), now)?; + let thread_events_reader = self.events.events_reader(min_visible_event_index, Some(root_message_index))?; Some(ThreadPreview { root_message, diff --git a/backend/libraries/group_chat_core/src/members.rs b/backend/libraries/group_chat_core/src/members.rs index 15bd689aa4..97e73c4cd5 100644 --- a/backend/libraries/group_chat_core/src/members.rs +++ b/backend/libraries/group_chat_core/src/members.rs @@ -332,17 +332,12 @@ impl GroupMemberInternal { } } - pub fn most_recent_mentions( - &self, - since: Option, - chat_events: &ChatEvents, - now: TimestampMillis, - ) -> Vec { + pub fn most_recent_mentions(&self, since: Option, chat_events: &ChatEvents) -> Vec { let min_visible_event_index = self.min_visible_event_index(); self.mentions .iter_most_recent(since) - .filter_map(|m| chat_events.hydrate_mention(min_visible_event_index, &m, now)) + .filter_map(|m| chat_events.hydrate_mention(min_visible_event_index, &m)) .take(MAX_RETURNED_MENTIONS) .collect() } diff --git a/backend/libraries/types/can.did b/backend/libraries/types/can.did index 4272603d36..eb35a87e66 100644 --- a/backend/libraries/types/can.did +++ b/backend/libraries/types/can.did @@ -129,7 +129,6 @@ type DirectChatSummary = record { my_metrics : ChatMetrics; archived : bool; events_ttl : opt Milliseconds; - expired_messages : vec MessageIndexRange; }; type DirectChatSummaryUpdates = record { @@ -145,7 +144,6 @@ type DirectChatSummaryUpdates = record { my_metrics : opt ChatMetrics; archived : opt bool; events_ttl : EventsTimeToLiveUpdate; - newly_expired_messages : vec MessageIndexRange; }; type DirectMessageNotification = record { @@ -274,8 +272,6 @@ type GroupChatSummary = record { date_last_pinned : opt TimestampMillis; date_read_pinned : opt TimestampMillis; events_ttl : opt Milliseconds; - expired_messages : vec MessageIndexRange; - next_message_expiry : opt TimestampMillis; gate : opt AccessGate; rules_accepted : bool; }; @@ -306,8 +302,6 @@ type GroupCanisterGroupChatSummary = record { wasm_version : BuildVersion; date_last_pinned : opt TimestampMillis; events_ttl : opt Milliseconds; - expired_messages : vec MessageIndexRange; - next_message_expiry : opt TimestampMillis; gate : opt AccessGate; rules_accepted : bool; }; @@ -336,8 +330,6 @@ type GroupCanisterGroupChatSummaryUpdates = record { wasm_version : opt BuildVersion; date_last_pinned : opt TimestampMillis; events_ttl : EventsTimeToLiveUpdate; - newly_expired_messages : vec MessageIndexRange; - next_message_expiry : TimestampUpdate; gate : AccessGateUpdate; rules_accepted : opt bool; }; @@ -394,8 +386,6 @@ type CommunityCanisterChannelSummary = record { metrics : ChatMetrics; date_last_pinned : opt TimestampMillis; events_ttl : opt Milliseconds; - expired_messages : vec MessageIndexRange; - next_message_expiry : opt TimestampMillis; gate : opt AccessGate; membership : opt ChannelMembership; }; diff --git a/backend/libraries/types/src/channel_summary.rs b/backend/libraries/types/src/channel_summary.rs index 618a7d5670..363fcb60a3 100644 --- a/backend/libraries/types/src/channel_summary.rs +++ b/backend/libraries/types/src/channel_summary.rs @@ -1,6 +1,6 @@ use crate::{ AccessGate, ChannelId, ChatMetrics, EventIndex, EventWrapper, GroupCanisterThreadDetails, GroupPermissions, GroupRole, - GroupSubtype, HydratedMention, Message, MessageIndex, Milliseconds, OptionUpdate, RangeSet, TimestampMillis, + GroupSubtype, HydratedMention, Message, MessageIndex, Milliseconds, OptionUpdate, TimestampMillis, }; use candid::CandidType; use serde::{Deserialize, Serialize}; @@ -25,8 +25,6 @@ pub struct CommunityCanisterChannelSummary { pub metrics: ChatMetrics, pub date_last_pinned: Option, pub events_ttl: Option, - pub expired_messages: RangeSet, - pub next_message_expiry: Option, pub gate: Option, pub membership: Option, } diff --git a/backend/libraries/types/src/chat_summary.rs b/backend/libraries/types/src/chat_summary.rs index 77c440b0ab..27f4742267 100644 --- a/backend/libraries/types/src/chat_summary.rs +++ b/backend/libraries/types/src/chat_summary.rs @@ -1,6 +1,6 @@ use crate::{ AccessGate, BuildVersion, CanisterId, ChatId, EventIndex, EventWrapper, FrozenGroupInfo, GroupMember, GroupPermissions, - GroupRole, HydratedMention, Message, MessageIndex, Milliseconds, OptionUpdate, RangeSet, TimestampMillis, UserId, Version, + GroupRole, HydratedMention, Message, MessageIndex, Milliseconds, OptionUpdate, TimestampMillis, UserId, Version, MAX_RETURNED_MENTIONS, }; use candid::CandidType; @@ -23,7 +23,6 @@ pub struct DirectChatSummary { pub my_metrics: ChatMetrics, pub archived: bool, pub events_ttl: Option, - pub expired_messages: RangeSet, } impl DirectChatSummary { @@ -63,8 +62,6 @@ pub struct GroupChatSummary { pub date_last_pinned: Option, pub date_read_pinned: Option, pub events_ttl: Option, - pub expired_messages: RangeSet, - pub next_message_expiry: Option, pub gate: Option, pub rules_accepted: bool, } @@ -83,7 +80,6 @@ pub struct DirectChatSummaryUpdates { pub my_metrics: Option, pub archived: Option, pub events_ttl: OptionUpdate, - pub newly_expired_messages: RangeSet, } // TODO: This type is used in the response from group::public_summary and group_index::recommended_groups @@ -135,8 +131,6 @@ pub struct GroupCanisterGroupChatSummary { pub frozen: Option, pub date_last_pinned: Option, pub events_ttl: Option, - pub expired_messages: RangeSet, - pub next_message_expiry: Option, pub gate: Option, pub rules_accepted: bool, } @@ -196,8 +190,6 @@ impl GroupCanisterGroupChatSummary { frozen: updates.frozen.apply_to(self.frozen), date_last_pinned: updates.date_last_pinned.or(self.date_last_pinned), events_ttl: updates.events_ttl.apply_to(self.events_ttl), - expired_messages: self.expired_messages.merge(updates.newly_expired_messages), - next_message_expiry: updates.next_message_expiry.apply_to(self.next_message_expiry), gate: updates.gate.apply_to(self.gate), rules_accepted: updates.rules_accepted.unwrap_or(self.rules_accepted), } @@ -230,8 +222,6 @@ pub struct GroupCanisterGroupChatSummaryUpdates { pub frozen: OptionUpdate, pub date_last_pinned: Option, pub events_ttl: OptionUpdate, - pub newly_expired_messages: RangeSet, - pub next_message_expiry: OptionUpdate, pub gate: OptionUpdate, pub rules_accepted: Option, } diff --git a/frontend/openchat-agent/src/services/community/candid/idl.js b/frontend/openchat-agent/src/services/community/candid/idl.js index ea0184a0b5..ca3236d592 100644 --- a/frontend/openchat-agent/src/services/community/candid/idl.js +++ b/frontend/openchat-agent/src/services/community/candid/idl.js @@ -1600,6 +1600,7 @@ export const idlFactory = ({ IDL }) => { 'gate' : AccessGateUpdate, 'name' : IDL.Opt(IDL.Text), 'description' : IDL.Opt(IDL.Text), + 'events_ttl' : EventsTimeToLiveUpdate, 'public' : IDL.Opt(IDL.Bool), 'rules' : IDL.Opt(UpdatedRules), 'avatar' : DocumentUpdate, diff --git a/frontend/openchat-agent/src/services/community/candid/types.d.ts b/frontend/openchat-agent/src/services/community/candid/types.d.ts index 18edc3e171..2d58a7f526 100644 --- a/frontend/openchat-agent/src/services/community/candid/types.d.ts +++ b/frontend/openchat-agent/src/services/community/candid/types.d.ts @@ -1795,6 +1795,7 @@ export interface UpdateChannelArgs { 'gate' : AccessGateUpdate, 'name' : [] | [string], 'description' : [] | [string], + 'events_ttl' : EventsTimeToLiveUpdate, 'public' : [] | [boolean], 'rules' : [] | [UpdatedRules], 'avatar' : DocumentUpdate, diff --git a/frontend/openchat-agent/src/services/community/community.client.ts b/frontend/openchat-agent/src/services/community/community.client.ts index 769c857f19..3c7df33018 100644 --- a/frontend/openchat-agent/src/services/community/community.client.ts +++ b/frontend/openchat-agent/src/services/community/community.client.ts @@ -2,7 +2,7 @@ import type { Identity } from "@dfinity/agent"; import { idlFactory, type CommunityService } from "./candid/idl"; import { CandidService } from "../candidService"; -import { identity } from "../../utils/mapping"; +import { apiOptionUpdate, identity } from "../../utils/mapping"; import type { AgentConfig } from "../../config"; import { addMembersToChannelResponse, @@ -125,6 +125,7 @@ import type { SetMemberDisplayNameResponse, UpdatedRules, FollowThreadResponse, + OptionUpdate, } from "openchat-shared"; import { textToCode, DestinationInvalidError } from "openchat-shared"; import { @@ -1109,6 +1110,7 @@ export class CommunityClient extends CandidService { rules?: UpdatedRules, permissions?: Partial, avatar?: Uint8Array, + eventsTimeToLiveMs?: OptionUpdate, gate?: AccessGate, isPublic?: boolean, ): Promise { @@ -1120,6 +1122,7 @@ export class CommunityClient extends CandidService { permissions: apiOptional(apiOptionalGroupPermissions, permissions), rules: apiOptional(apiUpdatedRules, rules), public: apiOptional(identity, isPublic), + events_ttl: apiOptionUpdate(identity, eventsTimeToLiveMs), gate: gate === undefined ? { NoChange: null } @@ -1236,7 +1239,11 @@ export class CommunityClient extends CandidService { ); } - followThread(channelId: string, threadRootMessageIndex: number, follow: boolean): Promise { + followThread( + channelId: string, + threadRootMessageIndex: number, + follow: boolean, + ): Promise { const args = { channel_id: BigInt(channelId), thread_root_message_index: threadRootMessageIndex, diff --git a/frontend/openchat-agent/src/services/openchatAgent.ts b/frontend/openchat-agent/src/services/openchatAgent.ts index 94a3d8ae3c..a677edb278 100644 --- a/frontend/openchat-agent/src/services/openchatAgent.ts +++ b/frontend/openchat-agent/src/services/openchatAgent.ts @@ -511,6 +511,7 @@ export class OpenChatAgent extends EventTarget { rules, permissions, avatar, + undefined, gate, isPublic, ); diff --git a/upgrade_order.md b/upgrade_order.md index 4b14512a23..0fda9085aa 100644 --- a/upgrade_order.md +++ b/upgrade_order.md @@ -1 +1,4 @@ -website -> users \ No newline at end of file +website -> users -> groups -> communities + +Website first because fields have been removed from chat summaries + fields have been added to `update_channel` args. +Users before groups because fields have been removed from group chat summaries. \ No newline at end of file