From 453eb2b53f91308629fb94e32a5ad0fb2bf5d3a7 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Mon, 16 Oct 2023 22:54:57 +1100 Subject: [PATCH] feat: pass headers from decision message to the messenger publish actions --- .../src/kafka_producer.rs | 10 +++----- .../src/kafka/producer.rs | 7 ++---- .../src/kafka/service.rs | 6 +++-- packages/talos_messenger_core/src/core.rs | 3 ++- .../src/services/inbound_service.rs | 5 +++- packages/talos_messenger_core/src/suffix.rs | 25 +++++++++++++++++-- 6 files changed, 38 insertions(+), 18 deletions(-) diff --git a/examples/messenger_using_kafka/src/kafka_producer.rs b/examples/messenger_using_kafka/src/kafka_producer.rs index f7d902ee..e7f83202 100644 --- a/examples/messenger_using_kafka/src/kafka_producer.rs +++ b/examples/messenger_using_kafka/src/kafka_producer.rs @@ -1,13 +1,9 @@ +use ahash::HashMap; use async_trait::async_trait; use log::info; use rdkafka::producer::ProducerContext; use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer}; use talos_messenger_core::core::{MessengerPublisher, PublishActionType}; -// use talos_messenger::{ -// core::{MessengerPublisher, PublishActionType}, -// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque}, -// models::commit_actions::publish::KafkaAction, -// }; pub struct MessengerKafkaPublisher { pub publisher: KafkaProducer, @@ -24,7 +20,7 @@ where PublishActionType::Kafka } - async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () { + async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap, additional_data: Self::AdditionalData) -> () { info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}"); let mut bytes: Vec = Vec::new(); @@ -44,7 +40,7 @@ where payload.partition, payload.key.as_deref(), payload_str, - None, + headers, Box::new(delivery_opaque), ) .unwrap(); diff --git a/packages/talos_messenger_actions/src/kafka/producer.rs b/packages/talos_messenger_actions/src/kafka/producer.rs index 47dc1a81..19d7ccaf 100644 --- a/packages/talos_messenger_actions/src/kafka/producer.rs +++ b/packages/talos_messenger_actions/src/kafka/producer.rs @@ -42,7 +42,7 @@ impl KafkaProducer { partition: Option, key: Option<&str>, value: &str, - headers: Option>, + headers: HashMap, delivery_opaque: C::DeliveryOpaque, ) -> Result<(), MessagePublishError> { let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value); @@ -54,10 +54,7 @@ impl KafkaProducer { let record = if let Some(key_str) = key { record.key(key_str) } else { record }; // Add headers if applicable - let record = match headers { - Some(x) => record.headers(build_kafka_headers(x)), - None => record, - }; + let record = record.headers(build_kafka_headers(headers)); self.producer.send(record).map_err(|(kafka_error, record)| MessagePublishError { reason: kafka_error.to_string(), diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 5ec0c328..9f5eea16 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -32,7 +32,7 @@ where loop { tokio::select! { Some(actions) = self.rx_actions_channel.recv() => { - let MessengerCommitActions {version, commit_actions } = actions; + let MessengerCommitActions {version, commit_actions, headers } = actions; if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){ match get_actions_deserialised::>(publish_actions_for_type) { @@ -40,11 +40,13 @@ where let total_len = actions.len() as u32; + let headers_cloned = headers.clone(); for action in actions { let publisher = self.publisher.clone(); + let headers = headers_cloned.clone(); // Publish the message tokio::spawn(async move { - publisher.send(version, action, total_len ).await; + publisher.send(version, action, headers, total_len ).await; }); } diff --git a/packages/talos_messenger_core/src/core.rs b/packages/talos_messenger_core/src/core.rs index 024fcb52..4623d2d8 100644 --- a/packages/talos_messenger_core/src/core.rs +++ b/packages/talos_messenger_core/src/core.rs @@ -23,7 +23,7 @@ pub trait MessengerPublisher { type Payload; type AdditionalData; fn get_publish_type(&self) -> PublishActionType; - async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> (); + async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap, additional_data: Self::AdditionalData) -> (); } /// Trait to be implemented by all services. @@ -38,6 +38,7 @@ pub trait MessengerSystemService { pub struct MessengerCommitActions { pub version: u64, pub commit_actions: HashMap, + pub headers: HashMap, } pub enum MessengerChannelFeedback { diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 9bb7e865..fb4b20c2 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -42,6 +42,7 @@ where acc.insert(key.to_string(), value.get_payload().clone()); acc }), + headers: item.headers, }; // send for publishing self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError { @@ -182,7 +183,9 @@ where let version = decision.message.get_candidate_version(); info!("[Decision Message] Version received = {} and {}", decision.decision_version, version); - self.suffix.update_item_decision(version, decision.decision_version, &decision.message); + // TODO: GK - no hardcoded filters on headers + let headers: HashMap = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect(); + self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers); self.process_next_actions().await?; diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index a5a232ea..15efcf63 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -11,11 +11,14 @@ pub trait MessengerSuffixItemTrait: Debug + Clone { fn set_safepoint(&mut self, safepoint: Option); fn set_commit_action(&mut self, commit_actions: HashMap); fn set_decision(&mut self, decision: Decision); + fn set_headers(&mut self, headers: HashMap); fn get_state(&self) -> &SuffixItemState; fn get_commit_actions(&self) -> &HashMap; fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem>; fn get_safepoint(&self) -> &Option; + fn get_headers(&self) -> &HashMap; + fn get_headers_mut(&mut self) -> &mut HashMap; fn is_abort(&self) -> Option; } @@ -35,7 +38,7 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// Gets the suffix items eligible to process. fn get_suffix_items_to_process(&self) -> Vec; /// Updates the decision for a version. - fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D); + fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap); /// Updates the action for a version using the action_key for lookup. fn increment_item_action_count(&mut self, version: u64, action_key: &str); @@ -107,6 +110,7 @@ impl AllowedActionsMapItem { pub struct ActionsMapWithVersion { pub actions: HashMap, pub version: u64, + pub headers: HashMap, } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] @@ -120,6 +124,8 @@ pub struct MessengerCandidate { state: SuffixItemState, /// Filtered actions that need to be processed by the messenger allowed_actions_map: HashMap, + /// Any headers from decision to be used in on-commit actions + headers: HashMap, } impl From for MessengerCandidate { @@ -131,6 +137,7 @@ impl From for MessengerCandidate { state: SuffixItemState::AwaitingDecision, allowed_actions_map: HashMap::new(), + headers: HashMap::new(), } } } @@ -171,6 +178,18 @@ impl MessengerSuffixItemTrait for MessengerCandidate { fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem> { self.allowed_actions_map.get_mut(action_key) } + + fn set_headers(&mut self, headers: HashMap) { + self.headers.extend(headers); + } + + fn get_headers(&self) -> &HashMap { + &self.headers + } + + fn get_headers_mut(&mut self) -> &mut HashMap { + &mut self.headers + } } impl MessengerSuffixTrait for Suffix @@ -217,13 +236,14 @@ where .map(|x| ActionsMapWithVersion { version: x.item_ver, actions: x.item.get_commit_actions().clone(), + headers: x.item.get_headers().clone(), }) .collect(); items } - fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D) { + fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap) { let _ = self.update_decision_suffix_item(version, decision_version); if let Some(item_to_update) = self.get_mut(version) { @@ -238,6 +258,7 @@ where item_to_update.item.set_decision(decision_message.get_decision().clone()); item_to_update.item.set_safepoint(decision_message.get_safepoint()); + item_to_update.item.set_headers(headers); } }