diff --git a/rust/blockstore/src/arrow/block/delta/builder_storage.rs b/rust/blockstore/src/arrow/block/delta/builder_storage.rs index 1c4fc4ec10d..cc690c3bab1 100644 --- a/rust/blockstore/src/arrow/block/delta/builder_storage.rs +++ b/rust/blockstore/src/arrow/block/delta/builder_storage.rs @@ -14,6 +14,13 @@ impl BTreeBuilderStorage { self.storage.remove(key) } + fn get(&self, key: &CompositeKey) -> Option { + if !self.storage.contains_key(key) { + return None; + } + Some(V::prepare(self.storage.get(key).unwrap().clone())) + } + fn min_key(&self) -> Option<&CompositeKey> { self.storage.keys().next() } @@ -61,6 +68,10 @@ impl VecBuilderStorage { None } + fn get(&self, _: &CompositeKey) -> Option { + unimplemented!() + } + fn min_key(&self) -> Option<&CompositeKey> { self.storage.first().map(|(key, _)| key) } @@ -137,6 +148,13 @@ impl BuilderStorage { } } + pub fn get(&self, key: &CompositeKey) -> Option { + match self { + BuilderStorage::BTreeBuilderStorage(storage) => storage.get(key), + BuilderStorage::VecBuilderStorage(storage) => storage.get(key), + } + } + pub fn len(&self) -> usize { match self { BuilderStorage::BTreeBuilderStorage(storage) => storage.len(), diff --git a/rust/blockstore/src/arrow/block/delta/data_record.rs b/rust/blockstore/src/arrow/block/delta/data_record.rs index b9967ca8f4c..132a8fc8f6e 100644 --- a/rust/blockstore/src/arrow/block/delta/data_record.rs +++ b/rust/blockstore/src/arrow/block/delta/data_record.rs @@ -49,7 +49,11 @@ impl DataRecordStorage { inner.size_tracker.get_key_size() } - pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option { + pub fn get_owned_value( + &self, + prefix: &str, + key: KeyWrapper, + ) -> Option<(String, Vec, Option>, Option)> { let inner = self.inner.read(); let composite_key = CompositeKey { prefix: prefix.to_string(), diff --git a/rust/blockstore/src/arrow/block/delta/single_column_storage.rs b/rust/blockstore/src/arrow/block/delta/single_column_storage.rs index 4c2283f8e27..ec47f140354 100644 --- a/rust/blockstore/src/arrow/block/delta/single_column_storage.rs +++ b/rust/blockstore/src/arrow/block/delta/single_column_storage.rs @@ -11,6 +11,7 @@ use crate::{ use arrow::util::bit_util; use arrow::{array::Array, datatypes::Schema}; use parking_lot::RwLock; +use roaring::RoaringBitmap; use std::sync::Arc; use std::{collections::HashMap, vec}; @@ -270,14 +271,11 @@ impl> SingleColumn (schema.into(), vec![prefix_arr, key_arr, value_arr]) } - pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option { - let inner = self.inner.read(); - inner - .storage - .get(&CompositeKey { - prefix: prefix.to_string(), - key, - }) - .cloned() + pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option { + let composite_key = CompositeKey { + prefix: prefix.to_string(), + key, + }; + self.inner.read().storage.get(&composite_key) } } diff --git a/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs b/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs index cea5d8e0590..fbb32b35d4a 100644 --- a/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs +++ b/rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs @@ -5,7 +5,10 @@ use chroma_types::SpannPostingList; use parking_lot::RwLock; use crate::{ - arrow::types::{ArrowWriteableKey, ArrowWriteableValue}, + arrow::{ + block::value::spann_posting_list_value::SpannPostingListDeltaEntry, + types::{ArrowWriteableKey, ArrowWriteableValue}, + }, key::{CompositeKey, KeyWrapper}, }; diff --git a/rust/blockstore/src/arrow/block/value/data_record_value.rs b/rust/blockstore/src/arrow/block/value/data_record_value.rs index bf0824eaa82..2aa58802915 100644 --- a/rust/blockstore/src/arrow/block/value/data_record_value.rs +++ b/rust/blockstore/src/arrow/block/value/data_record_value.rs @@ -20,13 +20,9 @@ use arrow::{ array::{ArrayRef, BinaryArray}, util::bit_util, }; -use chroma_types::{chroma_proto::UpdateMetadata, DataRecord}; +use chroma_types::{chroma_proto::UpdateMetadata, DataRecord, MetadataValue}; use prost::Message; -use std::sync::Arc; - -// Convenience type for the storage entry -// (id, embedding, metadata, document) -pub type DataRecordStorageEntry = (String, Vec, Option>, Option); +use std::{collections::HashMap, sync::Arc}; pub struct ValueBuilderWrapper { id_builder: StringBuilder, @@ -40,7 +36,6 @@ impl ArrowWriteableValue for &DataRecord<'_> { type ArrowBuilder = ValueBuilderWrapper; type SizeTracker = DataRecordSizeTracker; type PreparedValue = (String, Vec, Option>, Option); - type OwnedReadableValue = DataRecordStorageEntry; fn offset_size(item_count: usize) -> usize { let id_offset = bit_util::round_upto_multiple_of_64((item_count + 1) * 4); @@ -190,8 +185,8 @@ impl ArrowWriteableValue for &DataRecord<'_> { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option { + delta: &UnorderedBlockDelta, + ) -> Option { match &delta.builder { BlockStorage::DataRecord(builder) => builder.get_owned_value(prefix, key), _ => panic!("Invalid builder type"), diff --git a/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs b/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs index d4cbd14c020..a6c14f4cd61 100644 --- a/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs +++ b/rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs @@ -23,7 +23,6 @@ impl ArrowWriteableValue for RoaringBitmap { type ArrowBuilder = BinaryBuilder; type SizeTracker = SingleColumnSizeTracker; type PreparedValue = Vec; - type OwnedReadableValue = RoaringBitmap; fn offset_size(item_count: usize) -> usize { bit_util::round_upto_multiple_of_64((item_count + 1) * 4) @@ -86,8 +85,8 @@ impl ArrowWriteableValue for RoaringBitmap { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option { + delta: &UnorderedBlockDelta, + ) -> Option { match &delta.builder { BlockStorage::RoaringBitmap(builder) => builder.get_owned_value(prefix, key), _ => panic!("Invalid builder type"), diff --git a/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs b/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs index d8ba34d9ac1..ac5c15ef310 100644 --- a/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs +++ b/rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs @@ -22,7 +22,7 @@ use crate::{ BlockfileWriterMutationOrdering, }; -type SpannPostingListDeltaEntry = (Vec, Vec, Vec); +pub type SpannPostingListDeltaEntry = (Vec, Vec, Vec); pub struct SpannPostingListBuilderWrapper { doc_offset_ids_builder: ListBuilder, @@ -35,7 +35,6 @@ impl ArrowWriteableValue for &SpannPostingList<'_> { type PreparedValue = SpannPostingListDeltaEntry; type SizeTracker = SpannPostingListSizeTracker; type ArrowBuilder = SpannPostingListBuilderWrapper; - type OwnedReadableValue = SpannPostingListDeltaEntry; // This method is only called for SingleColumnStorage. fn offset_size(_: usize) -> usize { @@ -185,8 +184,8 @@ impl ArrowWriteableValue for &SpannPostingList<'_> { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option { + delta: &UnorderedBlockDelta, + ) -> Option { match &delta.builder { BlockStorage::SpannPostingListDelta(builder) => builder.get_owned_value(prefix, key), _ => panic!("Invalid builder type"), diff --git a/rust/blockstore/src/arrow/block/value/str_value.rs b/rust/blockstore/src/arrow/block/value/str_value.rs index 7bd4db13b77..fc77507dd75 100644 --- a/rust/blockstore/src/arrow/block/value/str_value.rs +++ b/rust/blockstore/src/arrow/block/value/str_value.rs @@ -21,7 +21,6 @@ impl ArrowWriteableValue for String { type ArrowBuilder = StringBuilder; type SizeTracker = SingleColumnSizeTracker; type PreparedValue = String; - type OwnedReadableValue = String; fn offset_size(item_count: usize) -> usize { bit_util::round_upto_multiple_of_64((item_count + 1) * 4) @@ -74,8 +73,8 @@ impl ArrowWriteableValue for String { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option { + delta: &UnorderedBlockDelta, + ) -> Option { match &delta.builder { BlockStorage::String(builder) => builder.get_owned_value(prefix, key), _ => panic!("Invalid builder type"), diff --git a/rust/blockstore/src/arrow/block/value/u32_value.rs b/rust/blockstore/src/arrow/block/value/u32_value.rs index d6edb744835..efe09660964 100644 --- a/rust/blockstore/src/arrow/block/value/u32_value.rs +++ b/rust/blockstore/src/arrow/block/value/u32_value.rs @@ -20,7 +20,6 @@ impl ArrowWriteableValue for u32 { type ArrowBuilder = UInt32Builder; type SizeTracker = SingleColumnSizeTracker; type PreparedValue = u32; - type OwnedReadableValue = u32; fn offset_size(_item_count: usize) -> usize { 0 @@ -73,8 +72,8 @@ impl ArrowWriteableValue for u32 { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option { + delta: &UnorderedBlockDelta, + ) -> Option { match &delta.builder { BlockStorage::UInt32(builder) => builder.get_owned_value(prefix, key), _ => panic!("Invalid builder type: {:?}", &delta.builder), @@ -83,8 +82,6 @@ impl ArrowWriteableValue for u32 { } impl<'a> ArrowReadableValue<'a> for u32 { - type OwnedReadableValue = u32; - fn get(array: &Arc, index: usize) -> u32 { let array = array.as_any().downcast_ref::().unwrap(); array.value(index) diff --git a/rust/blockstore/src/arrow/block/value/uint32array_value.rs b/rust/blockstore/src/arrow/block/value/uint32array_value.rs index 2cdb4781109..1272c0514f9 100644 --- a/rust/blockstore/src/arrow/block/value/uint32array_value.rs +++ b/rust/blockstore/src/arrow/block/value/uint32array_value.rs @@ -21,7 +21,6 @@ impl ArrowWriteableValue for Vec { type ArrowBuilder = ListBuilder; type SizeTracker = SingleColumnSizeTracker; type PreparedValue = Vec; - type OwnedReadableValue = Vec; fn offset_size(item_count: usize) -> usize { bit_util::round_upto_multiple_of_64((item_count + 1) * size_of::()) @@ -94,8 +93,8 @@ impl ArrowWriteableValue for Vec { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option { + delta: &UnorderedBlockDelta, + ) -> Option { match &delta.builder { BlockStorage::VecUInt32(builder) => builder.get_owned_value(prefix, key), _ => panic!("Invalid builder type"), diff --git a/rust/blockstore/src/arrow/blockfile.rs b/rust/blockstore/src/arrow/blockfile.rs index 7d7f45df528..b1236bd8b27 100644 --- a/rust/blockstore/src/arrow/blockfile.rs +++ b/rust/blockstore/src/arrow/blockfile.rs @@ -228,7 +228,7 @@ impl ArrowUnorderedBlockfileWriter { &self, prefix: &str, key: K, - ) -> Result, Box> { + ) -> Result, Box> { // TODO: for now the BF writer locks the entire write operation let _guard = self.write_mutex.lock().await; @@ -258,7 +258,11 @@ impl ArrowUnorderedBlockfileWriter { return Err(Box::new(e)); } }; - let new_delta = match self.block_manager.fork::(&block.id).await { + let new_delta = match self + .block_manager + .fork::(&block.id) + .await + { Ok(delta) => delta, Err(e) => { return Err(Box::new(e)); diff --git a/rust/blockstore/src/arrow/types.rs b/rust/blockstore/src/arrow/types.rs index 884155ca5fd..19fc2799c19 100644 --- a/rust/blockstore/src/arrow/types.rs +++ b/rust/blockstore/src/arrow/types.rs @@ -22,8 +22,7 @@ pub trait ArrowWriteableValue: Value { /// Every writable value has a corresponding readable value type. For example, the readable value type for a `String` is `&str`. type ReadableValue<'referred_data>: ArrowReadableValue<'referred_data>; /// Some values are a reference type and need to be converted to an owned type or need to be prepared (e.g. serializing a RoaringBitmap) before they can be stored in a delta or Arrow array. - type PreparedValue; - type OwnedReadableValue; + type PreparedValue: Clone; /// Some values use an offsets array. This returns the size of the offsets array given the number of items in the array. fn offset_size(item_count: usize) -> usize; @@ -49,8 +48,8 @@ pub trait ArrowWriteableValue: Value { fn get_owned_value_from_delta( prefix: &str, key: KeyWrapper, - delta: &BlockDelta, - ) -> Option; + delta: &UnorderedBlockDelta, + ) -> Option; } pub trait ArrowReadableKey<'referred_data>: Key + PartialOrd {