Skip to content

Commit

Permalink
Use canister timers to remove expired events (#4447)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Oct 3, 2023
1 parent 5d87dc0 commit 1a1f650
Show file tree
Hide file tree
Showing 67 changed files with 593 additions and 626 deletions.
1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Disable mentions for messages sent by the ProposalsBot ([#4424](https://github.com/open-chat-labs/open-chat/pull/4424))
- Use canister timers to remove expired events ([#4447](https://github.com/open-chat-labs/open-chat/pull/4447))

### Fixed

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/community/api/can.did
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ type UpdateChannelArgs = record {
rules : opt UpdatedRules;
avatar : DocumentUpdate;
permissions : opt OptionalGroupPermissions;
events_ttl : EventsTimeToLiveUpdate;
gate : AccessGateUpdate;
public : opt bool;
};
Expand Down
5 changes: 3 additions & 2 deletions backend/canisters/community/api/src/updates/update_channel.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use candid::CandidType;
use serde::{Deserialize, Serialize};
use types::{
AccessGate, ChannelId, Document, FieldTooLongResult, FieldTooShortResult, OptionUpdate, OptionalGroupPermissions,
UpdatedRules, Version,
AccessGate, ChannelId, Document, FieldTooLongResult, FieldTooShortResult, Milliseconds, OptionUpdate,
OptionalGroupPermissions, UpdatedRules, Version,
};

#[derive(CandidType, Serialize, Deserialize, Debug)]
Expand All @@ -13,6 +13,7 @@ pub struct Args {
pub rules: Option<UpdatedRules>,
pub avatar: OptionUpdate<Document>,
pub permissions: Option<OptionalGroupPermissions>,
pub events_ttl: OptionUpdate<Milliseconds>,
pub gate: OptionUpdate<AccessGate>,
pub public: Option<bool>,
}
Expand Down
25 changes: 24 additions & 1 deletion backend/canisters/community/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::memory::{get_instruction_counts_data_memory, get_instruction_counts_i
use crate::model::channels::Channels;
use crate::model::groups_being_imported::{GroupBeingImportedSummary, GroupsBeingImported};
use crate::model::members::CommunityMembers;
use crate::timer_job_types::TimerJob;
use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob};
use activity_notification_state::ActivityNotificationState;
use candid::Principal;
use canister_state_macros::canister_state;
Expand Down Expand Up @@ -146,6 +146,26 @@ impl RuntimeState {
}
}

pub fn run_event_expiry_job(&mut self) {
let now = self.env.now();
let mut next_event_expiry = None;
for channel in self.data.channels.iter_mut() {
channel.chat.remove_expired_events(now);
if let Some(expiry) = channel.chat.events.next_event_expiry() {
if next_event_expiry.map_or(true, |current| expiry < current) {
next_event_expiry = Some(expiry);
}
}
}

self.data.next_event_expiry = next_event_expiry;
if let Some(expiry) = self.data.next_event_expiry {
self.data
.timer_jobs
.enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now);
}
}

pub fn metrics(&self) -> Metrics {
Metrics {
memory_used: utils::memory::used(),
Expand Down Expand Up @@ -211,6 +231,8 @@ struct Data {
groups_being_imported: GroupsBeingImported,
#[serde(skip, default = "init_instruction_counts_log")]
instruction_counts_log: InstructionCountsLog,
#[serde(default)]
next_event_expiry: Option<TimestampMillis>,
test_mode: bool,
cached_chat_metrics: Timestamped<ChatMetrics>,
}
Expand Down Expand Up @@ -274,6 +296,7 @@ impl Data {
activity_notification_state: ActivityNotificationState::new(now, mark_active_duration),
groups_being_imported: GroupsBeingImported::default(),
instruction_counts_log: init_instruction_counts_log(),
next_event_expiry: None,
test_mode,
cached_chat_metrics: Timestamped::default(),
}
Expand Down
10 changes: 3 additions & 7 deletions backend/canisters/community/impl/src/model/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl Channel {

let can_view_latest_message = self.can_view_latest_message(member.is_some(), is_community_member, is_public_community);

let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index, now);
let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index);
let latest_event_index = main_events_reader.latest_event_index().unwrap_or_default();
let latest_message = if can_view_latest_message { main_events_reader.latest_message_event(user_id) } else { None };

Expand All @@ -219,7 +219,7 @@ impl Channel {
let membership = member.map(|m| ChannelMembership {
joined: m.date_added,
role: m.role.into(),
mentions: m.most_recent_mentions(None, &chat.events, now),
mentions: m.most_recent_mentions(None, &chat.events),
notifications_muted: m.notifications_muted.value,
my_metrics: chat
.events
Expand All @@ -232,7 +232,6 @@ impl Channel {
None,
MAX_THREADS_IN_SUMMARY,
m.user_id,
now,
),
rules_accepted: m
.rules_accepted
Expand All @@ -259,8 +258,6 @@ impl Channel {
metrics: chat.events.metrics().hydrate(),
date_last_pinned: chat.date_last_pinned,
events_ttl: chat.events.get_events_time_to_live().value,
expired_messages: chat.events.expired_messages(now),
next_message_expiry: chat.events.next_message_expiry(now),
gate: chat.gate.value.clone(),
membership,
})
Expand Down Expand Up @@ -292,7 +289,7 @@ impl Channel {
}

let can_view_latest_message = self.can_view_latest_message(member.is_some(), is_community_member, is_public_community);
let updates_from_events = chat.summary_updates_from_events(since, user_id, now);
let updates_from_events = chat.summary_updates_from_events(since, user_id);

let latest_message = can_view_latest_message
.then_some(updates_from_events.latest_message)
Expand All @@ -314,7 +311,6 @@ impl Channel {
Some(since),
MAX_THREADS_IN_SUMMARY,
m.user_id,
now,
),
unfollowed_threads: self
.chat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn deleted_message_impl(args: Args, state: &RuntimeState) -> Response {
if let Some(channel) = state.data.channels.get(&args.channel_id) {
match channel
.chat
.deleted_message(user_id, args.thread_root_message_index, args.message_id, state.env.now())
.deleted_message(user_id, args.thread_root_message_index, args.message_id)
{
DeletedMessageResult::Success(content) => Success(SuccessResult { content: *content }),
DeletedMessageResult::UserNotInGroup => UserNotInChannel,
Expand Down
19 changes: 15 additions & 4 deletions backend/canisters/community/impl/src/timer_job_types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::activity_notifications::handle_activity_notification;
use crate::jobs::import_groups::{finalize_group_import, mark_import_complete, process_channel_members};
use crate::{mutate_state, read_state};
use canister_timer_jobs::Job;
Expand All @@ -12,6 +13,7 @@ pub enum TimerJob {
HardDeleteMessageContent(HardDeleteMessageContentJob),
DeleteFileReferences(DeleteFileReferencesJob),
EndPoll(EndPollJob),
RemoveExpiredEvents(RemoveExpiredEventsJob),
FinalizeGroupImport(FinalizeGroupImportJob),
ProcessGroupImportChannelMembers(ProcessGroupImportChannelMembersJob),
MarkGroupImportComplete(MarkGroupImportCompleteJob),
Expand All @@ -38,6 +40,9 @@ pub struct EndPollJob {
pub message_index: MessageIndex,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct RemoveExpiredEventsJob;

#[derive(Serialize, Deserialize, Clone)]
pub struct FinalizeGroupImportJob {
pub group_id: ChatId,
Expand Down Expand Up @@ -74,6 +79,7 @@ impl Job for TimerJob {
TimerJob::HardDeleteMessageContent(job) => job.execute(),
TimerJob::DeleteFileReferences(job) => job.execute(),
TimerJob::EndPoll(job) => job.execute(),
TimerJob::RemoveExpiredEvents(job) => job.execute(),
TimerJob::FinalizeGroupImport(job) => job.execute(),
TimerJob::ProcessGroupImportChannelMembers(job) => job.execute(),
TimerJob::MarkGroupImportComplete(job) => job.execute(),
Expand All @@ -86,13 +92,11 @@ impl Job for TimerJob {
impl Job for HardDeleteMessageContentJob {
fn execute(&self) {
mutate_state(|state| {
let now = state.env.now();

if let Some(content) = state.data.channels.get_mut(&self.channel_id).and_then(|channel| {
channel
.chat
.events
.remove_deleted_message_content(self.thread_root_message_index, self.message_id, now)
.remove_deleted_message_content(self.thread_root_message_index, self.message_id)
}) {
let files_to_delete = content.blob_references();
if !files_to_delete.is_empty() {
Expand Down Expand Up @@ -126,12 +130,19 @@ impl Job for EndPollJob {
.chat
.events
.end_poll(self.thread_root_message_index, self.message_index, now);
// handle_activity_notification(state);

handle_activity_notification(state);
}
});
}
}

impl Job for RemoveExpiredEventsJob {
fn execute(&self) {
mutate_state(|state| state.run_event_expiry_job());
}
}

impl Job for FinalizeGroupImportJob {
fn execute(&self) {
finalize_group_import(self.group_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,18 @@ fn commit(
is_bot: bool,
state: &mut RuntimeState,
) -> Response {
let now = state.env.now();

if let Some(channel) = state.data.channels.get_mut(&channel_id) {
let mut min_visible_event_index = EventIndex::default();
let mut min_visible_message_index = MessageIndex::default();

if !channel.chat.history_visible_to_new_joiners {
let events_reader = channel.chat.events.main_events_reader(now);
let events_reader = channel.chat.events.main_events_reader();
min_visible_event_index = events_reader.next_event_index();
min_visible_message_index = events_reader.next_message_index();
}

let now = state.env.now();

let mut users_added: Vec<UserId> = Vec::new();
let mut users_limit_reached: Vec<UserId> = Vec::new();

Expand Down
15 changes: 5 additions & 10 deletions backend/canisters/community/impl/src/updates/add_reaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use canister_tracing_macros::trace;
use chat_events::Reader;
use community_canister::add_reaction::{Response::*, *};
use group_chat_core::{AddRemoveReactionResult, GroupChatCore};
use types::{ChannelReactionAddedNotification, EventIndex, EventWrapper, Message, Notification, TimestampMillis, UserId};
use types::{ChannelReactionAddedNotification, EventIndex, EventWrapper, Message, Notification, UserId};

#[update_candid_and_msgpack]
#[trace]
Expand All @@ -27,9 +27,9 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
}

let user_id = member.user_id;
let now = state.env.now();

if let Some(channel) = state.data.channels.get_mut(&args.channel_id) {
let now = state.env.now();
match channel.chat.add_reaction(
user_id,
args.thread_root_message_index,
Expand All @@ -38,7 +38,7 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
now,
) {
AddRemoveReactionResult::Success => {
if let Some(message) = should_push_notification(&args, user_id, &channel.chat, now) {
if let Some(message) = should_push_notification(&args, user_id, &channel.chat) {
push_notification(
args,
user_id,
Expand Down Expand Up @@ -67,15 +67,10 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
}
}

fn should_push_notification(
args: &Args,
user_id: UserId,
chat: &GroupChatCore,
now: TimestampMillis,
) -> Option<EventWrapper<Message>> {
fn should_push_notification(args: &Args, user_id: UserId, chat: &GroupChatCore) -> Option<EventWrapper<Message>> {
let message = chat
.events
.events_reader(EventIndex::default(), args.thread_root_message_index, now)
.events_reader(EventIndex::default(), args.thread_root_message_index)
// We pass in `None` in place of `my_user_id` because we don't want to hydrate
// the notification with data for the current user (eg. their poll votes).
.and_then(|events_reader| events_reader.message_event(args.message_id.into(), None))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub(crate) fn join_channel_unchecked(
min_visible_event_index = e;
min_visible_message_index = m;
} else {
let events_reader = channel.chat.events.main_events_reader(now);
let events_reader = channel.chat.events.main_events_reader();
min_visible_event_index = events_reader.next_event_index();
min_visible_message_index = events_reader.next_message_index();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn c2c_tip_message_impl(args: Args, state: &mut RuntimeState) -> Response {
if let Some((message_index, message_event_index)) = channel
.chat
.events
.events_reader(EventIndex::default(), args.thread_root_message_index, now)
.events_reader(EventIndex::default(), args.thread_root_message_index)
.and_then(|r| {
r.message_event_internal(args.message_id.into())
.map(|e| (e.event.message_index, e.index))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ fn prepare(args: &Args, state: &RuntimeState) -> Result<PrepareResult, Response>
};

let min_visible_event_index = channel_member.min_visible_event_index();
let now = state.env.now();

if let Some(proposal) = channel
.chat
.events
.visible_main_events_reader(min_visible_event_index, now)
.visible_main_events_reader(min_visible_event_index)
.message_internal(args.message_index.into())
.and_then(|m| if let MessageContentInternal::GovernanceProposal(p) = &m.content { Some(p) } else { None })
{
Expand Down Expand Up @@ -111,15 +110,16 @@ fn commit(channel_id: ChannelId, user_id: UserId, args: Args, state: &mut Runtim
None => return UserNotInChannel,
};

let now = state.env.now();
let min_visible_event_index = member.min_visible_event_index();

match channel
.chat
.events
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now)
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt)
{
RecordProposalVoteResult::Success => {
let now = state.env.now();

let votes = member.proposal_votes.entry(now).or_default();
if !votes.contains(&args.message_index) {
votes.push(args.message_index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ fn register_proposal_vote_impl(args: Args, state: &mut RuntimeState) -> Response
None => return UserNotInChannel,
};

let now = state.env.now();
let min_visible_event_index = channel_member.min_visible_event_index();
let user_id = member.user_id;

match channel
.chat
.events
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now)
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt)
{
RecordProposalVoteResult::Success => {
let now = state.env.now();

channel
.chat
.members
Expand Down
Loading

0 comments on commit 1a1f650

Please sign in to comment.