Skip to content

Commit

Permalink
preliminary parallel compactions
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin-j97 committed Nov 26, 2024
1 parent f376442 commit 95bdeb0
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 35 deletions.
77 changes: 46 additions & 31 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
config::Config,
key_range::KeyRange,
level_manifest::{level::Level, LevelManifest},
level_manifest::{level::Level, HiddenSet, LevelManifest},
segment::Segment,
HashSet, SegmentId,
};
Expand All @@ -16,8 +16,12 @@ fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
}

// TODO: Currently does not take in `overshoot`
// TODO: Need to make sure compactions are not too small
fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<SegmentId>, bool) {
// TODO: Need to make sure compactions are not too small either
fn pick_minimal_compaction(
curr_level: &Level,
next_level: &Level,
hidden_set: &HiddenSet,
) -> Option<(HashSet<SegmentId>, bool)> {
// assert!(curr_level.is_disjoint, "Lx is not disjoint");
// assert!(next_level.is_disjoint, "Lx+1 is not disjoint");

Expand All @@ -27,6 +31,16 @@ fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<Segm
let windows = next_level.windows(size);

for window in windows {
if window
.iter()
.map(|x| x.metadata.id)
.any(|x| hidden_set.contains(&x))
{
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
continue;
}

let key_range = aggregate_key_range(window);

// Pull in all segments in current level into compaction
Expand Down Expand Up @@ -55,6 +69,16 @@ fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<Segm
curr_level.overlapping_segments(&key_range).collect()
};

if curr_level_pull_in
.iter()
.map(|x| x.metadata.id)
.any(|x| hidden_set.contains(&x))
{
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
continue;
}

let curr_level_size = curr_level_pull_in
.iter()
.map(|x| x.metadata.file_size)
Expand Down Expand Up @@ -100,9 +124,7 @@ fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<Segm
.into_iter()
.min_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));

let (_, set, can_trivial_move) = minimum_effort_choice.expect("should exist");

(set, can_trivial_move)
minimum_effort_choice.map(|(_, set, can_trivial_move)| (set, can_trivial_move))
}

/// Levelled compaction strategy (LCS)
Expand Down Expand Up @@ -174,24 +196,10 @@ impl Strategy {
impl CompactionStrategy for Strategy {
#[allow(clippy::too_many_lines)]
fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let resolved_view = levels.resolved_view();

// If there are any levels that already have a compactor working on it
// we can't touch those, because that could cause a race condition
// violating the leveled compaction invariant of having a single sorted
// run per level
//
// TODO: However, this can probably be improved by checking that two compaction
// workers just don't cross key ranges
let busy_levels = levels.busy_levels();

for (curr_level_index, level) in resolved_view
.iter()
.enumerate()
.skip(1)
.take(resolved_view.len() - 2)
//.rev()
{
let view = &levels.levels;

// L1+ compactions
for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2) {
// NOTE: Level count is 255 max
#[allow(clippy::cast_possible_truncation)]
let curr_level_index = curr_level_index as u8;
Expand All @@ -202,20 +210,24 @@ impl CompactionStrategy for Strategy {
continue;
}

if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
/* if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
continue;
}
} */

let desired_bytes = self.level_target_size(curr_level_index);

let overshoot = level.size().saturating_sub(desired_bytes);

if overshoot > 0 {
let Some(next_level) = &resolved_view.get(next_level_index as usize) else {
let Some(next_level) = &view.get(next_level_index as usize) else {
break;
};

let (segment_ids, can_trivial_move) = pick_minimal_overlap(level, next_level);
let Some((segment_ids, can_trivial_move)) =
pick_minimal_compaction(level, next_level, &levels.hidden_set)
else {
break;
};

// eprintln!(
// "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}",
Expand Down Expand Up @@ -250,8 +262,11 @@ impl CompactionStrategy for Strategy {
}
}

// L0->L1 compactions
{
let Some(first_level) = resolved_view.first() else {
let busy_levels = levels.busy_levels();

let Some(first_level) = view.first() else {
return Choice::DoNothing;
};

Expand Down Expand Up @@ -296,10 +311,10 @@ impl CompactionStrategy for Strategy {
}

if !busy_levels.contains(&1) {
let mut level = first_level.clone();
let mut level = (**first_level).clone();
level.sort_by_key_range();

let Some(next_level) = &resolved_view.get(1) else {
let Some(next_level) = &view.get(1) else {
return Choice::DoNothing;
};

Expand Down
22 changes: 19 additions & 3 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,38 @@ fn merge_segments(
) -> crate::Result<()> {
if opts.stop_signal.is_stopped() {
log::debug!("compactor: stopping before compaction because of stop signal");
return Ok(());
}

// TODO: this sometimes runs, but shouldn't be possible
// TODO: because we have a mutex when hiding & showing segments and checking compaction strategy...
if payload
.segment_ids
.iter()
.any(|id| levels.hidden_set.contains(id))
{
log::warn!("Compaction task contained hidden segments, declining to run it");
return Ok(());
}

let segments_base_folder = opts.config.path.join(SEGMENTS_FOLDER);

let merge_iter = {
let to_merge: Vec<_> = {
let to_merge: Option<Vec<_>> = {
let segments = levels.get_all_segments();

payload
.segment_ids
.iter()
.filter_map(|x| segments.get(x))
.cloned()
.map(|x| segments.get(x).cloned())
.collect()
};

let Some(to_merge) = to_merge else {
log::warn!("Compaction task contained segments that do not exist, declining to run it");
return Ok(());
};

let mut segment_readers: Vec<BoxedIterator<'_>> = Vec::with_capacity(to_merge.len());

for segment in to_merge {
Expand Down
2 changes: 1 addition & 1 deletion src/level_manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct LevelManifest {
///
/// While consuming segments (because of compaction) they will not appear in the list of segments
/// so as not to cause conflicts between multiple compaction threads (compacting the same segments)
hidden_set: HiddenSet,
pub hidden_set: HiddenSet,

is_disjoint: bool,
}
Expand Down
5 changes: 5 additions & 0 deletions src/segment/value_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ impl ValueBlock {

let file_guard = descriptor_table
.access(&segment_id)?
.ok_or(())
.map_err(|()| {
log::error!("Failed to get file guard for segment {segment_id:?}");
})
.expect("should acquire file handle");
// TODO: ^ use inspect instead: 1.76

let block = Self::from_file(
&mut *file_guard.file.lock().expect("lock is poisoned"),
Expand Down

0 comments on commit 95bdeb0

Please sign in to comment.