Skip to content

Commit

Permalink
Remove chat member updates after 31 days (#6907)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Nov 28, 2024
1 parent 80ff71e commit 8c75ec1
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 16 deletions.
1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Simplify how we store and query proposal votes ([#6906](https://github.com/open-chat-labs/open-chat/pull/6906))
- Remove member updates after 90 days ([#6907](https://github.com/open-chat-labs/open-chat/pull/6907))
- Extract member to channel links out of each member instance ([#6910](https://github.com/open-chat-labs/open-chat/pull/6910))

### Removed
Expand Down
16 changes: 15 additions & 1 deletion backend/canisters/community/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::jobs::import_groups::finalize_group_import;
use crate::lifecycle::{init_env, init_state};
use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory};
use crate::{read_state, Data};
use crate::{mutate_state, read_state, Data};
use canister_logger::LogEntry;
use canister_tracing_macros::trace;
use community_canister::post_upgrade::Args;
Expand Down Expand Up @@ -34,6 +34,20 @@ fn post_upgrade(args: Args) {

info!(version = %args.wasm_version, "Post-upgrade complete");

mutate_state(|state| {
let now = state.env.now();
state.data.members.populate_member_channel_links();
for channel in state.data.channels.iter_mut() {
let count_removed = channel.chat.members.prune_member_updates(now);
info!(count_removed, "Removed old member updates");

if channel.chat.subtype.is_some() {
let count_removed = channel.chat.members.prune_proposal_votes(now);
info!(count_removed, "Removed old proposal votes");
}
}
});

read_state(|state| {
let now = state.env.now();
state
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/community/impl/src/model/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ impl Channel {
membership,
video_call_in_progress: updates.video_call_in_progress,
external_url: updates.external_url,
any_updates_missed: updates.any_updates_missed,
})
}

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Simplify how we store and query proposal votes ([#6906](https://github.com/open-chat-labs/open-chat/pull/6906))
- Remove member updates after 90 days ([#6907](https://github.com/open-chat-labs/open-chat/pull/6907))

### Removed

Expand Down
13 changes: 12 additions & 1 deletion backend/canisters/group/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::lifecycle::{init_env, init_state};
use crate::memory::{get_stable_memory_map_memory, get_upgrades_memory};
use crate::{read_state, Data};
use crate::{mutate_state, read_state, Data};
use canister_logger::LogEntry;
use canister_tracing_macros::trace;
use group_canister::post_upgrade::Args;
Expand All @@ -27,6 +27,17 @@ fn post_upgrade(args: Args) {

info!(version = %args.wasm_version, "Post-upgrade complete");

mutate_state(|state| {
let now = state.env.now();
let count_removed = state.data.chat.members.prune_member_updates(now);
info!(count_removed, "Removed old member updates");

if state.data.chat.subtype.is_some() {
let count_removed = state.data.chat.members.prune_proposal_votes(now);
info!(count_removed, "Removed old proposal votes");
}
});

read_state(|state| {
let now = state.env.now();
state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ fn summary_updates_impl(updates_since: TimestampMillis, on_behalf_of: Option<Pri
rules_accepted: membership.rules_accepted,
membership: Some(membership),
video_call_in_progress: updates.video_call_in_progress,
any_updates_missed: updates.any_updates_missed,
},
})
}
1 change: 1 addition & 0 deletions backend/canisters/local_user_index/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Make `ChannelId` comparisons use their 32bit representation ([#6885](https://github.com/open-chat-labs/open-chat/pull/6885))
- Add `any_missed_updates` to summary updates responses ([#6907](https://github.com/open-chat-labs/open-chat/pull/6907))

## [[2.0.1463](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1463-local_user_index)] - 2024-11-21

Expand Down
10 changes: 5 additions & 5 deletions backend/libraries/chat_events/src/chat_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use serde_bytes::ByteBuf;
use sha2::{Digest, Sha256};
use std::cmp::max;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashSet};
use std::mem;
use std::ops::DerefMut;
use tracing::error;
Expand All @@ -37,7 +37,7 @@ pub struct ChatEvents {
main: ChatEventsList,
threads: BTreeMap<MessageIndex, ChatEventsList>,
metrics: ChatMetricsInternal,
per_user_metrics: HashMap<UserId, ChatMetricsInternal>,
per_user_metrics: BTreeMap<UserId, ChatMetricsInternal>,
frozen: bool,
events_ttl: Timestamped<Option<Milliseconds>>,
expiring_events: ExpiringEvents,
Expand Down Expand Up @@ -103,7 +103,7 @@ impl ChatEvents {
main: ChatEventsList::new(chat, None),
threads: BTreeMap::new(),
metrics: ChatMetricsInternal::default(),
per_user_metrics: HashMap::new(),
per_user_metrics: BTreeMap::new(),
frozen: false,
events_ttl: Timestamped::new(events_ttl, now),
expiring_events: ExpiringEvents::default(),
Expand Down Expand Up @@ -133,7 +133,7 @@ impl ChatEvents {
main: ChatEventsList::new(chat, None),
threads: BTreeMap::new(),
metrics: ChatMetricsInternal::default(),
per_user_metrics: HashMap::new(),
per_user_metrics: BTreeMap::new(),
frozen: false,
events_ttl: Timestamped::new(events_ttl, now),
expiring_events: ExpiringEvents::default(),
Expand Down Expand Up @@ -2226,7 +2226,7 @@ impl ChatEvents {

fn add_to_metrics<F: FnMut(&mut ChatMetricsInternal)>(
metrics: &mut ChatMetricsInternal,
per_user_metrics: &mut HashMap<UserId, ChatMetricsInternal>,
per_user_metrics: &mut BTreeMap<UserId, ChatMetricsInternal>,
user_id: UserId,
mut action: F,
timestamp: TimestampMillis,
Expand Down
8 changes: 8 additions & 0 deletions backend/libraries/constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ pub const LIFETIME_DIAMOND_TIMESTAMP: TimestampMillis = 30000000000000; // This

pub const PRIZE_FEE_PERCENT: u8 = 5;

// The length of time to hold on to data required to compile chat summary updates, eg. event last
// updated timestamps
pub const DURATION_TO_MAINTAIN_SUMMARY_UPDATES_DATA: Milliseconds = 31 * DAY_IN_MS;

pub fn calculate_summary_updates_data_removal_cutoff(now: TimestampMillis) -> Milliseconds {
now.saturating_sub(DURATION_TO_MAINTAIN_SUMMARY_UPDATES_DATA)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 3 additions & 0 deletions backend/libraries/group_chat_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ impl GroupChatCore {
.if_set_after(since)
.cloned()
.map_or(OptionUpdate::NoChange, OptionUpdate::from_update),
any_updates_missed: self.members.any_updates_removed(since)
|| member.as_ref().map(|m| m.any_updates_removed(since)).unwrap_or_default(),
}
}

Expand Down Expand Up @@ -2177,6 +2179,7 @@ pub struct SummaryUpdates {
pub rules_changed: bool,
pub video_call_in_progress: OptionUpdate<VideoCall>,
pub external_url: OptionUpdate<String>,
pub any_updates_missed: bool,
}

#[derive(Serialize, Deserialize, Clone, Debug, Default)]
Expand Down
79 changes: 70 additions & 9 deletions backend/libraries/group_chat_core/src/members.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::mentions::Mentions;
use crate::roles::GroupRoleInternal;
use crate::AccessRulesInternal;
use candid::Principal;
use constants::calculate_summary_updates_data_removal_cutoff;
use group_community_common::{Member, Members};
use serde::de::{SeqAccess, Visitor};
use serde::ser::SerializeSeq;
Expand Down Expand Up @@ -34,6 +36,8 @@ pub struct GroupMembers {
blocked: BTreeSet<UserId>,
suspended: BTreeSet<UserId>,
updates: BTreeSet<(TimestampMillis, UserId, MemberUpdate)>,
#[serde(default)]
latest_update_removed: TimestampMillis,
}

#[derive(Serialize_repr, Deserialize_repr, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
Expand All @@ -50,6 +54,14 @@ pub enum MemberUpdate {

#[allow(clippy::too_many_arguments)]
impl GroupMembers {
pub fn prune_proposal_votes(&mut self, now: TimestampMillis) -> u32 {
let mut count = 0;
for member in self.members.values_mut() {
count += member.prune_proposal_votes(now);
}
count
}

pub fn new(creator_user_id: UserId, user_type: UserType, now: TimestampMillis) -> GroupMembers {
let member = GroupMemberInternal {
user_id: creator_user_id,
Expand All @@ -62,6 +74,7 @@ impl GroupMembers {
followed_threads: TimestampedSet::new(),
unfollowed_threads: TimestampedSet::new(),
proposal_votes: BTreeSet::default(),
latest_proposal_vote_removed: 0,
suspended: Timestamped::default(),
rules_accepted: Some(Timestamped::new(Version::zero(), now)),
user_type,
Expand All @@ -84,6 +97,7 @@ impl GroupMembers {
lapsed: BTreeSet::new(),
suspended: BTreeSet::new(),
updates: BTreeSet::new(),
latest_update_removed: 0,
}
}

Expand Down Expand Up @@ -112,6 +126,7 @@ impl GroupMembers {
followed_threads: TimestampedSet::new(),
unfollowed_threads: TimestampedSet::new(),
proposal_votes: BTreeSet::default(),
latest_proposal_vote_removed: 0,
suspended: Timestamped::default(),
rules_accepted: None,
user_type,
Expand All @@ -124,7 +139,7 @@ impl GroupMembers {
if !notifications_muted {
self.notifications_unmuted.insert(user_id);
}
self.updates.insert((now, user_id, MemberUpdate::Added));
self.prune_then_insert_member_update(user_id, MemberUpdate::Added, now);
AddResult::Success(AddMemberSuccess { member, unlapse: false })
} else {
AddResult::AlreadyInGroup
Expand Down Expand Up @@ -152,7 +167,7 @@ impl GroupMembers {
self.suspended.remove(&user_id);
}
self.member_ids.remove(&user_id);
self.updates.insert((now, user_id, MemberUpdate::Removed));
self.prune_then_insert_member_update(user_id, MemberUpdate::Removed, now);
Some(member)
} else {
None
Expand All @@ -161,7 +176,7 @@ impl GroupMembers {

pub fn block(&mut self, user_id: UserId, now: TimestampMillis) -> bool {
if self.blocked.insert(user_id) {
self.updates.insert((now, user_id, MemberUpdate::Blocked));
self.prune_then_insert_member_update(user_id, MemberUpdate::Blocked, now);
true
} else {
false
Expand All @@ -170,7 +185,7 @@ impl GroupMembers {

pub fn unblock(&mut self, user_id: UserId, now: TimestampMillis) -> bool {
if self.blocked.remove(&user_id) {
self.updates.insert((now, user_id, MemberUpdate::Unblocked));
self.prune_then_insert_member_update(user_id, MemberUpdate::Unblocked, now);
true
} else {
false
Expand Down Expand Up @@ -293,7 +308,7 @@ impl GroupMembers {
_ => false,
};

self.updates.insert((now, user_id, MemberUpdate::RoleChanged));
self.prune_then_insert_member_update(user_id, MemberUpdate::RoleChanged, now);

ChangeRoleResult::Success(ChangeRoleSuccess { prev_role })
}
Expand Down Expand Up @@ -321,6 +336,7 @@ impl GroupMembers {

pub fn register_proposal_vote(&mut self, user_id: UserId, message_index: MessageIndex, now: TimestampMillis) {
if let Some(member) = self.members.get_mut(&user_id) {
member.prune_proposal_votes(now);
member.proposal_votes.insert((now, message_index));
}
}
Expand All @@ -342,6 +358,7 @@ impl GroupMembers {
}

pub fn unlapse_all(&mut self, now: TimestampMillis) {
self.prune_member_updates(now);
for user_id in std::mem::take(&mut self.lapsed) {
if let Some(member) = self.members.get_mut(&user_id) {
if member.set_lapsed(false, now) {
Expand Down Expand Up @@ -370,11 +387,11 @@ impl GroupMembers {
self.lapsed.remove(&user_id);
}

self.updates.insert((
now,
self.prune_then_insert_member_update(
user_id,
if lapsed { MemberUpdate::Lapsed } else { MemberUpdate::Unlapsed },
));
now,
);
}
}

Expand Down Expand Up @@ -423,6 +440,30 @@ impl GroupMembers {
self.updates.iter().next_back().map(|(ts, _, _)| *ts)
}

pub fn any_updates_removed(&self, since: TimestampMillis) -> bool {
self.latest_update_removed > since
}

fn prune_then_insert_member_update(&mut self, user_id: UserId, update: MemberUpdate, now: TimestampMillis) {
self.prune_member_updates(now);
self.updates.insert((now, user_id, update));
}

pub fn prune_member_updates(&mut self, now: TimestampMillis) -> u32 {
let cutoff = calculate_summary_updates_data_removal_cutoff(now);
let still_valid = self
.updates
.split_off(&(cutoff, Principal::anonymous().into(), MemberUpdate::Added));

let removed = std::mem::replace(&mut self.updates, still_valid);

if let Some((ts, _, _)) = removed.last() {
self.latest_update_removed = *ts;
}

removed.len() as u32
}

#[cfg(test)]
fn check_invariants(&self) {
let mut member_ids = BTreeSet::new();
Expand Down Expand Up @@ -530,6 +571,8 @@ pub struct GroupMemberInternal {
#[serde(rename = "p", default, skip_serializing_if = "BTreeSet::is_empty")]
#[serde(deserialize_with = "deserialize_proposal_votes")]
proposal_votes: BTreeSet<(TimestampMillis, MessageIndex)>,
#[serde(rename = "pr", default, skip_serializing_if = "is_default")]
latest_proposal_vote_removed: TimestampMillis,
#[serde(rename = "s", default, skip_serializing_if = "is_default")]
suspended: Timestamped<bool>,
#[serde(rename = "ra", default, skip_serializing_if = "is_default")]
Expand Down Expand Up @@ -656,6 +699,22 @@ impl GroupMemberInternal {
.take_while(move |(ts, _)| *ts > since)
.copied()
}

pub fn any_updates_removed(&self, since: TimestampMillis) -> bool {
self.latest_proposal_vote_removed > since
}

fn prune_proposal_votes(&mut self, now: TimestampMillis) -> u32 {
let cutoff = calculate_summary_updates_data_removal_cutoff(now);
let still_valid = self.proposal_votes.split_off(&(cutoff, 0.into()));
let removed = std::mem::replace(&mut self.proposal_votes, still_valid);

if let Some((ts, _)) = removed.last() {
self.latest_proposal_vote_removed = *ts;
}

removed.len() as u32
}
}

impl Member for GroupMemberInternal {
Expand Down Expand Up @@ -745,6 +804,7 @@ mod tests {
followed_threads: TimestampedSet::default(),
unfollowed_threads: TimestampedSet::default(),
proposal_votes: BTreeSet::new(),
latest_proposal_vote_removed: 0,
suspended: Timestamped::default(),
min_visible_event_index: 0.into(),
min_visible_message_index: 0.into(),
Expand Down Expand Up @@ -775,6 +835,7 @@ mod tests {
followed_threads: [(1.into(), 1)].into_iter().collect(),
unfollowed_threads: [(1.into(), 1)].into_iter().collect(),
proposal_votes: BTreeSet::from([(1, 1.into())]),
latest_proposal_vote_removed: 1,
suspended: Timestamped::new(true, 1),
min_visible_event_index: 1.into(),
min_visible_message_index: 1.into(),
Expand All @@ -786,7 +847,7 @@ mod tests {
let member_bytes = msgpack::serialize_then_unwrap(&member);
let member_bytes_len = member_bytes.len();

assert_eq!(member_bytes_len, 159);
assert_eq!(member_bytes_len, 163);

let _deserialized: GroupMemberInternal = msgpack::deserialize_then_unwrap(&member_bytes);
}
Expand Down
Loading

0 comments on commit 8c75ec1

Please sign in to comment.