Skip to content

Commit

Permalink
Simplify timer jobs + make them more efficient (#5233)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Jan 23, 2024
1 parent c63f9df commit b0a2402
Show file tree
Hide file tree
Showing 35 changed files with 233 additions and 252 deletions.
1 change: 1 addition & 0 deletions backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Changed

- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233))
- Avoid sending prize winner notifications ([#5236](https://github.com/open-chat-labs/open-chat/pull/5236))

### Fixed
Expand Down
26 changes: 10 additions & 16 deletions backend/canisters/community/impl/src/jobs/import_groups.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::activity_notifications::extract_activity;
use crate::model::channels::Channel;
use crate::model::events::{CommunityEventInternal, GroupImportedInternal};
use crate::model::groups_being_imported::NextBatchResult;
use crate::model::members::AddResult;
use crate::timer_job_types::{FinalizeGroupImportJob, ProcessGroupImportChannelMembersJob, TimerJob};
use crate::updates::c2c_join_channel::join_channel_unchecked;
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use group_canister::c2c_export_group::{Args, Response};
use group_chat_core::GroupChatCore;
use ic_cdk_timers::TimerId;
Expand All @@ -24,35 +23,32 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && !state.data.groups_being_imported.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'import_groups' job started");
true
} else {
false
}
}

fn run() {
match mutate_state(next_batch) {
NextBatchResult::Success(groups) => ic_cdk::spawn(import_groups(groups)),
NextBatchResult::Continue => {}
NextBatchResult::Exit => {
if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'import_groups' job stopped");
}
}
trace!("'import_groups' job running");
TIMER_ID.set(None);

let batch = mutate_state(next_batch);
if !batch.is_empty() {
ic_cdk::spawn(import_groups(batch));
}
}

fn next_batch(state: &mut RuntimeState) -> NextBatchResult {
fn next_batch(state: &mut RuntimeState) -> Vec<(ChatId, u64)> {
let now = state.env.now();
state.data.groups_being_imported.next_batch(now)
}

async fn import_groups(groups: Vec<(ChatId, u64)>) {
futures::future::join_all(groups.into_iter().map(|(g, i)| import_group(g, i))).await;
read_state(start_job_if_required);
}

async fn import_group(group_id: ChatId, from: u64) {
Expand Down Expand Up @@ -94,8 +90,6 @@ async fn import_group(group_id: ChatId, from: u64) {
.data
.groups_being_imported
.mark_batch_failed(&group_id, format!("{error:?}"));

start_job_if_required(state);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use candid::Principal;
use group_community_common::{PaymentRecipient, PendingPayment, PendingPaymentReason};
use ic_cdk_timers::TimerId;
Expand All @@ -16,23 +16,23 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'make_pending_payments' job started");
true
} else {
false
}
}

pub fn run() {
trace!("'make_pending_payments' job running");
TIMER_ID.set(None);

let (pending_payment, now_nanos) = mutate_state(|state| (state.data.pending_payments_queue.pop(), state.env.now_nanos()));

if let Some(pending_payment) = pending_payment {
ic_cdk::spawn(process_payment(pending_payment, now_nanos));
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'make_pending_payments' job stopped");
read_state(start_job_if_required);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ pub struct GroupsBeingImported {
groups: HashMap<ChatId, GroupBeingImported>,
}

pub enum NextBatchResult {
Success(Vec<(ChatId, u64)>),
Continue,
Exit,
}

impl GroupsBeingImported {
pub fn add(
&mut self,
Expand All @@ -37,23 +31,15 @@ impl GroupsBeingImported {
self.groups.contains_key(group_id)
}

pub fn next_batch(&mut self, now: TimestampMillis) -> NextBatchResult {
if self.groups.is_empty() {
NextBatchResult::Exit
} else {
let mut batch = Vec::new();
for (chat_id, group) in self.groups.iter_mut().filter(|(_, g)| !g.is_complete()) {
if group.current_batch_started.is_none() {
group.current_batch_started = Some(now);
batch.push((*chat_id, group.bytes.len() as u64));
}
}
if batch.is_empty() {
NextBatchResult::Continue
} else {
NextBatchResult::Success(batch)
pub fn next_batch(&mut self, now: TimestampMillis) -> Vec<(ChatId, u64)> {
let mut batch = Vec::new();
for (chat_id, group) in self.groups.iter_mut().filter(|(_, g)| !g.is_complete()) {
if group.current_batch_started.is_none() {
group.current_batch_started = Some(now);
batch.push((*chat_id, group.bytes.len() as u64));
}
}
batch
}

// Returns true if the group bytes have all been imported, else false
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/escrow/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Include `created_by` on `SwapStatusChange` messages ([#5230](https://github.com/open-chat-labs/open-chat/pull/5230))
- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233))

## [[2.0.1014](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1014-escrow)] - 2024-01-19

Expand Down
12 changes: 6 additions & 6 deletions backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::model::pending_payments_queue::{PendingPayment, PendingPaymentReason};
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use candid::Principal;
use escrow_canister::{deposit_subaccount, SwapStatus};
use ic_cdk_timers::TimerId;
Expand All @@ -18,21 +18,21 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'make_pending_payments' job started");
true
} else {
false
}
}

pub fn run() {
trace!("'make_pending_payments' job running");
TIMER_ID.set(None);

if let Some(pending_payment) = mutate_state(|state| state.data.pending_payments_queue.pop()) {
ic_cdk::spawn(process_payment(pending_payment));
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'make_pending_payments' job stopped");
read_state(start_job_if_required);
}
}

Expand Down
12 changes: 6 additions & 6 deletions backend/canisters/escrow/impl/src/jobs/notify_status_change.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use canister_client::make_c2c_call;
use escrow_canister::SwapStatusChange;
use ic_cdk_timers::TimerId;
Expand All @@ -13,21 +13,21 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && !state.data.notify_status_change_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'notify_status_change' job started");
true
} else {
false
}
}

pub fn run() {
trace!("'notify_status_change' job running");
TIMER_ID.set(None);

if let Some((canister_id, notification)) = mutate_state(get_next) {
ic_cdk::spawn(notify_swap_status(canister_id, notification));
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'notify_status_change' job stopped");
read_state(start_job_if_required);
}
}

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Changed

- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233))
- Avoid sending prize winner notifications ([#5236](https://github.com/open-chat-labs/open-chat/pull/5236))

### Fixed
Expand Down
12 changes: 6 additions & 6 deletions backend/canisters/group/impl/src/jobs/make_pending_payments.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use candid::Principal;
use group_community_common::{PaymentRecipient, PendingPayment, PendingPaymentReason};
use ic_cdk_timers::TimerId;
Expand All @@ -16,23 +16,23 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'make_pending_payments' job started");
true
} else {
false
}
}

pub fn run() {
trace!("'make_pending_payments' job running");
TIMER_ID.set(None);

let (pending_payment, now_nanos) = mutate_state(|state| (state.data.pending_payments_queue.pop(), state.env.now_nanos()));

if let Some(pending_payment) = pending_payment {
ic_cdk::spawn(process_payment(pending_payment, now_nanos));
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'make_pending_payments' job stopped");
read_state(start_job_if_required);
}
}

Expand Down
4 changes: 4 additions & 0 deletions backend/canisters/group_index/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

### Changed

- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233))

## [[2.0.1010](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1010-group_index)] - 2024-01-18

### Added
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use ic_cdk_timers::TimerId;
use std::cell::Cell;
use std::time::Duration;
use tracing::trace;
use types::{DeletedCommunityInfo, UserId};
use utils::time::MINUTE_IN_MS;

const MAX_BATCH_SIZE: usize = 100;

Expand All @@ -13,23 +14,23 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && state.data.deleted_communities.notifications_pending() > 0 {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'push_community_deleted_notifications' job started");
true
} else {
false
}
}

fn run() {
trace!("'push_community_deleted_notifications' job running");
TIMER_ID.set(None);

if let Some(batch) = mutate_state(next_batch) {
if !batch.is_empty() {
ic_cdk::spawn(push_notifications(batch));
read_state(start_job_if_required);
}
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'push_community_deleted_notifications' job stopped");
}
}

Expand All @@ -49,10 +50,29 @@ async fn push_notifications(notifications: Vec<(UserId, DeletedCommunityInfo)>)
let futures: Vec<_> = notifications.into_iter().map(|(u, d)| push_notification(u, d)).collect();

futures::future::join_all(futures).await;

read_state(start_job_if_required);
}

async fn push_notification(user_id: UserId, deleted_community: DeletedCommunityInfo) {
let args = user_canister::c2c_notify_community_deleted::Args { deleted_community };
// TODO handle case where this fails
let _ = user_canister_c2c_client::c2c_notify_community_deleted(user_id.into(), &args).await;

if user_canister_c2c_client::c2c_notify_community_deleted(user_id.into(), &args)
.await
.is_err()
{
mutate_state(|state| {
let now = state.env.now();
let deleted_community = args.deleted_community;

let retry = now.saturating_sub(deleted_community.timestamp) < 10 * MINUTE_IN_MS;

state
.data
.deleted_communities
.mark_notification_failed(deleted_community.id, user_id, retry);

start_job_if_required(state);
});
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{mutate_state, RuntimeState};
use crate::{mutate_state, read_state, RuntimeState};
use ic_cdk_timers::TimerId;
use std::cell::Cell;
use std::time::Duration;
Expand All @@ -14,23 +14,23 @@ thread_local! {

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && state.data.deleted_groups.notifications_pending() > 0 {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'push_group_deleted_notifications' job started");
true
} else {
false
}
}

fn run() {
trace!("'push_group_deleted_notifications' job running");
TIMER_ID.set(None);

if let Some(batch) = mutate_state(next_batch) {
if !batch.is_empty() {
ic_cdk::spawn(push_notifications(batch));
read_state(start_job_if_required);
}
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'push_group_deleted_notifications' job stopped");
}
}

Expand Down Expand Up @@ -71,6 +71,6 @@ async fn push_notification(user_id: UserId, deleted_group: DeletedGroupInfoInter
.mark_notification_failed(deleted_group.id, user_id, retry);

start_job_if_required(state);
})
});
}
}
Loading

0 comments on commit b0a2402

Please sign in to comment.