Skip to content

Commit

Permalink
Ingest TODOs
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Dec 16, 2023
1 parent 5cdba43 commit 473686b
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions rust/worker/src/ingest/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ impl Component for Ingest {

#[derive(Error, Debug)]
pub(crate) enum IngestConfigurationError {
#[error("Cannot assign empty key")]
#[error(transparent)]
PulsarError(#[from] pulsar::Error),
}

impl ChromaError for IngestConfigurationError {
fn code(&self) -> ErrorCodes {
match self {
IngestConfigurationError::PulsarError(e) => ErrorCodes::Internal,
IngestConfigurationError::PulsarError(_e) => ErrorCodes::Internal,
}
}
}
Expand Down Expand Up @@ -109,7 +109,6 @@ impl Ingest {
#[async_trait]
impl Handler<Memberlist> for Ingest {
async fn handle(&self, msg: Memberlist, ctx: &ComponentContext<Memberlist, Self>) {
println!("Memberlist message: {:?}", msg);
let mut new_assignments = HashSet::new();
let candidate_topics: Vec<String> = self.get_topics();

Expand All @@ -131,12 +130,10 @@ impl Handler<Memberlist> for Ingest {
Ok(assignment) => assignment,
Err(err) => {
// TODO: Log error
println!("Failed to assign topic: {:?}", err);
continue;
}
};
if assignment == self.my_ip {
println!("I am assigned to topic: {}", topic);
new_assignments.insert(topic);
}
}
Expand Down Expand Up @@ -164,14 +161,13 @@ impl Handler<Memberlist> for Ingest {
}
}
Err(err) => {
println!("Failed to write assigned topics: {:?}", err);
// TODO: Log error and handle lock poisoning
}
}
}

// Unsubscribe from topics we no longer need to listen to
for topic in to_remove.iter() {
println!("Removing topic: {}", topic);
match self.topic_to_handle.write() {
Ok(mut topic_to_handle) => {
let handle = topic_to_handle.remove(topic);
Expand All @@ -186,7 +182,7 @@ impl Handler<Memberlist> for Ingest {
}
}
Err(err) => {
println!("Failed to write topic to handle: {:?}", err);
// TODO: Log an error and handle lock poisoning
}
}
}
Expand Down Expand Up @@ -214,6 +210,7 @@ impl Handler<Memberlist> for Ingest {
topic_to_handle.insert("test".to_string(), handle);
}
Err(err) => {
// TODO: log error and handle lock poisoning
println!("Failed to write topic to handle: {:?}", err);
}
}
Expand Down Expand Up @@ -286,7 +283,8 @@ impl Handler<Option<chroma_proto::SubmitEmbeddingRecord>> for PulsarIngestTopic
}
Err(err) => {
// TODO: Log an error
// Put this on a dead letter queue
// Put this on a dead letter queue, this concept does not exist in our
// system yet
None
}
}
Expand All @@ -303,5 +301,6 @@ impl StreamHandler<Option<chroma_proto::SubmitEmbeddingRecord>> for PulsarIngest
_ctx: &ComponentContext<Option<chroma_proto::SubmitEmbeddingRecord>, PulsarIngestTopic>,
) -> () {
println!("Received stream message: {:?}", message);
// This will be where we filter the message and add it to the corresponding tenant queue
}
}

0 comments on commit 473686b

Please sign in to comment.