Skip to content

Commit

Permalink
refactor(storage-manager): optimise insertion during Replication
Browse files Browse the repository at this point in the history
Before this commit, the Replication Log was searched twice when aligning
an Event: once while calling `lookup` and a second time when calling
`insert_event`.

To reduce the number of searches to a single one, the method
`insert_event_unchecked` was introduced. This function assumes that the
Replication Log contains no Event with the same key expression as the
Event it will insert.

Leveraging this, the calls to `LogLatest::lookup` followed by
`LogLatest::insert_event` were replaced with calls to
`LogLatest::remove_older` followed by
`LogLatest::insert_event_unchecked`.

Indeed, if the call to `LogLatest::remove_older` yields either
`NotFound` or `RemovedOlder` then the Replication Log does not contain
any event with the key expression, and
`LogLatest::insert_event_unchecked` can be confidently called.

Another consequence of the introduction of the new method
`insert_event_unchecked` is that the variant
`EventInsertion::NotInsertedAsOutOfBound` was no longer used and was
thus removed.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs:
  Replaced the calls to `lookup` followed by `insert_event` to calls
  to `remove_older` followed by `insert_event_unchecked` in the
  methods `process_event_metadata` and `process_event_retrieval`.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs:
  - Removed the no longer used `EventInsertion::NotInsertedAsOutOfBound`
    variant.
  - Added a comment to clearly indicate that the method `insert_event`
    will remove older Event.
  - Introduced the new method `insert_event_unchecked` that inserts an
    Event in the Replication Log without checking if it is the only
    Event (i.e. it assumes it is the case).

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Oct 3, 2024
1 parent bf81bca commit bda500b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::{
use zenoh_backend_traits::StorageInsertionResult;

use crate::replication::{
classification::{IntervalIdx, SubIntervalIdx},
classification::{EventRemoval, IntervalIdx, SubIntervalIdx},
core::{aligner_key_expr_formatter, aligner_query::AlignmentQuery, Replication},
digest::Fingerprint,
log::EventMetadata,
Expand Down Expand Up @@ -254,32 +254,31 @@ impl Replication {
}
SampleKind::Delete => {
let mut replication_log_guard = self.replication_log.write().await;
if let Some(latest_event) =
replication_log_guard.lookup(&replica_event.stripped_key)
match replication_log_guard
.remove_older(&replica_event.stripped_key, &replica_event.timestamp)
{
if latest_event.timestamp >= replica_event.timestamp {
return None;
EventRemoval::NotFound => {}
EventRemoval::KeptNewer => return None,
EventRemoval::RemovedOlder(older_event) => {
if older_event.action == SampleKind::Put {
// NOTE: In some of our backend implementation, a deletion on a
// non-existing key will return an error. Given that we cannot
// distinguish an error from a missing key, we will assume
// the latter and move forward.
//
// FIXME: Once the behaviour described above is fixed, check for
// errors.
let _ = self
.storage
.lock()
.await
.delete(replica_event.stripped_key.clone(), replica_event.timestamp)
.await;
}
}
}
if matches!(
self.storage
.lock()
.await
.delete(replica_event.stripped_key.clone(), replica_event.timestamp)
.await,
// NOTE: In some of our backend implementation, a deletion on a
// non-existing key will return an error. Given that we cannot
// distinguish an error from a missing key, we will assume
// the latter and move forward.
//
// FIXME: Once the behaviour described above is fixed, check for
// errors.
Ok(StorageInsertionResult::Outdated)
) {
return None;
}

replication_log_guard.insert_event(replica_event.clone().into());
replication_log_guard.insert_event_unchecked(replica_event.clone().into());
}
}

Expand Down Expand Up @@ -313,10 +312,11 @@ impl Replication {
}

let mut replication_log_guard = self.replication_log.write().await;
if let Some(latest_event) = replication_log_guard.lookup(&replica_event.stripped_key) {
if latest_event.timestamp >= replica_event.timestamp {
return;
}
match replication_log_guard
.remove_older(&replica_event.stripped_key, &replica_event.timestamp)
{
EventRemoval::KeptNewer => return,
EventRemoval::RemovedOlder(_) | EventRemoval::NotFound => {}
}

// NOTE: This code can only be called with `action` set to `delete` on an initial
Expand All @@ -342,6 +342,6 @@ impl Replication {
return;
}

replication_log_guard.insert_event(replica_event.into());
replication_log_guard.insert_event_unchecked(replica_event.into());
}
}
43 changes: 33 additions & 10 deletions plugins/zenoh-plugin-storage-manager/src/replication/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ pub enum EventInsertion {
New(Event),
ReplacedOlder(Event),
NotInsertedAsOlder,
NotInsertedAsOutOfBound,
}

/// The `LogLatest` keeps track of the last publication that happened on a key expression.
Expand Down Expand Up @@ -215,6 +214,9 @@ impl LogLatest {
/// Attempts to insert the provided [Event] in the replication log and return the [Insertion]
/// outcome.
///
/// This method will first go through the Replication Log to determine if the provided [Event]
/// is indeed newer. If not then this method will do nothing.
///
/// # Caveat: out of bound
///
/// This method will record an error in the Zenoh log if the timestamp associated with the
Expand All @@ -228,15 +230,38 @@ impl LogLatest {
EventRemoval::NotFound => EventInsertion::New(event.clone()),
};

let (interval_idx, sub_interval_idx) = match self
self.insert_event_unchecked(event);

event_insertion
}

/// Inserts the provided [Event] in the replication log *without checking if there is another
/// [Event] with the same key expression*.
///
/// ⚠️ This method is meant to be used *after having called [remove_older] and processed its
/// result*, ensuring that the provided event is indeed more recent and the only one for
/// that key expression.
///
/// This method will first go through the Replication Log to determine if the provided [Event]
/// is indeed newer. If not then this method will do nothing.
///
/// # Caveat: out of bound
///
/// This method will record an error in the Zenoh log if the timestamp associated with the
/// [Event] is so far in the future that the index of its interval is higher than
/// [u64::MAX]. This should not happen unless a specially crafted [Event] is sent to this node
/// or if the internal clock of the host that produced it is (very) far in the future.
pub(crate) fn insert_event_unchecked(&mut self, event: Event) {
let Ok((interval_idx, sub_interval_idx)) = self
.configuration
.get_time_classification(event.timestamp())
{
Ok((interval_idx, sub_interval_idx)) => (interval_idx, sub_interval_idx),
Err(e) => {
tracing::error!("{e:?}");
return EventInsertion::NotInsertedAsOutOfBound;
}
else {
tracing::error!(
"Fatal error: timestamp of Event < {:?} > is out of bounds: {}",
event.maybe_stripped_key,
event.timestamp
);
return;
};

self.bloom_filter_event.set(event.key_expr());
Expand All @@ -245,8 +270,6 @@ impl LogLatest {
.entry(interval_idx)
.or_default()
.insert_unchecked(sub_interval_idx, event);

event_insertion
}

/// Removes, if there is one, the previous event from the Replication Log for the provided key
Expand Down

0 comments on commit bda500b

Please sign in to comment.