diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs index 20c82d0fb..6e2350766 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs @@ -207,62 +207,7 @@ impl Replication { } } AlignmentReply::Retrieval(replica_event) => { - tracing::trace!("Processing `AlignmentReply::Retrieval`"); - { - let span = tracing::Span::current(); - span.record( - "sample", - replica_event - .stripped_key - .as_ref() - .map_or("", |key| key.as_str()), - ); - span.record("t", replica_event.timestamp.to_string()); - } - - if self - .latest_updates - .read() - .await - .get(&replica_event.stripped_key) - .is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp) - { - return; - } - - let mut replication_log_guard = self.replication_log.write().await; - if let Some(latest_event) = - replication_log_guard.lookup(&replica_event.stripped_key) - { - if latest_event.timestamp >= replica_event.timestamp { - return; - } - } - - // NOTE: This code can only be called with `action` set to `delete` on an initial - // alignment, in which case the Storage of the receiving Replica is empty => there - // is no need to actually call `storage.delete`. - // - // Outside of an initial alignment, the `delete` action will be performed at the - // step above, in `AlignmentReply::Events`. - if replica_event.action == SampleKind::Put - && matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) - { - return; - } - - replication_log_guard.insert_event(replica_event.into()); + self.process_event_retrieval(replica_event, sample).await; } } } @@ -340,4 +285,63 @@ impl Replication { Some(replica_event) } + + /// Processes the [EventMetadata] and [Sample] sent by the Replica, adding it to our Storage if + /// needed. + /// + /// # Special case: initial alignment + /// + /// Outside of the initial alignment, an [EventMetadata] with an action set to `Delete` will be + /// processed during the previous step, i.e. in the `AlignmentReply::EventsMetadata` as, as + /// explained there, we already have at that stage all the required information to perform the + /// deletion. + /// + /// That fact is true except for the initial alignment: the initial alignment bypasses all these + /// steps and the Replica goes straight to sending all its Replication Log and data in its + /// Storage. Including for the deleted events. + async fn process_event_retrieval(&self, replica_event: EventMetadata, sample: Sample) { + tracing::trace!("Processing `AlignmentReply::Retrieval`"); + + if self + .latest_updates + .read() + .await + .get(&replica_event.stripped_key) + .is_some_and(|latest_event| latest_event.timestamp >= replica_event.timestamp) + { + return; + } + + let mut replication_log_guard = self.replication_log.write().await; + if let Some(latest_event) = replication_log_guard.lookup(&replica_event.stripped_key) { + if latest_event.timestamp >= replica_event.timestamp { + return; + } + } + + // NOTE: This code can only be called with `action` set to `delete` on an initial + // alignment, in which case the Storage of the receiving Replica is empty => there + // is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `delete` action will be performed at the + // step above, in `AlignmentReply::EventsMetadata`. + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { + return; + } + + replication_log_guard.insert_event(replica_event.into()); + } }