Rename field
Sicheng Pan authored and Sicheng-Pan committed Dec 17, 2024
1 parent 2bb040f commit cf8f66a
Showing 4 changed files with 38 additions and 48 deletions.
4 changes: 2 additions & 2 deletions rust/types/src/collection.rs
@@ -90,14 +90,14 @@ impl TryFrom<chroma_proto::Collection> for Collection {
 }
 
 #[derive(Clone, Debug)]
-pub struct CollectionSegments {
+pub struct CollectionAndSegments {
     pub collection: Collection,
     pub metadata_segment: Segment,
     pub record_segment: Segment,
     pub vector_segment: Segment,
 }
 
-impl TryFrom<chroma_proto::ScanOperator> for CollectionSegments {
+impl TryFrom<chroma_proto::ScanOperator> for CollectionAndSegments {
     type Error = ConversionError;
 
     fn try_from(value: chroma_proto::ScanOperator) -> Result<Self, ConversionError> {
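For readers skimming the diff, the renamed type is simply the bundle of a collection together with its three segment descriptors; only the name changes in this file. A minimal, self-contained sketch of the shape, where `Collection` and `Segment` are simplified stand-ins rather than the real `chroma_types` definitions:

```rust
// Simplified stand-ins: the real Collection and Segment in chroma_types carry more fields.
#[derive(Clone, Debug, Default)]
pub struct Collection {
    pub dimension: Option<i32>,
    pub log_position: i64,
}

#[derive(Clone, Debug, Default)]
pub struct Segment {
    pub id: String,
}

// After this commit the bundle is named CollectionAndSegments instead of
// CollectionSegments; its fields are unchanged.
#[derive(Clone, Debug, Default)]
pub struct CollectionAndSegments {
    pub collection: Collection,
    pub metadata_segment: Segment,
    pub record_segment: Segment,
    pub vector_segment: Segment,
}

fn main() {
    let bundle = CollectionAndSegments::default();
    println!("collection dimension: {:?}", bundle.collection.dimension);
}
```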
16 changes: 8 additions & 8 deletions rust/worker/src/execution/orchestration/get.rs
@@ -1,6 +1,6 @@
 use chroma_blockstore::provider::BlockfileProvider;
 use chroma_error::{ChromaError, ErrorCodes};
-use chroma_types::CollectionSegments;
+use chroma_types::CollectionAndSegments;
 use thiserror::Error;
 use tokio::sync::oneshot::{self, error::RecvError, Sender};
 use tonic::async_trait;
@@ -124,7 +124,7 @@ pub struct GetOrchestrator {
     queue: usize,
 
     // Collection segments
-    collection_segments: CollectionSegments,
+    collection_and_segments: CollectionAndSegments,
 
     // Fetch logs
     fetch_log: FetchLogOperator,
@@ -147,7 +147,7 @@ impl GetOrchestrator {
         blockfile_provider: BlockfileProvider,
         dispatcher: ComponentHandle<Dispatcher>,
         queue: usize,
-        segments: CollectionSegments,
+        collection_and_segments: CollectionAndSegments,
         fetch_log: FetchLogOperator,
         filter: FilterOperator,
         limit: LimitOperator,
@@ -157,7 +157,7 @@ impl GetOrchestrator {
             blockfile_provider,
             dispatcher,
             queue,
-            collection_segments: segments,
+            collection_and_segments,
             fetch_log,
             fetched_logs: None,
             filter,
@@ -229,8 +229,8 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for GetOrchestrator {
             FilterInput {
                 logs: output,
                 blockfile_provider: self.blockfile_provider.clone(),
-                metadata_segment: self.collection_segments.metadata_segment.clone(),
-                record_segment: self.collection_segments.record_segment.clone(),
+                metadata_segment: self.collection_and_segments.metadata_segment.clone(),
+                record_segment: self.collection_and_segments.record_segment.clone(),
             },
             ctx.receiver(),
         );
@@ -265,7 +265,7 @@ impl Handler<TaskResult<FilterOutput, FilterError>> for GetOrchestrator {
                     .expect("FetchLogOperator should have finished already")
                     .clone(),
                 blockfile_provider: self.blockfile_provider.clone(),
-                record_segment: self.collection_segments.record_segment.clone(),
+                record_segment: self.collection_and_segments.record_segment.clone(),
                 log_offset_ids: output.log_offset_ids,
                 compact_offset_ids: output.compact_offset_ids,
             },
@@ -301,7 +301,7 @@ impl Handler<TaskResult<LimitOutput, LimitError>> for GetOrchestrator {
                 .expect("FetchLogOperator should have finished already")
                 .clone(),
             blockfile_provider: self.blockfile_provider.clone(),
-            record_segment: self.collection_segments.record_segment.clone(),
+            record_segment: self.collection_and_segments.record_segment.clone(),
             offset_ids: output.offset_ids.iter().collect(),
         };
 
22 changes: 11 additions & 11 deletions rust/worker/src/execution/orchestration/knn_filter.rs
@@ -2,7 +2,7 @@ use chroma_blockstore::provider::BlockfileProvider;
 use chroma_distance::DistanceFunction;
 use chroma_error::{ChromaError, ErrorCodes};
 use chroma_index::hnsw_provider::HnswIndexProvider;
-use chroma_types::{CollectionSegments, Segment};
+use chroma_types::{CollectionAndSegments, Segment};
 use thiserror::Error;
 use tokio::sync::oneshot::{self, error::RecvError, Sender};
 use tonic::async_trait;
@@ -152,7 +152,7 @@ pub struct KnnFilterOrchestrator {
     queue: usize,
 
     // Collection segments
-    collection_segments: CollectionSegments,
+    collection_and_segments: CollectionAndSegments,
 
     // Fetch logs
     fetch_log: FetchLogOperator,
@@ -173,7 +173,7 @@ impl KnnFilterOrchestrator {
         dispatcher: ComponentHandle<Dispatcher>,
         hnsw_provider: HnswIndexProvider,
         queue: usize,
-        collection_segments: CollectionSegments,
+        collection_and_segments: CollectionAndSegments,
         fetch_log: FetchLogOperator,
         filter: FilterOperator,
     ) -> Self {
@@ -182,7 +182,7 @@ impl KnnFilterOrchestrator {
             dispatcher,
             hnsw_provider,
             queue,
-            collection_segments,
+            collection_and_segments,
             fetch_log,
             fetched_logs: None,
             filter,
@@ -252,8 +252,8 @@ impl Handler<TaskResult<FetchLogOutput, FetchLogError>> for KnnFilterOrchestrator {
             FilterInput {
                 logs: output,
                 blockfile_provider: self.blockfile_provider.clone(),
-                metadata_segment: self.collection_segments.metadata_segment.clone(),
-                record_segment: self.collection_segments.record_segment.clone(),
+                metadata_segment: self.collection_and_segments.metadata_segment.clone(),
+                record_segment: self.collection_and_segments.record_segment.clone(),
             },
             ctx.receiver(),
         );
@@ -279,23 +279,23 @@ impl Handler<TaskResult<FilterOutput, FilterError>> for KnnFilterOrchestrator {
                 return;
             }
         };
-        let collection_dimension = match self.collection_segments.collection.dimension {
+        let collection_dimension = match self.collection_and_segments.collection.dimension {
             Some(dimension) => dimension as u32,
             None => {
                 self.terminate_with_error(ctx, KnnError::NoCollectionDimension);
                 return;
            }
         };
         let distance_function =
-            match distance_function_from_segment(&self.collection_segments.vector_segment) {
+            match distance_function_from_segment(&self.collection_and_segments.vector_segment) {
                 Ok(distance_function) => distance_function,
                 Err(_) => {
                     self.terminate_with_error(ctx, KnnError::InvalidDistanceFunction);
                     return;
                 }
             };
         let hnsw_reader = match DistributedHNSWSegmentReader::from_segment(
-            &self.collection_segments.vector_segment,
+            &self.collection_and_segments.vector_segment,
             collection_dimension as usize,
             self.hnsw_provider.clone(),
         )
@@ -321,8 +321,8 @@ impl Handler<TaskResult<FilterOutput, FilterError>> for KnnFilterOrchestrator {
             distance_function,
             filter_output: output,
             hnsw_reader,
-            record_segment: self.collection_segments.record_segment.clone(),
-            vector_segment: self.collection_segments.vector_segment.clone(),
+            record_segment: self.collection_and_segments.record_segment.clone(),
+            vector_segment: self.collection_and_segments.vector_segment.clone(),
             dimension: collection_dimension as usize,
         }))
         .is_err()
44 changes: 17 additions & 27 deletions rust/worker/src/server.rs
@@ -1,4 +1,4 @@
-use std::{iter::once, str::FromStr};
+use std::iter::once;
 
 use chroma_blockstore::provider::BlockfileProvider;
 use chroma_config::Configurable;
@@ -9,7 +9,7 @@ use chroma_types::{
         self, query_executor_server::QueryExecutor, CountPlan, CountResult, GetPlan, GetResult,
         KnnBatchResult, KnnPlan,
     },
-    CollectionSegments, CollectionUuid, SegmentUuid,
+    CollectionAndSegments,
 };
 use futures::{stream, StreamExt, TryStreamExt};
 use tokio::signal::unix::{signal, SignalKind};
@@ -131,16 +131,16 @@ impl WorkerServer {
         self.system = Some(system);
     }
 
-    fn fetch_log(&self, collection_segments: &CollectionSegments) -> FetchLogOperator {
+    fn fetch_log(&self, collection_and_segments: &CollectionAndSegments) -> FetchLogOperator {
         FetchLogOperator {
             log_client: self.log.clone(),
             // TODO: Make this configurable
             batch_size: 100,
             // The collection log position is inclusive, and we want to start from the next log
             // Note that we query using the incoming log position this is critical for correctness
-            start_log_offset_id: collection_segments.collection.log_position as u32 + 1,
+            start_log_offset_id: collection_and_segments.collection.log_position as u32 + 1,
             maximum_fetch_count: None,
-            collection_uuid: collection_segments.collection.collection_id,
+            collection_uuid: collection_and_segments.collection.collection_id,
         }
     }
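The comments inside `fetch_log` carry the key reasoning: the stored log position is inclusive, so the next fetch has to begin one offset past it. A standalone sketch of that arithmetic with made-up values (the free function below is illustrative, not part of the codebase):

```rust
// The collection's log_position is the last log offset already reflected in the
// collection (inclusive), so fetching resumes at the following offset.
fn start_log_offset_id(collection_log_position: i64) -> u32 {
    collection_log_position as u32 + 1
}

fn main() {
    // If logs up to and including offset 41 have been applied,
    // the next fetch starts at offset 42.
    assert_eq!(start_log_offset_id(41), 42);
}
```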

@@ -153,23 +153,13 @@ impl WorkerServer {
             .scan
             .ok_or(Status::invalid_argument("Invalid Scan Operator"))?;
 
-        let collection = &scan
-            .collection
-            .ok_or(Status::invalid_argument("Invalid collection"))?;
+        let collection_and_segments = CollectionAndSegments::try_from(scan)?;
+        let collection = &collection_and_segments.collection;
 
         let count_orchestrator = CountQueryOrchestrator::new(
             self.clone_system()?,
-            &SegmentUuid::from_str(
-                &scan
-                    .metadata
-                    .ok_or(Status::invalid_argument(
-                        "Invalid metadata segment information",
-                    ))?
-                    .id,
-            )?
-            .0,
-            &CollectionUuid::from_str(&collection.id)
-                .map_err(|e| Status::invalid_argument(e.to_string()))?,
+            &collection_and_segments.metadata_segment.id.0,
+            &collection.collection_id,
             self.log.clone(),
             self.sysdb.clone(),
             self.clone_dispatcher()?,
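Most of the deletions in the hunk above come from the handler no longer re-parsing IDs out of proto strings: once the scan is converted into `CollectionAndSegments`, the typed IDs can be passed straight through. A rough, self-contained sketch of that before/after shape, where `SegmentId`, `Segment`, and the hex parsing are stand-ins rather than the real proto or `chroma_types` types:

```rust
use std::str::FromStr;

// Stand-in for a typed segment ID; the real code uses UUID newtypes.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SegmentId(u128);

impl FromStr for SegmentId {
    type Err = std::num::ParseIntError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u128::from_str_radix(s, 16).map(SegmentId)
    }
}

// Before (roughly): parse the ID out of a raw proto string, mapping the error by hand.
fn metadata_segment_id_from_proto(raw: &str) -> Result<SegmentId, String> {
    SegmentId::from_str(raw).map_err(|e| e.to_string())
}

// After: the scan was already converted into a typed bundle, so the handler just
// reads the ID off the segment.
struct Segment {
    id: SegmentId,
}

fn metadata_segment_id(metadata_segment: &Segment) -> SegmentId {
    metadata_segment.id
}

fn main() {
    assert_eq!(metadata_segment_id_from_proto("2a").unwrap(), SegmentId(42));
    assert_eq!(metadata_segment_id(&Segment { id: SegmentId(42) }), SegmentId(42));
}
```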
@@ -192,8 +182,8 @@ impl WorkerServer {
             .scan
             .ok_or(Status::invalid_argument("Invalid Scan Operator"))?;
 
-        let collection_segments = scan.try_into()?;
-        let fetch_log = self.fetch_log(&collection_segments);
+        let collection_and_segments = scan.try_into()?;
+        let fetch_log = self.fetch_log(&collection_and_segments);
 
         let filter = get_inner
             .filter
@@ -212,7 +202,7 @@ impl WorkerServer {
             self.clone_dispatcher()?,
             // TODO: Make this configurable
             1000,
-            collection_segments,
+            collection_and_segments,
             fetch_log,
             filter.try_into()?,
             limit.into(),
@@ -238,9 +228,9 @@ impl WorkerServer {
             .scan
             .ok_or(Status::invalid_argument("Invalid Scan Operator"))?;
 
-        let collection_segments = scan.try_into()?;
+        let collection_and_segments = scan.try_into()?;
 
-        let fetch_log = self.fetch_log(&collection_segments);
+        let fetch_log = self.fetch_log(&collection_and_segments);
 
         let filter = knn_inner
             .filter
@@ -262,8 +252,8 @@ impl WorkerServer {
 
         // If dimension is not set and segment is uninitialized, we assume
         // this is a query on empty collection, so we return early here
-        if collection_segments.collection.dimension.is_none()
-            && collection_segments.vector_segment.file_path.is_empty()
+        if collection_and_segments.collection.dimension.is_none()
+            && collection_and_segments.vector_segment.file_path.is_empty()
         {
             return Ok(Response::new(to_proto_knn_batch_result(
                 once(Default::default())
@@ -279,7 +269,7 @@ impl WorkerServer {
             self.hnsw_index_provider.clone(),
             // TODO: Make this configurable
             1000,
-            collection_segments,
+            collection_and_segments,
             fetch_log,
             filter.try_into()?,
         );
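One behavioral detail in the KNN hunks above is the early return: when the collection has no recorded dimension and the vector segment has no files, the server answers with empty results instead of building an orchestrator. A minimal sketch of that guard, with illustrative field shapes rather than the real `Collection` and `Segment` types:

```rust
// Nothing has been written yet (no dimension recorded, no HNSW files), so a KNN
// query on this collection can be answered with empty results immediately.
fn is_empty_collection(dimension: Option<u32>, vector_segment_file_paths: &[String]) -> bool {
    dimension.is_none() && vector_segment_file_paths.is_empty()
}

fn main() {
    assert!(is_empty_collection(None, &[]));
    assert!(!is_empty_collection(Some(384), &[]));
}
```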
