diff --git a/rust/index/src/spann/types.rs b/rust/index/src/spann/types.rs index bf11661eb771..a91d7c4283dd 100644 --- a/rust/index/src/spann/types.rs +++ b/rust/index/src/spann/types.rs @@ -900,6 +900,22 @@ impl SpannIndexWriter { self.add_postings_list(id, version, embedding).await } + pub async fn update( + &self, + id: u32, + embedding: &[f32], + ) -> Result<(), SpannIndexWriterConstructionError> { + // Delete and then add. + self.delete(id).await?; + self.add(id, embedding).await + } + + pub async fn delete(&self, id: u32) -> Result<(), SpannIndexWriterConstructionError> { + let mut version_map_guard = self.versions_map.write(); + version_map_guard.versions_map.insert(id, 0); + Ok(()) + } + // TODO(Sanket): Change the error types. pub async fn commit(self) -> Result { // Pl list. diff --git a/rust/worker/src/segment/spann_segment.rs b/rust/worker/src/segment/spann_segment.rs index 56593a75853f..fdfce848dab5 100644 --- a/rust/worker/src/segment/spann_segment.rs +++ b/rust/worker/src/segment/spann_segment.rs @@ -204,6 +204,26 @@ impl SpannSegmentWriter { .await .map_err(|_| SpannSegmentWriterError::SpannSegmentWriterAddRecordError) } + + async fn delete( + &self, + record: &MaterializedLogRecord<'_>, + ) -> Result<(), SpannSegmentWriterError> { + self.index + .delete(record.offset_id) + .await + .map_err(|_| SpannSegmentWriterError::SpannSegmentWriterAddRecordError) + } + + async fn update( + &self, + record: &MaterializedLogRecord<'_>, + ) -> Result<(), SpannSegmentWriterError> { + self.index + .update(record.offset_id, record.merged_embeddings()) + .await + .map_err(|_| SpannSegmentWriterError::SpannSegmentWriterAddRecordError) + } } struct SpannSegmentFlusher { @@ -222,10 +242,20 @@ impl<'a> SegmentWriter<'a> for SpannSegmentWriter { .await .map_err(|_| ApplyMaterializedLogError::BlockfileSet)?; } - // TODO(Sanket): Implement other operations. - _ => { - todo!() + MaterializedLogOperation::UpdateExisting + | MaterializedLogOperation::OverwriteExisting => { + self.update(record) + .await + .map_err(|_| ApplyMaterializedLogError::BlockfileUpdate)?; + } + MaterializedLogOperation::DeleteExisting => { + self.delete(record) + .await + .map_err(|_| ApplyMaterializedLogError::BlockfileDelete)?; } + MaterializedLogOperation::Initial => panic!( + "Invariant violation. Mat records should not contain logs in initial state" + ), } } Ok(())