Skip to content

Commit

Permalink
Revert "Remove events from being stored on the heap (#6753)" (#6755)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Nov 7, 2024
1 parent bccb53b commit fd52754
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 18 deletions.
1 change: 0 additions & 1 deletion backend/canisters/community/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Removed

- Remove code to migrate events to stable memory ([#6751](https://github.com/open-chat-labs/open-chat/pull/6751))
- Remove events from being stored on the heap ([#6753](https://github.com/open-chat-labs/open-chat/pull/6753))

## [[2.0.1431](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1431-community)] - 2024-11-06

Expand Down
4 changes: 0 additions & 4 deletions backend/canisters/group/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Fix case where some thread messages were not updated in stable memory ([#6736](https://github.com/open-chat-labs/open-chat/pull/6736))
- Perform cycles check when migrating events to stable memory ([#6751](https://github.com/open-chat-labs/open-chat/pull/6751))

### Removed

- Remove events from being stored on the heap ([#6753](https://github.com/open-chat-labs/open-chat/pull/6753))

### Fixed

- Fix migrating to stable memory for chats with disappearing messages ([#6746](https://github.com/open-chat-labs/open-chat/pull/6746))
Expand Down
4 changes: 0 additions & 4 deletions backend/canisters/user/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Read events from stable memory once migration is complete ([#6722](https://github.com/open-chat-labs/open-chat/pull/6722))
- Perform cycles check when migrating events to stable memory ([#6751](https://github.com/open-chat-labs/open-chat/pull/6751))

### Removed

- Remove events from being stored on the heap ([#6753](https://github.com/open-chat-labs/open-chat/pull/6753))

## [[2.0.1412](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1414-user)] - 2024-10-24

### Added
Expand Down
59 changes: 58 additions & 1 deletion backend/libraries/chat_events/src/chat_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ pub struct ChatEvents {
video_call_in_progress: Timestamped<Option<VideoCall>>,
anonymized_id: String,
search_index: SearchIndex,
#[serde(default = "default_next_event_to_migrate_to_stable_memory")]
next_event_to_migrate_to_stable_memory: Option<EventContext>,
#[serde(default)]
thread_messages_to_update_in_stable_memory: Vec<MessageIndex>,
}

fn default_next_event_to_migrate_to_stable_memory() -> Option<EventContext> {
Some(EventContext::default())
}

impl ChatEvents {
pub fn update_event_in_stable_memory(&mut self, event_key: EventKey) {
self.main.update_event_in_stable_memory(event_key);
Expand All @@ -77,6 +83,7 @@ impl ChatEvents {
}

pub fn migrate_next_batch_of_events_to_stable_storage(&mut self) -> bool {
let mut total_count = 0;
while !self.thread_messages_to_update_in_stable_memory.is_empty() {
if ic_cdk::api::instruction_counter() > 1_000_000_000 {
return false;
Expand All @@ -92,8 +99,58 @@ impl ChatEvents {
self.update_event_in_stable_memory(message_index.into());
}
info!(chat = ?self.chat, count, "Updated threads in stable memory");
total_count += count;
}

if self.next_event_to_migrate_to_stable_memory.is_none() {
return true;
};

while ic_cdk::api::instruction_counter() < 1_000_000_000 {
let EventContext {
thread_root_message_index: next_thread_root_message_index,
event_index: next_event_index,
} = self.next_event_to_migrate_to_stable_memory.clone().unwrap();

let (thread_root_message_index, events_list) = if let Some(message_index) = next_thread_root_message_index {
if let Some((index, next)) = self.threads.range_mut(message_index..).next() {
(Some(*index), next)
} else {
self.next_event_to_migrate_to_stable_memory = None;
self.main.set_read_events_from_stable_memory(true);
for events_list in self.threads.values_mut() {
events_list.set_read_events_from_stable_memory(true);
}
info!(chat = ?self.chat, total_count, "Finished migrating events to stable memory");
return true;
}
} else {
(None, &mut self.main)
};

let (count, next_event_index) = events_list.migrate_events_to_stable_memory(next_event_index, 100);
if let Some(event_index) = next_event_index {
self.next_event_to_migrate_to_stable_memory = Some(EventContext {
thread_root_message_index,
event_index,
});
} else {
self.next_event_to_migrate_to_stable_memory = Some(EventContext {
thread_root_message_index: Some(thread_root_message_index.map_or(MessageIndex::default(), |m| m.incr())),
event_index: EventIndex::default(),
});
}
total_count += count;
}
if total_count > 0 {
info!(
chat = ?self.chat,
count = total_count,
next = ?self.next_event_to_migrate_to_stable_memory,
"Migrated batch of events to stable memory"
);
}
self.next_event_to_migrate_to_stable_memory.is_none()
false
}

pub fn new_direct_chat(
Expand Down
47 changes: 39 additions & 8 deletions backend/libraries/chat_events/src/chat_events_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::last_updated_timestamps::LastUpdatedTimestamps;
use crate::stable_storage::ChatEventsStableStorage;
use crate::{ChatEventInternal, ChatInternal, EventKey, EventOrExpiredRangeInternal, EventsMap, MessageInternal};
use crate::{
ChatEventInternal, ChatEventsMap, ChatInternal, EventKey, EventOrExpiredRangeInternal, EventsMap, MessageInternal,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry::Vacant;
Expand All @@ -13,6 +15,7 @@ use types::{

#[derive(Serialize, Deserialize)]
pub struct ChatEventsList {
events_map: ChatEventsMap,
stable_events_map: ChatEventsStableStorage,
message_id_map: HashMap<MessageId, EventIndex>,
message_event_indexes: Vec<EventIndex>,
Expand All @@ -24,7 +27,7 @@ pub struct ChatEventsList {
impl ChatEventsList {
pub fn update_event_in_stable_memory(&mut self, event_key: EventKey) {
if let Some(event_index) = self.event_index(event_key) {
if let Some(event) = self.stable_events_map.get(event_index) {
if let Some(event) = self.events_map.get(event_index) {
self.stable_events_map.insert(event);
}
}
Expand All @@ -38,8 +41,24 @@ impl ChatEventsList {
self.read_events_from_stable_memory = value;
}

pub fn migrate_events_to_stable_memory(&mut self, start: EventIndex, max_events: usize) -> (usize, Option<EventIndex>) {
let mut count = 0;
let mut next_event_index = start;
for event in self.events_map.range(start..).take(max_events) {
count += 1;
next_event_index = event.index.incr();
self.stable_events_map.insert(event);
}
if count < max_events || Some(next_event_index) > self.latest_event_index {
(count, None)
} else {
(count, Some(next_event_index))
}
}

pub fn new(chat: Chat, thread_root_message_index: Option<MessageIndex>) -> Self {
ChatEventsList {
events_map: ChatEventsMap::default(),
stable_events_map: ChatEventsStableStorage::new(chat, thread_root_message_index),
message_id_map: HashMap::new(),
message_event_indexes: Vec::new(),
Expand Down Expand Up @@ -73,7 +92,8 @@ impl ChatEventsList {
expires_at,
event,
};
self.stable_events_map.insert(event_wrapper);
self.stable_events_map.insert(event_wrapper.clone());
self.events_map.insert(event_wrapper);

self.latest_event_index = Some(event_index);
self.latest_event_timestamp = Some(now);
Expand Down Expand Up @@ -113,7 +133,8 @@ impl ChatEventsList {
if let Some(mut event) = self.get_event(event_key, EventIndex::default()) {
update_event_fn(&mut event).map(|result| {
let event_index = event.index;
self.stable_events_map.insert(event);
self.stable_events_map.insert(event.clone());
self.events_map.insert(event);
(result, event_index)
})
} else {
Expand Down Expand Up @@ -214,7 +235,8 @@ impl ChatEventsList {

let updated_indexes = updated.iter().map(|e| e.index).collect();
for event in updated {
self.stable_events_map.insert(event);
self.stable_events_map.insert(event.clone());
self.events_map.insert(event);
}
updated_indexes
}
Expand All @@ -228,7 +250,8 @@ impl ChatEventsList {
}

pub fn remove(&mut self, event_index: EventIndex) -> Option<EventWrapperInternal<ChatEventInternal>> {
self.stable_events_map.remove(event_index)
self.stable_events_map.remove(event_index);
self.events_map.remove(event_index)
}

pub fn latest_event_index(&self) -> Option<EventIndex> {
Expand Down Expand Up @@ -286,14 +309,22 @@ impl ChatEventsList {
}

fn iter_internal(&self) -> Box<dyn DoubleEndedIterator<Item = EventWrapperInternal<ChatEventInternal>> + '_> {
self.stable_events_map.iter()
if self.read_events_from_stable_memory {
self.stable_events_map.iter()
} else {
self.events_map.iter()
}
}

fn range_internal<R: RangeBounds<EventIndex>>(
&self,
range: R,
) -> Box<dyn DoubleEndedIterator<Item = EventWrapperInternal<ChatEventInternal>> + '_> {
self.stable_events_map.range(range)
if self.read_events_from_stable_memory {
self.stable_events_map.range(range)
} else {
self.events_map.range(range)
}
}
}

Expand Down

0 comments on commit fd52754

Please sign in to comment.