Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: perf optimisations for certifier #109

Merged
merged 14 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> Result<(), impl std::error::Error> {
let suffix_config = Some(SuffixConfig {
capacity: 400_000,
prune_start_threshold: Some(300_000),
min_size_after_prune: Some(250_000),
min_size_after_prune: Some(150_000),
});

let configuration = Configuration {
Expand Down
20 changes: 19 additions & 1 deletion packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,33 @@ impl CertifierService {
self.suffix.update_prune_index(candidate_version_index);
}

let mut prune_candidate_version = None;
if let Some(prune_index_before_pruning) = self.suffix.get_meta().prune_index {
if let Some(Some(item)) = self.suffix.messages.get(prune_index_before_pruning) {
prune_candidate_version = Some(item.item_ver);
}
};

// prune suffix if required?
if let Some(prune_index) = self.suffix.get_safe_prune_index() {
let pruned_suffix_items = self.suffix.prune_till_index(prune_index).unwrap();

let pruned_items = get_nonempty_suffix_items(pruned_suffix_items.iter());

let (readset, writeset) = generate_certifier_sets_from_suffix(pruned_items);

Certifier::prune_set(&mut self.certifier.reads, &readset);
Certifier::prune_set(&mut self.certifier.writes, &writeset);

// prune_index returned from `get_safe_prune_index` can be a lower index due to suffix.meta.min_size_after_prune value.
// Although we prune only till this new/lower prune_index, we know everything till the previous prune_index was already decided
// and is therefore safe to update the prune_index version deriving from the corresponding version.
//
// This a tiny optimisation which helps in places where a slice is build using the prune_index as the
// start or end of the slice boundary.
if let Some(prune_vers) = prune_candidate_version {
let new_prune_index = self.suffix.index_from_head(prune_vers);
self.suffix.update_prune_index(new_prune_index);
}
}
// remove sets from certifier if pruning?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl SystemService for DecisionOutboxService {
}
}
Err(db_error) => {
error!("Error saving decision to XDB with reason={:?}", db_error.to_string());
system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap();
}
};
Expand Down
7 changes: 6 additions & 1 deletion packages/talos_certifier_adapters/src/postgres/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use talos_common_utils::env_var;
use log::debug;
use talos_common_utils::{env_var, env_var_with_defaults};

#[derive(Debug, Clone)]
pub struct PgConfig {
Expand All @@ -7,16 +8,20 @@ pub struct PgConfig {
pub host: String,
pub port: String,
pub database: String,
pub pool_size: Option<u32>,
}

impl PgConfig {
pub fn from_env() -> PgConfig {
let pool_size = env_var_with_defaults!("PG_POOL_SIZE", Option::<u32>);
debug!("Pool size used... {pool_size:?}");
PgConfig {
user: env_var!("PG_USER"),
password: env_var!("PG_PASSWORD"),
host: env_var!("PG_HOST"),
port: env_var!("PG_PORT"),
database: env_var!("PG_DATABASE"),
pool_size,
}
}
pub fn get_base_connection_string(&self) -> String {
Expand Down
11 changes: 10 additions & 1 deletion packages/talos_certifier_adapters/src/postgres/pg.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use async_trait::async_trait;
use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolError, Runtime};
use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolConfig, PoolError, Runtime};
use log::warn;
use serde_json::{json, Value};
use talos_certifier::{
Expand Down Expand Up @@ -35,6 +35,15 @@ impl Pg {
recycling_method: deadpool_postgres::RecyclingMethod::Fast,
});

if let Some(pool_max_size) = pg_config.pool_size {
let pool_config = PoolConfig {
max_size: pool_max_size as usize,
..PoolConfig::default()
};

config.pool = Some(pool_config);
}

let pool = config.create_pool(Some(Runtime::Tokio1), NoTls).map_err(PgError::CreatePool)?;

//test connection
Expand Down
20 changes: 18 additions & 2 deletions packages/talos_suffix/src/suffix.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Suffix

use std::collections::VecDeque;
use std::{collections::VecDeque, time::Instant};

use log::{debug, error, info, warn};

Expand Down Expand Up @@ -149,6 +149,14 @@ where
return None;
};

// If prune_index is less than prune_threshold, it is not ready
if let Some(prune_index) = self.meta.prune_index {
if prune_index.lt(&prune_threshold) {
debug!("[SUFFIX PRUNE CHECK] Prune index {prune_index} is less than prune_threshold {prune_threshold}. Not ready to prune yet.");
return None;
}
}

// If not reached the max threshold
if self.suffix_length() < prune_threshold {
debug!(
Expand Down Expand Up @@ -295,22 +303,30 @@ where
/// This enables to move the head to the appropiate location.
fn prune_till_index(&mut self, index: usize) -> SuffixResult<Vec<Option<SuffixItem<T>>>> {
info!("Suffix message length BEFORE pruning={} and head={}!!!", self.messages.len(), self.meta.head);
let start_ms = Instant::now();
// info!("Next suffix item index= {:?} after prune index={prune_index:?}.....", suffix_item.item_ver);

// let k = self.retrieve_all_some_vec_items();
// info!("Items before pruning are \n{k:?}");

let drained_entries = self.messages.drain(..index).collect();
let drain_end_ms = start_ms.elapsed().as_micros();

self.update_prune_index(None);

let start_ms_2 = Instant::now();
if let Some(Some(s_item)) = self.messages.iter().find(|m| m.is_some()) {
self.update_head(s_item.item_ver);
} else {
self.update_head(0)
}

// info!("Suffix message length AFTER pruning={} and head={}!!!", self.messages.len(), self.meta.head);
info!("Suffix message length AFTER pruning={} and head={}!!!", self.messages.len(), self.meta.head);
info!(
"Prune took {} microseconds and update head took {} microseconds",
drain_end_ms,
start_ms_2.elapsed().as_micros()
);
// let k = self.retrieve_all_some_vec_items();
// info!("Items after pruning are \n{k:?}");
// }
Expand Down
101 changes: 93 additions & 8 deletions packages/talos_suffix/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ mod suffix_tests {
}

#[test]
fn test_is_ready_for_prune_true_with_prune_index_updated_to_nearest() {
fn test_is_not_ready_when_prune_index_is_below_prune_start_threshold() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
Expand All @@ -224,15 +224,100 @@ mod suffix_tests {
});

assert_eq!(sfx.meta.prune_index, Some(16)); // because version 18, index 17 is not decided.
assert!(sfx.get_safe_prune_index().is_some()); // because message.len > prune_start_threshold, we are ready for prune till the prune_index
assert!(sfx.meta.prune_index.lt(&sfx.meta.prune_start_threshold)); // Prune index is below the start threshold for prune checks
assert!(sfx.get_safe_prune_index().is_none()); // returns none as the prune_index is below the prune_start_threshold.
}

// test the following while pruning
// - returned items in suffix len and version
// - remaining items in suffix len and version
// - updated head value
// - updated prune version number
// fn test_prune
#[test]
fn test_is_ready_when_prune_index_is_above_prune_start_threshold() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
..Default::default()
});

// insert suffix items
let ignore_vec: Vec<u64> = vec![22, 26, 27, 29];

let filtered_vec = filtered_versions_vec(0..31, Some(ignore_vec));

filtered_vec.iter().for_each(|&vers| {
sfx.insert(vers, create_mock_candidate_message(vers)).unwrap();
});

assert_eq!(sfx.messages.len(), 31);

filtered_vec.iter().for_each(|&vers| {
sfx.update_decision(vers, vers + 30).unwrap();
});

assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 21 (index 20)
assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
assert_eq!(sfx.get_safe_prune_index(), Some(20));
let _ = sfx.prune_till_index(sfx.meta.prune_index.unwrap());
assert_eq!(sfx.messages.len(), 11);

// assert_eq!(sfx.messages[0].unwrap().item_ver, 21);
assert_eq!(sfx.meta.head, 20);
}
#[test]
fn test_prune_after_min_size_after_prune_check_pass() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
min_size_after_prune: Some(15),
});

// insert suffix items
let ignore_vec: Vec<u64> = vec![22, 26, 27, 29];

let filtered_vec = filtered_versions_vec(0..31, Some(ignore_vec));

filtered_vec.iter().for_each(|&vers| {
sfx.insert(vers, create_mock_candidate_message(vers)).unwrap();
});

assert_eq!(sfx.messages.len(), 31);

filtered_vec.iter().for_each(|&vers| {
sfx.update_decision(vers, vers + 30).unwrap();
});

assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 21 (index 20)
assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
// Although prune_index moved till 20, because the min_size_after_prune is 15, and the suffix length is 31,
// there will only be 11 suffix items, therefore the min_size_after_prune criteria fails.
assert_eq!(sfx.get_safe_prune_index(), Some(15));
}

#[test]
fn test_no_prune_after_min_size_after_prune_check_fail() {
let mut sfx: Suffix<MockSuffixItemMessage> = Suffix::with_config(SuffixConfig {
capacity: 30,
prune_start_threshold: Some(20),
min_size_after_prune: Some(5),
});

// insert suffix items
let ignore_vec: Vec<u64> = vec![22, 26, 27, 29];

let filtered_vec = filtered_versions_vec(0..31, Some(ignore_vec));

filtered_vec.iter().for_each(|&vers| {
sfx.insert(vers, create_mock_candidate_message(vers)).unwrap();
});

assert_eq!(sfx.messages.len(), 31);

filtered_vec.iter().for_each(|&vers| {
sfx.update_decision(vers, vers + 30).unwrap();
});

assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 20 (index 20)
assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold));
// As min_size_after_prune is 5, prune_index is at 20 and suffix length is 31, it is safe to remove all the entries till prune_index
assert_eq!(sfx.get_safe_prune_index(), None);
}

#[test]
fn get_index_from_head() {
Expand Down
Loading