Skip to content

Commit

Permalink
refactor(storage-manager): separate processing of Retrieval
Browse files Browse the repository at this point in the history
This commit moves, for clarity purposes, the logic to process an
`Retrieval` in a separate method: `process_event_retrieval`.

This notably removes indentation levels and makes the code more
readable.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs:
  move the logic to process a `Retrieval` in a separate method
  `process_event_retrieval`.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Oct 3, 2024
1 parent 8534dc6 commit bf81bca
Showing 1 changed file with 60 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,62 +207,7 @@ impl Replication {
}
}
AlignmentReply::Retrieval(replica_event) => {
tracing::trace!("Processing `AlignmentReply::Retrieval`");
{
let span = tracing::Span::current();
span.record(
"sample",
replica_event
.stripped_key
.as_ref()
.map_or("", |key| key.as_str()),
);
span.record("t", replica_event.timestamp.to_string());
}

if self
.latest_updates
.read()
.await
.get(&replica_event.stripped_key)
.is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp)
{
return;
}

let mut replication_log_guard = self.replication_log.write().await;
if let Some(latest_event) =
replication_log_guard.lookup(&replica_event.stripped_key)
{
if latest_event.timestamp >= replica_event.timestamp {
return;
}
}

// NOTE: This code can only be called with `action` set to `delete` on an initial
// alignment, in which case the Storage of the receiving Replica is empty => there
// is no need to actually call `storage.delete`.
//
// Outside of an initial alignment, the `delete` action will be performed at the
// step above, in `AlignmentReply::Events`.
if replica_event.action == SampleKind::Put
&& matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
)
{
return;
}

replication_log_guard.insert_event(replica_event.into());
self.process_event_retrieval(replica_event, sample).await;
}
}
}
Expand Down Expand Up @@ -340,4 +285,63 @@ impl Replication {

Some(replica_event)
}

/// Processes the [EventMetadata] and [Sample] sent by the Replica, adding it to our Storage if
/// needed.
///
/// # Special case: initial alignment
///
/// Outside of the initial alignment, an [EventMetadata] with an action set to `Delete` will be
/// processed during the previous step, i.e. in the `AlignmentReply::EventsMetadata` as, as
/// explained there, we already have at that stage all the required information to perform the
/// deletion.
///
/// That fact is true except for the initial alignment: the initial alignment bypasses all these
/// steps and the Replica goes straight to sending all its Replication Log and data in its
/// Storage. Including for the deleted events.
async fn process_event_retrieval(&self, replica_event: EventMetadata, sample: Sample) {
tracing::trace!("Processing `AlignmentReply::Retrieval`");

if self
.latest_updates
.read()
.await
.get(&replica_event.stripped_key)
.is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp)
{
return;
}

let mut replication_log_guard = self.replication_log.write().await;
if let Some(latest_event) = replication_log_guard.lookup(&replica_event.stripped_key) {
if latest_event.timestamp >= replica_event.timestamp {
return;
}
}

// NOTE: This code can only be called with `action` set to `delete` on an initial
// alignment, in which case the Storage of the receiving Replica is empty => there
// is no need to actually call `storage.delete`.
//
// Outside of an initial alignment, the `delete` action will be performed at the
// step above, in `AlignmentReply::EventsMetadata`.
if replica_event.action == SampleKind::Put
&& matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
)
{
return;
}

replication_log_guard.insert_event(replica_event.into());
}
}

0 comments on commit bf81bca

Please sign in to comment.