Skip to content

Commit

Permalink
[CLN] Implement Orchestrator trait and cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
Sicheng Pan committed Dec 20, 2024
1 parent 9ed5b07 commit 12e9d6d
Show file tree
Hide file tree
Showing 34 changed files with 608 additions and 990 deletions.
5 changes: 4 additions & 1 deletion rust/worker/benches/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use load::{
};
use worker::{
config::RootConfig,
execution::{dispatcher::Dispatcher, orchestration::get::GetOrchestrator},
execution::{
dispatcher::Dispatcher,
orchestration::{get::GetOrchestrator, orchestrator::Orchestrator},
},
segment::test::TestSegment,
system::{ComponentHandle, System},
};
Expand Down
1 change: 1 addition & 0 deletions rust/worker/benches/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use worker::{
orchestration::{
knn::KnnOrchestrator,
knn_filter::{KnnFilterOrchestrator, KnnFilterOutput},
orchestrator::Orchestrator,
},
},
segment::test::TestSegment,
Expand Down
8 changes: 4 additions & 4 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::compactor::types::CompactionJob;
use crate::compactor::types::ScheduleMessage;
use crate::config::CompactionServiceConfig;
use crate::execution::dispatcher::Dispatcher;
use crate::execution::orchestration::orchestrator::Orchestrator;
use crate::execution::orchestration::CompactOrchestrator;
use crate::execution::orchestration::CompactionResponse;
use crate::log::log::Log;
Expand Down Expand Up @@ -115,7 +116,6 @@ impl CompactionManager {
Some(ref system) => {
let orchestrator = CompactOrchestrator::new(
compaction_job.clone(),
system.clone(),
compaction_job.collection_id,
self.log.clone(),
self.sysdb.clone(),
Expand All @@ -129,14 +129,14 @@ impl CompactionManager {
self.max_partition_size,
);

match orchestrator.run().await {
match orchestrator.run(system.clone()).await {
Ok(result) => {
tracing::info!("Compaction Job completed: {:?}", result);
return Ok(result);
}
Err(e) => {
tracing::error!("Compaction Job failed: {:?}", e);
return Err(e);
return Err(Box::new(e));
}
}
}
Expand Down Expand Up @@ -280,7 +280,7 @@ impl Component for CompactionManager {
self.compaction_manager_queue_size
}

async fn on_start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
async fn start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
println!("Starting CompactionManager");
ctx.scheduler
.schedule(ScheduleMessage {}, self.compaction_interval, ctx, || {
Expand Down
6 changes: 3 additions & 3 deletions rust/worker/src/execution/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl Component for Dispatcher {
self.queue_size
}

async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
async fn start(&mut self, ctx: &ComponentContext<Self>) {
self.spawn_workers(&mut ctx.system.clone(), ctx.receiver());
}
}
Expand Down Expand Up @@ -314,7 +314,7 @@ mod tests {
1000
}

async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
async fn start(&mut self, ctx: &ComponentContext<Self>) {
// dispatch a new task every DISPATCH_FREQUENCY_MS for DISPATCH_COUNT times
let duration = std::time::Duration::from_millis(DISPATCH_FREQUENCY_MS);
ctx.scheduler
Expand Down Expand Up @@ -377,7 +377,7 @@ mod tests {
1000
}

async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
async fn start(&mut self, ctx: &ComponentContext<Self>) {
// dispatch a new task every DISPATCH_FREQUENCY_MS for DISPATCH_COUNT times
let duration = std::time::Duration::from_millis(DISPATCH_FREQUENCY_MS);
ctx.scheduler
Expand Down
15 changes: 3 additions & 12 deletions rust/worker/src/execution/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ where
}
}

impl<Err> TaskError<Err>
where
Err: Debug + ChromaError + 'static,
{
pub(super) fn boxed(self) -> Box<dyn ChromaError> {
Box::new(self)
}
}

/// A task result is a wrapper around the result of a task.
/// It contains the task id for tracking purposes.
#[derive(Debug)]
Expand Down Expand Up @@ -94,12 +85,12 @@ where
}

/// A message type used by the dispatcher to send tasks to worker threads.
pub(crate) type TaskMessage = Box<dyn TaskWrapper>;
pub type TaskMessage = Box<dyn TaskWrapper>;

/// A task wrapper is a trait that can be used to run a task. We use it to
/// erase the I, O types from the Task struct so that tasks.
#[async_trait]
pub(crate) trait TaskWrapper: Send + Debug {
pub trait TaskWrapper: Send + Debug {
fn get_name(&self) -> &'static str;
async fn run(&self);
#[allow(dead_code)]
Expand Down Expand Up @@ -264,7 +255,7 @@ mod tests {
1000
}

async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
async fn start(&mut self, ctx: &ComponentContext<Self>) {
let task = wrap(Box::new(MockOperator {}), (), ctx.receiver());
self.dispatcher.send(task, None).await.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::{
execution::operator::Operator,
segment::record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError},
};
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::{Chunk, LogRecord, Operation, Segment};
use std::collections::HashSet;
use thiserror::Error;
use tonic::async_trait;

#[derive(Debug)]
pub(crate) struct CountRecordsOperator {}
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/fetch_log.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::time::{SystemTime, SystemTimeError, UNIX_EPOCH};

use async_trait::async_trait;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::{Chunk, CollectionUuid, LogRecord};
use thiserror::Error;
use tonic::async_trait;
use tracing::trace;

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
ops::{BitAnd, BitOr, Bound},
};

use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::metadata::types::MetadataIndexError;
Expand All @@ -13,7 +14,6 @@ use chroma_types::{
};
use roaring::RoaringBitmap;
use thiserror::Error;
use tonic::async_trait;
use tracing::{trace, Instrument, Span};

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/knn_hnsw.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use chroma_distance::{normalize, DistanceFunction};
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::SignedRoaringBitmap;
use thiserror::Error;
use tonic::async_trait;

use crate::{
execution::operator::Operator, segment::distributed_hnsw_segment::DistributedHNSWSegmentReader,
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/knn_log.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::BinaryHeap;

use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_distance::{normalize, DistanceFunction};
use chroma_error::ChromaError;
use chroma_types::{MaterializedLogOperation, Segment, SignedRoaringBitmap};
use thiserror::Error;
use tonic::async_trait;

use crate::{
execution::operator::Operator,
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/knn_merge.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tonic::async_trait;
use async_trait::async_trait;

use crate::execution::operator::Operator;

Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/knn_projection.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_error::ChromaError;
use chroma_types::Segment;
use thiserror::Error;
use tonic::async_trait;
use tracing::trace;

use crate::execution::{operator::Operator, operators::projection::ProjectionInput};
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/limit.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{cmp::Ordering, num::TryFromIntError, sync::atomic};

use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_types::{Chunk, LogRecord, MaterializedLogOperation, Segment, SignedRoaringBitmap};
use roaring::RoaringBitmap;
use thiserror::Error;
use tonic::async_trait;
use tracing::{trace, Instrument, Span};

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/prefetch_record.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashSet;

use async_trait::async_trait;
use chroma_error::{ChromaError, ErrorCodes};
use thiserror::Error;
use tonic::async_trait;
use tracing::{trace, Instrument, Span};

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/spann_bf_pl.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::BinaryHeap;

use async_trait::async_trait;
use chroma_distance::DistanceFunction;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::spann::types::SpannPosting;
use chroma_types::SignedRoaringBitmap;
use thiserror::Error;
use tonic::async_trait;

use crate::execution::operator::Operator;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use chroma_distance::DistanceFunction;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::spann::utils::rng_query;
use thiserror::Error;
use tonic::async_trait;

use crate::{
execution::operator::Operator,
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/spann_fetch_pl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use chroma_error::{ChromaError, ErrorCodes};
use chroma_index::spann::types::SpannPosting;
use thiserror::Error;
use tonic::async_trait;

use crate::{
execution::operator::{Operator, OperatorType},
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/spann_knn_merge.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{cmp::Ordering, collections::BinaryHeap};

use tonic::async_trait;
use async_trait::async_trait;

use crate::execution::operator::Operator;

Expand Down
32 changes: 0 additions & 32 deletions rust/worker/src/execution/orchestration/common.rs

This file was deleted.

Loading

0 comments on commit 12e9d6d

Please sign in to comment.