diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index 4ce08768e3a..5ed9af6c2a5 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -133,7 +133,6 @@ impl Scheduler { } } } - filtered_collections } @@ -166,11 +165,6 @@ impl Scheduler { pub(crate) fn set_memberlist(&mut self, memberlist: Memberlist) { self.memberlist = Some(memberlist); } - - // For testing - pub(crate) fn set_sysdb(&mut self, sysdb: Box) { - self.sysdb = sysdb; - } } #[cfg(test)] @@ -303,7 +297,6 @@ mod tests { let last_compaction_time_2 = 1; sysdb.add_tenant_last_compaction_time(tenant_2, last_compaction_time_2); - scheduler.set_sysdb(sysdb.clone()); scheduler.schedule().await; let jobs = scheduler.get_jobs(); let jobs = jobs.collect::>(); diff --git a/rust/worker/src/execution/operators/flush_sysdb.rs b/rust/worker/src/execution/operators/flush_sysdb.rs index 809f292a883..a64a49edaa4 100644 --- a/rust/worker/src/execution/operators/flush_sysdb.rs +++ b/rust/worker/src/execution/operators/flush_sysdb.rs @@ -187,7 +187,7 @@ mod tests { log_position, collection_version, segment_flush_info.into(), - sysdb, + sysdb.clone(), ); let result = operator.run(&input).await; @@ -196,5 +196,24 @@ mod tests { let result = result.unwrap(); assert_eq!(result.result.collection_id, collection_uuid_1.to_string()); assert_eq!(result.result.collection_version, collection_version + 1); + + let collections = sysdb + .get_collections(Some(collection_uuid_1), None, None, None) + .await; + + assert!(collections.is_ok()); + let collection = collections.unwrap(); + assert_eq!(collection.len(), 1); + let collection = collection[0].clone(); + assert_eq!(collection.log_position, log_position); + + let segments = sysdb.get_segments(None, None, None, None).await; + assert!(segments.is_ok()); + let segments = segments.unwrap(); + assert_eq!(segments.len(), 2); + let segment_1 = segments.iter().find(|s| s.id == segment_id_1).unwrap(); + assert_eq!(segment_1.file_path, file_path_3); + let segment_2 = segments.iter().find(|s| s.id == segment_id_2).unwrap(); + assert_eq!(segment_2.file_path, file_path_4); } } diff --git a/rust/worker/src/sysdb/test_sysdb.rs b/rust/worker/src/sysdb/test_sysdb.rs index e3491a9611c..e81a12b26c8 100644 --- a/rust/worker/src/sysdb/test_sysdb.rs +++ b/rust/worker/src/sysdb/test_sysdb.rs @@ -10,6 +10,7 @@ use crate::types::SegmentScope; use crate::types::SegmentType; use crate::types::Tenant; use async_trait::async_trait; +use parking_lot::Mutex; use std::collections::HashMap; use std::sync::Arc; use uuid::Uuid; @@ -18,6 +19,11 @@ use super::sysdb::GetLastCompactionTimeError; #[derive(Clone, Debug)] pub(crate) struct TestSysDb { + inner: Arc>, +} + +#[derive(Debug)] +struct Inner { collections: HashMap, segments: HashMap, tenant_last_compaction_time: HashMap, @@ -26,18 +32,22 @@ pub(crate) struct TestSysDb { impl TestSysDb { pub(crate) fn new() -> Self { TestSysDb { - collections: HashMap::new(), - segments: HashMap::new(), - tenant_last_compaction_time: HashMap::new(), + inner: Arc::new(Mutex::new(Inner { + collections: HashMap::new(), + segments: HashMap::new(), + tenant_last_compaction_time: HashMap::new(), + })), } } pub(crate) fn add_collection(&mut self, collection: Collection) { - self.collections.insert(collection.id, collection); + let mut inner = self.inner.lock(); + inner.collections.insert(collection.id, collection); } pub(crate) fn add_segment(&mut self, segment: Segment) { - self.segments.insert(segment.id, segment); + let mut inner = self.inner.lock(); + inner.segments.insert(segment.id, segment); } pub(crate) fn add_tenant_last_compaction_time( @@ -45,7 +55,9 @@ impl TestSysDb { tenant: String, last_compaction_time: i64, ) { - self.tenant_last_compaction_time + let mut inner = self.inner.lock(); + inner + .tenant_last_compaction_time .insert(tenant, last_compaction_time); } @@ -112,8 +124,9 @@ impl SysDb for TestSysDb { tenant: Option, database: Option, ) -> Result, GetCollectionsError> { + let inner = self.inner.lock(); let mut collections = Vec::new(); - for collection in self.collections.values() { + for collection in inner.collections.values() { if !TestSysDb::filter_collections( &collection, collection_id, @@ -135,8 +148,9 @@ impl SysDb for TestSysDb { scope: Option, collection: Option, ) -> Result, GetSegmentsError> { + let inner = self.inner.lock(); let mut segments = Vec::new(); - for segment in self.segments.values() { + for segment in inner.segments.values() { if !TestSysDb::filter_segments(&segment, id, r#type.clone(), scope.clone(), collection) { continue; @@ -150,9 +164,10 @@ impl SysDb for TestSysDb { &mut self, tenant_ids: Vec, ) -> Result, GetLastCompactionTimeError> { + let inner = self.inner.lock(); let mut tenants = Vec::new(); for tenant_id in tenant_ids { - let last_compaction_time = match self.tenant_last_compaction_time.get(&tenant_id) { + let last_compaction_time = match inner.tenant_last_compaction_time.get(&tenant_id) { Some(last_compaction_time) => *last_compaction_time, None => { // TODO: Log an error @@ -175,7 +190,8 @@ impl SysDb for TestSysDb { collection_version: i32, segment_flush_info: Arc<[SegmentFlushInfo]>, ) -> Result { - let collection = self + let mut inner = self.inner.lock(); + let collection = inner .collections .get(&Uuid::parse_str(&collection_id).unwrap()); if collection.is_none() { @@ -186,8 +202,8 @@ impl SysDb for TestSysDb { collection.log_position = log_position; let new_collection_version = collection_version + 1; collection.version = new_collection_version; - self.collections.insert(collection.id, collection); - let mut last_compaction_time = match self.tenant_last_compaction_time.get(&tenant_id) { + inner.collections.insert(collection.id, collection); + let mut last_compaction_time = match inner.tenant_last_compaction_time.get(&tenant_id) { Some(last_compaction_time) => *last_compaction_time, None => 0, }; @@ -195,13 +211,13 @@ impl SysDb for TestSysDb { // update segments for segment_flush_info in segment_flush_info.iter() { - let segment = self.segments.get(&segment_flush_info.segment_id); + let segment = inner.segments.get(&segment_flush_info.segment_id); if segment.is_none() { return Err(FlushCompactionError::SegmentNotFound); } let mut segment = segment.unwrap().clone(); segment.file_path = segment_flush_info.file_paths.clone(); - self.segments.insert(segment.id, segment); + inner.segments.insert(segment.id, segment); } Ok(FlushCompactionResponse::new(