Skip to content

Commit

Permalink
Merge pull request #76 from fjall-rs/perf/lazy-range-eval
Browse files Browse the repository at this point in the history
Lazy range bounds evaluation
  • Loading branch information
marvin-j97 authored Nov 21, 2024
2 parents b5b39e7 + f5e2ccb commit 540089e
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 43 deletions.
42 changes: 31 additions & 11 deletions benches/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,42 @@ use lsm_tree::{InternalValue, Memtable};
use nanoid::nanoid;

fn memtable_get_upper_bound(c: &mut Criterion) {
let memtable = Memtable::default();
c.bench_function("memtable get", |b| {
let memtable = Memtable::default();

for _ in 0..1_000_000 {
memtable.insert(InternalValue::from_components(
format!("abc_{}", nanoid!()).as_bytes(),
vec![],
0,
lsm_tree::ValueType::Value,
));
}
for _ in 0..1_000_000 {
memtable.insert(InternalValue::from_components(
format!("abc_{}", nanoid!()).as_bytes(),
vec![],
0,
lsm_tree::ValueType::Value,
));
}

c.bench_function("memtable get", |b| {
b.iter(|| {
memtable.get("abc", None);
});
});
}
criterion_group!(benches, memtable_get_upper_bound);

// Benchmark for `Memtable::get_highest_seqno`.
//
// After this PR the highest seqno is cached in an atomic, so the lookup
// should be O(1) regardless of the number of items — this bench guards that.
fn memtable_highest_seqno(c: &mut Criterion) {
c.bench_function("memtable highest seqno", |b| {
let memtable = Memtable::default();

// Insert 100k items with strictly increasing seqnos (0..=99_999),
// so the expected maximum is known exactly.
for x in 0..100_000 {
memtable.insert(InternalValue::from_components(
format!("abc_{}", nanoid!()).as_bytes(),
vec![],
x,
lsm_tree::ValueType::Value,
));
}

b.iter(|| {
// Assert inside the bench loop doubles as a correctness check:
// the cached seqno must equal the highest inserted one.
assert_eq!(Some(99_999), memtable.get_highest_seqno());
});
});
}

criterion_group!(benches, memtable_get_upper_bound, memtable_highest_seqno);
criterion_main!(benches);
35 changes: 24 additions & 11 deletions src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,33 @@ use crate::segment::block::ItemSize;
use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
use crossbeam_skiplist::SkipMap;
use std::ops::RangeBounds;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicU32, AtomicU64};

/// The memtable serves as an intermediary storage for new items
/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
///
/// When the Memtable exceeds some size, it should be flushed to a disk segment.
#[derive(Default)]
pub struct Memtable {
/// The actual content, stored in a lock-free skiplist.
#[doc(hidden)]
pub items: SkipMap<InternalKey, UserValue>,

/// Approximate active memtable size
/// Approximate active memtable size.
///
/// If this grows too large, a flush is triggered
/// If this grows too large, a flush is triggered.
pub(crate) approximate_size: AtomicU32,

/// Highest encountered sequence number.
///
/// This is used so that `get_highest_seqno` has O(1) complexity.
pub(crate) highest_seqno: AtomicU64,
}

impl Memtable {
/// Clears the memtable.
pub fn clear(&mut self) {
self.items.clear();
// Reset the cached highest seqno together with the items; a stale
// value would make `get_highest_seqno` report a seqno from before
// the clear once new items are inserted.
self.highest_seqno = AtomicU64::new(0);
// NOTE(review): Release store — presumably paired with Acquire loads
// of `approximate_size` elsewhere; confirm against the readers.
self.approximate_size
.store(0, std::sync::atomic::Ordering::Release);
}
Expand Down Expand Up @@ -126,18 +135,22 @@ impl Memtable {
let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
self.items.insert(key, item.value);

self.highest_seqno
.fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);

(item_size, size_before + item_size)
}

/// Returns the highest sequence number in the memtable.
pub fn get_highest_seqno(&self) -> Option<SeqNo> {
self.items
.iter()
.map(|x| {
let key = x.key();
key.seqno
})
.max()
if self.is_empty() {
None
} else {
Some(
self.highest_seqno
.load(std::sync::atomic::Ordering::Acquire),
)
}
}
}

Expand Down
35 changes: 14 additions & 21 deletions src/segment/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::sync::Arc;
pub struct Range {
block_index: Arc<TwoLevelBlockIndex>,

is_initialized: bool,
lo_initialized: bool,
hi_initialized: bool,

pub(crate) range: (Bound<UserKey>, Bound<UserKey>),

Expand All @@ -45,7 +46,8 @@ impl Range {
);

Self {
is_initialized: false,
lo_initialized: false,
hi_initialized: false,

block_index,

Expand Down Expand Up @@ -75,9 +77,13 @@ impl Range {
Some(start)
}
};

if let Some(key) = start_key.cloned() {
self.reader.set_lower_bound(key);
}

self.lo_initialized = true;

Ok(())
}

Expand Down Expand Up @@ -107,19 +113,8 @@ impl Range {
if let Some(key) = end_key.cloned() {
self.reader.set_upper_bound(key);
}
Ok(())
}

fn initialize(&mut self) -> crate::Result<()> {
// TODO: can we skip searching for lower bound until next is called at least once...?
// would make short ranges 1.5-2x faster (if cache miss) if only one direction is used
self.initialize_lo_bound()?;

// TODO: can we skip searching for upper bound until next_back is called at least once...?
// would make short ranges 1.5-2x faster (if cache miss) if only one direction is used
self.initialize_hi_bound()?;

self.is_initialized = true;
self.hi_initialized = true;

Ok(())
}
Expand All @@ -129,8 +124,8 @@ impl Iterator for Range {
type Item = crate::Result<InternalValue>;

fn next(&mut self) -> Option<Self::Item> {
if !self.is_initialized {
if let Err(e) = self.initialize() {
if !self.lo_initialized {
if let Err(e) = self.initialize_lo_bound() {
return Some(Err(e));
};
}
Expand Down Expand Up @@ -182,16 +177,14 @@ impl Iterator for Range {

impl DoubleEndedIterator for Range {
fn next_back(&mut self) -> Option<Self::Item> {
if !self.is_initialized {
if let Err(e) = self.initialize() {
if !self.hi_initialized {
if let Err(e) = self.initialize_hi_bound() {
return Some(Err(e));
};
}

loop {
let entry_result = self.reader.next_back()?;

match entry_result {
match self.reader.next_back()? {
Ok(entry) => {
match self.range.start_bound() {
Bound::Included(start) => {
Expand Down
80 changes: 80 additions & 0 deletions tests/segment_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,83 @@ fn segment_ranges() -> lsm_tree::Result<()> {

Ok(())
}

#[test]
// Regression test for lazy range-bound evaluation: after draining a range
// front-to-back, a subsequent `next_back` must observe the already-consumed
// state and return None (no duplicate or phantom items).
fn segment_range_last_back() -> lsm_tree::Result<()> {
let folder = tempfile::tempdir()?.into_path();

// Small block sizes so 10 items span multiple data/index blocks.
let tree = Config::new(folder)
.data_block_size(1_024)
.index_block_size(1_024)
.open()?;

// 2 KB zero-filled values — each value exceeds the 1 KB data block size.
let value = (0..2_000).map(|_| 0).collect::<Vec<u8>>();

// Keys 0..10 as big-endian bytes, so byte order matches numeric order.
for x in 0..10_u64 {
let key = x.to_be_bytes();
tree.insert(key, &value, 0);
}
tree.flush_active_memtable(0)?;

// Sanity: forward and reverse full scans both see all 10 items.
let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
assert_eq!(10, iter.count());

let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
assert_eq!(10, iter.rev().count());

let mut iter = tree.range(0u64.to_be_bytes()..5u64.to_be_bytes());

// Drain the entire half-open range [0, 5) from the front...
assert_eq!(0u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(1u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(2u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(3u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(4u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
// ...then the back end must also report exhaustion.
assert!(iter.next_back().is_none());

Ok(())
}

#[test]
// Companion to `segment_range_last_back`: here one item genuinely remains
// for `next_back` after the forward cursor has consumed the rest, so the
// two lazily-initialized bounds must meet without overlap.
fn segment_range_last_back_2() -> lsm_tree::Result<()> {
let folder = tempfile::tempdir()?.into_path();

// Small block sizes so the items span multiple data/index blocks.
let tree = Config::new(folder)
.data_block_size(1_024)
.index_block_size(1_024)
.open()?;

// 2 KB zero-filled values — each value exceeds the 1 KB data block size.
let value = (0..2_000).map(|_| 0).collect::<Vec<u8>>();

for x in 0..10_u64 {
let key = x.to_be_bytes();
tree.insert(key, &value, 0);
}
// Two extra keys with empty values, so keys 10 and 11 share a block
// layout different from the large-value keys before them.
tree.insert(10u64.to_be_bytes(), [], 0);
tree.insert(11u64.to_be_bytes(), [], 0);
tree.flush_active_memtable(0)?;

// Sanity: forward and reverse scans of [0, 10) both see 10 items.
let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
assert_eq!(10, iter.count());

let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
assert_eq!(10, iter.rev().count());

let mut iter = tree.range(0u64.to_be_bytes()..12u64.to_be_bytes());

// Consume keys 0..=10 from the front...
assert_eq!(0u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(1u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(2u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(3u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(4u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(5u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(6u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(7u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(8u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(9u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
assert_eq!(10u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
// ...the last remaining key (11) must still be reachable from the back,
// which forces the lazy upper-bound initialization on first `next_back`.
assert_eq!(11u64.to_be_bytes(), &*iter.next_back().unwrap().unwrap().0);
// Both directions are now exhausted.
assert!(iter.next().is_none());
assert!(iter.next_back().is_none());

Ok(())
}

0 comments on commit 540089e

Please sign in to comment.