diff --git a/backend/canisters/community/CHANGELOG.md b/backend/canisters/community/CHANGELOG.md index 36dd38802e..2bc9dda274 100644 --- a/backend/canisters/community/CHANGELOG.md +++ b/backend/canisters/community/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) - Avoid sending prize winner notifications ([#5236](https://github.com/open-chat-labs/open-chat/pull/5236)) ### Fixed diff --git a/backend/canisters/community/impl/src/jobs/import_groups.rs b/backend/canisters/community/impl/src/jobs/import_groups.rs index ca913bec67..68bdfaf784 100644 --- a/backend/canisters/community/impl/src/jobs/import_groups.rs +++ b/backend/canisters/community/impl/src/jobs/import_groups.rs @@ -1,11 +1,10 @@ use crate::activity_notifications::extract_activity; use crate::model::channels::Channel; use crate::model::events::{CommunityEventInternal, GroupImportedInternal}; -use crate::model::groups_being_imported::NextBatchResult; use crate::model::members::AddResult; use crate::timer_job_types::{FinalizeGroupImportJob, ProcessGroupImportChannelMembersJob, TimerJob}; use crate::updates::c2c_join_channel::join_channel_unchecked; -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use group_canister::c2c_export_group::{Args, Response}; use group_chat_core::GroupChatCore; use ic_cdk_timers::TimerId; @@ -24,9 +23,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.groups_being_imported.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'import_groups' job started"); true } else { false @@ -34,25 +32,23 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { - match mutate_state(next_batch) { - NextBatchResult::Success(groups) => ic_cdk::spawn(import_groups(groups)), - NextBatchResult::Continue => {} - NextBatchResult::Exit => { - if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'import_groups' job stopped"); - } - } + trace!("'import_groups' job running"); + TIMER_ID.set(None); + + let batch = mutate_state(next_batch); + if !batch.is_empty() { + ic_cdk::spawn(import_groups(batch)); } } -fn next_batch(state: &mut RuntimeState) -> NextBatchResult { +fn next_batch(state: &mut RuntimeState) -> Vec<(ChatId, u64)> { let now = state.env.now(); state.data.groups_being_imported.next_batch(now) } async fn import_groups(groups: Vec<(ChatId, u64)>) { futures::future::join_all(groups.into_iter().map(|(g, i)| import_group(g, i))).await; + read_state(start_job_if_required); } async fn import_group(group_id: ChatId, from: u64) { @@ -94,8 +90,6 @@ async fn import_group(group_id: ChatId, from: u64) { .data .groups_being_imported .mark_batch_failed(&group_id, format!("{error:?}")); - - start_job_if_required(state); } }); } diff --git a/backend/canisters/community/impl/src/jobs/make_pending_payments.rs b/backend/canisters/community/impl/src/jobs/make_pending_payments.rs index 22acb3f953..2662a3b3b7 100644 --- a/backend/canisters/community/impl/src/jobs/make_pending_payments.rs +++ b/backend/canisters/community/impl/src/jobs/make_pending_payments.rs @@ -1,4 +1,4 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use candid::Principal; use group_community_common::{PaymentRecipient, PendingPayment, PendingPaymentReason}; use ic_cdk_timers::TimerId; @@ -16,9 +16,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'make_pending_payments' job started"); true } else { false @@ -26,13 +25,14 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'make_pending_payments' job running"); + TIMER_ID.set(None); + let (pending_payment, now_nanos) = mutate_state(|state| (state.data.pending_payments_queue.pop(), state.env.now_nanos())); if let Some(pending_payment) = pending_payment { ic_cdk::spawn(process_payment(pending_payment, now_nanos)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'make_pending_payments' job stopped"); + read_state(start_job_if_required); } } diff --git a/backend/canisters/community/impl/src/model/groups_being_imported.rs b/backend/canisters/community/impl/src/model/groups_being_imported.rs index 2e1e069dc1..c4cdcbd65f 100644 --- a/backend/canisters/community/impl/src/model/groups_being_imported.rs +++ b/backend/canisters/community/impl/src/model/groups_being_imported.rs @@ -8,12 +8,6 @@ pub struct GroupsBeingImported { groups: HashMap, } -pub enum NextBatchResult { - Success(Vec<(ChatId, u64)>), - Continue, - Exit, -} - impl GroupsBeingImported { pub fn add( &mut self, @@ -37,23 +31,15 @@ impl GroupsBeingImported { self.groups.contains_key(group_id) } - pub fn next_batch(&mut self, now: TimestampMillis) -> NextBatchResult { - if self.groups.is_empty() { - NextBatchResult::Exit - } else { - let mut batch = Vec::new(); - for (chat_id, group) in self.groups.iter_mut().filter(|(_, g)| !g.is_complete()) { - if group.current_batch_started.is_none() { - group.current_batch_started = Some(now); - batch.push((*chat_id, group.bytes.len() as u64)); - } - } - if batch.is_empty() { - NextBatchResult::Continue - } else { - NextBatchResult::Success(batch) + pub fn next_batch(&mut self, now: TimestampMillis) -> Vec<(ChatId, u64)> { + let mut batch = Vec::new(); + for (chat_id, group) in self.groups.iter_mut().filter(|(_, g)| !g.is_complete()) { + if group.current_batch_started.is_none() { + group.current_batch_started = Some(now); + batch.push((*chat_id, group.bytes.len() as u64)); } } + batch } // Returns true if the group bytes have all been imported, else false diff --git a/backend/canisters/escrow/CHANGELOG.md b/backend/canisters/escrow/CHANGELOG.md index 515c769b7a..71b6b63d5a 100644 --- a/backend/canisters/escrow/CHANGELOG.md +++ b/backend/canisters/escrow/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed - Include `created_by` on `SwapStatusChange` messages ([#5230](https://github.com/open-chat-labs/open-chat/pull/5230)) +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) ## [[2.0.1014](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1014-escrow)] - 2024-01-19 diff --git a/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs b/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs index 454a9c04c5..fcbea299af 100644 --- a/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs +++ b/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs @@ -1,5 +1,5 @@ use crate::model::pending_payments_queue::{PendingPayment, PendingPaymentReason}; -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use candid::Principal; use escrow_canister::{deposit_subaccount, SwapStatus}; use ic_cdk_timers::TimerId; @@ -18,9 +18,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'make_pending_payments' job started"); true } else { false @@ -28,11 +27,12 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'make_pending_payments' job running"); + TIMER_ID.set(None); + if let Some(pending_payment) = mutate_state(|state| state.data.pending_payments_queue.pop()) { ic_cdk::spawn(process_payment(pending_payment)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'make_pending_payments' job stopped"); + read_state(start_job_if_required); } } diff --git a/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs b/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs index 218b317573..477a1a35c1 100644 --- a/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs +++ b/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs @@ -1,4 +1,4 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use canister_client::make_c2c_call; use escrow_canister::SwapStatusChange; use ic_cdk_timers::TimerId; @@ -13,9 +13,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.notify_status_change_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'notify_status_change' job started"); true } else { false @@ -23,11 +22,12 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'notify_status_change' job running"); + TIMER_ID.set(None); + if let Some((canister_id, notification)) = mutate_state(get_next) { ic_cdk::spawn(notify_swap_status(canister_id, notification)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'notify_status_change' job stopped"); + read_state(start_job_if_required); } } diff --git a/backend/canisters/group/CHANGELOG.md b/backend/canisters/group/CHANGELOG.md index b53733d8df..c90ef93f77 100644 --- a/backend/canisters/group/CHANGELOG.md +++ b/backend/canisters/group/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) - Avoid sending prize winner notifications ([#5236](https://github.com/open-chat-labs/open-chat/pull/5236)) ### Fixed diff --git a/backend/canisters/group/impl/src/jobs/make_pending_payments.rs b/backend/canisters/group/impl/src/jobs/make_pending_payments.rs index 22acb3f953..2662a3b3b7 100644 --- a/backend/canisters/group/impl/src/jobs/make_pending_payments.rs +++ b/backend/canisters/group/impl/src/jobs/make_pending_payments.rs @@ -1,4 +1,4 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use candid::Principal; use group_community_common::{PaymentRecipient, PendingPayment, PendingPaymentReason}; use ic_cdk_timers::TimerId; @@ -16,9 +16,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'make_pending_payments' job started"); true } else { false @@ -26,13 +25,14 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'make_pending_payments' job running"); + TIMER_ID.set(None); + let (pending_payment, now_nanos) = mutate_state(|state| (state.data.pending_payments_queue.pop(), state.env.now_nanos())); if let Some(pending_payment) = pending_payment { ic_cdk::spawn(process_payment(pending_payment, now_nanos)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'make_pending_payments' job stopped"); + read_state(start_job_if_required); } } diff --git a/backend/canisters/group_index/CHANGELOG.md b/backend/canisters/group_index/CHANGELOG.md index 1cd0094910..1263105bb3 100644 --- a/backend/canisters/group_index/CHANGELOG.md +++ b/backend/canisters/group_index/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +### Changed + +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) + ## [[2.0.1010](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1010-group_index)] - 2024-01-18 ### Added diff --git a/backend/canisters/group_index/impl/src/jobs/push_community_deleted_notifications.rs b/backend/canisters/group_index/impl/src/jobs/push_community_deleted_notifications.rs index 48e084f9fc..ef6ca4db89 100644 --- a/backend/canisters/group_index/impl/src/jobs/push_community_deleted_notifications.rs +++ b/backend/canisters/group_index/impl/src/jobs/push_community_deleted_notifications.rs @@ -1,9 +1,10 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; use tracing::trace; use types::{DeletedCommunityInfo, UserId}; +use utils::time::MINUTE_IN_MS; const MAX_BATCH_SIZE: usize = 100; @@ -13,9 +14,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && state.data.deleted_communities.notifications_pending() > 0 { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'push_community_deleted_notifications' job started"); true } else { false @@ -23,13 +23,14 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { + trace!("'push_community_deleted_notifications' job running"); + TIMER_ID.set(None); + if let Some(batch) = mutate_state(next_batch) { if !batch.is_empty() { ic_cdk::spawn(push_notifications(batch)); + read_state(start_job_if_required); } - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'push_community_deleted_notifications' job stopped"); } } @@ -49,10 +50,29 @@ async fn push_notifications(notifications: Vec<(UserId, DeletedCommunityInfo)>) let futures: Vec<_> = notifications.into_iter().map(|(u, d)| push_notification(u, d)).collect(); futures::future::join_all(futures).await; + + read_state(start_job_if_required); } async fn push_notification(user_id: UserId, deleted_community: DeletedCommunityInfo) { let args = user_canister::c2c_notify_community_deleted::Args { deleted_community }; - // TODO handle case where this fails - let _ = user_canister_c2c_client::c2c_notify_community_deleted(user_id.into(), &args).await; + + if user_canister_c2c_client::c2c_notify_community_deleted(user_id.into(), &args) + .await + .is_err() + { + mutate_state(|state| { + let now = state.env.now(); + let deleted_community = args.deleted_community; + + let retry = now.saturating_sub(deleted_community.timestamp) < 10 * MINUTE_IN_MS; + + state + .data + .deleted_communities + .mark_notification_failed(deleted_community.id, user_id, retry); + + start_job_if_required(state); + }); + } } diff --git a/backend/canisters/group_index/impl/src/jobs/push_group_deleted_notifications.rs b/backend/canisters/group_index/impl/src/jobs/push_group_deleted_notifications.rs index ad723d712a..88efa3a7f0 100644 --- a/backend/canisters/group_index/impl/src/jobs/push_group_deleted_notifications.rs +++ b/backend/canisters/group_index/impl/src/jobs/push_group_deleted_notifications.rs @@ -1,4 +1,4 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; @@ -14,9 +14,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && state.data.deleted_groups.notifications_pending() > 0 { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'push_group_deleted_notifications' job started"); true } else { false @@ -24,13 +23,14 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { + trace!("'push_group_deleted_notifications' job running"); + TIMER_ID.set(None); + if let Some(batch) = mutate_state(next_batch) { if !batch.is_empty() { ic_cdk::spawn(push_notifications(batch)); + read_state(start_job_if_required); } - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'push_group_deleted_notifications' job stopped"); } } @@ -71,6 +71,6 @@ async fn push_notification(user_id: UserId, deleted_group: DeletedGroupInfoInter .mark_notification_failed(deleted_group.id, user_id, retry); start_job_if_required(state); - }) + }); } } diff --git a/backend/canisters/group_index/impl/src/lib.rs b/backend/canisters/group_index/impl/src/lib.rs index 0d8f24c9d6..28de916d9b 100644 --- a/backend/canisters/group_index/impl/src/lib.rs +++ b/backend/canisters/group_index/impl/src/lib.rs @@ -196,11 +196,14 @@ impl Data { pub fn calculate_metrics(&mut self, now: TimestampMillis) { let deleted_group_metrics = self.deleted_groups.metrics(); + let deleted_community_metrics = self.deleted_communities.metrics(); let mut cached_metrics = CachedMetrics { last_run: now, deleted_public_groups: deleted_group_metrics.public, deleted_private_groups: deleted_group_metrics.private, + deleted_public_communities: deleted_community_metrics.public, + deleted_private_communities: deleted_community_metrics.private, ..Default::default() }; diff --git a/backend/canisters/group_index/impl/src/model/deleted_communities.rs b/backend/canisters/group_index/impl/src/model/deleted_communities.rs index f78454a11d..fd0608fae8 100644 --- a/backend/canisters/group_index/impl/src/model/deleted_communities.rs +++ b/backend/canisters/group_index/impl/src/model/deleted_communities.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::{HashMap, VecDeque}; @@ -8,6 +7,8 @@ use types::{CommunityId, DeletedCommunityInfo, UserId}; pub struct DeletedCommunities { communities: HashMap, pending_community_deleted_notifications: VecDeque<(CommunityId, UserId)>, + #[serde(default)] + failed_notifications: Vec<(CommunityId, UserId)>, } impl DeletedCommunities { @@ -38,6 +39,15 @@ impl DeletedCommunities { .and_then(|(community_id, user_id)| self.communities.get(&community_id).map(|d| (user_id, d.clone()))) } + pub fn mark_notification_failed(&mut self, community_id: CommunityId, user_id: UserId, retry: bool) { + if retry { + self.pending_community_deleted_notifications + .push_back((community_id, user_id)); + } else { + self.failed_notifications.push((community_id, user_id)); + } + } + pub fn notifications_pending(&self) -> usize { self.pending_community_deleted_notifications.len() } diff --git a/backend/canisters/local_user_index/CHANGELOG.md b/backend/canisters/local_user_index/CHANGELOG.md index b6401cd724..4afda638b8 100644 --- a/backend/canisters/local_user_index/CHANGELOG.md +++ b/backend/canisters/local_user_index/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed - Upgrade Diamond members first ([#5214](https://github.com/open-chat-labs/open-chat/pull/5214)) +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) ## [[2.0.1011](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1011-local_user_index)] - 2024-01-18 diff --git a/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_canisters.rs b/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_canisters.rs index 7eec325657..3593dc0f7b 100644 --- a/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_canisters.rs +++ b/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_canisters.rs @@ -1,4 +1,4 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; @@ -11,12 +11,9 @@ thread_local! { } pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { - if TIMER_ID.get().is_none() - && (!state.data.user_event_sync_queue.is_empty() || state.data.user_event_sync_queue.sync_in_progress()) - { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + if TIMER_ID.get().is_none() && !state.data.user_event_sync_queue.is_empty() { + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'sync_events_to_user_canisters' job started"); true } else { false @@ -24,32 +21,17 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { - match mutate_state(next_batch) { - NextBatchResult::Success(batch) => ic_cdk::spawn(process_batch(batch)), - NextBatchResult::Continue => {} - NextBatchResult::QueueEmpty => { - if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'sync_events_to_user_canisters' job stopped"); - } - } - } -} + trace!("'sync_events_to_user_canisters' job running"); + TIMER_ID.set(None); -enum NextBatchResult { - Success(Vec<(CanisterId, Vec)>), - Continue, - QueueEmpty, + if let Some(batch) = mutate_state(next_batch) { + ic_cdk::spawn(process_batch(batch)); + read_state(start_job_if_required); + } } -fn next_batch(state: &mut RuntimeState) -> NextBatchResult { - if let Some(batch) = state.data.user_event_sync_queue.try_start_batch() { - NextBatchResult::Success(batch) - } else if !state.data.user_event_sync_queue.is_empty() || state.data.user_event_sync_queue.sync_in_progress() { - NextBatchResult::Continue - } else { - NextBatchResult::QueueEmpty - } +fn next_batch(state: &mut RuntimeState) -> Option)>> { + state.data.user_event_sync_queue.try_start_batch() } async fn process_batch(batch: Vec<(CanisterId, Vec)>) { @@ -60,7 +42,10 @@ async fn process_batch(batch: Vec<(CanisterId, Vec)>) { futures::future::join_all(futures).await; - mutate_state(|state| state.data.user_event_sync_queue.mark_batch_completed()); + mutate_state(|state| { + state.data.user_event_sync_queue.mark_batch_completed(); + start_job_if_required(state); + }); } async fn sync_events(canister_id: CanisterId, events: Vec) { @@ -71,8 +56,6 @@ async fn sync_events(canister_id: CanisterId, events: Vec) { .data .user_event_sync_queue .mark_sync_failed_for_canister(canister_id, events); - - start_job_if_required(state); }); } } diff --git a/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_index_canister.rs b/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_index_canister.rs index 4c2600ab1a..1b768188b1 100644 --- a/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_index_canister.rs +++ b/backend/canisters/local_user_index/impl/src/jobs/sync_events_to_user_index_canister.rs @@ -1,4 +1,4 @@ -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; @@ -11,12 +11,9 @@ thread_local! { } pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { - if TIMER_ID.get().is_none() - && (!state.data.user_index_event_sync_queue.is_empty() || state.data.user_index_event_sync_queue.sync_in_progress()) - { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + if TIMER_ID.get().is_none() && !state.data.user_index_event_sync_queue.is_empty() { + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'sync_events_to_user_index_canister' job started"); true } else { false @@ -24,49 +21,33 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { - match mutate_state(next_batch) { - NextBatchResult::Success(canister_id, events) => ic_cdk::spawn(sync_events(canister_id, events)), - NextBatchResult::Continue => {} - NextBatchResult::QueueEmpty => { - if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'sync_events_to_user_index_canister' job stopped"); - } - } - } -} + trace!("'sync_events_to_user_index_canister' job running"); + TIMER_ID.set(None); -enum NextBatchResult { - Success(CanisterId, Vec), - Continue, - QueueEmpty, + if let Some((canister_id, events)) = mutate_state(next_batch) { + ic_cdk::spawn(sync_events(canister_id, events)); + read_state(start_job_if_required); + } } -fn next_batch(state: &mut RuntimeState) -> NextBatchResult { - if let Some((user_index_canister_id, events)) = state.data.user_index_event_sync_queue.try_start_single() { - NextBatchResult::Success(user_index_canister_id, events) - } else if !state.data.user_index_event_sync_queue.is_empty() || state.data.user_index_event_sync_queue.sync_in_progress() { - NextBatchResult::Continue - } else { - NextBatchResult::QueueEmpty - } +fn next_batch(state: &mut RuntimeState) -> Option<(CanisterId, Vec)> { + state.data.user_index_event_sync_queue.try_start_single() } async fn sync_events(canister_id: CanisterId, events: Vec) { let args = user_index_canister::c2c_notify_events::Args { events: events.clone() }; - if user_index_canister_c2c_client::c2c_notify_events(canister_id, &args) + let success = user_index_canister_c2c_client::c2c_notify_events(canister_id, &args) .await - .is_err() - { - mutate_state(|state| { + .is_ok(); + + mutate_state(|state| { + if !success { state .data .user_index_event_sync_queue .mark_sync_failed_for_canister(canister_id, events); - - start_job_if_required(state); - }); - } - - mutate_state(|state| state.data.user_index_event_sync_queue.mark_batch_completed()); + } + state.data.user_index_event_sync_queue.mark_batch_completed(); + start_job_if_required(state); + }); } diff --git a/backend/canisters/local_user_index/impl/src/jobs/topup_canister_pool.rs b/backend/canisters/local_user_index/impl/src/jobs/topup_canister_pool.rs index 7627b2dd13..cface7cb3b 100644 --- a/backend/canisters/local_user_index/impl/src/jobs/topup_canister_pool.rs +++ b/backend/canisters/local_user_index/impl/src/jobs/topup_canister_pool.rs @@ -13,9 +13,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.canister_pool.is_full() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'topup_canister_pool' job started"); true } else { false @@ -23,6 +22,9 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { + trace!("'topup_canister_pool' job running"); + TIMER_ID.set(None); + let (is_full, test_mode) = read_state(|state| (is_pool_full(state), state.data.test_mode)); if !is_full { let cycles_to_use = USER_CANISTER_INITIAL_CYCLES_BALANCE + CREATE_CANISTER_CYCLES_FEE; @@ -31,9 +33,6 @@ fn run() { if utils::cycles::can_spend_cycles(cycles_to_use, min_cycles_balance(test_mode)) { ic_cdk::spawn(add_new_canister(cycles_to_use)); } - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'topup_canister_pool' job stopped"); } } @@ -45,6 +44,7 @@ async fn add_new_canister(cycles_to_use: Cycles) { if let Ok(canister_id) = create(cycles_to_use).await { mutate_state(|state| add_canister_to_pool(canister_id, cycles_to_use, state)); } + read_state(start_job_if_required); } fn add_canister_to_pool(canister_id: CanisterId, cycles: Cycles, state: &mut RuntimeState) { diff --git a/backend/canisters/notifications_index/CHANGELOG.md b/backend/canisters/notifications_index/CHANGELOG.md index 390fdf3923..a0ef6f73a5 100644 --- a/backend/canisters/notifications_index/CHANGELOG.md +++ b/backend/canisters/notifications_index/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed - Better formatting of proposal payloads ([#5115](https://github.com/open-chat-labs/open-chat/pull/5115)) +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) ## [[2.0.971](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.971-notifications_index)] - 2023-12-12 diff --git a/backend/canisters/notifications_index/impl/src/jobs/sync_notifications_canisters.rs b/backend/canisters/notifications_index/impl/src/jobs/sync_notifications_canisters.rs index fd635f8db8..66558502d3 100644 --- a/backend/canisters/notifications_index/impl/src/jobs/sync_notifications_canisters.rs +++ b/backend/canisters/notifications_index/impl/src/jobs/sync_notifications_canisters.rs @@ -11,13 +11,9 @@ thread_local! { } pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { - if TIMER_ID.get().is_none() - && (!state.data.notifications_index_event_sync_queue.is_empty() - || state.data.notifications_index_event_sync_queue.sync_in_progress()) - { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + if TIMER_ID.get().is_none() && !state.data.notifications_index_event_sync_queue.is_empty() { + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'sync_notifications_canisters' job started"); true } else { false @@ -25,13 +21,13 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'sync_notifications_canisters' job running"); + TIMER_ID.set(None); + if let Some(batch) = mutate_state(next_batch) { if !batch.is_empty() { ic_cdk::spawn(process_batch(batch)); } - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'sync_notifications_canisters' job stopped"); } } @@ -47,7 +43,10 @@ async fn process_batch(batch: Vec<(CanisterId, Vec)>) { futures::future::join_all(futures).await; - mutate_state(|state| state.data.notifications_index_event_sync_queue.mark_batch_completed()); + mutate_state(|state| { + state.data.notifications_index_event_sync_queue.mark_batch_completed(); + start_job_if_required(state); + }); } async fn sync_events(canister_id: CanisterId, events: Vec) { diff --git a/backend/canisters/notifications_index/impl/src/jobs/upgrade_canisters.rs b/backend/canisters/notifications_index/impl/src/jobs/upgrade_canisters.rs index 9aa147e33d..acd63b0ee6 100644 --- a/backend/canisters/notifications_index/impl/src/jobs/upgrade_canisters.rs +++ b/backend/canisters/notifications_index/impl/src/jobs/upgrade_canisters.rs @@ -120,4 +120,5 @@ fn on_failure(canister_id: CanisterId, from_version: BuildVersion, to_version: B from_version, to_version, }); + start_job_if_required(state); } diff --git a/backend/canisters/proposals_bot/CHANGELOG.md b/backend/canisters/proposals_bot/CHANGELOG.md index bfc0cfe764..171fa38cb3 100644 --- a/backend/canisters/proposals_bot/CHANGELOG.md +++ b/backend/canisters/proposals_bot/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +### Changed + +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) + ## [[2.0.1001](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1001-proposals_bot)] - 2024-01-05 ### Fixed diff --git a/backend/canisters/proposals_bot/impl/src/jobs/increase_dissolve_delay.rs b/backend/canisters/proposals_bot/impl/src/jobs/increase_dissolve_delay.rs index 62bde12709..5755bb835b 100644 --- a/backend/canisters/proposals_bot/impl/src/jobs/increase_dissolve_delay.rs +++ b/backend/canisters/proposals_bot/impl/src/jobs/increase_dissolve_delay.rs @@ -22,7 +22,6 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { { let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'increase_dissolve_delay' job started"); true } else { false @@ -30,21 +29,15 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'increase_dissolve_delay' job running"); + TIMER_ID.set(None); + if let Some(neuron) = mutate_state(|state| state.data.nervous_systems.get_neuron_in_need_of_dissolve_delay_increase()) { ic_cdk::spawn(increase_dissolve_delay( neuron.governance_canister_id, neuron.neuron_id, neuron.additional_dissolve_delay_seconds, )); - } else { - stop_job(); - } -} - -fn stop_job() { - if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'increase_dissolve_delay' job stopped"); } } @@ -70,7 +63,5 @@ async fn increase_dissolve_delay( .mark_neuron_dissolve_delay_increased(&governance_canister_id, additional_dissolve_delay_seconds as u64 * 1000) }); } - - stop_job(); read_state(start_job_if_required); } diff --git a/backend/canisters/proposals_bot/impl/src/jobs/push_proposals.rs b/backend/canisters/proposals_bot/impl/src/jobs/push_proposals.rs index 7383a8dca3..d71762f9f0 100644 --- a/backend/canisters/proposals_bot/impl/src/jobs/push_proposals.rs +++ b/backend/canisters/proposals_bot/impl/src/jobs/push_proposals.rs @@ -1,5 +1,5 @@ use crate::model::nervous_systems::ProposalToPush; -use crate::{generate_message_id, mutate_state, RuntimeState}; +use crate::{generate_message_id, mutate_state, read_state, RuntimeState}; use chat_events::{MessageContentInternal, ProposalContentInternal}; use ic_cdk::api::call::{CallResult, RejectionCode}; use ic_cdk_timers::TimerId; @@ -15,9 +15,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && state.data.nervous_systems.any_proposals_to_push() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'push_proposals' job started"); true } else { false @@ -25,12 +24,13 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'push_proposals' job started"); + TIMER_ID.set(None); + if let Some(proposal) = mutate_state(|state| state.data.nervous_systems.dequeue_next_proposal_to_push()) { ic_cdk::spawn(push_proposal(proposal)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'push_proposals' job stopped"); } + read_state(start_job_if_required); } async fn push_proposal( diff --git a/backend/canisters/proposals_bot/impl/src/jobs/update_finished_proposals.rs b/backend/canisters/proposals_bot/impl/src/jobs/update_finished_proposals.rs index bd8ffa5cc6..c179c0b6b3 100644 --- a/backend/canisters/proposals_bot/impl/src/jobs/update_finished_proposals.rs +++ b/backend/canisters/proposals_bot/impl/src/jobs/update_finished_proposals.rs @@ -15,9 +15,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.finished_proposals_to_process.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'update_finished_proposals' job started"); true } else { false @@ -25,6 +24,9 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { + trace!("'update_finished_proposals' job started"); + TIMER_ID.set(None); + mutate_state(run_impl); } @@ -35,10 +37,8 @@ fn run_impl(state: &mut RuntimeState) { ic_cdk::spawn(process_proposal(governance_canister_id, proposal_id, is_nns)); } - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'update_finished_proposals' job stopped"); } + start_job_if_required(state); } async fn process_proposal(governance_canister_id: CanisterId, proposal_id: ProposalId, is_nns: bool) { diff --git a/backend/canisters/proposals_bot/impl/src/jobs/update_proposals.rs b/backend/canisters/proposals_bot/impl/src/jobs/update_proposals.rs index c06d822044..ad28b35e91 100644 --- a/backend/canisters/proposals_bot/impl/src/jobs/update_proposals.rs +++ b/backend/canisters/proposals_bot/impl/src/jobs/update_proposals.rs @@ -1,5 +1,5 @@ use crate::model::nervous_systems::ProposalsToUpdate; -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; @@ -12,9 +12,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && state.data.nervous_systems.any_proposals_to_update() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'update_proposals' job started"); true } else { false @@ -22,12 +21,13 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'update_proposals' job started"); + TIMER_ID.set(None); + if let Some(proposals) = mutate_state(|state| state.data.nervous_systems.dequeue_next_proposals_to_update()) { ic_cdk::spawn(update_proposals(proposals)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'update_proposals' job stopped"); } + read_state(start_job_if_required); } async fn update_proposals( diff --git a/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs b/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs index 0944b531c7..2b4fe48375 100644 --- a/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs +++ b/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs @@ -23,6 +23,7 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { fn run() { trace!("'push_user_canister_events' running"); TIMER_ID.set(None); + if let Some(batch) = mutate_state(next_batch) { ic_cdk::spawn(process_batch(batch)); } diff --git a/backend/canisters/user_index/CHANGELOG.md b/backend/canisters/user_index/CHANGELOG.md index dfc93fee2d..09808f0e80 100644 --- a/backend/canisters/user_index/CHANGELOG.md +++ b/backend/canisters/user_index/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +### Changed + +- Simplify timer jobs + make them more efficient ([#5233](https://github.com/open-chat-labs/open-chat/pull/5233)) + ### Fixed - Notify community canisters when a user is unsuspended ([#5227](https://github.com/open-chat-labs/open-chat/pull/5227)) diff --git a/backend/canisters/user_index/impl/src/jobs/make_pending_payments.rs b/backend/canisters/user_index/impl/src/jobs/make_pending_payments.rs index dbaacd346f..096b96c9dd 100644 --- a/backend/canisters/user_index/impl/src/jobs/make_pending_payments.rs +++ b/backend/canisters/user_index/impl/src/jobs/make_pending_payments.rs @@ -18,9 +18,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.pending_payments_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'make_pending_payments' job started"); true } else { false @@ -28,35 +27,38 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'make_pending_payments' job running"); + TIMER_ID.set(None); + if let Some(pending_payment) = mutate_state(|state| state.data.pending_payments_queue.pop()) { ic_cdk::spawn(process_payment(pending_payment)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'make_pending_payments' job stopped"); } + read_state(start_job_if_required); } async fn process_payment(pending_payment: PendingPayment) { let reason = pending_payment.reason.clone(); - match make_payment(&pending_payment).await { - Ok(block_index) => match reason { - PendingPaymentReason::ReferralReward => { - mutate_state(|state| inform_referrer(&pending_payment, block_index, state)); - } - PendingPaymentReason::TopUpNeuron => { - read_state(|state| state.data.refresh_nns_neuron()); - } - _ => {} - }, - Err(retry) => { - if retry { - mutate_state(|state| { + let result = make_payment(&pending_payment).await; + + mutate_state(|state| { + match result { + Ok(block_index) => match reason { + PendingPaymentReason::ReferralReward => { + inform_referrer(&pending_payment, block_index, state); + } + PendingPaymentReason::TopUpNeuron => { + state.data.refresh_nns_neuron(); + } + _ => {} + }, + Err(retry) => { + if retry { state.data.pending_payments_queue.push(pending_payment); - start_job_if_required(state); - }); + } } } - } + start_job_if_required(state); + }); } // Error response contains a boolean stating if the transfer should be retried diff --git a/backend/canisters/user_index/impl/src/jobs/notify_user_principal_migrations.rs b/backend/canisters/user_index/impl/src/jobs/notify_user_principal_migrations.rs index 22c63e26eb..2319094c17 100644 --- a/backend/canisters/user_index/impl/src/jobs/notify_user_principal_migrations.rs +++ b/backend/canisters/user_index/impl/src/jobs/notify_user_principal_migrations.rs @@ -1,5 +1,5 @@ use crate::model::user_principal_migration_queue::CanisterToNotifyOfUserPrincipalMigration; -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; @@ -14,9 +14,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.get().is_none() && !state.data.user_principal_migration_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'notify_user_principal_migrations' job started"); true } else { false @@ -24,13 +23,14 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'notify_user_principal_migrations' job running"); + TIMER_ID.set(None); + let next_batch = mutate_state(next_batch); if !next_batch.is_empty() { ic_cdk::spawn(notify_many(next_batch)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'notify_user_principal_migrations' job stopped"); } + read_state(start_job_if_required); } fn next_batch(state: &mut RuntimeState) -> Vec<(UserId, CanisterToNotifyOfUserPrincipalMigration)> { @@ -46,6 +46,8 @@ async fn notify_many(canisters: Vec<(UserId, CanisterToNotifyOfUserPrincipalMigr .collect(); futures::future::join_all(futures).await; + + read_state(start_job_if_required); } async fn notify(user_id: UserId, canister: CanisterToNotifyOfUserPrincipalMigration) { @@ -71,7 +73,6 @@ async fn notify(user_id: UserId, canister: CanisterToNotifyOfUserPrincipalMigrat Ok(_) => state.data.user_principal_migration_queue.mark_success(user_id), Err(_) => { state.data.user_principal_migration_queue.mark_failure(user_id, canister); - start_job_if_required(state); } }); } diff --git a/backend/canisters/user_index/impl/src/jobs/submit_message_to_modclub.rs b/backend/canisters/user_index/impl/src/jobs/submit_message_to_modclub.rs index 998dff3d7e..c60def5766 100644 --- a/backend/canisters/user_index/impl/src/jobs/submit_message_to_modclub.rs +++ b/backend/canisters/user_index/impl/src/jobs/submit_message_to_modclub.rs @@ -1,5 +1,5 @@ use crate::model::pending_modclub_submissions_queue::PendingModclubSubmission; -use crate::{mutate_state, RuntimeState}; +use crate::{mutate_state, read_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; use std::time::Duration; @@ -12,9 +12,8 @@ thread_local! { pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { if TIMER_ID.with(|t| t.get().is_none()) && !state.data.pending_modclub_submissions_queue.is_empty() { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.with(|t| t.set(Some(timer_id))); - trace!("'submit_message_to_modclub' job started"); true } else { false @@ -22,6 +21,9 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'submit_message_to_modclub' job running"); + TIMER_ID.set(None); + let (pending_submission, modclub_canister_id) = mutate_state(|state| { ( state.data.pending_modclub_submissions_queue.pop(), @@ -31,19 +33,20 @@ pub fn run() { if let Some(pending_submission) = pending_submission { ic_cdk::spawn(process_submission(modclub_canister_id, pending_submission)); - } else if let Some(timer_id) = TIMER_ID.with(|t| t.take()) { - ic_cdk_timers::clear_timer(timer_id); - trace!("'submit_message_to_modclub' job stopped"); } + + read_state(start_job_if_required); } async fn process_submission(modclub_canister_id: CanisterId, pending_submission: PendingModclubSubmission) { - if !submit_message(modclub_canister_id, &pending_submission).await { - mutate_state(|state| { + let success = submit_message(modclub_canister_id, &pending_submission).await; + + mutate_state(|state| { + if !success { state.data.pending_modclub_submissions_queue.push(pending_submission); - start_job_if_required(state); - }); - } + } + start_job_if_required(state); + }); } async fn submit_message(modclub_canister_id: CanisterId, pending_submission: &PendingModclubSubmission) -> bool { diff --git a/backend/canisters/user_index/impl/src/jobs/sync_events_to_local_user_index_canisters.rs b/backend/canisters/user_index/impl/src/jobs/sync_events_to_local_user_index_canisters.rs index 2c8b2d532e..624a40eb3d 100644 --- a/backend/canisters/user_index/impl/src/jobs/sync_events_to_local_user_index_canisters.rs +++ b/backend/canisters/user_index/impl/src/jobs/sync_events_to_local_user_index_canisters.rs @@ -11,13 +11,9 @@ thread_local! { } pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { - if TIMER_ID.get().is_none() - && !state.data.user_index_event_sync_queue.is_empty() - && !state.data.user_index_event_sync_queue.sync_in_progress() - { + if TIMER_ID.get().is_none() && !state.data.user_index_event_sync_queue.is_empty() { let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'sync_events_to_local_user_index_canisters' job started"); true } else { false @@ -25,11 +21,11 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } pub fn run() { + trace!("'sync_events_to_local_user_index_canisters' job running"); + TIMER_ID.set(None); + if let Some(batch) = mutate_state(|state| state.data.user_index_event_sync_queue.try_start_batch()) { ic_cdk::spawn(process_batch(batch)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'sync_events_to_local_user_index_canisters' job stopped"); } } diff --git a/backend/canisters/user_index/impl/src/jobs/sync_users_to_storage_index.rs b/backend/canisters/user_index/impl/src/jobs/sync_users_to_storage_index.rs index a871b7dfab..5154b8fef8 100644 --- a/backend/canisters/user_index/impl/src/jobs/sync_users_to_storage_index.rs +++ b/backend/canisters/user_index/impl/src/jobs/sync_users_to_storage_index.rs @@ -11,13 +11,9 @@ thread_local! { } pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { - if TIMER_ID.get().is_none() - && !state.data.storage_index_user_sync_queue.is_empty() - && !state.data.storage_index_user_sync_queue.sync_in_progress() - { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + if TIMER_ID.get().is_none() && !state.data.storage_index_user_sync_queue.is_empty() { + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'sync_users_to_storage_index' job started"); true } else { false @@ -25,6 +21,9 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { + trace!("'sync_users_to_storage_index' job running"); + TIMER_ID.set(None); + if let Some((canister_id, users)) = mutate_state(|state| { state .data @@ -33,9 +32,6 @@ fn run() { .map(|u| (state.data.storage_index_canister_id, u)) }) { ic_cdk::spawn(sync_users(canister_id, users)); - } else if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'sync_users_to_storage_index' job stopped"); } } diff --git a/backend/canisters/user_index/impl/src/jobs/upgrade_canisters.rs b/backend/canisters/user_index/impl/src/jobs/upgrade_canisters.rs index 485409f9f5..5a233e2112 100644 --- a/backend/canisters/user_index/impl/src/jobs/upgrade_canisters.rs +++ b/backend/canisters/user_index/impl/src/jobs/upgrade_canisters.rs @@ -118,4 +118,5 @@ fn on_failure(canister_id: CanisterId, from_version: BuildVersion, to_version: B from_version, to_version, }); + start_job_if_required(state); } diff --git a/backend/canisters/user_index/impl/src/model/storage_index_user_sync_queue.rs b/backend/canisters/user_index/impl/src/model/storage_index_user_sync_queue.rs index 49649b6b6e..cba941e965 100644 --- a/backend/canisters/user_index/impl/src/model/storage_index_user_sync_queue.rs +++ b/backend/canisters/user_index/impl/src/model/storage_index_user_sync_queue.rs @@ -49,8 +49,4 @@ impl OpenStorageUserSyncQueue { pub fn is_empty(&self) -> bool { self.users.is_empty() } - - pub fn sync_in_progress(&self) -> bool { - self.sync_in_progress - } }