diff --git a/rust/types/src/data_record.rs b/rust/types/src/data_record.rs
index ec1f866ad6f..ca21d036bdd 100644
--- a/rust/types/src/data_record.rs
+++ b/rust/types/src/data_record.rs
@@ -10,6 +10,29 @@ pub struct DataRecord<'a> {
     pub document: Option<&'a str>,
 }
 
+#[derive(Debug, Clone)]
+pub struct OwnedDataRecord {
+    pub id: String,
+    pub embedding: Vec<f32>,
+    pub metadata: Option<Metadata>,
+    pub document: Option<String>,
+}
+
+impl<'a> From<&DataRecord<'a>> for OwnedDataRecord {
+    fn from(data_record: &DataRecord<'a>) -> Self {
+        let id = data_record.id.to_string();
+        let embedding = data_record.embedding.to_vec();
+        let metadata = data_record.metadata.clone();
+        let document = data_record.document.map(|doc| doc.to_string());
+        OwnedDataRecord {
+            id,
+            embedding,
+            metadata,
+            document,
+        }
+    }
+}
+
 impl DataRecord<'_> {
     pub fn get_size(&self) -> usize {
         let id_size = self.id.len();
@@ -28,4 +51,8 @@ impl DataRecord<'_> {
         };
         id_size + embedding_size + metadata_size + document_size
     }
+
+    pub fn to_owned(&self) -> OwnedDataRecord {
+        self.into()
+    }
 }
diff --git a/rust/types/src/operation.rs b/rust/types/src/operation.rs
index 0ca6c2c24e1..3b2315213e7 100644
--- a/rust/types/src/operation.rs
+++ b/rust/types/src/operation.rs
@@ -13,9 +13,6 @@ pub enum Operation {
 
 #[derive(Clone, Debug, PartialEq)]
 pub enum MaterializedLogOperation {
-    // Set when the record is initially read from the segment
-    // before it is processed based on state of the log.
-    Initial,
     // Set for records that don't exist in the segment and
     // have been encountered for the first time in the log.
     AddNew,
diff --git a/rust/worker/src/execution/operators/filter.rs b/rust/worker/src/execution/operators/filter.rs
index 635636b1137..fcd5f39f1b4 100644
--- a/rust/worker/src/execution/operators/filter.rs
+++ b/rust/worker/src/execution/operators/filter.rs
@@ -106,17 +106,14 @@ pub(crate) struct MetadataLogReader<'me> {
 }
 
 impl<'me> MetadataLogReader<'me> {
-    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord<'me>>) -> Self {
+    pub(crate) fn new(logs: &'me Chunk<MaterializedLogRecord>) -> Self {
         let mut compact_metadata: HashMap<_, BTreeMap<&MetadataValue, RoaringBitmap>> =
             HashMap::new();
         let mut document = HashMap::new();
         let mut updated_offset_ids = RoaringBitmap::new();
         let mut user_id_to_offset_id = HashMap::new();
         for (log, _) in logs.iter() {
-            if !matches!(
-                log.final_operation,
-                MaterializedLogOperation::Initial | MaterializedLogOperation::AddNew
-            ) {
+            if !matches!(log.final_operation, MaterializedLogOperation::AddNew) {
                 updated_offset_ids.insert(log.offset_id);
             }
             if !matches!(
diff --git a/rust/worker/src/execution/operators/hnsw_knn.rs b/rust/worker/src/execution/operators/hnsw_knn.rs
index c5e57467d5d..5d8623a1f95 100644
--- a/rust/worker/src/execution/operators/hnsw_knn.rs
+++ b/rust/worker/src/execution/operators/hnsw_knn.rs
@@ -65,7 +65,7 @@ impl ChromaError for HnswKnnOperatorError {
 impl HnswKnnOperator {
     async fn get_disallowed_ids<'referred_data>(
         &self,
-        logs: Chunk<MaterializedLogRecord<'referred_data>>,
+        logs: Chunk<MaterializedLogRecord>,
         record_segment_reader: &RecordSegmentReader<'_>,
     ) -> Result<Vec<u32>, Box<dyn ChromaError>> {
         let mut disallowed_ids = Vec::new();
diff --git a/rust/worker/src/segment/distributed_hnsw_segment.rs b/rust/worker/src/segment/distributed_hnsw_segment.rs
index bff4a208b86..f11aaa32365 100644
--- a/rust/worker/src/segment/distributed_hnsw_segment.rs
+++ b/rust/worker/src/segment/distributed_hnsw_segment.rs
@@ -238,10 +238,10 @@ impl DistributedHNSWSegmentWriter {
     }
 }
 
-impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
+impl SegmentWriter for DistributedHNSWSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: chroma_types::Chunk<MaterializedLogRecord<'a>>,
+        records: chroma_types::Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         for (record, _) in records.iter() {
             match record.final_operation {
@@ -284,9 +284,6 @@ impl<'a> SegmentWriter<'a> for DistributedHNSWSegmentWriter {
                         }
                     }
                 }
-                MaterializedLogOperation::Initial => panic!(
-                    "Invariant violation. Mat records should not contain logs in initial state"
-                ),
             }
         }
         Ok(())
diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs
index c5410479cf9..27e3ead8b98 100644
--- a/rust/worker/src/segment/metadata_segment.rs
+++ b/rust/worker/src/segment/metadata_segment.rs
@@ -530,16 +530,20 @@ impl<'me> MetadataSegmentWriter<'me> {
     }
 }
 
-impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> {
+impl SegmentWriter for MetadataSegmentWriter<'_> {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'log_records>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         let mut count = 0u64;
         let full_text_writer_batch = records.iter().filter_map(|record| {
             let offset_id = record.0.offset_id;
-            let old_document = record.0.data_record.as_ref().and_then(|r| r.document);
-            let new_document = record.0.final_document;
+            let old_document = record
+                .0
+                .data_record
+                .as_ref()
+                .and_then(|r| r.document.as_ref().map(|d| d.as_str()));
+            let new_document = &record.0.final_document;
 
             if matches!(
                 record.0.final_operation,
@@ -685,7 +689,6 @@ impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> {
                     }
                 },
-                MaterializedLogOperation::Initial => panic!("Not expected mat records in the initial state")
             }
         }
 
         tracing::info!("Applied {} records to metadata segment", count,);
diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs
index 379cc918749..bcc92271561 100644
--- a/rust/worker/src/segment/record_segment.rs
+++ b/rust/worker/src/segment/record_segment.rs
@@ -61,9 +61,9 @@ pub enum RecordSegmentWriterCreationError {
 }
 
 impl RecordSegmentWriter {
-    async fn construct_and_set_data_record<'a>(
+    async fn construct_and_set_data_record(
         &self,
-        mat_record: &MaterializedLogRecord<'a>,
+        mat_record: &MaterializedLogRecord,
         user_id: &str,
         offset_id: u32,
     ) -> Result<(), ApplyMaterializedLogError> {
@@ -337,10 +337,10 @@ impl ChromaError for ApplyMaterializedLogError {
     }
 }
 
-impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
+impl SegmentWriter for RecordSegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'a>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError> {
         // The max new offset id introduced by materialized logs is initialized as zero
         // Since offset id should start from 1, we use this to indicate no new offset id
@@ -357,7 +357,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                         .user_id_to_id
                         .as_ref()
                         .unwrap()
-                        .set::<&str, u32>("", log_record.user_id.unwrap(), log_record.offset_id)
+                        .set::<&str, u32>(
+                            "",
+                            log_record.user_id.as_ref().unwrap(),
+                            log_record.offset_id,
+                        )
                         .await
                     {
                         Ok(()) => (),
@@ -370,7 +374,11 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                         .id_to_user_id
                         .as_ref()
                         .unwrap()
-                        .set::<u32, String>("", log_record.offset_id, log_record.user_id.unwrap().to_string())
+                        .set::<u32, String>(
+                            "",
+                            log_record.offset_id,
+                            log_record.user_id.clone().unwrap(),
+                        )
                         .await
                     {
                         Ok(()) => (),
@@ -382,7 +390,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     match self
                         .construct_and_set_data_record(
                             log_record,
-                            log_record.user_id.unwrap(),
+                            log_record.user_id.as_ref().unwrap(),
                             log_record.offset_id,
                         )
                         .await
@@ -395,7 +403,8 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     // Set max offset id.
                     max_new_offset_id = max_new_offset_id.max(log_record.offset_id);
                 }
-                MaterializedLogOperation::UpdateExisting | MaterializedLogOperation::OverwriteExisting => {
+                MaterializedLogOperation::UpdateExisting
+                | MaterializedLogOperation::OverwriteExisting => {
                     // Offset id and user id do not need to change. Only data
                     // needs to change. Blockfile does not have Read then write
                     // semantics so we'll delete and insert.
@@ -415,7 +424,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                     match self
                         .construct_and_set_data_record(
                             log_record,
-                            log_record.data_record.as_ref().unwrap().id,
+                            &log_record.data_record.as_ref().unwrap().id,
                             log_record.offset_id,
                         )
                         .await
@@ -432,7 +441,7 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                         .user_id_to_id
                         .as_ref()
                         .unwrap()
-                        .delete::<&str, u32>("", log_record.data_record.as_ref().unwrap().id)
+                        .delete::<&str, u32>("", &log_record.data_record.as_ref().unwrap().id)
                         .await
                     {
                         Ok(()) => (),
@@ -470,7 +479,6 @@ impl<'a> SegmentWriter<'a> for RecordSegmentWriter {
                         }
                     }
                 }
-                MaterializedLogOperation::Initial => panic!("Invariant violation. Materialized logs should not have any logs in the initial state")
             }
         }
         self.max_new_offset_id
diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs
index 11e1845741f..23b93dc3ceb 100644
--- a/rust/worker/src/segment/types.rs
+++ b/rust/worker/src/segment/types.rs
@@ -3,7 +3,7 @@ use chroma_error::{ChromaError, ErrorCodes};
 use chroma_types::{
     Chunk, DataRecord, DeletedMetadata, LogRecord, MaterializedLogOperation, Metadata,
     MetadataDelta, MetadataValue, MetadataValueConversionError, Operation, OperationRecord,
-    UpdateMetadata, UpdateMetadataValue,
+    OwnedDataRecord, UpdateMetadata, UpdateMetadataValue,
 };
 use std::collections::{HashMap, HashSet};
 use std::sync::atomic::AtomicU32;
@@ -113,10 +113,10 @@ impl ChromaError for LogMaterializerError {
 }
 
 #[derive(Debug, Clone)]
-pub struct MaterializedLogRecord<'referred_data> {
+pub struct MaterializedLogRecord {
     // This is the data record read from the record segment for this id.
     // None if the record exists only in the log.
-    pub(crate) data_record: Option<DataRecord<'referred_data>>,
+    pub(crate) data_record: Option<OwnedDataRecord>,
     // If present in the record segment then it is the offset id
     // in the record segment at which the record was found.
     // If not present in the segment then it is the offset id
@@ -125,7 +125,7 @@ pub struct MaterializedLogRecord<'referred_data> {
     // Set only for the records that are being inserted for the first time
     // in the log since data_record will be None in such cases. For other
     // cases, just read from data record.
-    pub(crate) user_id: Option<&'referred_data str>,
+    pub(crate) user_id: Option<String>,
     // There can be several entries in the log for an id. This is the final
     // operation that needs to be done on it. For e.g.
     // If log has [Update, Update, Delete] then final operation is Delete.
@@ -149,15 +149,15 @@ pub struct MaterializedLogRecord<'referred_data> {
     // This is the final document obtained from the last non null operation.
     // E.g. if log has [Insert(str0), Update(str1), Update(str2), Update()] then this will contain
     // str2. None if final operation is Delete.
-    pub(crate) final_document: Option<&'referred_data str>,
+    pub(crate) final_document: Option<String>,
     // Similar to above, this is the final embedding obtained
     // from the last non null operation.
     // E.g. if log has [Insert(emb0), Update(emb1), Update(emb2), Update()]
     // then this will contain emb2. None if final operation is Delete.
-    pub(crate) final_embedding: Option<&'referred_data [f32]>,
+    pub(crate) final_embedding: Option<Vec<f32>>,
 }
 
-impl<'referred_data> MaterializedLogRecord<'referred_data> {
+impl MaterializedLogRecord {
     // Performs a deep copy of the document so only use it if really
     // needed. If you only need a reference then use merged_document_ref
     // defined below.
@@ -165,12 +165,12 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         if self.final_operation == MaterializedLogOperation::OverwriteExisting
             || self.final_operation == MaterializedLogOperation::AddNew
         {
-            return self.final_document.map(|doc| doc.to_string());
+            return self.final_document.clone();
         }
-        return match self.final_document {
-            Some(doc) => Some(doc.to_string()),
+        return match self.final_document.clone() {
+            Some(doc) => Some(doc),
             None => match self.data_record.as_ref() {
-                Some(data_record) => data_record.document.map(|doc| doc.to_string()),
+                Some(data_record) => data_record.document.clone(),
                 None => None,
             },
         };
@@ -180,18 +180,12 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         if self.final_operation == MaterializedLogOperation::OverwriteExisting
             || self.final_operation == MaterializedLogOperation::AddNew
         {
-            return match self.final_document {
-                Some(doc) => Some(doc),
-                None => None,
-            };
+            return self.final_document.as_deref();
         }
-        return match self.final_document {
+        return match &self.final_document {
             Some(doc) => Some(doc),
             None => match self.data_record.as_ref() {
-                Some(data_record) => match data_record.document {
-                    Some(doc) => Some(doc),
-                    None => None,
-                },
+                Some(data_record) => data_record.document.as_ref().map(|doc| doc.as_str()),
                 None => None,
             },
         };
@@ -200,8 +194,8 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
     // Performs a deep copy of the user id so only use it if really
     // needed. If you only need reference then use merged_user_id_ref below.
     pub(crate) fn merged_user_id(&self) -> String {
-        match self.user_id {
-            Some(id) => id.to_string(),
+        match &self.user_id {
+            Some(id) => id.clone(),
             None => match &self.data_record {
                 Some(data_record) => data_record.id.to_string(),
                 None => panic!("Expected at least one user id to be set"),
@@ -209,11 +203,12 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         }
     }
 
+    // todo: needed?
     pub(crate) fn merged_user_id_ref(&self) -> &str {
-        match self.user_id {
-            Some(id) => id,
+        match &self.user_id {
+            Some(id) => id.as_str(),
             None => match &self.data_record {
-                Some(data_record) => data_record.id,
+                Some(data_record) => &data_record.id,
                 None => panic!("Expected at least one user id to be set"),
             },
         }
@@ -249,7 +244,9 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         final_metadata
     }
 
-    pub(crate) fn metadata_delta(&'referred_data self) -> MetadataDelta<'referred_data> {
+    pub(crate) fn metadata_delta<'referred_data>(
+        &'referred_data self,
+    ) -> MetadataDelta<'referred_data> {
         let mut metadata_delta = MetadataDelta::new();
         let mut base_metadata: HashMap<&str, &MetadataValue> = HashMap::new();
         if let Some(data_record) = &self.data_record {
@@ -321,45 +318,45 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
         if self.final_operation == MaterializedLogOperation::OverwriteExisting
             || self.final_operation == MaterializedLogOperation::AddNew
         {
-            return match self.final_embedding {
+            return match &self.final_embedding {
                 Some(embed) => embed,
                 None => panic!("Expected source of embedding"),
             };
         }
-        return match self.final_embedding {
+        return match &self.final_embedding {
             Some(embed) => embed,
             None => match self.data_record.as_ref() {
-                Some(data_record) => data_record.embedding,
+                Some(data_record) => &data_record.embedding,
                 None => panic!("Expected at least one source of embedding"),
             },
         };
     }
 }
 
-impl<'referred_data> From<(DataRecord<'referred_data>, u32)>
-    for MaterializedLogRecord<'referred_data>
-{
-    fn from(data_record_info: (DataRecord<'referred_data>, u32)) -> Self {
-        let data_record = data_record_info.0;
-        let offset_id = data_record_info.1;
-        Self {
-            data_record: Some(data_record),
-            offset_id,
-            user_id: None,
-            final_operation: MaterializedLogOperation::Initial,
-            metadata_to_be_merged: None,
-            metadata_to_be_deleted: None,
-            final_document: None,
-            final_embedding: None,
-        }
-    }
-}
+// impl<'referred_data> From<(DataRecord<'referred_data>, u32)>
+//     for MaterializedLogRecord<'referred_data>
+// {
+//     fn from(data_record_info: (DataRecord<'referred_data>, u32)) -> Self {
+//         let data_record = data_record_info.0;
+//         let offset_id = data_record_info.1;
+//         Self {
+//             data_record: Some(data_record),
+//             offset_id,
+//             user_id: None,
+//             final_operation: MaterializedLogOperation::Initial,
+//             metadata_to_be_merged: None,
+//             metadata_to_be_deleted: None,
+//             final_document: None,
+//             final_embedding: None,
+//         }
+//     }
+// }
 
 // Creates a materialized log record from the corresponding entry
 // in the log (OperationRecord), offset id in storage where it will be stored (u32)
 // and user id (str).
 impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_data str)>
-    for MaterializedLogRecord<'referred_data>
+    for MaterializedLogRecord
 {
     type Error = LogMaterializerError;
@@ -387,9 +384,8 @@ impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_d
             }
         };
 
-        let document = log_record.document.as_deref();
         let embedding = match &log_record.embedding {
-            Some(embedding) => Some(embedding.as_slice()),
+            Some(embedding) => Some(embedding.clone()),
             None => {
                 return Err(LogMaterializerError::EmbeddingMaterialization);
             }
@@ -398,11 +394,11 @@ impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_d
         Ok(Self {
             data_record: None,
             offset_id,
-            user_id: Some(user_id),
+            user_id: Some(user_id.to_string()),
            final_operation: MaterializedLogOperation::AddNew,
             metadata_to_be_merged: merged_metadata,
             metadata_to_be_deleted: deleted_metadata,
-            final_document: document,
+            final_document: log_record.document.clone(),
             final_embedding: embedding,
         })
     }
@@ -417,7 +413,7 @@ pub async fn materialize_logs<'me>(
     // for materializing. Writers pass this value to the materializer
     // because they need to share this across all log partitions.
     next_offset_id: Option<Arc<AtomicU32>>,
-) -> Result<Chunk<MaterializedLogRecord<'me>>, LogMaterializerError> {
+) -> Result<Chunk<MaterializedLogRecord>, LogMaterializerError> {
     // Trace the total_len since len() iterates over the entire chunk
     // and we don't want to do that just to trace the length.
     tracing::info!("Total length of logs in materializer: {}", logs.total_len());
@@ -438,8 +434,8 @@ pub async fn materialize_logs<'me>(
     };
 
     // Populate entries that are present in the record segment.
-    let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
-    let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
+    let mut existing_id_to_record: HashMap<&str, (DataRecord, u32)> = HashMap::new();
+    let mut id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
     if let Some(reader) = &record_segment_reader {
         async {
             for (log_record, _) in logs.iter() {
@@ -458,10 +454,8 @@ pub async fn materialize_logs<'me>(
                     .await
                 {
                     Ok(Some((data_record, offset_id))) => {
-                        existing_id_to_materialized.insert(
-                            log_record.record.id.as_str(),
-                            MaterializedLogRecord::from((data_record, offset_id)),
-                        );
+                        existing_id_to_record
+                            .insert(log_record.record.id.as_str(), (data_record, offset_id));
                     }
                     Ok(None) => {
                         return Err(LogMaterializerError::RecordSegment(Box::new(
@@ -485,288 +479,307 @@ pub async fn materialize_logs<'me>(
         )
         .await?;
     }
+
     // Populate updates to these and fresh records that are being
     // inserted for the first time.
-    async {
-        for (log_record, _) in logs.iter() {
-            match log_record.record.operation {
-                Operation::Add => {
-                    // If this is an add of a record present in the segment then add
-                    // only if it has been previously deleted in the log.
-                    if existing_id_to_materialized.contains_key(log_record.record.id.as_str()) {
-                        // safe to unwrap
-                        let operation = existing_id_to_materialized
-                            .get(log_record.record.id.as_str())
-                            .unwrap()
-                            .final_operation
-                            .clone();
-                        match operation {
-                            MaterializedLogOperation::DeleteExisting => {
-                                let curr_val = existing_id_to_materialized.remove(log_record.record.id.as_str()).unwrap();
-                                // Overwrite.
-                                let mut materialized_record =
-                                    match MaterializedLogRecord::try_from((
-                                        &log_record.record,
-                                        curr_val.offset_id,
-                                        log_record.record.id.as_str(),
-                                    )) {
-                                        Ok(record) => record,
-                                        Err(e) => {
-                                            return Err(e);
-                                        }
-                                    };
-                                materialized_record.data_record = curr_val.data_record;
-                                materialized_record.final_operation =
-                                    MaterializedLogOperation::OverwriteExisting;
-                                existing_id_to_materialized
-                                    .insert(log_record.record.id.as_str(), materialized_record);
-                            },
-                            MaterializedLogOperation::AddNew => panic!("Invariant violation. Existing record can never have an Add new state"),
-                            MaterializedLogOperation::Initial | MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => {
-                                // Invalid add so skip.
-                                continue;
-                            }
-                        }
-                    }
-                    // Adding an entry that does not exist on the segment yet.
-                    // Only add if it hasn't been added before in the log.
-                    else if !new_id_to_materialized.contains_key(log_record.record.id.as_str()) {
-                        let next_offset_id =
-                            next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-                        let materialized_record = match MaterializedLogRecord::try_from((
-                            &log_record.record,
-                            next_offset_id,
-                            log_record.record.id.as_str(),
-                        )) {
-                            Ok(record) => record,
-                            Err(e) => {
-                                return Err(e);
-                            }
-                        };
-                        new_id_to_materialized
-                            .insert(log_record.record.id.as_str(), materialized_record);
-                    }
-                }
-                Operation::Delete => {
-                    // If the delete is for a record that is currently not in the
-                    // record segment, then we can just NOT process these records
-                    // at all. On the other hand if it is for a record that is currently
-                    // in segment then we'll have to pass it as a delete
-                    // to the compactor so that it can be deleted.
-                    if new_id_to_materialized.contains_key(log_record.record.id.as_str()) {
-                        new_id_to_materialized.remove(log_record.record.id.as_str());
-                    } else if existing_id_to_materialized
-                        .contains_key(log_record.record.id.as_str())
-                    {
-                        // Mark state as deleted. Other fields become noop after such a delete.
-                        let record_from_map = existing_id_to_materialized
-                            .get_mut(log_record.record.id.as_str())
-                            .unwrap();
-                        record_from_map.final_operation = MaterializedLogOperation::DeleteExisting;
-                        record_from_map.final_document = None;
-                        record_from_map.final_embedding = None;
-                        record_from_map.metadata_to_be_merged = None;
-                        record_from_map.metadata_to_be_deleted = None;
-                        record_from_map.user_id = None;
-                    }
-                }
-                Operation::Update => {
-                    let record_from_map = match existing_id_to_materialized
-                        .get_mut(log_record.record.id.as_str())
-                    {
-                        Some(res) => {
-                            match res.final_operation {
-                                // Ignore the update if deleted.
-                                MaterializedLogOperation::DeleteExisting => {
-                                    continue;
-                                },
-                                MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for an entry that exists on the segment"),
-                                MaterializedLogOperation::Initial | MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => {}
-                            }
-                            res
-                        }
-                        None => match new_id_to_materialized.get_mut(log_record.record.id.as_str())
-                        {
-                            Some(res) => res,
-                            None => {
-                                // Does not exist in either maps. Ignore this update.
-                                continue;
-                            }
-                        },
-                    };
-                    match merge_update_metadata(
-                        (
-                            &record_from_map.metadata_to_be_merged,
-                            &record_from_map.metadata_to_be_deleted,
-                        ),
-                        &log_record.record.metadata,
-                    ) {
-                        Ok(meta) => {
-                            record_from_map.metadata_to_be_merged = meta.0;
-                            record_from_map.metadata_to_be_deleted = meta.1;
-                        }
-                        Err(e) => {
-                            return Err(LogMaterializerError::MetadataMaterialization(e));
-                        }
-                    };
-                    if let Some(doc) = log_record.record.document.as_ref() {
-                        record_from_map.final_document = Some(doc);
-                    }
-                    if let Some(emb) = log_record.record.embedding.as_ref() {
-                        record_from_map.final_embedding = Some(emb.as_slice());
-                    }
-                    match record_from_map.final_operation {
-                        MaterializedLogOperation::Initial => {
-                            record_from_map.final_operation =
-                                MaterializedLogOperation::UpdateExisting;
-                        }
-                        // State remains as is.
-                        MaterializedLogOperation::AddNew
-                        | MaterializedLogOperation::OverwriteExisting
-                        | MaterializedLogOperation::UpdateExisting => {}
-                        // Not expected.
-                        MaterializedLogOperation::DeleteExisting => {
-                            panic!("Invariant violation. Should not be updating a deleted record")
-                        }
-                    }
-                }
-                Operation::Upsert => {
-                    if existing_id_to_materialized.contains_key(log_record.record.id.as_str()) {
-                        // safe to unwrap here.
-                        let operation = existing_id_to_materialized
-                            .get(log_record.record.id.as_str())
-                            .unwrap()
-                            .final_operation
-                            .clone();
-                        match operation {
-                            MaterializedLogOperation::DeleteExisting => {
-                                let curr_val = existing_id_to_materialized.remove(log_record.record.id.as_str()).unwrap();
-                                // Overwrite.
-                                let mut materialized_record =
-                                    match MaterializedLogRecord::try_from((
-                                        &log_record.record,
-                                        curr_val.offset_id,
-                                        log_record.record.id.as_str(),
-                                    )) {
-                                        Ok(record) => record,
-                                        Err(e) => {
-                                            return Err(e);
-                                        }
-                                    };
-                                materialized_record.data_record = curr_val.data_record;
-                                materialized_record.final_operation =
-                                    MaterializedLogOperation::OverwriteExisting;
-                                existing_id_to_materialized
-                                    .insert(log_record.record.id.as_str(), materialized_record);
-                            },
-                            MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment"),
-                            MaterializedLogOperation::Initial | MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => {
-                                // Update.
-                                let record_from_map = existing_id_to_materialized.get_mut(log_record.record.id.as_str()).unwrap();
-                                match merge_update_metadata((&record_from_map.metadata_to_be_merged, &record_from_map.metadata_to_be_deleted,),&log_record.record.metadata,) {
-                                    Ok(meta) => {
-                                        record_from_map.metadata_to_be_merged = meta.0;
-                                        record_from_map.metadata_to_be_deleted = meta.1;
-                                    }
-                                    Err(e) => {
-                                        return Err(LogMaterializerError::MetadataMaterialization(e));
-                                    }
-                                };
-                                if let Some(doc) = log_record.record.document.as_ref() {
-                                    record_from_map.final_document = Some(doc);
-                                }
-                                if let Some(emb) = log_record.record.embedding.as_ref() {
-                                    record_from_map.final_embedding = Some(emb.as_slice());
-                                }
-                                match record_from_map.final_operation {
-                                    MaterializedLogOperation::Initial => {
-                                        record_from_map.final_operation =
-                                            MaterializedLogOperation::UpdateExisting;
-                                    }
-                                    // State remains as is.
-                                    MaterializedLogOperation::AddNew
-                                    | MaterializedLogOperation::OverwriteExisting
-                                    | MaterializedLogOperation::UpdateExisting => {}
-                                    // Not expected.
-                                    MaterializedLogOperation::DeleteExisting => {
-                                        panic!("Invariant violation. Should not be updating a deleted record")
-                                    }
-                                }
-                            }
-                        }
-                    } else if new_id_to_materialized.contains_key(log_record.record.id.as_str()) {
-                        // Update.
-                        let record_from_map = new_id_to_materialized
-                            .get_mut(log_record.record.id.as_str())
-                            .unwrap();
-                        match merge_update_metadata(
-                            (
-                                &record_from_map.metadata_to_be_merged,
-                                &record_from_map.metadata_to_be_deleted,
-                            ),
-                            &log_record.record.metadata,
-                        ) {
-                            Ok(meta) => {
-                                record_from_map.metadata_to_be_merged = meta.0;
-                                record_from_map.metadata_to_be_deleted = meta.1;
-                            }
-                            Err(e) => {
-                                return Err(LogMaterializerError::MetadataMaterialization(e));
-                            }
-                        };
-                        if let Some(doc) = log_record.record.document.as_ref() {
-                            record_from_map.final_document = Some(doc);
-                        }
-                        if let Some(emb) = log_record.record.embedding.as_ref() {
-                            record_from_map.final_embedding = Some(emb.as_slice());
-                        }
-                        // This record is not present on storage yet hence final operation is
-                        // AddNew and not UpdateExisting.
-                        record_from_map.final_operation = MaterializedLogOperation::AddNew;
-                    } else {
-                        // Insert.
-                        let next_offset =
-                            next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-                        let materialized_record = match MaterializedLogRecord::try_from((
-                            &log_record.record,
-                            next_offset,
-                            log_record.record.id.as_str(),
-                        )) {
-                            Ok(record) => record,
-                            Err(e) => {
-                                return Err(e);
-                            }
-                        };
-                        new_id_to_materialized
-                            .insert(log_record.record.id.as_str(), materialized_record);
-                    }
-                }
-            }
-        }
-        Ok(())
-    }.instrument(tracing::info_span!(parent: Span::current(), "Materialization main iteration")).await?;
-    let mut res = vec![];
-    for (_key, value) in existing_id_to_materialized {
-        // Ignore records that only had invalid ADDS on the log.
-        if value.final_operation == MaterializedLogOperation::Initial {
-            continue;
-        }
-        res.push(value);
-    }
-    for (_key, value) in new_id_to_materialized {
+    // async {
+    for (log_record, _) in logs.iter() {
+        match log_record.record.operation {
+            Operation::Add => {
+                match (
+                    existing_id_to_record.get(log_record.record.id.as_str()),
+                    id_to_materialized.get_mut(log_record.record.id.as_str()),
+                ) {
+                    (Some(_), None) => {
+                        // Offset ID already exists in segment and this ID has not been modified in the log up to this point, so the add is a noop/invalid.
+                        continue;
+                    }
+                    (None, Some(_)) => {
+                        // Offset ID does not exist in segment but does exist in log up to this point (a new add), so this second add is a noop/invalid.
+                        continue;
+                    }
+                    (Some(_), Some(materialized_operation)) => {
+                        // Handles the case where an ID exists in the segment and the log contains a delete followed by an add.
+                        match materialized_operation.final_operation {
+                            MaterializedLogOperation::DeleteExisting => {
+                                match merge_update_metadata(
+                                    (
+                                        &materialized_operation.metadata_to_be_merged,
+                                        &materialized_operation.metadata_to_be_deleted,
+                                    ),
+                                    &log_record.record.metadata,
+                                ) {
+                                    Ok(meta) => {
+                                        materialized_operation.metadata_to_be_merged = meta.0;
+                                        materialized_operation.metadata_to_be_deleted = meta.1;
+                                    }
+                                    Err(e) => {
+                                        return Err(LogMaterializerError::MetadataMaterialization(
+                                            e,
+                                        ));
+                                    }
+                                };
+                                materialized_operation.final_document =
+                                    log_record.record.document.clone();
+                                if let Some(emb) = log_record.record.embedding.as_ref() {
+                                    materialized_operation.final_embedding = Some(emb.clone());
+                                }
+                                // This record is not present on storage yet hence final operation is
+                                // AddNew and not UpdateExisting.
+                                materialized_operation.final_operation =
+                                    MaterializedLogOperation::OverwriteExisting;
+                            }
+                            _ => {
+                                // This is a noop.
+                                continue;
+                            }
+                        }
+                    }
+                    (None, None) => {
+                        // New insert
+                        let next_offset_id =
+                            next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                        let materialized_record = match MaterializedLogRecord::try_from((
+                            &log_record.record,
+                            next_offset_id,
+                            log_record.record.id.as_str(),
+                        )) {
+                            Ok(record) => record,
+                            Err(e) => {
+                                return Err(e);
+                            }
+                        };
+                        id_to_materialized
+                            .insert(log_record.record.id.as_str(), materialized_record);
+                    }
+                };
+            }
+            Operation::Delete => {
+                // Assume that the ID is present in the currently-processed log and remove it
+                match id_to_materialized.remove(log_record.record.id.as_str()) {
+                    Some(_) => {
+                        // Successfully removed
+                    }
+                    None => {
+                        // Was not found in processed log records, check the segment
+                        if let Some(existing_record) =
+                            existing_id_to_record.get(log_record.record.id.as_str())
+                        {
+                            id_to_materialized.insert(
+                                log_record.record.id.as_str(),
+                                MaterializedLogRecord {
+                                    data_record: Some(existing_record.0.to_owned()),
+                                    offset_id: existing_record.1,
+                                    user_id: None,
+                                    final_operation: MaterializedLogOperation::DeleteExisting,
+                                    metadata_to_be_merged: None,
+                                    metadata_to_be_deleted: None,
+                                    final_document: None,
+                                    final_embedding: None,
+                                },
+                            );
+                        }
+                    }
+                }
+            }
+            Operation::Update => {
+                let current_value_from_segment_or_log = match (
+                    existing_id_to_record.get(log_record.record.id.as_str()),
+                    id_to_materialized.get_mut(log_record.record.id.as_str()),
+                ) {
+                    (Some(existing_record), None) => {
+                        let new_record = MaterializedLogRecord {
+                            data_record: Some(existing_record.0.to_owned()),
+                            offset_id: existing_record.1,
+                            user_id: None,
+                            final_operation: MaterializedLogOperation::UpdateExisting,
+                            metadata_to_be_merged: None,
+                            metadata_to_be_deleted: None,
+                            final_document: None,
+                            final_embedding: None,
+                        };
+                        id_to_materialized.insert(existing_record.0.id, new_record);
+                        id_to_materialized
+                            .get_mut(existing_record.0.id)
+                            .expect("inserted above")
+                    }
+                    (None, Some(materialized_operation)) => materialized_operation,
+                    (Some(_), Some(materialized_operation)) => materialized_operation,
+                    (None, None) => {
+                        // Invalid update because the offset ID does not exist in the record segment or log (processed up to this point)
+                        continue;
+                    }
+                };
+
+                match merge_update_metadata(
+                    (
+                        &current_value_from_segment_or_log.metadata_to_be_merged,
+                        &current_value_from_segment_or_log.metadata_to_be_deleted,
+                    ),
+                    &log_record.record.metadata,
+                ) {
+                    Ok(meta) => {
+                        current_value_from_segment_or_log.metadata_to_be_merged = meta.0;
+                        current_value_from_segment_or_log.metadata_to_be_deleted = meta.1;
+                    }
+                    Err(e) => {
+                        return Err(LogMaterializerError::MetadataMaterialization(e));
+                    }
+                };
+                current_value_from_segment_or_log.final_document =
+                    log_record.record.document.clone();
+                if let Some(emb) = log_record.record.embedding.as_ref() {
+                    current_value_from_segment_or_log.final_embedding = Some(emb.clone());
+                }
+            }
+            Operation::Upsert => {
+                match (
+                    existing_id_to_record.get(log_record.record.id.as_str()),
+                    id_to_materialized.get_mut(log_record.record.id.as_str()),
+                ) {
+                    (Some(existing_record), None) => {
+                        // Update the existing record in the segment
+                        let mut materialized_operation = MaterializedLogRecord {
+                            data_record: Some(existing_record.0.to_owned()),
+                            offset_id: existing_record.1,
+                            user_id: None,
+                            final_operation: MaterializedLogOperation::UpdateExisting,
+                            metadata_to_be_merged: None,
+                            metadata_to_be_deleted: None,
+                            final_document: None,
+                            final_embedding: None,
+                        };
+                        let merged_metadata = merge_update_metadata(
+                            (
+                                &materialized_operation.metadata_to_be_merged,
+                                &materialized_operation.metadata_to_be_deleted,
+                            ),
+                            &log_record.record.metadata,
+                        )
+                        .map_err(LogMaterializerError::MetadataMaterialization)?;
+                        materialized_operation.metadata_to_be_merged = merged_metadata.0;
+                        materialized_operation.metadata_to_be_deleted = merged_metadata.1;
+
+                        materialized_operation.final_document = log_record.record.document.clone();
+
+                        if let Some(emb) = log_record.record.embedding.as_ref() {
+                            materialized_operation.final_embedding = Some(emb.clone());
+                        }
+
+                        id_to_materialized
+                            .insert(log_record.record.id.as_str(), materialized_operation);
+                    }
+                    (None, Some(materialized_operation)) => {
+                        // Update (the ID is not present in the segment, so modify the existing log operation)
+                        match merge_update_metadata(
+                            (
+                                &materialized_operation.metadata_to_be_merged,
+                                &materialized_operation.metadata_to_be_deleted,
+                            ),
+                            &log_record.record.metadata,
+                        ) {
+                            Ok(meta) => {
+                                materialized_operation.metadata_to_be_merged = meta.0;
+                                materialized_operation.metadata_to_be_deleted = meta.1;
+                            }
+                            Err(e) => {
+                                return Err(LogMaterializerError::MetadataMaterialization(e));
+                            }
+                        };
+
+                        materialized_operation.final_document = log_record.record.document.clone();
+
+                        if let Some(emb) = log_record.record.embedding.as_ref() {
+                            materialized_operation.final_embedding = Some(emb.clone());
+                        }
+                        // This record is not present on storage yet hence final operation is
+                        // AddNew and not UpdateExisting.
+                        materialized_operation.final_operation = MaterializedLogOperation::AddNew;
+                    }
+                    (Some(_), Some(materialized_operation)) => {
+                        // ID exists in both the segment and the processed log records
+                        match materialized_operation.final_operation {
+                            MaterializedLogOperation::DeleteExisting => {
+                                let current_value = id_to_materialized.remove(log_record.record.id.as_str()).expect("Exists because get_mut() returned Some(_).");
+
+                                // Overwrite.
+                                let mut materialized_record =
+                                    match MaterializedLogRecord::try_from((
+                                        &log_record.record,
+                                        current_value.offset_id,
+                                        log_record.record.id.as_str(),
+                                    )) {
+                                        Ok(record) => record,
+                                        Err(e) => {
+                                            return Err(e);
+                                        }
+                                    };
+                                materialized_record.data_record = current_value.data_record;
+                                materialized_record.final_operation =
+                                    MaterializedLogOperation::OverwriteExisting;
+                                id_to_materialized
+                                    .insert(log_record.record.id.as_str(), materialized_record);
+                            },
+                            MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment"),
+                            MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => {
+                                // Update.
+                                match merge_update_metadata((&materialized_operation.metadata_to_be_merged, &materialized_operation.metadata_to_be_deleted,),&log_record.record.metadata,) {
+                                    Ok(meta) => {
+                                        materialized_operation.metadata_to_be_merged = meta.0;
+                                        materialized_operation.metadata_to_be_deleted = meta.1;
+                                    }
+                                    Err(e) => {
+                                        return Err(LogMaterializerError::MetadataMaterialization(e));
+                                    }
+                                };
+
+                                materialized_operation.final_document = log_record.record.document.clone();
+
+                                if let Some(emb) = log_record.record.embedding.as_ref() {
+                                    materialized_operation.final_embedding = Some(emb.clone());
+                                }
+                            }
+                        }
+                    }
+                    (None, None) => {
+                        // Insert.
+                        let next_offset =
+                            next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                        let materialized_record = match MaterializedLogRecord::try_from((
+                            &log_record.record,
+                            next_offset,
+                            log_record.record.id.as_str(),
+                        )) {
+                            Ok(record) => record,
+                            Err(e) => {
+                                return Err(e);
+                            }
+                        };
+                        id_to_materialized
+                            .insert(log_record.record.id.as_str(), materialized_record);
+                    }
+                }
+            }
+        }
+    }
+    // Ok(())
+    // }.instrument(tracing::info_span!(parent: Span::current(), "Materialization main iteration")).await?;
+    let mut res = vec![];
+    for (_key, value) in id_to_materialized {
         res.push(value);
     }
     res.sort_by(|x, y| x.offset_id.cmp(&y.offset_id));
+
     Ok(Chunk::new(res.into()))
 }
 
 // This needs to be public for testing
 #[allow(async_fn_in_trait)]
-pub trait SegmentWriter<'a> {
+pub trait SegmentWriter {
     async fn apply_materialized_log_chunk(
         &self,
-        records: Chunk<MaterializedLogRecord<'a>>,
+        records: Chunk<MaterializedLogRecord>,
     ) -> Result<(), ApplyMaterializedLogError>;
     async fn commit(self) -> Result<impl SegmentFlusher, Box<dyn ChromaError>>;
 }
@@ -1836,10 +1849,10 @@ mod tests {
                 // Embedding 3.
                 if log.user_id.is_some() {
                     id3_found += 1;
-                    assert_eq!("embedding_id_3", log.user_id.unwrap());
+                    assert_eq!("embedding_id_3", log.user_id.clone().unwrap());
                     assert!(log.data_record.is_none());
-                    assert_eq!("doc3", log.final_document.unwrap());
-                    assert_eq!(vec![7.0, 8.0, 9.0], log.final_embedding.unwrap());
+                    assert_eq!("doc3", log.final_document.clone().unwrap());
+                    assert_eq!(vec![7.0, 8.0, 9.0], log.final_embedding.clone().unwrap());
                     assert_eq!(3, log.offset_id);
                     assert_eq!(MaterializedLogOperation::AddNew, log.final_operation);
                     let mut hello_found = 0;
@@ -1895,7 +1908,10 @@ mod tests {
             assert_eq!(hello_found, 1);
             assert_eq!(hello_again_found, 1);
             assert!(log.data_record.is_some());
-            assert_eq!(log.data_record.as_ref().unwrap().document, Some("doc1"));
+            assert_eq!(
+                log.data_record.as_ref().unwrap().document,
+                Some("doc1".to_string())
+            );
             assert_eq!(
                 log.data_record.as_ref().unwrap().embedding,
                 vec![1.0, 2.0, 3.0].as_slice()
diff --git a/rust/worker/src/server.rs b/rust/worker/src/server.rs
index ab1956596ef..2022dd1469e 100644
--- a/rust/worker/src/server.rs
+++ b/rust/worker/src/server.rs
@@ -605,7 +605,9 @@ mod tests {
         assert_eq!(response.unwrap_err().code(), tonic::Code::InvalidArgument);
     }
 
-    fn gen_knn_request(mut scan_operator: Option<chroma_proto::ScanOperator>) -> chroma_proto::KnnPlan {
+    fn gen_knn_request(
+        mut scan_operator: Option<chroma_proto::ScanOperator>,
+    ) -> chroma_proto::KnnPlan {
         if scan_operator.is_none() {
             scan_operator = Some(scan());
         }