From 764a1953460c97e2501e09fb2ad9e71671d597ae Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 26 Sep 2023 10:33:24 +1000 Subject: [PATCH] chore: updates from review comments --- .../examples/messenger_using_kafka.rs | 2 +- .../src/kafka_producer.rs | 13 +++++++++--- .../src/kafka/producer.rs | 16 ++++++++++---- .../src/talos_messenger_service.rs | 21 ++----------------- 4 files changed, 25 insertions(+), 27 deletions(-) diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index 26ecb817..d6e1ccff 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -50,7 +50,7 @@ async fn main() { let suffix: Suffix = Suffix::with_config(suffix_config); let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new(); - + // TODO: GK - Set through env whitelisted_actions.insert("publish", vec!["kafka"]); let inbound_service = MessengerInboundService { diff --git a/examples/messenger_using_kafka/src/kafka_producer.rs b/examples/messenger_using_kafka/src/kafka_producer.rs index a33e3880..75f53899 100644 --- a/examples/messenger_using_kafka/src/kafka_producer.rs +++ b/examples/messenger_using_kafka/src/kafka_producer.rs @@ -33,8 +33,8 @@ where let mut bytes: Vec = Vec::new(); serde_json::to_writer(&mut bytes, &payload.value).unwrap(); - let payld = std::str::from_utf8(&bytes).unwrap(); - info!("[MessengerKafkaPublisher] base_record=\n{payld:#?}"); + let payload_str = std::str::from_utf8(&bytes).unwrap(); + info!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}"); let delivery_opaque = MessengerProducerDeliveryOpaque { version, @@ -42,7 +42,14 @@ where }; self.publisher - .publish_to_topic("test.messenger.topic", "test", payld, None, Box::new(delivery_opaque)) + .publish_to_topic( + &payload.topic, + payload.partition, + payload.key.as_deref(), + payload_str, + None, + Box::new(delivery_opaque), + ) .unwrap(); } } diff --git a/packages/talos_messenger_actions/src/kafka/producer.rs b/packages/talos_messenger_actions/src/kafka/producer.rs index c84757dd..7e1b8e83 100644 --- a/packages/talos_messenger_actions/src/kafka/producer.rs +++ b/packages/talos_messenger_actions/src/kafka/producer.rs @@ -83,13 +83,21 @@ impl KafkaProducer { pub fn publish_to_topic( &self, topic: &str, - key: &str, + partition: Option, + key: Option<&str>, value: &str, headers: Option>, delivery_opaque: C::DeliveryOpaque, ) -> Result<(), MessagePublishError> { - let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value).key(key); + let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value); + // Add partition if applicable + let record = if let Some(part) = partition { record.partition(part) } else { record }; + + // Add key if applicable + let record = if let Some(key_str) = key { record.key(key_str) } else { record }; + + // Add headers if applicable let record = match headers { Some(x) => record.headers(build_kafka_headers(x)), None => record, @@ -116,7 +124,7 @@ impl MessagePublisher for KafkaProducer { None => record, }; - debug!("Preparing to send the Decision Message. "); + debug!("Preparing to publish the message. "); let delivery_result = self .producer .send(record) @@ -127,7 +135,7 @@ impl MessagePublisher for KafkaProducer { data: Some(format!("{:?}", record)), })?; - debug!("Sent the Decision Message successfully {:?} ", delivery_result.to_owned()); + debug!("Published the message successfully {:?} ", delivery_result.to_owned()); Ok(()) } } diff --git a/packages/talos_messenger_core/src/talos_messenger_service.rs b/packages/talos_messenger_core/src/talos_messenger_service.rs index aab7e293..5a61a2b4 100644 --- a/packages/talos_messenger_core/src/talos_messenger_service.rs +++ b/packages/talos_messenger_core/src/talos_messenger_service.rs @@ -11,9 +11,9 @@ pub struct TalosMessengerService { impl TalosMessengerService { pub async fn run(self) -> MessengerServiceResult { - let service_handle = self.services.into_iter().map(|mut service| tokio::spawn(async move { service.run().await })); + let service_handles = self.services.into_iter().map(|mut service| tokio::spawn(async move { service.run().await })); - let k = try_join_all(service_handle).await.map_err(|e| MessengerServiceError { + let k = try_join_all(service_handles).await.map_err(|e| MessengerServiceError { kind: MessengerServiceErrorKind::System, reason: e.to_string(), data: None, @@ -26,21 +26,4 @@ impl TalosMessengerService { Ok(()) } - - // pub async fn shutdown(self) -> MessengerServiceResult { - // let service_handle = self.services.into_iter().map(|service| tokio::spawn(async move { service.stop().await })); - - // let k = try_join_all(service_handle).await.map_err(|e| MessengerServiceError { - // kind: MessengerServiceErrorKind::System, - // reason: e.to_string(), - // data: None, - // service: "Main thread".to_string(), - // })?; - - // for res in k { - // res? - // } - - // Ok(()) - // } }