diff --git a/backend/canisters/storage_bucket/CHANGELOG.md b/backend/canisters/storage_bucket/CHANGELOG.md index 7315125904..56ac3e7d26 100644 --- a/backend/canisters/storage_bucket/CHANGELOG.md +++ b/backend/canisters/storage_bucket/CHANGELOG.md @@ -8,6 +8,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Changed +- Push any remaining events still queued in the old events system ([#7065](https://github.com/open-chat-labs/open-chat/pull/7065)) + +## [[2.0.1522](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1522-storage_bucket)] - 2024-12-16 + +### Changed + - Expose size of each virtual stable memory in metrics ([#6981](https://github.com/open-chat-labs/open-chat/pull/6981)) - Avoid having to regenerate rng seed after each upgrade ([#7043](https://github.com/open-chat-labs/open-chat/pull/7043)) - Use `GroupedTimerJobQueue` to sync events to storage index ([#7047](https://github.com/open-chat-labs/open-chat/pull/7047)) diff --git a/backend/canisters/storage_bucket/impl/src/jobs/remove_expired_files.rs b/backend/canisters/storage_bucket/impl/src/jobs/remove_expired_files.rs index 296dea024d..ce8f242c40 100644 --- a/backend/canisters/storage_bucket/impl/src/jobs/remove_expired_files.rs +++ b/backend/canisters/storage_bucket/impl/src/jobs/remove_expired_files.rs @@ -1,4 +1,4 @@ -use crate::model::index_sync_state::EventToSync; +use crate::model::index_event_batch::EventToSync; use crate::{mutate_state, RuntimeState}; use ic_cdk_timers::TimerId; use std::cell::Cell; diff --git a/backend/canisters/storage_bucket/impl/src/lib.rs b/backend/canisters/storage_bucket/impl/src/lib.rs index b2103daba8..b740718e8a 100644 --- a/backend/canisters/storage_bucket/impl/src/lib.rs +++ b/backend/canisters/storage_bucket/impl/src/lib.rs @@ -1,6 +1,6 @@ use crate::model::files::{Files, RemoveFileResult}; -use crate::model::index_event_batch::IndexEventBatch; -use crate::model::index_sync_state::{EventToSync, IndexSyncState}; +use crate::model::index_event_batch::{EventToSync, IndexEventBatch}; +use crate::model::index_sync_state::IndexSyncState; use crate::model::users::Users; use candid::{CandidType, Principal}; use canister_state_macros::canister_state; diff --git a/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs index 7b3b2c2de6..caa9c2bb15 100644 --- a/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/storage_bucket/impl/src/lifecycle/post_upgrade.rs @@ -1,5 +1,6 @@ use crate::lifecycle::{init_env, init_state}; use crate::memory::get_upgrades_memory; +use crate::model::index_event_batch::EventToSync; use crate::Data; use canister_logger::LogEntry; use canister_tracing_macros::trace; @@ -14,9 +15,26 @@ fn post_upgrade(args: Args) { let memory = get_upgrades_memory(); let reader = get_reader(&memory); - let (data, errors, logs, traces): (Data, Vec, Vec, Vec) = + let (mut data, errors, logs, traces): (Data, Vec, Vec, Vec) = msgpack::deserialize(reader).unwrap(); + data.index_event_sync_queue.set_defer_processing(true); + #[allow(deprecated)] + { + if let Some(args) = data.index_sync_state.args_to_retry.take() { + for added in args.files_added { + data.push_event_to_index(EventToSync::FileAdded(added)); + } + for removed in args.files_removed { + data.push_event_to_index(EventToSync::FileRemoved(removed)); + } + } + for event in std::mem::take(&mut data.index_sync_state.queue) { + data.push_event_to_index(event) + } + } + data.index_event_sync_queue.set_defer_processing(false); + canister_logger::init_with_logs(data.test_mode, errors, logs, traces); let env = init_env(data.rng_seed); diff --git a/backend/canisters/storage_bucket/impl/src/model/index_event_batch.rs b/backend/canisters/storage_bucket/impl/src/model/index_event_batch.rs index 05e5233bbc..d59f3f0e66 100644 --- a/backend/canisters/storage_bucket/impl/src/model/index_event_batch.rs +++ b/backend/canisters/storage_bucket/impl/src/model/index_event_batch.rs @@ -1,8 +1,9 @@ -use crate::model::index_sync_state::EventToSync; use crate::model::users::FileStatusInternal; use crate::{mutate_state, DATA_LIMIT_BYTES, MAX_EVENTS_TO_SYNC_PER_BATCH}; +use candid::Deserialize; +use serde::Serialize; use timer_job_queues::{TimerJobItem, TimerJobItemGroup}; -use types::CanisterId; +use types::{CanisterId, FileAdded, FileRemoved}; use utils::canister::should_retry_failed_c2c_call; pub struct IndexEventBatch { @@ -10,6 +11,12 @@ pub struct IndexEventBatch { events: Vec<(EventToSync, u64)>, } +#[derive(Serialize, Deserialize)] +pub enum EventToSync { + FileAdded(FileAdded), + FileRemoved(FileRemoved), +} + impl TimerJobItem for IndexEventBatch { async fn process(&self) -> Result<(), bool> { let mut args = storage_index_canister::c2c_sync_bucket::Args::default(); diff --git a/backend/canisters/storage_bucket/impl/src/model/index_sync_state.rs b/backend/canisters/storage_bucket/impl/src/model/index_sync_state.rs index 8154f80f87..376414eba3 100644 --- a/backend/canisters/storage_bucket/impl/src/model/index_sync_state.rs +++ b/backend/canisters/storage_bucket/impl/src/model/index_sync_state.rs @@ -1,22 +1,13 @@ +use crate::model::index_event_batch::EventToSync; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use storage_index_canister::c2c_sync_bucket::Args; -use types::{FileAdded, FileRemoved}; // We want to send events to the index in order, so while a sync is in progress we avoid sending // more events in case the first batch fails and the second succeeds. If a sync fails, the args that // were sent are stored so that they can be retried again. #[derive(Serialize, Deserialize, Default)] pub struct IndexSyncState { - queue: VecDeque, - in_progress: bool, - args_to_retry: Option, -} - -impl IndexSyncState {} - -#[derive(Serialize, Deserialize)] -pub enum EventToSync { - FileAdded(FileAdded), - FileRemoved(FileRemoved), + pub queue: VecDeque, + pub args_to_retry: Option, } diff --git a/backend/canisters/storage_bucket/impl/src/updates/c2c_sync_index.rs b/backend/canisters/storage_bucket/impl/src/updates/c2c_sync_index.rs index 08d2fa318d..9f8de7f092 100644 --- a/backend/canisters/storage_bucket/impl/src/updates/c2c_sync_index.rs +++ b/backend/canisters/storage_bucket/impl/src/updates/c2c_sync_index.rs @@ -1,6 +1,6 @@ use crate::guards::caller_is_storage_index_canister; use crate::model::files::RemoveFileResult; -use crate::model::index_sync_state::EventToSync; +use crate::model::index_event_batch::EventToSync; use crate::{mutate_state, RuntimeState, MAX_EVENTS_TO_SYNC_PER_BATCH}; use canister_tracing_macros::trace; use ic_cdk::update; diff --git a/backend/canisters/storage_bucket/impl/src/updates/forward_file.rs b/backend/canisters/storage_bucket/impl/src/updates/forward_file.rs index 467d89153e..9c82cd64d2 100644 --- a/backend/canisters/storage_bucket/impl/src/updates/forward_file.rs +++ b/backend/canisters/storage_bucket/impl/src/updates/forward_file.rs @@ -1,6 +1,6 @@ use crate::guards::caller_is_known_user; use crate::model::files::ForwardFileResult; -use crate::model::index_sync_state::EventToSync; +use crate::model::index_event_batch::EventToSync; use crate::model::users::{FileStatusInternal, IndexSyncComplete}; use crate::{mutate_state, RuntimeState}; use canister_tracing_macros::trace; diff --git a/backend/canisters/storage_bucket/impl/src/updates/upload_chunk.rs b/backend/canisters/storage_bucket/impl/src/updates/upload_chunk.rs index 8c3d94d79b..58de50767c 100644 --- a/backend/canisters/storage_bucket/impl/src/updates/upload_chunk.rs +++ b/backend/canisters/storage_bucket/impl/src/updates/upload_chunk.rs @@ -1,6 +1,6 @@ use crate::guards::caller_is_known_user; use crate::model::files::{PutChunkArgs, PutChunkResult}; -use crate::model::index_sync_state::EventToSync; +use crate::model::index_event_batch::EventToSync; use crate::model::users::{FileStatusInternal, IndexSyncComplete}; use crate::{mutate_state, RuntimeState}; use canister_tracing_macros::trace; diff --git a/backend/canisters/storage_index/CHANGELOG.md b/backend/canisters/storage_index/CHANGELOG.md index 9977945e5e..9527092911 100644 --- a/backend/canisters/storage_index/CHANGELOG.md +++ b/backend/canisters/storage_index/CHANGELOG.md @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +## [[2.0.1521](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1521-storage_index)] - 2024-12-16 + ### Changed - Update the canister creation fee to 0.5T ([#6700](https://github.com/open-chat-labs/open-chat/pull/6700)) diff --git a/backend/canisters/storage_index/impl/src/lib.rs b/backend/canisters/storage_index/impl/src/lib.rs index d573fb2b32..a74c7e1fb7 100644 --- a/backend/canisters/storage_index/impl/src/lib.rs +++ b/backend/canisters/storage_index/impl/src/lib.rs @@ -1,5 +1,4 @@ -use crate::model::bucket_event_batch::BucketEventBatch; -use crate::model::bucket_sync_state::EventToSync; +use crate::model::bucket_event_batch::{BucketEventBatch, EventToSync}; use crate::model::buckets::{BucketRecord, Buckets}; use crate::model::files::Files; use candid::{CandidType, Principal}; @@ -104,7 +103,6 @@ struct Data { pub users: HashMap, pub files: Files, pub buckets: Buckets, - #[serde(default = "bucket_event_sync_queue")] pub bucket_event_sync_queue: GroupedTimerJobQueue, pub canisters_requiring_upgrade: CanistersRequiringUpgrade, pub total_cycles_spent_on_canisters: Cycles, @@ -113,10 +111,6 @@ struct Data { pub test_mode: bool, } -fn bucket_event_sync_queue() -> GroupedTimerJobQueue { - GroupedTimerJobQueue::new(5, false) -} - impl Data { fn new( user_controllers: Vec, diff --git a/backend/canisters/storage_index/impl/src/lifecycle/post_upgrade.rs b/backend/canisters/storage_index/impl/src/lifecycle/post_upgrade.rs index c115414511..65ddf3f852 100644 --- a/backend/canisters/storage_index/impl/src/lifecycle/post_upgrade.rs +++ b/backend/canisters/storage_index/impl/src/lifecycle/post_upgrade.rs @@ -14,17 +14,9 @@ fn post_upgrade(args: Args) { let memory = get_upgrades_memory(); let reader = get_reader(&memory); - let (mut data, errors, logs, traces): (Data, Vec, Vec, Vec) = + let (data, errors, logs, traces): (Data, Vec, Vec, Vec) = msgpack::deserialize(reader).unwrap(); - data.bucket_event_sync_queue.set_defer_processing(true); - for bucket in data.buckets.iter_mut() { - #[allow(deprecated)] - data.bucket_event_sync_queue - .push_many(bucket.canister_id, bucket.sync_state.take()); - } - data.bucket_event_sync_queue.set_defer_processing(false); - canister_logger::init_with_logs(data.test_mode, errors, logs, traces); let env = init_env(data.rng_seed); diff --git a/backend/canisters/storage_index/impl/src/model/bucket_event_batch.rs b/backend/canisters/storage_index/impl/src/model/bucket_event_batch.rs index 27a4618475..6985fffcc1 100644 --- a/backend/canisters/storage_index/impl/src/model/bucket_event_batch.rs +++ b/backend/canisters/storage_index/impl/src/model/bucket_event_batch.rs @@ -1,7 +1,8 @@ -use crate::model::bucket_sync_state::EventToSync; use crate::MAX_EVENTS_TO_SYNC_PER_BATCH; +use candid::Principal; +use serde::{Deserialize, Serialize}; use timer_job_queues::{TimerJobItem, TimerJobItemGroup}; -use types::CanisterId; +use types::{AccessorId, CanisterId, FileId}; use utils::canister::should_retry_failed_c2c_call; pub struct BucketEventBatch { @@ -9,6 +10,15 @@ pub struct BucketEventBatch { events: Vec, } +#[derive(Serialize, Deserialize, Clone)] +pub enum EventToSync { + UserAdded(Principal), + UserRemoved(Principal), + AccessorRemoved(AccessorId), + UserIdUpdated(Principal, Principal), + FileToRemove(FileId), +} + impl TimerJobItem for BucketEventBatch { async fn process(&self) -> Result<(), bool> { let mut args = storage_bucket_canister::c2c_sync_index::Args::default(); diff --git a/backend/canisters/storage_index/impl/src/model/bucket_sync_state.rs b/backend/canisters/storage_index/impl/src/model/bucket_sync_state.rs deleted file mode 100644 index e457040240..0000000000 --- a/backend/canisters/storage_index/impl/src/model/bucket_sync_state.rs +++ /dev/null @@ -1,50 +0,0 @@ -use candid::Principal; -use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; -use storage_bucket_canister::c2c_sync_index::Args; -use types::{AccessorId, FileId}; - -// We want to send events to the each bucket in order, so while a sync is in progress we avoid sending -// more events in case the first batch fails and the second succeeds. If a sync fails, the args that -// were sent are stored so that they can be retried again. -#[derive(Serialize, Deserialize, Default)] -pub struct BucketSyncState { - queue: VecDeque, - in_progress: bool, - args_to_retry: Option, -} - -impl BucketSyncState { - pub fn take(&mut self) -> Vec { - assert!(!self.in_progress); - let mut events = Vec::new(); - if let Some(args) = self.args_to_retry.take() { - for principal in args.users_added { - events.push(EventToSync::UserAdded(principal)); - } - for principal in args.users_removed { - events.push(EventToSync::UserRemoved(principal)); - } - for principal in args.accessors_removed { - events.push(EventToSync::AccessorRemoved(principal)); - } - for (old, new) in args.user_ids_updated { - events.push(EventToSync::UserIdUpdated(old, new)); - } - for file_id in args.files_to_remove { - events.push(EventToSync::FileToRemove(file_id)); - } - } - events.extend(std::mem::take(&mut self.queue)); - events - } -} - -#[derive(Serialize, Deserialize, Clone)] -pub enum EventToSync { - UserAdded(Principal), - UserRemoved(Principal), - AccessorRemoved(AccessorId), - UserIdUpdated(Principal, Principal), - FileToRemove(FileId), -} diff --git a/backend/canisters/storage_index/impl/src/model/buckets.rs b/backend/canisters/storage_index/impl/src/model/buckets.rs index 9e17e7fd8c..a5bc7a6e75 100644 --- a/backend/canisters/storage_index/impl/src/model/buckets.rs +++ b/backend/canisters/storage_index/impl/src/model/buckets.rs @@ -1,4 +1,3 @@ -use crate::model::bucket_sync_state::BucketSyncState; use crate::BucketMetrics; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -96,10 +95,6 @@ impl Buckets { pub fn iter_full_buckets(&self) -> impl Iterator { self.full_buckets.values() } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.active_buckets.iter_mut().chain(self.full_buckets.values_mut()) - } } #[derive(Serialize, Deserialize)] @@ -108,20 +103,16 @@ pub struct BucketRecord { pub wasm_version: BuildVersion, pub bytes_used: u64, pub bytes_remaining: i64, - #[deprecated] - pub sync_state: BucketSyncState, pub cycle_top_ups: Vec, } impl BucketRecord { pub fn new(canister_id: CanisterId, wasm_version: BuildVersion) -> BucketRecord { - #[allow(deprecated)] BucketRecord { canister_id, wasm_version, bytes_used: 0, bytes_remaining: 0, - sync_state: BucketSyncState::default(), cycle_top_ups: Vec::new(), } } diff --git a/backend/canisters/storage_index/impl/src/model/mod.rs b/backend/canisters/storage_index/impl/src/model/mod.rs index 117cdf25e8..8de7ea779f 100644 --- a/backend/canisters/storage_index/impl/src/model/mod.rs +++ b/backend/canisters/storage_index/impl/src/model/mod.rs @@ -1,4 +1,3 @@ pub mod bucket_event_batch; -pub mod bucket_sync_state; pub mod buckets; pub mod files; diff --git a/backend/canisters/storage_index/impl/src/updates/add_or_update_users.rs b/backend/canisters/storage_index/impl/src/updates/add_or_update_users.rs index 58e47bcc7a..67740c754c 100644 --- a/backend/canisters/storage_index/impl/src/updates/add_or_update_users.rs +++ b/backend/canisters/storage_index/impl/src/updates/add_or_update_users.rs @@ -1,5 +1,5 @@ use crate::guards::caller_is_user_controller; -use crate::model::bucket_sync_state::EventToSync; +use crate::model::bucket_event_batch::EventToSync; use crate::{mutate_state, RuntimeState, UserRecordInternal}; use canister_tracing_macros::trace; use ic_cdk::update; diff --git a/backend/canisters/storage_index/impl/src/updates/c2c_update_user_principal.rs b/backend/canisters/storage_index/impl/src/updates/c2c_update_user_principal.rs index 56f944f181..8aaae64c78 100644 --- a/backend/canisters/storage_index/impl/src/updates/c2c_update_user_principal.rs +++ b/backend/canisters/storage_index/impl/src/updates/c2c_update_user_principal.rs @@ -1,5 +1,5 @@ use crate::guards::caller_is_user_controller; -use crate::model::bucket_sync_state::EventToSync; +use crate::model::bucket_event_batch::EventToSync; use crate::{mutate_state, RuntimeState}; use canister_api_macros::update; use canister_tracing_macros::trace; diff --git a/backend/canisters/storage_index/impl/src/updates/remove_accessor.rs b/backend/canisters/storage_index/impl/src/updates/remove_accessor.rs index 90454be6a1..b59b2402c2 100644 --- a/backend/canisters/storage_index/impl/src/updates/remove_accessor.rs +++ b/backend/canisters/storage_index/impl/src/updates/remove_accessor.rs @@ -1,5 +1,5 @@ use crate::guards::caller_is_user_controller; -use crate::model::bucket_sync_state::EventToSync; +use crate::model::bucket_event_batch::EventToSync; use crate::{mutate_state, RuntimeState}; use canister_tracing_macros::trace; use ic_cdk::update; diff --git a/backend/canisters/storage_index/impl/src/updates/remove_user.rs b/backend/canisters/storage_index/impl/src/updates/remove_user.rs index 220d6e2e0f..ad644a29c9 100644 --- a/backend/canisters/storage_index/impl/src/updates/remove_user.rs +++ b/backend/canisters/storage_index/impl/src/updates/remove_user.rs @@ -1,5 +1,5 @@ use crate::guards::caller_is_user_controller; -use crate::model::bucket_sync_state::EventToSync; +use crate::model::bucket_event_batch::EventToSync; use crate::{mutate_state, RuntimeState}; use canister_tracing_macros::trace; use ic_cdk::update;