Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use canister timers to remove expired events #4447

Merged
merged 16 commits into from
Oct 3, 2023
Merged
1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Disable mentions for messages sent by the ProposalsBot ([#4424](https://github.com/open-chat-labs/open-chat/pull/4424))
- Use canister timers to remove expired events ([#4447](https://github.com/open-chat-labs/open-chat/pull/4447))

### Fixed

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/community/api/can.did
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ type UpdateChannelArgs = record {
rules : opt UpdatedRules;
avatar : DocumentUpdate;
permissions : opt OptionalGroupPermissions;
events_ttl : EventsTimeToLiveUpdate;
gate : AccessGateUpdate;
public : opt bool;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use candid::CandidType;
use serde::{Deserialize, Serialize};
use types::{
AccessGate, ChannelId, Document, FieldTooLongResult, FieldTooShortResult, OptionUpdate, OptionalGroupPermissions,
UpdatedRules, Version,
AccessGate, ChannelId, Document, FieldTooLongResult, FieldTooShortResult, Milliseconds, OptionUpdate,
OptionalGroupPermissions, UpdatedRules, Version,
};

#[derive(CandidType, Serialize, Deserialize, Debug)]
Expand All @@ -13,6 +13,7 @@ pub struct Args {
pub rules: Option<UpdatedRules>,
pub avatar: OptionUpdate<Document>,
pub permissions: Option<OptionalGroupPermissions>,
pub events_ttl: OptionUpdate<Milliseconds>,
pub gate: OptionUpdate<AccessGate>,
pub public: Option<bool>,
}
Expand Down
25 changes: 24 additions & 1 deletion backend/canisters/community/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::memory::{get_instruction_counts_data_memory, get_instruction_counts_i
use crate::model::channels::Channels;
use crate::model::groups_being_imported::{GroupBeingImportedSummary, GroupsBeingImported};
use crate::model::members::CommunityMembers;
use crate::timer_job_types::TimerJob;
use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob};
use activity_notification_state::ActivityNotificationState;
use candid::Principal;
use canister_state_macros::canister_state;
Expand Down Expand Up @@ -146,6 +146,26 @@ impl RuntimeState {
}
}

pub fn run_event_expiry_job(&mut self) {
let now = self.env.now();
let mut next_event_expiry = None;
for channel in self.data.channels.iter_mut() {
channel.chat.remove_expired_events(now);
if let Some(expiry) = channel.chat.events.next_event_expiry() {
if next_event_expiry.map_or(true, |current| expiry < current) {
next_event_expiry = Some(expiry);
}
}
}

self.data.next_event_expiry = next_event_expiry;
if let Some(expiry) = self.data.next_event_expiry {
self.data
.timer_jobs
.enqueue_job(TimerJob::RemoveExpiredEvents(RemoveExpiredEventsJob), expiry, now);
}
}

pub fn metrics(&self) -> Metrics {
Metrics {
memory_used: utils::memory::used(),
Expand Down Expand Up @@ -211,6 +231,8 @@ struct Data {
groups_being_imported: GroupsBeingImported,
#[serde(skip, default = "init_instruction_counts_log")]
instruction_counts_log: InstructionCountsLog,
#[serde(default)]
next_event_expiry: Option<TimestampMillis>,
test_mode: bool,
cached_chat_metrics: Timestamped<ChatMetrics>,
}
Expand Down Expand Up @@ -274,6 +296,7 @@ impl Data {
activity_notification_state: ActivityNotificationState::new(now, mark_active_duration),
groups_being_imported: GroupsBeingImported::default(),
instruction_counts_log: init_instruction_counts_log(),
next_event_expiry: None,
test_mode,
cached_chat_metrics: Timestamped::default(),
}
Expand Down
10 changes: 3 additions & 7 deletions backend/canisters/community/impl/src/model/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl Channel {

let can_view_latest_message = self.can_view_latest_message(member.is_some(), is_community_member, is_public_community);

let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index, now);
let main_events_reader = chat.events.visible_main_events_reader(min_visible_event_index);
let latest_event_index = main_events_reader.latest_event_index().unwrap_or_default();
let latest_message = if can_view_latest_message { main_events_reader.latest_message_event(user_id) } else { None };

Expand All @@ -219,7 +219,7 @@ impl Channel {
let membership = member.map(|m| ChannelMembership {
joined: m.date_added,
role: m.role.into(),
mentions: m.most_recent_mentions(None, &chat.events, now),
mentions: m.most_recent_mentions(None, &chat.events),
notifications_muted: m.notifications_muted.value,
my_metrics: chat
.events
Expand All @@ -232,7 +232,6 @@ impl Channel {
None,
MAX_THREADS_IN_SUMMARY,
m.user_id,
now,
),
rules_accepted: m
.rules_accepted
Expand All @@ -259,8 +258,6 @@ impl Channel {
metrics: chat.events.metrics().hydrate(),
date_last_pinned: chat.date_last_pinned,
events_ttl: chat.events.get_events_time_to_live().value,
expired_messages: chat.events.expired_messages(now),
next_message_expiry: chat.events.next_message_expiry(now),
gate: chat.gate.value.clone(),
membership,
})
Expand Down Expand Up @@ -292,7 +289,7 @@ impl Channel {
}

let can_view_latest_message = self.can_view_latest_message(member.is_some(), is_community_member, is_public_community);
let updates_from_events = chat.summary_updates_from_events(since, user_id, now);
let updates_from_events = chat.summary_updates_from_events(since, user_id);

let latest_message = can_view_latest_message
.then_some(updates_from_events.latest_message)
Expand All @@ -314,7 +311,6 @@ impl Channel {
Some(since),
MAX_THREADS_IN_SUMMARY,
m.user_id,
now,
),
unfollowed_threads: self
.chat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn deleted_message_impl(args: Args, state: &RuntimeState) -> Response {
if let Some(channel) = state.data.channels.get(&args.channel_id) {
match channel
.chat
.deleted_message(user_id, args.thread_root_message_index, args.message_id, state.env.now())
.deleted_message(user_id, args.thread_root_message_index, args.message_id)
{
DeletedMessageResult::Success(content) => Success(SuccessResult { content: *content }),
DeletedMessageResult::UserNotInGroup => UserNotInChannel,
Expand Down
19 changes: 15 additions & 4 deletions backend/canisters/community/impl/src/timer_job_types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::activity_notifications::handle_activity_notification;
use crate::jobs::import_groups::{finalize_group_import, mark_import_complete, process_channel_members};
use crate::{mutate_state, read_state};
use canister_timer_jobs::Job;
Expand All @@ -12,6 +13,7 @@ pub enum TimerJob {
HardDeleteMessageContent(HardDeleteMessageContentJob),
DeleteFileReferences(DeleteFileReferencesJob),
EndPoll(EndPollJob),
RemoveExpiredEvents(RemoveExpiredEventsJob),
FinalizeGroupImport(FinalizeGroupImportJob),
ProcessGroupImportChannelMembers(ProcessGroupImportChannelMembersJob),
MarkGroupImportComplete(MarkGroupImportCompleteJob),
Expand All @@ -38,6 +40,9 @@ pub struct EndPollJob {
pub message_index: MessageIndex,
}

#[derive(Serialize, Deserialize, Clone)]
pub struct RemoveExpiredEventsJob;

#[derive(Serialize, Deserialize, Clone)]
pub struct FinalizeGroupImportJob {
pub group_id: ChatId,
Expand Down Expand Up @@ -74,6 +79,7 @@ impl Job for TimerJob {
TimerJob::HardDeleteMessageContent(job) => job.execute(),
TimerJob::DeleteFileReferences(job) => job.execute(),
TimerJob::EndPoll(job) => job.execute(),
TimerJob::RemoveExpiredEvents(job) => job.execute(),
TimerJob::FinalizeGroupImport(job) => job.execute(),
TimerJob::ProcessGroupImportChannelMembers(job) => job.execute(),
TimerJob::MarkGroupImportComplete(job) => job.execute(),
Expand All @@ -86,13 +92,11 @@ impl Job for TimerJob {
impl Job for HardDeleteMessageContentJob {
fn execute(&self) {
mutate_state(|state| {
let now = state.env.now();

if let Some(content) = state.data.channels.get_mut(&self.channel_id).and_then(|channel| {
channel
.chat
.events
.remove_deleted_message_content(self.thread_root_message_index, self.message_id, now)
.remove_deleted_message_content(self.thread_root_message_index, self.message_id)
}) {
let files_to_delete = content.blob_references();
if !files_to_delete.is_empty() {
Expand Down Expand Up @@ -126,12 +130,19 @@ impl Job for EndPollJob {
.chat
.events
.end_poll(self.thread_root_message_index, self.message_index, now);
// handle_activity_notification(state);

handle_activity_notification(state);
}
});
}
}

impl Job for RemoveExpiredEventsJob {
fn execute(&self) {
mutate_state(|state| state.run_event_expiry_job());
}
}

impl Job for FinalizeGroupImportJob {
fn execute(&self) {
finalize_group_import(self.group_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,18 @@ fn commit(
is_bot: bool,
state: &mut RuntimeState,
) -> Response {
let now = state.env.now();

if let Some(channel) = state.data.channels.get_mut(&channel_id) {
let mut min_visible_event_index = EventIndex::default();
let mut min_visible_message_index = MessageIndex::default();

if !channel.chat.history_visible_to_new_joiners {
let events_reader = channel.chat.events.main_events_reader(now);
let events_reader = channel.chat.events.main_events_reader();
min_visible_event_index = events_reader.next_event_index();
min_visible_message_index = events_reader.next_message_index();
}

let now = state.env.now();

let mut users_added: Vec<UserId> = Vec::new();
let mut users_limit_reached: Vec<UserId> = Vec::new();

Expand Down
15 changes: 5 additions & 10 deletions backend/canisters/community/impl/src/updates/add_reaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use canister_tracing_macros::trace;
use chat_events::Reader;
use community_canister::add_reaction::{Response::*, *};
use group_chat_core::{AddRemoveReactionResult, GroupChatCore};
use types::{ChannelReactionAddedNotification, EventIndex, EventWrapper, Message, Notification, TimestampMillis, UserId};
use types::{ChannelReactionAddedNotification, EventIndex, EventWrapper, Message, Notification, UserId};

#[update_candid_and_msgpack]
#[trace]
Expand All @@ -27,9 +27,9 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
}

let user_id = member.user_id;
let now = state.env.now();

if let Some(channel) = state.data.channels.get_mut(&args.channel_id) {
let now = state.env.now();
match channel.chat.add_reaction(
user_id,
args.thread_root_message_index,
Expand All @@ -38,7 +38,7 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
now,
) {
AddRemoveReactionResult::Success => {
if let Some(message) = should_push_notification(&args, user_id, &channel.chat, now) {
if let Some(message) = should_push_notification(&args, user_id, &channel.chat) {
push_notification(
args,
user_id,
Expand Down Expand Up @@ -67,15 +67,10 @@ fn add_reaction_impl(args: Args, state: &mut RuntimeState) -> Response {
}
}

fn should_push_notification(
args: &Args,
user_id: UserId,
chat: &GroupChatCore,
now: TimestampMillis,
) -> Option<EventWrapper<Message>> {
fn should_push_notification(args: &Args, user_id: UserId, chat: &GroupChatCore) -> Option<EventWrapper<Message>> {
let message = chat
.events
.events_reader(EventIndex::default(), args.thread_root_message_index, now)
.events_reader(EventIndex::default(), args.thread_root_message_index)
// We pass in `None` in place of `my_user_id` because we don't want to hydrate
// the notification with data for the current user (eg. their poll votes).
.and_then(|events_reader| events_reader.message_event(args.message_id.into(), None))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub(crate) fn join_channel_unchecked(
min_visible_event_index = e;
min_visible_message_index = m;
} else {
let events_reader = channel.chat.events.main_events_reader(now);
let events_reader = channel.chat.events.main_events_reader();
min_visible_event_index = events_reader.next_event_index();
min_visible_message_index = events_reader.next_message_index();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn c2c_tip_message_impl(args: Args, state: &mut RuntimeState) -> Response {
if let Some((message_index, message_event_index)) = channel
.chat
.events
.events_reader(EventIndex::default(), args.thread_root_message_index, now)
.events_reader(EventIndex::default(), args.thread_root_message_index)
.and_then(|r| {
r.message_event_internal(args.message_id.into())
.map(|e| (e.event.message_index, e.index))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ fn prepare(args: &Args, state: &RuntimeState) -> Result<PrepareResult, Response>
};

let min_visible_event_index = channel_member.min_visible_event_index();
let now = state.env.now();

if let Some(proposal) = channel
.chat
.events
.visible_main_events_reader(min_visible_event_index, now)
.visible_main_events_reader(min_visible_event_index)
.message_internal(args.message_index.into())
.and_then(|m| if let MessageContentInternal::GovernanceProposal(p) = &m.content { Some(p) } else { None })
{
Expand Down Expand Up @@ -111,15 +110,16 @@ fn commit(channel_id: ChannelId, user_id: UserId, args: Args, state: &mut Runtim
None => return UserNotInChannel,
};

let now = state.env.now();
let min_visible_event_index = member.min_visible_event_index();

match channel
.chat
.events
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now)
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt)
{
RecordProposalVoteResult::Success => {
let now = state.env.now();

let votes = member.proposal_votes.entry(now).or_default();
if !votes.contains(&args.message_index) {
votes.push(args.message_index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ fn register_proposal_vote_impl(args: Args, state: &mut RuntimeState) -> Response
None => return UserNotInChannel,
};

let now = state.env.now();
let min_visible_event_index = channel_member.min_visible_event_index();
let user_id = member.user_id;

match channel
.chat
.events
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt, now)
.record_proposal_vote(user_id, min_visible_event_index, args.message_index, args.adopt)
{
RecordProposalVoteResult::Success => {
let now = state.env.now();

channel
.chat
.members
Expand Down
Loading
Loading