Skip to content

Commit

Permalink
chore: update tests and minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Nov 25, 2024
1 parent c51bc44 commit 8128e64
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 98 deletions.
2 changes: 1 addition & 1 deletion examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> Result<(), impl std::error::Error> {
let suffix_config = Some(SuffixConfig {
capacity: 400_000,
prune_start_threshold: Some(300_000),
min_size_after_prune: Some(250_000),
min_size_after_prune: Some(150_000),
});

let configuration = Configuration {
Expand Down
61 changes: 11 additions & 50 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use log::{debug, error, warn};
Expand Down Expand Up @@ -32,8 +31,6 @@ pub struct CertifierService {
pub commit_offset: Arc<AtomicI64>,
pub decision_outbox_tx: mpsc::Sender<DecisionOutboxChannelMessage>,
pub config: CertifierServiceConfig,
// pub total_prune: (u32, u128),
// pub total_decisions: (u32, u128, Instant),
}

impl CertifierService {
Expand All @@ -58,8 +55,6 @@ impl CertifierService {
decision_outbox_tx,
commit_offset,
config,
// total_decisions: (0, 0, Instant::now()),
// total_prune: (0, 0),
}
}

Expand Down Expand Up @@ -101,8 +96,6 @@ impl CertifierService {
decision_version, decision_message
);

// let start_decision_timing = Instant::now();

// Reserve space if version is beyond the suffix capacity
//
// Applicable in scenarios where certifier starts from a committed version
Expand All @@ -129,21 +122,17 @@ impl CertifierService {
self.suffix.update_prune_index(candidate_version_index);
}

let old_prune_index = self.suffix.get_meta().prune_index;
let old_suffix_len = self.suffix.suffix_length();
let old_head = self.suffix.get_meta().head;
let prune_index_before_pruning = self.suffix.get_meta().prune_index;

let mut old_prune_index_vers = None;
if old_prune_index.is_some() {
if let Some(item) = self.suffix.messages[old_prune_index.unwrap()].clone() {
old_prune_index_vers = Some(item.item_ver);
let mut prune_candidate_version = None;
if prune_index_before_pruning.is_some() {
if let Some(item) = self.suffix.messages[prune_index_before_pruning.unwrap()].clone() {
prune_candidate_version = Some(item.item_ver);
}
};

// prune suffix if required?
if let Some(prune_index) = self.suffix.get_safe_prune_index() {
let start_ms = Instant::now();

let pruned_suffix_items = self.suffix.prune_till_index(prune_index).unwrap();

let pruned_items = get_nonempty_suffix_items(pruned_suffix_items.iter());
Expand All @@ -152,32 +141,16 @@ impl CertifierService {
Certifier::prune_set(&mut self.certifier.reads, &readset);
Certifier::prune_set(&mut self.certifier.writes, &writeset);

let new_suffix_length = self.suffix.suffix_length();

// prune_index returned from `get_safe_prune_index` can be a lower index due to suffix.meta.min_size_after_prune value.
// Although we prune only till this new/lower prune_index, we know everything till the previous prune_index was already decided
// and is therefore safe to update the prune_index version deriving from the corresponding version
if let Some(old_vers) = old_prune_index_vers {
let new_prune_index = self.suffix.index_from_head(old_vers);
warn!("| old_prune_vers as index in new = {:?}", new_prune_index);
// and is therefore safe to update the prune_index version deriving from the corresponding version.
//
// This a tiny optimisation which helps in places where a slice is build using the prune_index as the
// start or end of the slice boundary.
if let Some(prune_vers) = prune_candidate_version {
let new_prune_index = self.suffix.index_from_head(prune_vers);
self.suffix.update_prune_index(new_prune_index);
}

let mut new_prune_index_vers = None;
if let Some(new_prune_index) = self.suffix.meta.prune_index {
if let Some(item) = self.suffix.messages[new_prune_index].clone() {
new_prune_index_vers = Some(item.item_ver)
}
};

warn!(
"++ Pruned the suffix in {:?}ms.. \n
| old_suffix_length = {old_suffix_len:?} | old prune_index = {old_prune_index:?} | old prune_index_vers = {old_prune_index_vers:?} | old head = {old_head:?}\n
| new_suffix_length = {new_suffix_length:?} | safe prune_index = {prune_index:?} | new prune_index_vers = {:?} | current head = {}",
start_ms.elapsed().as_millis(),
new_prune_index_vers,
self.suffix.get_meta().head,
);
}
// remove sets from certifier if pruning?

Expand All @@ -189,18 +162,6 @@ impl CertifierService {
}
}

// self.total_decisions.0 += 1;
// self.total_decisions.1 += start_decision_timing.elapsed().as_millis();

// let time_diff = start_decision_timing.duration_since(self.total_decisions.2.clone());
// log::warn!(
// "### Total time taken for {} decisions = {} ms, duration since last = {}ms",
// self.total_decisions.0,
// self.total_decisions.1,
// time_diff.as_millis()
// );
// self.total_decisions.2 = start_decision_timing;

Ok(())
}

Expand Down
60 changes: 13 additions & 47 deletions packages/talos_suffix/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ mod suffix_tests {
// prune suffix
let result = sfx.prune_till_index(24).unwrap();
// new length of suffix after pruning.
assert_eq!(sfx.messages.len(), 5);
assert_eq!(result.len(), 25); // result.len() + sfx.messages.len() = 30
assert_eq!(sfx.meta.head, 25);
assert_eq!(sfx.messages.len(), 6);
assert_eq!(result.len(), 24); // result.len() + sfx.messages.len() = 30
assert_eq!(sfx.meta.head, 24);
assert_eq!(sfx.meta.prune_index, None);
}

Expand Down Expand Up @@ -253,10 +253,15 @@ mod suffix_tests {

assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 21 (index 20)
assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
assert_eq!(sfx.get_safe_prune_index(), Some(20)); // prune_index is 21 which is above the prune_start_threshold of 20.
assert_eq!(sfx.get_safe_prune_index(), Some(20));
let _ = sfx.prune_till_index(sfx.meta.prune_index.unwrap());
assert_eq!(sfx.messages.len(), 11);

// assert_eq!(sfx.messages[0].unwrap().item_ver, 21);
assert_eq!(sfx.meta.head, 20);
}
#[test]
fn test_no_prune_after_min_size_after_prune_check_fail() {
fn test_prune_after_min_size_after_prune_check_pass() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
Expand All @@ -282,11 +287,11 @@ mod suffix_tests {
assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
// Although prune_index moved till 20, because the min_size_after_prune is 15, and the suffix length is 31,
// there will only be 11 suffix items, therefore the min_size_after_prune criteria fails.
assert_eq!(sfx.get_safe_prune_index(), None);
assert_eq!(sfx.get_safe_prune_index(), Some(15));
}

#[test]
fn test_prune_after_min_size_after_prune_check_pass() {
fn test_no_prune_after_min_size_after_prune_check_fail() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
Expand All @@ -311,46 +316,7 @@ mod suffix_tests {
assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 20 (index 20)
assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
// As min_size_after_prune is 5, prune_index is at 20 and suffix length is 31, it is safe to remove all the entries till prune_index
assert_eq!(sfx.get_safe_prune_index(), Some(20));
let _ = sfx.prune_till_index(sfx.meta.prune_index.unwrap());
assert_eq!(sfx.messages.len(), 10);

// assert_eq!(sfx.messages[0].unwrap().item_ver, 21);
assert_eq!(sfx.meta.head, 21);
}
#[test]
fn test_prune_after_min_size_after_prune_check_pass_2() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 300_000,
prune_start_threshold: Some(20_000),
min_size_after_prune: Some(5_000),
});

// insert suffix items
let ignore_vec: Vec<u64> = vec![20_003, 26_000, 27_000, 29_000];

let filtered_vec = filtered_versions_vec(0..30_001, Some(ignore_vec));

filtered_vec.iter().for_each(|&vers| {
sfx.insert(vers, create_mock_candidate_message(vers)).unwrap();
});

assert_eq!(sfx.messages.len(), 30_001);

filtered_vec.iter().for_each(|&vers| {
sfx.update_decision(vers, vers + 30_000).unwrap();
});

assert_eq!(sfx.meta.prune_index, Some(20_001)); // prune_index updated till version 20 (index 20)
// assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
// As min_size_after_prune is 5, prune_index is at 20 and suffix length is 31, it is safe to remove all the entries till prune_index
assert_eq!(sfx.get_safe_prune_index(), Some(20_001));
// env_logger::init();
let _ = sfx.prune_till_index(sfx.meta.prune_index.unwrap());
assert_eq!(sfx.messages.len(), 9_999);

// assert_eq!(sfx.messages[0].unwrap().item_ver, 21);
assert_eq!(sfx.meta.head, 20_002);
assert_eq!(sfx.get_safe_prune_index(), None);
}

#[test]
Expand Down

0 comments on commit 8128e64

Please sign in to comment.