From bda500bc4afd4ac8c5febebc980c77e02347d464 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Thu, 3 Oct 2024 11:34:14 +0200 Subject: [PATCH] refactor(storage-manager): optimise insertion during Replication Before this commit, the Replication Log was searched twice when aligning an Event: once while calling `lookup` and a second time when calling `insert_event`. To reduce the number of searches to a single one, the method `insert_event_unchecked` was introduced. This function assumes that the Replication Log contains no Event with the same key expression as the Event it will insert. Leveraging this, the calls to `LogLatest::lookup` followed by `LogLatest::insert_event` were replaced with calls to `LogLatest::remove_older` followed by `LogLatest::insert_event_unchecked`. Indeed, if the call to `LogLatest::remove_older` yields either `NotFound` or `RemovedOlder` then the Replication Log does not contain any event with the key expression, and `LogLatest::insert_event_unchecked` can be confidently called. Another consequence of the introduction of the new method `insert_event_unchecked` is that the variant `EventInsertion::NotInsertedAsOutOfBound` was no longer used and was thus removed. * plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs: Replaced the calls to `lookup` followed by `insert_event` with calls to `remove_older` followed by `insert_event_unchecked` in the methods `process_event_metadata` and `process_event_retrieval`. * plugins/zenoh-plugin-storage-manager/src/replication/log.rs: - Removed the no longer used `EventInsertion::NotInsertedAsOutOfBound` variant. - Added a comment to clearly indicate that the method `insert_event` will remove any older Event. - Introduced the new method `insert_event_unchecked` that inserts an Event in the Replication Log without checking if it is the only Event for that key expression (i.e. it assumes it is the case). 
Signed-off-by: Julien Loudet --- .../src/replication/core/aligner_reply.rs | 56 +++++++++---------- .../src/replication/log.rs | 43 ++++++++++---- 2 files changed, 61 insertions(+), 38 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs index 6e2350766..566b01f33 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs @@ -23,7 +23,7 @@ use zenoh::{ use zenoh_backend_traits::StorageInsertionResult; use crate::replication::{ - classification::{IntervalIdx, SubIntervalIdx}, + classification::{EventRemoval, IntervalIdx, SubIntervalIdx}, core::{aligner_key_expr_formatter, aligner_query::AlignmentQuery, Replication}, digest::Fingerprint, log::EventMetadata, @@ -254,32 +254,31 @@ impl Replication { } SampleKind::Delete => { let mut replication_log_guard = self.replication_log.write().await; - if let Some(latest_event) = - replication_log_guard.lookup(&replica_event.stripped_key) + match replication_log_guard + .remove_older(&replica_event.stripped_key, &replica_event.timestamp) { - if latest_event.timestamp >= replica_event.timestamp { - return None; + EventRemoval::NotFound => {} + EventRemoval::KeptNewer => return None, + EventRemoval::RemovedOlder(older_event) => { + if older_event.action == SampleKind::Put { + // NOTE: In some of our backend implementation, a deletion on a + // non-existing key will return an error. Given that we cannot + // distinguish an error from a missing key, we will assume + // the latter and move forward. + // + // FIXME: Once the behaviour described above is fixed, check for + // errors. 
+ let _ = self + .storage + .lock() + .await + .delete(replica_event.stripped_key.clone(), replica_event.timestamp) + .await; + } } } - if matches!( - self.storage - .lock() - .await - .delete(replica_event.stripped_key.clone(), replica_event.timestamp) - .await, - // NOTE: In some of our backend implementation, a deletion on a - // non-existing key will return an error. Given that we cannot - // distinguish an error from a missing key, we will assume - // the latter and move forward. - // - // FIXME: Once the behaviour described above is fixed, check for - // errors. - Ok(StorageInsertionResult::Outdated) - ) { - return None; - } - replication_log_guard.insert_event(replica_event.clone().into()); + replication_log_guard.insert_event_unchecked(replica_event.clone().into()); } } @@ -313,10 +312,11 @@ impl Replication { } let mut replication_log_guard = self.replication_log.write().await; - if let Some(latest_event) = replication_log_guard.lookup(&replica_event.stripped_key) { - if latest_event.timestamp >= replica_event.timestamp { - return; - } + match replication_log_guard + .remove_older(&replica_event.stripped_key, &replica_event.timestamp) + { + EventRemoval::KeptNewer => return, + EventRemoval::RemovedOlder(_) | EventRemoval::NotFound => {} } // NOTE: This code can only be called with `action` set to `delete` on an initial @@ -342,6 +342,6 @@ impl Replication { return; } - replication_log_guard.insert_event(replica_event.into()); + replication_log_guard.insert_event_unchecked(replica_event.into()); } } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs index c5864a69f..a757bce18 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs @@ -142,7 +142,6 @@ pub enum EventInsertion { New(Event), ReplacedOlder(Event), NotInsertedAsOlder, - NotInsertedAsOutOfBound, } /// The `LogLatest` keeps track of 
the last publication that happened on a key expression. @@ -215,6 +214,9 @@ impl LogLatest { /// Attempts to insert the provided [Event] in the replication log and return the [Insertion] /// outcome. /// + /// This method will first go through the Replication Log to determine if the provided [Event] + /// is indeed newer. If not then this method will do nothing. + /// /// # Caveat: out of bound /// /// This method will record an error in the Zenoh log if the timestamp associated with the @@ -228,15 +230,38 @@ impl LogLatest { EventRemoval::NotFound => EventInsertion::New(event.clone()), }; - let (interval_idx, sub_interval_idx) = match self + self.insert_event_unchecked(event); + + event_insertion + } + + /// Inserts the provided [Event] in the replication log *without checking if there is another + /// [Event] with the same key expression*. + /// + /// ⚠️ This method is meant to be used *after having called [Self::remove_older] and + /// processed its result*, ensuring that the provided event is indeed more recent and the + /// only one for that key expression. + /// + /// This method performs no lookup in the Replication Log: it is up to the caller to ensure + /// that no [Event] with the same key expression is already present. + /// + /// # Caveat: out of bound + /// + /// This method will record an error in the Zenoh log if the timestamp associated with the + /// [Event] is so far in the future that the index of its interval is higher than + /// [u64::MAX]. This should not happen unless a specially crafted [Event] is sent to this node + /// or if the internal clock of the host that produced it is (very) far in the future. 
+ pub(crate) fn insert_event_unchecked(&mut self, event: Event) { + let Ok((interval_idx, sub_interval_idx)) = self .configuration .get_time_classification(event.timestamp()) - { - Ok((interval_idx, sub_interval_idx)) => (interval_idx, sub_interval_idx), - Err(e) => { - tracing::error!("{e:?}"); - return EventInsertion::NotInsertedAsOutOfBound; - } + else { + tracing::error!( + "Fatal error: timestamp of Event < {:?} > is out of bounds: {}", + event.maybe_stripped_key, + event.timestamp + ); + return; }; self.bloom_filter_event.set(event.key_expr()); @@ -245,8 +270,6 @@ impl LogLatest { .entry(interval_idx) .or_default() .insert_unchecked(sub_interval_idx, event); - - event_insertion } /// Removes, if there is one, the previous event from the Replication Log for the provided key