diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index c33d39cd104..4f62b321c10 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -503,42 +503,45 @@ pub async fn materialize_logs<'me>( continue; } (Some(_), Some(materialized_operation)) => { + // Handles the case where an ID exists in the segment and the log contains a delete followed by an add. match materialized_operation.final_operation { - MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment."), - MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => { - // This is a noop. - continue; + MaterializedLogOperation::DeleteExisting => { + match merge_update_metadata( + ( + &materialized_operation.metadata_to_be_merged, + &materialized_operation.metadata_to_be_deleted, + ), + &log_record.record.metadata, + ) { + Ok(meta) => { + materialized_operation.metadata_to_be_merged = meta.0; + materialized_operation.metadata_to_be_deleted = meta.1; } - - MaterializedLogOperation::DeleteExisting => { - match merge_update_metadata( - ( - &materialized_operation.metadata_to_be_merged, - &materialized_operation.metadata_to_be_deleted, - ), - &log_record.record.metadata, - ) { - Ok(meta) => { - materialized_operation.metadata_to_be_merged = meta.0; - materialized_operation.metadata_to_be_deleted = meta.1; - } - Err(e) => { - return Err(LogMaterializerError::MetadataMaterialization(e)); - } - }; - if let Some(doc) = log_record.record.document.as_ref() { - materialized_operation.final_document = Some(doc); - } - if let Some(emb) = log_record.record.embedding.as_ref() { - materialized_operation.final_embedding = Some(emb.as_slice()); - } - // This record is not present on storage yet hence final operation is - // AddNew and not UpdateExisting. 
- materialized_operation.final_operation = MaterializedLogOperation::OverwriteExisting; + Err(e) => { + return Err(LogMaterializerError::MetadataMaterialization( + e, + )); } + }; + if let Some(doc) = log_record.record.document.as_ref() { + materialized_operation.final_document = Some(doc); + } + if let Some(emb) = log_record.record.embedding.as_ref() { + materialized_operation.final_embedding = Some(emb.as_slice()); } + // This record already exists in the segment, hence the final operation is + // OverwriteExisting and not AddNew. + materialized_operation.final_operation = + MaterializedLogOperation::OverwriteExisting; + } + _ => { + // This is a noop. + continue; + } + } } (None, None) => { + // New insert let next_offset_id = next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst); let materialized_record = match MaterializedLogRecord::try_from(( @@ -557,12 +560,7 @@ pub async fn materialize_logs<'me>( }; } Operation::Delete => { - // If the delete is for a record that is currently not in the - // record segment, then we can just NOT process these records - // at all. On the other hand if it is for a record that is currently - // in segment then we'll have to pass it as a delete - // to the compactor so that it can be deleted.
- + // Assume that the ID is present in the currently-processed log and remove it match id_to_materialized.remove(log_record.record.id.as_str()) { Some(_) => { // Successfully removed @@ -590,13 +588,11 @@ pub async fn materialize_logs<'me>( } } Operation::Update => { - // todo: rename - let record_from_map = match ( + let current_value_from_segment_or_log = match ( existing_id_to_record.get(log_record.record.id.as_str()), id_to_materialized.get_mut(log_record.record.id.as_str()), ) { (Some(existing_record), None) => { - // todo: operation here doesn't matter let new_record = MaterializedLogRecord { data_record: Some(existing_record.0.clone()), offset_id: existing_record.1, @@ -608,8 +604,9 @@ pub async fn materialize_logs<'me>( final_embedding: None, }; id_to_materialized.insert(existing_record.0.id, new_record); - id_to_materialized.get_mut(existing_record.0.id).unwrap() - // todo + id_to_materialized + .get_mut(existing_record.0.id) + .expect("inserted above") } (None, Some(materialized_operation)) => materialized_operation, (Some(_), Some(materialized_operation)) => materialized_operation, @@ -621,24 +618,24 @@ pub async fn materialize_logs<'me>( match merge_update_metadata( ( - &record_from_map.metadata_to_be_merged, - &record_from_map.metadata_to_be_deleted, + &current_value_from_segment_or_log.metadata_to_be_merged, + &current_value_from_segment_or_log.metadata_to_be_deleted, ), &log_record.record.metadata, ) { Ok(meta) => { - record_from_map.metadata_to_be_merged = meta.0; - record_from_map.metadata_to_be_deleted = meta.1; + current_value_from_segment_or_log.metadata_to_be_merged = meta.0; + current_value_from_segment_or_log.metadata_to_be_deleted = meta.1; } Err(e) => { return Err(LogMaterializerError::MetadataMaterialization(e)); } }; if let Some(doc) = log_record.record.document.as_ref() { - record_from_map.final_document = Some(doc); + current_value_from_segment_or_log.final_document = Some(doc); } if let Some(emb) = log_record.record.embedding.as_ref() { -
record_from_map.final_embedding = Some(emb.as_slice()); + current_value_from_segment_or_log.final_embedding = Some(emb.as_slice()); } } Operation::Upsert => { @@ -647,7 +644,7 @@ pub async fn materialize_logs<'me>( id_to_materialized.get_mut(log_record.record.id.as_str()), ) { (Some(existing_record), None) => { - // Update. + // Update the existing record in the segment let mut materialized_operation = MaterializedLogRecord { data_record: Some(existing_record.0.clone()), offset_id: existing_record.1, @@ -682,38 +679,37 @@ pub async fn materialize_logs<'me>( .insert(log_record.record.id.as_str(), materialized_operation); } (None, Some(materialized_operation)) => { - // Update. - // todo: rename - let record_from_map = materialized_operation; + // Update (the ID is not present in the segment, so modify the existing log operation) match merge_update_metadata( ( - &record_from_map.metadata_to_be_merged, - &record_from_map.metadata_to_be_deleted, + &materialized_operation.metadata_to_be_merged, + &materialized_operation.metadata_to_be_deleted, ), &log_record.record.metadata, ) { Ok(meta) => { - record_from_map.metadata_to_be_merged = meta.0; - record_from_map.metadata_to_be_deleted = meta.1; + materialized_operation.metadata_to_be_merged = meta.0; + materialized_operation.metadata_to_be_deleted = meta.1; } Err(e) => { return Err(LogMaterializerError::MetadataMaterialization(e)); } }; if let Some(doc) = log_record.record.document.as_ref() { - record_from_map.final_document = Some(doc); + materialized_operation.final_document = Some(doc); } if let Some(emb) = log_record.record.embedding.as_ref() { - record_from_map.final_embedding = Some(emb.as_slice()); + materialized_operation.final_embedding = Some(emb.as_slice()); } // This record is not present on storage yet hence final operation is // AddNew and not UpdateExisting. 
- record_from_map.final_operation = MaterializedLogOperation::AddNew; + materialized_operation.final_operation = MaterializedLogOperation::AddNew; } (Some(_), Some(materialized_operation)) => { + // ID exists in both the segment and the processed log records match materialized_operation.final_operation { MaterializedLogOperation::DeleteExisting => { - let current_value = id_to_materialized.remove(log_record.record.id.as_str()).unwrap(); // todo + let current_value = id_to_materialized.remove(log_record.record.id.as_str()).expect("Exists because get_mut() returned Some(_)."); // Overwrite. let mut materialized_record =