diff --git a/skiplist/src/list.rs b/skiplist/src/list.rs index 88fed7e9..6db6d0da 100644 --- a/skiplist/src/list.rs +++ b/skiplist/src/list.rs @@ -422,7 +422,7 @@ where impl>, C: KeyComparator> IterRef { pub fn valid(&self) -> bool { - !self.cursor.is_null() + !(self.cursor.is_null() || self.cursor == self.list.as_ref().inner.head.as_ptr()) } pub fn key(&self) -> &Bytes { @@ -436,7 +436,13 @@ impl>, C: KeyComparator> IterRef { } pub fn next(&mut self) { - assert!(self.valid()); + if self.cursor.is_null() { + return; + } else if self.cursor == self.list.as_ref().inner.head.as_ptr() { + self.seek_to_first(); + return; + } + unsafe { let cursor_offset = (&*self.cursor).next_offset(0); self.cursor = self.list.as_ref().inner.arena.get_mut(cursor_offset); @@ -444,20 +450,27 @@ impl>, C: KeyComparator> IterRef { } pub fn prev(&mut self) { - assert!(self.valid()); + if self.cursor.is_null() { + self.seek_to_last(); + return; + } else if self.cursor == self.list.as_ref().inner.head.as_ptr() { + return; + } + if self.list.as_ref().allow_concurrent_write { unsafe { - self.cursor = self.list.as_ref().find_near(self.key(), true, false); + let node = self.list.as_ref().find_near(self.key(), true, false); + if node.is_null() { + self.cursor = self.list.as_ref().inner.head.as_ptr(); + } else { + self.cursor = node + } } } else { unsafe { let prev_offset = (*self.cursor).prev.load(Ordering::Acquire); let node = self.list.as_ref().inner.arena.get_mut(prev_offset); - if node != self.list.as_ref().inner.head.as_ptr() { - self.cursor = node; - } else { - self.cursor = ptr::null(); - } + self.cursor = node; } } } diff --git a/src/db.rs b/src/db.rs index c1c04a65..d35f1fda 100644 --- a/src/db.rs +++ b/src/db.rs @@ -272,6 +272,7 @@ impl Core { false } + /// Get the value for the given key with specified version. pub(crate) fn get(&self, key: &Bytes) -> Result { if self.is_closed() { return Err(Error::DBClosed); diff --git a/src/iterator.rs b/src/iterator.rs index 64b169c2..6c89c7ec 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -190,6 +190,22 @@ impl AgateIterator for SkiplistIterator { fn valid(&self) -> bool { self.skl_iter.valid() } + + fn prev(&mut self) { + if !self.reversed { + self.skl_iter.prev(); + } else { + self.skl_iter.next(); + } + } + + fn to_last(&mut self) { + if !self.reversed { + self.skl_iter.seek_to_last(); + } else { + self.skl_iter.seek_to_first(); + } + } } pub struct Iterator<'a> { @@ -199,9 +215,8 @@ pub struct Iterator<'a> { pub(crate) opt: IteratorOptions, item: Option, - - /// Used to skip over multiple versions of the same key. - last_key: BytesMut, + // If false, last operation is prev. + current_direction: bool, } impl Transaction { @@ -247,7 +262,7 @@ impl Transaction { read_ts: self.read_ts, opt: opt.clone(), item: None, - last_key: BytesMut::new(), + current_direction: true, } } } @@ -274,6 +289,12 @@ impl<'a> Iterator<'a> { // Advances the iterator by one. pub fn next(&mut self) { + if !self.current_direction { + self.table_iter.next(); + self.table_iter.next(); + } + self.current_direction = true; + while self.table_iter.valid() { if self.parse_item() { return; @@ -283,6 +304,22 @@ impl<'a> Iterator<'a> { self.item = None; } + pub fn prev(&mut self) { + if self.current_direction { + self.table_iter.prev(); + self.table_iter.prev(); + } + self.current_direction = false; + + while self.table_iter.valid() { + if self.parse_item_for_prev() { + return; + } + } + + self.item = None; + } + /// Handles both forward and reverse iteration implementation. We store keys such that /// their versions are sorted in descending order. This makes forward iteration /// efficient, but revese iteration complicated. This tradeoff is better because @@ -319,17 +356,23 @@ impl<'a> Iterator<'a> { // If iterating in forward direction, then just checking the last key against // current key would be sufficient. if !self.opt.reverse { - if crate::util::same_key(&self.last_key, key) { - self.table_iter.next(); - return false; - } + self.table_iter.prev(); + // Only track in forward direction. // We should update last_key as soon as we find a different key in our snapshot. // Consider keys: a 5, b 7 (del), b 5. When iterating, last_key = a. // Then we see b 7, which is deleted. If we don't store last_key = b, we'll then // return b 5, which is wrong. Therefore, update last_key here. - self.last_key.clear(); - self.last_key.extend_from_slice(key); + if self.table_iter.valid() + && crate::util::same_key(self.table_iter.key(), key) + && get_ts(self.table_iter.key()) <= self.read_ts + { + self.table_iter.next(); + self.table_iter.next(); + return false; + } + + self.table_iter.next(); } loop { @@ -340,7 +383,12 @@ impl<'a> Iterator<'a> { } let mut item = Item::new(self.txn.core.clone()); - Self::fill_item(&mut item, key, &self.table_iter.value(), &self.opt); + Self::fill_item( + &mut item, + self.table_iter.key(), + &self.table_iter.value(), + &self.opt, + ); self.table_iter.next(); if !self.opt.reverse || !self.table_iter.valid() { @@ -350,11 +398,92 @@ impl<'a> Iterator<'a> { // Reverse direction. let next_ts = get_ts(self.table_iter.key()); - let key_without_ts = user_key(self.table_iter.key()); - if next_ts <= self.read_ts && key_without_ts == item.key { + if next_ts <= self.read_ts && crate::util::same_key(self.table_iter.key(), key) { + // This is a valid potential candidate. + continue; + } + + // Ignore the next candidate. Return the current one. + self.item = Some(item); + return true; + } + } + + /// Just like `parse_item`, but used for `prev`. + /// + /// This function advances the iterator. + fn parse_item_for_prev(&mut self) -> bool { + #[allow(clippy::unnecessary_to_owned)] + let key: &[u8] = &self.table_iter.key().to_owned(); + + // Skip Agate keys. + if !self.opt.internal_access && key.starts_with(AGATE_PREFIX) { + self.table_iter.prev(); + return false; + } + + // Skip any versions which are beyond the read_ts. + let version = get_ts(key); + if version > self.read_ts { + self.table_iter.prev(); + return false; + } + + if self.opt.all_versions { + // Return deleted or expired values also, otherwise user can't figure out + // whether the key was deleted. + let mut item = Item::new(self.txn.core.clone()); + Self::fill_item(&mut item, key, &self.table_iter.value(), &self.opt); + self.item = Some(item); + self.table_iter.prev(); + return true; + } + + // If iterating in *reverse* direction, then just checking the last key against + // current key would be sufficient. + if self.opt.reverse { + self.table_iter.next(); + + if self.table_iter.valid() + && crate::util::same_key(self.table_iter.key(), key) + && get_ts(self.table_iter.key()) <= self.read_ts + { + self.table_iter.prev(); + self.table_iter.prev(); + return false; + } + + self.table_iter.prev(); + } + + loop { + let vs = self.table_iter.value(); + if is_deleted_or_expired(vs.meta, vs.expires_at) { + self.table_iter.prev(); + return false; + } + + let mut item = Item::new(self.txn.core.clone()); + Self::fill_item( + &mut item, + self.table_iter.key(), + &self.table_iter.value(), + &self.opt, + ); + + self.table_iter.prev(); + if self.opt.reverse || !self.table_iter.valid() { + self.item = Some(item); + return true; + } + + // Forward direction. + let next_ts = get_ts(self.table_iter.key()); + if next_ts <= self.read_ts && crate::util::same_key(self.table_iter.key(), key) { // This is a valid potential candidate. continue; } + // Ignore the next candidate. Return the current one. self.item = Some(item); return true; @@ -382,28 +511,33 @@ impl<'a> Iterator<'a> { /// smallest key greater than the provided key if iterating in the forward direction. /// Behavior would be reversed if iterating backwards. pub fn seek(&mut self, key: &Bytes) { + self.current_direction = true; + if !key.is_empty() { self.txn.add_read_key(key); } - self.last_key.clear(); - // TODO: Prefix. if key.is_empty() { self.table_iter.rewind(); - self.next(); - return; + } else { + let key = if !self.opt.reverse { + key_with_ts(BytesMut::from(&key[..]), self.read_ts) + } else { + key_with_ts(BytesMut::from(&key[..]), 0) + }; + + self.table_iter.seek(&key); } - let key = if !self.opt.reverse { - key_with_ts(BytesMut::from(&key[..]), self.txn.read_ts) - } else { - key_with_ts(BytesMut::from(&key[..]), 0) - }; + while self.table_iter.valid() { + if self.parse_item() { + return; + } + } - self.table_iter.seek(&key); - self.next(); + self.item = None; } /// Rewinds the iterator cursor all the way to zero-th position, which would be the @@ -412,6 +546,23 @@ impl<'a> Iterator<'a> { pub fn rewind(&mut self) { self.seek(&Bytes::new()); } + + #[allow(clippy::wrong_self_convention)] + pub fn to_last(&mut self) { + self.current_direction = true; + + // TODO: Re-examine this. + + self.table_iter.to_last(); + + while self.table_iter.valid() { + if self.parse_item_for_prev() { + return; + } + } + + self.item = None; + } } impl<'a> Drop for Iterator<'a> { @@ -419,3 +570,108 @@ impl<'a> Drop for Iterator<'a> { self.txn.num_iterators.fetch_sub(1, Ordering::SeqCst); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + db::tests::*, + entry::Entry, + util::{make_comparator, test::check_iterator_out_of_bound}, + }; + + #[test] + fn test_skl_iterator_out_of_bound() { + let n = 100; + let skl = Skiplist::with_capacity(make_comparator(), 4 * 1024 * 1024, true); + + for i in 0..n { + let key = key_with_ts(format!("{:012x}", i).as_str(), 0); + skl.put(key, Bytes::new()); + } + + let iter = TableIterators::from(SkiplistIterator::new(skl.iter(), false)); + check_iterator_out_of_bound(iter, n, false); + + let iter = TableIterators::from(SkiplistIterator::new(skl.iter(), true)); + check_iterator_out_of_bound(iter, n, true); + } + + #[test] + fn test_iterator() { + run_agate_test(None, |agate| { + let n = 100; + + let key = |i| BytesMut::from(format!("key-{:012x}", i).as_bytes()); + let value = |i| Bytes::from(format!("value-{:012x}", i)); + + let mut txn = agate.new_transaction(true); + + for i in 0..n { + txn.set_entry(Entry::new(key(i).freeze(), value(i))) + .unwrap(); + } + + let check = |txn: &Transaction, reversed: bool| { + let mut iter = txn.new_iterator(&IteratorOptions { + reverse: reversed, + ..Default::default() + }); + + iter.rewind(); + + // test iterate + for i in 0..n { + assert!(iter.valid()); + if !reversed { + assert_eq!(iter.item().key, key(i)); + } else { + assert_eq!(iter.item().key, key(n - i - 1)); + } + iter.next(); + } + assert!(!iter.valid()); + + // test seek + for i in 10..n - 10 { + iter.seek(&key(i).freeze()); + + for j in 0..10 { + if !reversed { + assert_eq!(iter.item().key, key(i + j)); + } else { + assert_eq!(iter.item().key, key(i - j)); + } + iter.next(); + } + } + + // test prev + for i in 10..n - 10 { + iter.seek(&key(i).freeze()); + + for j in 0..10 { + if !reversed { + assert_eq!(iter.item().key, key(i - j)); + } else { + assert_eq!(iter.item().key, key(i + j)); + } + iter.prev(); + } + } + + // test to_last + iter.to_last(); + if !reversed { + assert_eq!(iter.item().key, key(n - 1)); + } else { + assert_eq!(iter.item().key, key(0)); + } + }; + + check(&txn, false); + + check(&txn, true); + }); + } +} diff --git a/src/iterator_trait.rs b/src/iterator_trait.rs index d9bd45d9..72ba0a24 100644 --- a/src/iterator_trait.rs +++ b/src/iterator_trait.rs @@ -13,4 +13,6 @@ pub trait AgateIterator { fn key(&self) -> &[u8]; fn value(&self) -> Value; fn valid(&self) -> bool; + fn prev(&mut self); + fn to_last(&mut self); } diff --git a/src/ops/transaction.rs b/src/ops/transaction.rs index 2fefaa3a..687da34c 100644 --- a/src/ops/transaction.rs +++ b/src/ops/transaction.rs @@ -7,15 +7,13 @@ use std::{ }; use bytes::{Bytes, BytesMut}; -use skiplist::KeyComparator; +use crate::util::default_hash; use crate::{ entry::Entry, format::{append_ts, user_key}, iterator::{is_deleted_or_expired, Item}, - key_with_ts, - util::{default_hash, COMPARATOR}, - Agate, AgateIterator, Error, Result, Value, + key_with_ts, Agate, AgateIterator, Error, Result, Value, }; const MAX_KEY_LENGTH: usize = 65000; @@ -86,7 +84,8 @@ impl Transaction { // As each entry saves key / value as Bytes, there will only be overhead of pointer clone. let mut entries: Vec<_> = self.pending_writes.values().cloned().collect(); entries.sort_by(|x, y| { - let cmp = COMPARATOR.compare_key(&x.key, &y.key); + // Note, we should not use COMPARATOR when compare without ts. + let cmp = (&x.key).cmp(&y.key); if reversed { cmp.reverse() } else { @@ -213,6 +212,7 @@ impl Transaction { if is_deleted_or_expired(entry.meta, entry.expires_at) { return Err(Error::KeyNotFound(())); } + item.meta = entry.meta; item.set_value(entry.value.clone()); item.user_meta = entry.user_meta; @@ -443,8 +443,12 @@ impl PendingWritesIterator { impl AgateIterator for PendingWritesIterator { fn next(&mut self) { - self.next_idx += 1; - self.update_key(); + if self.next_idx == std::usize::MAX { + self.rewind(); + } else if self.next_idx < self.entries.len() { + self.next_idx += 1; + self.update_key(); + } } fn rewind(&mut self) { @@ -487,7 +491,24 @@ impl AgateIterator for PendingWritesIterator { } fn valid(&self) -> bool { - self.next_idx < self.entries.len() + self.next_idx != std::usize::MAX && self.next_idx < self.entries.len() + } + + fn prev(&mut self) { + if self.next_idx == std::usize::MAX { + } else if self.next_idx > 0 { + self.next_idx -= 1; + self.update_key(); + } else { + self.next_idx = std::usize::MAX; + } + } + + fn to_last(&mut self) { + if !self.entries.is_empty() { + self.next_idx = self.entries.len() - 1; + self.update_key(); + } } } @@ -558,3 +579,56 @@ impl Agate { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::util::test::{check_iterator_normal_operation, check_iterator_out_of_bound}; + + use super::*; + + #[test] + fn test_pending_writes_iterator() { + let n = 100; + + let mut entries = Vec::new(); + + let key = |i| BytesMut::from(format!("{:012x}", i).as_bytes()); + let value = |i| Bytes::from(format!("{:012x}", i)); + + for i in 0..n { + let entry = Entry::new(key(i).freeze(), value(i)); + entries.push(entry); + } + + let iter = PendingWritesIterator::new(0, false, entries.clone()); + check_iterator_normal_operation(iter, n, false); + + entries.reverse(); + + let iter = PendingWritesIterator::new(0, true, entries); + check_iterator_normal_operation(iter, n, true); + } + + #[test] + fn test_pending_writes_iterator_out_of_bound() { + let n = 100; + + let mut entries = Vec::new(); + + let key = |i| BytesMut::from(format!("{:012x}", i).as_bytes()); + let value = |i| Bytes::from(format!("{:012x}", i)); + + for i in 0..n { + let entry = Entry::new(key(i).freeze(), value(i)); + entries.push(entry); + } + + let iter = PendingWritesIterator::new(0, false, entries.clone()); + check_iterator_out_of_bound(iter, n, false); + + entries.reverse(); + + let iter = PendingWritesIterator::new(0, true, entries); + check_iterator_out_of_bound(iter, n, true); + } +} diff --git a/src/ops/transaction_test.rs b/src/ops/transaction_test.rs index e5cab415..1bd6006a 100644 --- a/src/ops/transaction_test.rs +++ b/src/ops/transaction_test.rs @@ -151,6 +151,34 @@ mod normal_db { Agate, AgateOptions, }; + fn check_iterator(mut it: Iterator, expectd: Vec<&'_ str>) { + let mut index = 0; + it.rewind(); + while it.valid() { + let item = it.item(); + let value = item.value(); + + assert_bytes_eq!(&value, &Bytes::from(expectd[index].to_string())); + + it.next(); + + // Additional check for prev. + if it.valid() { + it.prev(); + + let item = it.item(); + let value = item.value(); + + assert_bytes_eq!(&value, &Bytes::from(expectd[index].to_string())); + + it.next(); + } + + index += 1; + } + assert_eq!(expectd.len(), index); + } + #[test] fn test_txn_simple() { run_agate_test(None, move |agate| { @@ -293,23 +321,6 @@ mod normal_db { assert_eq!(i, agate.core.orc.read_ts()); } - let check_iterator = |mut it: Iterator, i: u64| { - let mut count = 0; - - it.rewind(); - while it.valid() { - let item = it.item(); - assert_bytes_eq!(&item.key, &key); - assert_bytes_eq!(&item.value(), &valversion(i)); - - count += 1; - - it.next(); - } - - assert_eq!(count, 1); - }; - let check_all_versions = |mut it: Iterator, i: u64| { let mut version = if it.opt.reverse { 1 } else { i }; @@ -323,6 +334,21 @@ mod normal_db { let value = item.value(); assert_bytes_eq!(&value, &valversion(version)); + it.next(); + + if it.valid() { + it.prev(); + + let item = it.item(); + assert_bytes_eq!(&item.key, &key); + assert_eq!(item.version, version); + + let value = item.value(); + assert_bytes_eq!(&value, &valversion(version)); + + it.next(); + } + count += 1; if it.opt.reverse { @@ -330,8 +356,6 @@ mod normal_db { } else { version -= 1; } - - it.next(); } assert_eq!(count, i); @@ -346,13 +370,16 @@ mod normal_db { assert_bytes_eq!(&value, &valversion(i)); let it = txn.new_iterator(&IteratorOptions::default()); - check_iterator(it, i); + check_iterator(it, vec![std::str::from_utf8(&valversion(i)).unwrap()]); let reversed_it = txn.new_iterator(&IteratorOptions { reverse: true, ..Default::default() }); - check_iterator(reversed_it, i); + check_iterator( + reversed_it, + vec![std::str::from_utf8(&valversion(i)).unwrap()], + ); let it = txn.new_iterator(&IteratorOptions { all_versions: true, @@ -473,21 +500,6 @@ mod normal_db { txn.commit().unwrap(); assert_eq!(agate.core.orc.read_ts(), 4); - let check_iterator = |mut it: Iterator, expectd: Vec<&'static str>| { - let mut index = 0; - it.rewind(); - while it.valid() { - let item = it.item(); - let value = item.value(); - - assert_bytes_eq!(&value, &Bytes::from(expectd[index])); - - index += 1; - it.next(); - } - assert_eq!(expectd.len(), index); - }; - let mut txn = agate.new_transaction(true); let rev = IteratorOptions { reverse: true, @@ -568,21 +580,6 @@ mod normal_db { txn.commit().unwrap(); assert_eq!(agate.core.orc.read_ts(), 4); - let check_iterator = |mut it: Iterator, expectd: Vec<&'static str>| { - let mut index = 0; - it.rewind(); - while it.valid() { - let item = it.item(); - let value = item.value(); - - assert_bytes_eq!(&value, &Bytes::from(expectd[index])); - - index += 1; - it.next(); - } - assert_eq!(expectd.len(), index); - }; - let mut txn = agate.new_transaction(true); let rev = IteratorOptions { reverse: true, diff --git a/src/table/concat_iterator.rs b/src/table/concat_iterator.rs index d4743ffd..7ad493c0 100644 --- a/src/table/concat_iterator.rs +++ b/src/table/concat_iterator.rs @@ -8,7 +8,7 @@ use crate::{ /// ConcatIterator iterates on SSTs with no overlap keys. pub struct ConcatIterator { - cur: Option, + cur: usize, iters: Vec>, tables: Vec, opt: usize, @@ -21,7 +21,7 @@ impl ConcatIterator { let iters = tables.iter().map(|_| None).collect(); ConcatIterator { - cur: None, + cur: 0, iters, tables, opt, @@ -29,48 +29,65 @@ impl ConcatIterator { } fn set_idx(&mut self, idx: usize) { - if idx >= self.iters.len() { - self.cur = None; + assert!(idx == std::usize::MAX || idx <= self.tables.len()); + self.cur = idx; + + if idx == std::usize::MAX || idx == self.iters.len() { return; } + if self.iters[idx].is_none() { self.iters[idx] = Some(self.tables[idx].new_iterator(self.opt)); } - self.cur = Some(idx); } fn iter_mut(&mut self) -> &mut TableIterator { - self.iters[self.cur.unwrap()].as_mut().unwrap() + self.iters[self.cur].as_mut().unwrap() } fn iter_ref(&self) -> &TableIterator { - self.iters[self.cur.unwrap()].as_ref().unwrap() + self.iters[self.cur].as_ref().unwrap() } } impl AgateIterator for ConcatIterator { fn next(&mut self) { - let cur = self.cur.unwrap(); - let cur_iter = self.iter_mut(); - cur_iter.next(); - if cur_iter.valid() { - return; + if self.cur != std::usize::MAX && self.cur < self.iters.len() { + let cur_iter = self.iter_mut(); + cur_iter.next(); + if cur_iter.valid() { + return; + } } + + #[allow(clippy::collapsible_else_if)] loop { if self.opt & ITERATOR_REVERSED == 0 { - self.set_idx(cur + 1); - } else if cur == 0 { - self.cur = None; - } else { - self.set_idx(cur - 1); - } + if self.cur == std::usize::MAX { + self.rewind(); + return; + } else if self.cur < self.iters.len() { + self.set_idx(self.cur + 1); - if self.cur.is_some() { - self.iter_mut().rewind(); - if self.iter_ref().valid() { + if self.cur == self.iters.len() { + return; + } + } else { return; } } else { + if self.cur == std::usize::MAX { + return; + } else if self.cur > 0 { + self.set_idx(self.cur - 1); + } else { + self.set_idx(std::usize::MAX); + return; + } + } + + self.iter_mut().rewind(); + if self.iter_ref().valid() { return; } } @@ -96,8 +113,8 @@ impl AgateIterator for ConcatIterator { idx = crate::util::search(self.tables.len(), |idx| { COMPARATOR.compare_key(self.tables[idx].biggest(), key) != Less }); + self.set_idx(idx); if idx >= self.tables.len() { - self.cur = None; return; } } else { @@ -105,8 +122,8 @@ impl AgateIterator for ConcatIterator { let ridx = crate::util::search(self.tables.len(), |idx| { COMPARATOR.compare_key(self.tables[n - 1 - idx].smallest(), key) != Greater }); + self.set_idx(ridx); if ridx >= self.tables.len() { - self.cur = None; return; } idx = n - 1 - ridx; @@ -125,26 +142,82 @@ impl AgateIterator for ConcatIterator { } fn valid(&self) -> bool { - if self.cur.is_some() { + if self.cur != std::usize::MAX && self.cur < self.iters.len() { self.iter_ref().valid() } else { false } } + + fn prev(&mut self) { + if self.cur != std::usize::MAX && self.cur < self.iters.len() { + let cur_iter = self.iter_mut(); + cur_iter.prev(); + if cur_iter.valid() { + return; + } + } + + #[allow(clippy::collapsible_else_if)] + loop { + if self.opt & ITERATOR_REVERSED == 0 { + if self.cur == std::usize::MAX { + return; + } else if self.cur > 0 { + self.set_idx(self.cur - 1); + } else { + self.set_idx(std::usize::MAX); + return; + } + } else { + if self.cur == std::usize::MAX { + self.to_last(); + return; + } else if self.cur < self.iters.len() { + self.set_idx(self.cur + 1); + + if self.cur == self.iters.len() { + return; + } + } else { + return; + } + } + + self.iter_mut().to_last(); + if self.iter_ref().valid() { + return; + } + } + } + + fn to_last(&mut self) { + if self.iters.is_empty() { + return; + } + + if self.opt & ITERATOR_REVERSED == 0 { + self.set_idx(self.iters.len() - 1); + } else { + self.set_idx(0); + } + + self.iter_mut().to_last(); + } } #[cfg(test)] mod tests { - use bytes::{Bytes, BytesMut}; + use bytes::Bytes; use rand::prelude::*; use super::*; use crate::{ - format::{key_with_ts, user_key}, table::{ tests::{build_table_data, get_test_table_options}, Table, }, + util::test::{check_iterator_normal_operation, check_iterator_out_of_bound}, }; fn build_test_tables() -> (Vec
, usize) { @@ -173,28 +246,22 @@ mod tests { #[test] fn test_concat_iterator() { let (tables, cnt) = build_test_tables(); - let mut iter = ConcatIterator::from_tables(tables, 0); - iter.rewind(); + let iter = ConcatIterator::from_tables(tables.clone(), 0); + check_iterator_normal_operation(iter, cnt, false); - // test iterate - for i in 0..cnt { - assert!(iter.valid()); - assert_eq!(user_key(iter.key()), format!("{:012x}", i).as_bytes()); - iter.next(); - } - assert!(!iter.valid()); - - // test seek - for i in 10..cnt - 10 { - iter.seek(&key_with_ts( - BytesMut::from(format!("{:012x}", i).as_bytes()), - 0, - )); - for j in 0..10 { - assert_eq!(user_key(iter.key()), format!("{:012x}", i + j).as_bytes()); - iter.next(); - } - } + let iter = ConcatIterator::from_tables(tables, ITERATOR_REVERSED); + check_iterator_normal_operation(iter, cnt, true); + } + + #[test] + fn test_concat_iterator_out_of_bound() { + let (tables, cnt) = build_test_tables(); + + let iter = ConcatIterator::from_tables(tables.clone(), 0); + check_iterator_out_of_bound(iter, cnt, false); + + let iter = ConcatIterator::from_tables(tables, ITERATOR_REVERSED); + check_iterator_out_of_bound(iter, cnt, true); } } diff --git a/src/table/iterator.rs b/src/table/iterator.rs index 57b4851a..626cd747 100644 --- a/src/table/iterator.rs +++ b/src/table/iterator.rs @@ -104,8 +104,11 @@ impl BlockIterator { } fn set_idx(&mut self, i: usize) { + // Before first and from first to after last. + assert!(self.idx == std::usize::MAX || self.idx <= self.entry_offsets().len()); + self.idx = i; - if i >= self.entry_offsets().len() { + if self.idx == std::usize::MAX || i >= self.entry_offsets().len() { self.err = Some(IteratorError::Eof); return; } @@ -187,7 +190,7 @@ impl BlockIterator { pub fn seek_to_last(&mut self) { if self.entry_offsets().is_empty() { - self.idx = std::usize::MAX; + self.idx = 0; self.err = Some(IteratorError::Eof); } else { self.set_idx(self.entry_offsets().len() - 1); @@ -195,15 +198,28 @@ impl BlockIterator { } pub fn next(&mut self) { - self.set_idx(self.idx + 1); + if self.idx == std::usize::MAX { + // Before first. + self.set_idx(0); + } else if self.idx < self.entry_offsets().len() { + // Normal. + self.set_idx(self.idx + 1); + } else { + // After last. + assert!(!self.valid()); + } } pub fn prev(&mut self) { - if self.idx == 0 { - self.idx = std::usize::MAX; - self.err = Some(IteratorError::Eof); - } else { + if self.idx == std::usize::MAX { + // Before first. + assert!(!self.valid()); + } else if self.idx > 0 { + // Normal. self.set_idx(self.idx - 1); + } else { + // At first. + self.set_idx(std::usize::MAX); } } @@ -364,7 +380,10 @@ impl> TableRefIterator { pub(crate) fn next_inner(&mut self) { self.err = None; - if self.bpos >= self.table.as_ref().offsets_length() { + if self.bpos == std::usize::MAX { + self.seek_to_first(); + return; + } else if self.bpos >= self.table.as_ref().offsets_length() { self.err = Some(IteratorError::Eof); return; } @@ -398,6 +417,9 @@ impl> TableRefIterator { if self.bpos == std::usize::MAX { self.err = Some(IteratorError::Eof); return; + } else if self.bpos >= self.table.as_ref().offsets_length() { + self.seek_to_last(); + return; } if BlockIterator::is_ready(&self.block_iterator) { @@ -413,8 +435,12 @@ impl> TableRefIterator { let bi = self.block_iterator.as_mut().unwrap(); bi.prev(); if !bi.valid() { - self.bpos = self.bpos.wrapping_sub(1); - // bpos will become -1 or usize::MAX if it moves before zero position. + // bpos will become usize::MAX if it moves before zero position. + if self.bpos == 0 { + self.bpos = std::usize::MAX; + } else { + self.bpos -= 1; + } bi.data.clear(); self.prev_inner(); } @@ -472,6 +498,22 @@ impl> AgateIterator for TableRefIterator { fn valid(&self) -> bool { self.err.is_none() } + + fn prev(&mut self) { + if self.opt & ITERATOR_REVERSED == 0 { + self.prev_inner(); + } else { + self.next_inner(); + } + } + + fn to_last(&mut self) { + if self.opt & ITERATOR_REVERSED == 0 { + self.seek_to_last(); + } else { + self.seek_to_first(); + } + } } #[cfg(test)] diff --git a/src/table/merge_iterator.rs b/src/table/merge_iterator.rs index dd08460d..489fb137 100644 --- a/src/table/merge_iterator.rs +++ b/src/table/merge_iterator.rs @@ -1,3 +1,5 @@ +use std::cmp::Ordering; + use bytes::{Bytes, BytesMut}; use enum_dispatch::enum_dispatch; @@ -34,6 +36,9 @@ pub struct MergeIterator { is_left_small: bool, reverse: bool, current_key: BytesMut, + + before_first: bool, + after_last: bool, } /// `IteratorNode` buffers the iterator key in its own struct, to @@ -75,6 +80,17 @@ impl IteratorNode { self.iter.seek(key); self.set_key(); } + + fn prev(&mut self) { + self.iter.prev(); + self.set_key(); + } + + #[allow(clippy::wrong_self_convention)] + fn to_last(&mut self) { + self.iter.to_last(); + self.set_key(); + } } impl MergeIterator { @@ -132,7 +148,7 @@ impl MergeIterator { match COMPARATOR.compare_key(&self.smaller().key, &self.bigger().key) { Equal => { - self.right.next(); + // Left most is guaranteed to be pending writes, we should make it the smaller one. if !self.is_left_small { self.swap_small(); } @@ -151,6 +167,10 @@ impl MergeIterator { } fn set_current(&mut self) { + if !self.valid() { + return; + } + self.current_key.clear(); if self.is_left_small { self.current_key.extend_from_slice(&self.left.key); @@ -175,6 +195,8 @@ impl MergeIterator { right: IteratorNode::new(Box::new(right)), is_left_small: true, current_key: BytesMut::new(), + before_first: false, + after_last: false, })) } _ => { @@ -187,6 +209,8 @@ impl MergeIterator { right: IteratorNode::new(Self::from_iterators(right, reverse)), is_left_small: true, current_key: BytesMut::new(), + before_first: false, + after_last: false, })) } } @@ -195,17 +219,42 @@ impl MergeIterator { impl AgateIterator for MergeIterator { fn next(&mut self) { + if self.before_first { + self.before_first = false; + self.rewind(); + return; + } + + self.before_first = false; + while self.valid() { if self.smaller().key != self.current_key { break; } + + if self.bigger().valid + && COMPARATOR.compare_key(&self.smaller().key, &self.bigger().key) + == Ordering::Equal + { + self.bigger_mut().next(); + } + self.smaller_mut().next(); + self.fix(); } + self.set_current(); + + if !self.valid() { + self.after_last = true; + } } fn rewind(&mut self) { + self.before_first = false; + self.after_last = false; + self.left.rewind(); self.right.rewind(); self.fix(); @@ -213,10 +262,17 @@ impl AgateIterator for MergeIterator { } fn seek(&mut self, key: &Bytes) { + self.before_first = false; + self.after_last = false; + self.left.seek(key); self.right.seek(key); self.fix(); self.set_current(); + + if !self.valid() { + self.after_last = true; + } } fn key(&self) -> &[u8] { @@ -230,6 +286,79 @@ impl AgateIterator for MergeIterator { fn valid(&self) -> bool { self.smaller().valid } + + fn prev(&mut self) { + // TODO: Re-examine this. + + if self.after_last { + self.after_last = false; + self.to_last(); + return; + } + + self.after_last = false; + + if self.before_first { + return; + } + + // We can call prev even when iterator is not valid. + self.bigger_mut().prev(); + + if !self.bigger().valid { + // Prev element is in the smaller. + self.smaller_mut().prev(); + if self.smaller().valid { + // Only when smaller has prev element, should we rewind the bigger. + // Otherwise, current element is the first element, we should make + // both smaller and bigger invalid. + self.bigger_mut().rewind(); + } else { + self.before_first = true; + } + } else { + // We should check where does the prev element come from. + self.smaller_mut().prev(); + if !self.smaller().valid { + // Prev element is in the bigger, simply rewind the smaller. + self.smaller_mut().rewind(); + } else { + // Note: Assume only one pair of keys are identical. + // Both smaller and bigger have prev element, fix and let the smaller step forward + // or let both smaller and bigger stay unmoved when they are identical. + self.fix(); + + if COMPARATOR.compare_key(&self.smaller().key, &self.bigger().key) + != Ordering::Equal + { + self.smaller_mut().next(); + } + } + } + + self.fix(); + self.set_current(); + + if !self.valid() { + self.before_first = true; + } + } + + fn to_last(&mut self) { + self.before_first = false; + self.after_last = false; + + self.left.to_last(); + self.right.to_last(); + self.fix(); + self.set_current(); + + if self.bigger_mut().valid + && COMPARATOR.compare_key(&self.smaller().key, &self.bigger().key) != Ordering::Equal + { + self.next(); + } + } } #[cfg(test)] @@ -238,6 +367,7 @@ mod tests { use crate::{ assert_bytes_eq, format::{key_with_ts, user_key}, + util::test::{check_iterator_normal_operation, check_iterator_out_of_bound}, }; pub struct VecIterator { @@ -258,7 +388,11 @@ mod tests { impl AgateIterator for VecIterator { fn next(&mut self) { - self.pos += 1; + if self.pos == std::usize::MAX { + self.rewind(); + } else if self.pos < self.vec.len() { + self.pos += 1; + } } fn rewind(&mut self) { @@ -286,84 +420,110 @@ mod tests { } fn valid(&self) -> bool { - self.pos < self.vec.len() + self.pos != std::usize::MAX && self.pos < self.vec.len() + } + + fn prev(&mut self) { + if self.pos == std::usize::MAX { + } else if self.pos > 0 { + self.pos -= 1; + } else { + self.pos = std::usize::MAX; + } + } + + fn to_last(&mut self) { + if !self.vec.is_empty() { + self.pos = self.vec.len() - 1; + } } } - fn gen_vec_data(n: usize, predicate: impl Fn(usize) -> bool) -> Vec { + pub fn gen_vec_data(n: usize, predicate: impl Fn(usize) -> bool) -> Vec { (0..n) .filter(|x| predicate(*x)) .map(|i| key_with_ts(format!("{:012x}", i).as_str(), 0)) .collect() } - fn check_sequence_both(mut iter: Box, n: usize, reversed: bool) { - // test sequentially iterate - let mut cnt = 0; - iter.rewind(); - while iter.valid() { - let check_cnt = if reversed { n - 1 - cnt } else { cnt }; - assert_bytes_eq!( - user_key(iter.key()), - format!("{:012x}", check_cnt).as_bytes() - ); - cnt += 1; - iter.next(); + #[test] + fn test_vec_iter_seek() { + let data = gen_vec_data(0xfff, |_| true); + let mut iter = VecIterator::new(data, false); + for i in 0..0xfff { + iter.seek(&key_with_ts( + BytesMut::from(format!("{:012x}", i).as_bytes()), + 0, + )); + assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", i).as_bytes()); } - assert_eq!(cnt, n); - iter.rewind(); - - // test seek - for i in 10..n - 10 { + let mut data = gen_vec_data(0xfff, |_| true); + data.reverse(); + let mut iter = VecIterator::new(data, true); + for i in 0..0xfff { iter.seek(&key_with_ts( BytesMut::from(format!("{:012x}", i).as_bytes()), 0, )); - for j in 0..10 { - assert!(iter.valid()); - let expected_key = if reversed { - format!("{:012x}", i - j).to_string() - } else { - format!("{:012x}", i + j).to_string() - }; - assert_bytes_eq!(user_key(iter.key()), expected_key.as_bytes()); - iter.next(); - } + assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", i).as_bytes()); } } - fn check_sequence(iter: Box, n: usize) { - check_sequence_both(iter, n, false); - } - - fn check_reverse_sequence(iter: Box, n: usize) { - check_sequence_both(iter, n, true); - } - #[test] - fn test_vec_iter_seek() { + fn test_vec_iter_prev() { let data = gen_vec_data(0xfff, |_| true); let mut iter = VecIterator::new(data, false); - for i in 0..0xfff { + for i in 1..0xfff { iter.seek(&key_with_ts( BytesMut::from(format!("{:012x}", i).as_bytes()), 0, )); - assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", i).as_bytes()); + iter.prev(); + assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", i - 1).as_bytes()); } + let mut data = gen_vec_data(0xfff, |_| true); data.reverse(); let mut iter = VecIterator::new(data, true); - for i in 0..0xfff { + for i in 0..0xfff - 1 { iter.seek(&key_with_ts( BytesMut::from(format!("{:012x}", i).as_bytes()), 0, )); - assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", i).as_bytes()); + iter.prev(); + assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", i + 1).as_bytes()); } } + #[test] + fn test_vec_to_last() { + let data = gen_vec_data(10, |_| true); + let mut iter = VecIterator::new(data, false); + iter.to_last(); + assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", 9).as_bytes()); + + let mut data = gen_vec_data(0xfff, |_| true); + data.reverse(); + let mut iter = VecIterator::new(data, true); + iter.to_last(); + assert_bytes_eq!(user_key(iter.key()), format!("{:012x}", 0).as_bytes()); + } + + #[test] + fn test_vec_out_of_bound() { + let n = 100; + let mut data = gen_vec_data(n, |_| true); + + let iter = VecIterator::new(data.clone(), false); + check_iterator_out_of_bound(iter, n, false); + + data.reverse(); + + let iter = VecIterator::new(data, false); + check_iterator_out_of_bound(iter, n, true); + } + #[test] fn test_merge_2iters_iterate() { let a = gen_vec_data(0xfff, |x| x % 5 == 0); @@ -377,12 +537,13 @@ mod tests { let iter_b = Iterators::from(VecIterator::new(b, false)); let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], false); - check_sequence(merge_iter, 0xfff); + check_iterator_normal_operation(*merge_iter, 0xfff, false); let iter_a = Iterators::from(VecIterator::new(rev_a, true)); let iter_b = Iterators::from(VecIterator::new(rev_b, true)); let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], true); - check_reverse_sequence(merge_iter, 0xfff); + + check_iterator_normal_operation(*merge_iter, 0xfff, true); } #[test] @@ -409,8 +570,129 @@ mod tests { .map(|vec| Iterators::from(VecIterator::new(vec, false))) .collect(); - check_sequence(MergeIterator::from_iterators(iters, false), 0xfff); + check_iterator_normal_operation(*MergeIterator::from_iterators(iters, false), 0xfff, false); + + check_iterator_normal_operation( + *MergeIterator::from_iterators(rev_iters, true), + 0xfff, + true, + ); + } + + #[test] + fn test_merge_5iters_out_of_bound() { + // randomly determine sequence of 5 iterators + let vec_map = vec![2, 4, 1, 3, 0]; + let vec_map_size = vec_map.len(); + let vecs: Vec> = vec_map + .into_iter() + .map(|i| gen_vec_data(0xfff, |x| x % vec_map_size == i)) + .collect(); + + let rev_iters: Vec = vecs + .iter() + .map(|x| { + let mut y = x.clone(); + y.reverse(); + Iterators::from(VecIterator::new(y, true)) + }) + .collect(); + + let iters: Vec = vecs + .into_iter() + .map(|vec| Iterators::from(VecIterator::new(vec, false))) + .collect(); + + check_iterator_out_of_bound(*MergeIterator::from_iterators(iters, false), 0xfff, false); + + check_iterator_out_of_bound(*MergeIterator::from_iterators(rev_iters, true), 0xfff, true); + } + + #[test] + fn test_merge_full_empty() { + let a = gen_vec_data(0xfff, |_| true); + let b = gen_vec_data(0xfff, |_| false); + let mut rev_a = a.clone(); + rev_a.reverse(); + let mut rev_b = b.clone(); + rev_b.reverse(); + + let iter_a = Iterators::from(VecIterator::new(a, false)); + let iter_b = Iterators::from(VecIterator::new(b, false)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], false); + + check_iterator_normal_operation(*merge_iter, 0xfff, false); + + let iter_a = Iterators::from(VecIterator::new(rev_a, true)); + let iter_b = Iterators::from(VecIterator::new(rev_b, true)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], true); + + check_iterator_normal_operation(*merge_iter, 0xfff, true); + } + + #[test] + fn test_merge_full_empty_out_of_bound() { + let a = gen_vec_data(0xfff, |_| true); + let b = gen_vec_data(0xfff, |_| false); + let mut rev_a = a.clone(); + rev_a.reverse(); + let mut rev_b = b.clone(); + rev_b.reverse(); + + let iter_a = Iterators::from(VecIterator::new(a, false)); + let iter_b = Iterators::from(VecIterator::new(b, false)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], false); + + check_iterator_out_of_bound(*merge_iter, 0xfff, false); + + let iter_a = Iterators::from(VecIterator::new(rev_a, true)); + let iter_b = Iterators::from(VecIterator::new(rev_b, true)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], true); + + check_iterator_out_of_bound(*merge_iter, 0xfff, true); + } + + #[test] + fn test_merge_same() { + let a = gen_vec_data(0xfff, |_| true); + let b = a.clone(); + let mut rev_a = a.clone(); + rev_a.reverse(); + let mut rev_b = b.clone(); + rev_b.reverse(); + + let iter_a = Iterators::from(VecIterator::new(a, false)); + let iter_b = Iterators::from(VecIterator::new(b, false)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], false); + + check_iterator_normal_operation(*merge_iter, 0xfff, false); + + let iter_a = Iterators::from(VecIterator::new(rev_a, true)); + let iter_b = Iterators::from(VecIterator::new(rev_b, true)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], true); + + check_iterator_normal_operation(*merge_iter, 0xfff, true); + } + + #[test] + fn test_merge_same_out_of_bound() { + let a = gen_vec_data(0xfff, |_| true); + let b = a.clone(); + let mut rev_a = a.clone(); + rev_a.reverse(); + let mut rev_b = b.clone(); + rev_b.reverse(); + + let iter_a = Iterators::from(VecIterator::new(a, false)); + let iter_b = Iterators::from(VecIterator::new(b, false)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], false); + + check_iterator_out_of_bound(*merge_iter, 0xfff, false); + + let iter_a = Iterators::from(VecIterator::new(rev_a, true)); + let iter_b = Iterators::from(VecIterator::new(rev_b, true)); + let merge_iter = MergeIterator::from_iterators(vec![iter_a, iter_b], true); - check_reverse_sequence(MergeIterator::from_iterators(rev_iters, true), 0xfff); + check_iterator_out_of_bound(*merge_iter, 0xfff, true); } } diff --git a/src/table/tests.rs b/src/table/tests.rs index ea86b7f5..2d2feeb4 100644 --- a/src/table/tests.rs +++ b/src/table/tests.rs @@ -456,14 +456,45 @@ fn test_iterator_out_of_bound() { let opts = get_test_table_options(); let table = build_test_table(b"key", 1000, opts); let mut it = table.new_iterator(0); + // next first and then prev it.seek_to_last(); assert!(it.error().is_none()); + + it.next(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); it.next(); assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert!(it.error().is_none()); + it.next(); assert_eq!(it.error(), Some(&IteratorError::Eof)); it.next(); assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert!(it.error().is_none()); + + it.rewind(); + assert!(it.error().is_none()); + + // prev first and then next + it.seek_to_first(); + assert!(it.error().is_none()); + + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.next(); + assert!(it.error().is_none()); + + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.next(); + assert!(it.error().is_none()); + it.rewind(); assert!(it.error().is_none()); assert_eq!(user_key(it.key()), key(b"key", 0)); @@ -474,14 +505,45 @@ fn test_iterator_out_of_bound_reverse() { let opts = get_test_table_options(); let table = build_test_table(b"key", 1000, opts); let mut it = table.new_iterator(ITERATOR_REVERSED); + // next first and then prev it.seek_to_first(); assert!(it.error().is_none()); + + it.next(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); it.next(); assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert!(it.error().is_none()); + it.next(); assert_eq!(it.error(), Some(&IteratorError::Eof)); it.next(); assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert!(it.error().is_none()); + + it.rewind(); + assert!(it.error().is_none()); + + // prev first and then next + it.seek_to_last(); + assert!(it.error().is_none()); + + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.next(); + assert!(it.error().is_none()); + + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.prev(); + assert_eq!(it.error(), Some(&IteratorError::Eof)); + it.next(); + assert!(it.error().is_none()); + it.rewind(); assert!(it.error().is_none()); assert_eq!(user_key(it.key()), key(b"key", 999)); diff --git a/src/util.rs b/src/util.rs index 04d82692..d2df70fb 100644 --- a/src/util.rs +++ b/src/util.rs @@ -124,11 +124,132 @@ macro_rules! assert_bytes_eq { } #[cfg(test)] -mod test { +pub(crate) mod test { + use bytes::BytesMut; use std::time::{SystemTime, UNIX_EPOCH}; + use crate::{key_with_ts, AgateIterator}; + use super::*; + pub fn check_iterator_out_of_bound(mut iter: impl AgateIterator, count: usize, reversed: bool) { + iter.rewind(); + iter.prev(); + assert!(!iter.valid()); + iter.prev(); + assert!(!iter.valid()); + iter.next(); + assert!(iter.valid()); + + iter.prev(); + assert!(!iter.valid()); + iter.prev(); + assert!(!iter.valid()); + iter.next(); + assert!(iter.valid()); + + if !reversed { + assert_eq!(user_key(iter.key()), format!("{:012x}", 0).as_bytes()); + } else { + assert_eq!( + user_key(iter.key()), + format!("{:012x}", count - 1).as_bytes() + ); + } + + iter.to_last(); + iter.next(); + assert!(!iter.valid()); + iter.next(); + assert!(!iter.valid()); + iter.prev(); + assert!(iter.valid()); + + iter.next(); + assert!(!iter.valid()); + iter.next(); + assert!(!iter.valid()); + iter.prev(); + assert!(iter.valid()); + + if !reversed { + assert_eq!( + user_key(iter.key()), + format!("{:012x}", count - 1).as_bytes() + ); + } else { + assert_eq!(user_key(iter.key()), format!("{:012x}", 0).as_bytes()); + } + } + + pub fn check_iterator_normal_operation( + mut iter: impl AgateIterator, + count: usize, + reversed: bool, + ) { + iter.rewind(); + + // test iterate + for i in 0..count { + assert!(iter.valid()); + if !reversed { + assert_eq!(user_key(iter.key()), format!("{:012x}", i).as_bytes()); + } else { + assert_eq!( + user_key(iter.key()), + format!("{:012x}", count - i - 1).as_bytes() + ); + } + iter.next(); + } + assert!(!iter.valid()); + + // test seek + for i in 10..count - 10 { + iter.seek(&key_with_ts( + BytesMut::from(format!("{:012x}", i).as_bytes()), + 0, + )); + + for j in 0..10 { + if !reversed { + assert_eq!(user_key(iter.key()), format!("{:012x}", i + j).as_bytes()); + } else { + assert_eq!(user_key(iter.key()), format!("{:012x}", i - j).as_bytes()); + } + iter.next(); + } + } + + // test prev + for i in 10..count - 10 { + iter.seek(&key_with_ts( + BytesMut::from(format!("{:012x}", i).as_bytes()), + 0, + )); + + for j in 0..10 { + if !reversed { + assert_eq!(user_key(iter.key()), format!("{:012x}", i - j).as_bytes()); + } else { + assert_eq!(user_key(iter.key()), format!("{:012x}", i + j).as_bytes()); + } + iter.prev(); + } + } + + // test to_last + iter.to_last(); + if !reversed { + assert_eq!( + user_key(iter.key()), + format!("{:012x}", count - 1).as_bytes() + ); + } else { + assert_eq!(user_key(iter.key()), format!("{:012x}", 0).as_bytes()); + } + } + #[test] fn test_unix_time() { let start = SystemTime::now();