Skip to content

Commit

Permalink
Avoid using heartbeat to upgrade Group canisters (#6643)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Oct 23, 2024
1 parent bdaf6d8 commit 4c7d7de
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 128 deletions.
1 change: 1 addition & 0 deletions backend/canisters/local_group_index/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed

- Maintain set of which canisters have not yet migrated all events to stable memory ([#6603](https://github.com/open-chat-labs/open-chat/pull/6603))
- Avoid using `heartbeat` to upgrade Group canisters ([#6643](https://github.com/open-chat-labs/open-chat/pull/6643))

## [[2.0.1383](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1383-local_group_index)] - 2024-10-11

Expand Down
7 changes: 7 additions & 0 deletions backend/canisters/local_group_index/impl/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use crate::RuntimeState;

pub mod upgrade_groups;

pub(crate) fn start(state: &RuntimeState) {
upgrade_groups::start_job_if_required(state);
}
162 changes: 162 additions & 0 deletions backend/canisters/local_group_index/impl/src/jobs/upgrade_groups.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use crate::{mutate_state, RuntimeState};
use ic_cdk::api::management_canister::main::CanisterInstallMode;
use ic_cdk_timers::TimerId;
use local_group_index_canister::ChildCanisterType;
use std::cell::Cell;
use std::time::Duration;
use tracing::trace;
use types::{BuildVersion, CanisterId, ChatId, Cycles, CyclesTopUp};
use utils::canister;
use utils::canister::{install, ChunkedWasmToInstall, FailedUpgrade, WasmToInstall};
use utils::consts::min_cycles_balance;

type CanisterToUpgrade = canister::CanisterToInstall<group_canister::post_upgrade::Args>;

thread_local! {
static TIMER_ID: Cell<Option<TimerId>> = Cell::default();
}

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none()
&& (state.data.groups_requiring_upgrade.count_pending() > 0
|| state.data.groups_requiring_upgrade.count_in_progress() > 0)
{
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'upgrade_groups' job started");
true
} else {
false
}
}

fn run() {
if let Some(batch) = mutate_state(next_batch) {
if !batch.is_empty() {
ic_cdk::spawn(perform_upgrades(batch));
}
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'upgrade_groups' job stopped");
}
}

fn next_batch(state: &mut RuntimeState) -> Option<Vec<CanisterToUpgrade>> {
if state.data.event_store_client.info().events_pending > 100000 {
return Some(Vec::new());
}

let count_in_progress = state.data.groups_requiring_upgrade.count_in_progress();
let count_pending = state.data.groups_requiring_upgrade.count_pending();

if count_in_progress == 0 && count_pending == 0 {
None
} else {
let group_upgrade_concurrency = state.data.group_upgrade_concurrency as usize;

Some(
(0..(group_upgrade_concurrency.saturating_sub(count_in_progress)))
.map_while(|_| try_get_next(state))
.collect(),
)
}
}

fn try_get_next(state: &mut RuntimeState) -> Option<CanisterToUpgrade> {
let (canister_id, force) = state.data.groups_requiring_upgrade.try_take_next()?;

initialize_upgrade(canister_id, force, state).or_else(|| {
state.data.groups_requiring_upgrade.mark_skipped(&canister_id);
None
})
}

fn initialize_upgrade(canister_id: CanisterId, force: bool, state: &mut RuntimeState) -> Option<CanisterToUpgrade> {
let chat_id = canister_id.into();
let group = state.data.local_groups.get_mut(&chat_id)?;
let group_canister_wasm = state.data.child_canister_wasms.get(ChildCanisterType::Group);
let current_wasm_version = group.wasm_version;
let new_wasm_version = group_canister_wasm.wasm.version;
let deposit_cycles_if_needed = ic_cdk::api::canister_balance128() > min_cycles_balance(state.data.test_mode);

if current_wasm_version == new_wasm_version && !force {
return None;
}

group.set_canister_upgrade_status(true, None);

Some(CanisterToUpgrade {
canister_id,
current_wasm_version,
new_wasm_version,
new_wasm: if group_canister_wasm.chunks.is_empty() {
WasmToInstall::Default(group_canister_wasm.wasm.module.clone())
} else {
WasmToInstall::Chunked(ChunkedWasmToInstall {
chunks: group_canister_wasm.chunks.clone(),
wasm_hash: group_canister_wasm.wasm_hash,
store_canister_id: state.env.canister_id(),
})
},
deposit_cycles_if_needed,
args: group_canister::post_upgrade::Args {
wasm_version: new_wasm_version,
},
mode: CanisterInstallMode::Upgrade(None),
stop_start_canister: true,
})
}

async fn perform_upgrades(canisters_to_upgrade: Vec<CanisterToUpgrade>) {
let futures: Vec<_> = canisters_to_upgrade.into_iter().map(perform_upgrade).collect();

futures::future::join_all(futures).await;
}

async fn perform_upgrade(canister_to_upgrade: CanisterToUpgrade) {
let canister_id = canister_to_upgrade.canister_id;
let from_version = canister_to_upgrade.current_wasm_version;
let to_version = canister_to_upgrade.new_wasm_version;

match install(canister_to_upgrade).await {
Ok(cycles_top_up) => {
mutate_state(|state| on_success(canister_id, to_version, cycles_top_up, state));
}
Err(_) => {
mutate_state(|state| on_failure(canister_id, from_version, to_version, state));
}
}
}

fn on_success(canister_id: CanisterId, to_version: BuildVersion, top_up: Option<Cycles>, state: &mut RuntimeState) {
let chat_id = canister_id.into();
mark_upgrade_complete(chat_id, Some(to_version), state);

if let Some(top_up) = top_up {
state.data.local_groups.mark_cycles_top_up(
&chat_id,
CyclesTopUp {
amount: top_up,
date: state.env.now(),
},
);
}

state.data.groups_requiring_upgrade.mark_success(&canister_id);
}

fn on_failure(canister_id: CanisterId, from_version: BuildVersion, to_version: BuildVersion, state: &mut RuntimeState) {
mark_upgrade_complete(canister_id.into(), None, state);

state.data.groups_requiring_upgrade.mark_failure(FailedUpgrade {
canister_id,
from_version,
to_version,
});
}

fn mark_upgrade_complete(chat_id: ChatId, new_wasm_version: Option<BuildVersion>, state: &mut RuntimeState) {
if let Some(group) = state.data.local_groups.get_mut(&chat_id) {
group.set_canister_upgrade_status(false, new_wasm_version);
}
}
1 change: 1 addition & 0 deletions backend/canisters/local_group_index/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use utils::env::Environment;
use utils::time::MINUTE_IN_MS;

mod guards;
mod jobs;
mod lifecycle;
mod memory;
mod model;
Expand Down
128 changes: 1 addition & 127 deletions backend/canisters/local_group_index/impl/src/lifecycle/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,142 +2,16 @@ use crate::{mutate_state, read_state, RuntimeState, GROUP_CANISTER_INITIAL_CYCLE
use ic_cdk::api::management_canister::main::CanisterInstallMode;
use ic_cdk::heartbeat;
use local_group_index_canister::ChildCanisterType;
use types::{BuildVersion, CanisterId, ChatId, CommunityId, Cycles, CyclesTopUp};
use types::{BuildVersion, CanisterId, CommunityId, Cycles, CyclesTopUp};
use utils::canister::{self, ChunkedWasmToInstall, FailedUpgrade, WasmToInstall};
use utils::consts::{min_cycles_balance, CREATE_CANISTER_CYCLES_FEE};

#[heartbeat]
fn heartbeat() {
upgrade_groups::run();
upgrade_communities::run();
topup_canister_pool::run();
}

mod upgrade_groups {
use super::*;

type CanisterToUpgrade = canister::CanisterToInstall<group_canister::post_upgrade::Args>;

pub fn run() {
let canisters_to_upgrade = mutate_state(next_batch);
if !canisters_to_upgrade.is_empty() {
ic_cdk::spawn(perform_upgrades(canisters_to_upgrade));
}
}

fn next_batch(state: &mut RuntimeState) -> Vec<CanisterToUpgrade> {
if state.data.event_store_client.info().events_pending > 100000 {
return Vec::new();
}

let count_in_progress = state.data.groups_requiring_upgrade.count_in_progress();
let group_upgrade_concurrency = state.data.group_upgrade_concurrency as usize;

(0..(group_upgrade_concurrency.saturating_sub(count_in_progress)))
.map_while(|_| try_get_next(state))
.collect()
}

fn try_get_next(state: &mut RuntimeState) -> Option<CanisterToUpgrade> {
let (canister_id, force) = state.data.groups_requiring_upgrade.try_take_next()?;

initialize_upgrade(canister_id, force, state).or_else(|| {
state.data.groups_requiring_upgrade.mark_skipped(&canister_id);
None
})
}

fn initialize_upgrade(canister_id: CanisterId, force: bool, state: &mut RuntimeState) -> Option<CanisterToUpgrade> {
let chat_id = canister_id.into();
let group = state.data.local_groups.get_mut(&chat_id)?;
let group_canister_wasm = state.data.child_canister_wasms.get(ChildCanisterType::Group);
let current_wasm_version = group.wasm_version;
let new_wasm_version = group_canister_wasm.wasm.version;
let deposit_cycles_if_needed = ic_cdk::api::canister_balance128() > min_cycles_balance(state.data.test_mode);

if current_wasm_version == new_wasm_version && !force {
return None;
}

group.set_canister_upgrade_status(true, None);

Some(CanisterToUpgrade {
canister_id,
current_wasm_version,
new_wasm_version,
new_wasm: if group_canister_wasm.chunks.is_empty() {
WasmToInstall::Default(group_canister_wasm.wasm.module.clone())
} else {
WasmToInstall::Chunked(ChunkedWasmToInstall {
chunks: group_canister_wasm.chunks.clone(),
wasm_hash: group_canister_wasm.wasm_hash,
store_canister_id: state.env.canister_id(),
})
},
deposit_cycles_if_needed,
args: group_canister::post_upgrade::Args {
wasm_version: new_wasm_version,
},
mode: CanisterInstallMode::Upgrade(None),
stop_start_canister: true,
})
}

async fn perform_upgrades(canisters_to_upgrade: Vec<CanisterToUpgrade>) {
let futures: Vec<_> = canisters_to_upgrade.into_iter().map(perform_upgrade).collect();

futures::future::join_all(futures).await;
}

async fn perform_upgrade(canister_to_upgrade: CanisterToUpgrade) {
let canister_id = canister_to_upgrade.canister_id;
let from_version = canister_to_upgrade.current_wasm_version;
let to_version = canister_to_upgrade.new_wasm_version;

match utils::canister::install(canister_to_upgrade).await {
Ok(_) => {
mutate_state(|state| on_success(canister_id, to_version, None, state));
}
Err(_) => {
mutate_state(|state| on_failure(canister_id, from_version, to_version, state));
}
}
}

fn on_success(canister_id: CanisterId, to_version: BuildVersion, top_up: Option<Cycles>, state: &mut RuntimeState) {
let chat_id = canister_id.into();
mark_upgrade_complete(chat_id, Some(to_version), state);

if let Some(top_up) = top_up {
state.data.local_groups.mark_cycles_top_up(
&chat_id,
CyclesTopUp {
amount: top_up,
date: state.env.now(),
},
);
}

state.data.groups_requiring_upgrade.mark_success(&canister_id);
}

fn on_failure(canister_id: CanisterId, from_version: BuildVersion, to_version: BuildVersion, state: &mut RuntimeState) {
mark_upgrade_complete(canister_id.into(), None, state);

state.data.groups_requiring_upgrade.mark_failure(FailedUpgrade {
canister_id,
from_version,
to_version,
});
}

fn mark_upgrade_complete(chat_id: ChatId, new_wasm_version: Option<BuildVersion>, state: &mut RuntimeState) {
if let Some(group) = state.data.local_groups.get_mut(&chat_id) {
group.set_canister_upgrade_status(false, new_wasm_version);
}
}
}

mod upgrade_communities {
use super::*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ fn init_state(env: Box<dyn Environment>, data: Data, wasm_version: BuildVersion)
let now = env.now();
let state = RuntimeState::new(env, data);

crate::jobs::start(&state);
crate::init_state(state);
WASM_VERSION.set(Timestamped::new(wasm_version, now));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fn commit(args: Args, wasm: CanisterWasm, chunks: Vec<Hash>, state: &mut Runtime
{
state.data.groups_requiring_upgrade.enqueue(canister_id, false);
}
crate::jobs::upgrade_groups::start_job_if_required(state);

state.data.groups_requiring_upgrade.clear_failed(BuildVersion {
major: version.major,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn run() {

fn next_batch(state: &mut RuntimeState) -> Option<Vec<CanisterToUpgrade>> {
if state.data.event_store_client.info().events_pending > 100000 {
return None;
return Some(Vec::new());
}

let count_in_progress = state.data.canisters_requiring_upgrade.count_in_progress();
Expand Down

0 comments on commit 4c7d7de

Please sign in to comment.