From fb1dcf7b27c3e93a28a59eb17631949201c6d815 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Mon, 2 Dec 2024 11:27:12 -0800 Subject: [PATCH] Get rid of Initial operation state --- rust/types/src/operation.rs | 3 - rust/worker/src/execution/operators/filter.rs | 5 +- .../src/segment/distributed_hnsw_segment.rs | 3 - rust/worker/src/segment/metadata_segment.rs | 1 - rust/worker/src/segment/record_segment.rs | 10 +- rust/worker/src/segment/types.rs | 517 +++++++++--------- rust/worker/src/server.rs | 4 +- 7 files changed, 282 insertions(+), 261 deletions(-) diff --git a/rust/types/src/operation.rs b/rust/types/src/operation.rs index 0ca6c2c24e1e..3b2315213e76 100644 --- a/rust/types/src/operation.rs +++ b/rust/types/src/operation.rs @@ -13,9 +13,6 @@ pub enum Operation { #[derive(Clone, Debug, PartialEq)] pub enum MaterializedLogOperation { - // Set when the record is initially read from the segment - // before it is processed based on state of the log. - Initial, // Set for records that don't exist in the segment and // have been encountered for the first time in the log. AddNew, diff --git a/rust/worker/src/execution/operators/filter.rs b/rust/worker/src/execution/operators/filter.rs index 635636b11377..a542a4966826 100644 --- a/rust/worker/src/execution/operators/filter.rs +++ b/rust/worker/src/execution/operators/filter.rs @@ -113,10 +113,7 @@ impl<'me> MetadataLogReader<'me> { let mut updated_offset_ids = RoaringBitmap::new(); let mut user_id_to_offset_id = HashMap::new(); for (log, _) in logs.iter() { - if !matches!( - log.final_operation, - MaterializedLogOperation::Initial | MaterializedLogOperation::AddNew - ) { + if !matches!(log.final_operation, MaterializedLogOperation::AddNew) { updated_offset_ids.insert(log.offset_id); } if !matches!( diff --git a/rust/worker/src/segment/distributed_hnsw_segment.rs b/rust/worker/src/segment/distributed_hnsw_segment.rs index bff4a208b86c..8bf5043b9a47 100644 --- a/rust/worker/src/segment/distributed_hnsw_segment.rs +++ b/rust/worker/src/segment/distributed_hnsw_segment.rs @@ -284,9 +284,6 @@ impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter { } } } - MaterializedLogOperation::Initial => panic!( - "Invariant violation. Mat records should not contain logs in initial state" - ), } } Ok(()) diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs index c5410479cf92..2b291ac1aefb 100644 --- a/rust/worker/src/segment/metadata_segment.rs +++ b/rust/worker/src/segment/metadata_segment.rs @@ -685,7 +685,6 @@ impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> { } }, - MaterializedLogOperation::Initial => panic!("Not expected mat records in the initial state") } } tracing::info!("Applied {} records to metadata segment", count,); diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index 379cc918749e..970fe3320364 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -370,7 +370,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter { .id_to_user_id .as_ref() .unwrap() - .set::("", log_record.offset_id, log_record.user_id.unwrap().to_string()) + .set::( + "", + log_record.offset_id, + log_record.user_id.unwrap().to_string(), + ) .await { Ok(()) => (), @@ -395,7 +399,8 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter { // Set max offset id. max_new_offset_id = max_new_offset_id.max(log_record.offset_id); } - MaterializedLogOperation::UpdateExisting | MaterializedLogOperation::OverwriteExisting => { + MaterializedLogOperation::UpdateExisting + | MaterializedLogOperation::OverwriteExisting => { // Offset id and user id do not need to change. Only data // needs to change. Blockfile does not have Read then write // semantics so we'll delete and insert. @@ -470,7 +475,6 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter { } } } - MaterializedLogOperation::Initial => panic!("Invariant violation. Materialized logs should not have any logs in the initial state") } } self.max_new_offset_id diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index 11e1845741f9..c33d39cd104b 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -336,24 +336,24 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> { } } -impl<'referred_data> From<(DataRecord<'referred_data>, u32)> - for MaterializedLogRecord<'referred_data> -{ - fn from(data_record_info: (DataRecord<'referred_data>, u32)) -> Self { - let data_record = data_record_info.0; - let offset_id = data_record_info.1; - Self { - data_record: Some(data_record), - offset_id, - user_id: None, - final_operation: MaterializedLogOperation::Initial, - metadata_to_be_merged: None, - metadata_to_be_deleted: None, - final_document: None, - final_embedding: None, - } - } -} +// impl<'referred_data> From<(DataRecord<'referred_data>, u32)> +// for MaterializedLogRecord<'referred_data> +// { +// fn from(data_record_info: (DataRecord<'referred_data>, u32)) -> Self { +// let data_record = data_record_info.0; +// let offset_id = data_record_info.1; +// Self { +// data_record: Some(data_record), +// offset_id, +// user_id: None, +// final_operation: MaterializedLogOperation::Initial, +// metadata_to_be_merged: None, +// metadata_to_be_deleted: None, +// final_document: None, +// final_embedding: None, +// } +// } +// } // Creates a materialized log record from the corresponding entry // in the log (OperationRecord), offset id in storage where it will be stored (u32) @@ -438,8 +438,8 @@ pub async fn materialize_logs<'me>( }; // Populate entries that are present in the record segment. - let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); - let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); + let mut existing_id_to_record: HashMap<&str, (DataRecord, u32)> = HashMap::new(); + let mut id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); if let Some(reader) = &record_segment_reader { async { for (log_record, _) in logs.iter() { @@ -458,10 +458,8 @@ pub async fn materialize_logs<'me>( .await { Ok(Some((data_record, offset_id))) => { - existing_id_to_materialized.insert( - log_record.record.id.as_str(), - MaterializedLogRecord::from((data_record, offset_id)), - ); + existing_id_to_record + .insert(log_record.record.id.as_str(), (data_record, offset_id)); } Ok(None) => { return Err(LogMaterializerError::RecordSegment(Box::new( @@ -485,116 +483,208 @@ pub async fn materialize_logs<'me>( ) .await?; } + // Populate updates to these and fresh records that are being // inserted for the first time. - async { - for (log_record, _) in logs.iter() { - match log_record.record.operation { - Operation::Add => { - // If this is an add of a record present in the segment then add - // only if it has been previously deleted in the log. - if existing_id_to_materialized.contains_key(log_record.record.id.as_str()) { - // safe to unwrap - let operation = existing_id_to_materialized - .get(log_record.record.id.as_str()) - .unwrap() - .final_operation - .clone(); - match operation { - MaterializedLogOperation::DeleteExisting => { - let curr_val = existing_id_to_materialized.remove(log_record.record.id.as_str()).unwrap(); - // Overwrite. - let mut materialized_record = - match MaterializedLogRecord::try_from(( - &log_record.record, - curr_val.offset_id, - log_record.record.id.as_str(), - )) { - Ok(record) => record, + // async { + for (log_record, _) in logs.iter() { + match log_record.record.operation { + Operation::Add => { + match ( + existing_id_to_record.get(log_record.record.id.as_str()), + id_to_materialized.get_mut(log_record.record.id.as_str()), + ) { + (Some(_), None) => { + // Offset ID already exists in segment and this ID has not been modified in the log up to this point, so the add is a noop/invalid. + continue; + } + (None, Some(_)) => { + // Offset ID does not exist in segment but does exist in log up to this point (a new add), so this second add is a noop/invalid. + continue; + } + (Some(_), Some(materialized_operation)) => { + match materialized_operation.final_operation { + MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment."), + MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => { + // This is a noop. + continue; + } + + MaterializedLogOperation::DeleteExisting => { + match merge_update_metadata( + ( + &materialized_operation.metadata_to_be_merged, + &materialized_operation.metadata_to_be_deleted, + ), + &log_record.record.metadata, + ) { + Ok(meta) => { + materialized_operation.metadata_to_be_merged = meta.0; + materialized_operation.metadata_to_be_deleted = meta.1; + } Err(e) => { - return Err(e); + return Err(LogMaterializerError::MetadataMaterialization(e)); } }; - materialized_record.data_record = curr_val.data_record; - materialized_record.final_operation = - MaterializedLogOperation::OverwriteExisting; - existing_id_to_materialized - .insert(log_record.record.id.as_str(), materialized_record); - }, - MaterializedLogOperation::AddNew => panic!("Invariant violation. Existing record can never have an Add new state"), - MaterializedLogOperation::Initial | MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => { - // Invalid add so skip. - continue; + if let Some(doc) = log_record.record.document.as_ref() { + materialized_operation.final_document = Some(doc); + } + if let Some(emb) = log_record.record.embedding.as_ref() { + materialized_operation.final_embedding = Some(emb.as_slice()); + } + // This record is not present on storage yet hence final operation is + // AddNew and not UpdateExisting. + materialized_operation.final_operation = MaterializedLogOperation::OverwriteExisting; + } } + } + (None, None) => { + let next_offset_id = + next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let materialized_record = match MaterializedLogRecord::try_from(( + &log_record.record, + next_offset_id, + log_record.record.id.as_str(), + )) { + Ok(record) => record, + Err(e) => { + return Err(e); } - } - // Adding an entry that does not exist on the segment yet. - // Only add if it hasn't been added before in the log. - else if !new_id_to_materialized.contains_key(log_record.record.id.as_str()) { - let next_offset_id = - next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let materialized_record = match MaterializedLogRecord::try_from(( - &log_record.record, - next_offset_id, - log_record.record.id.as_str(), - )) { - Ok(record) => record, - Err(e) => { - return Err(e); - } - }; - new_id_to_materialized - .insert(log_record.record.id.as_str(), materialized_record); - } + }; + id_to_materialized + .insert(log_record.record.id.as_str(), materialized_record); + } + }; + } + Operation::Delete => { + // If the delete is for a record that is currently not in the + // record segment, then we can just NOT process these records + // at all. On the other hand if it is for a record that is currently + // in segment then we'll have to pass it as a delete + // to the compactor so that it can be deleted. + + match id_to_materialized.remove(log_record.record.id.as_str()) { + Some(_) => { + // Successfully removed } - Operation::Delete => { - // If the delete is for a record that is currently not in the - // record segment, then we can just NOT process these records - // at all. On the other hand if it is for a record that is currently - // in segment then we'll have to pass it as a delete - // to the compactor so that it can be deleted. - if new_id_to_materialized.contains_key(log_record.record.id.as_str()) { - new_id_to_materialized.remove(log_record.record.id.as_str()); - } else if existing_id_to_materialized - .contains_key(log_record.record.id.as_str()) + None => { + // Was not found in processed log records, check the segment + if let Some(existing_record) = + existing_id_to_record.get(log_record.record.id.as_str()) { - // Mark state as deleted. Other fields become noop after such a delete. - let record_from_map = existing_id_to_materialized - .get_mut(log_record.record.id.as_str()) - .unwrap(); - record_from_map.final_operation = MaterializedLogOperation::DeleteExisting; - record_from_map.final_document = None; - record_from_map.final_embedding = None; - record_from_map.metadata_to_be_merged = None; - record_from_map.metadata_to_be_deleted = None; - record_from_map.user_id = None; + id_to_materialized.insert( + log_record.record.id.as_str(), + MaterializedLogRecord { + data_record: Some(existing_record.0.clone()), + offset_id: existing_record.1, + user_id: None, + final_operation: MaterializedLogOperation::DeleteExisting, + metadata_to_be_merged: None, + metadata_to_be_deleted: None, + final_document: None, + final_embedding: None, + }, + ); } } - Operation::Update => { - let record_from_map = match existing_id_to_materialized - .get_mut(log_record.record.id.as_str()) - { - Some(res) => { - match res.final_operation { - // Ignore the update if deleted. - MaterializedLogOperation::DeleteExisting => { - continue; - }, - MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for an entry that exists on the segment"), - MaterializedLogOperation::Initial | MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => {} - } - res - } - None => match new_id_to_materialized.get_mut(log_record.record.id.as_str()) - { - Some(res) => res, - None => { - // Does not exist in either maps. Ignore this update. - continue; - } - }, + } + } + Operation::Update => { + // todo: rename + let record_from_map = match ( + existing_id_to_record.get(log_record.record.id.as_str()), + id_to_materialized.get_mut(log_record.record.id.as_str()), + ) { + (Some(existing_record), None) => { + // todo: operation here doesn't matter + let new_record = MaterializedLogRecord { + data_record: Some(existing_record.0.clone()), + offset_id: existing_record.1, + user_id: None, + final_operation: MaterializedLogOperation::UpdateExisting, + metadata_to_be_merged: None, + metadata_to_be_deleted: None, + final_document: None, + final_embedding: None, + }; + id_to_materialized.insert(existing_record.0.id, new_record); + id_to_materialized.get_mut(existing_record.0.id).unwrap() + // todo + } + (None, Some(materialized_operation)) => materialized_operation, + (Some(_), Some(materialized_operation)) => materialized_operation, + (None, None) => { + // Invalid update because the offset ID does not exist in the record segment or log (processed up to this point) + continue; + } + }; + + match merge_update_metadata( + ( + &record_from_map.metadata_to_be_merged, + &record_from_map.metadata_to_be_deleted, + ), + &log_record.record.metadata, + ) { + Ok(meta) => { + record_from_map.metadata_to_be_merged = meta.0; + record_from_map.metadata_to_be_deleted = meta.1; + } + Err(e) => { + return Err(LogMaterializerError::MetadataMaterialization(e)); + } + }; + if let Some(doc) = log_record.record.document.as_ref() { + record_from_map.final_document = Some(doc); + } + if let Some(emb) = log_record.record.embedding.as_ref() { + record_from_map.final_embedding = Some(emb.as_slice()); + } + } + Operation::Upsert => { + match ( + existing_id_to_record.get(log_record.record.id.as_str()), + id_to_materialized.get_mut(log_record.record.id.as_str()), + ) { + (Some(existing_record), None) => { + // Update. + let mut materialized_operation = MaterializedLogRecord { + data_record: Some(existing_record.0.clone()), + offset_id: existing_record.1, + user_id: None, + final_operation: MaterializedLogOperation::UpdateExisting, + metadata_to_be_merged: None, + metadata_to_be_deleted: None, + final_document: None, + final_embedding: None, }; + let merged_metadata = merge_update_metadata( + ( + &materialized_operation.metadata_to_be_merged, + &materialized_operation.metadata_to_be_deleted, + ), + &log_record.record.metadata, + ) + .map_err(LogMaterializerError::MetadataMaterialization)?; + materialized_operation.metadata_to_be_merged = merged_metadata.0; + materialized_operation.metadata_to_be_deleted = merged_metadata.1; + + if let Some(doc) = log_record.record.document.as_ref() { + materialized_operation.final_document = Some(doc); + } + + if let Some(emb) = log_record.record.embedding.as_ref() { + materialized_operation.final_embedding = Some(emb.as_slice()); + } + + id_to_materialized + .insert(log_record.record.id.as_str(), materialized_operation); + } + (None, Some(materialized_operation)) => { + // Update. + // todo: rename + let record_from_map = materialized_operation; match merge_update_metadata( ( &record_from_map.metadata_to_be_merged, @@ -616,148 +706,83 @@ pub async fn materialize_logs<'me>( if let Some(emb) = log_record.record.embedding.as_ref() { record_from_map.final_embedding = Some(emb.as_slice()); } - match record_from_map.final_operation { - MaterializedLogOperation::Initial => { - record_from_map.final_operation = - MaterializedLogOperation::UpdateExisting; - } - // State remains as is. - MaterializedLogOperation::AddNew - | MaterializedLogOperation::OverwriteExisting - | MaterializedLogOperation::UpdateExisting => {} - // Not expected. - MaterializedLogOperation::DeleteExisting => { - panic!("Invariant violation. Should not be updating a deleted record") - } - } + // This record is not present on storage yet hence final operation is + // AddNew and not UpdateExisting. + record_from_map.final_operation = MaterializedLogOperation::AddNew; } - Operation::Upsert => { - if existing_id_to_materialized.contains_key(log_record.record.id.as_str()) { - // safe to unwrap here. - let operation = existing_id_to_materialized - .get(log_record.record.id.as_str()) - .unwrap() - .final_operation - .clone(); - match operation { - MaterializedLogOperation::DeleteExisting => { - let curr_val = existing_id_to_materialized.remove(log_record.record.id.as_str()).unwrap(); - // Overwrite. - let mut materialized_record = - match MaterializedLogRecord::try_from(( - &log_record.record, - curr_val.offset_id, - log_record.record.id.as_str(), - )) { - Ok(record) => record, + (Some(_), Some(materialized_operation)) => { + match materialized_operation.final_operation { + MaterializedLogOperation::DeleteExisting => { + let current_value = id_to_materialized.remove(log_record.record.id.as_str()).unwrap(); // todo + + // Overwrite. + let mut materialized_record = + match MaterializedLogRecord::try_from(( + &log_record.record, + current_value.offset_id, + log_record.record.id.as_str(), + )) { + Ok(record) => record, + Err(e) => { + return Err(e); + } + }; + materialized_record.data_record = current_value.data_record; + materialized_record.final_operation = + MaterializedLogOperation::OverwriteExisting; + id_to_materialized + .insert(log_record.record.id.as_str(), materialized_record); + }, + MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment"), + MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => { + // Update. + match merge_update_metadata((&materialized_operation.metadata_to_be_merged, &materialized_operation.metadata_to_be_deleted,),&log_record.record.metadata,) { + Ok(meta) => { + materialized_operation.metadata_to_be_merged = meta.0; + materialized_operation.metadata_to_be_deleted = meta.1; + } Err(e) => { - return Err(e); + return Err(LogMaterializerError::MetadataMaterialization(e)); } }; - materialized_record.data_record = curr_val.data_record; - materialized_record.final_operation = - MaterializedLogOperation::OverwriteExisting; - existing_id_to_materialized - .insert(log_record.record.id.as_str(), materialized_record); - }, - MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment"), - MaterializedLogOperation::Initial | MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => { - // Update. - let record_from_map = existing_id_to_materialized.get_mut(log_record.record.id.as_str()).unwrap(); - match merge_update_metadata((&record_from_map.metadata_to_be_merged, &record_from_map.metadata_to_be_deleted,),&log_record.record.metadata,) { - Ok(meta) => { - record_from_map.metadata_to_be_merged = meta.0; - record_from_map.metadata_to_be_deleted = meta.1; - } - Err(e) => { - return Err(LogMaterializerError::MetadataMaterialization(e)); - } - }; - if let Some(doc) = log_record.record.document.as_ref() { - record_from_map.final_document = Some(doc); - } - if let Some(emb) = log_record.record.embedding.as_ref() { - record_from_map.final_embedding = Some(emb.as_slice()); - } - match record_from_map.final_operation { - MaterializedLogOperation::Initial => { - record_from_map.final_operation = - MaterializedLogOperation::UpdateExisting; + if let Some(doc) = log_record.record.document.as_ref() { + materialized_operation.final_document = Some(doc); } - // State remains as is. - MaterializedLogOperation::AddNew - | MaterializedLogOperation::OverwriteExisting - | MaterializedLogOperation::UpdateExisting => {} - // Not expected. - MaterializedLogOperation::DeleteExisting => { - panic!("Invariant violation. Should not be updating a deleted record") + if let Some(emb) = log_record.record.embedding.as_ref() { + materialized_operation.final_embedding = Some(emb.as_slice()); } } } + } + (None, None) => { + // Insert. + let next_offset = + next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let materialized_record = match MaterializedLogRecord::try_from(( + &log_record.record, + next_offset, + log_record.record.id.as_str(), + )) { + Ok(record) => record, + Err(e) => { + return Err(e); } - } else if new_id_to_materialized.contains_key(log_record.record.id.as_str()) { - // Update. - let record_from_map = new_id_to_materialized - .get_mut(log_record.record.id.as_str()) - .unwrap(); - match merge_update_metadata( - ( - &record_from_map.metadata_to_be_merged, - &record_from_map.metadata_to_be_deleted, - ), - &log_record.record.metadata, - ) { - Ok(meta) => { - record_from_map.metadata_to_be_merged = meta.0; - record_from_map.metadata_to_be_deleted = meta.1; - } - Err(e) => { - return Err(LogMaterializerError::MetadataMaterialization(e)); - } - }; - if let Some(doc) = log_record.record.document.as_ref() { - record_from_map.final_document = Some(doc); - } - if let Some(emb) = log_record.record.embedding.as_ref() { - record_from_map.final_embedding = Some(emb.as_slice()); - } - // This record is not present on storage yet hence final operation is - // AddNew and not UpdateExisting. - record_from_map.final_operation = MaterializedLogOperation::AddNew; - } else { - // Insert. - let next_offset = - next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let materialized_record = match MaterializedLogRecord::try_from(( - &log_record.record, - next_offset, - log_record.record.id.as_str(), - )) { - Ok(record) => record, - Err(e) => { - return Err(e); - } - }; - new_id_to_materialized - .insert(log_record.record.id.as_str(), materialized_record); - } + }; + id_to_materialized + .insert(log_record.record.id.as_str(), materialized_record); } } } - Ok(()) - }.instrument(tracing::info_span!(parent: Span::current(), "Materialization main iteration")).await?; - let mut res = vec![]; - for (_key, value) in existing_id_to_materialized { - // Ignore records that only had invalid ADDS on the log. - if value.final_operation == MaterializedLogOperation::Initial { - continue; } - res.push(value); } - for (_key, value) in new_id_to_materialized { + // Ok(()) + // }.instrument(tracing::info_span!(parent: Span::current(), "Materialization main iteration")).await?; + let mut res = vec![]; + for (_key, value) in id_to_materialized { res.push(value); } res.sort_by(|x, y| x.offset_id.cmp(&y.offset_id)); + Ok(Chunk::new(res.into())) } diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs index ab1956596eff..2022dd1469ee 100644 --- a/rust/worker/src/server.rs +++ b/rust/worker/src/server.rs @@ -605,7 +605,9 @@ mod tests { assert_eq!(response.unwrap_err().code(), tonic::Code::InvalidArgument); } - fn gen_knn_request(mut scan_operator: Option) -> chroma_proto::KnnPlan { + fn gen_knn_request( + mut scan_operator: Option, + ) -> chroma_proto::KnnPlan { if scan_operator.is_none() { scan_operator = Some(scan()); }