diff --git a/rust/worker/src/execution/operators/mod.rs b/rust/worker/src/execution/operators/mod.rs index ab277505e5f2..e76d2e23b6f1 100644 --- a/rust/worker/src/execution/operators/mod.rs +++ b/rust/worker/src/execution/operators/mod.rs @@ -1,7 +1,6 @@ pub(super) mod count_records; pub(super) mod flush_s3; pub(super) mod partition; -pub(super) mod pull_log; pub(super) mod register; pub(super) mod spann_bf_pl; pub(super) mod spann_centers_search; diff --git a/rust/worker/src/execution/operators/pull_log.rs b/rust/worker/src/execution/operators/pull_log.rs deleted file mode 100644 index 6ae3c35abbbc..000000000000 --- a/rust/worker/src/execution/operators/pull_log.rs +++ /dev/null @@ -1,258 +0,0 @@ -use crate::execution::operator::{Operator, OperatorType}; -use crate::log::log::{Log, PullLogsError}; -use async_trait::async_trait; -use chroma_types::{Chunk, CollectionUuid, LogRecord}; - -/// The pull logs operator is responsible for reading logs from the log service. -#[derive(Debug)] -pub struct PullLogsOperator { - client: Box, -} - -impl PullLogsOperator { - /// Create a new pull logs operator. - /// # Parameters - /// * `client` - The log client to use for reading logs. - pub fn new(client: Box) -> Box { - Box::new(PullLogsOperator { client }) - } -} - -/// The input to the pull logs operator. -/// # Parameters -/// * `collection_id` - The collection id to read logs from. -/// * `offset` - The offset to start reading logs from. -/// * `batch_size` - The number of log entries to read. -/// * `num_records` - The maximum number of records to read. -/// * `end_timestamp` - The end timestamp to read logs until. -#[derive(Debug)] -pub struct PullLogsInput { - collection_id: CollectionUuid, - offset: i64, - batch_size: i32, - num_records: Option, - end_timestamp: Option, -} - -impl PullLogsInput { - /// Create a new pull logs input. - /// # Parameters - /// * `collection_id` - The collection id to read logs from. - /// * `offset` - The offset to start reading logs from. - /// * `batch_size` - The number of log entries to read. - /// * `num_records` - The maximum number of records to read. - /// * `end_timestamp` - The end timestamp to read logs until. - pub fn new( - collection_id: CollectionUuid, - offset: i64, - batch_size: i32, - num_records: Option, - end_timestamp: Option, - ) -> Self { - PullLogsInput { - collection_id, - offset, - batch_size, - num_records, - end_timestamp, - } - } -} - -/// The output of the pull logs operator. -#[derive(Debug)] -pub struct PullLogsOutput { - logs: Chunk, -} - -impl PullLogsOutput { - /// Create a new pull logs output. - /// # Parameters - /// * `logs` - The logs that were read. - pub fn new(logs: Chunk) -> Self { - PullLogsOutput { logs } - } - - /// Get the log entries that were read by an invocation of the pull logs operator. - /// # Returns - /// The log entries that were read. - pub fn logs(&self) -> Chunk { - self.logs.clone() - } -} - -#[async_trait] -impl Operator for PullLogsOperator { - type Error = PullLogsError; - - fn get_name(&self) -> &'static str { - "PullLogsOperator" - } - - fn get_type(&self) -> OperatorType { - OperatorType::IO - } - - async fn run(&self, input: &PullLogsInput) -> Result { - // We expect the log to be cheaply cloneable, we need to clone it since we need - // a mutable reference to it. Not necessarily the best, but it works for our needs. - let mut client_clone = self.client.clone(); - let batch_size = input.batch_size; - let mut num_records_read = 0; - let mut offset = input.offset; - let mut result = Vec::new(); - loop { - let logs = client_clone - .read(input.collection_id, offset, batch_size, input.end_timestamp) - .await; - - let mut logs = match logs { - Ok(logs) => logs, - Err(e) => { - return Err(e); - } - }; - - if logs.is_empty() { - break; - } - - num_records_read += logs.len(); - // unwrap here is safe because we just checked if empty - offset = logs.last().unwrap().log_offset + 1; - result.append(&mut logs); - - // We used a a timestamp and we didn't get a full batch, so we have retrieved - // the last batch of logs relevant to our query - if input.end_timestamp.is_some() && num_records_read < batch_size as usize { - break; - } - - // We have read all the records up to the size we wanted - if input.num_records.is_some() - && num_records_read >= input.num_records.unwrap() as usize - { - break; - } - } - if input.num_records.is_some() && result.len() > input.num_records.unwrap() as usize { - result.truncate(input.num_records.unwrap() as usize); - } - tracing::info!(name: "Pulled log records", num_records = result.len()); - // Convert to DataChunk - let data_chunk = Chunk::new(result.into()); - Ok(PullLogsOutput::new(data_chunk)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::log::log::InMemoryLog; - use crate::log::log::InternalLogRecord; - use chroma_types::{CollectionUuid, LogRecord, Operation, OperationRecord}; - use std::str::FromStr; - - #[tokio::test] - async fn test_pull_logs() { - let mut log = Box::new(Log::InMemory(InMemoryLog::new())); - let collection_uuid_1 = - CollectionUuid::from_str("00000000-0000-0000-0000-000000000001").unwrap(); - - match *log { - Log::InMemory(ref mut log) => { - log.add_log( - collection_uuid_1, - InternalLogRecord { - collection_id: collection_uuid_1, - log_offset: 0, - log_ts: 1, - record: LogRecord { - log_offset: 0, - record: OperationRecord { - id: "embedding_id_1".to_string(), - embedding: None, - encoding: None, - metadata: None, - document: None, - operation: Operation::Add, - }, - }, - }, - ); - log.add_log( - collection_uuid_1, - InternalLogRecord { - collection_id: collection_uuid_1, - log_offset: 1, - log_ts: 2, - record: LogRecord { - log_offset: 1, - record: OperationRecord { - id: "embedding_id_2".to_string(), - embedding: None, - encoding: None, - metadata: None, - document: None, - operation: Operation::Add, - }, - }, - }, - ); - } - _ => panic!("Expected InMemoryLog"), - } - - let operator = PullLogsOperator::new(log); - - // Pull all logs from collection 1 - let input = PullLogsInput::new(collection_uuid_1, 0, 1, None, None); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 2); - - // Pull all logs from collection 1 with a large batch size - let input = PullLogsInput::new(collection_uuid_1, 0, 100, None, None); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 2); - - // Pull logs from collection 1 with a limit - let input = PullLogsInput::new(collection_uuid_1, 0, 1, Some(1), None); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 1); - - // Pull logs from collection 1 with an end timestamp - let input = PullLogsInput::new(collection_uuid_1, 0, 1, None, Some(1)); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 1); - - // Pull logs from collection 1 with an end timestamp - let input = PullLogsInput::new(collection_uuid_1, 0, 1, None, Some(2)); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 2); - - // Pull logs from collection 1 with an end timestamp and a limit - let input = PullLogsInput::new(collection_uuid_1, 0, 1, Some(1), Some(2)); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 1); - - // Pull logs from collection 1 with a limit and a large batch size - let input = PullLogsInput::new(collection_uuid_1, 0, 100, Some(1), None); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 1); - - // Pull logs from collection 1 with an end timestamp and a large batch size - let input = PullLogsInput::new(collection_uuid_1, 0, 100, None, Some(1)); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 1); - - // Pull logs from collection 1 with an end timestamp and a large batch size - let input = PullLogsInput::new(collection_uuid_1, 0, 100, None, Some(2)); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 2); - - // Pull logs from collection 1 with an end timestamp and a limit and a large batch size - let input = PullLogsInput::new(collection_uuid_1, 0, 100, Some(1), Some(2)); - let output = operator.run(&input).await.unwrap(); - assert_eq!(output.logs().len(), 1); - } -} diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs index 43bcca8869e3..9dc4f5c4e88d 100644 --- a/rust/worker/src/execution/orchestration/compact.rs +++ b/rust/worker/src/execution/orchestration/compact.rs @@ -2,6 +2,9 @@ use super::super::operator::wrap; use crate::compactor::CompactionJob; use crate::execution::dispatcher::Dispatcher; use crate::execution::operator::TaskResult; +use crate::execution::operators::fetch_log::FetchLogError; +use crate::execution::operators::fetch_log::FetchLogOperator; +use crate::execution::operators::fetch_log::FetchLogOutput; use crate::execution::operators::flush_s3::FlushS3Input; use crate::execution::operators::flush_s3::FlushS3Operator; use crate::execution::operators::flush_s3::FlushS3Output; @@ -9,9 +12,6 @@ use crate::execution::operators::partition::PartitionError; use crate::execution::operators::partition::PartitionInput; use crate::execution::operators::partition::PartitionOperator; use crate::execution::operators::partition::PartitionOutput; -use crate::execution::operators::pull_log::PullLogsInput; -use crate::execution::operators::pull_log::PullLogsOperator; -use crate::execution::operators::pull_log::PullLogsOutput; use crate::execution::operators::register::RegisterError; use crate::execution::operators::register::RegisterInput; use crate::execution::operators::register::RegisterOperator; @@ -22,7 +22,6 @@ use crate::execution::operators::write_segments::WriteSegmentsOperatorError; use crate::execution::operators::write_segments::WriteSegmentsOutput; use crate::execution::orchestration::common::terminate_with_error; use crate::log::log::Log; -use crate::log::log::PullLogsError; use crate::segment::distributed_hnsw_segment::DistributedHNSWSegmentWriter; use crate::segment::metadata_segment::MetadataSegmentWriter; use crate::segment::record_segment::RecordSegmentReader; @@ -46,8 +45,6 @@ use core::panic; use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::Arc; -use std::time::SystemTime; -use std::time::UNIX_EPOCH; use thiserror::Error; use tracing::Span; use uuid::Uuid; @@ -144,8 +141,8 @@ impl ChromaError for GetSegmentWritersError { #[derive(Error, Debug)] enum CompactionError { - #[error(transparent)] - SystemTimeError(#[from] std::time::SystemTimeError), + #[error("Task dispatch failed")] + DispatchFailure, #[error("Result channel dropped")] ResultChannelDropped, } @@ -207,44 +204,28 @@ impl CompactOrchestrator { } } - async fn pull_logs( + async fn fetch_log( &mut self, - self_address: Box>>, + self_address: Box>>, ctx: &crate::system::ComponentContext, ) { self.state = ExecutionState::PullLogs; - let operator = PullLogsOperator::new(self.log.clone()); - let collection_id = self.collection_id; - let end_timestamp = SystemTime::now().duration_since(UNIX_EPOCH); - let end_timestamp = match end_timestamp { - // TODO: change protobuf definition to use u64 instead of i64 - Ok(end_timestamp) => end_timestamp.as_nanos() as i64, - Err(e) => { - terminate_with_error( - self.result_channel.take(), - Box::new(CompactionError::SystemTimeError(e)), - ctx, - ); - return; - } + let operator = FetchLogOperator { + log_client: self.log.clone(), + batch_size: 100, + start_log_offset_id: self.compaction_job.offset as u32, + maximum_fetch_count: Some(self.max_compaction_size as u32), + collection_uuid: self.collection_id, }; - let input = PullLogsInput::new( - collection_id, - // Here we do not need to be inclusive since the compaction job - // offset is the one after the last compaction offset - self.compaction_job.offset, - 100, - Some(self.max_compaction_size as i32), - Some(end_timestamp), - ); - let task = wrap(operator, input, self_address); + let task = wrap(Box::new(operator), (), self_address); match self.dispatcher.send(task, Some(Span::current())).await { Ok(_) => (), Err(e) => { tracing::error!("Error dispatching pull logs for compaction {:?}", e); - panic!( - "Invariant violation. Somehow the dispatcher receiver is dropped. Error: {:?}", - e + terminate_with_error( + self.result_channel.take(), + Box::new(CompactionError::DispatchFailure), + ctx, ); } } @@ -542,23 +523,23 @@ impl Component for CompactOrchestrator { } async fn on_start(&mut self, ctx: &crate::system::ComponentContext) -> () { - self.pull_logs(ctx.receiver(), ctx).await; + self.fetch_log(ctx.receiver(), ctx).await; } } // ============== Handlers ============== #[async_trait] -impl Handler> for CompactOrchestrator { +impl Handler> for CompactOrchestrator { type Result = (); async fn handle( &mut self, - message: TaskResult, + message: TaskResult, ctx: &crate::system::ComponentContext, ) { let message = message.into_inner(); let records = match message { - Ok(result) => result.logs(), + Ok(result) => result, Err(e) => { terminate_with_error(self.result_channel.take(), Box::new(e), ctx); return; diff --git a/rust/worker/src/execution/orchestration/count.rs b/rust/worker/src/execution/orchestration/count.rs index 7141309571a9..215f17ab8251 100644 --- a/rust/worker/src/execution/orchestration/count.rs +++ b/rust/worker/src/execution/orchestration/count.rs @@ -3,9 +3,8 @@ use crate::execution::operator::{wrap, TaskResult}; use crate::execution::operators::count_records::{ CountRecordsError, CountRecordsInput, CountRecordsOperator, CountRecordsOutput, }; -use crate::execution::operators::pull_log::{PullLogsInput, PullLogsOperator, PullLogsOutput}; +use crate::execution::operators::fetch_log::{FetchLogError, FetchLogOperator, FetchLogOutput}; use crate::execution::orchestration::common::terminate_with_error; -use crate::log::log::PullLogsError; use crate::sysdb::sysdb::{GetCollectionsError, GetSegmentsError}; use crate::system::{Component, ComponentContext, ComponentHandle, Handler}; use crate::{log::log::Log, sysdb::sysdb::SysDb, system::System}; @@ -13,7 +12,6 @@ use async_trait::async_trait; use chroma_blockstore::provider::BlockfileProvider; use chroma_error::{ChromaError, ErrorCodes}; use chroma_types::{Collection, CollectionUuid, Segment, SegmentType, SegmentUuid}; -use std::time::{SystemTime, UNIX_EPOCH}; use thiserror::Error; use tracing::Span; use uuid::Uuid; @@ -56,6 +54,8 @@ enum CountQueryOrchestratorError { GetCollectionError(#[from] GetCollectionsError), #[error("Collection version mismatch")] CollectionVersionMismatch, + #[error("Task dispatch failed")] + DispatchFailure, } impl ChromaError for CountQueryOrchestratorError { @@ -70,6 +70,7 @@ impl ChromaError for CountQueryOrchestratorError { CountQueryOrchestratorError::CollectionNotFound(_) => ErrorCodes::NotFound, CountQueryOrchestratorError::GetCollectionError(e) => e.code(), CountQueryOrchestratorError::CollectionVersionMismatch => ErrorCodes::VersionMismatch, + CountQueryOrchestratorError::DispatchFailure => ErrorCodes::Internal, } } } @@ -162,50 +163,38 @@ impl CountQueryOrchestrator { self.record_segment = Some(record_segment); self.collection = Some(collection); - self.pull_logs(ctx).await; + self.fetch_log(ctx).await; } // shared - async fn pull_logs(&mut self, ctx: &ComponentContext) { + async fn fetch_log(&mut self, ctx: &ComponentContext) { println!("Count query orchestrator pulling logs"); - let operator = PullLogsOperator::new(self.log.clone()); - let end_timestamp = SystemTime::now().duration_since(UNIX_EPOCH); - let end_timestamp = match end_timestamp { - Ok(end_timestamp) => end_timestamp.as_nanos() as i64, - Err(e) => { - tracing::error!("Error getting system time: {:?}", e); - terminate_with_error( - self.result_channel.take(), - Box::new(CountQueryOrchestratorError::SystemTimeError(e)), - ctx, - ); - return; - } - }; - let collection = self .collection .as_ref() .expect("Invariant violation. Collection is not set before pull logs state."); - let input = PullLogsInput::new( - collection.collection_id, - // The collection log position is inclusive, and we want to start from the next log. - // Note that we query using the incoming log position this is critical for correctness - // TODO: We should make all the log service code use u64 instead of i64 - (self.log_position as i64) + 1, - 100, - None, - Some(end_timestamp), - ); - let task = wrap(operator, input, ctx.receiver()); + let operator = FetchLogOperator { + log_client: self.log.clone(), + batch_size: 100, + start_log_offset_id: self.log_position as u32 + 1, + maximum_fetch_count: None, + collection_uuid: collection.collection_id, + }; + + let task = wrap(Box::new(operator), (), ctx.receiver()); match self.dispatcher.send(task, Some(Span::current())).await { Ok(_) => (), Err(e) => { // Log an error - this implies the dispatcher was dropped somehow // and is likely fatal println!("Error sending Count Query task: {:?}", e); + terminate_with_error( + self.result_channel.take(), + Box::new(CountQueryOrchestratorError::DispatchFailure), + ctx, + ); } } } @@ -335,12 +324,12 @@ impl Component for CountQueryOrchestrator { } #[async_trait] -impl Handler> for CountQueryOrchestrator { +impl Handler> for CountQueryOrchestrator { type Result = (); async fn handle( &mut self, - message: TaskResult, + message: TaskResult, ctx: &ComponentContext, ) { let message = message.into_inner(); @@ -353,7 +342,7 @@ impl Handler> for CountQueryOrchestrat .expect("Expect segment") .clone(), self.blockfile_provider.clone(), - logs.logs(), + logs, ); let msg = wrap(operator, input, ctx.receiver()); match self.dispatcher.send(msg, None).await {