From 4a510032f3dee9e0f37f7eb0afcec40e5d47b1dc Mon Sep 17 00:00:00 2001 From: hammadb Date: Thu, 14 Dec 2023 16:10:05 -0800 Subject: [PATCH] Ingest TODOs --- rust/worker/src/ingest/ingest.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/rust/worker/src/ingest/ingest.rs b/rust/worker/src/ingest/ingest.rs index cd013f11bb5..c22e6cc27fe 100644 --- a/rust/worker/src/ingest/ingest.rs +++ b/rust/worker/src/ingest/ingest.rs @@ -47,14 +47,14 @@ impl Component for Ingest { #[derive(Error, Debug)] pub(crate) enum IngestConfigurationError { - #[error("Cannot assign empty key")] + #[error(transparent)] PulsarError(#[from] pulsar::Error), } impl ChromaError for IngestConfigurationError { fn code(&self) -> ErrorCodes { match self { - IngestConfigurationError::PulsarError(e) => ErrorCodes::Internal, + IngestConfigurationError::PulsarError(_e) => ErrorCodes::Internal, } } } @@ -109,7 +109,6 @@ impl Ingest { #[async_trait] impl Handler for Ingest { async fn handle(&self, msg: Memberlist, ctx: &ComponentContext) { - println!("Memberlist message: {:?}", msg); let mut new_assignments = HashSet::new(); let candidate_topics: Vec = self.get_topics(); @@ -131,12 +130,10 @@ impl Handler for Ingest { Ok(assignment) => assignment, Err(err) => { // TODO: Log error - println!("Failed to assign topic: {:?}", err); continue; } }; if assignment == self.my_ip { - println!("I am assigned to topic: {}", topic); new_assignments.insert(topic); } } @@ -164,14 +161,13 @@ impl Handler for Ingest { } } Err(err) => { - println!("Failed to write assigned topics: {:?}", err); + // TODO: Log error and handle lock poisoning } } } // Unsubscribe from topics we no longer need to listen to for topic in to_remove.iter() { - println!("Removing topic: {}", topic); match self.topic_to_handle.write() { Ok(mut topic_to_handle) => { let handle = topic_to_handle.remove(topic); @@ -186,7 +182,7 @@ impl Handler for Ingest { } } Err(err) => { - println!("Failed to write topic to handle: {:?}", err); + // TODO: Log an error and handle lock poisoning } } } @@ -214,6 +210,7 @@ impl Handler for Ingest { topic_to_handle.insert("test".to_string(), handle); } Err(err) => { + // TODO: log error and handle lock poisoning println!("Failed to write topic to handle: {:?}", err); } } @@ -286,7 +283,8 @@ impl Handler> for PulsarIngestTopic } Err(err) => { // TODO: Log an error - // Put this on a dead letter queue + // Put this on a dead letter queue, this concept does not exist in our + // system yet None } } @@ -303,5 +301,6 @@ impl StreamHandler> for PulsarIngest _ctx: &ComponentContext, PulsarIngestTopic>, ) -> () { println!("Received stream message: {:?}", message); + // This will be where we filter the message and add it to the corresponding tenant queue } }