Commit: Cleanup

codetheweb committed Dec 2, 2024
1 parent 58cddf5 commit 2458959
Showing 1 changed file with 55 additions and 59 deletions.
114 changes: 55 additions & 59 deletions rust/worker/src/segment/types.rs
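
For orientation before the diff: materialize_logs folds every log record into a per-ID MaterializedLogRecord whose final_operation (AddNew, UpdateExisting, OverwriteExisting, DeleteExisting) tells the compactor what to do with that ID. The sketch below illustrates the case the first hunk reorganizes, a delete followed by an add for an ID that already exists in the segment; the types are deliberately simplified and partly hypothetical, and only the operation names and the DeleteExisting-to-OverwriteExisting transition mirror the real types.rs.

    // Simplified sketch: the real MaterializedLogRecord carries metadata, an
    // embedding, an offset id, and a reference to the segment's DataRecord.
    #[derive(Debug, PartialEq)]
    enum MaterializedLogOperation {
        AddNew,
        UpdateExisting,
        OverwriteExisting,
        DeleteExisting,
    }

    struct MaterializedRecordSketch {
        final_operation: MaterializedLogOperation,
        final_document: Option<String>,
    }

    // An Add arrives for an ID that exists in the segment and already has a
    // materialized entry from earlier records in the same batch.
    fn apply_add_to_existing(record: &mut MaterializedRecordSketch, new_document: Option<String>) {
        match record.final_operation {
            MaterializedLogOperation::DeleteExisting => {
                // Delete followed by add: keep the record and overwrite its payload.
                if let Some(doc) = new_document {
                    record.final_document = Some(doc);
                }
                record.final_operation = MaterializedLogOperation::OverwriteExisting;
            }
            // Anything else (already added, updated, or overwritten) makes the add a no-op.
            _ => {}
        }
    }

    fn main() {
        let mut record = MaterializedRecordSketch {
            final_operation: MaterializedLogOperation::DeleteExisting,
            final_document: None,
        };
        apply_add_to_existing(&mut record, Some("new document".to_string()));
        assert_eq!(record.final_operation, MaterializedLogOperation::OverwriteExisting);
        assert_eq!(record.final_document.as_deref(), Some("new document"));
    }
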
@@ -503,42 +503,45 @@ pub async fn materialize_logs<'me>(
continue;
}
(Some(_), Some(materialized_operation)) => {
+ // Handles the case where an ID exists in the segment and the log contains a delete followed by an add.
match materialized_operation.final_operation {
-     MaterializedLogOperation::AddNew => panic!("Invariant violation. AddNew state not expected for records that exist in the segment."),
-     MaterializedLogOperation::OverwriteExisting | MaterializedLogOperation::UpdateExisting => {
-         // This is a noop.
-         continue;
-     }
-     MaterializedLogOperation::DeleteExisting => {
-         match merge_update_metadata(
-             (
-                 &materialized_operation.metadata_to_be_merged,
-                 &materialized_operation.metadata_to_be_deleted,
-             ),
-             &log_record.record.metadata,
-         ) {
-             Ok(meta) => {
-                 materialized_operation.metadata_to_be_merged = meta.0;
-                 materialized_operation.metadata_to_be_deleted = meta.1;
-             }
-             Err(e) => {
-                 return Err(LogMaterializerError::MetadataMaterialization(e));
-             }
-         };
-         if let Some(doc) = log_record.record.document.as_ref() {
-             materialized_operation.final_document = Some(doc);
-         }
-         if let Some(emb) = log_record.record.embedding.as_ref() {
-             materialized_operation.final_embedding = Some(emb.as_slice());
-         }
-         // This record is not present on storage yet hence final operation is
-         // AddNew and not UpdateExisting.
-         materialized_operation.final_operation = MaterializedLogOperation::OverwriteExisting;
-     }
+     MaterializedLogOperation::DeleteExisting => {
+         match merge_update_metadata(
+             (
+                 &materialized_operation.metadata_to_be_merged,
+                 &materialized_operation.metadata_to_be_deleted,
+             ),
+             &log_record.record.metadata,
+         ) {
+             Ok(meta) => {
+                 materialized_operation.metadata_to_be_merged = meta.0;
+                 materialized_operation.metadata_to_be_deleted = meta.1;
+             }
+             Err(e) => {
+                 return Err(LogMaterializerError::MetadataMaterialization(
+                     e,
+                 ));
+             }
+         };
+         if let Some(doc) = log_record.record.document.as_ref() {
+             materialized_operation.final_document = Some(doc);
+         }
+         if let Some(emb) = log_record.record.embedding.as_ref() {
+             materialized_operation.final_embedding = Some(emb.as_slice());
+         }
+         // This record is not present on storage yet hence final operation is
+         // AddNew and not UpdateExisting.
+         materialized_operation.final_operation =
+             MaterializedLogOperation::OverwriteExisting;
+     }
+     _ => {
+         // This is a noop.
+         continue;
+     }
}
}
(None, None) => {
// New insert
let next_offset_id =
next_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let materialized_record = match MaterializedLogRecord::try_from((
@@ -557,12 +557,7 @@ pub async fn materialize_logs<'me>(
};
}
Operation::Delete => {
- // If the delete is for a record that is currently not in the
- // record segment, then we can just NOT process these records
- // at all. On the other hand if it is for a record that is currently
- // in segment then we'll have to pass it as a delete
- // to the compactor so that it can be deleted.
+ // Assume that the ID is present in the currently-processed log and remove it
match id_to_materialized.remove(log_record.record.id.as_str()) {
Some(_) => {
// Successfully removed
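
The Delete arm above follows the rule the removed comment spells out: a delete for an ID that never reached the record segment just drops whatever this batch had materialized for it, while a delete for an ID that is already in the segment must still be forwarded to the compactor as a DeleteExisting operation. A minimal sketch of that decision, with simplified, hypothetical types:

    use std::collections::HashMap;

    #[derive(Debug, PartialEq)]
    enum MaterializedLogOperation {
        DeleteExisting,
    }

    // Hypothetical, simplified delete handling: `pending` holds records materialized
    // from earlier log entries in this batch, `in_segment` says whether the ID is
    // already persisted in the record segment.
    fn materialize_delete(
        id: &str,
        in_segment: bool,
        pending: &mut HashMap<String, MaterializedLogOperation>,
    ) {
        // Drop whatever this batch had queued for the ID.
        pending.remove(id);
        if in_segment {
            // The compactor still has to delete the persisted copy.
            pending.insert(id.to_string(), MaterializedLogOperation::DeleteExisting);
        }
        // If the ID only ever existed in this batch, there is nothing left to do.
    }

    fn main() {
        let mut pending = HashMap::new();
        materialize_delete("only-in-log", false, &mut pending);
        assert!(pending.is_empty());

        materialize_delete("in-segment", true, &mut pending);
        assert_eq!(
            pending.get("in-segment"),
            Some(&MaterializedLogOperation::DeleteExisting)
        );
    }
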
Expand Down Expand Up @@ -590,13 +588,11 @@ pub async fn materialize_logs<'me>(
}
}
Operation::Update => {
- // todo: rename
- let record_from_map = match (
+ let current_value_from_segment_or_log = match (
existing_id_to_record.get(log_record.record.id.as_str()),
id_to_materialized.get_mut(log_record.record.id.as_str()),
) {
(Some(existing_record), None) => {
- // todo: operation here doesn't matter
let new_record = MaterializedLogRecord {
data_record: Some(existing_record.0.clone()),
offset_id: existing_record.1,
@@ -608,8 +604,9 @@
final_embedding: None,
};
id_to_materialized.insert(existing_record.0.id, new_record);
- id_to_materialized.get_mut(existing_record.0.id).unwrap()
- // todo
+ id_to_materialized
+     .get_mut(existing_record.0.id)
+     .expect("inserted above")
}
(None, Some(materialized_operation)) => materialized_operation,
(Some(_), Some(materialized_operation)) => materialized_operation,
@@ -621,24 +618,24 @@

match merge_update_metadata(
(
- &record_from_map.metadata_to_be_merged,
- &record_from_map.metadata_to_be_deleted,
+ &current_value_from_segment_or_log.metadata_to_be_merged,
+ &current_value_from_segment_or_log.metadata_to_be_deleted,
),
&log_record.record.metadata,
) {
Ok(meta) => {
- record_from_map.metadata_to_be_merged = meta.0;
- record_from_map.metadata_to_be_deleted = meta.1;
+ current_value_from_segment_or_log.metadata_to_be_merged = meta.0;
+ current_value_from_segment_or_log.metadata_to_be_deleted = meta.1;
}
Err(e) => {
return Err(LogMaterializerError::MetadataMaterialization(e));
}
};
if let Some(doc) = log_record.record.document.as_ref() {
- record_from_map.final_document = Some(doc);
+ current_value_from_segment_or_log.final_document = Some(doc);
}
if let Some(emb) = log_record.record.embedding.as_ref() {
- record_from_map.final_embedding = Some(emb.as_slice());
+ current_value_from_segment_or_log.final_embedding = Some(emb.as_slice());
}
}
Operation::Upsert => {
@@ -647,7 +644,7 @@
id_to_materialized.get_mut(log_record.record.id.as_str()),
) {
(Some(existing_record), None) => {
- // Update.
+ // Update the existing record in the segment
let mut materialized_operation = MaterializedLogRecord {
data_record: Some(existing_record.0.clone()),
offset_id: existing_record.1,
Expand Down Expand Up @@ -682,38 +679,37 @@ pub async fn materialize_logs<'me>(
.insert(log_record.record.id.as_str(), materialized_operation);
}
(None, Some(materialized_operation)) => {
- // Update.
- // todo: rename
- let record_from_map = materialized_operation;
+ // Update (the ID is not present in the segment, so modify the existing log operation)
match merge_update_metadata(
(
- &record_from_map.metadata_to_be_merged,
- &record_from_map.metadata_to_be_deleted,
+ &materialized_operation.metadata_to_be_merged,
+ &materialized_operation.metadata_to_be_deleted,
),
&log_record.record.metadata,
) {
Ok(meta) => {
- record_from_map.metadata_to_be_merged = meta.0;
- record_from_map.metadata_to_be_deleted = meta.1;
+ materialized_operation.metadata_to_be_merged = meta.0;
+ materialized_operation.metadata_to_be_deleted = meta.1;
}
Err(e) => {
return Err(LogMaterializerError::MetadataMaterialization(e));
}
};
if let Some(doc) = log_record.record.document.as_ref() {
- record_from_map.final_document = Some(doc);
+ materialized_operation.final_document = Some(doc);
}
if let Some(emb) = log_record.record.embedding.as_ref() {
- record_from_map.final_embedding = Some(emb.as_slice());
+ materialized_operation.final_embedding = Some(emb.as_slice());
}
// This record is not present on storage yet hence final operation is
// AddNew and not UpdateExisting.
- record_from_map.final_operation = MaterializedLogOperation::AddNew;
+ materialized_operation.final_operation = MaterializedLogOperation::AddNew;
}
(Some(_), Some(materialized_operation)) => {
+ // ID exists in both the segment and the processed log records
match materialized_operation.final_operation {
MaterializedLogOperation::DeleteExisting => {
- let current_value = id_to_materialized.remove(log_record.record.id.as_str()).unwrap(); // todo
+ let current_value = id_to_materialized.remove(log_record.record.id.as_str()).expect("Exists because get_mut() returned Some(_).");

// Overwrite.
let mut materialized_record =
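
The Update and Upsert hunks above repeatedly call merge_update_metadata to fold a log record's update metadata into the running pair of metadata_to_be_merged and metadata_to_be_deleted. The sketch below shows only that folding pattern; the function is suffixed _sketch because the real signature and metadata types in types.rs differ, and the None-means-delete convention here is an assumption.

    use std::collections::{HashMap, HashSet};

    // Hypothetical stand-in for merge_update_metadata: keys with a value are queued
    // for merging, keys updated to None are queued for deletion.
    fn merge_update_metadata_sketch(
        current: (&HashMap<String, String>, &HashSet<String>),
        update: &HashMap<String, Option<String>>,
    ) -> (HashMap<String, String>, HashSet<String>) {
        let (mut to_merge, mut to_delete) = (current.0.clone(), current.1.clone());
        for (key, value) in update {
            match value {
                // A concrete value wins over any earlier pending delete of the key.
                Some(v) => {
                    to_delete.remove(key);
                    to_merge.insert(key.clone(), v.clone());
                }
                // A removed key drops any pending merge and is queued for deletion.
                None => {
                    to_merge.remove(key);
                    to_delete.insert(key.clone());
                }
            }
        }
        (to_merge, to_delete)
    }

    fn main() {
        let current_merge = HashMap::from([("color".to_string(), "red".to_string())]);
        let current_delete = HashSet::new();
        let update = HashMap::from([
            ("color".to_string(), None),                     // delete "color"
            ("size".to_string(), Some("large".to_string())), // merge "size"
        ]);
        let (to_merge, to_delete) =
            merge_update_metadata_sketch((&current_merge, &current_delete), &update);
        assert!(to_delete.contains("color"));
        assert_eq!(to_merge.get("size").map(String::as_str), Some("large"));
        assert!(!to_merge.contains_key("color"));
    }
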
