Skip to content

Commit

Permalink
[ENH] Add a materialized record type as well as make segment interfac…
Browse files Browse the repository at this point in the history
…e chunk (#1964)

## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Adds a materialized record type for consumption downstream.
 - New functionality
	 - None.

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
HammadB authored Apr 26, 2024
1 parent f06c578 commit 8cb6c50
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 6 deletions.
2 changes: 1 addition & 1 deletion rust/worker/src/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub(crate) mod config;
mod data;
pub(crate) mod data;
pub(crate) mod dispatcher;
pub(crate) mod operator;
mod operators;
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/segment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod config;
mod distributed_hnsw_segment;
mod record_segment;
mod types;
36 changes: 36 additions & 0 deletions rust/worker/src/segment/record_segment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use super::types::{LogMaterializer, MaterializedLogRecord, SegmentWriter};
use crate::blockstore::Blockfile;
use crate::execution::data::data_chunk::Chunk;
use crate::types::LogRecord;

struct RecordSegment {
records: Box<dyn Blockfile>,
}

impl SegmentWriter for RecordSegment {
fn begin_transaction(&self) {
todo!()
}

fn apply_materialized_log_chunk(&self, records: Chunk<MaterializedLogRecord>) {
todo!()
}

fn apply_log_chunk(&self, records: Chunk<LogRecord>) {
todo!()
}

fn commit_transaction(&self) {
todo!()
}

fn rollback_transaction(&self) {
todo!()
}
}

impl LogMaterializer for RecordSegment {
fn materialize(&self, records: Chunk<LogRecord>) -> Chunk<MaterializedLogRecord> {
todo!()
}
}
93 changes: 88 additions & 5 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,95 @@
use crate::types::LogRecord;
use crate::execution::data::data_chunk::Chunk;
use crate::types::{LogRecord, Metadata};

trait SegmentWriter {
pub(super) struct MaterializedLogRecord<'a> {
segment_offset_id: u32,
log_record: &'a LogRecord,
materialized_record: DataRecord<'a>,
}

pub(super) struct DataRecord<'a> {
id: &'a str,
embedding: &'a [f32],
metadata: &'a Option<Metadata>,
document: &'a Option<String>,
}

pub(super) trait SegmentWriter {
fn begin_transaction(&self);
fn write_records(&self, records: Vec<Box<LogRecord>>, offset_ids: Vec<u32>);
fn apply_materialized_log_chunk(&self, records: Chunk<MaterializedLogRecord>);
fn apply_log_chunk(&self, records: Chunk<LogRecord>);
fn commit_transaction(&self);
fn rollback_transaction(&self);
}

trait OffsetIdAssigner: SegmentWriter {
fn assign_offset_ids(&self, records: Vec<Box<LogRecord>>) -> Vec<u32>;
pub(crate) trait LogMaterializer: SegmentWriter {
fn materialize(&self, records: Chunk<LogRecord>) -> Chunk<MaterializedLogRecord>;
}

#[cfg(test)]
mod tests {
use super::*;
use crate::types::{MetadataValue, Operation, OperationRecord};
use std::collections::HashMap;

// This is just a POC test to show how the materialize method could be tested, we can
// remove it later
#[test]
fn test_materialize() {
let mut metadata_1 = HashMap::new();
metadata_1.insert("key".to_string(), MetadataValue::Str("value".to_string()));
let metadata_1 = Some(metadata_1);

let data = vec![
LogRecord {
log_offset: 1,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
LogRecord {
log_offset: 2,
record: OperationRecord {
id: "embedding_id_2".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
LogRecord {
log_offset: 3,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
operation: Operation::Add,
},
},
];
let data: Chunk<LogRecord> = Chunk::new(data.into());

let materialized_data = data
.iter()
.map(|record| MaterializedLogRecord {
segment_offset_id: 0,
log_record: record.0,
materialized_record: DataRecord {
id: &record.0.record.id,
embedding: &[],
metadata: &metadata_1,
document: &None,
},
})
.collect::<Vec<_>>();

let materialized_chunk = Chunk::new(materialized_data.into());
drop(materialized_chunk);
drop(data);
}
}

0 comments on commit 8cb6c50

Please sign in to comment.