Skip to content

Commit

Permalink
Use GroupedTimerJobQueue for pushing events between User canisters
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles committed Oct 9, 2024
1 parent 4262ae2 commit 8a81dad
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 98 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/canisters/user/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ sonic_canister = { path = "../../../external_canisters/sonic/api" }
sonic_canister_c2c_client = { path = "../../../external_canisters/sonic/c2c_client" }
stable_memory = { path = "../../../libraries/stable_memory" }
storage_bucket_client = { path = "../../../libraries/storage_bucket_client" }
timer_job_queues = { path = "../../../libraries/timer_job_queues" }
tracing = { workspace = true }
types = { path = "../../../libraries/types" }
utils = { path = "../../../libraries/utils" }
Expand Down
7 changes: 0 additions & 7 deletions backend/canisters/user/impl/src/jobs/mod.rs

This file was deleted.

75 changes: 0 additions & 75 deletions backend/canisters/user/impl/src/jobs/push_user_canister_events.rs

This file was deleted.

26 changes: 19 additions & 7 deletions backend/canisters/user/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::model::hot_group_exclusions::HotGroupExclusions;
use crate::model::p2p_swaps::P2PSwaps;
use crate::model::pin_number::PinNumber;
use crate::model::token_swaps::TokenSwaps;
use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob};
use crate::timer_job_types::{RemoveExpiredEventsJob, TimerJob, UserCanisterEventsBatch};
use candid::Principal;
use canister_state_macros::canister_state;
use canister_timer_jobs::TimerJobs;
Expand All @@ -22,12 +22,13 @@ use model::message_activity_events::MessageActivityEvents;
use model::referrals::Referrals;
use model::streak::Streak;
use notifications_canister::c2c_push_notification;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use serde_bytes::ByteBuf;
use std::cell::RefCell;
use std::collections::HashSet;
use std::ops::Deref;
use std::time::Duration;
use timer_job_queues::GroupedTimerJobQueue;
use types::{
Achievement, BuildVersion, CanisterId, Chat, ChatId, ChatMetrics, ChitEarned, ChitEarnedReason, CommunityId,
Cryptocurrency, Cycles, Document, Milliseconds, Notification, TimestampMillis, Timestamped, UniquePersonProof, UserId,
Expand All @@ -41,7 +42,6 @@ use utils::time::{today, tomorrow, DAY_IN_MS, MINUTE_IN_MS};
mod crypto;
mod governance_clients;
mod guards;
mod jobs;
mod lifecycle;
mod memory;
mod model;
Expand Down Expand Up @@ -144,8 +144,7 @@ impl RuntimeState {

pub fn push_user_canister_event(&mut self, canister_id: CanisterId, event: UserCanisterEvent) {
if canister_id != OPENCHAT_BOT_USER_ID.into() && canister_id != self.env.canister_id() {
self.data.user_canister_events_queue.push(canister_id, event);
jobs::push_user_canister_events::try_run_now_for_canister(self, canister_id);
self.data.user_canister_events_queue.enqueue(canister_id.into(), event);
}
}

Expand Down Expand Up @@ -236,7 +235,8 @@ struct Data {
pub next_event_expiry: Option<TimestampMillis>,
pub token_swaps: TokenSwaps,
pub p2p_swaps: P2PSwaps,
pub user_canister_events_queue: CanisterEventSyncQueue<UserCanisterEvent>,
#[serde(deserialize_with = "deserialize_user_canister_events_queue")]
pub user_canister_events_queue: GroupedTimerJobQueue<UserCanisterEventsBatch>,
pub video_call_operators: Vec<Principal>,
pub event_store_client: EventStoreClient<CdkRuntime>,
pub pin_number: PinNumber,
Expand All @@ -255,6 +255,18 @@ struct Data {
pub message_activity_events: MessageActivityEvents,
}

fn deserialize_user_canister_events_queue<'de, D: Deserializer<'de>>(
d: D,
) -> Result<GroupedTimerJobQueue<UserCanisterEventsBatch>, D::Error> {
let previous: CanisterEventSyncQueue<UserCanisterEvent> = CanisterEventSyncQueue::deserialize(d)?;

let new = GroupedTimerJobQueue::new(10);
for (canister_id, events) in previous.take_all() {
new.enqueue_many(canister_id.into(), events);
}
Ok(new)
}

impl Data {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -303,7 +315,7 @@ impl Data {
next_event_expiry: None,
token_swaps: TokenSwaps::default(),
p2p_swaps: P2PSwaps::default(),
user_canister_events_queue: CanisterEventSyncQueue::default(),
user_canister_events_queue: GroupedTimerJobQueue::new(10),
video_call_operators,
event_store_client: EventStoreClientBuilder::new(local_user_index_canister_id, CdkRuntime::default())
.with_flush_delay(Duration::from_millis(5 * MINUTE_IN_MS))
Expand Down
1 change: 0 additions & 1 deletion backend/canisters/user/impl/src/lifecycle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ fn init_state(env: Box<dyn Environment>, data: Data, wasm_version: BuildVersion)
let regular_jobs = regular_jobs::build();
let state = RuntimeState::new(env, data, regular_jobs);

crate::jobs::start(&state);
crate::init_state(state);
WASM_VERSION.set(Timestamped::new(wasm_version, now));
}
Expand Down
52 changes: 52 additions & 0 deletions backend/canisters/user/impl/src/timer_job_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{mutate_state, openchat_bot, read_state};
use canister_timer_jobs::Job;
use chat_events::{MessageContentInternal, MessageReminderContentInternal};
use serde::{Deserialize, Serialize};
use timer_job_queues::{TimerJobItem, TimerJobItemGroup};
use tracing::error;
use types::{BlobReference, Chat, ChatId, CommunityId, EventIndex, MessageId, MessageIndex, P2PSwapStatus, UserId};
use user_canister::{C2CReplyContext, UserCanisterEvent};
Expand Down Expand Up @@ -383,3 +384,54 @@ impl Job for MarkVideoCallEndedJob {
mutate_state(|state| end_video_call_impl(self.0, state));
}
}

#[derive(Serialize, Deserialize)]
pub struct UserCanisterEventsBatch {
user_id: UserId,
events: Vec<UserCanisterEvent>,
}

impl TimerJobItem for UserCanisterEventsBatch {
async fn process(&self) -> Result<(), bool> {
let response = user_canister_c2c_client::c2c_notify_user_canister_events(
self.user_id.into(),
&user_canister::c2c_notify_user_canister_events::Args {
events: self.events.clone(),
},
)
.await;

match response {
Ok(_) => Ok(()),
Err(_) => Err(true),
}
}
}

impl TimerJobItemGroup for UserCanisterEventsBatch {
type Key = UserId;
type Item = UserCanisterEvent;

fn new(user_id: Self::Key) -> Self {
UserCanisterEventsBatch {
user_id,
events: Vec::new(),
}
}

fn key(&self) -> Self::Key {
self.user_id
}

fn add(&mut self, item: Self::Item) {
self.events.push(item)
}

fn into_items(self) -> Vec<Self::Item> {
self.events
}

fn is_full(&self) -> bool {
self.events.len() > 100
}
}
8 changes: 3 additions & 5 deletions backend/canisters/user/impl/src/updates/accept_p2p_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,15 @@ async fn accept_p2p_swap(args: Args) -> Response {
if let AcceptP2PSwapResult::Success(status) =
chat.events.accept_p2p_swap(my_user_id, None, args.message_id, index, now)
{
state.data.user_canister_events_queue.push(
let thread_root_message_id = args.thread_root_message_index.map(|i| chat.main_message_index_to_id(i));
state.push_user_canister_event(
args.user_id.into(),
UserCanisterEvent::P2PSwapStatusChange(Box::new(P2PSwapStatusChange {
thread_root_message_id: args
.thread_root_message_index
.map(|i| chat.main_message_index_to_id(i)),
thread_root_message_id,
message_id: args.message_id,
status: P2PSwapStatus::Accepted(status),
})),
);
crate::jobs::push_user_canister_events::start_job_if_required(state);
state
.data
.award_achievement_and_notify(Achievement::AcceptedP2PSwapOffer, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ fn c2c_notify_p2p_swap_status_change_impl(args: Args, state: &mut RuntimeState)
}

if let Some(status) = status_to_push_c2c {
state.data.user_canister_events_queue.push(
let thread_root_message_id = m.thread_root_message_index.map(|i| chat.main_message_index_to_id(i));
state.push_user_canister_event(
chat_id.into(),
UserCanisterEvent::P2PSwapStatusChange(Box::new(P2PSwapStatusChange {
thread_root_message_id: m.thread_root_message_index.map(|i| chat.main_message_index_to_id(i)),
thread_root_message_id,
message_id: m.message_id,
status,
})),
);
crate::jobs::push_user_canister_events::start_job_if_required(state);
}
}
}
Expand Down

0 comments on commit 8a81dad

Please sign in to comment.