diff --git a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs index b236740b..17277bac 100644 --- a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs +++ b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs @@ -24,7 +24,7 @@ async fn main() -> Result<(), impl std::error::Error> { let suffix_config = Some(SuffixConfig { capacity: 400_000, prune_start_threshold: Some(300_000), - min_size_after_prune: Some(250_000), + min_size_after_prune: Some(150_000), }); let configuration = Configuration { diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index dde7df1f..19ae66fe 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -122,15 +122,33 @@ impl CertifierService { self.suffix.update_prune_index(candidate_version_index); } + let mut prune_candidate_version = None; + if let Some(prune_index_before_pruning) = self.suffix.get_meta().prune_index { + if let Some(Some(item)) = self.suffix.messages.get(prune_index_before_pruning) { + prune_candidate_version = Some(item.item_ver); + } + }; + // prune suffix if required? if let Some(prune_index) = self.suffix.get_safe_prune_index() { let pruned_suffix_items = self.suffix.prune_till_index(prune_index).unwrap(); - let pruned_items = get_nonempty_suffix_items(pruned_suffix_items.iter()); + let (readset, writeset) = generate_certifier_sets_from_suffix(pruned_items); Certifier::prune_set(&mut self.certifier.reads, &readset); Certifier::prune_set(&mut self.certifier.writes, &writeset); + + // prune_index returned from `get_safe_prune_index` can be a lower index due to suffix.meta.min_size_after_prune value. + // Although we prune only till this new/lower prune_index, we know everything till the previous prune_index was already decided + // and is therefore safe to update the prune_index version deriving from the corresponding version. + // + // This a tiny optimisation which helps in places where a slice is build using the prune_index as the + // start or end of the slice boundary. + if let Some(prune_vers) = prune_candidate_version { + let new_prune_index = self.suffix.index_from_head(prune_vers); + self.suffix.update_prune_index(new_prune_index); + } } // remove sets from certifier if pruning? diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index a6eac252..b10bd00f 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -138,6 +138,7 @@ impl SystemService for DecisionOutboxService { } } Err(db_error) => { + error!("Error saving decision to XDB with reason={:?}", db_error.to_string()); system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap(); } }; diff --git a/packages/talos_certifier_adapters/src/postgres/config.rs b/packages/talos_certifier_adapters/src/postgres/config.rs index 0a456da2..37ce1616 100644 --- a/packages/talos_certifier_adapters/src/postgres/config.rs +++ b/packages/talos_certifier_adapters/src/postgres/config.rs @@ -1,4 +1,5 @@ -use talos_common_utils::env_var; +use log::debug; +use talos_common_utils::{env_var, env_var_with_defaults}; #[derive(Debug, Clone)] pub struct PgConfig { @@ -7,16 +8,20 @@ pub struct PgConfig { pub host: String, pub port: String, pub database: String, + pub pool_size: Option, } impl PgConfig { pub fn from_env() -> PgConfig { + let pool_size = env_var_with_defaults!("PG_POOL_SIZE", Option::); + debug!("Pool size used... {pool_size:?}"); PgConfig { user: env_var!("PG_USER"), password: env_var!("PG_PASSWORD"), host: env_var!("PG_HOST"), port: env_var!("PG_PORT"), database: env_var!("PG_DATABASE"), + pool_size, } } pub fn get_base_connection_string(&self) -> String { diff --git a/packages/talos_certifier_adapters/src/postgres/pg.rs b/packages/talos_certifier_adapters/src/postgres/pg.rs index 0e30b2f7..c1df2865 100644 --- a/packages/talos_certifier_adapters/src/postgres/pg.rs +++ b/packages/talos_certifier_adapters/src/postgres/pg.rs @@ -1,7 +1,7 @@ use std::time::Duration; use async_trait::async_trait; -use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolError, Runtime}; +use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolConfig, PoolError, Runtime}; use log::warn; use serde_json::{json, Value}; use talos_certifier::{ @@ -35,6 +35,15 @@ impl Pg { recycling_method: deadpool_postgres::RecyclingMethod::Fast, }); + if let Some(pool_max_size) = pg_config.pool_size { + let pool_config = PoolConfig { + max_size: pool_max_size as usize, + ..PoolConfig::default() + }; + + config.pool = Some(pool_config); + } + let pool = config.create_pool(Some(Runtime::Tokio1), NoTls).map_err(PgError::CreatePool)?; //test connection diff --git a/packages/talos_suffix/src/suffix.rs b/packages/talos_suffix/src/suffix.rs index c3cf3500..77a2aa7c 100644 --- a/packages/talos_suffix/src/suffix.rs +++ b/packages/talos_suffix/src/suffix.rs @@ -1,6 +1,6 @@ // Suffix -use std::collections::VecDeque; +use std::{collections::VecDeque, time::Instant}; use log::{debug, error, info, warn}; @@ -149,6 +149,14 @@ where return None; }; + // If prune_index is less than prune_threshold, it is not ready + if let Some(prune_index) = self.meta.prune_index { + if prune_index.lt(&prune_threshold) { + debug!("[SUFFIX PRUNE CHECK] Prune index {prune_index} is less than prune_threshold {prune_threshold}. Not ready to prune yet."); + return None; + } + } + // If not reached the max threshold if self.suffix_length() < prune_threshold { debug!( @@ -295,22 +303,30 @@ where /// This enables to move the head to the appropiate location. fn prune_till_index(&mut self, index: usize) -> SuffixResult>>> { info!("Suffix message length BEFORE pruning={} and head={}!!!", self.messages.len(), self.meta.head); + let start_ms = Instant::now(); // info!("Next suffix item index= {:?} after prune index={prune_index:?}.....", suffix_item.item_ver); // let k = self.retrieve_all_some_vec_items(); // info!("Items before pruning are \n{k:?}"); let drained_entries = self.messages.drain(..index).collect(); + let drain_end_ms = start_ms.elapsed().as_micros(); self.update_prune_index(None); + let start_ms_2 = Instant::now(); if let Some(Some(s_item)) = self.messages.iter().find(|m| m.is_some()) { self.update_head(s_item.item_ver); } else { self.update_head(0) } - // info!("Suffix message length AFTER pruning={} and head={}!!!", self.messages.len(), self.meta.head); + info!("Suffix message length AFTER pruning={} and head={}!!!", self.messages.len(), self.meta.head); + info!( + "Prune took {} microseconds and update head took {} microseconds", + drain_end_ms, + start_ms_2.elapsed().as_micros() + ); // let k = self.retrieve_all_some_vec_items(); // info!("Items after pruning are \n{k:?}"); // } diff --git a/packages/talos_suffix/src/tests.rs b/packages/talos_suffix/src/tests.rs index 8734bb04..0266fd50 100644 --- a/packages/talos_suffix/src/tests.rs +++ b/packages/talos_suffix/src/tests.rs @@ -201,7 +201,7 @@ mod suffix_tests { } #[test] - fn test_is_ready_for_prune_true_with_prune_index_updated_to_nearest() { + fn test_is_not_ready_when_prune_index_is_below_prune_start_threshold() { let mut sfx: Suffix = Suffix::with_config(SuffixConfig { capacity: 30, prune_start_threshold: Some(20), @@ -224,15 +224,100 @@ mod suffix_tests { }); assert_eq!(sfx.meta.prune_index, Some(16)); // because version 18, index 17 is not decided. - assert!(sfx.get_safe_prune_index().is_some()); // because message.len > prune_start_threshold, we are ready for prune till the prune_index + assert!(sfx.meta.prune_index.lt(&sfx.meta.prune_start_threshold)); // Prune index is below the start threshold for prune checks + assert!(sfx.get_safe_prune_index().is_none()); // returns none as the prune_index is below the prune_start_threshold. } - // test the following while pruning - // - returned items in suffix len and version - // - remaining items in suffix len and version - // - updated head value - // - updated prune version number - // fn test_prune + #[test] + fn test_is_ready_when_prune_index_is_above_prune_start_threshold() { + let mut sfx: Suffix = Suffix::with_config(SuffixConfig { + capacity: 30, + prune_start_threshold: Some(20), + ..Default::default() + }); + + // insert suffix items + let ignore_vec: Vec = vec![22, 26, 27, 29]; + + let filtered_vec = filtered_versions_vec(0..31, Some(ignore_vec)); + + filtered_vec.iter().for_each(|&vers| { + sfx.insert(vers, create_mock_candidate_message(vers)).unwrap(); + }); + + assert_eq!(sfx.messages.len(), 31); + + filtered_vec.iter().for_each(|&vers| { + sfx.update_decision(vers, vers + 30).unwrap(); + }); + + assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 21 (index 20) + assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold)); + assert_eq!(sfx.get_safe_prune_index(), Some(20)); + let _ = sfx.prune_till_index(sfx.meta.prune_index.unwrap()); + assert_eq!(sfx.messages.len(), 11); + + // assert_eq!(sfx.messages[0].unwrap().item_ver, 21); + assert_eq!(sfx.meta.head, 20); + } + #[test] + fn test_prune_after_min_size_after_prune_check_pass() { + let mut sfx: Suffix = Suffix::with_config(SuffixConfig { + capacity: 30, + prune_start_threshold: Some(20), + min_size_after_prune: Some(15), + }); + + // insert suffix items + let ignore_vec: Vec = vec![22, 26, 27, 29]; + + let filtered_vec = filtered_versions_vec(0..31, Some(ignore_vec)); + + filtered_vec.iter().for_each(|&vers| { + sfx.insert(vers, create_mock_candidate_message(vers)).unwrap(); + }); + + assert_eq!(sfx.messages.len(), 31); + + filtered_vec.iter().for_each(|&vers| { + sfx.update_decision(vers, vers + 30).unwrap(); + }); + + assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 21 (index 20) + assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold)); + // Although prune_index moved till 20, because the min_size_after_prune is 15, and the suffix length is 31, + // there will only be 11 suffix items, therefore the min_size_after_prune criteria fails. + assert_eq!(sfx.get_safe_prune_index(), Some(15)); + } + + #[test] + fn test_no_prune_after_min_size_after_prune_check_fail() { + let mut sfx: Suffix = Suffix::with_config(SuffixConfig { + capacity: 30, + prune_start_threshold: Some(20), + min_size_after_prune: Some(5), + }); + + // insert suffix items + let ignore_vec: Vec = vec![22, 26, 27, 29]; + + let filtered_vec = filtered_versions_vec(0..31, Some(ignore_vec)); + + filtered_vec.iter().for_each(|&vers| { + sfx.insert(vers, create_mock_candidate_message(vers)).unwrap(); + }); + + assert_eq!(sfx.messages.len(), 31); + + filtered_vec.iter().for_each(|&vers| { + sfx.update_decision(vers, vers + 30).unwrap(); + }); + + assert_eq!(sfx.meta.prune_index, Some(20)); // prune_index updated till version 20 (index 20) + assert!(sfx.meta.prune_index.le(&sfx.meta.prune_start_threshold)); + // As min_size_after_prune is 5, prune_index is at 20 and suffix length is 31, it is safe to remove all the entries till prune_index + assert_eq!(sfx.get_safe_prune_index(), None); + } #[test] fn get_index_from_head() {