Skip to content

Commit

Permalink
get_clone impl on writer
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Oct 30, 2024
1 parent a4f04c5 commit 6dd7191
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 85 deletions.
9 changes: 9 additions & 0 deletions rust/blockstore/src/arrow/block/delta/data_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ impl DataRecordStorage {
inner.document_size
}

/// Returns an owned copy of the entry stored under `(prefix, key)`.
///
/// Yields `None` when `storage` holds no entry for that composite key.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<DataRecordStorageEntry> {
    let guard = self.inner.read();
    let found = guard.storage.get(&CompositeKey {
        prefix: prefix.to_string(),
        key,
    });
    // Clone so the result does not borrow from the guard.
    found.cloned()
}

pub fn add(&self, prefix: &str, key: KeyWrapper, value: &DataRecord<'_>) {
let mut inner = self.inner.write();
let composite_key = CompositeKey {
Expand Down
44 changes: 44 additions & 0 deletions rust/blockstore/src/arrow/block/delta/single_column_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ impl SingleColumnStorage<String> {

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}

/// Returns an owned copy of the `String` stored under `(prefix, key)`.
///
/// Yields `None` when `storage` holds no entry for that composite key.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<String> {
    let guard = self.inner.read();
    let composite_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    // Clone so the result does not borrow from the guard.
    guard.storage.get(&composite_key).cloned()
}
}

impl SingleColumnStorage<Vec<u32>> {
Expand Down Expand Up @@ -318,6 +329,17 @@ impl SingleColumnStorage<Vec<u32>> {

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}

/// Returns an owned copy of the `Vec<u32>` stored under `(prefix, key)`.
///
/// Yields `None` when `storage` holds no entry for that composite key.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<Vec<u32>> {
    let guard = self.inner.read();
    let composite_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    // Clone so the result does not borrow from the guard.
    guard.storage.get(&composite_key).cloned()
}
}

impl SingleColumnStorage<u32> {
Expand Down Expand Up @@ -361,6 +383,17 @@ impl SingleColumnStorage<u32> {

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}

/// Returns a copy of the `u32` stored under `(prefix, key)`.
///
/// Yields `None` when `storage` holds no entry for that composite key.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<u32> {
    let guard = self.inner.read();
    let composite_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    guard.storage.get(&composite_key).cloned()
}
}

impl SingleColumnStorage<RoaringBitmap> {
Expand Down Expand Up @@ -415,4 +448,15 @@ impl SingleColumnStorage<RoaringBitmap> {

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}

/// Returns an owned copy of the `RoaringBitmap` stored under `(prefix, key)`.
///
/// Yields `None` when `storage` holds no entry for that composite key.
pub fn get_owned_value(&self, prefix: &str, key: KeyWrapper) -> Option<RoaringBitmap> {
    let guard = self.inner.read();
    let composite_key = CompositeKey {
        prefix: prefix.to_string(),
        key,
    };
    // Clone so the result does not borrow from the guard.
    guard.storage.get(&composite_key).cloned()
}
}
13 changes: 13 additions & 0 deletions rust/blockstore/src/arrow/block/delta/spann_posting_list_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ impl SpannPostingListDelta {
self.inner.read().key_size
}

/// Returns an owned copy of the posting-list entry stored under `(prefix, key)`.
///
/// Yields `None` when `storage` holds no entry for that composite key.
pub fn get_owned_value(
    &self,
    prefix: &str,
    key: KeyWrapper,
) -> Option<SpannPostingListDeltaEntry> {
    let guard = self.inner.read();
    let entry = guard.storage.get(&CompositeKey {
        prefix: prefix.to_string(),
        key,
    });
    // Clone so the result does not borrow from the guard.
    entry.cloned()
}

pub fn add(&self, prefix: &str, key: KeyWrapper, value: &SpannPostingList<'_>) {
let mut lock_guard = self.inner.write();
let composite_key = CompositeKey {
Expand Down
27 changes: 8 additions & 19 deletions rust/blockstore/src/arrow/block/value/data_record_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

impl ArrowWriteableValue for &DataRecord<'_> {
type ReadableValue<'referred_data> = DataRecord<'referred_data>;
type OwnedReadableValue = DataRecordStorageEntry;

fn offset_size(item_count: usize) -> usize {
let id_offset = bit_util::round_upto_multiple_of_64((item_count + 1) * 4);
Expand Down Expand Up @@ -48,11 +49,16 @@ impl ArrowWriteableValue for &DataRecord<'_> {
/// Creates a fresh `DataRecordStorage` wrapped in the `BlockStorage::DataRecord` variant.
fn get_delta_builder() -> BlockStorage {
    let storage = DataRecordStorage::new();
    BlockStorage::DataRecord(storage)
}

/// Fetches an owned copy of the value stored under `(prefix, key)` in `delta`.
///
/// # Panics
///
/// Panics if `delta.builder` is not the `BlockStorage::DataRecord` variant.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
    if let BlockStorage::DataRecord(storage) = &delta.builder {
        storage.get_owned_value(prefix, key)
    } else {
        panic!("Invalid builder type")
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for DataRecord<'referred_data> {
type OwnedReadableValue = DataRecordStorageEntry;

fn get(array: &'referred_data Arc<dyn Array>, index: usize) -> Self {
let as_struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();

Expand Down Expand Up @@ -123,21 +129,4 @@ impl<'referred_data> ArrowReadableValue<'referred_data> for DataRecord<'referred
) {
delta.add(prefix, key, &value);
}

/// Converts this borrowed record into an owned tuple of
/// `(id, embedding, encoded metadata, document)`.
///
/// Metadata, when present, is converted into `UpdateMetadata` and serialized
/// with `encode_to_vec`; absent metadata stays `None`.
fn to_owned(self) -> Self::OwnedReadableValue {
    let encoded_metadata = self
        .metadata
        .as_ref()
        .map(|metadata| Into::<UpdateMetadata>::into(metadata.clone()).encode_to_vec());
    let id = self.id.to_string();
    let embedding = self.embedding.to_vec();
    let document = self.document.map(|text| text.to_string());
    (id, embedding, encoded_metadata, document)
}
}
18 changes: 12 additions & 6 deletions rust/blockstore/src/arrow/block/value/int32array_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{mem::size_of, sync::Arc};

impl ArrowWriteableValue for Vec<u32> {
type ReadableValue<'referred_data> = &'referred_data [u32];
type OwnedReadableValue = Vec<u32>;

fn offset_size(item_count: usize) -> usize {
bit_util::round_upto_multiple_of_64((item_count + 1) * size_of::<u32>())
Expand Down Expand Up @@ -43,11 +44,20 @@ impl ArrowWriteableValue for Vec<u32> {
/// Creates a fresh `SingleColumnStorage` wrapped in the `BlockStorage::VecUInt32` variant.
fn get_delta_builder() -> BlockStorage {
    let storage = SingleColumnStorage::new();
    BlockStorage::VecUInt32(storage)
}

/// Fetches an owned copy of the value stored under `(prefix, key)` in `delta`.
///
/// # Panics
///
/// Panics if `delta.builder` is not the `BlockStorage::VecUInt32` variant.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
    if let BlockStorage::VecUInt32(storage) = &delta.builder {
        storage.get_owned_value(prefix, key)
    } else {
        panic!("Invalid builder type")
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data [u32] {
type OwnedReadableValue = Vec<u32>;

fn get(array: &'referred_data Arc<dyn Array>, index: usize) -> Self {
let list_array = array.as_any().downcast_ref::<ListArray>().unwrap();
let start = list_array.value_offsets()[index] as usize;
Expand Down Expand Up @@ -92,8 +102,4 @@ impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data [u32
) {
delta.add(prefix, key, value.to_vec());
}

/// Copies the borrowed slice into a newly allocated owned vector.
fn to_owned(self) -> Self::OwnedReadableValue {
    Vec::from(self)
}
}
18 changes: 12 additions & 6 deletions rust/blockstore/src/arrow/block/value/roaring_bitmap_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use roaring::RoaringBitmap;

impl ArrowWriteableValue for RoaringBitmap {
type ReadableValue<'referred_data> = RoaringBitmap;
type OwnedReadableValue = RoaringBitmap;

fn offset_size(item_count: usize) -> usize {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
Expand Down Expand Up @@ -43,11 +44,20 @@ impl ArrowWriteableValue for RoaringBitmap {
/// Creates a fresh `SingleColumnStorage` wrapped in the `BlockStorage::RoaringBitmap` variant.
fn get_delta_builder() -> BlockStorage {
    let storage = SingleColumnStorage::new();
    BlockStorage::RoaringBitmap(storage)
}

/// Fetches an owned copy of the value stored under `(prefix, key)` in `delta`.
///
/// # Panics
///
/// Panics if `delta.builder` is not the `BlockStorage::RoaringBitmap` variant.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
    if let BlockStorage::RoaringBitmap(storage) = &delta.builder {
        storage.get_owned_value(prefix, key)
    } else {
        panic!("Invalid builder type")
    }
}
}

impl ArrowReadableValue<'_> for RoaringBitmap {
type OwnedReadableValue = RoaringBitmap;

fn get(array: &std::sync::Arc<dyn Array>, index: usize) -> Self {
let arr = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let bytes = arr.value(index);
Expand All @@ -63,8 +73,4 @@ impl ArrowReadableValue<'_> for RoaringBitmap {
) {
delta.add(prefix, key, value);
}

/// Returns the bitmap itself as the owned value.
///
/// `self` is taken by value and is already an owned `RoaringBitmap`, so it is
/// moved out directly instead of being cloned and then dropped.
fn to_owned(self) -> Self::OwnedReadableValue {
    self
}
}
22 changes: 12 additions & 10 deletions rust/blockstore/src/arrow/block/value/spann_posting_list_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{

impl ArrowWriteableValue for &SpannPostingList<'_> {
type ReadableValue<'referred_data> = SpannPostingList<'referred_data>;
type OwnedReadableValue = SpannPostingListDeltaEntry;

// This method is only called for SingleColumnStorage.
fn offset_size(_: usize) -> usize {
Expand Down Expand Up @@ -48,11 +49,20 @@ impl ArrowWriteableValue for &SpannPostingList<'_> {
/// Creates a fresh `SpannPostingListDelta` wrapped in the
/// `BlockStorage::SpannPostingListDelta` variant.
fn get_delta_builder() -> BlockStorage {
    let posting_list_delta = SpannPostingListDelta::new();
    BlockStorage::SpannPostingListDelta(posting_list_delta)
}

/// Fetches an owned copy of the value stored under `(prefix, key)` in `delta`.
///
/// # Panics
///
/// Panics if `delta.builder` is not the `BlockStorage::SpannPostingListDelta` variant.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
    if let BlockStorage::SpannPostingListDelta(storage) = &delta.builder {
        storage.get_owned_value(prefix, key)
    } else {
        panic!("Invalid builder type")
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for SpannPostingList<'referred_data> {
type OwnedReadableValue = SpannPostingListDeltaEntry;

fn get(array: &'referred_data Arc<dyn Array>, index: usize) -> Self {
let as_struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();

Expand Down Expand Up @@ -123,12 +133,4 @@ impl<'referred_data> ArrowReadableValue<'referred_data> for SpannPostingList<'re
) {
delta.add(prefix, key, &value);
}

/// Copies the three borrowed columns of this posting list into an owned tuple of
/// `(doc_offset_ids, doc_versions, doc_embeddings)`.
fn to_owned(self) -> Self::OwnedReadableValue {
    let offset_ids = self.doc_offset_ids.to_vec();
    let versions = self.doc_versions.to_vec();
    let embeddings = self.doc_embeddings.to_vec();
    (offset_ids, versions, embeddings)
}
}
18 changes: 12 additions & 6 deletions rust/blockstore/src/arrow/block/value/str_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::Arc;

impl ArrowWriteableValue for String {
type ReadableValue<'referred_data> = &'referred_data str;
type OwnedReadableValue = String;

fn offset_size(item_count: usize) -> usize {
bit_util::round_upto_multiple_of_64((item_count + 1) * 4)
Expand All @@ -39,11 +40,20 @@ impl ArrowWriteableValue for String {
/// Creates a fresh `SingleColumnStorage` wrapped in the `BlockStorage::String` variant.
fn get_delta_builder() -> BlockStorage {
    let storage = SingleColumnStorage::new();
    BlockStorage::String(storage)
}

/// Fetches an owned copy of the value stored under `(prefix, key)` in `delta`.
///
/// # Panics
///
/// Panics if `delta.builder` is not the `BlockStorage::String` variant.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
    if let BlockStorage::String(storage) = &delta.builder {
        storage.get_owned_value(prefix, key)
    } else {
        panic!("Invalid builder type")
    }
}
}

impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data str {
type OwnedReadableValue = String;

fn get(array: &'referred_data Arc<dyn Array>, index: usize) -> &'referred_data str {
let array = array.as_any().downcast_ref::<StringArray>().unwrap();
array.value(index)
Expand All @@ -56,8 +66,4 @@ impl<'referred_data> ArrowReadableValue<'referred_data> for &'referred_data str
) {
delta.add(prefix, key, value.to_string());
}

/// Copies the borrowed string slice into a newly allocated `String`.
fn to_owned(self) -> Self::OwnedReadableValue {
    String::from(self)
}
}
18 changes: 12 additions & 6 deletions rust/blockstore/src/arrow/block/value/u32_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::Arc;

impl ArrowWriteableValue for u32 {
type ReadableValue<'referred_data> = u32;
type OwnedReadableValue = u32;

fn offset_size(_item_count: usize) -> usize {
0
Expand All @@ -36,11 +37,20 @@ impl ArrowWriteableValue for u32 {
/// Creates a fresh `SingleColumnStorage` wrapped in the `BlockStorage::UInt32` variant.
fn get_delta_builder() -> BlockStorage {
    let storage = SingleColumnStorage::new();
    BlockStorage::UInt32(storage)
}

/// Fetches a copy of the value stored under `(prefix, key)` in `delta`.
///
/// # Panics
///
/// Panics if `delta.builder` is not the `BlockStorage::UInt32` variant; the
/// panic message includes the debug rendering of the builder that was found.
fn get_owned_value_from_delta(
    prefix: &str,
    key: KeyWrapper,
    delta: &BlockDelta,
) -> Option<Self::OwnedReadableValue> {
    if let BlockStorage::UInt32(storage) = &delta.builder {
        storage.get_owned_value(prefix, key)
    } else {
        panic!("Invalid builder type: {:?}", &delta.builder)
    }
}
}

impl ArrowReadableValue<'_> for u32 {
type OwnedReadableValue = u32;

fn get(array: &Arc<dyn Array>, index: usize) -> u32 {
let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
array.value(index)
Expand All @@ -53,8 +63,4 @@ impl ArrowReadableValue<'_> for u32 {
) {
delta.add(prefix, key, value);
}

/// Returns the value unchanged; the owned form of a `u32` is the `u32` itself.
fn to_owned(self) -> Self::OwnedReadableValue {
self
}
}
Loading

0 comments on commit 6dd7191

Please sign in to comment.