diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index 7eea268..0942f34 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -124,7 +124,7 @@ mod tests { key_range::KeyRange, level_manifest::LevelManifest, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, + block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl}, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, value_block::BlockOffset, @@ -144,10 +144,13 @@ mod tests { fn fixture_segment(id: SegmentId, created_at: u128) -> Segment { let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); + let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone()); + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); + SegmentInner { tree_id: 0, descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)), - block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), + block_index, offsets: FileOffsets { bloom_ptr: BlockOffset(0), diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 09d79ec..32b5cec 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -323,7 +323,7 @@ mod tests { key_range::KeyRange, level_manifest::LevelManifest, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, + block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl}, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, value_block::BlockOffset, @@ -355,10 +355,13 @@ mod tests { ) -> Segment { let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); + let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone()); + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); + SegmentInner { tree_id: 0, descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)), - block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), + block_index, offsets: FileOffsets { bloom_ptr: BlockOffset(0), diff --git a/src/compaction/maintenance.rs b/src/compaction/maintenance.rs index 11a86f2..d6c8598 100644 --- a/src/compaction/maintenance.rs +++ b/src/compaction/maintenance.rs @@ -86,8 +86,11 @@ mod tests { key_range::KeyRange, level_manifest::LevelManifest, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, - meta::Metadata, value_block::BlockOffset, Segment, SegmentInner, + block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl}, + file_offsets::FileOffsets, + meta::Metadata, + value_block::BlockOffset, + Segment, SegmentInner, }, }; use std::sync::Arc; @@ -100,10 +103,13 @@ mod tests { fn fixture_segment(id: SegmentId, created_at: u128) -> Segment { let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); + let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone()); + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); + SegmentInner { tree_id: 0, descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)), - block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), + block_index, offsets: FileOffsets { bloom_ptr: BlockOffset(0), diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 367c254..056186c 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -131,7 +131,7 @@ mod tests { key_range::KeyRange, level_manifest::LevelManifest, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, + block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl}, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, value_block::BlockOffset, @@ -149,10 +149,13 @@ mod tests { fn fixture_segment(id: SegmentId, size_mib: u64, max_seqno: SeqNo) -> Segment { let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); + let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone()); + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); + SegmentInner { tree_id: 0, descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)), - block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), + block_index, offsets: FileOffsets { bloom_ptr: BlockOffset(0), diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index b053a52..31da364 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -9,8 +9,12 @@ use crate::{ level_manifest::LevelManifest, merge::{BoxedIterator, Merger}, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, id::GlobalSegmentId, - multi_writer::MultiWriter, Segment, SegmentInner, + block_index::{ + full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex, BlockIndexImpl, + }, + id::GlobalSegmentId, + multi_writer::MultiWriter, + Segment, SegmentInner, }, stop_signal::StopSignal, tree::inner::{SealedMemtables, TreeId}, @@ -229,21 +233,34 @@ fn merge_segments( let segment_id = trailer.metadata.id; let segment_file_path = segments_base_folder.join(segment_id.to_string()); - let tli_ptr = trailer.offsets.tli_ptr; - #[cfg(feature = "bloom")] let bloom_ptr = trailer.offsets.bloom_ptr; - // NOTE: Need to allow because of false positive in Clippy - // because of "bloom" feature - #[allow(clippy::needless_borrows_for_generic_args)] - let block_index = Arc::new(TwoLevelBlockIndex::from_file( - &segment_file_path, - tli_ptr, - (opts.tree_id, segment_id).into(), - opts.config.descriptor_table.clone(), - opts.config.block_cache.clone(), - )?); + let block_index = match payload.dest_level { + 0 | 1 => { + let block_index = FullBlockIndex::from_file( + &segment_file_path, + &trailer.metadata, + &trailer.offsets, + )?; + BlockIndexImpl::Full(block_index) + } + _ => { + // NOTE: Need to allow because of false positive in Clippy + // because of "bloom" feature + #[allow(clippy::needless_borrows_for_generic_args)] + let block_index = TwoLevelBlockIndex::from_file( + &segment_file_path, + &trailer.metadata, + trailer.offsets.tli_ptr, + (opts.tree_id, segment_id).into(), + opts.config.descriptor_table.clone(), + opts.config.block_cache.clone(), + )?; + BlockIndexImpl::TwoLevel(block_index) + } + }; + let block_index = Arc::new(block_index); Ok(SegmentInner { tree_id: opts.tree_id, diff --git a/src/level_manifest/level.rs b/src/level_manifest/level.rs index 32199ed..488bb36 100644 --- a/src/level_manifest/level.rs +++ b/src/level_manifest/level.rs @@ -17,6 +17,7 @@ pub struct Level { /// is only recomputed when the level is changed /// to avoid unnecessary CPU work pub is_disjoint: bool, + // pub key_range: KeyRange, } impl std::fmt::Display for Level { @@ -41,26 +42,36 @@ impl Default for Level { fn default() -> Self { Self { is_disjoint: true, - segments: Vec::with_capacity(10), + segments: Vec::new(), + // key_range: KeyRange::empty(), } } } impl Level { + // TODO: unit test + fn set_key_range(&mut self) { + todo!() + } + pub fn list_ids(&self) -> HashSet { self.segments.iter().map(|x| x.metadata.id).collect() } - pub fn insert(&mut self, segment: Segment) { - self.segments.push(segment); + fn update_metadata(&mut self) { self.set_disjoint_flag(); self.sort(); + // self.set_key_range(); + } + + pub fn insert(&mut self, segment: Segment) { + self.segments.push(segment); + self.update_metadata(); } pub fn remove(&mut self, segment_id: SegmentId) { self.segments.retain(|x| segment_id != x.metadata.id); - self.set_disjoint_flag(); - self.sort(); + self.update_metadata(); } pub(crate) fn sort(&mut self) { @@ -223,7 +234,7 @@ mod tests { descriptor_table::FileDescriptorTable, key_range::KeyRange, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, + block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl}, file_offsets::FileOffsets, meta::{Metadata, SegmentId}, value_block::BlockOffset, @@ -241,10 +252,13 @@ mod tests { fn fixture_segment(id: SegmentId, key_range: KeyRange) -> Segment { let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); + let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone()); + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); + SegmentInner { tree_id: 0, descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)), - block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())), + block_index, offsets: FileOffsets { bloom_ptr: BlockOffset(0), @@ -287,6 +301,7 @@ mod tests { fn level_disjoint_cull() { let level = Level { is_disjoint: true, + // key_range: KeyRange::empty(), segments: vec![ fixture_segment(0, KeyRange::new((Slice::from("a"), Slice::from("c")))), fixture_segment(1, KeyRange::new((Slice::from("d"), Slice::from("g")))), diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 30395ac..248f27b 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -175,11 +175,24 @@ impl LevelManifest { Ok(levels) } - pub(crate) fn recover_ids>(path: P) -> crate::Result> { - Ok(Self::load_level_manifest(path)? - .into_iter() - .flatten() - .collect()) + pub(crate) fn recover_ids>( + path: P, + ) -> crate::Result> { + let manifest = Self::load_level_manifest(path)?; + let mut result = crate::HashMap::default(); + + for (level_idx, segment_ids) in manifest.into_iter().enumerate() { + for segment_id in segment_ids { + result.insert( + segment_id, + level_idx + .try_into() + .expect("there are less than 256 levels"), + ); + } + } + + Ok(result) } fn resolve_levels( diff --git a/src/segment/block_index/full_index.rs b/src/segment/block_index/full_index.rs new file mode 100644 index 0000000..6b9f69c --- /dev/null +++ b/src/segment/block_index/full_index.rs @@ -0,0 +1,85 @@ +use super::{block_handle::KeyedBlockHandle, BlockIndex}; +use crate::segment::{ + block_index::IndexBlock, + value_block::{BlockOffset, CachePolicy}, +}; +use std::{fs::File, io::Seek, path::Path}; + +/// Index that translates item keys to block handles +/// +/// The index is fully loaded into memory. +pub struct FullBlockIndex(Box<[KeyedBlockHandle]>); + +impl std::ops::Deref for FullBlockIndex { + type Target = Box<[KeyedBlockHandle]>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl FullBlockIndex { + pub fn from_file>( + path: P, + metadata: &crate::segment::meta::Metadata, + offsets: &crate::segment::file_offsets::FileOffsets, + ) -> crate::Result { + let path = path.as_ref(); + let cnt = metadata.index_block_count as usize; + + log::trace!( + "reading full block index from {path:?} at idx_ptr={} ({cnt} index blocks)", + offsets.index_block_ptr, + ); + + let mut file = File::open(path)?; + file.seek(std::io::SeekFrom::Start(*offsets.index_block_ptr))?; + + let mut block_handles = Vec::with_capacity(cnt); + + for _ in 0..cnt { + let idx_block = IndexBlock::from_reader(&mut file)?.items; + // TODO: 1.80? IntoIter impl for Box<[T]> + block_handles.extend(idx_block.into_vec()); + } + + debug_assert!(!block_handles.is_empty()); + + Ok(Self(block_handles.into_boxed_slice())) + } +} + +impl BlockIndex for FullBlockIndex { + fn get_lowest_block_containing_key( + &self, + key: &[u8], + _: CachePolicy, + ) -> crate::Result> { + use super::KeyedBlockIndex; + + self.0 + .get_lowest_block_containing_key(key, CachePolicy::Read) + .map(|x| x.map(|x| x.offset)) + } + + /// Gets the last block handle that may contain the given item + fn get_last_block_containing_key( + &self, + key: &[u8], + cache_policy: CachePolicy, + ) -> crate::Result> { + use super::KeyedBlockIndex; + + self.0 + .get_last_block_containing_key(key, cache_policy) + .map(|x| x.map(|x| x.offset)) + } + + fn get_last_block_handle(&self, _: CachePolicy) -> crate::Result { + use super::KeyedBlockIndex; + + self.0 + .get_last_block_handle(CachePolicy::Read) + .map(|x| x.offset) + } +} diff --git a/src/segment/block_index/mod.rs b/src/segment/block_index/mod.rs index 8465a12..67fcea2 100644 --- a/src/segment/block_index/mod.rs +++ b/src/segment/block_index/mod.rs @@ -3,16 +3,22 @@ // (found in the LICENSE-* files in the repository) pub mod block_handle; +pub mod full_index; pub mod top_level; pub mod two_level_index; pub mod writer; -use super::{block::Block, value_block::CachePolicy}; +use super::{ + block::Block, + value_block::{BlockOffset, CachePolicy}, +}; use block_handle::KeyedBlockHandle; +use full_index::FullBlockIndex; +use two_level_index::TwoLevelBlockIndex; pub type IndexBlock = Block; -impl BlockIndex for [KeyedBlockHandle] { +impl KeyedBlockIndex for [KeyedBlockHandle] { fn get_lowest_block_containing_key( &self, key: &[u8], @@ -54,7 +60,28 @@ impl BlockIndex for [KeyedBlockHandle] { } } +#[enum_dispatch::enum_dispatch] pub trait BlockIndex { + /// Gets the lowest block handle that may contain the given item + fn get_lowest_block_containing_key( + &self, + key: &[u8], + cache_policy: CachePolicy, + ) -> crate::Result>; + + /// Gets the last block handle that may contain the given item + fn get_last_block_containing_key( + &self, + key: &[u8], + cache_policy: CachePolicy, + ) -> crate::Result>; + + /// Returns a handle to the last block + fn get_last_block_handle(&self, cache_policy: CachePolicy) -> crate::Result; +} + +#[allow(clippy::module_name_repetitions)] +pub trait KeyedBlockIndex { /// Gets the lowest block handle that may contain the given item fn get_lowest_block_containing_key( &self, @@ -73,14 +100,18 @@ pub trait BlockIndex { fn get_last_block_handle(&self, cache_policy: CachePolicy) -> crate::Result<&KeyedBlockHandle>; } +#[enum_dispatch::enum_dispatch(BlockIndex)] +#[allow(clippy::module_name_repetitions)] +pub enum BlockIndexImpl { + Full(FullBlockIndex), + TwoLevel(TwoLevelBlockIndex), +} + #[cfg(test)] #[allow(clippy::expect_used)] mod tests { use super::*; - use crate::{ - segment::{block_index::BlockIndex, value_block::BlockOffset}, - Slice, - }; + use crate::{segment::value_block::BlockOffset, Slice}; use test_log::test; fn bh>(end_key: K, offset: BlockOffset) -> KeyedBlockHandle { diff --git a/src/segment/block_index/top_level.rs b/src/segment/block_index/top_level.rs index 71672bf..c37bbc3 100644 --- a/src/segment/block_index/top_level.rs +++ b/src/segment/block_index/top_level.rs @@ -2,7 +2,7 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use super::{block_handle::KeyedBlockHandle, BlockIndex}; +use super::{block_handle::KeyedBlockHandle, KeyedBlockIndex}; use crate::segment::{ block_index::IndexBlock, value_block::{BlockOffset, CachePolicy}, @@ -32,27 +32,30 @@ use std::{fs::File, path::Path}; pub struct TopLevelIndex(Box<[KeyedBlockHandle]>); impl TopLevelIndex { - /// Creates a top-level block index - #[must_use] - pub fn from_boxed_slice(handles: Box<[KeyedBlockHandle]>) -> Self { - Self(handles) - } - - /// Loads a top-level index from disk - pub fn from_file>(path: P, offset: BlockOffset) -> crate::Result { + pub fn from_file>( + path: P, + _: &crate::segment::meta::Metadata, + tli_ptr: BlockOffset, + ) -> crate::Result { let path = path.as_ref(); - log::trace!("reading TLI from {path:?}, offset={offset}"); + + log::trace!("reading TLI from {path:?} at tli_ptr={tli_ptr}"); let mut file = File::open(path)?; + let items = IndexBlock::from_file(&mut file, tli_ptr)?.items; - let items = IndexBlock::from_file(&mut file, offset)?.items; log::trace!("loaded TLI ({path:?}): {items:?}"); - debug_assert!(!items.is_empty()); Ok(Self::from_boxed_slice(items)) } + /// Creates a top-level block index + #[must_use] + pub fn from_boxed_slice(handles: Box<[KeyedBlockHandle]>) -> Self { + Self(handles) + } + #[must_use] pub fn len(&self) -> usize { self.0.len() @@ -68,7 +71,7 @@ impl TopLevelIndex { } } -impl BlockIndex for TopLevelIndex { +impl KeyedBlockIndex for TopLevelIndex { fn get_lowest_block_containing_key( &self, key: &[u8], diff --git a/src/segment/block_index/two_level_index.rs b/src/segment/block_index/two_level_index.rs index c66649e..eaa0bc1 100644 --- a/src/segment/block_index/two_level_index.rs +++ b/src/segment/block_index/two_level_index.rs @@ -4,13 +4,13 @@ use super::{ super::{id::GlobalSegmentId, value_block::CachePolicy}, - block_handle::KeyedBlockHandle, top_level::TopLevelIndex, BlockIndex, IndexBlock, }; use crate::{ - block_cache::BlockCache, descriptor_table::FileDescriptorTable, - segment::value_block::BlockOffset, + block_cache::BlockCache, + descriptor_table::FileDescriptorTable, + segment::{meta::Metadata, value_block::BlockOffset}, }; use std::{path::Path, sync::Arc}; @@ -55,13 +55,37 @@ pub struct TwoLevelBlockIndex { index_block_fetcher: IndexBlockFetcher, } +impl BlockIndex for TwoLevelBlockIndex { + fn get_lowest_block_containing_key( + &self, + key: &[u8], + cache_policy: CachePolicy, + ) -> crate::Result> { + self.get_lowest_data_block_handle_containing_item(key, cache_policy) + } + + fn get_last_block_handle(&self, cache_policy: CachePolicy) -> crate::Result { + self.get_last_data_block_handle(cache_policy) + } + + fn get_last_block_containing_key( + &self, + key: &[u8], + cache_policy: CachePolicy, + ) -> crate::Result> { + self.get_last_data_block_handle_containing_item(key, cache_policy) + } +} + impl TwoLevelBlockIndex { /// Gets the lowest block handle that may contain the given item pub fn get_lowest_data_block_handle_containing_item( &self, key: &[u8], cache_policy: CachePolicy, - ) -> crate::Result> { + ) -> crate::Result> { + use super::KeyedBlockIndex; + let Some(index_block_handle) = self .top_level_index .get_lowest_block_containing_key(key, cache_policy) @@ -70,13 +94,17 @@ impl TwoLevelBlockIndex { return Ok(None); }; - let index_block = self.load_index_block(index_block_handle, cache_policy)?; + let index_block = self.load_index_block(index_block_handle.offset, cache_policy)?; - Ok(index_block - .items - .get_lowest_block_containing_key(key, cache_policy) - .expect("cannot fail") - .cloned()) + Ok({ + use super::KeyedBlockIndex; + + index_block + .items + .get_lowest_block_containing_key(key, cache_policy) + .expect("cannot fail") + .map(|x| x.offset) + }) } /// Gets the last block handle that may contain the given item @@ -84,7 +112,9 @@ impl TwoLevelBlockIndex { &self, key: &[u8], cache_policy: CachePolicy, - ) -> crate::Result> { + ) -> crate::Result> { + use super::KeyedBlockIndex; + let Some(index_block_handle) = self .top_level_index .get_last_block_containing_key(key, cache_policy) @@ -93,45 +123,48 @@ impl TwoLevelBlockIndex { return Ok(Some(self.get_last_data_block_handle(cache_policy)?)); }; - let index_block = self.load_index_block(index_block_handle, cache_policy)?; + let index_block = self.load_index_block(index_block_handle.offset, cache_policy)?; - Ok(index_block - .items - .get_last_block_containing_key(key, cache_policy) - .expect("cannot fail") - .cloned()) + Ok({ + use super::KeyedBlockIndex; + + index_block + .items + .get_last_block_containing_key(key, cache_policy) + .expect("cannot fail") + .map(|x| x.offset) + }) } pub fn get_last_data_block_handle( &self, cache_policy: CachePolicy, - ) -> crate::Result { + ) -> crate::Result { + use super::KeyedBlockIndex; + let index_block_handle = self .top_level_index .get_last_block_handle(cache_policy) .expect("cannot fail"); - let index_block = self.load_index_block(index_block_handle, cache_policy)?; + let index_block = self.load_index_block(index_block_handle.offset, cache_policy)?; Ok(index_block .items .last() .expect("index block should not be empty") - .clone()) + .offset) } /// Loads an index block from disk pub fn load_index_block( &self, - block_handle: &KeyedBlockHandle, + offset: BlockOffset, cache_policy: CachePolicy, ) -> crate::Result> { - log::trace!("loading index block {:?}/{block_handle:?}", self.segment_id); + log::trace!("loading index block {:?}/{offset:?}", self.segment_id); - if let Some(block) = self - .index_block_fetcher - .get(self.segment_id, block_handle.offset) - { + if let Some(block) = self.index_block_fetcher.get(self.segment_id, offset) { // Cache hit: Copy from block Ok(block) @@ -145,13 +178,13 @@ impl TwoLevelBlockIndex { let block = IndexBlock::from_file( &mut *file_guard.file.lock().expect("lock is poisoned"), - block_handle.offset, + offset, ) .map_err(|e| { log::error!( "Failed to load index block {:?}/{:?}: {e:?}", self.segment_id, - block_handle.offset + offset ); e })?; @@ -162,11 +195,8 @@ impl TwoLevelBlockIndex { let block = Arc::new(block); if cache_policy == CachePolicy::Write { - self.index_block_fetcher.insert( - self.segment_id, - block_handle.offset, - block.clone(), - ); + self.index_block_fetcher + .insert(self.segment_id, offset, block.clone()); } Ok(block) @@ -187,16 +217,17 @@ impl TwoLevelBlockIndex { } pub fn from_file>( - file_path: P, - offset: BlockOffset, + path: P, + metadata: &Metadata, + tli_ptr: BlockOffset, segment_id: GlobalSegmentId, descriptor_table: Arc, block_cache: Arc, ) -> crate::Result { - let file_path = file_path.as_ref(); + let file_path = path.as_ref(); log::trace!("Reading block index from {file_path:?}"); - let top_level_index = TopLevelIndex::from_file(file_path, offset)?; + let top_level_index = TopLevelIndex::from_file(file_path, metadata, tli_ptr)?; Ok(Self { descriptor_table, diff --git a/src/segment/inner.rs b/src/segment/inner.rs index 805c2f5..01cacdf 100644 --- a/src/segment/inner.rs +++ b/src/segment/inner.rs @@ -2,9 +2,7 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use super::{ - block_index::two_level_index::TwoLevelBlockIndex, file_offsets::FileOffsets, meta::Metadata, -}; +use super::{block_index::BlockIndexImpl, file_offsets::FileOffsets, meta::Metadata}; use crate::{block_cache::BlockCache, descriptor_table::FileDescriptorTable, tree::inner::TreeId}; use std::sync::Arc; @@ -22,7 +20,7 @@ pub struct Inner { /// Translates key (first item of a block) to block offset (address inside file) and (compressed) size #[doc(hidden)] - pub block_index: Arc, + pub block_index: Arc, /// Block cache /// diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 6df15bd..bd820ba 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -24,6 +24,7 @@ use crate::{ tree::inner::TreeId, value::{InternalValue, SeqNo, UserKey}, }; +use block_index::BlockIndexImpl; use id::GlobalSegmentId; use inner::Inner; use range::Range; @@ -32,6 +33,7 @@ use std::{ops::Bound, path::Path, sync::Arc}; #[cfg(feature = "bloom")] use crate::bloom::{BloomFilter, CompositeHash}; +#[allow(clippy::module_name_repetitions)] pub type SegmentInner = Inner; /// Disk segment (a.k.a. `SSTable`, `SST`, `sorted string table`) that is located on disk @@ -86,57 +88,98 @@ impl Segment { let mut file = guard.file.lock().expect("lock is poisoned"); - // NOTE: TODO: because of 1.74.0 - #[allow(clippy::explicit_iter_loop)] - for handle in self.block_index.top_level_index.iter() { - let block = match IndexBlock::from_file(&mut *file, handle.offset) { - Ok(v) => v, - Err(e) => { - log::error!( - "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}" - ); - broken_count += 1; - continue; - } - }; - - for handle in &*block.items { - let value_block = match ValueBlock::from_file(&mut *file, handle.offset) { - Ok(v) => v, - Err(e) => { - log::error!( - "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}" - ); + // TODO: maybe move to BlockIndexImpl::verify + match &*self.block_index { + BlockIndexImpl::Full(block_index) => { + for handle in block_index.iter() { + let value_block = match ValueBlock::from_file(&mut *file, handle.offset) { + Ok(v) => v, + Err(e) => { + log::error!( + "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}" + ); + broken_count += 1; + data_block_count += 1; + continue; + } + }; + + let (_, data) = ValueBlock::to_bytes_compressed( + &value_block.items, + value_block.header.previous_block_offset, + value_block.header.compression, + )?; + let actual_checksum = Checksum::from_bytes(&data); + + if value_block.header.checksum != actual_checksum { + log::error!("{handle:?} is corrupted, invalid checksum value"); broken_count += 1; - data_block_count += 1; - continue; } - }; - - let (_, data) = ValueBlock::to_bytes_compressed( - &value_block.items, - value_block.header.previous_block_offset, - value_block.header.compression, - )?; - let actual_checksum = Checksum::from_bytes(&data); - - if value_block.header.checksum != actual_checksum { - log::error!("{handle:?} is corrupted, invalid checksum value"); - broken_count += 1; + + data_block_count += 1; + + if data_block_count % 1_000 == 0 { + log::debug!("Checked {data_block_count} data blocks"); + } } + } + BlockIndexImpl::TwoLevel(block_index) => { + // NOTE: TODO: because of 1.74.0 + #[allow(clippy::explicit_iter_loop)] + for handle in block_index.top_level_index.iter() { + let block = match IndexBlock::from_file(&mut *file, handle.offset) { + Ok(v) => v, + Err(e) => { + log::error!( + "index block {handle:?} could not be loaded, it is probably corrupted: {e:?}" + ); + broken_count += 1; + continue; + } + }; + + for handle in &*block.items { + let value_block = match ValueBlock::from_file(&mut *file, handle.offset) { + Ok(v) => v, + Err(e) => { + log::error!( + "data block {handle:?} could not be loaded, it is probably corrupted: {e:?}" + ); + broken_count += 1; + data_block_count += 1; + continue; + } + }; + + let (_, data) = ValueBlock::to_bytes_compressed( + &value_block.items, + value_block.header.previous_block_offset, + value_block.header.compression, + )?; + let actual_checksum = Checksum::from_bytes(&data); + + if value_block.header.checksum != actual_checksum { + log::error!("{handle:?} is corrupted, invalid checksum value"); + broken_count += 1; + } - data_block_count += 1; + data_block_count += 1; - if data_block_count % 1_000 == 0 { - log::debug!("Checked {data_block_count} data blocks"); + if data_block_count % 1_000 == 0 { + log::debug!("Checked {data_block_count} data blocks"); + } + } } } } - assert_eq!( - data_block_count, self.metadata.data_block_count, - "not all data blocks were visited" - ); + if data_block_count != self.metadata.data_block_count { + log::error!( + "Not all data blocks were visited during verification of disk segment {:?}", + self.metadata.id + ); + broken_count += 1; + } Ok(broken_count) } @@ -167,8 +210,9 @@ impl Segment { tree_id: TreeId, block_cache: Arc, descriptor_table: Arc, + use_full_block_index: bool, ) -> crate::Result { - use block_index::two_level_index::TwoLevelBlockIndex; + use block_index::{full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex}; use trailer::SegmentFileTrailer; let file_path = file_path.as_ref(); @@ -185,13 +229,23 @@ impl Segment { "Creating block index, with tli_ptr={}", trailer.offsets.tli_ptr ); - let block_index = TwoLevelBlockIndex::from_file( - file_path, - trailer.offsets.tli_ptr, - (tree_id, trailer.metadata.id).into(), - descriptor_table.clone(), - block_cache.clone(), - )?; + + let block_index = if use_full_block_index { + let block_index = + FullBlockIndex::from_file(file_path, &trailer.metadata, &trailer.offsets)?; + + BlockIndexImpl::Full(block_index) + } else { + let block_index = TwoLevelBlockIndex::from_file( + file_path, + &trailer.metadata, + trailer.offsets.tli_ptr, + (tree_id, trailer.metadata.id).into(), + descriptor_table.clone(), + block_cache.clone(), + )?; + BlockIndexImpl::TwoLevel(block_index) + }; #[cfg(feature = "bloom")] let bloom_ptr = trailer.offsets.bloom_ptr; @@ -253,6 +307,7 @@ impl Segment { seqno: Option, ) -> crate::Result> { use crate::{mvcc_stream::MvccStream, ValueType}; + use block_index::BlockIndex; use value_block::{CachePolicy, ValueBlock}; use value_block_consumer::ValueBlockConsumer; @@ -260,7 +315,7 @@ impl Segment { let Some(first_block_handle) = self .block_index - .get_lowest_data_block_handle_containing_item(key.as_ref(), CachePolicy::Write)? + .get_lowest_block_containing_key(key, CachePolicy::Write)? else { return Ok(None); }; @@ -269,7 +324,7 @@ impl Segment { &self.descriptor_table, &self.block_cache, GlobalSegmentId::from((self.tree_id, self.metadata.id)), - first_block_handle.offset, + first_block_handle, CachePolicy::Write, )? else { @@ -300,7 +355,7 @@ impl Segment { self.descriptor_table.clone(), GlobalSegmentId::from((self.tree_id, self.metadata.id)), self.block_cache.clone(), - first_block_handle.offset, + first_block_handle, None, ); reader.lo_block_size = block.header.data_length.into(); @@ -370,18 +425,18 @@ impl Segment { key: K, seqno: Option, ) -> crate::Result> { + let key = key.as_ref(); + if let Some(seqno) = seqno { if self.metadata.seqnos.0 >= seqno { return Ok(None); } } - if !self.is_key_in_key_range(&key) { + if !self.is_key_in_key_range(key) { return Ok(None); } - let key = key.as_ref(); - #[cfg(feature = "bloom")] if let Some(bf) = &self.bloom_filter { debug_assert!(false, "Use Segment::get_with_hash instead"); diff --git a/src/segment/range.rs b/src/segment/range.rs index 1a5169c..2c0af96 100644 --- a/src/segment/range.rs +++ b/src/segment/range.rs @@ -2,7 +2,8 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use super::block_index::two_level_index::TwoLevelBlockIndex; +use super::block_index::BlockIndex; +use super::block_index::BlockIndexImpl; use super::id::GlobalSegmentId; use super::reader::Reader; use super::value_block::BlockOffset; @@ -17,7 +18,7 @@ use std::ops::RangeBounds; use std::sync::Arc; pub struct Range { - block_index: Arc, + block_index: Arc, lo_initialized: bool, hi_initialized: bool, @@ -33,7 +34,7 @@ impl Range { descriptor_table: Arc, segment_id: GlobalSegmentId, block_cache: Arc, - block_index: Arc, + block_index: Arc, range: (Bound, Bound), ) -> Self { let reader = Reader::new( @@ -69,9 +70,9 @@ impl Range { Bound::Included(start) | Bound::Excluded(start) => { if let Some(lower_bound) = self .block_index - .get_lowest_data_block_handle_containing_item(start, CachePolicy::Write)? + .get_lowest_block_containing_key(start, CachePolicy::Write)? { - self.reader.lo_block_offset = lower_bound.offset; + self.reader.lo_block_offset = lower_bound; } Some(start) @@ -90,20 +91,21 @@ impl Range { fn initialize_hi_bound(&mut self) -> crate::Result<()> { let end_key: Option<&Slice> = match self.range.end_bound() { Bound::Unbounded => { - let upper_bound = self - .block_index - .get_last_data_block_handle(CachePolicy::Write)?; + let upper_bound = self.block_index.get_last_block_handle(CachePolicy::Write)?; - self.reader.hi_block_offset = Some(upper_bound.offset); + self.reader.hi_block_offset = Some(upper_bound); None } Bound::Included(end) | Bound::Excluded(end) => { if let Some(upper_bound) = self .block_index - .get_last_data_block_handle_containing_item(end, CachePolicy::Write)? + .get_last_block_containing_key(end, CachePolicy::Write)? { - self.reader.hi_block_offset = Some(upper_bound.offset); + self.reader.hi_block_offset = Some(upper_bound); + } else { + self.reader.hi_block_offset = + Some(self.block_index.get_last_block_handle(CachePolicy::Write)?); } Some(end) @@ -233,7 +235,7 @@ mod tests { block_cache::BlockCache, descriptor_table::FileDescriptorTable, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, + block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl}, range::Range, writer::{Options, Writer}, }, @@ -284,13 +286,15 @@ mod tests { table.insert(&segment_file_path, (0, 0).into()); let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); - let block_index = Arc::new(TwoLevelBlockIndex::from_file( + let block_index = TwoLevelBlockIndex::from_file( segment_file_path, + &trailer.metadata, trailer.offsets.tli_ptr, (0, 0).into(), table.clone(), block_cache.clone(), - )?); + )?; + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); let iter = Range::new( trailer.offsets.index_block_ptr, @@ -382,13 +386,15 @@ mod tests { table.insert(&segment_file_path, (0, 0).into()); let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); - let block_index = Arc::new(TwoLevelBlockIndex::from_file( + let block_index = TwoLevelBlockIndex::from_file( segment_file_path, + &trailer.metadata, trailer.offsets.tli_ptr, (0, 0).into(), table.clone(), block_cache.clone(), - )?); + )?; + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); { let mut iter = Range::new( @@ -581,13 +587,15 @@ mod tests { table.insert(&segment_file_path, (0, 0).into()); let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); - let block_index = Arc::new(TwoLevelBlockIndex::from_file( + let block_index = TwoLevelBlockIndex::from_file( segment_file_path, + &trailer.metadata, trailer.offsets.tli_ptr, (0, 0).into(), table.clone(), block_cache.clone(), - )?); + )?; + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); let ranges: Vec<(Bound, Bound)> = vec![ range_bounds_to_tuple(&(0..1_000)), @@ -683,13 +691,15 @@ mod tests { table.insert(&segment_file_path, (0, 0).into()); let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024)); - let block_index = Arc::new(TwoLevelBlockIndex::from_file( + let block_index = TwoLevelBlockIndex::from_file( segment_file_path, + &trailer.metadata, trailer.offsets.tli_ptr, (0, 0).into(), table.clone(), block_cache.clone(), - )?); + )?; + let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index)); for (i, &start_char) in chars.iter().enumerate() { for &end_char in chars.iter().skip(i + 1) { diff --git a/src/segment/writer/mod.rs b/src/segment/writer/mod.rs index 6813791..e7181cf 100644 --- a/src/segment/writer/mod.rs +++ b/src/segment/writer/mod.rs @@ -457,7 +457,12 @@ mod tests { // the TLI length fits into u32 as well #[allow(clippy::cast_possible_truncation)] { - let tli = TopLevelIndex::from_file(&segment_file_path, trailer.offsets.tli_ptr)?; + let tli = TopLevelIndex::from_file( + &segment_file_path, + &trailer.metadata, + trailer.offsets.tli_ptr, + )?; + assert_eq!(tli.len() as u32, trailer.metadata.index_block_count); } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index dc1e245..05a95ef 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -14,7 +14,9 @@ use crate::{ memtable::Memtable, range::{prefix_to_range, MemtableLockGuard, TreeIter}, segment::{ - block_index::two_level_index::TwoLevelBlockIndex, meta::TableType, Segment, SegmentInner, + block_index::{full_index::FullBlockIndex, BlockIndexImpl}, + meta::TableType, + Segment, SegmentInner, }, stop_signal::StopSignal, value::InternalValue, @@ -504,13 +506,9 @@ impl Tree { log::debug!("Finalized segment write at {segment_folder:?}"); - let block_index = Arc::new(TwoLevelBlockIndex::from_file( - &segment_file_path, - trailer.offsets.tli_ptr, - (self.id, segment_id).into(), - self.config.descriptor_table.clone(), - self.config.block_cache.clone(), - )?); + let block_index = + FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?; + let block_index = Arc::new(BlockIndexImpl::Full(block_index)); #[cfg(feature = "bloom")] let bloom_ptr = trailer.offsets.bloom_ptr; @@ -703,6 +701,7 @@ impl Tree { } return Ok(Some(entry)); }; + drop(memtable_lock); // Now look in sealed memtables @@ -915,8 +914,8 @@ impl Tree { let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE); - let segment_ids_to_recover = LevelManifest::recover_ids(&level_manifest_path)?; - let cnt = segment_ids_to_recover.len(); + let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?; + let cnt = segment_id_map.len(); log::debug!("Recovering {cnt} disk segments from {tree_path:?}"); @@ -961,12 +960,13 @@ impl Tree { crate::Error::Unrecoverable })?; - if segment_ids_to_recover.contains(&segment_id) { + if let Some(&level_idx) = segment_id_map.get(&segment_id) { let segment = Segment::recover( &segment_file_path, tree_id, block_cache.clone(), descriptor_table.clone(), + level_idx == 0 || level_idx == 1, )?; descriptor_table.insert(&segment_file_path, (tree_id, segment.metadata.id).into()); @@ -983,8 +983,11 @@ impl Tree { } } - if segments.len() < segment_ids_to_recover.len() { - log::error!("Recovered less segments than expected: {segment_ids_to_recover:?}"); + if segments.len() < cnt { + log::error!( + "Recovered less segments than expected: {:?}", + segment_id_map.keys(), + ); return Err(crate::Error::Unrecoverable); } diff --git a/tests/segment_range_oob.rs b/tests/segment_range_oob.rs new file mode 100644 index 0000000..454fae0 --- /dev/null +++ b/tests/segment_range_oob.rs @@ -0,0 +1,62 @@ +use lsm_tree::{AbstractTree, Config}; +use test_log::test; + +const ITEM_COUNT: usize = 100; + +#[test] +fn segment_range_out_of_bounds_lo() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?.into_path(); + + let tree = Config::new(folder) + .data_block_size(1_024) + .index_block_size(1_024) + .open()?; + + for key in ('h'..='o').map(|c| c.to_string()) { + let value = nanoid::nanoid!(); + tree.insert(key, value.as_bytes(), 0); + } + tree.flush_active_memtable(0)?; + + assert_eq!(4, tree.range(..="k").count()); + assert_eq!(4, tree.range(..="k").rev().count()); + + assert_eq!(4, tree.range("0"..="k").count()); + assert_eq!(4, tree.range("0"..="k").rev().count()); + + Ok(()) +} + +#[test] +fn segment_range_out_of_bounds_hi() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?.into_path(); + + let tree = Config::new(folder) + .data_block_size(1_024) + .index_block_size(1_024) + .open()?; + + for x in 0..ITEM_COUNT as u64 { + let key = x.to_be_bytes(); + let value = nanoid::nanoid!(); + tree.insert(key, value.as_bytes(), 0); + } + tree.flush_active_memtable(0)?; + + assert_eq!(50, tree.range((50u64.to_be_bytes())..).count()); + assert_eq!(50, tree.range((50u64.to_be_bytes())..).rev().count()); + + assert_eq!( + 50, + tree.range((50u64.to_be_bytes())..(150u64.to_be_bytes())) + .count() + ); + assert_eq!( + 50, + tree.range((50u64.to_be_bytes())..(150u64.to_be_bytes())) + .rev() + .count() + ); + + Ok(()) +}