Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Nov 11, 2024
1 parent 6c39aaf commit 8c3ddcc
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 44 deletions.
18 changes: 18 additions & 0 deletions rust/blockstore/src/arrow/block/delta/builder_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ impl<V: ArrowWriteableValue> BTreeBuilderStorage<V> {
self.storage.remove(key)
}

fn get(&self, key: &CompositeKey) -> Option<V::PreparedValue> {
if !self.storage.contains_key(key) {
return None;
}
Some(V::prepare(self.storage.get(key).unwrap().clone()))
}

fn min_key(&self) -> Option<&CompositeKey> {
self.storage.keys().next()
}
Expand Down Expand Up @@ -61,6 +68,10 @@ impl<V: ArrowWriteableValue> VecBuilderStorage<V> {
None
}

fn get(&self, _: &CompositeKey) -> Option<V::PreparedValue> {
unimplemented!()
}

fn min_key(&self) -> Option<&CompositeKey> {
self.storage.first().map(|(key, _)| key)
}
Expand Down Expand Up @@ -137,6 +148,13 @@ impl<V: ArrowWriteableValue> BuilderStorage<V> {
}
}

pub fn get(&self, key: &CompositeKey) -> Option<V::PreparedValue> {
match self {
BuilderStorage::BTreeBuilderStorage(storage) => storage.get(key),
BuilderStorage::VecBuilderStorage(storage) => storage.get(key),
}
}

pub fn len(&self) -> usize {
match self {
BuilderStorage::BTreeBuilderStorage(storage) => storage.len(),
Expand Down
6 changes: 5 additions & 1 deletion rust/blockstore/src/arrow/block/delta/data_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ impl DataRecordStorage {
inner.size_tracker.get_key_size()
}

pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<DataRecordStorageEntry> {
pub fn get_owned_value(
&self,
prefix: &str,
key: KeyWrapper,
) -> Option<(String, Vec<f32>, Option<Vec<u8>>, Option<String>)> {
let inner = self.inner.read();
let composite_key = CompositeKey {
prefix: prefix.to_string(),
Expand Down
16 changes: 7 additions & 9 deletions rust/blockstore/src/arrow/block/delta/single_column_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
use arrow::util::bit_util;
use arrow::{array::Array, datatypes::Schema};
use parking_lot::RwLock;
use roaring::RoaringBitmap;
use std::sync::Arc;
use std::{collections::HashMap, vec};

Expand Down Expand Up @@ -270,14 +271,11 @@ impl<V: ArrowWriteableValue<SizeTracker = SingleColumnSizeTracker>> SingleColumn
(schema.into(), vec![prefix_arr, key_arr, value_arr])
}

pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<RoaringBitmap> {
let inner = self.inner.read();
inner
.storage
.get(&CompositeKey {
prefix: prefix.to_string(),
key,
})
.cloned()
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<V::PreparedValue> {
let composite_key = CompositeKey {
prefix: prefix.to_string(),
key,
};
self.inner.read().storage.get(&composite_key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use chroma_types::SpannPostingList;
use parking_lot::RwLock;

use crate::{
arrow::types::{ArrowWriteableKey, ArrowWriteableValue},
arrow::{
block::value::spann_posting_list_value::SpannPostingListDeltaEntry,
types::{ArrowWriteableKey, ArrowWriteableValue},
},
key::{CompositeKey, KeyWrapper},
};

Expand Down
13 changes: 4 additions & 9 deletions rust/blockstore/src/arrow/block/value/data_record_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ use arrow::{
array::{ArrayRef, BinaryArray},
util::bit_util,
};
use chroma_types::{chroma_proto::UpdateMetadata, DataRecord};
use chroma_types::{chroma_proto::UpdateMetadata, DataRecord, MetadataValue};
use prost::Message;
use std::sync::Arc;

// Convenience type for the storage entry
// (id, embedding, metadata, document)
pub type DataRecordStorageEntry = (String, Vec<f32>, Option<Vec<u8>>, Option<String>);
use std::{collections::HashMap, sync::Arc};

pub struct ValueBuilderWrapper {
id_builder: StringBuilder,
Expand All @@ -40,7 +36,6 @@ impl ArrowWriteableValue for &DataRecord<'_> {
type ArrowBuilder = ValueBuilderWrapper;
type SizeTracker = DataRecordSizeTracker;
type PreparedValue = (String, Vec<f32>, Option<Vec<u8>>, Option<String>);
type OwnedReadableValue = DataRecordStorageEntry;

fn offset_size(item_count: usize) -> usize {
let id_offset = bit_util::round_upto_multiple_of_64((item_count + 1) * 4);
Expand Down Expand Up @@ -190,8 +185,8 @@ impl ArrowWriteableValue for &DataRecord<'_> {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
match &delta.builder {
BlockStorage::DataRecord(builder) => builder.get_owned_value(prefix, key),
_ => panic!("Invalid builder type"),
Expand Down
5 changes: 2 additions & 3 deletions rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ impl ArrowWriteableValue for RoaringBitmap {
type ArrowBuilder = BinaryBuilder;
type SizeTracker = SingleColumnSizeTracker;
type PreparedValue = Vec<u8>;
type OwnedReadableValue = RoaringBitmap;

fn offset_size(item_count: usize) -> usize {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
Expand Down Expand Up @@ -86,8 +85,8 @@ impl ArrowWriteableValue for RoaringBitmap {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
match &delta.builder {
BlockStorage::RoaringBitmap(builder) => builder.get_owned_value(prefix, key),
_ => panic!("Invalid builder type"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
BlockfileWriterMutationOrdering,
};

type SpannPostingListDeltaEntry = (Vec<u32>, Vec<u32>, Vec<f32>);
pub type SpannPostingListDeltaEntry = (Vec<u32>, Vec<u32>, Vec<f32>);

pub struct SpannPostingListBuilderWrapper {
doc_offset_ids_builder: ListBuilder<UInt32Builder>,
Expand All @@ -35,7 +35,6 @@ impl ArrowWriteableValue for &SpannPostingList<'_> {
type PreparedValue = SpannPostingListDeltaEntry;
type SizeTracker = SpannPostingListSizeTracker;
type ArrowBuilder = SpannPostingListBuilderWrapper;
type OwnedReadableValue = SpannPostingListDeltaEntry;

// This method is only called for SingleColumnStorage.
fn offset_size(_: usize) -> usize {
Expand Down Expand Up @@ -185,8 +184,8 @@ impl ArrowWriteableValue for &SpannPostingList<'_> {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
match &delta.builder {
BlockStorage::SpannPostingListDelta(builder) => builder.get_owned_value(prefix, key),
_ => panic!("Invalid builder type"),
Expand Down
5 changes: 2 additions & 3 deletions rust/blockstore/src/arrow/block/value/str_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ impl ArrowWriteableValue for String {
type ArrowBuilder = StringBuilder;
type SizeTracker = SingleColumnSizeTracker;
type PreparedValue = String;
type OwnedReadableValue = String;

fn offset_size(item_count: usize) -> usize {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
Expand Down Expand Up @@ -74,8 +73,8 @@ impl ArrowWriteableValue for String {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
match &delta.builder {
BlockStorage::String(builder) => builder.get_owned_value(prefix, key),
_ => panic!("Invalid builder type"),
Expand Down
7 changes: 2 additions & 5 deletions rust/blockstore/src/arrow/block/value/u32_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ impl ArrowWriteableValue for u32 {
type ArrowBuilder = UInt32Builder;
type SizeTracker = SingleColumnSizeTracker;
type PreparedValue = u32;
type OwnedReadableValue = u32;

fn offset_size(_item_count: usize) -> usize {
0
Expand Down Expand Up @@ -73,8 +72,8 @@ impl ArrowWriteableValue for u32 {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
match &delta.builder {
BlockStorage::UInt32(builder) => builder.get_owned_value(prefix, key),
_ => panic!("Invalid builder type: {:?}", &delta.builder),
Expand All @@ -83,8 +82,6 @@ impl ArrowWriteableValue for u32 {
}

impl<'a> ArrowReadableValue<'a> for u32 {
type OwnedReadableValue = u32;

fn get(array: &Arc<dyn Array>, index: usize) -> u32 {
let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
array.value(index)
Expand Down
5 changes: 2 additions & 3 deletions rust/blockstore/src/arrow/block/value/uint32array_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ impl ArrowWriteableValue for Vec<u32> {
type ArrowBuilder = ListBuilder<UInt32Builder>;
type SizeTracker = SingleColumnSizeTracker;
type PreparedValue = Vec<u32>;
type OwnedReadableValue = Vec<u32>;

fn offset_size(item_count: usize) -> usize {
bit_util::round_upto_multiple_of_64((item_count + 1) * size_of::<u32>())
Expand Down Expand Up @@ -94,8 +93,8 @@ impl ArrowWriteableValue for Vec<u32> {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue> {
match &delta.builder {
BlockStorage::VecUInt32(builder) => builder.get_owned_value(prefix, key),
_ => panic!("Invalid builder type"),
Expand Down
8 changes: 6 additions & 2 deletions rust/blockstore/src/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl ArrowUnorderedBlockfileWriter {
&self,
prefix: &str,
key: K,
) -> Result<Option<V::OwnedReadableValue>, Box<dyn ChromaError>> {
) -> Result<Option<V::PreparedValue>, Box<dyn ChromaError>> {
// TODO: for now the BF writer locks the entire write operation
let _guard = self.write_mutex.lock().await;

Expand Down Expand Up @@ -258,7 +258,11 @@ impl ArrowUnorderedBlockfileWriter {
return Err(Box::new(e));
}
};
let new_delta = match self.block_manager.fork::<K, V>(&block.id).await {
let new_delta = match self
.block_manager
.fork::<K, V, UnorderedBlockDelta>(&block.id)
.await
{
Ok(delta) => delta,
Err(e) => {
return Err(Box::new(e));
Expand Down
7 changes: 3 additions & 4 deletions rust/blockstore/src/arrow/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ pub trait ArrowWriteableValue: Value {
/// Every writable value has a corresponding readable value type. For example, the readable value type for a `String` is `&str`.
type ReadableValue<'referred_data>: ArrowReadableValue<'referred_data>;
/// Some values are a reference type and need to be converted to an owned type or need to be prepared (e.g. serializing a RoaringBitmap) before they can be stored in a delta or Arrow array.
type PreparedValue;
type OwnedReadableValue;
type PreparedValue: Clone;

/// Some values use an offsets array. This returns the size of the offsets array given the number of items in the array.
fn offset_size(item_count: usize) -> usize;
Expand All @@ -49,8 +48,8 @@ pub trait ArrowWriteableValue: Value {
fn get_owned_value_from_delta(
prefix: &str,
key: KeyWrapper,
delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue>;
delta: &UnorderedBlockDelta,
) -> Option<Self::PreparedValue>;
}

pub trait ArrowReadableKey<'referred_data>: Key + PartialOrd {
Expand Down

0 comments on commit 8c3ddcc

Please sign in to comment.