Skip to content

Commit

Permalink
Merge pull request #80 from fjall-rs/perf/l0-full-index
Browse files Browse the repository at this point in the history
L0/L1 full index
  • Loading branch information
marvin-j97 authored Nov 26, 2024
2 parents cec2878 + 489ffd4 commit 06f105e
Show file tree
Hide file tree
Showing 17 changed files with 528 additions and 185 deletions.
7 changes: 5 additions & 2 deletions src/compaction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ mod tests {
key_range::KeyRange,
level_manifest::LevelManifest,
segment::{
block_index::two_level_index::TwoLevelBlockIndex,
block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
file_offsets::FileOffsets,
meta::{Metadata, SegmentId},
value_block::BlockOffset,
Expand All @@ -144,10 +144,13 @@ mod tests {
fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

SegmentInner {
tree_id: 0,
descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
block_index,

offsets: FileOffsets {
bloom_ptr: BlockOffset(0),
Expand Down
7 changes: 5 additions & 2 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ mod tests {
key_range::KeyRange,
level_manifest::LevelManifest,
segment::{
block_index::two_level_index::TwoLevelBlockIndex,
block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
file_offsets::FileOffsets,
meta::{Metadata, SegmentId},
value_block::BlockOffset,
Expand Down Expand Up @@ -355,10 +355,13 @@ mod tests {
) -> Segment {
let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

SegmentInner {
tree_id: 0,
descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
block_index,

offsets: FileOffsets {
bloom_ptr: BlockOffset(0),
Expand Down
12 changes: 9 additions & 3 deletions src/compaction/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ mod tests {
key_range::KeyRange,
level_manifest::LevelManifest,
segment::{
block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets,
meta::Metadata, value_block::BlockOffset, Segment, SegmentInner,
block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
file_offsets::FileOffsets,
meta::Metadata,
value_block::BlockOffset,
Segment, SegmentInner,
},
};
use std::sync::Arc;
Expand All @@ -100,10 +103,13 @@ mod tests {
fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

SegmentInner {
tree_id: 0,
descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
block_index,

offsets: FileOffsets {
bloom_ptr: BlockOffset(0),
Expand Down
7 changes: 5 additions & 2 deletions src/compaction/tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
key_range::KeyRange,
level_manifest::LevelManifest,
segment::{
block_index::two_level_index::TwoLevelBlockIndex,
block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
file_offsets::FileOffsets,
meta::{Metadata, SegmentId},
value_block::BlockOffset,
Expand All @@ -149,10 +149,13 @@ mod tests {
fn fixture_segment(id: SegmentId, size_mib: u64, max_seqno: SeqNo) -> Segment {
let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

SegmentInner {
tree_id: 0,
descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
block_index,

offsets: FileOffsets {
bloom_ptr: BlockOffset(0),
Expand Down
45 changes: 31 additions & 14 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ use crate::{
level_manifest::LevelManifest,
merge::{BoxedIterator, Merger},
segment::{
block_index::two_level_index::TwoLevelBlockIndex, id::GlobalSegmentId,
multi_writer::MultiWriter, Segment, SegmentInner,
block_index::{
full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex, BlockIndexImpl,
},
id::GlobalSegmentId,
multi_writer::MultiWriter,
Segment, SegmentInner,
},
stop_signal::StopSignal,
tree::inner::{SealedMemtables, TreeId},
Expand Down Expand Up @@ -229,21 +233,34 @@ fn merge_segments(
let segment_id = trailer.metadata.id;
let segment_file_path = segments_base_folder.join(segment_id.to_string());

let tli_ptr = trailer.offsets.tli_ptr;

#[cfg(feature = "bloom")]
let bloom_ptr = trailer.offsets.bloom_ptr;

// NOTE: Need to allow because of false positive in Clippy
// because of "bloom" feature
#[allow(clippy::needless_borrows_for_generic_args)]
let block_index = Arc::new(TwoLevelBlockIndex::from_file(
&segment_file_path,
tli_ptr,
(opts.tree_id, segment_id).into(),
opts.config.descriptor_table.clone(),
opts.config.block_cache.clone(),
)?);
let block_index = match payload.dest_level {
0 | 1 => {
let block_index = FullBlockIndex::from_file(
&segment_file_path,
&trailer.metadata,
&trailer.offsets,
)?;
BlockIndexImpl::Full(block_index)
}
_ => {
// NOTE: Need to allow because of false positive in Clippy
// because of "bloom" feature
#[allow(clippy::needless_borrows_for_generic_args)]
let block_index = TwoLevelBlockIndex::from_file(
&segment_file_path,
&trailer.metadata,
trailer.offsets.tli_ptr,
(opts.tree_id, segment_id).into(),
opts.config.descriptor_table.clone(),
opts.config.block_cache.clone(),
)?;
BlockIndexImpl::TwoLevel(block_index)
}
};
let block_index = Arc::new(block_index);

Ok(SegmentInner {
tree_id: opts.tree_id,
Expand Down
29 changes: 22 additions & 7 deletions src/level_manifest/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct Level {
/// is only recomputed when the level is changed
/// to avoid unnecessary CPU work
pub is_disjoint: bool,
// pub key_range: KeyRange,
}

impl std::fmt::Display for Level {
Expand All @@ -41,26 +42,36 @@ impl Default for Level {
fn default() -> Self {
Self {
is_disjoint: true,
segments: Vec::with_capacity(10),
segments: Vec::new(),
// key_range: KeyRange::empty(),
}
}
}

impl Level {
// TODO: unit test
fn set_key_range(&mut self) {
todo!()
}

pub fn list_ids(&self) -> HashSet<SegmentId> {
self.segments.iter().map(|x| x.metadata.id).collect()
}

pub fn insert(&mut self, segment: Segment) {
self.segments.push(segment);
fn update_metadata(&mut self) {
self.set_disjoint_flag();
self.sort();
// self.set_key_range();
}

pub fn insert(&mut self, segment: Segment) {
self.segments.push(segment);
self.update_metadata();
}

pub fn remove(&mut self, segment_id: SegmentId) {
self.segments.retain(|x| segment_id != x.metadata.id);
self.set_disjoint_flag();
self.sort();
self.update_metadata();
}

pub(crate) fn sort(&mut self) {
Expand Down Expand Up @@ -223,7 +234,7 @@ mod tests {
descriptor_table::FileDescriptorTable,
key_range::KeyRange,
segment::{
block_index::two_level_index::TwoLevelBlockIndex,
block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
file_offsets::FileOffsets,
meta::{Metadata, SegmentId},
value_block::BlockOffset,
Expand All @@ -241,10 +252,13 @@ mod tests {
fn fixture_segment(id: SegmentId, key_range: KeyRange) -> Segment {
let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

SegmentInner {
tree_id: 0,
descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
block_index,

offsets: FileOffsets {
bloom_ptr: BlockOffset(0),
Expand Down Expand Up @@ -287,6 +301,7 @@ mod tests {
fn level_disjoint_cull() {
let level = Level {
is_disjoint: true,
// key_range: KeyRange::empty(),
segments: vec![
fixture_segment(0, KeyRange::new((Slice::from("a"), Slice::from("c")))),
fixture_segment(1, KeyRange::new((Slice::from("d"), Slice::from("g")))),
Expand Down
23 changes: 18 additions & 5 deletions src/level_manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,24 @@ impl LevelManifest {
Ok(levels)
}

pub(crate) fn recover_ids<P: AsRef<Path>>(path: P) -> crate::Result<Vec<SegmentId>> {
Ok(Self::load_level_manifest(path)?
.into_iter()
.flatten()
.collect())
pub(crate) fn recover_ids<P: AsRef<Path>>(
path: P,
) -> crate::Result<crate::HashMap<SegmentId, u8 /* Level index */>> {
let manifest = Self::load_level_manifest(path)?;
let mut result = crate::HashMap::default();

for (level_idx, segment_ids) in manifest.into_iter().enumerate() {
for segment_id in segment_ids {
result.insert(
segment_id,
level_idx
.try_into()
.expect("there are less than 256 levels"),
);
}
}

Ok(result)
}

fn resolve_levels(
Expand Down
85 changes: 85 additions & 0 deletions src/segment/block_index/full_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use super::{block_handle::KeyedBlockHandle, BlockIndex};
use crate::segment::{
block_index::IndexBlock,
value_block::{BlockOffset, CachePolicy},
};
use std::{fs::File, io::Seek, path::Path};

/// Index that translates item keys to block handles
///
/// The index is fully loaded into memory.
pub struct FullBlockIndex(Box<[KeyedBlockHandle]>);

impl std::ops::Deref for FullBlockIndex {
type Target = Box<[KeyedBlockHandle]>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl FullBlockIndex {
pub fn from_file<P: AsRef<Path>>(
path: P,
metadata: &crate::segment::meta::Metadata,
offsets: &crate::segment::file_offsets::FileOffsets,
) -> crate::Result<Self> {
let path = path.as_ref();
let cnt = metadata.index_block_count as usize;

log::trace!(
"reading full block index from {path:?} at idx_ptr={} ({cnt} index blocks)",
offsets.index_block_ptr,
);

let mut file = File::open(path)?;
file.seek(std::io::SeekFrom::Start(*offsets.index_block_ptr))?;

let mut block_handles = Vec::with_capacity(cnt);

for _ in 0..cnt {
let idx_block = IndexBlock::from_reader(&mut file)?.items;
// TODO: 1.80? IntoIter impl for Box<[T]>
block_handles.extend(idx_block.into_vec());
}

debug_assert!(!block_handles.is_empty());

Ok(Self(block_handles.into_boxed_slice()))
}
}

impl BlockIndex for FullBlockIndex {
fn get_lowest_block_containing_key(
&self,
key: &[u8],
_: CachePolicy,
) -> crate::Result<Option<BlockOffset>> {
use super::KeyedBlockIndex;

self.0
.get_lowest_block_containing_key(key, CachePolicy::Read)
.map(|x| x.map(|x| x.offset))
}

/// Gets the last block handle that may contain the given item
fn get_last_block_containing_key(
&self,
key: &[u8],
cache_policy: CachePolicy,
) -> crate::Result<Option<BlockOffset>> {
use super::KeyedBlockIndex;

self.0
.get_last_block_containing_key(key, cache_policy)
.map(|x| x.map(|x| x.offset))
}

fn get_last_block_handle(&self, _: CachePolicy) -> crate::Result<BlockOffset> {
use super::KeyedBlockIndex;

self.0
.get_last_block_handle(CachePolicy::Read)
.map(|x| x.offset)
}
}
Loading

0 comments on commit 06f105e

Please sign in to comment.