diff --git a/rust/blockstore/src/arrow/block/delta/builder_storage.rs b/rust/blockstore/src/arrow/block/delta/builder_storage.rs
index 1c4fc4ec10d..cc690c3bab1 100644
--- a/rust/blockstore/src/arrow/block/delta/builder_storage.rs
+++ b/rust/blockstore/src/arrow/block/delta/builder_storage.rs
@@ -14,6 +14,13 @@ impl BTreeBuilderStorage {
         self.storage.remove(key)
     }
 
+    /// Returns the prepared form of the value stored under `key`, or `None` if the key is
+    /// absent.
+    fn get(&self, key: &CompositeKey) -> Option {
+        // One map lookup; the stored value is cloned only when the key is present.
+        self.storage.get(key).cloned().map(V::prepare)
+    }
+
     fn min_key(&self) -> Option<&CompositeKey> {
         self.storage.keys().next()
     }
@@ -61,6 +68,17 @@ impl VecBuilderStorage {
         None
     }
 
+    /// Returns the prepared form of the last entry stored under `key`, or `None` if no entry
+    /// has that key.
+    fn get(&self, key: &CompositeKey) -> Option {
+        // Scan from the back so that, should a key occur twice, the later entry is returned.
+        self.storage
+            .iter()
+            .rev()
+            .find(|(stored_key, _)| stored_key == key)
+            .map(|(_, value)| V::prepare(value.clone()))
+    }
+
     fn min_key(&self) -> Option<&CompositeKey> {
         self.storage.first().map(|(key, _)| key)
     }
@@ -137,6 +155,14 @@ impl BuilderStorage {
         }
     }
 
+    /// Dispatches the lookup to whichever concrete storage backs this builder.
+    pub fn get(&self, key: &CompositeKey) -> Option {
+        match self {
+            BuilderStorage::BTreeBuilderStorage(storage) => storage.get(key),
+            BuilderStorage::VecBuilderStorage(storage) => storage.get(key),
+        }
+    }
+
     pub fn len(&self) -> usize {
         match self {
             BuilderStorage::BTreeBuilderStorage(storage) => storage.len(),
diff --git a/rust/blockstore/src/arrow/block/delta/data_record.rs b/rust/blockstore/src/arrow/block/delta/data_record.rs
index fea906e67af..f6a03693b39 100644
--- a/rust/blockstore/src/arrow/block/delta/data_record.rs
+++ b/rust/blockstore/src/arrow/block/delta/data_record.rs
@@ -1,5 +1,6 @@
 use super::data_record_size_tracker::DataRecordSizeTracker;
 use super::BlockKeyArrowBuilder;
+use crate::arrow::block::value::data_record_value::DataRecordStorageEntry;
 use crate::arrow::types::ArrowWriteableValue;
 use crate::{
     arrow::types::ArrowWriteableKey,
@@ -49,6 +50,17 @@ impl DataRecordStorage {
         inner.size_tracker.get_key_size()
     }
 
+    /// Returns an owned copy of the entry stored under (`prefix`, `key`), or `None` if absent.
+    pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option {
+        // Build the lookup key first so the read lock is held only for the lookup itself.
+        let composite_key = CompositeKey {
+            prefix: prefix.to_string(),
+            key,
+        };
+        let inner = self.inner.read();
+        inner.storage.get(&composite_key).cloned()
+    }
+
     pub fn add(&self, prefix: &str, key: KeyWrapper, value: &DataRecord<'_>) {
         let mut inner = self.inner.write();
         let composite_key = CompositeKey {
diff --git a/rust/blockstore/src/arrow/block/delta/single_column_storage.rs b/rust/blockstore/src/arrow/block/delta/single_column_storage.rs
index d639fde90d4..3e903245e60 100644
--- a/rust/blockstore/src/arrow/block/delta/single_column_storage.rs
+++ b/rust/blockstore/src/arrow/block/delta/single_column_storage.rs
@@ -269,4 +269,13 @@ impl> SingleColumn
 
         (schema.into(), vec![prefix_arr, key_arr, value_arr])
     }
+
+    /// Looks up (`prefix`, `key`) in the underlying storage and returns its result.
+    pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option {
+        let composite_key = CompositeKey {
+            prefix: prefix.to_string(),
+            key,
+        };
+        self.inner.read().storage.get(&composite_key)
+    }
 }
diff --git a/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs b/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs
index e7d4f12afff..505e390243d 100644
--- a/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs
+++ b/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs
@@ -5,7 +5,10 @@ use chroma_types::SpannPostingList;
 use parking_lot::RwLock;
 
 use crate::{
-    arrow::types::{ArrowWriteableKey, ArrowWriteableValue},
+    arrow::{
+        block::value::spann_posting_list_value::SpannPostingListDeltaEntry,
+        types::{ArrowWriteableKey, ArrowWriteableValue},
+    },
     key::{CompositeKey, KeyWrapper},
 };
 
@@ -48,6 +51,21 @@ impl SpannPostingListDelta {
         self.inner.read().size_tracker.get_key_size()
     }
 
+    /// Returns an owned copy of the entry stored under (`prefix`, `key`), or `None` if absent.
+    pub fn get_owned_value(
+        &self,
+        prefix: &str,
+        key: KeyWrapper,
+    ) -> Option {
+        // Build the lookup key first so the read lock is held only for the lookup itself.
+        let composite_key = CompositeKey {
+            prefix: prefix.to_string(),
+            key,
+        };
+        let read_guard = self.inner.read();
+        read_guard.storage.get(&composite_key).cloned()
+    }
+
     pub fn add(&self, prefix: &str, key: KeyWrapper, value: &SpannPostingList<'_>) {
         let mut lock_guard = self.inner.write();
         let composite_key = CompositeKey {
diff --git 
a/rust/blockstore/src/arrow/block/value/data_record_value.rs b/rust/blockstore/src/arrow/block/value/data_record_value.rs
index efe4b4305ff..613eb57952a 100644
--- a/rust/blockstore/src/arrow/block/value/data_record_value.rs
+++ b/rust/blockstore/src/arrow/block/value/data_record_value.rs
@@ -31,11 +31,14 @@ pub struct ValueBuilderWrapper {
     document_builder: StringBuilder,
 }
 
+/// Owned tuple that `&DataRecord` uses as its `PreparedValue`.
+pub type DataRecordStorageEntry = (String, Vec, Option>, Option);
+
 impl ArrowWriteableValue for &DataRecord<'_> {
     type ReadableValue<'referred_data> = DataRecord<'referred_data>;
     type ArrowBuilder = ValueBuilderWrapper;
     type SizeTracker = DataRecordSizeTracker;
-    type PreparedValue = (String, Vec, Option>, Option);
+    type PreparedValue = DataRecordStorageEntry;
 
     fn offset_size(item_count: usize) -> usize {
         let id_offset = bit_util::round_upto_multiple_of_64((item_count + 1) * 4);
@@ -178,6 +181,21 @@ impl ArrowWriteableValue for &DataRecord<'_> {
 
         (struct_field, value_arr)
     }
+
+    /// Fetches the value for (`prefix`, `key`) from the delta's `DataRecord` builder.
+    ///
+    /// Panics if the delta wraps any other `BlockStorage` variant.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option {
+        if let BlockStorage::DataRecord(builder) = &delta.builder {
+            builder.get_owned_value(prefix, key)
+        } else {
+            panic!("Invalid builder type")
+        }
+    }
 }
 
 impl<'referred_data> ArrowReadableValue<'referred_data> for DataRecord<'referred_data> {
diff --git a/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs b/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs
index eab22040013..951010b8b60 100644
--- a/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs
+++ b/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs
@@ -78,6 +78,21 @@ impl ArrowWriteableValue for RoaringBitmap {
         let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
         (value_field, value_arr)
     }
+
+    /// Fetches the value for (`prefix`, `key`) from the delta's `RoaringBitmap` builder.
+    ///
+    /// Panics if the delta wraps any other `BlockStorage` variant.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option {
+        if let BlockStorage::RoaringBitmap(builder) = &delta.builder {
+            builder.get_owned_value(prefix, key)
+        } else {
+            panic!("Invalid builder type")
+        }
+    }
 }
 
 impl ArrowReadableValue<'_> for RoaringBitmap {
diff --git a/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs b/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs
index 227d0e83067..ac5c15ef310 100644
--- a/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs
+++ b/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs
@@ -22,7 +22,7 @@ use crate::{
     BlockfileWriterMutationOrdering,
 };
 
-type SpannPostingListDeltaEntry = (Vec, Vec, Vec);
+pub type SpannPostingListDeltaEntry = (Vec, Vec, Vec);
 
 pub struct SpannPostingListBuilderWrapper {
     doc_offset_ids_builder: ListBuilder,
@@ -180,6 +180,21 @@ impl ArrowWriteableValue for &SpannPostingList<'_> {
 
         (value_field, value_arr)
     }
+
+    /// Fetches the value for (`prefix`, `key`) from the delta's `SpannPostingListDelta` builder.
+    ///
+    /// Panics if the delta wraps any other `BlockStorage` variant.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option {
+        if let BlockStorage::SpannPostingListDelta(builder) = &delta.builder {
+            builder.get_owned_value(prefix, key)
+        } else {
+            panic!("Invalid builder type")
+        }
+    }
 }
 
 impl<'referred_data> ArrowReadableValue<'referred_data> for SpannPostingList<'referred_data> {
diff --git a/rust/blockstore/src/arrow/block/value/str_value.rs b/rust/blockstore/src/arrow/block/value/str_value.rs
index 37b8e7b38e3..b852a860d30 100644
--- a/rust/blockstore/src/arrow/block/value/str_value.rs
+++ b/rust/blockstore/src/arrow/block/value/str_value.rs
@@ -69,6 +69,21 @@ impl ArrowWriteableValue for String {
         let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
         (value_field, value_arr)
     }
+
+    /// Fetches the value for (`prefix`, `key`) from the delta's `String` builder.
+    ///
+    /// Panics if the delta wraps any other `BlockStorage` variant.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option {
+        if let BlockStorage::String(builder) = &delta.builder {
+            builder.get_owned_value(prefix, key)
+        } else {
+            panic!("Invalid builder type")
+        }
+    }
 }
 
 impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data str {
diff --git 
a/rust/blockstore/src/arrow/block/value/u32_value.rs b/rust/blockstore/src/arrow/block/value/u32_value.rs
index 09dfd385666..d7213e1443b 100644
--- a/rust/blockstore/src/arrow/block/value/u32_value.rs
+++ b/rust/blockstore/src/arrow/block/value/u32_value.rs
@@ -68,6 +68,21 @@ impl ArrowWriteableValue for u32 {
         let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
         (value_field, value_arr)
     }
+
+    /// Fetches the value for (`prefix`, `key`) from the delta's `UInt32` builder.
+    ///
+    /// Panics, printing the builder, if the delta wraps any other `BlockStorage` variant.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option {
+        if let BlockStorage::UInt32(builder) = &delta.builder {
+            builder.get_owned_value(prefix, key)
+        } else {
+            panic!("Invalid builder type: {:?}", &delta.builder)
+        }
+    }
 }
 
 impl<'a> ArrowReadableValue<'a> for u32 {
diff --git a/rust/blockstore/src/arrow/block/value/uint32array_value.rs b/rust/blockstore/src/arrow/block/value/uint32array_value.rs
index 2ae5c7375bb..ce1877f17e2 100644
--- a/rust/blockstore/src/arrow/block/value/uint32array_value.rs
+++ b/rust/blockstore/src/arrow/block/value/uint32array_value.rs
@@ -86,6 +86,21 @@ impl ArrowWriteableValue for Vec {
 
         (value_field, Arc::new(value_arr))
     }
+
+    /// Fetches the value for (`prefix`, `key`) from the delta's `VecUInt32` builder.
+    ///
+    /// Panics if the delta wraps any other `BlockStorage` variant.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option {
+        if let BlockStorage::VecUInt32(builder) = &delta.builder {
+            builder.get_owned_value(prefix, key)
+        } else {
+            panic!("Invalid builder type")
+        }
+    }
 }
 
 impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data [u32] {
diff --git a/rust/blockstore/src/arrow/blockfile.rs b/rust/blockstore/src/arrow/blockfile.rs
index ba272d1d8f4..762d62a6fde 100644
--- a/rust/blockstore/src/arrow/blockfile.rs
+++ b/rust/blockstore/src/arrow/blockfile.rs
@@ -224,6 +224,67 @@ impl ArrowUnorderedBlockfileWriter {
         Ok(())
     }
 
+    /// Returns the value held for (`prefix`, `key`) in the delta of the block that owns the key.
+    ///
+    /// If that block has no delta yet, the block is forked into a new delta,
+    /// `sparse_index.replace_block` is called with the fork's id, and the fork is recorded in
+    /// `block_deltas` before the lookup.
+    ///
+    /// Returns `ArrowBlockfileError::BlockNotFound` if the block manager has no such block,
+    /// and passes through any error from fetching or forking the block.
+    // NOTE(review): presumably not called anywhere yet, hence the lint exemption — confirm.
+    #[allow(dead_code)]
+    pub(crate) async fn get_owned(
+        &self,
+        prefix: &str,
+        key: K,
+    ) -> Result, Box> {
+        // TODO: for now the BF writer locks the entire write operation
+        let _guard = self.write_mutex.lock().await;
+
+        // TODO: value must be smaller than the block size except for position lists, which are
+        //         a special case where we split the value across multiple blocks
+
+        // Resolve the block responsible for this key through the sparse index.
+        let search_key = CompositeKey::new(prefix.to_string(), key.clone());
+        let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
+
+        // Check for an existing delta; the deltas lock is scoped to this block so it is
+        // released before any await below.
+        let existing_delta = {
+            let deltas = self.block_deltas.lock();
+            deltas.get(&target_block_id).cloned()
+        };
+
+        let delta = if let Some(delta) = existing_delta {
+            delta
+        } else {
+            // No delta yet: fork the block into one. Creating a delta loads the block entirely
+            // into memory.
+            let block = match self.block_manager.get(&target_block_id).await {
+                Ok(Some(block)) => block,
+                Ok(None) => return Err(Box::new(ArrowBlockfileError::BlockNotFound)),
+                Err(e) => return Err(Box::new(e)),
+            };
+            let forked = match self
+                .block_manager
+                .fork::(&block.id)
+                .await
+            {
+                Ok(forked) => forked,
+                Err(e) => return Err(Box::new(e)),
+            };
+            let forked_id = forked.id;
+            // Blocks can be empty.
+            self.root
+                .sparse_index
+                .replace_block(target_block_id, forked_id);
+            self.block_deltas.lock().insert(forked_id, forked.clone());
+            forked
+        };
+
+        Ok(V::get_owned_value_from_delta(prefix, key.into(), &delta))
+    }
+
     pub(crate) async fn delete(
         &self,
         prefix: &str,
diff --git a/rust/blockstore/src/arrow/types.rs b/rust/blockstore/src/arrow/types.rs
index 36bd5f15891..19fc2799c19 100644
--- a/rust/blockstore/src/arrow/types.rs
+++ b/rust/blockstore/src/arrow/types.rs
@@ -22,7 +22,7 @@ pub trait ArrowWriteableValue: Value {
     /// Every writable value has a corresponding readable value type. For example, the readable value type for a `String` is `&str`.
     type ReadableValue<'referred_data>: ArrowReadableValue<'referred_data>;
     /// Some values are a reference type and need to be converted to an owned type or need to be prepared (e.g. serializing a RoaringBitmap) before they can be stored in a delta or Arrow array.
-    type PreparedValue;
+    type PreparedValue: Clone;
 
     /// Some values use an offsets array. This returns the size of the offsets array given the number of items in the array.
     fn offset_size(item_count: usize) -> usize;
@@ -45,6 +45,13 @@ pub trait ArrowWriteableValue: Value {
         builder: Self::ArrowBuilder,
         size_tracker: &Self::SizeTracker,
     ) -> (Field, Arc);
+    /// Looks up (`prefix`, `key`) in `delta` and returns the stored value in owned form, or
+    /// `None` if the delta has no entry for it.
+    fn get_owned_value_from_delta(
+        prefix: &str,
+        key: KeyWrapper,
+        delta: &UnorderedBlockDelta,
+    ) -> Option;
 }
 
 pub trait ArrowReadableKey<'referred_data>: Key + PartialOrd {