From f6316fbd28c2027734dd2909e99bbde012fb2040 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Sat, 23 Nov 2024 20:08:39 -0800 Subject: [PATCH] Implement update and delete --- rust/index/src/spann/types.rs | 12 ++++++++ rust/worker/src/segment/spann_segment.rs | 36 ++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/rust/index/src/spann/types.rs b/rust/index/src/spann/types.rs index f82761f66f1b..ace1e271eff2 100644 --- a/rust/index/src/spann/types.rs +++ b/rust/index/src/spann/types.rs @@ -945,6 +945,18 @@ impl SpannIndexWriter { .await } + pub async fn update(&self, id: u32, embedding: &[f32]) -> Result<(), SpannIndexWriterError> { + // Delete and then add. + self.delete(id).await?; + self.add(id, embedding).await + } + + pub async fn delete(&self, id: u32) -> Result<(), SpannIndexWriterError> { + let mut version_map_guard = self.versions_map.write(); + version_map_guard.versions_map.insert(id, 0); + Ok(()) + } + pub async fn commit(self) -> Result { // Pl list. let pl_flusher = match Arc::try_unwrap(self.posting_list_writer) { diff --git a/rust/worker/src/segment/spann_segment.rs b/rust/worker/src/segment/spann_segment.rs index 1a037a0c007f..801c7e565c74 100644 --- a/rust/worker/src/segment/spann_segment.rs +++ b/rust/worker/src/segment/spann_segment.rs @@ -206,6 +206,26 @@ impl SpannSegmentWriter { .await .map_err(SpannSegmentWriterError::SpannSegmentWriterAddRecordError) } + + async fn delete( + &self, + record: &MaterializedLogRecord<'_>, + ) -> Result<(), SpannSegmentWriterError> { + self.index + .delete(record.offset_id) + .await + .map_err(SpannSegmentWriterError::SpannSegmentWriterAddRecordError) + } + + async fn update( + &self, + record: &MaterializedLogRecord<'_>, + ) -> Result<(), SpannSegmentWriterError> { + self.index + .update(record.offset_id, record.merged_embeddings()) + .await + .map_err(SpannSegmentWriterError::SpannSegmentWriterAddRecordError) + } } struct SpannSegmentFlusher { @@ -224,10 +244,20 @@ impl<'referred_data> SegmentWriter<'referred_data> for SpannSegmentWriter { .await .map_err(ApplyMaterializedLogError::SpannSegmentError)?; } - // TODO(Sanket): Implement other operations. - _ => { - todo!() + MaterializedLogOperation::UpdateExisting + | MaterializedLogOperation::OverwriteExisting => { + self.update(record) + .await + .map_err(|_| ApplyMaterializedLogError::BlockfileUpdate)?; + } + MaterializedLogOperation::DeleteExisting => { + self.delete(record) + .await + .map_err(|_| ApplyMaterializedLogError::BlockfileDelete)?; } + MaterializedLogOperation::Initial => panic!( + "Invariant violation. Mat records should not contain logs in initial state" + ), } } Ok(())