Skip to content

Commit

Permalink
Merge branch 'master' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
megrogan authored Jul 10, 2024
2 parents 241b02a + 2811b2a commit 9178f14
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 26 deletions.
2 changes: 2 additions & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

## [[2.0.1235](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1235-community)] - 2024-07-09

### Added

- Add `LifetimeDiamondMembership` access gate ([#5986](https://github.com/open-chat-labs/open-chat/pull/5986))
Expand Down
2 changes: 2 additions & 0 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

## [[2.0.1234](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1234-group)] - 2024-07-09

### Added

- Add `LifetimeDiamondMembership` access gate ([#5986](https://github.com/open-chat-labs/open-chat/pull/5986))
Expand Down
8 changes: 8 additions & 0 deletions backend/canisters/local_group_index/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

## [[2.0.1232](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1232-local_group_index)] - 2024-07-09

### Added

- Add `LifetimeDiamondMembership` access gate ([#5986](https://github.com/open-chat-labs/open-chat/pull/5986))
- Add `UniquePerson` access gate ([#5993](https://github.com/open-chat-labs/open-chat/pull/5993))
- Support composite access gates ([#5988](https://github.com/open-chat-labs/open-chat/pull/5988))

## [[2.0.1189](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1189-local_group_index)] - 2024-04-23

### Changed
Expand Down
6 changes: 6 additions & 0 deletions backend/canisters/user_index/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

### Added

- Uninstall canisters of empty users ([#6018](https://github.com/open-chat-labs/open-chat/pull/6018))

## [[2.0.1231](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1231-user_index)] - 2024-07-08

### Fixed

- Fix `chitbands` endpoint ([#6007](https://github.com/open-chat-labs/open-chat/pull/6007))
Expand Down
4 changes: 4 additions & 0 deletions backend/canisters/user_index/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ impl RuntimeState {
oc_public_key: self.data.oc_key_pair.public_key_pem().to_string(),
empty_users: self.data.empty_users.iter().take(100).copied().collect(),
empty_users_length: self.data.empty_users.len(),
deleted_users: self.data.deleted_users.iter().take(100).map(|u| u.user_id).collect(),
deleted_users_length: self.data.deleted_users.len(),
}
}
}
Expand Down Expand Up @@ -490,6 +492,8 @@ pub struct Metrics {
pub oc_public_key: String,
pub empty_users: Vec<UserId>,
pub empty_users_length: usize,
pub deleted_users: Vec<UserId>,
pub deleted_users_length: usize,
}

#[derive(Serialize, Debug)]
Expand Down
29 changes: 16 additions & 13 deletions backend/canisters/user_index/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::lifecycle::{init_env, init_state};
use crate::memory::get_upgrades_memory;
use crate::Data;
use crate::{mutate_state, Data};
use canister_logger::LogEntry;
use canister_tracing_macros::trace;
use ic_cdk::post_upgrade;
use local_user_index_canister::{DeleteUser, Event};
use stable_memory::get_reader;
use tracing::info;
use user_index_canister::post_upgrade::Args;
Expand All @@ -25,16 +26,18 @@ fn post_upgrade(args: Args) {

info!(version = %args.wasm_version, "Post-upgrade complete");

// Enable this code block once Users and LocalUserIndexes have been upgraded
// mutate_state(|state| {
// for user_id in state.data.empty_users.iter().copied() {
// state.push_event_to_local_user_index(
// user_id,
// Event::DeleteUser(DeleteUser {
// user_id,
// triggered_by_user: false,
// }),
// );
// }
// })
mutate_state(|state| {
for user_id in std::mem::take(&mut state.data.empty_users) {
if let Some(canister_id) = state.data.local_index_map.get_index_canister(&user_id) {
state.data.user_index_event_sync_queue.push(
canister_id,
Event::DeleteUser(DeleteUser {
user_id,
triggered_by_user: false,
}),
);
}
}
crate::jobs::sync_events_to_local_user_index_canisters::start_job_if_required(state);
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ impl LocalUserIndexMap {
false
}

pub fn remove_user(&mut self, user_id: &UserId) -> bool {
if let Some(index) = self.user_to_index.remove(user_id) {
self.index_map
.entry(index)
.and_modify(|i| i.user_count = i.user_count.saturating_sub(1));

true
} else {
false
}
}

pub fn index_for_new_user(&self) -> Option<CanisterId> {
self.index_map
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ fn handle_event(event: Event, state: &mut RuntimeState) {
Event::UserDeleted(ev) => {
let now = state.env.now();
state.data.users.delete_user(ev.user_id, now);
state.data.local_index_map.remove_user(&ev.user_id);
state.data.empty_users.remove(&ev.user_id);
state.data.deleted_users.push(DeletedUser {
user_id: ev.user_id,
triggered_by_user: false,
Expand Down
2 changes: 2 additions & 0 deletions backend/notification_pusher/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ pub async fn run_notifications_pusher<I: IndexStore + 'static>(
}

let invalid_subscriptions = Arc::new(RwLock::default());
let throttled_subscriptions = Arc::new(RwLock::default());
for _ in 0..pusher_count {
let pusher = Pusher::new(
receiver.clone(),
&vapid_private_pem,
subscriptions_to_remove_sender.clone(),
invalid_subscriptions.clone(),
throttled_subscriptions.clone(),
);
tokio::spawn(pusher.run());
}
Expand Down
35 changes: 30 additions & 5 deletions backend/notification_pusher/core/src/pusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ use std::collections::{BinaryHeap, HashMap};
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{error, info};
use types::{Error, TimestampMillis, UserId};
use types::{Error, Milliseconds, TimestampMillis, UserId};
use web_push::{
ContentEncoding, HyperWebPushClient, PartialVapidSignatureBuilder, SubscriptionInfo, Urgency, VapidSignature,
VapidSignatureBuilder, WebPushClient, WebPushError, WebPushMessage, WebPushMessageBuilder,
};

const MAX_PAYLOAD_LENGTH_BYTES: usize = 3 * 1000; // Just under 3KB
const ONE_MINUTE: Milliseconds = 60 * 1000;

pub struct Pusher {
receiver: Receiver<Notification>,
web_push_client: HyperWebPushClient,
sig_builder: PartialVapidSignatureBuilder,
subscriptions_to_remove_sender: Sender<(UserId, String)>,
invalid_subscriptions: Arc<RwLock<HashMap<String, TimestampMillis>>>,
throttled_subscriptions: Arc<RwLock<HashMap<String, TimestampMillis>>>,
}

impl Pusher {
Expand All @@ -26,13 +28,15 @@ impl Pusher {
vapid_private_pem: &str,
subscriptions_to_remove_sender: Sender<(UserId, String)>,
invalid_subscriptions: Arc<RwLock<HashMap<String, TimestampMillis>>>,
throttled_subscriptions: Arc<RwLock<HashMap<String, TimestampMillis>>>,
) -> Self {
Self {
receiver,
web_push_client: HyperWebPushClient::new(),
sig_builder: VapidSignatureBuilder::from_pem_no_sub(vapid_private_pem.as_bytes()).unwrap(),
subscriptions_to_remove_sender,
invalid_subscriptions,
throttled_subscriptions,
}
}

Expand All @@ -43,6 +47,15 @@ impl Pusher {
continue;
}
}
if let Ok(map) = self.throttled_subscriptions.read() {
if let Some(until) = map.get(&notification.subscription_info.endpoint) {
let timestamp = timestamp();
if *until > timestamp {
info!("Notification skipped due to subscription being throttled");
continue;
}
}
}
if let Err(error) = self.push_notification(&notification).await {
let bytes = notification.payload.len();
error!(
Expand All @@ -68,13 +81,11 @@ impl Pusher {
.subscriptions_to_remove_sender
.try_send((notification.recipient, subscription.keys.p256dh.clone()));

let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;

if let Ok(mut map) = self.invalid_subscriptions.write() {
if map.len() > 10000 {
prune_invalid_subscriptions(&mut map);
}
map.insert(subscription.endpoint.clone(), timestamp);
map.insert(subscription.endpoint.clone(), timestamp());
}

info!(
Expand All @@ -83,7 +94,17 @@ impl Pusher {
);
Ok(())
}
_ => Err(error.into()),
_ => {
if let Ok(mut map) = self.throttled_subscriptions.write() {
if map.len() > 100 {
let timestamp = timestamp();
map.retain(|_, ts| *ts > timestamp);
}
info!(subscription.endpoint, "Subscription throttled for 1 minute");
map.insert(subscription.endpoint.clone(), timestamp() + ONE_MINUTE);
}
Err(error.into())
}
}
} else {
Ok(())
Expand Down Expand Up @@ -136,6 +157,10 @@ fn prune_invalid_subscriptions(map: &mut HashMap<String, TimestampMillis>) {
}
}

fn timestamp() -> TimestampMillis {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}

#[derive(Debug)]
#[allow(dead_code)]
struct SubscriptionInfoDebug<'a> {
Expand Down
8 changes: 6 additions & 2 deletions backend/notification_pusher/core/src/subscription_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::ic_agent::IcAgent;
use async_channel::Receiver;
use std::collections::HashMap;
use tokio::time;
use tracing::{error, info};
use types::{CanisterId, PushIfNotContains, UserId};

pub struct SubscriptionRemover {
Expand Down Expand Up @@ -35,12 +36,15 @@ impl SubscriptionRemover {
}

if !subscriptions_to_remove.is_empty() {
if let Err(_error) = self
let count = subscriptions_to_remove.len();
if let Err(error) = self
.ic_agent
.remove_subscriptions(&self.index_canister_id, subscriptions_to_remove)
.await
{
// Log error
error!(?error, "Failed to remove subscriptions");
} else {
info!("Removed {count} subscriptions");
}
}

Expand Down
13 changes: 7 additions & 6 deletions frontend/app/src/components/home/Home.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,14 @@
}
function remoteVideoCallStarted(ev: RemoteVideoCallStartedEvent) {
// Check user is not already in the call and it started less than an hour ago
if (
!ev.detail.currentUserIsParticipant &&
Number(ev.detail.timestamp) > Date.now() - 60 * 60 * 1000
) {
incomingVideoCall.set(ev.detail);
// If current user is already in the call, or has previously been in the call, or the call started more than an hour ago, exit
if ($activeVideoCall?.chatId === ev.detail.chatId ||
ev.detail.currentUserIsParticipant ||
Number(ev.detail.timestamp) < Date.now() - 60 * 60 * 1000) {
return;
}
incomingVideoCall.set(ev.detail);
}
async function newChatSelected(
Expand Down

0 comments on commit 9178f14

Please sign in to comment.