Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add missing decision headers and carry forward headers to on_commit actions #107

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn main() -> Result<(), impl std::error::Error> {
pg_config: Some(pg_config),
kafka_config,
db_mock: mock_config.db_mock,
app_name: None,
};

let talos_certifier = certifier_with_kafka_pg(TalosCertifierChannelBuffers::default(), configuration).await?;
Expand Down
2 changes: 2 additions & 0 deletions packages/talos_certifier/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct DecisionOutboxChannelMessage {
pub struct System {
pub system_notifier: broadcast::Sender<SystemMessage>,
pub is_shutdown: bool,
/// Unique identifier of the system - container or pod name/id
pub name: String,
}

#[async_trait]
Expand Down
240 changes: 240 additions & 0 deletions packages/talos_certifier/src/model/decision_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use ahash::AHashMap;

use crate::core::MessageVariant;

use super::{DecisionMessage, DecisionMessageTrait};

#[derive(Debug, Default, Clone)]
pub struct DecisionMetaHeaders {
message_type: String,
message_encoding: String,
producer: String,
major_version: u64,
}

impl DecisionMetaHeaders {
pub fn new(major_version: u64, producer: String, message_encoding: Option<String>) -> Self {
Self {
message_type: MessageVariant::Decision.to_string(),
message_encoding: message_encoding.unwrap_or("application/json".to_string()),
major_version,
producer,
}
}
}

#[derive(Debug, Default, Clone)]
pub struct DecisionCertHeaders {
cert_xid: String,
cert_version: u64,
cert_safepoint: Option<u64>,
cert_time: Option<String>,
cert_agent: String,
}

impl DecisionCertHeaders {
pub fn new(decision_message: &DecisionMessage) -> Self {
Self {
cert_xid: decision_message.xid.to_string(),
cert_agent: decision_message.agent.to_string(),
cert_time: decision_message.time.clone(),
cert_safepoint: decision_message.safepoint,
cert_version: decision_message.get_candidate_version(),
}
}
}

// region: states
#[derive(Debug, Default, Clone)]
pub struct NoMetaHeaders;

#[derive(Debug, Default, Clone)]
pub struct MetaHeaders(DecisionMetaHeaders);

#[derive(Debug, Default, Clone)]
pub struct NoCertHeaders;

#[derive(Debug, Default, Clone)]
pub struct CertHeaders(DecisionCertHeaders);

// endregion: states

#[derive(Debug, Default, Clone)]
pub struct DecisionHeaderBuilder<V, C> {
pub meta_headers: V,
pub cert_headers: C,
pub additional_headers: Option<AHashMap<String, String>>,
}

impl DecisionHeaderBuilder<NoMetaHeaders, NoCertHeaders> {
pub fn new() -> Self {
Self {
..DecisionHeaderBuilder::default()
}
}
pub fn with_additional_headers(additional_headers: AHashMap<String, String>) -> Self {
Self {
additional_headers: Some(additional_headers),
..DecisionHeaderBuilder::new()
}
}
}

impl<C> DecisionHeaderBuilder<NoMetaHeaders, C> {
pub fn add_meta_headers(self, meta_headers: DecisionMetaHeaders) -> DecisionHeaderBuilder<MetaHeaders, C> {
DecisionHeaderBuilder {
meta_headers: MetaHeaders(meta_headers),
cert_headers: self.cert_headers,
additional_headers: self.additional_headers,
}
}
}

impl<V> DecisionHeaderBuilder<V, NoCertHeaders> {
pub fn add_cert_headers(self, cert_headers: DecisionCertHeaders) -> DecisionHeaderBuilder<V, CertHeaders> {
DecisionHeaderBuilder {
cert_headers: CertHeaders(cert_headers),
meta_headers: self.meta_headers,
additional_headers: self.additional_headers,
}
}
}

impl DecisionHeaderBuilder<MetaHeaders, CertHeaders> {
pub fn build(self) -> AHashMap<String, String> {
let cert_headers = self.cert_headers.0;
let meta_headers = self.meta_headers.0;

let mut headers = AHashMap::new();

// candidate headers carried over
if let Some(candidate_headers) = self.additional_headers {
headers.extend(candidate_headers);
}

// meta headers
headers.insert("majorVersion".to_owned(), meta_headers.major_version.to_string());
headers.insert("messageType".to_owned(), meta_headers.message_type);
headers.insert("messageEncoding".to_owned(), meta_headers.message_encoding);
headers.insert("producer".to_owned(), meta_headers.producer);

// certifier specific headers
headers.insert("certVersion".to_owned(), cert_headers.cert_version.to_string());
headers.insert("certXid".to_owned(), cert_headers.cert_xid);
headers.insert("certAgent".to_owned(), cert_headers.cert_agent);

if let Some(cert_time) = cert_headers.cert_time {
headers.insert("certTime".to_owned(), cert_time);
}
if let Some(cert_safepoint) = cert_headers.cert_safepoint {
headers.insert("certSafepoint".to_owned(), cert_safepoint.to_string());
}

headers
}
}

#[cfg(test)]
mod tests {

use ahash::AHashMap;

use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders};

#[test]
fn test_decision_header_with_message_encoding_field_default() {
let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), None);
let decision_cert_headers = DecisionCertHeaders {
cert_xid: "abcd".to_string(),
cert_version: 100,
cert_safepoint: Some(29),
cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()),
cert_agent: "some-agent".to_owned(),
};

let decision_headers = DecisionHeaderBuilder::new()
.add_meta_headers(decision_meta_headers.clone())
.add_cert_headers(decision_cert_headers.clone())
.build();

assert_eq!(
decision_headers.get("certXid").unwrap().to_owned(),
decision_cert_headers.cert_xid,
"certXid does not match"
);
assert_eq!(
decision_headers.get("majorVersion").unwrap().to_owned(),
decision_meta_headers.major_version.to_string(),
"majorVersion doesn't match"
);

// test encoding is the default value of "application/json"
assert_eq!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
"application/json".to_string(),
"messageEncoding does not match default application/json"
);

assert_eq!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
decision_meta_headers.message_encoding,
"messageEncoding doesn't match"
);
}
#[test]
fn test_decision_header_with_message_encoding_field_custom() {
let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), Some("another_encoding".to_owned()));
let decision_cert_headers = DecisionCertHeaders {
cert_xid: "abcd".to_string(),
cert_version: 100,
cert_safepoint: Some(29),
cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()),
cert_agent: "some-agent".to_owned(),
};

let decision_headers = DecisionHeaderBuilder::new()
.add_meta_headers(decision_meta_headers.clone())
.add_cert_headers(decision_cert_headers.clone())
.build();

// test encoding is not the default value of "application/json"
assert_ne!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
"application/json".to_string(),
"messageEncoding must not match default application/json"
);

assert_eq!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
decision_meta_headers.message_encoding,
"messageEncoding doesn't match"
);
}

#[test]
fn test_decision_header_with_additional_headers() {
let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), None);
let decision_cert_headers = DecisionCertHeaders {
cert_xid: "abcd".to_string(),
cert_version: 100,
cert_safepoint: Some(29),
cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()),
cert_agent: "some-agent".to_owned(),
};

let mut additiona_headers = AHashMap::new();
additiona_headers.insert("test-header-1".to_owned(), "test-header-1-value".to_owned());
additiona_headers.insert("correlationId".to_owned(), "eb10b6e1-a7cf-4b44-94da-6cd007030d81".to_owned());

let decision_headers = DecisionHeaderBuilder::with_additional_headers(additiona_headers.clone())
.add_meta_headers(decision_meta_headers.clone())
.add_cert_headers(decision_cert_headers.clone())
.build();

assert_eq!(
decision_headers.get("correlationId").unwrap(),
additiona_headers.get("correlationId").unwrap(),
"correlationId must match"
);
}
}
11 changes: 11 additions & 0 deletions packages/talos_certifier/src/model/decision_message.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// use super::CandidateMessage;
use serde::{Deserialize, Serialize};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

use crate::certifier::Outcome;

use super::{candidate_message::CandidateMessage, metrics::TxProcessingTimeline};

pub const DEFAULT_DECISION_MESSAGE_VERSION: u64 = 1_u64;

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Decision {
#[serde(rename = "committed")]
Expand All @@ -24,6 +28,10 @@ pub struct DecisionMessage {
pub decision: Decision,
pub suffix_start: u64,

/// timestamp when certification/decision was made.
#[serde(skip_serializing_if = "Option::is_none")]
pub time: Option<String>,

/// the version for which the decision was made.
pub version: u64,
/// If a duplicate was found on XDB, this field will hold the new duplicate candidate
Expand Down Expand Up @@ -89,10 +97,13 @@ impl DecisionMessage {
Outcome::Aborted { version, discord: _ } => (Decision::Aborted, None, version),
};

let time = OffsetDateTime::now_utc().format(&Rfc3339).ok();

Self {
xid: xid.clone(),
agent: agent.clone(),
cohort: cohort.clone(),
time,
decision,
suffix_start,
version: *version,
Expand Down
3 changes: 2 additions & 1 deletion packages/talos_certifier/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
mod candidate_message;
pub mod decision_headers;
mod decision_message;
pub mod delivery_order;
pub mod metrics;

pub use candidate_message::{CandidateMessage, CandidateReadWriteSet};
pub use decision_message::{Decision, DecisionMessage, DecisionMessageTrait};
pub use decision_message::{Decision, DecisionMessage, DecisionMessageTrait, DEFAULT_DECISION_MESSAGE_VERSION};
8 changes: 1 addition & 7 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use async_trait::async_trait;
use log::{debug, error, warn};
use talos_suffix::core::SuffixConfig;
use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -151,14 +150,9 @@ impl CertifierService {
Some(ChannelMessage::Candidate(candidate)) => {
let decision_message = self.process_candidate(&candidate.message)?;

let mut headers = candidate.headers.clone();
if let Ok(cert_time) = OffsetDateTime::now_utc().format(&Rfc3339) {
headers.insert("certTime".to_owned(), cert_time);
}

let decision_outbox_channel_message = DecisionOutboxChannelMessage {
message: decision_message.clone(),
headers,
headers: candidate.headers.clone(),
};

Ok(self
Expand Down
Loading
Loading