From edda7361a1ab4d179f2255ec4a189f2c6a9ace89 Mon Sep 17 00:00:00 2001 From: gk-kindred <118979108+gk-kindred@users.noreply.github.com> Date: Tue, 24 Oct 2023 11:13:26 +1100 Subject: [PATCH] feat: allow headers pass through (#89) * feat: certifier candidate and decision headers all certifier headers will be passed to decision decision appends additional headers * fix: use headers directly without metadata * feat: pass headers from decision message to the messenger publish actions * feat: pass certTime header to decision --- Cargo.lock | 1 + Cargo.toml | 1 - .../src/kafka_producer.rs | 10 +- packages/talos_certifier/src/core.rs | 26 ++- packages/talos_certifier/src/ports/message.rs | 4 +- .../src/services/certifier_service.rs | 23 ++- .../src/services/decision_outbox_service.rs | 28 +++- .../src/services/tests/certifier_service.rs | 153 ++++++++++++------ .../services/tests/decision_outbox_service.rs | 116 +++++++------ .../tests/message_receiver_service.rs | 89 +++++----- packages/talos_certifier_adapters/Cargo.toml | 13 +- .../histogram_decision_timeline_from_kafka.rs | 19 +-- .../src/kafka/consumer.rs | 19 ++- .../src/kafka/producer.rs | 10 +- .../src/kafka/utils.rs | 6 +- .../src/mock_certifier_service.rs | 9 +- .../src/services/replicator_service.rs | 10 +- .../src/kafka/producer.rs | 17 +- .../src/kafka/service.rs | 6 +- packages/talos_messenger_core/src/core.rs | 3 +- .../src/services/inbound_service.rs | 19 ++- packages/talos_messenger_core/src/suffix.rs | 25 ++- 22 files changed, 383 insertions(+), 224 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab3ae552..eaa10469 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2539,6 +2539,7 @@ dependencies = [ name = "talos_certifier_adapters" version = "0.0.1" dependencies = [ + "ahash 0.8.3", "async-trait", "deadpool-postgres", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index ee27e341..aeb92f68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,5 @@ [workspace] resolver = "2" - members = [ "packages/*", # Example crates diff --git a/examples/messenger_using_kafka/src/kafka_producer.rs b/examples/messenger_using_kafka/src/kafka_producer.rs index f7d902ee..e7f83202 100644 --- a/examples/messenger_using_kafka/src/kafka_producer.rs +++ b/examples/messenger_using_kafka/src/kafka_producer.rs @@ -1,13 +1,9 @@ +use ahash::HashMap; use async_trait::async_trait; use log::info; use rdkafka::producer::ProducerContext; use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer}; use talos_messenger_core::core::{MessengerPublisher, PublishActionType}; -// use talos_messenger::{ -// core::{MessengerPublisher, PublishActionType}, -// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque}, -// models::commit_actions::publish::KafkaAction, -// }; pub struct MessengerKafkaPublisher { pub publisher: KafkaProducer, @@ -24,7 +20,7 @@ where PublishActionType::Kafka } - async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () { + async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap, additional_data: Self::AdditionalData) -> () { info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}"); let mut bytes: Vec = Vec::new(); @@ -44,7 +40,7 @@ where payload.partition, payload.key.as_deref(), payload_str, - None, + headers, Box::new(delivery_opaque), ) .unwrap(); diff --git a/packages/talos_certifier/src/core.rs b/packages/talos_certifier/src/core.rs index 75ba3baa..758a2d25 100644 --- a/packages/talos_certifier/src/core.rs +++ b/packages/talos_certifier/src/core.rs @@ -1,3 +1,4 @@ +use ahash::HashMap; use async_trait::async_trait; use strum::{Display, EnumString}; use tokio::sync::broadcast; @@ -7,13 +8,23 @@ use crate::{ model::{CandidateMessage, DecisionMessage}, }; -type Version = u64; #[derive(Debug, Clone)] -// TODO: double check this setting -#[allow(clippy::large_enum_variant)] +pub struct CandidateChannelMessage { + pub message: CandidateMessage, + pub headers: HashMap, +} + +#[derive(Debug, Clone)] +pub struct DecisionChannelMessage { + pub decision_version: u64, + pub message: DecisionMessage, + pub headers: HashMap, +} + +#[derive(Debug, Clone)] pub enum ChannelMessage { - Candidate(CandidateMessage), - Decision(Version, DecisionMessage), + Candidate(Box), + Decision(Box), } #[derive(Debug, Display, Eq, PartialEq, EnumString)] @@ -34,8 +45,9 @@ pub enum SystemMessage { pub type ServiceResult = Result>; #[derive(Debug)] -pub enum DecisionOutboxChannelMessage { - Decision(DecisionMessage), +pub struct DecisionOutboxChannelMessage { + pub message: DecisionMessage, + pub headers: HashMap, } #[derive(Debug, Clone)] diff --git a/packages/talos_certifier/src/ports/message.rs b/packages/talos_certifier/src/ports/message.rs index 9264d0a4..b5ed5145 100644 --- a/packages/talos_certifier/src/ports/message.rs +++ b/packages/talos_certifier/src/ports/message.rs @@ -1,5 +1,5 @@ +use ahash::HashMap; use async_trait::async_trait; -use std::collections::HashMap; use tokio::task::JoinHandle; use crate::errors::SystemServiceError; @@ -24,5 +24,5 @@ pub trait MessageReciever: SharedPortTraits { // The trait that should be implemented by any adapter that will publish the Decision message from Certifier Domain. #[async_trait] pub trait MessagePublisher: SharedPortTraits { - async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError>; + async fn publish_message(&self, key: &str, value: &str, headers: HashMap) -> Result<(), SystemServiceError>; } diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index ac0d9c7f..bc1a041e 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use log::{debug, error, warn}; use talos_suffix::core::SuffixConfig; use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait}; +use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use tokio::sync::mpsc; @@ -89,7 +90,7 @@ impl CertifierService { Ok(dm) } - pub(crate) fn process_decision(&mut self, decision_version: &u64, decision_message: &DecisionMessage) -> Result<(), CertificationError> { + pub(crate) fn process_decision(&mut self, decision_version: u64, decision_message: &DecisionMessage) -> Result<(), CertificationError> { // update the decision in suffix debug!( "[Process Decision message] Version {} and Decision Message {:?} ", @@ -111,7 +112,7 @@ impl CertifierService { if candidate_version_index.is_some() && candidate_version_index.unwrap().le(&self.suffix.messages.len()) { self.suffix - .update_decision_suffix_item(candidate_version, *decision_version) + .update_decision_suffix_item(candidate_version, decision_version) .map_err(CertificationError::SuffixError)?; // check if all prioir items are decided. @@ -147,12 +148,22 @@ impl CertifierService { pub async fn process_message(&mut self, channel_message: &Option) -> ServiceResult { if let Err(certification_error) = match channel_message { - Some(ChannelMessage::Candidate(message)) => { - let decision_message = self.process_candidate(message)?; + Some(ChannelMessage::Candidate(candidate)) => { + let decision_message = self.process_candidate(&candidate.message)?; + + let mut headers = candidate.headers.clone(); + if let Ok(cert_time) = OffsetDateTime::now_utc().format(&Rfc3339) { + headers.insert("certTime".to_owned(), cert_time); + } + + let decision_outbox_channel_message = DecisionOutboxChannelMessage { + message: decision_message.clone(), + headers, + }; Ok(self .decision_outbox_tx - .send(DecisionOutboxChannelMessage::Decision(decision_message.clone())) + .send(decision_outbox_channel_message) .await .map_err(|e| SystemServiceError { kind: SystemServiceErrorKind::SystemError(SystemErrorType::Channel), @@ -162,7 +173,7 @@ impl CertifierService { })?) } - Some(ChannelMessage::Decision(version, decision_message)) => self.process_decision(version, decision_message), + Some(ChannelMessage::Decision(decision)) => self.process_decision(decision.decision_version, &decision.message), None => Ok(()), // _ => (), diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index 15f86b02..68c0e055 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -1,5 +1,6 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; +use ahash::HashMap; use async_trait::async_trait; use log::{debug, error}; @@ -71,7 +72,11 @@ impl DecisionOutboxService { Ok(decision) } - pub async fn publish_decision(publisher: &Arc>, decision_message: &DecisionMessage) -> ServiceResult { + pub async fn publish_decision( + publisher: &Arc>, + decision_message: &DecisionMessage, + headers: HashMap, + ) -> ServiceResult { let xid = decision_message.xid.clone(); let decision_str = serde_json::to_string(&decision_message).map_err(|e| { Box::new(SystemServiceError { @@ -82,13 +87,18 @@ impl DecisionOutboxService { }) })?; - let mut decision_publish_header = HashMap::new(); + let mut decision_publish_header = headers; decision_publish_header.insert("messageType".to_string(), MessageVariant::Decision.to_string()); - decision_publish_header.insert("certAgent".to_string(), decision_message.agent.clone()); + decision_publish_header.insert("certXid".to_string(), decision_message.xid.to_owned()); + + if let Some(safepoint) = decision_message.safepoint { + decision_publish_header.insert("certSafepoint".to_string(), safepoint.to_string()); + } + decision_publish_header.insert("certAgent".to_string(), decision_message.agent.to_owned()); debug!("Publishing message {}", decision_message.version); publisher - .publish_message(xid.as_str(), &decision_str, Some(decision_publish_header.clone())) + .publish_message(xid.as_str(), &decision_str, decision_publish_header.clone()) .await .map_err(|publish_error| { Box::new(SystemServiceError { @@ -108,11 +118,15 @@ impl SystemService for DecisionOutboxService { let publisher = Arc::clone(&self.decision_publisher); let system = self.system.clone(); - if let Some(DecisionOutboxChannelMessage::Decision(decision_message)) = self.decision_outbox_channel_rx.recv().await { + if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await { + let DecisionOutboxChannelMessage { + headers, + message: decision_message, + } = decision_channel_message; tokio::spawn(async move { match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await { Ok(decision) => { - if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision).await { + if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, headers).await { error!( "Error publishing message for version={} with reason={:?}", decision.version, diff --git a/packages/talos_certifier/src/services/tests/certifier_service.rs b/packages/talos_certifier/src/services/tests/certifier_service.rs index 3975ba5e..38974106 100644 --- a/packages/talos_certifier/src/services/tests/certifier_service.rs +++ b/packages/talos_certifier/src/services/tests/certifier_service.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicI64, Arc}; use crate::{ + core::{CandidateChannelMessage, DecisionChannelMessage}, errors::{SystemErrorType, SystemServiceErrorKind}, model::{ CandidateMessage, {Decision, DecisionMessage}, @@ -8,6 +9,7 @@ use crate::{ services::CertifierServiceConfig, ChannelMessage, SystemMessage, }; +use ahash::{HashMap, HashMapExt}; use talos_suffix::core::SuffixConfig; use tokio::sync::{broadcast, mpsc}; @@ -18,7 +20,16 @@ use crate::{ async fn send_candidate_message(message_channel_tx: mpsc::Sender, candidate_message: CandidateMessage) { tokio::spawn(async move { - message_channel_tx.send(ChannelMessage::Candidate(candidate_message)).await.unwrap(); + message_channel_tx + .send(ChannelMessage::Candidate( + CandidateChannelMessage { + message: candidate_message, + headers: HashMap::new(), + } + .into(), + )) + .await + .unwrap(); }); } @@ -99,14 +110,14 @@ async fn test_certification_rule_2() { assert!(result.is_ok()); // Rule 1 - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { - assert!(decision.conflict_version.is_none()); - assert_eq!(decision.decision, Decision::Committed); + if let Some(decision) = do_channel_rx.recv().await { + assert!(decision.message.conflict_version.is_none()); + assert_eq!(decision.message.decision, Decision::Committed); }; // Rule 2 - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { - assert!(decision.conflict_version.is_none()); - assert_eq!(decision.decision, Decision::Aborted); + if let Some(decision) = do_channel_rx.recv().await { + assert!(decision.message.conflict_version.is_none()); + assert_eq!(decision.message.decision, Decision::Aborted); }; } @@ -204,9 +215,19 @@ async fn test_certification_process_decision() { assert!(result.is_ok()); - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn(async move { - message_channel_tx.send(ChannelMessage::Decision(decision.version, decision)).await.unwrap(); + message_channel_tx + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: decision.message.version, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await + .unwrap(); }); }; @@ -262,10 +283,20 @@ async fn test_certification_process_decision_incorrect_version() { assert!(result.is_ok()); // pass incorrect decision (version incorrect), will not do anything as item is not found on suffix. - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn(async move { message_channel_tx - .send(ChannelMessage::Decision(12, DecisionMessage { version: 10, ..decision })) + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 12, + message: DecisionMessage { + version: 10, + ..decision.message + }, + headers: HashMap::new(), + } + .into(), + )) .await .unwrap(); }); @@ -423,50 +454,71 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { let result = certifier_svc.run().await; assert!(result.is_ok()); - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn({ let message_channel_tx_clone = message_channel_tx.clone(); async move { - message_channel_tx_clone - .send(ChannelMessage::Decision(6, DecisionMessage { ..decision })) - .await - .unwrap(); + let _ = message_channel_tx_clone + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 6, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await; } }); }; - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn({ let message_channel_tx_clone = message_channel_tx.clone(); - async move { - message_channel_tx_clone - .send(ChannelMessage::Decision(7, DecisionMessage { ..decision })) - .await - .unwrap(); + let _ = message_channel_tx_clone + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 7, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await; } }); }; - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn({ let message_channel_tx_clone = message_channel_tx.clone(); - async move { - message_channel_tx_clone - .send(ChannelMessage::Decision(8, DecisionMessage { ..decision })) - .await - .unwrap(); + let _ = message_channel_tx_clone + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 8, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await; } }); }; - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn({ let message_channel_tx_clone = message_channel_tx.clone(); - async move { - message_channel_tx_clone - .send(ChannelMessage::Decision(10, DecisionMessage { ..decision })) - .await - .unwrap(); + let _ = message_channel_tx_clone + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 10, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await; } }); }; @@ -563,26 +615,37 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() { let result = certifier_svc.run().await; assert!(result.is_ok()); - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn({ let message_channel_tx_clone = message_channel_tx.clone(); async move { - message_channel_tx_clone - .send(ChannelMessage::Decision(6, DecisionMessage { ..decision })) - .await - .unwrap(); + let _ = message_channel_tx_clone + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 6, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await; } }); }; - if let Some(DecisionOutboxChannelMessage::Decision(decision)) = do_channel_rx.recv().await { + if let Some(decision) = do_channel_rx.recv().await { tokio::spawn({ let message_channel_tx_clone = message_channel_tx.clone(); - async move { - message_channel_tx_clone - .send(ChannelMessage::Decision(7, DecisionMessage { ..decision })) - .await - .unwrap(); + let _ = message_channel_tx_clone + .send(ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: 7, + message: decision.message, + headers: HashMap::new(), + } + .into(), + )) + .await; } }); }; diff --git a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs index 167dced8..11f97068 100644 --- a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; @@ -14,6 +13,7 @@ use crate::{ }, SystemMessage, }; +use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use tokio::{ sync::{broadcast, mpsc}, @@ -62,7 +62,7 @@ struct MockDecisionPublisher; #[async_trait] impl MessagePublisher for MockDecisionPublisher { - async fn publish_message(&self, _key: &str, _value: &str, _headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, _key: &str, _value: &str, _headers: HashMap) -> Result<(), SystemServiceError> { Ok(()) } } @@ -111,18 +111,21 @@ async fn test_candidate_message_create_decision_message() { // sending a decision into decision outbox service tokio::spawn(async move { do_channel_tx_clone - .send(crate::core::DecisionOutboxChannelMessage::Decision(DecisionMessage { - xid: "test-xid-1".to_owned(), - agent: "test-agent-1".to_owned(), - cohort: "test-cohort-1".to_owned(), - decision: Decision::Committed, - suffix_start: 2, - version: 4, - duplicate_version: None, - safepoint: Some(3), - conflict_version: None, - metrics: TxProcessingTimeline::default(), - })) + .send(crate::core::DecisionOutboxChannelMessage { + message: DecisionMessage { + xid: "test-xid-1".to_owned(), + agent: "test-agent-1".to_owned(), + cohort: "test-cohort-1".to_owned(), + decision: Decision::Committed, + suffix_start: 2, + version: 4, + duplicate_version: None, + safepoint: Some(3), + conflict_version: None, + metrics: TxProcessingTimeline::default(), + }, + headers: HashMap::new(), + }) .await .unwrap(); }); @@ -170,36 +173,42 @@ async fn test_save_and_publish_multiple_decisions() { // sending a decision into decision outbox service tokio::spawn(async move { do_channel_tx_clone - .send(crate::core::DecisionOutboxChannelMessage::Decision(DecisionMessage { - xid: "test-xid-1".to_owned(), - agent: "test-agent-1".to_owned(), - cohort: "test-cohort-1".to_owned(), - decision: Decision::Committed, - suffix_start: 2, - version: 4, - duplicate_version: None, - safepoint: Some(3), - conflict_version: None, - metrics: TxProcessingTimeline::default(), - })) + .send(crate::core::DecisionOutboxChannelMessage { + message: DecisionMessage { + xid: "test-xid-1".to_owned(), + agent: "test-agent-1".to_owned(), + cohort: "test-cohort-1".to_owned(), + decision: Decision::Committed, + suffix_start: 2, + version: 4, + duplicate_version: None, + safepoint: Some(3), + conflict_version: None, + metrics: TxProcessingTimeline::default(), + }, + headers: HashMap::new(), + }) .await .unwrap(); }); tokio::spawn(async move { do_channel_tx_clone_2 - .send(crate::core::DecisionOutboxChannelMessage::Decision(DecisionMessage { - xid: "test-xid-2".to_owned(), - agent: "test-agent-1".to_owned(), - cohort: "test-cohort-1".to_owned(), - decision: Decision::Committed, - suffix_start: 2, - version: 4, - duplicate_version: None, - safepoint: Some(3), - conflict_version: None, - metrics: TxProcessingTimeline::default(), - })) + .send(crate::core::DecisionOutboxChannelMessage { + message: DecisionMessage { + xid: "test-xid-2".to_owned(), + agent: "test-agent-1".to_owned(), + cohort: "test-cohort-1".to_owned(), + decision: Decision::Committed, + suffix_start: 2, + version: 4, + duplicate_version: None, + safepoint: Some(3), + conflict_version: None, + metrics: TxProcessingTimeline::default(), + }, + headers: HashMap::new(), + }) .await .unwrap(); }); @@ -284,18 +293,21 @@ async fn test_capture_child_thread_dberror() { // sending a decision into decision outbox service tokio::spawn(async move { do_channel_tx_clone - .send(crate::core::DecisionOutboxChannelMessage::Decision(DecisionMessage { - xid: "test-xid-1".to_owned(), - agent: "test-agent-1".to_owned(), - cohort: "test-cohort-1".to_owned(), - decision: Decision::Committed, - suffix_start: 2, - version: 4, - duplicate_version: None, - safepoint: Some(3), - conflict_version: None, - metrics: TxProcessingTimeline::default(), - })) + .send(crate::core::DecisionOutboxChannelMessage { + message: DecisionMessage { + xid: "test-xid-1".to_owned(), + agent: "test-agent-1".to_owned(), + cohort: "test-cohort-1".to_owned(), + decision: Decision::Committed, + suffix_start: 2, + version: 4, + duplicate_version: None, + safepoint: Some(3), + conflict_version: None, + metrics: TxProcessingTimeline::default(), + }, + headers: HashMap::new(), + }) .await .unwrap(); }); @@ -313,7 +325,7 @@ struct MockDecisionPublisherWithError; #[async_trait] impl MessagePublisher for MockDecisionPublisherWithError { - async fn publish_message(&self, _key: &str, _value: &str, _headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, _key: &str, _value: &str, _headers: HashMap) -> Result<(), SystemServiceError> { Err(SystemServiceError { kind: SystemServiceErrorKind::MessagePublishError, reason: "Failed to Publish".to_string(), @@ -349,7 +361,7 @@ async fn test_capture_publish_error() { metrics: TxProcessingTimeline::default(), }; - if let Err(publish_error) = DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message).await { + if let Err(publish_error) = DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new()).await { assert!(publish_error.kind == SystemServiceErrorKind::MessagePublishError); } } diff --git a/packages/talos_certifier/src/services/tests/message_receiver_service.rs b/packages/talos_certifier/src/services/tests/message_receiver_service.rs index 702c6a8f..c8d3a278 100644 --- a/packages/talos_certifier/src/services/tests/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/tests/message_receiver_service.rs @@ -1,5 +1,6 @@ use std::sync::{atomic::AtomicI64, Arc}; +use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use tokio::{ sync::{broadcast, mpsc}, @@ -7,7 +8,7 @@ use tokio::{ }; use crate::{ - core::{System, SystemService}, + core::{CandidateChannelMessage, System, SystemService}, errors::SystemServiceError, model::CandidateMessage, ports::{common::SharedPortTraits, errors::MessageReceiverError, MessageReciever}, @@ -28,8 +29,8 @@ impl MessageReciever for MockReciever { let msg = self.consumer.recv().await.unwrap(); let vers = match &msg { - ChannelMessage::Candidate(msg) => Some(msg.version), - ChannelMessage::Decision(vers, _) => Some(vers).copied(), + ChannelMessage::Candidate(msg) => Some(msg.message.version), + ChannelMessage::Decision(decision) => Some(decision.decision_version), }; self.offset = vers; @@ -88,22 +89,30 @@ async fn test_consume_message() { let mut msg_receiver = MessageReceiverService::new(Box::new(mock_receiver), msg_channel_tx, commit_offset, system); + let candidate_message = CandidateMessage { + xid: "xid-1".to_string(), + version: 8, + agent: "agent-1".to_string(), + cohort: "cohort-1".to_string(), + snapshot: 5, + readvers: vec![], + readset: vec![], + writeset: vec!["ksp:w1".to_owned()], + metadata: None, + on_commit: None, + statemap: None, + published_at: 0, + received_at: 0, + }; + mock_channel_tx - .send(ChannelMessage::Candidate(CandidateMessage { - xid: "xid-1".to_string(), - version: 8, - agent: "agent-1".to_string(), - cohort: "cohort-1".to_string(), - snapshot: 5, - readvers: vec![], - readset: vec![], - writeset: vec!["ksp:w1".to_owned()], - metadata: None, - on_commit: None, - statemap: None, - published_at: 0, - received_at: 0, - })) + .send(ChannelMessage::Candidate( + CandidateChannelMessage { + message: candidate_message, + headers: HashMap::new(), + } + .into(), + )) .await .unwrap(); @@ -111,9 +120,9 @@ async fn test_consume_message() { assert!(result.is_ok()); - if let Some(ChannelMessage::Candidate(msg)) = msg_channel_rx.recv().await { - assert_eq!(msg.version, 8); - assert_eq!(msg.xid, "xid-1".to_string()); + if let Some(ChannelMessage::Candidate(candidate)) = msg_channel_rx.recv().await { + assert_eq!(candidate.message.version, 8); + assert_eq!(candidate.message.xid, "xid-1".to_string()); } } @@ -136,23 +145,29 @@ async fn test_consume_message_error() { let commit_offset: Arc = Arc::new(0.into()); let mut msg_receiver = MessageReceiverService::new(Box::new(mock_receiver), msg_channel_tx, commit_offset, system); - + let candidate_message = CandidateMessage { + xid: "xid-1".to_string(), + version: 8, + agent: "agent-1".to_string(), + cohort: "cohort-1".to_string(), + snapshot: 5, + readvers: vec![], + readset: vec![], + writeset: vec!["ksp:w1".to_owned()], + metadata: None, + on_commit: None, + statemap: None, + published_at: 0, + received_at: 0, + }; mock_channel_tx - .send(ChannelMessage::Candidate(CandidateMessage { - xid: "xid-1".to_string(), - version: 8, - agent: "agent-1".to_string(), - cohort: "cohort-1".to_string(), - snapshot: 5, - readvers: vec![], - readset: vec![], - writeset: vec!["ksp:w1".to_owned()], - metadata: None, - on_commit: None, - statemap: None, - published_at: 0, - received_at: 0, - })) + .send(ChannelMessage::Candidate( + CandidateChannelMessage { + message: candidate_message, + headers: HashMap::new(), + } + .into(), + )) .await .unwrap(); diff --git a/packages/talos_certifier_adapters/Cargo.toml b/packages/talos_certifier_adapters/Cargo.toml index 269b2a4b..0a6a6723 100644 --- a/packages/talos_certifier_adapters/Cargo.toml +++ b/packages/talos_certifier_adapters/Cargo.toml @@ -26,6 +26,9 @@ rdkafka = { version = "0.34.0", features = ["sasl"] } #openssl-sys = { version = "0.9", features = ["vendored"] } #openssl = { version = "0.10", features = ["vendored", "v111"] } +# Ahash hashmap +ahash = "0.8.3" + # uuid uuid = { version = "1.4.1", features = ["v4"] } # postgres @@ -45,11 +48,11 @@ thiserror = "1.0.31" mockall = "0.11.0" # internal crates -logger = { path = "../logger" } -metrics = { path = "../metrics" } -talos_certifier = { path = "../talos_certifier" } -talos_suffix = { path = "../talos_suffix" } -talos_common_utils = { path = "../talos_common_utils" } +logger = { path = "../logger" } +metrics = { path = "../metrics" } +talos_certifier = { path = "../talos_certifier" } +talos_suffix = { path = "../talos_suffix" } +talos_common_utils = { path = "../talos_common_utils" } talos_rdkafka_utils = { path = "../talos_rdkafka_utils" } diff --git a/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs b/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs index d6ba80ac..dbf56d60 100644 --- a/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs +++ b/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs @@ -74,18 +74,19 @@ async fn main() -> Result<(), String> { _ => break, }; - if let ChannelMessage::Decision(_, msg) = message { + if let ChannelMessage::Decision(decision) = message { + let decision_message = decision.message; // log::warn!("Decision {:?}", msg_decision); item_number += 1; if item_number % progress_frequency == 0 { log::warn!("Progress: {} of {}", item_number, total_decisions); } - hist_candidate_published.include(&msg.metrics, msg.metrics.candidate_published); - hist_candidate_received.include(&msg.metrics, msg.metrics.candidate_received); - hist_candidate_processing_started.include(&msg.metrics, msg.metrics.candidate_processing_started); - hist_decision_created_at.include(&msg.metrics, msg.metrics.decision_created_at); - hist_db_save_started.include(&msg.metrics, msg.metrics.db_save_started); - hist_db_save_ended.include(&msg.metrics, msg.metrics.db_save_ended); + hist_candidate_published.include(&decision_message.metrics, decision_message.metrics.candidate_published); + hist_candidate_received.include(&decision_message.metrics, decision_message.metrics.candidate_received); + hist_candidate_processing_started.include(&decision_message.metrics, decision_message.metrics.candidate_processing_started); + hist_decision_created_at.include(&decision_message.metrics, decision_message.metrics.decision_created_at); + hist_db_save_started.include(&decision_message.metrics, decision_message.metrics.db_save_started); + hist_db_save_ended.include(&decision_message.metrics, decision_message.metrics.db_save_ended); if item_number == total_decisions { break; @@ -132,9 +133,9 @@ async fn aggregate_timelines(consumer: &mut KafkaConsumer) -> Result<(TimelineAg _ => break, }; - if let ChannelMessage::Decision(_, msg_decision) = message { + if let ChannelMessage::Decision(decision) = message { total += 1; - aggregates.merge(msg_decision.metrics); + aggregates.merge(decision.message.metrics); } } diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index 5a7a7ef1..a51aa1c1 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -7,7 +7,7 @@ use rdkafka::{ Message, TopicPartitionList, }; use talos_certifier::{ - core::MessageVariant, + core::{CandidateChannelMessage, DecisionChannelMessage, MessageVariant}, errors::SystemServiceError, model::{CandidateMessage, DecisionMessage}, ports::{ @@ -126,7 +126,13 @@ impl MessageReciever for KafkaConsumer { })?; msg.version = offset; msg.received_at = OffsetDateTime::now_utc().unix_timestamp_nanos(); - ChannelMessage::Candidate(msg) + ChannelMessage::Candidate( + CandidateChannelMessage { + message: msg, + headers: headers.clone(), + } + .into(), + ) } MessageVariant::Decision => { let mut msg: DecisionMessage = utils::parse_kafka_payload(raw_payload).map_err(|e| MessageReceiverError { @@ -149,7 +155,14 @@ impl MessageReciever for KafkaConsumer { })?; } - ChannelMessage::Decision(offset, msg) + ChannelMessage::Decision( + DecisionChannelMessage { + decision_version: offset, + message: msg, + headers: headers.clone(), + } + .into(), + ) } }; diff --git a/packages/talos_certifier_adapters/src/kafka/producer.rs b/packages/talos_certifier_adapters/src/kafka/producer.rs index 7037105b..dc86d049 100644 --- a/packages/talos_certifier_adapters/src/kafka/producer.rs +++ b/packages/talos_certifier_adapters/src/kafka/producer.rs @@ -1,5 +1,4 @@ -use std::collections::HashMap; - +use ahash::HashMap; use async_trait::async_trait; use log::debug; use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer}; @@ -30,13 +29,10 @@ impl KafkaProducer { // Message publisher traits #[async_trait] impl MessagePublisher for KafkaProducer { - async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, key: &str, value: &str, headers: HashMap) -> Result<(), SystemServiceError> { let record = BaseRecord::to(&self.topic).payload(value).key(key); - let record = match headers { - Some(x) => record.headers(build_kafka_headers(x)), - None => record, - }; + let record = record.headers(build_kafka_headers(headers)); debug!("Preparing to send the Decision Message. "); let delivery_result = self diff --git a/packages/talos_certifier_adapters/src/kafka/utils.rs b/packages/talos_certifier_adapters/src/kafka/utils.rs index 9b911b63..849f80ca 100644 --- a/packages/talos_certifier_adapters/src/kafka/utils.rs +++ b/packages/talos_certifier_adapters/src/kafka/utils.rs @@ -1,5 +1,6 @@ -use std::{collections::HashMap, str::FromStr}; +use std::str::FromStr; +use ahash::{HashMap, HashMapExt}; use rdkafka::{ message::{BorrowedMessage, Header, Headers, OwnedHeaders}, Message, @@ -62,8 +63,7 @@ pub fn parse_message_variant(message_type: &String) -> Result { match channel_msg { - Some(ChannelMessage::Candidate( message)) => { + Some(ChannelMessage::Candidate(candidate)) => { + let message = candidate.message; let decision_message = DecisionMessage { version: message.version, decision: Decision::Committed, @@ -43,8 +45,9 @@ impl SystemService for MockCertifierService { duplicate_version: None, metrics: TxProcessingTimeline::default(), }; + let decision_outbox_channel_message = DecisionOutboxChannelMessage{ message: decision_message.clone(), headers:HashMap::new() }; self.decision_outbox_tx - .send(DecisionOutboxChannelMessage::Decision(decision_message.clone())) + .send(decision_outbox_channel_message) .await .map_err(|e| SystemServiceError { kind: SystemServiceErrorKind::CertifierError, @@ -55,7 +58,7 @@ impl SystemService for MockCertifierService { }, - Some(ChannelMessage::Decision(_version, _decision_message)) => { + Some(ChannelMessage::Decision(_)) => { // ignore decision }, diff --git a/packages/talos_cohort_replicator/src/services/replicator_service.rs b/packages/talos_cohort_replicator/src/services/replicator_service.rs index 5ae6a6fd..03fd91e1 100644 --- a/packages/talos_cohort_replicator/src/services/replicator_service.rs +++ b/packages/talos_cohort_replicator/src/services/replicator_service.rs @@ -45,13 +45,13 @@ where // 2. Add/update to suffix. match msg { // 2.1 For CM - Install messages on the version - ChannelMessage::Candidate(message) => { - let version = message.version; - replicator.process_consumer_message(version, message.into()).await; + ChannelMessage::Candidate(candidate) => { + let version = candidate.message.version; + replicator.process_consumer_message(version, candidate.message.into()).await; }, // 2.2 For DM - Update the decision with outcome + safepoint. - ChannelMessage::Decision(decision_version, decision_message) => { - replicator.process_decision_message(decision_version, decision_message).await; + ChannelMessage::Decision(decision) => { + replicator.process_decision_message(decision.decision_version, decision.message).await; if total_items_send == 0 { time_first_item_created_start_ns = OffsetDateTime::now_utc().unix_timestamp_nanos(); diff --git a/packages/talos_messenger_actions/src/kafka/producer.rs b/packages/talos_messenger_actions/src/kafka/producer.rs index 20378460..19d7ccaf 100644 --- a/packages/talos_messenger_actions/src/kafka/producer.rs +++ b/packages/talos_messenger_actions/src/kafka/producer.rs @@ -1,5 +1,4 @@ -use std::collections::HashMap; - +use ahash::HashMap; use async_trait::async_trait; use log::debug; use rdkafka::{ @@ -43,7 +42,7 @@ impl KafkaProducer { partition: Option, key: Option<&str>, value: &str, - headers: Option>, + headers: HashMap, delivery_opaque: C::DeliveryOpaque, ) -> Result<(), MessagePublishError> { let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value); @@ -55,10 +54,7 @@ impl KafkaProducer { let record = if let Some(key_str) = key { record.key(key_str) } else { record }; // Add headers if applicable - let record = match headers { - Some(x) => record.headers(build_kafka_headers(x)), - None => record, - }; + let record = record.headers(build_kafka_headers(headers)); self.producer.send(record).map_err(|(kafka_error, record)| MessagePublishError { reason: kafka_error.to_string(), @@ -73,13 +69,10 @@ impl KafkaProducer { // Message publisher traits #[async_trait] impl MessagePublisher for KafkaProducer { - async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, key: &str, value: &str, headers: HashMap) -> Result<(), SystemServiceError> { let record = BaseRecord::to(&self.topic).payload(value).key(key); - let record = match headers { - Some(x) => record.headers(build_kafka_headers(x)), - None => record, - }; + let record = record.headers(build_kafka_headers(headers)); debug!("Preparing to publish the message. "); let delivery_result = self diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 5ec0c328..9f5eea16 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -32,7 +32,7 @@ where loop { tokio::select! { Some(actions) = self.rx_actions_channel.recv() => { - let MessengerCommitActions {version, commit_actions } = actions; + let MessengerCommitActions {version, commit_actions, headers } = actions; if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){ match get_actions_deserialised::>(publish_actions_for_type) { @@ -40,11 +40,13 @@ where let total_len = actions.len() as u32; + let headers_cloned = headers.clone(); for action in actions { let publisher = self.publisher.clone(); + let headers = headers_cloned.clone(); // Publish the message tokio::spawn(async move { - publisher.send(version, action, total_len ).await; + publisher.send(version, action, headers, total_len ).await; }); } diff --git a/packages/talos_messenger_core/src/core.rs b/packages/talos_messenger_core/src/core.rs index 024fcb52..4623d2d8 100644 --- a/packages/talos_messenger_core/src/core.rs +++ b/packages/talos_messenger_core/src/core.rs @@ -23,7 +23,7 @@ pub trait MessengerPublisher { type Payload; type AdditionalData; fn get_publish_type(&self) -> PublishActionType; - async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> (); + async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap, additional_data: Self::AdditionalData) -> (); } /// Trait to be implemented by all services. @@ -38,6 +38,7 @@ pub trait MessengerSystemService { pub struct MessengerCommitActions { pub version: u64, pub commit_actions: HashMap, + pub headers: HashMap, } pub enum MessengerChannelFeedback { diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index d5f19c09..fb4b20c2 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -42,6 +42,7 @@ where acc.insert(key.to_string(), value.get_payload().clone()); acc }), + headers: item.headers, }; // send for publishing self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError { @@ -149,12 +150,12 @@ where // 2. Add/update to suffix. match msg { // 2.1 For CM - Install messages on the version - ChannelMessage::Candidate(message) => { + ChannelMessage::Candidate(candidate) => { - let version = message.version; - if message.version > 0 { + let version = candidate.message.version; + if version > 0 { // insert item to suffix - let _ = self.suffix.insert(version, message.into()); + let _ = self.suffix.insert(version, candidate.message.into()); if let Some(item_to_update) = self.suffix.get_mut(version){ if let Some(commit_actions) = &item_to_update.item.candidate.on_commit { @@ -178,11 +179,13 @@ where }, // 2.2 For DM - Update the decision with outcome + safepoint. - ChannelMessage::Decision(decision_version, decision_message) => { - let version = decision_message.get_candidate_version(); - info!("[Decision Message] Version received = {} and {}", decision_version, version); + ChannelMessage::Decision(decision) => { + let version = decision.message.get_candidate_version(); + info!("[Decision Message] Version received = {} and {}", decision.decision_version, version); - self.suffix.update_item_decision(version, decision_version, &decision_message); + // TODO: GK - no hardcoded filters on headers + let headers: HashMap = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect(); + self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers); self.process_next_actions().await?; diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index a5a232ea..15efcf63 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -11,11 +11,14 @@ pub trait MessengerSuffixItemTrait: Debug + Clone { fn set_safepoint(&mut self, safepoint: Option); fn set_commit_action(&mut self, commit_actions: HashMap); fn set_decision(&mut self, decision: Decision); + fn set_headers(&mut self, headers: HashMap); fn get_state(&self) -> &SuffixItemState; fn get_commit_actions(&self) -> &HashMap; fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem>; fn get_safepoint(&self) -> &Option; + fn get_headers(&self) -> &HashMap; + fn get_headers_mut(&mut self) -> &mut HashMap; fn is_abort(&self) -> Option; } @@ -35,7 +38,7 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// Gets the suffix items eligible to process. fn get_suffix_items_to_process(&self) -> Vec; /// Updates the decision for a version. - fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D); + fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap); /// Updates the action for a version using the action_key for lookup. fn increment_item_action_count(&mut self, version: u64, action_key: &str); @@ -107,6 +110,7 @@ impl AllowedActionsMapItem { pub struct ActionsMapWithVersion { pub actions: HashMap, pub version: u64, + pub headers: HashMap, } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] @@ -120,6 +124,8 @@ pub struct MessengerCandidate { state: SuffixItemState, /// Filtered actions that need to be processed by the messenger allowed_actions_map: HashMap, + /// Any headers from decision to be used in on-commit actions + headers: HashMap, } impl From for MessengerCandidate { @@ -131,6 +137,7 @@ impl From for MessengerCandidate { state: SuffixItemState::AwaitingDecision, allowed_actions_map: HashMap::new(), + headers: HashMap::new(), } } } @@ -171,6 +178,18 @@ impl MessengerSuffixItemTrait for MessengerCandidate { fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem> { self.allowed_actions_map.get_mut(action_key) } + + fn set_headers(&mut self, headers: HashMap) { + self.headers.extend(headers); + } + + fn get_headers(&self) -> &HashMap { + &self.headers + } + + fn get_headers_mut(&mut self) -> &mut HashMap { + &mut self.headers + } } impl MessengerSuffixTrait for Suffix @@ -217,13 +236,14 @@ where .map(|x| ActionsMapWithVersion { version: x.item_ver, actions: x.item.get_commit_actions().clone(), + headers: x.item.get_headers().clone(), }) .collect(); items } - fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D) { + fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap) { let _ = self.update_decision_suffix_item(version, decision_version); if let Some(item_to_update) = self.get_mut(version) { @@ -238,6 +258,7 @@ where item_to_update.item.set_decision(decision_message.get_decision().clone()); item_to_update.item.set_safepoint(decision_message.get_safepoint()); + item_to_update.item.set_headers(headers); } }