From 95bdeb0c94dd0752f8fcf827974275f36f5d02e3 Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Tue, 26 Nov 2024 22:23:57 +0100 Subject: [PATCH] preliminary parallel compactions --- src/compaction/leveled.rs | 77 +++++++++++++++++++++++--------------- src/compaction/worker.rs | 22 +++++++++-- src/level_manifest/mod.rs | 2 +- src/segment/value_block.rs | 5 +++ 4 files changed, 71 insertions(+), 35 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 639a9ec..2c8a69d 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -6,7 +6,7 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput}; use crate::{ config::Config, key_range::KeyRange, - level_manifest::{level::Level, LevelManifest}, + level_manifest::{level::Level, HiddenSet, LevelManifest}, segment::Segment, HashSet, SegmentId, }; @@ -16,8 +16,12 @@ fn aggregate_key_range(segments: &[Segment]) -> KeyRange { } // TODO: Currently does not take in `overshoot` -// TODO: Need to make sure compactions are not too small -fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet, bool) { +// TODO: Need to make sure compactions are not too small either +fn pick_minimal_compaction( + curr_level: &Level, + next_level: &Level, + hidden_set: &HiddenSet, +) -> Option<(HashSet, bool)> { // assert!(curr_level.is_disjoint, "Lx is not disjoint"); // assert!(next_level.is_disjoint, "Lx+1 is not disjoint"); @@ -27,6 +31,16 @@ fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet (HashSet (HashSet Choice { - let resolved_view = levels.resolved_view(); - - // If there are any levels that already have a compactor working on it - // we can't touch those, because that could cause a race condition - // violating the leveled compaction invariance of having a single sorted - // run per level - // - // TODO: However, this can probably improved by checking two compaction - // workers just don't cross key ranges - let busy_levels = levels.busy_levels(); - - for (curr_level_index, level) in resolved_view - .iter() - .enumerate() - .skip(1) - .take(resolved_view.len() - 2) - //.rev() - { + let view = &levels.levels; + + // L1+ compactions + for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2) { // NOTE: Level count is 255 max #[allow(clippy::cast_possible_truncation)] let curr_level_index = curr_level_index as u8; @@ -202,20 +210,24 @@ impl CompactionStrategy for Strategy { continue; } - if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) { + /* if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) { continue; - } + } */ let desired_bytes = self.level_target_size(curr_level_index); let overshoot = level.size().saturating_sub(desired_bytes); if overshoot > 0 { - let Some(next_level) = &resolved_view.get(next_level_index as usize) else { + let Some(next_level) = &view.get(next_level_index as usize) else { break; }; - let (segment_ids, can_trivial_move) = pick_minimal_overlap(level, next_level); + let Some((segment_ids, can_trivial_move)) = + pick_minimal_compaction(level, next_level, &levels.hidden_set) + else { + break; + }; // eprintln!( // "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}", @@ -250,8 +262,11 @@ impl CompactionStrategy for Strategy { } } + // L0->L1 compactions { - let Some(first_level) = resolved_view.first() else { + let busy_levels = levels.busy_levels(); + + let Some(first_level) = view.first() else { return Choice::DoNothing; }; @@ -296,10 +311,10 @@ impl CompactionStrategy for Strategy { } if !busy_levels.contains(&1) { - let mut level = first_level.clone(); + let mut level = (**first_level).clone(); level.sort_by_key_range(); - let Some(next_level) = &resolved_view.get(1) else { + let Some(next_level) = &view.get(1) else { return Choice::DoNothing; }; diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 9b42bcf..100b89d 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -125,22 +125,38 @@ fn merge_segments( ) -> crate::Result<()> { if opts.stop_signal.is_stopped() { log::debug!("compactor: stopping before compaction because of stop signal"); + return Ok(()); + } + + // TODO: this sometimes runs, but shouldn't be possible + // TODO: because we have a mutex when hiding & showing segments and checking compaction strategy... + if payload + .segment_ids + .iter() + .any(|id| levels.hidden_set.contains(id)) + { + log::warn!("Compaction task contained hidden segments, declining to run it"); + return Ok(()); } let segments_base_folder = opts.config.path.join(SEGMENTS_FOLDER); let merge_iter = { - let to_merge: Vec<_> = { + let to_merge: Option> = { let segments = levels.get_all_segments(); payload .segment_ids .iter() - .filter_map(|x| segments.get(x)) - .cloned() + .map(|x| segments.get(x).cloned()) .collect() }; + let Some(to_merge) = to_merge else { + log::warn!("Compaction task contained segments that do not exist, declining to run it"); + return Ok(()); + }; + let mut segment_readers: Vec> = Vec::with_capacity(to_merge.len()); for segment in to_merge { diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index e7e0d3c..bf4e25c 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -38,7 +38,7 @@ pub struct LevelManifest { /// /// While consuming segments (because of compaction) they will not appear in the list of segments /// as to not cause conflicts between multiple compaction threads (compacting the same segments) - hidden_set: HiddenSet, + pub hidden_set: HiddenSet, is_disjoint: bool, } diff --git a/src/segment/value_block.rs b/src/segment/value_block.rs index 4883271..40c923d 100644 --- a/src/segment/value_block.rs +++ b/src/segment/value_block.rs @@ -80,7 +80,12 @@ impl ValueBlock { let file_guard = descriptor_table .access(&segment_id)? + .ok_or(()) + .map_err(|()| { + log::error!("Failed to get file guard for segment {segment_id:?}"); + }) .expect("should acquire file handle"); + // TODO: ^ use inspect instead: 1.76 let block = Self::from_file( &mut *file_guard.file.lock().expect("lock is poisoned"),