Skip to content

Commit

Permalink
Filter outdated version
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Dec 4, 2024
1 parent 3e17802 commit 0af2979
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 22 deletions.
56 changes: 56 additions & 0 deletions rust/index/src/spann/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,8 @@ pub enum SpannIndexReaderError {
BlockfileReaderConstructionError,
#[error("Spann index uninitialized")]
UninitializedIndex,
#[error("Error reading posting list")]
PostingListReadError,
}

impl ChromaError for SpannIndexReaderError {
Expand All @@ -1356,15 +1358,24 @@ impl ChromaError for SpannIndexReaderError {
Self::HnswIndexConstructionError => ErrorCodes::Internal,
Self::BlockfileReaderConstructionError => ErrorCodes::Internal,
Self::UninitializedIndex => ErrorCodes::Internal,
Self::PostingListReadError => ErrorCodes::Internal,
}
}
}

/// One owned entry of a posting list, as produced by
/// `SpannIndexReader::fetch_posting_list`.
///
/// Each value bundles the id, version and embedding of a single document so
/// callers do not have to index into three parallel arrays themselves.
#[derive(Debug)]
pub struct SpannPosting {
/// Offset id of the document this entry refers to.
pub doc_offset_id: u32,
/// Version of the document that was recorded in the posting list.
pub doc_version: u32,
/// The document's embedding, copied out of the posting list's flat
/// embedding buffer (`dimensionality` values per document).
pub doc_embedding: Vec<f32>,
}

/// Read-only view over a SPANN index: the posting lists, the HNSW index and
/// the per-document version map.
#[derive(Clone)]
pub struct SpannIndexReader<'me> {
/// Blockfile mapping a head id (`u32`) to its stored `SpannPostingList`.
pub posting_lists: BlockfileReader<'me, u32, SpannPostingList<'me>>,
/// Handle to the HNSW index.
// NOTE(review): presumably this indexes the posting-list heads — confirm.
pub hnsw_index: HnswIndexRef,
/// Blockfile mapping a document offset id (`u32`) to its current version
/// (`u32`); consulted by `is_outdated` to filter stale postings.
pub versions_map: BlockfileReader<'me, u32, u32>,
/// Number of `f32` values per embedding; used to slice the flat embedding
/// buffer of a posting list into per-document embeddings.
pub dimensionality: usize,
}

impl<'me> SpannIndexReader<'me> {
Expand Down Expand Up @@ -1454,8 +1465,53 @@ impl<'me> SpannIndexReader<'me> {
posting_lists: postings_list_reader,
hnsw_index: hnsw_reader,
versions_map: versions_map_reader,
dimensionality,
})
}

/// Reports whether a posting recorded at `doc_version` is stale for the
/// document identified by `doc_offset_id`.
///
/// The current version is looked up in `versions_map`. The posting counts as
/// outdated when the stored version is `0` or is newer than `doc_version`.
// NOTE(review): a stored version of 0 presumably marks a deleted document — confirm.
///
/// # Errors
///
/// Returns `SpannIndexReaderError::PostingListReadError` when the lookup
/// fails or when the version map has no entry for `doc_offset_id`.
async fn is_outdated(
    &self,
    doc_offset_id: u32,
    doc_version: u32,
) -> Result<bool, SpannIndexReaderError> {
    let lookup = self.versions_map.get("", doc_offset_id).await;
    match lookup {
        Ok(Some(current_version)) => {
            let is_stale = current_version == 0 || current_version > doc_version;
            Ok(is_stale)
        }
        // A failed read and a missing entry are reported the same way.
        Ok(None) | Err(_) => Err(SpannIndexReaderError::PostingListReadError),
    }
}

/// Reads the posting list stored under `head_id` and returns its entries
/// that are still current.
///
/// Every entry's recorded version is checked with `is_outdated`; entries
/// that are outdated are skipped. Each surviving entry is returned as an
/// owned `SpannPosting` whose embedding is the `dimensionality`-sized slice
/// of the posting list's flat embedding buffer at that entry's position.
///
/// # Errors
///
/// Returns `SpannIndexReaderError::PostingListReadError` when:
/// - the posting list cannot be read or no list exists for `head_id`,
/// - a version lookup for one of the entries fails, or
/// - the stored posting list is malformed, i.e. it has fewer versions than
///   ids or its embedding buffer is too short for an entry.
pub async fn fetch_posting_list(
    &self,
    head_id: u32,
) -> Result<Vec<SpannPosting>, SpannIndexReaderError> {
    let res = self
        .posting_lists
        .get("", head_id)
        .await
        .map_err(|_| SpannIndexReaderError::PostingListReadError)?
        .ok_or(SpannIndexReaderError::PostingListReadError)?;

    let dim = self.dimensionality;
    // Upper bound on the result size: outdated entries are filtered out below.
    let mut postings = Vec::with_capacity(res.doc_offset_ids.len());
    for (index, &doc_offset_id) in res.doc_offset_ids.iter().enumerate() {
        // Checked lookup: parallel arrays that disagree in length are reported
        // as a read error instead of panicking on an out-of-bounds index.
        let doc_version = *res
            .doc_versions
            .get(index)
            .ok_or(SpannIndexReaderError::PostingListReadError)?;
        if self.is_outdated(doc_offset_id, doc_version).await? {
            continue;
        }
        // Same for the embedding buffer: a short buffer is an error, not a panic.
        let doc_embedding = res
            .doc_embeddings
            .get(index * dim..(index + 1) * dim)
            .ok_or(SpannIndexReaderError::PostingListReadError)?
            .to_vec();
        postings.push(SpannPosting {
            doc_offset_id,
            doc_version,
            doc_embedding,
        });
    }
    Ok(postings)
}
}

#[cfg(test)]
Expand Down
13 changes: 4 additions & 9 deletions rust/worker/src/execution/operators/spann_fetch_pl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::spann::types::SpannPosting;
use thiserror::Error;
use tonic::async_trait;

Expand All @@ -15,9 +16,7 @@ pub struct SpannFetchPlInput {

#[derive(Debug)]
pub struct SpannFetchPlOutput {
doc_offset_ids: Vec<u32>,
doc_versions: Vec<u32>,
doc_embeddings: Vec<f32>,
posting_list: Vec<SpannPosting>,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -62,14 +61,10 @@ impl Operator<SpannFetchPlInput, SpannFetchPlOutput> for SpannFetchPlOperator {
)
.await
.map_err(|_| SpannFetchPlError::SpannSegmentReaderCreationError)?;
let pl = spann_reader
let posting_list = spann_reader
.fetch_posting_list(input.head_id)
.await
.map_err(|_| SpannFetchPlError::SpannSegmentReaderError)?;
Ok(SpannFetchPlOutput {
doc_offset_ids: pl.doc_offset_ids.to_vec(),
doc_versions: pl.doc_versions.to_vec(),
doc_embeddings: pl.doc_embeddings.to_vec(),
})
Ok(SpannFetchPlOutput { posting_list })
}
}
18 changes: 5 additions & 13 deletions rust/worker/src/segment/spann_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chroma_blockstore::provider::BlockfileProvider;
use chroma_distance::DistanceFunctionError;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::spann::types::{
SpannIndexFlusher, SpannIndexReader, SpannIndexReaderError, SpannIndexWriterError,
SpannIndexFlusher, SpannIndexReader, SpannIndexReaderError, SpannIndexWriterError, SpannPosting,
};
use chroma_index::IndexUuid;
use chroma_index::{hnsw_provider::HnswIndexProvider, spann::types::SpannIndexWriter};
Expand Down Expand Up @@ -461,19 +461,11 @@ impl<'me> SpannSegmentReader<'me> {
pub async fn fetch_posting_list(
&self,
head_id: u32,
) -> Result<SpannPostingList<'_>, SpannSegmentReaderError> {
let res = self
.index_reader
.posting_lists
.get("", head_id)
) -> Result<Vec<SpannPosting>, SpannSegmentReaderError> {
self.index_reader
.fetch_posting_list(head_id)
.await
.map_err(|_| SpannSegmentReaderError::KeyReadError)?;
match res {
Some(pl) => Ok(pl),
None => {
panic!("Invariant violation. Key present in hnsw but not in posting list")
}
}
.map_err(|_| SpannSegmentReaderError::KeyReadError)
}
}

Expand Down

0 comments on commit 0af2979

Please sign in to comment.