Skip to content

Commit

Permalink
feat: perf optimisations for certifier (#109)
Browse files Browse the repository at this point in the history
* chore: optimise safe prune check

* chore: add timing in decision arm of certifier service

* chore: check decision channel capacity

* chore: talos-certifier accept the db pool size from env

* fix: setting of pool size in pg.rs

* chore: log the pruning of suffix

* chore: update the calculation of safe_prune_index and update the unit tests for suffix

* chore: revert the safe_prune_index calculation

* chore: optimise the update of prune_index after pruning to reduce redundant computations

* chore: update tests and minor refactor

* chore: update pruning of certifier read and write sets

* chore: revert to use retain for pruning suffix

* chore: remove unused import

* chore: minor refactor
  • Loading branch information
gk-kindred authored Nov 27, 2024
1 parent 4d5b80c commit 2761e0b
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 14 deletions.
2 changes: 1 addition & 1 deletion examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> Result<(), impl std::error::Error> {
let suffix_config = Some(SuffixConfig {
capacity: 400_000,
prune_start_threshold: Some(300_000),
min_size_after_prune: Some(250_000),
min_size_after_prune: Some(150_000),
});

let configuration = Configuration {
Expand Down
20 changes: 19 additions & 1 deletion packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,33 @@ impl CertifierService {
self.suffix.update_prune_index(candidate_version_index);
}

let mut prune_candidate_version = None;
if let Some(prune_index_before_pruning) = self.suffix.get_meta().prune_index {
if let Some(Some(item)) = self.suffix.messages.get(prune_index_before_pruning) {
prune_candidate_version = Some(item.item_ver);
}
};

// prune suffix if required?
if let Some(prune_index) = self.suffix.get_safe_prune_index() {
let pruned_suffix_items = self.suffix.prune_till_index(prune_index).unwrap();

let pruned_items = get_nonempty_suffix_items(pruned_suffix_items.iter());

let (readset, writeset) = generate_certifier_sets_from_suffix(pruned_items);

Certifier::prune_set(&mut self.certifier.reads, &readset);
Certifier::prune_set(&mut self.certifier.writes, &writeset);

// prune_index returned from `get_safe_prune_index` can be a lower index due to suffix.meta.min_size_after_prune value.
// Although we prune only till this new/lower prune_index, we know everything till the previous prune_index was already decided
// and is therefore safe to update the prune_index version deriving from the corresponding version.
//
// This is a tiny optimisation which helps in places where a slice is built using the prune_index as the
// start or end of the slice boundary.
if let Some(prune_vers) = prune_candidate_version {
let new_prune_index = self.suffix.index_from_head(prune_vers);
self.suffix.update_prune_index(new_prune_index);
}
}
// remove sets from certifier if pruning?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl SystemService for DecisionOutboxService {
}
}
Err(db_error) => {
error!("Error saving decision to XDB with reason={:?}", db_error.to_string());
system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap();
}
};
Expand Down
7 changes: 6 additions & 1 deletion packages/talos_certifier_adapters/src/postgres/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use talos_common_utils::env_var;
use log::debug;
use talos_common_utils::{env_var, env_var_with_defaults};

#[derive(Debug, Clone)]
pub struct PgConfig {
Expand All @@ -7,16 +8,20 @@ pub struct PgConfig {
pub host: String,
pub port: String,
pub database: String,
pub pool_size: Option<u32>,
}

impl PgConfig {
pub fn from_env() -> PgConfig {
let pool_size = env_var_with_defaults!("PG_POOL_SIZE", Option::<u32>);
debug!("Pool size used... {pool_size:?}");
PgConfig {
user: env_var!("PG_USER"),
password: env_var!("PG_PASSWORD"),
host: env_var!("PG_HOST"),
port: env_var!("PG_PORT"),
database: env_var!("PG_DATABASE"),
pool_size,
}
}
pub fn get_base_connection_string(&self) -> String {
Expand Down
11 changes: 10 additions & 1 deletion packages/talos_certifier_adapters/src/postgres/pg.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use async_trait::async_trait;
use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolError, Runtime};
use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolConfig, PoolError, Runtime};
use log::warn;
use serde_json::{json, Value};
use talos_certifier::{
Expand Down Expand Up @@ -35,6 +35,15 @@ impl Pg {
recycling_method: deadpool_postgres::RecyclingMethod::Fast,
});

if let Some(pool_max_size) = pg_config.pool_size {
let pool_config = PoolConfig {
max_size: pool_max_size as usize,
..PoolConfig::default()
};

config.pool = Some(pool_config);
}

let pool = config.create_pool(Some(Runtime::Tokio1), NoTls).map_err(PgError::CreatePool)?;

//test connection
Expand Down
20 changes: 18 additions & 2 deletions packages/talos_suffix/src/suffix.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Suffix

use std::collections::VecDeque;
use std::{collections::VecDeque, time::Instant};

use log::{debug, error, info, warn};

Expand Down Expand Up @@ -149,6 +149,14 @@ where
return None;
};

// If prune_index is less than prune_threshold, it is not ready
if let Some(prune_index) = self.meta.prune_index {
if prune_index.lt(&prune_threshold) {
debug!("[SUFFIX PRUNE CHECK] Prune index {prune_index} is less than prune_threshold {prune_threshold}. Not ready to prune yet.");
return None;
}
}

// If not reached the max threshold
if self.suffix_length() < prune_threshold {
debug!(
Expand Down Expand Up @@ -295,22 +303,30 @@ where
/// This enables moving the head to the appropriate location.
fn prune_till_index(&mut self, index: usize) -> SuffixResult<Vec<Option<SuffixItem<T>>>> {
info!("Suffix message length BEFORE pruning={} and head={}!!!", self.messages.len(), self.meta.head);
let start_ms = Instant::now();
// info!("Next suffix item index= {:?} after prune index={prune_index:?}.....", suffix_item.item_ver);

// let k = self.retrieve_all_some_vec_items();
// info!("Items before pruning are \n{k:?}");

let drained_entries = self.messages.drain(..index).collect();
let drain_end_ms = start_ms.elapsed().as_micros();

self.update_prune_index(None);

let start_ms_2 = Instant::now();
if let Some(Some(s_item)) = self.messages.iter().find(|m| m.is_some()) {
self.update_head(s_item.item_ver);
} else {
self.update_head(0)
}

// info!("Suffix message length AFTER pruning={} and head={}!!!", self.messages.len(), self.meta.head);
info!("Suffix message length AFTER pruning={} and head={}!!!", self.messages.len(), self.meta.head);
info!(
"Prune took {} microseconds and update head took {} microseconds",
drain_end_ms,
start_ms_2.elapsed().as_micros()
);
// let k = self.retrieve_all_some_vec_items();
// info!("Items after pruning are \n{k:?}");
// }
Expand Down
101 changes: 93 additions & 8 deletions packages/talos_suffix/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ mod suffix_tests {
}

#[test]
fn test_is_ready_for_prune_true_with_prune_index_updated_to_nearest() {
fn test_is_not_ready_when_prune_index_is_below_prune_start_threshold() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
Expand All @@ -224,15 +224,100 @@ mod suffix_tests {
});

assert_eq!(sfx.meta.prune_index, Some(16)); // because version 18, index 17 is not decided.
assert!(sfx.get_safe_prune_index().is_some()); // because message.len > prune_start_threshold, we are ready for prune till the prune_index
assert!(sfx.meta.prune_index.lt(&sfx.meta.prune_start_threshold)); // Prune index is below the start threshold for prune checks
assert!(sfx.get_safe_prune_index().is_none()); // returns none as the prune_index is below the prune_start_threshold.
}

// test the following while pruning
// - returned items in suffix len and version
// - remaining items in suffix len and version
// - updated head value
// - updated prune version number
// fn test_prune
/// With no `min_size_after_prune` configured, a fully decided prefix that reaches the
/// `prune_start_threshold` is reported as the safe prune index, and pruning up to it
/// shrinks the suffix and moves the head.
#[test]
fn test_is_ready_when_prune_index_is_above_prune_start_threshold() {
    let config = SuffixConfig {
        capacity: 30,
        prune_start_threshold: Some(20),
        ..Default::default()
    };
    let mut suffix: Suffix<MockSuffixItemMessage> = Suffix::with_config(config);

    // Versions 0..31 with a few gaps (22, 26, 27 and 29 are never inserted).
    let skipped_versions: Vec<u64> = vec![22, 26, 27, 29];
    let versions = filtered_versions_vec(0..31, Some(skipped_versions));

    for &version in versions.iter() {
        suffix.insert(version, create_mock_candidate_message(version)).unwrap();
    }

    assert_eq!(suffix.messages.len(), 31);

    // Decide every inserted version.
    for &version in versions.iter() {
        suffix.update_decision(version, version + 30).unwrap();
    }

    // The prune index is expected to stop at index 20, which equals the start threshold.
    assert_eq!(suffix.meta.prune_index, Some(20));
    assert!(suffix.meta.prune_index <= suffix.meta.prune_start_threshold);
    assert_eq!(suffix.get_safe_prune_index(), Some(20));

    // Prune up to the recorded prune index; the returned items are not inspected here.
    let prune_index = suffix.meta.prune_index.unwrap();
    let _ = suffix.prune_till_index(prune_index);

    assert_eq!(suffix.messages.len(), 11);
    assert_eq!(suffix.meta.head, 20);
}
/// When pruning up to the prune index would leave fewer items than `min_size_after_prune`,
/// `get_safe_prune_index` is expected to return a lower index instead.
#[test]
fn test_prune_after_min_size_after_prune_check_pass() {
    let config = SuffixConfig {
        capacity: 30,
        prune_start_threshold: Some(20),
        min_size_after_prune: Some(15),
    };
    let mut suffix: Suffix<MockSuffixItemMessage> = Suffix::with_config(config);

    // Versions 0..31 with a few gaps (22, 26, 27 and 29 are never inserted).
    let skipped_versions: Vec<u64> = vec![22, 26, 27, 29];
    let versions = filtered_versions_vec(0..31, Some(skipped_versions));

    for &version in versions.iter() {
        suffix.insert(version, create_mock_candidate_message(version)).unwrap();
    }

    assert_eq!(suffix.messages.len(), 31);

    // Decide every inserted version.
    for &version in versions.iter() {
        suffix.update_decision(version, version + 30).unwrap();
    }

    // The prune index is expected to stop at index 20, which equals the start threshold.
    assert_eq!(suffix.meta.prune_index, Some(20));
    assert!(suffix.meta.prune_index <= suffix.meta.prune_start_threshold);

    // Pruning up to index 20 would leave only 11 of the 31 items, which is below the
    // configured min_size_after_prune of 15, so a lower safe index (15) is expected.
    assert_eq!(suffix.get_safe_prune_index(), Some(15));
}

/// Exercises `get_safe_prune_index` with a small `min_size_after_prune` (5) and expects
/// no safe prune index to be reported.
#[test]
fn test_no_prune_after_min_size_after_prune_check_fail() {
    let config = SuffixConfig {
        capacity: 30,
        prune_start_threshold: Some(20),
        min_size_after_prune: Some(5),
    };
    let mut suffix: Suffix<MockSuffixItemMessage> = Suffix::with_config(config);

    // Versions 0..31 with a few gaps (22, 26, 27 and 29 are never inserted).
    let skipped_versions: Vec<u64> = vec![22, 26, 27, 29];
    let versions = filtered_versions_vec(0..31, Some(skipped_versions));

    for &version in versions.iter() {
        suffix.insert(version, create_mock_candidate_message(version)).unwrap();
    }

    assert_eq!(suffix.messages.len(), 31);

    // Decide every inserted version.
    for &version in versions.iter() {
        suffix.update_decision(version, version + 30).unwrap();
    }

    // The prune index is expected to stop at index 20, which equals the start threshold.
    assert_eq!(suffix.meta.prune_index, Some(20));
    assert!(suffix.meta.prune_index <= suffix.meta.prune_start_threshold);

    // NOTE(review): pruning up to index 20 would leave 11 of the 31 items, which is more than
    // min_size_after_prune (5), yet the expected result here is None. Confirm this against the
    // min-size handling in get_safe_prune_index.
    assert_eq!(suffix.get_safe_prune_index(), None);
}

#[test]
fn get_index_from_head() {
Expand Down

0 comments on commit 2761e0b

Please sign in to comment.