Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GroupedTimerJobQueue to sync events to storage index #7047

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/canisters/storage_bucket/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

- Expose size of each virtual stable memory in metrics ([#6981](https://github.com/open-chat-labs/open-chat/pull/6981))
- Avoid having to regenerate rng seed after each upgrade ([#7043](https://github.com/open-chat-labs/open-chat/pull/7043))
- Use `GroupedTimerJobQueue` to sync events to storage index ([#7047](https://github.com/open-chat-labs/open-chat/pull/7047))

## [[2.0.1420](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1420-storage_bucket)] - 2024-10-28

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/storage_bucket/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ stable_memory = { path = "../../../libraries/stable_memory" }
storage_bucket_canister = { path = "../api" }
storage_index_canister = { path = "../../storage_index/api" }
storage_index_canister_c2c_client = { path = "../../storage_index/c2c_client" }
timer_job_queues = { path = "../../../libraries/timer_job_queues" }
tracing = { workspace = true }
types = { path = "../../../libraries/types" }
utils = { path = "../../../libraries/utils" }
2 changes: 0 additions & 2 deletions backend/canisters/storage_bucket/impl/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ use crate::RuntimeState;
pub mod calculate_freezing_limit;
pub mod check_cycles_balance;
pub mod remove_expired_files;
pub mod sync_index;

// Starts this canister's recurring background jobs.
// Presumably invoked once from init/post_upgrade — TODO confirm against callers.
pub(crate) fn start(state: &RuntimeState) {
    calculate_freezing_limit::start_job();
    check_cycles_balance::start_job();
    // Only scheduled when there is pending work for the job to do.
    remove_expired_files::start_job_if_required(state);
    sync_index::start_job_if_required(&state.data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ fn run() {
mutate_state(|state| {
let now = state.env.now();
for file in state.data.files.remove_expired_files(now, 10) {
state.data.index_sync_state.enqueue(EventToSync::FileRemoved(file));
state.data.push_event_to_index(EventToSync::FileRemoved(file));
}
crate::jobs::sync_index::start_job_if_required(&state.data);
start_job_if_required(state);
});
}
81 changes: 0 additions & 81 deletions backend/canisters/storage_bucket/impl/src/jobs/sync_index.rs

This file was deleted.

17 changes: 14 additions & 3 deletions backend/canisters/storage_bucket/impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::model::files::{Files, RemoveFileResult};
use crate::model::index_event_batch::IndexEventBatch;
use crate::model::index_sync_state::{EventToSync, IndexSyncState};
use crate::model::users::Users;
use candid::{CandidType, Principal};
use canister_state_macros::canister_state;
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::BTreeMap;
use timer_job_queues::GroupedTimerJobQueue;
use types::{BuildVersion, CanisterId, Cycles, FileId, TimestampMillis, Timestamped};
use utils::env::Environment;

Expand Down Expand Up @@ -65,7 +67,7 @@ impl RuntimeState {
user_count: self.data.users.len() as u64,
file_count: file_metrics.file_count,
blob_count: file_metrics.blob_count,
index_sync_queue_length: self.data.index_sync_state.queue_len(),
index_sync_queue_length: self.data.index_event_sync_queue.len() as u32,
freezing_limit: self.data.freezing_limit.value.unwrap_or_default(),
stable_memory_sizes: memory::memory_sizes(),
}
Expand All @@ -77,20 +79,29 @@ struct Data {
storage_index_canister_id: CanisterId,
users: Users,
files: Files,
#[deprecated]
index_sync_state: IndexSyncState,
#[serde(default = "index_event_sync_queue")]
index_event_sync_queue: GroupedTimerJobQueue<IndexEventBatch>,
created: TimestampMillis,
freezing_limit: Timestamped<Option<Cycles>>,
rng_seed: [u8; 32],
test_mode: bool,
}

/// Serde default for `Data::index_event_sync_queue`, used when deserializing
/// state captured before the field existed.
fn index_event_sync_queue() -> GroupedTimerJobQueue<IndexEventBatch> {
    // Sync to a single index canister at a time, without deduplication.
    let max_concurrency = 1;
    GroupedTimerJobQueue::new(max_concurrency, false)
}

impl Data {
pub fn new(storage_index_canister_id: CanisterId, now: TimestampMillis, test_mode: bool) -> Data {
#[allow(deprecated)]
Data {
storage_index_canister_id,
users: Users::default(),
files: Files::default(),
index_sync_state: IndexSyncState::default(),
index_event_sync_queue: GroupedTimerJobQueue::new(1, false),
created: now,
freezing_limit: Timestamped::default(),
rng_seed: [0; 32],
Expand All @@ -109,8 +120,8 @@ impl Data {
}

pub fn push_event_to_index(&mut self, event_to_sync: EventToSync) {
self.index_sync_state.enqueue(event_to_sync);
jobs::sync_index::start_job_if_required(self);
self.index_event_sync_queue
.push(self.storage_index_canister_id, (event_to_sync, self.files.bytes_used()));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::model::index_sync_state::EventToSync;
use crate::model::users::FileStatusInternal;
use crate::{mutate_state, DATA_LIMIT_BYTES, MAX_EVENTS_TO_SYNC_PER_BATCH};
use timer_job_queues::{TimerJobItem, TimerJobItemGroup};
use types::CanisterId;
use utils::canister::should_retry_failed_c2c_call;

/// A batch of file events queued for delivery to a storage index canister,
/// keyed by that canister's id.
pub struct IndexEventBatch {
    // The storage index canister this batch will be sent to.
    canister_id: CanisterId,
    // Each event is paired with the bucket's total bytes used captured when
    // the event was enqueued.
    events: Vec<(EventToSync, u64)>,
}

impl TimerJobItem for IndexEventBatch {
    /// Sends the batched file events to the storage index canister and, on
    /// success, applies any file rejections the index returns (marking the
    /// files rejected and removing them locally).
    ///
    /// Returns `Err(retry)` when the c2c call fails, where `retry` indicates
    /// whether the failure looks transient and the batch should be retried.
    async fn process(&self) -> Result<(), bool> {
        let mut args = storage_index_canister::c2c_sync_bucket::Args::default();

        for (event, _) in &self.events {
            match event {
                EventToSync::FileAdded(file) => args.files_added.push(file.clone()),
                EventToSync::FileRemoved(file) => args.files_removed.push(file.clone()),
            }
        }

        // Only the most recently captured `bytes_used` is reported. The
        // original loop assigned `args.bytes_used` redundantly both inside the
        // `FileAdded` arm and unconditionally after the match, so the last
        // event's value always won; this makes that explicit and leaves the
        // defaults (0) untouched for an empty batch.
        if let Some((_, bytes_used)) = self.events.last() {
            args.bytes_used = *bytes_used;
            args.bytes_remaining = (DATA_LIMIT_BYTES as i64) - (*bytes_used as i64);
        }

        let response = storage_index_canister_c2c_client::c2c_sync_bucket(self.canister_id, &args).await;

        match response {
            Ok(storage_index_canister::c2c_sync_bucket::Response::Success(result)) => {
                mutate_state(|state| {
                    for file in result.files_rejected {
                        let file_id = file.file_id;
                        let reason = file.reason.into();

                        if let Some(user_id) = state.data.files.owner(&file_id) {
                            if let Some(user) = state.data.users.get_mut(&user_id) {
                                let old_status = user.set_file_status(file_id, FileStatusInternal::Rejected(reason));

                                // NOTE(review): a file still uploading appears to
                                // only have a pending entry to clean up — confirm.
                                if let Some(FileStatusInternal::Uploading(_)) = old_status {
                                    state.data.files.remove_pending_file(&file_id);
                                } else {
                                    state.data.files.remove(user_id, file_id);
                                }
                            }
                        }
                    }
                });
                Ok(())
            }
            Err((code, msg)) => {
                let retry = should_retry_failed_c2c_call(code, &msg);
                Err(retry)
            }
        }
    }
}

impl TimerJobItemGroup for IndexEventBatch {
    // Batches are grouped per target storage index canister.
    type Key = CanisterId;
    // Each item carries the event plus the bytes-used figure captured with it.
    type Item = (EventToSync, u64);

    /// Creates an empty batch bound for `canister_id`.
    fn new(canister_id: Self::Key) -> Self {
        Self {
            canister_id,
            events: Vec::new(),
        }
    }

    /// The grouping key: the destination canister's id.
    fn key(&self) -> Self::Key {
        self.canister_id
    }

    /// Appends one event to the batch.
    fn add(&mut self, item: Self::Item) {
        self.events.push(item)
    }

    /// Consumes the batch, yielding its events in insertion order.
    fn into_items(self) -> Vec<Self::Item> {
        self.events
    }

    /// True once the batch has reached the per-sync event limit.
    fn is_full(&self) -> bool {
        MAX_EVENTS_TO_SYNC_PER_BATCH <= self.events.len()
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::MAX_EVENTS_TO_SYNC_PER_BATCH;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use storage_index_canister::c2c_sync_bucket::Args;
Expand All @@ -14,63 +13,7 @@ pub struct IndexSyncState {
args_to_retry: Option<Args>,
}

impl IndexSyncState {
pub fn enqueue(&mut self, event: EventToSync) {
self.queue.push_back(event);
}

pub fn pop_args_for_next_sync(&mut self, bytes_used: u64, bytes_remaining: i64) -> Option<Args> {
if self.in_progress {
None
} else if let Some(args) = self.args_to_retry.take() {
self.in_progress = true;
Some(args)
} else if self.queue.is_empty() {
None
} else {
let mut args = Args {
files_added: Vec::new(),
files_removed: Vec::new(),
bytes_used,
bytes_remaining,
};

for _ in 0..MAX_EVENTS_TO_SYNC_PER_BATCH {
if let Some(event) = self.queue.pop_front() {
match event {
EventToSync::FileAdded(a) => args.files_added.push(a),
EventToSync::FileRemoved(r) => args.files_removed.push(r),
}
} else {
break;
}
}
self.in_progress = true;
Some(args)
}
}

pub fn in_progress(&self) -> bool {
self.in_progress
}

pub fn is_empty(&self) -> bool {
self.queue.is_empty() && !self.in_progress && self.args_to_retry.is_none()
}

pub fn mark_sync_completed(&mut self) {
self.in_progress = false;
}

pub fn mark_sync_failed(&mut self, args: Args) {
self.in_progress = false;
self.args_to_retry = Some(args);
}

pub fn queue_len(&self) -> u32 {
self.queue.len() as u32
}
}
impl IndexSyncState {}

#[derive(Serialize, Deserialize)]
pub enum EventToSync {
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/storage_bucket/impl/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod files;
pub mod index_event_batch;
pub mod index_sync_state;
pub mod stable_blob_storage;
pub mod users;
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use candid::CandidType;
use serde::{Deserialize, Serialize};
use types::{FileAdded, FileRejected, FileRemoved};

#[derive(CandidType, Serialize, Deserialize, Debug)]
#[derive(CandidType, Serialize, Deserialize, Debug, Default)]
pub struct Args {
pub files_added: Vec<FileAdded>,
pub files_removed: Vec<FileRemoved>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<T: TimerJobItemGroup> GroupedTimerJobQueue<T> {
}

pub fn len(&self) -> usize {
self.within_lock(|i| i.queue.len())
self.within_lock(|i| i.items_map.values().map(|v| v.len()).sum())
}

pub fn is_empty(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion backend/libraries/types/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use ts_export::ts_export;

#[ts_export]
#[derive(CandidType, Serialize, Deserialize, Debug)]
#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct FileAdded {
pub file_id: FileId,
pub hash: Hash,
Expand Down
Loading