Skip to content

Commit

Permalink
feat: allow headers pass through (#89)
Browse files Browse the repository at this point in the history
* feat: certifier candidate and decision headers

all certifier headers will be passed to decision

decision appends additional headers

* fix: use headers directly without metadata

* feat: pass headers from decision message to the messenger publish actions

* feat: pass certTime header to decision
  • Loading branch information
gk-kindred authored Oct 24, 2023
1 parent ab815a2 commit edda736
Show file tree
Hide file tree
Showing 22 changed files with 383 additions and 224 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
resolver = "2"

members = [
"packages/*",
# Example crates
Expand Down
10 changes: 3 additions & 7 deletions examples/messenger_using_kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
use ahash::HashMap;
use async_trait::async_trait;
use log::info;
use rdkafka::producer::ProducerContext;
use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer};
use talos_messenger_core::core::{MessengerPublisher, PublishActionType};
// use talos_messenger::{
// core::{MessengerPublisher, PublishActionType},
// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque},
// models::commit_actions::publish::KafkaAction,
// };

pub struct MessengerKafkaPublisher<C: ProducerContext + 'static> {
pub publisher: KafkaProducer<C>,
Expand All @@ -24,7 +20,7 @@ where
PublishActionType::Kafka
}

async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () {
async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> () {
info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

let mut bytes: Vec<u8> = Vec::new();
Expand All @@ -44,7 +40,7 @@ where
payload.partition,
payload.key.as_deref(),
payload_str,
None,
headers,
Box::new(delivery_opaque),
)
.unwrap();
Expand Down
26 changes: 19 additions & 7 deletions packages/talos_certifier/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ahash::HashMap;
use async_trait::async_trait;
use strum::{Display, EnumString};
use tokio::sync::broadcast;
Expand All @@ -7,13 +8,23 @@ use crate::{
model::{CandidateMessage, DecisionMessage},
};

type Version = u64;
#[derive(Debug, Clone)]
// TODO: double check this setting
#[allow(clippy::large_enum_variant)]
pub struct CandidateChannelMessage {
pub message: CandidateMessage,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct DecisionChannelMessage {
pub decision_version: u64,
pub message: DecisionMessage,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub enum ChannelMessage {
Candidate(CandidateMessage),
Decision(Version, DecisionMessage),
Candidate(Box<CandidateChannelMessage>),
Decision(Box<DecisionChannelMessage>),
}

#[derive(Debug, Display, Eq, PartialEq, EnumString)]
Expand All @@ -34,8 +45,9 @@ pub enum SystemMessage {
pub type ServiceResult<T = ()> = Result<T, Box<SystemServiceError>>;

#[derive(Debug)]
pub enum DecisionOutboxChannelMessage {
Decision(DecisionMessage),
pub struct DecisionOutboxChannelMessage {
pub message: DecisionMessage,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
Expand Down
4 changes: 2 additions & 2 deletions packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ahash::HashMap;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio::task::JoinHandle;

use crate::errors::SystemServiceError;
Expand All @@ -24,5 +24,5 @@ pub trait MessageReciever: SharedPortTraits {
// The trait that should be implemented by any adapter that will publish the Decision message from Certifier Domain.
#[async_trait]
pub trait MessagePublisher: SharedPortTraits {
async fn publish_message(&self, key: &str, value: &str, headers: Option<HashMap<String, String>>) -> Result<(), SystemServiceError>;
async fn publish_message(&self, key: &str, value: &str, headers: HashMap<String, String>) -> Result<(), SystemServiceError>;
}
23 changes: 17 additions & 6 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use log::{debug, error, warn};
use talos_suffix::core::SuffixConfig;
use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -89,7 +90,7 @@ impl CertifierService {
Ok(dm)
}

pub(crate) fn process_decision(&mut self, decision_version: &u64, decision_message: &DecisionMessage) -> Result<(), CertificationError> {
pub(crate) fn process_decision(&mut self, decision_version: u64, decision_message: &DecisionMessage) -> Result<(), CertificationError> {
// update the decision in suffix
debug!(
"[Process Decision message] Version {} and Decision Message {:?} ",
Expand All @@ -111,7 +112,7 @@ impl CertifierService {

if candidate_version_index.is_some() && candidate_version_index.unwrap().le(&self.suffix.messages.len()) {
self.suffix
.update_decision_suffix_item(candidate_version, *decision_version)
.update_decision_suffix_item(candidate_version, decision_version)
.map_err(CertificationError::SuffixError)?;

// check if all prioir items are decided.
Expand Down Expand Up @@ -147,12 +148,22 @@ impl CertifierService {

pub async fn process_message(&mut self, channel_message: &Option<ChannelMessage>) -> ServiceResult {
if let Err(certification_error) = match channel_message {
Some(ChannelMessage::Candidate(message)) => {
let decision_message = self.process_candidate(message)?;
Some(ChannelMessage::Candidate(candidate)) => {
let decision_message = self.process_candidate(&candidate.message)?;

let mut headers = candidate.headers.clone();
if let Ok(cert_time) = OffsetDateTime::now_utc().format(&Rfc3339) {
headers.insert("certTime".to_owned(), cert_time);
}

let decision_outbox_channel_message = DecisionOutboxChannelMessage {
message: decision_message.clone(),
headers,
};

Ok(self
.decision_outbox_tx
.send(DecisionOutboxChannelMessage::Decision(decision_message.clone()))
.send(decision_outbox_channel_message)
.await
.map_err(|e| SystemServiceError {
kind: SystemServiceErrorKind::SystemError(SystemErrorType::Channel),
Expand All @@ -162,7 +173,7 @@ impl CertifierService {
})?)
}

Some(ChannelMessage::Decision(version, decision_message)) => self.process_decision(version, decision_message),
Some(ChannelMessage::Decision(decision)) => self.process_decision(decision.decision_version, &decision.message),

None => Ok(()),
// _ => (),
Expand Down
28 changes: 21 additions & 7 deletions packages/talos_certifier/src/services/decision_outbox_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use ahash::HashMap;
use async_trait::async_trait;
use log::{debug, error};

Expand Down Expand Up @@ -71,7 +72,11 @@ impl DecisionOutboxService {
Ok(decision)
}

pub async fn publish_decision(publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>, decision_message: &DecisionMessage) -> ServiceResult {
pub async fn publish_decision(
publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>,
decision_message: &DecisionMessage,
headers: HashMap<String, String>,
) -> ServiceResult {
let xid = decision_message.xid.clone();
let decision_str = serde_json::to_string(&decision_message).map_err(|e| {
Box::new(SystemServiceError {
Expand All @@ -82,13 +87,18 @@ impl DecisionOutboxService {
})
})?;

let mut decision_publish_header = HashMap::new();
let mut decision_publish_header = headers;
decision_publish_header.insert("messageType".to_string(), MessageVariant::Decision.to_string());
decision_publish_header.insert("certAgent".to_string(), decision_message.agent.clone());
decision_publish_header.insert("certXid".to_string(), decision_message.xid.to_owned());

if let Some(safepoint) = decision_message.safepoint {
decision_publish_header.insert("certSafepoint".to_string(), safepoint.to_string());
}
decision_publish_header.insert("certAgent".to_string(), decision_message.agent.to_owned());

debug!("Publishing message {}", decision_message.version);
publisher
.publish_message(xid.as_str(), &decision_str, Some(decision_publish_header.clone()))
.publish_message(xid.as_str(), &decision_str, decision_publish_header.clone())
.await
.map_err(|publish_error| {
Box::new(SystemServiceError {
Expand All @@ -108,11 +118,15 @@ impl SystemService for DecisionOutboxService {
let publisher = Arc::clone(&self.decision_publisher);
let system = self.system.clone();

if let Some(DecisionOutboxChannelMessage::Decision(decision_message)) = self.decision_outbox_channel_rx.recv().await {
if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await {
let DecisionOutboxChannelMessage {
headers,
message: decision_message,
} = decision_channel_message;
tokio::spawn(async move {
match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
Ok(decision) => {
if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision).await {
if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, headers).await {
error!(
"Error publishing message for version={} with reason={:?}",
decision.version,
Expand Down
Loading

0 comments on commit edda736

Please sign in to comment.