[CLN]: clean up log materialization #3223

Draft · wants to merge 6 commits into main
27 changes: 27 additions & 0 deletions rust/types/src/data_record.rs
@@ -10,6 +10,29 @@ pub struct DataRecord<'a> {
     pub document: Option<&'a str>,
 }
 
+#[derive(Debug, Clone)]
+pub struct OwnedDataRecord {
+    pub id: String,
+    pub embedding: Vec<f32>,
+    pub metadata: Option<Metadata>,
+    pub document: Option<String>,
+}
+
+impl<'a> From<&DataRecord<'a>> for OwnedDataRecord {
+    fn from(data_record: &DataRecord<'a>) -> Self {
+        let id = data_record.id.to_string();
+        let embedding = data_record.embedding.to_vec();
+        let metadata = data_record.metadata.clone();
+        let document = data_record.document.map(|doc| doc.to_string());
+        OwnedDataRecord {
+            id,
+            embedding,
+            metadata,
+            document,
+        }
+    }
+}
+
 impl DataRecord<'_> {
     pub fn get_size(&self) -> usize {
         let id_size = self.id.len();
@@ -28,4 +51,8 @@ impl DataRecord<'_> {
         };
         id_size + embedding_size + metadata_size + document_size
     }
+
+    pub fn to_owned(&self) -> OwnedDataRecord {
+        self.into()
+    }
 }
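
For context, the new `OwnedDataRecord` gives materialized logs a record type that owns its id, embedding, and document instead of borrowing from segment storage. A minimal usage sketch, assuming both types are re-exported from the `chroma_types` crate root:

```rust
use chroma_types::{DataRecord, OwnedDataRecord};

// Detach a borrowed record from the segment data it points into.
// `to_owned` is a thin wrapper over the `From<&DataRecord>` impl,
// so both forms below are equivalent.
fn detach(record: &DataRecord<'_>) -> OwnedDataRecord {
    let via_from: OwnedDataRecord = record.into();
    let via_method = record.to_owned();
    assert_eq!(via_from.id, via_method.id);
    via_from
}
```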
3 changes: 0 additions & 3 deletions rust/types/src/operation.rs
@@ -13,9 +13,6 @@ pub enum Operation {
 
 #[derive(Clone, Debug, PartialEq)]
 pub enum MaterializedLogOperation {
-    // Set when the record is initially read from the segment
-    // before it is processed based on state of the log.
-    Initial,
     // Set for records that don't exist in the segment and
     // have been encountered for the first time in the log.
     AddNew,
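
With `Initial` gone, every record a consumer sees carries a final, fully processed operation, so the defensive panic arms below can all be dropped. A hedged sketch of the resulting match shape, using a toy copy of the enum limited to the variants visible in this PR (the real enum lives in rust/types/src/operation.rs and may define more):

```rust
// Toy copy for illustration only; variant set is incomplete.
#[derive(Clone, Debug, PartialEq)]
enum MaterializedLogOperation {
    AddNew,
    UpdateExisting,
    OverwriteExisting,
}

// No `Initial` arm (and no invariant-violation panic) is needed anymore.
fn touches_existing_record(op: &MaterializedLogOperation) -> bool {
    !matches!(op, MaterializedLogOperation::AddNew)
}

fn main() {
    assert!(!touches_existing_record(&MaterializedLogOperation::AddNew));
    assert!(touches_existing_record(&MaterializedLogOperation::UpdateExisting));
    assert!(touches_existing_record(&MaterializedLogOperation::OverwriteExisting));
}
```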
7 changes: 2 additions & 5 deletions rust/worker/src/execution/operators/filter.rs
@@ -106,17 +106,14 @@ pub(crate) struct MetadataLogReader<'me> {
 }
 
 impl<'me> MetadataLogReader<'me> {
-    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord<'me>>) -> Self {
+    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord>) -> Self {
         let mut compact_metadata: HashMap<_, BTreeMap<&MetadataValue, RoaringBitmap>> =
             HashMap::new();
         let mut document = HashMap::new();
         let mut updated_offset_ids = RoaringBitmap::new();
         let mut user_id_to_offset_id = HashMap::new();
         for (log, _) in logs.iter() {
-            if !matches!(
-                log.final_operation,
-                MaterializedLogOperation::Initial | MaterializedLogOperation::AddNew
-            ) {
+            if !matches!(log.final_operation, MaterializedLogOperation::AddNew) {
                 updated_offset_ids.insert(log.offset_id);
             }
             if !matches!(
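
The simplification relies on `AddNew` being the only operation that does not touch a record already in the segment. A self-contained toy of the bitmap bookkeeping above, using the `roaring` crate and stand-in types rather than the real `MaterializedLogRecord`:

```rust
use roaring::RoaringBitmap;

enum Op {
    AddNew,
    UpdateExisting,
    DeleteExisting,
}

fn main() {
    let logs = [(1u32, Op::AddNew), (2, Op::UpdateExisting), (3, Op::DeleteExisting)];
    let mut updated_offset_ids = RoaringBitmap::new();
    for (offset_id, op) in &logs {
        // Everything except a brand-new record marks an existing offset as updated.
        if !matches!(op, Op::AddNew) {
            updated_offset_ids.insert(*offset_id);
        }
    }
    assert!(updated_offset_ids.contains(2) && updated_offset_ids.contains(3));
    assert!(!updated_offset_ids.contains(1));
}
```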
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/hnsw_knn.rs
@@ -65,7 +65,7 @@ impl ChromaError for HnswKnnOperatorError {
 impl HnswKnnOperator {
     async fn get_disallowed_ids<'referred_data>(
         &self,
-        logs: Chunk<MaterializedLogRecord<'_>>,
+        logs: Chunk<MaterializedLogRecord>,
         record_segment_reader: &RecordSegmentReader<'_>,
     ) -> Result<Vec<u32>, Box<dyn ChromaError>> {
         let mut disallowed_ids = Vec::new();
7 changes: 2 additions & 5 deletions rust/worker/src/segment/distributed_hnsw_segment.rs
@@ -238,10 +238,10 @@ impl DistributedHNSWSegmentWriter {
     }
 }
 
-impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
+impl SegmentWriter for DistributedHNSWSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: chroma_types::Chunk<super::MaterializedLogRecord<'a>>,
+        records: chroma_types::Chunk<super::MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         for (record, _) in records.iter() {
             match record.final_operation {
@@ -284,9 +284,6 @@ impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
                     }
                 }
             }
-                MaterializedLogOperation::Initial => panic!(
-                    "Invariant violation. Mat records should not contain logs in initial state"
-                ),
             }
         }
         Ok(())
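
Because `MaterializedLogRecord` now owns its data, the `SegmentWriter` trait itself no longer needs a lifetime parameter. A sketch of the resulting shape, with stub types so it stands alone (the real definitions live in `chroma_types` and the worker crate, and the signature is reconstructed from this diff rather than copied from the trait):

```rust
// Stub types for illustration only.
struct Chunk<T>(Vec<T>);
struct MaterializedLogRecord;
struct ApplyMaterializedLogError;

// Before this PR the trait was `SegmentWriter<'a>` over
// `Chunk<MaterializedLogRecord<'a>>`; owning the data removes the
// lifetime that previously threaded through every implementor.
#[allow(async_fn_in_trait)]
trait SegmentWriter {
    async fn apply_materialized_log_chunk(
        &self,
        records: Chunk<MaterializedLogRecord>,
    ) -> Result<(), ApplyMaterializedLogError>;
}
```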
13 changes: 8 additions & 5 deletions rust/worker/src/segment/metadata_segment.rs
@@ -530,16 +530,20 @@
     }
 }
 
-impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> {
+impl SegmentWriter for MetadataSegmentWriter<'_> {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'log_records>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         let mut count = 0u64;
         let full_text_writer_batch = records.iter().filter_map(|record| {
             let offset_id = record.0.offset_id;
-            let old_document = record.0.data_record.as_ref().and_then(|r| r.document);
-            let new_document = record.0.final_document;
+            let old_document = record
+                .0
+                .data_record
+                .as_ref()
+                .and_then(|r| r.document.as_ref().map(|d| d.as_str()));
+            let new_document = &record.0.final_document;
 
             if matches!(
                 record.0.final_operation,
@@ -685,7 +689,6 @@ impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> {
                 }
 
             },
-            MaterializedLogOperation::Initial => panic!("Not expected mat records in the initial state")
             }
         }
         tracing::info!("Applied {} records to metadata segment", count,);
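
One small note on the `old_document` plumbing: with `OwnedDataRecord` the document is an `Option<String>`, and `Option::as_deref` is the standard-library shorthand for borrowing it back as `Option<&str>`:

```rust
fn main() {
    let document: Option<String> = Some("a document".to_string());
    // These two are equivalent; `as_deref` says it in one call.
    let long_form: Option<&str> = document.as_ref().map(|d| d.as_str());
    let short_form: Option<&str> = document.as_deref();
    assert_eq!(long_form, short_form);
}
```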
30 changes: 19 additions & 11 deletions rust/worker/src/segment/record_segment.rs
@@ -61,9 +61,9 @@ pub enum RecordSegmentWriterCreationError {
 }
 
 impl RecordSegmentWriter {
-    async fn construct_and_set_data_record<'a>(
+    async fn construct_and_set_data_record(
         &self,
-        mat_record: &MaterializedLogRecord<'a>,
+        mat_record: &MaterializedLogRecord,
         user_id: &str,
         offset_id: u32,
     ) -> Result<(), ApplyMaterializedLogError> {
@@ -337,10 +337,10 @@ impl ChromaError for ApplyMaterializedLogError {
     }
 }
 
-impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
+impl SegmentWriter for RecordSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'a>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         // The max new offset id introduced by materialized logs is initialized as zero
         // Since offset id should start from 1, we use this to indicate no new offset id
@@ -357,7 +357,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     .user_id_to_id
                     .as_ref()
                     .unwrap()
-                    .set::<&str, u32>("", log_record.user_id.unwrap(), log_record.offset_id)
+                    .set::<&str, u32>(
+                        "",
+                        log_record.user_id.as_ref().unwrap(),
+                        log_record.offset_id,
+                    )
                     .await
                 {
                     Ok(()) => (),
@@ -370,7 +374,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     .id_to_user_id
                     .as_ref()
                     .unwrap()
-                    .set::<u32, String>("", log_record.offset_id, log_record.user_id.unwrap().to_string())
+                    .set::<u32, String>(
+                        "",
+                        log_record.offset_id,
+                        log_record.user_id.clone().unwrap(),
+                    )
                     .await
                 {
                     Ok(()) => (),
@@ -382,7 +390,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                 match self
                     .construct_and_set_data_record(
                         log_record,
-                        log_record.user_id.unwrap(),
+                        log_record.user_id.as_ref().unwrap(),
                         log_record.offset_id,
                     )
                     .await
@@ -395,7 +403,8 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     // Set max offset id.
                     max_new_offset_id = max_new_offset_id.max(log_record.offset_id);
                 }
-                MaterializedLogOperation::UpdateExisting | MaterializedLogOperation::OverwriteExisting => {
+                MaterializedLogOperation::UpdateExisting
+                | MaterializedLogOperation::OverwriteExisting => {
                     // Offset id and user id do not need to change. Only data
                     // needs to change. Blockfile does not have Read then write
                     // semantics so we'll delete and insert.
@@ -415,7 +424,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                 match self
                     .construct_and_set_data_record(
                         log_record,
-                        log_record.data_record.as_ref().unwrap().id,
+                        &log_record.data_record.as_ref().unwrap().id,
                         log_record.offset_id,
                     )
                     .await
@@ -432,7 +441,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     .user_id_to_id
                     .as_ref()
                     .unwrap()
-                    .delete::<&str, u32>("", log_record.data_record.as_ref().unwrap().id)
+                    .delete::<&str, u32>("", &log_record.data_record.as_ref().unwrap().id)
                     .await
                 {
                     Ok(()) => (),
@@ -470,7 +479,6 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     }
                 }
             }
-            MaterializedLogOperation::Initial => panic!("Invariant violation. Materialized logs should not have any logs in the initial state")
             }
         }
         self.max_new_offset_id
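
Most of the churn in this file is `user_id` moving from a borrowed `Option<&str>` to an owned `Option<String>`. `&String` deref-coerces to `&str` at call sites, which is why `as_ref().unwrap()` suffices for the `&str`-keyed blockfile calls while the `String`-valued `set` needs a `clone()`. A toy illustration of the two cases:

```rust
fn str_keyed(key: &str) -> usize {
    key.len()
}

fn string_valued(value: String) -> usize {
    value.len()
}

fn main() {
    let user_id: Option<String> = Some("user-1".to_string());
    // `&String` deref-coerces to `&str`, so no allocation is needed here...
    assert_eq!(str_keyed(user_id.as_ref().unwrap()), 6);
    // ...but a parameter taking an owned `String` still requires a clone.
    assert_eq!(string_valued(user_id.clone().unwrap()), 6);
}
```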