From daea93e5650f589c98e0d2d20171ad66c3d4c823 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Wed, 27 Nov 2024 09:33:35 +1100 Subject: [PATCH] chore: update pruning of certifier read and write sets --- .../src/certifier/certification.rs | 14 +++++++-- .../src/services/certifier_service.rs | 31 ++++++++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/packages/talos_certifier/src/certifier/certification.rs b/packages/talos_certifier/src/certifier/certification.rs index 53d4f092..e88aa164 100644 --- a/packages/talos_certifier/src/certifier/certification.rs +++ b/packages/talos_certifier/src/certifier/certification.rs @@ -160,9 +160,17 @@ impl Certifier { /// /// Removes if key is found and the version < version in the certifier. pub fn prune_set(self_items: &mut AHashMap, remove_items: &AHashMap) { - self_items.retain(|k, v| match remove_items.get(k) { - Some(read_value) => read_value > v, - None => true, + // self_items.retain(|k, v| match remove_items.get(k) { + // Some(read_value) => read_value > v, + // None => true, + // }); + + remove_items.iter().for_each(|(k, v)| { + if let Some(item) = self_items.get(k) { + if item > v { + self_items.remove(k); + } + }; }); } } diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index d19c2c13..50687193 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -1,5 +1,6 @@ use std::sync::atomic::AtomicI64; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use log::{debug, error, warn}; @@ -133,13 +134,41 @@ impl CertifierService { // prune suffix if required? if let Some(prune_index) = self.suffix.get_safe_prune_index() { + let start_suffix_ms = Instant::now(); let pruned_suffix_items = self.suffix.prune_till_index(prune_index).unwrap(); + let end_suffix_ms = start_suffix_ms.elapsed().as_micros(); let pruned_items = get_nonempty_suffix_items(pruned_suffix_items.iter()); - let (readset, writeset) = generate_certifier_sets_from_suffix(pruned_items); + let pruned_items: Vec<_> = pruned_items.collect(); + let prune_items_size = pruned_items.len(); + + let start_suffix_ms = Instant::now(); + + let (readset, writeset) = generate_certifier_sets_from_suffix(pruned_items.into_iter()); + let end_generate_sets_ms = start_suffix_ms.elapsed().as_micros(); + + let start_suffix_ms = Instant::now(); + let readset_size_before = self.certifier.reads.len(); Certifier::prune_set(&mut self.certifier.reads, &readset); + let readset_size_after = self.certifier.reads.len(); + + let end_readset_prune_ms = start_suffix_ms.elapsed().as_micros(); + + let start_suffix_ms = Instant::now(); + let writeset_size_before = self.certifier.writes.len(); Certifier::prune_set(&mut self.certifier.writes, &writeset); + let writeset_size_after = self.certifier.writes.len(); + let end_writeset_prune_ms = start_suffix_ms.elapsed().as_micros(); + + warn!("++ Prune suffix = {end_suffix_ms}µs | prune size = {prune_items_size}"); + warn!( + "~~ Generate set = {end_generate_sets_ms}µs | readset size = {} | writeset size = {}", + readset.len(), + writeset.len() + ); + warn!("-- prune readset = {end_readset_prune_ms}µs | before prune size = {readset_size_before} | after prune size = {readset_size_after}"); + warn!("-- prune writeset = {end_writeset_prune_ms}µs | before prune size = {writeset_size_before} | after prune size = {writeset_size_after}"); // prune_index returned from `get_safe_prune_index` can be a lower index due to suffix.meta.min_size_after_prune value. // Although we prune only till this new/lower prune_index, we know everything till the previous prune_index was already decided