From 5d07b51ba1f022147a86b646b15fb879dc77e184 Mon Sep 17 00:00:00 2001
From: gk-kindred <118979108+gk-kindred@users.noreply.github.com>
Date: Tue, 12 Nov 2024 10:45:20 +1100
Subject: [PATCH] feat: messenger performance optimisations to improve latency
 (#108)

---
 .env.example                                  |   9 +-
 .../examples/kafka_create_topic.rs            |  34 +++-
 .../examples/messenger_using_kafka.rs         |   1 +
 .../src/kafka/kafka_deploy.rs                 |  47 +++--
 .../src/kafka/context.rs                      |   1 +
 .../src/kafka/service.rs                      |   7 +-
 .../src/messenger_with_kafka.rs               |  16 +-
 .../src/services/inbound_service.rs           | 167 +++++++++++++-----
 .../talos_messenger_core/src/services/mod.rs  |   2 +-
 packages/talos_messenger_core/src/suffix.rs   |  99 +++++++++--
 10 files changed, 293 insertions(+), 90 deletions(-)

diff --git a/.env.example b/.env.example
index 4ae6888e..e4ac1437 100644
--- a/.env.example
+++ b/.env.example
@@ -69,4 +69,11 @@ BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2
 # Messenger environment variables
 TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
 TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"
-# ### Talos Messenger Env variables (end) #############################
\ No newline at end of file
+# ### Talos Messenger Env variables (end) #############################
+
+#
+# ### Configs used for topic creation (start) #############################
+# KAFKA_CREATE_TOPIC_PARTITIONS=1
+# KAFKA_CREATE_TOPIC_REPLICATION_COUNT=3
+KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000, message.timestamp.type=LogAppendTime"
+# ### Configs used for topic creation (end) #############################
\ No newline at end of file
diff --git a/examples/certifier_kafka_pg/examples/kafka_create_topic.rs b/examples/certifier_kafka_pg/examples/kafka_create_topic.rs
index 24345395..bc16a56c 100644
--- a/examples/certifier_kafka_pg/examples/kafka_create_topic.rs
+++ b/examples/certifier_kafka_pg/examples/kafka_create_topic.rs
@@ -1,10 +1,38 @@
-use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, KafkaDeployError, KafkaDeployStatus};
+use std::collections::HashMap;
+
+use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, CreateTopicConfigs, KafkaDeployError, KafkaDeployStatus};
+use talos_common_utils::env_var_with_defaults;
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 
 #[tokio::main]
 async fn main() -> Result<(), KafkaDeployError> {
-    println!("deploying kafka...");
+    println!("Creating kafka topic...");
+
+    let kafka_config = KafkaConfig::from_env(None);
+
+    let replication_factor = env_var_with_defaults!("KAFKA_CREATE_TOPIC_REPLICATION_COUNT", Option::<i32>, 3);
+    let num_partitions = env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", Option::<i32>, 1);
+
+    // eg: KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000,"
+    let config_option = env_var_with_defaults!("KAFKA_CREATE_TOPIC_CONFIGS", Option::<String>);
+
+    let mut config: HashMap<&str, &str> = HashMap::new();
+
+    let config_string = config_option.unwrap_or("".to_owned());
+    config_string.trim().split(',').for_each(|c| {
+        if let Some((k, v)) = c.trim().split_once('=') {
+            config.insert(k, v);
+        }
+    });
+
+    let topic_config = CreateTopicConfigs {
+        topic: kafka_config.topic.clone(),
+        config,
+        replication_factor,
+        num_partitions,
+    };
 
-    let status = create_topic().await?;
+    let status = create_topic(&kafka_config, topic_config).await?;
 
     match status {
         KafkaDeployStatus::TopicExists => {
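
Note on the example above: the topic config env value is parsed by splitting on ',' and then on '='; entries without an '=' are silently skipped. A minimal standalone sketch of just that parsing, with the value copied from .env.example:

    use std::collections::HashMap;

    fn main() {
        // Comma-separated `key=value` pairs, whitespace-tolerant.
        let config_string = "retention.ms=3600000, message.timestamp.type=LogAppendTime";

        let mut config: HashMap<&str, &str> = HashMap::new();
        config_string.trim().split(',').for_each(|c| {
            // Entries that do not contain '=' are skipped.
            if let Some((k, v)) = c.trim().split_once('=') {
                config.insert(k, v);
            }
        });

        assert_eq!(config.get("retention.ms"), Some(&"3600000"));
        assert_eq!(config.get("message.timestamp.type"), Some(&"LogAppendTime"));
    }
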
diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs
index edd7668e..e3931005 100644
--- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs
+++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs
@@ -38,6 +38,7 @@ async fn main() {
         kafka_config,
         allowed_actions,
         channel_buffers: None,
+        commit_size: Some(2_000),
     };
 
     messenger_with_kafka(config).await.unwrap();
diff --git a/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs b/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs
index 50a5358c..d3ba32e0 100644
--- a/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs
+++ b/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};
 
 use rdkafka::{
     admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
@@ -7,7 +7,7 @@ use rdkafka::{
     error::KafkaError,
     types::RDKafkaErrorCode,
 };
-use talos_common_utils::env_var_with_defaults;
+
 use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use thiserror::Error as ThisError;
 
@@ -24,26 +24,45 @@ pub enum KafkaDeployError {
     KafkaError(#[from] KafkaError),
 }
 
-pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {
-    let kafka_config = KafkaConfig::from_env(None);
-    println!("kafka configs received from env... {kafka_config:#?}");
+#[derive(Debug, Clone)]
+pub struct CreateTopicConfigs<'a> {
+    /// Topic to create.
+    pub topic: String,
+    /// Topic specific configs.
+    ///
+    /// see: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html
+    pub config: HashMap<&'a str, &'a str>,
+    /// Replication count for partitions in topic. Defaults to 3.
+    pub replication_factor: Option<i32>,
+    /// Number of partitions for the topic. Defaults to 3.
+    pub num_partitions: Option<i32>,
+}
+
+const DEFAULT_REPLICATION_FACTOR: i32 = 3;
+const DEFAULT_NUM_PARTITIONS: i32 = 3;
+
+pub async fn create_topic(kafka_config: &KafkaConfig, topic_configs: CreateTopicConfigs<'_>) -> Result<KafkaDeployStatus, KafkaDeployError> {
+    println!("kafka brokers = {:?} and username = {}", kafka_config.brokers, kafka_config.username);
+    println!("topic configs received from env = {topic_configs:#?}");
     let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?;
 
-    let kafka_certification_topic = kafka_config.topic.to_string();
-    let timeout = Duration::from_secs(1);
+    let timeout = Duration::from_secs(5);
     let metadata = consumer
-        .fetch_metadata(Some(&kafka_certification_topic), timeout)
+        .fetch_metadata(Some(&topic_configs.topic), timeout)
         .expect("Fetching topic metadata failed");
 
     if !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty() {
         Ok(KafkaDeployStatus::TopicExists)
     } else {
         println!("Topic does not exist, creating...");
+
+        let config: Vec<(&str, &str)> = topic_configs.config.into_iter().collect();
+
         let topic = NewTopic {
-            name: &kafka_certification_topic,
-            num_partitions: env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", i32, 1),
-            replication: TopicReplication::Fixed(1),
-            config: vec![("message.timestamp.type", "LogAppendTime")],
+            name: &topic_configs.topic,
+            num_partitions: topic_configs.num_partitions.unwrap_or(DEFAULT_NUM_PARTITIONS),
+            replication: TopicReplication::Fixed(topic_configs.replication_factor.unwrap_or(DEFAULT_REPLICATION_FACTOR)),
+            config,
        };
 
         let opts = AdminOptions::new().operation_timeout(Some(timeout));
@@ -52,9 +71,7 @@ pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {
 
         let results = admin.create_topics(&[topic], &opts).await?;
 
-        results[0]
-            .as_ref()
-            .map_err(|e| KafkaDeployError::TopicCreation(kafka_certification_topic, e.1))?;
+        results[0].as_ref().map_err(|e| KafkaDeployError::TopicCreation(topic_configs.topic, e.1))?;
 
         Ok(KafkaDeployStatus::TopicCreated)
     }
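
For callers that prefer explicit values over env lookups, the reworked `create_topic` API can be driven directly. A hypothetical usage sketch (assumes the talos crates shown in this diff are on the dependency path and a tokio runtime; the config values are illustrative):

    use std::collections::HashMap;

    use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, CreateTopicConfigs, KafkaDeployError, KafkaDeployStatus};
    use talos_rdkafka_utils::kafka_config::KafkaConfig;

    #[tokio::main]
    async fn main() -> Result<(), KafkaDeployError> {
        let kafka_config = KafkaConfig::from_env(None);

        // None falls back to the DEFAULT_* constants inside create_topic.
        let topic_config = CreateTopicConfigs {
            topic: kafka_config.topic.clone(),
            config: HashMap::from([("retention.ms", "3600000")]),
            replication_factor: None,
            num_partitions: Some(1),
        };

        match create_topic(&kafka_config, topic_config).await? {
            KafkaDeployStatus::TopicExists => println!("topic already exists"),
            KafkaDeployStatus::TopicCreated => println!("topic created"),
        }
        Ok(())
    }
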
diff --git a/packages/talos_messenger_actions/src/kafka/context.rs b/packages/talos_messenger_actions/src/kafka/context.rs
index dc4178ae..25eef79d 100644
--- a/packages/talos_messenger_actions/src/kafka/context.rs
+++ b/packages/talos_messenger_actions/src/kafka/context.rs
@@ -29,6 +29,7 @@ impl ProducerContext for MessengerProducerContext {
         match result {
             Ok(msg) => {
                 info!("Message {:?} {:?}", msg.key(), msg.offset());
+                // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receiving has errored. In such a scenario, the publisher thread would also shutdown.
                 if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
                     error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");
diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs
index 3c51c021..c83176fd 100644
--- a/packages/talos_messenger_actions/src/kafka/service.rs
+++ b/packages/talos_messenger_actions/src/kafka/service.rs
@@ -3,11 +3,13 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use futures_util::future::join_all;
 use log::{debug, error, info};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
 use tokio::sync::mpsc;
 
 use talos_messenger_core::{
     core::{MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService},
     errors::MessengerServiceResult,
+    suffix::MessengerStateTransitionTimestamps,
     utlis::get_actions_deserialised,
 };
 
@@ -49,7 +51,10 @@ where
         let publish_vec = actions.into_iter().map(|action| {
             let publisher = self.publisher.clone();
-            let headers = headers_cloned.clone();
+            let mut headers = headers_cloned.clone();
+            let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();
+
+            headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp);
             async move {
                 publisher.send(version, action, headers, total_len ).await;
             }
diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs
index dd123ed2..f852c66f 100644
--- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs
+++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs
@@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer;
 use talos_messenger_core::{
     core::{MessengerPublisher, PublishActionType},
     errors::MessengerServiceResult,
-    services::MessengerInboundService,
+    services::{MessengerInboundService, MessengerInboundServiceConfig},
     suffix::MessengerCandidate,
     talos_messenger_service::TalosMessengerService,
 };
@@ -93,6 +93,10 @@ pub struct Configuration {
     pub allowed_actions: HashMap<String, Vec<String>>,
     /// Channel buffer size for the internal channels between threads
     pub channel_buffers: Option<ChannelBuffers>,
+    /// Commit size to decide how often the certifier topic can be committed by the consumer.
+    /// Committing more frequently has a negative impact on latency.
+    /// Defaults to 5_000.
+    pub commit_size: Option<u32>,
 }
 
 pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
@@ -112,14 +116,8 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu
     // START - Inbound service
     let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());
-
-    let inbound_service = MessengerInboundService {
-        message_receiver: kafka_consumer,
-        tx_actions_channel,
-        rx_feedback_channel,
-        suffix,
-        allowed_actions: config.allowed_actions,
-    };
+    let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size);
+    let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config);
     // END - Inbound service
 
     // START - Publish service
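
The publish-side timestamp header added in service.rs is produced with the `time` crate's RFC 3339 formatter. A minimal sketch of just that step (assumes the `time` crate with its formatting feature enabled; the header name mirrors the strum serialization introduced later in this patch):

    use time::{format_description::well_known::Rfc3339, OffsetDateTime};

    fn main() {
        // Same call as in service.rs; formatting with the well-known Rfc3339
        // description does not fail for a valid OffsetDateTime.
        let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).unwrap();
        println!("messengerEndOnCommitActionsTimestamp = {timestamp}");
        // e.g. 2024-11-12T10:45:20.123456789Z
    }
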
diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs
index 72573c21..5c5772cd 100644
--- a/packages/talos_messenger_core/src/services/inbound_service.rs
+++ b/packages/talos_messenger_core/src/services/inbound_service.rs
@@ -3,16 +3,38 @@ use async_trait::async_trait;
 use log::{debug, error, info, warn};
 
 use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage};
-use talos_suffix::{Suffix, SuffixTrait};
-use tokio::sync::mpsc;
+use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
+use tokio::sync::mpsc::{self};
 
 use crate::{
     core::{MessengerChannelFeedback, MessengerCommitActions, MessengerSystemService},
     errors::{MessengerServiceError, MessengerServiceResult},
-    suffix::{MessengerCandidate, MessengerSuffixItemTrait, MessengerSuffixTrait, SuffixItemCompleteStateReason, SuffixItemState},
+    suffix::{
+        MessengerCandidate, MessengerStateTransitionTimestamps, MessengerSuffixItemTrait, MessengerSuffixTrait, SuffixItemCompleteStateReason, SuffixItemState,
+    },
     utlis::get_allowed_commit_actions,
 };
 
+#[derive(Debug)]
+pub struct MessengerInboundServiceConfig {
+    /// Commit size decides when the offsets can be committed.
+    /// When the number of feedback messages received is greater than the commit_size, a commit is issued.
+    /// Default value is 5_000. Updating this value can impact the latency/throughput due to the frequency at which the commits will be issued.
+    commit_size: u32,
+    /// The allowed on_commit actions
+    allowed_actions: HashMap<String, Vec<String>>,
+}
+
+impl MessengerInboundServiceConfig {
+    pub fn new(allowed_actions: HashMap<String, Vec<String>>, commit_size: Option<u32>) -> Self {
+        Self {
+            allowed_actions,
+            commit_size: commit_size.unwrap_or(5_000),
+        }
+    }
+}
+
 pub struct MessengerInboundService<M>
 where
     M: MessageReciever<Message = ChannelMessage> + Send + Sync + 'static,
 {
     pub message_receiver: M,
     pub tx_actions_channel: mpsc::Sender<MessengerCommitActions>,
     pub rx_feedback_channel: mpsc::Receiver<MessengerChannelFeedback>,
     pub suffix: Suffix<MessengerCandidate>,
-    pub allowed_actions: HashMap<String, Vec<String>>,
+    pub config: MessengerInboundServiceConfig,
 }
 
 impl<M> MessengerInboundService<M>
 where
     M: MessageReciever<Message = ChannelMessage> + Send + Sync + 'static,
 {
+    pub fn new(
+        message_receiver: M,
+        tx_actions_channel: mpsc::Sender<MessengerCommitActions>,
+        rx_feedback_channel: mpsc::Receiver<MessengerChannelFeedback>,
+        suffix: Suffix<MessengerCandidate>,
+        config: MessengerInboundServiceConfig,
+    ) -> Self {
+        Self {
+            message_receiver,
+            tx_actions_channel,
+            rx_feedback_channel,
+            suffix,
+            config,
+        }
+    }
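
The defaulting behaviour of the new config can be shown in isolation. A simplified stand-in (not the real type, just the same `unwrap_or(5_000)` shape; the allowed-actions value is illustrative, echoing TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"):

    use std::collections::HashMap;

    #[derive(Debug)]
    struct InboundServiceConfig {
        commit_size: u32,
        allowed_actions: HashMap<String, Vec<String>>,
    }

    impl InboundServiceConfig {
        fn new(allowed_actions: HashMap<String, Vec<String>>, commit_size: Option<u32>) -> Self {
            Self {
                allowed_actions,
                // Fall back to the documented default when not supplied.
                commit_size: commit_size.unwrap_or(5_000),
            }
        }
    }

    fn main() {
        let allowed = HashMap::from([("publish".to_string(), vec!["kafka".to_string()])]);
        let config = InboundServiceConfig::new(allowed, None);
        assert_eq!(config.commit_size, 5_000); // default when not supplied
        println!("{config:?}");
    }
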
    /// Get next versions with their commit actions to process.
     ///
     async fn process_next_actions(&mut self) -> MessengerServiceResult {
         let items_to_process = self.suffix.get_suffix_items_to_process();
-
         for item in items_to_process {
             let ver = item.version;
+            let mut headers = item.headers;
+            let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();
+            headers.insert(MessengerStateTransitionTimestamps::StartOnCommitActions.to_string(), timestamp);
+
             let payload_to_send = MessengerCommitActions {
                 version: ver,
                 commit_actions: item.actions.iter().fold(HashMap::new(), |mut acc, (key, value)| {
                     acc.insert(key.to_string(), value.get_payload().clone());
                     acc
                 }),
-                headers: item.headers,
+                headers,
             };
             // send for publishing
+
             self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError {
                 kind: crate::errors::MessengerServiceErrorKind::Channel,
                 reason: e.to_string(),
@@ -59,6 +100,29 @@ where
         Ok(())
     }
 
+    fn commit_offset(&mut self) {
+        if let Some(index) = self.suffix.get_meta().prune_index {
+            let prune_item_option = self.suffix.messages.get(index);
+
+            if let Some(Some(prune_item)) = prune_item_option {
+                let commit_offset = prune_item.item_ver + 1;
+                debug!("[Commit] Updating tpl to version .. {commit_offset}");
+                let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);
+
+                let _ = self.message_receiver.commit_async();
+            }
+        }
+    }
+
+    pub(crate) fn suffix_pruning(&mut self) {
+        // Check prune eligibility by looking at the prune meta info.
+        if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
+            // error!("Pruning till index {index_to_prune}");
+            // Call prune method on suffix.
+            let _ = self.suffix.prune_till_index(index_to_prune);
+        }
+    }
+
     /// Checks if all actions are completed and updates the state of the item to `Processed`.
     /// Also, checks if the suffix can be pruned and the message_receiver can be committed.
     pub(crate) fn check_and_update_all_actions_complete(&mut self, version: u64, reason: SuffixItemCompleteStateReason) {
@@ -66,29 +130,11 @@
             Ok(is_completed) if is_completed => {
                 self.suffix.set_item_state(version, SuffixItemState::Complete(reason));
 
-                // Update the prune index in suffix if applicable.
-                let prune_index = self.suffix.update_prune_index_from_version(version);
-
-                // If there is a prune_index, it is safe to assume, all messages prioir to this are decided + on_commit actions are actioned.
-                // Therefore, it is safe to commit till that offset/version.
-                if let Some(index) = prune_index {
-                    let prune_item_option = self.suffix.messages.get(index);
-
-                    if let Some(Some(prune_item)) = prune_item_option {
-                        let commit_offset = prune_item.item_ver + 1;
-                        debug!("[Commit] Updating tpl to version .. {commit_offset}");
-                        let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);
+                // self.all_completed_versions.push(version);
 
-                        self.message_receiver.commit_async();
-                    }
-                }
+                let _ = self.suffix.update_prune_index_from_version(version);
 
                 debug!("[Actions] All actions for version {version} completed!");
-                // Check prune eligibility by looking at the prune meta info.
-                if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
-                    // Call prune method on suffix.
-                    let _ = self.suffix.prune_till_index(index_to_prune);
-                }
             }
             _ => {}
         }
     }
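
Note on commit_offset(): the committed value is `item_ver + 1` because a committed Kafka offset designates the next offset to consume, not the last one processed. A tiny sketch of that arithmetic:

    // Kafka-style commit semantics.
    fn offset_to_commit(last_actioned_version: u64) -> i64 {
        (last_actioned_version + 1) as i64
    }

    fn main() {
        // If everything up to and including version 41 has been actioned,
        // the consumer should resume from 42 after a restart.
        assert_eq!(offset_to_commit(41), 42);
    }
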
@@ -150,8 +196,28 @@ where
     async fn run(&mut self) -> MessengerServiceResult {
         info!("Running Messenger service");
+
+        let mut feedback_count: u32 = 0;
         loop {
             tokio::select! {
                 // Receive feedback from publisher.
+                Some(feedback_result) = self.rx_feedback_channel.recv() => {
+                    match feedback_result {
+                        MessengerChannelFeedback::Error(version, key, message_error) => {
+                            error!("Failed to process version={version} with error={message_error:?}");
+                            self.handle_action_failed(version, &key);
+
+                        },
+                        MessengerChannelFeedback::Success(version, key) => {
+                            debug!("Successfully processed version={version} with action_key={key}");
+                            self.handle_action_success(version, &key);
+                        },
+                    }
+
+                    feedback_count += 1;
+
+
+                }
                 // 1. Consume message.
                 // Ok(Some(msg)) = self.message_receiver.consume_message() => {
                 reciever_result = self.message_receiver.consume_message() => {
@@ -167,7 +233,7 @@ where
                             if let Some(item_to_update) = self.suffix.get_mut(version){
                                 if let Some(commit_actions) = &item_to_update.item.candidate.on_commit {
-                                    let filter_actions = get_allowed_commit_actions(commit_actions, &self.allowed_actions);
+                                    let filter_actions = get_allowed_commit_actions(commit_actions, &self.config.allowed_actions);
                                     if filter_actions.is_empty() {
                                         // There are on_commit actions, but not the ones required by messenger
                                         item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions));
@@ -181,6 +247,8 @@ where
                                 }
                             };
+
+
                         } else {
                             warn!("Version 0 will not be inserted into suffix.")
                         }
@@ -188,17 +256,19 @@ where
                         // 2.2 For DM - Update the decision with outcome + safepoint.
                         Ok(Some(ChannelMessage::Decision(decision))) => {
                             let version = decision.message.get_candidate_version();
-                            info!("[Decision Message] Version received = {} and {}", decision.decision_version, version);
+                            debug!("[Decision Message] Decision version received = {} for candidate version = {}", decision.decision_version, version);
 
                             // TODO: GK - no hardcoded filters on headers
                             let headers: HashMap<String, String> = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect();
                             self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers);
 
+                            // Pick the next items from suffix whose actions are to be processed.
                             self.process_next_actions().await?;
+
                         },
                         Ok(None) => {
-                            info!("No message to process..");
+                            debug!("No message to process..");
                         },
                         Err(error) => {
                             // Catch the error propagated, and if it has a version, mark the item as completed.
@@ -210,28 +280,31 @@ where
                             error!("error consuming message....{:?}", error);
                         },
                     }
+
+
                 }
+                else => {
+                    warn!("Unhandled arm....");
+                }
-                // Receive feedback from publisher.
-                feedback_result = self.rx_feedback_channel.recv() => {
-                    match feedback_result {
-                        Some(MessengerChannelFeedback::Error(version, key, message_error)) => {
-                            error!("Failed to process version={version} with error={message_error:?}");
-                            self.handle_action_failed(version, &key);
-                        },
-                        Some(MessengerChannelFeedback::Success(version, key)) => {
-                            info!("Successfully processed version={version} with action_key={key}");
-                            self.handle_action_success(version, &key);
-                        },
-                        None => {
-                            debug!("No feedback message to process..")
-                        }
-                    }
-                    // Process the next items with commit actions
-                    self.process_next_actions().await?
-                }
             }
+
+            // NOTE: Pruning and committing offset adds to latency if done more frequently.
+
+            if feedback_count.ge(&self.config.commit_size) {
+                self.commit_offset();
+                feedback_count = 0;
+            }
+
+            // Update the prune index and commit
+            let SuffixMeta {
+                prune_index,
+                prune_start_threshold,
+                ..
+            } = self.suffix.get_meta();
+
+            if prune_index.gt(prune_start_threshold) {
+                self.suffix_pruning();
+            };
         }
     }
 }
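
The run loop above now counts feedback messages and only commits once `commit_size` of them have arrived, trading commit frequency for latency. The counter pattern in isolation (a simplified stand-in, not the service itself):

    struct Committer {
        commit_size: u32,
        feedback_count: u32,
    }

    impl Committer {
        // Returns true when the caller should issue the (async) commit.
        fn on_feedback(&mut self) -> bool {
            self.feedback_count += 1;
            if self.feedback_count >= self.commit_size {
                self.feedback_count = 0;
                return true;
            }
            false
        }
    }

    fn main() {
        let mut c = Committer { commit_size: 3, feedback_count: 0 };
        let commits: Vec<bool> = (0..7).map(|_| c.on_feedback()).collect();
        // A commit fires on every third feedback message.
        assert_eq!(commits, vec![false, false, true, false, false, true, false]);
    }
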
diff --git a/packages/talos_messenger_core/src/services/mod.rs b/packages/talos_messenger_core/src/services/mod.rs
index 75e294e4..d3ae629a 100644
--- a/packages/talos_messenger_core/src/services/mod.rs
+++ b/packages/talos_messenger_core/src/services/mod.rs
@@ -1,3 +1,3 @@
 mod inbound_service;
 
-pub use inbound_service::MessengerInboundService;
+pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig};
diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs
index 15efcf63..118dee22 100644
--- a/packages/talos_messenger_core/src/suffix.rs
+++ b/packages/talos_messenger_core/src/suffix.rs
@@ -3,8 +3,10 @@ use log::{debug, warn};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use std::fmt::Debug;
+use strum::{Display, EnumString};
 use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait};
 use talos_suffix::{core::SuffixResult, Suffix, SuffixTrait};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
 
 pub trait MessengerSuffixItemTrait: Debug + Clone {
     fn set_state(&mut self, state: SuffixItemState);
@@ -14,6 +16,7 @@ pub trait MessengerSuffixItemTrait: Debug + Clone {
     fn set_headers(&mut self, headers: HashMap<String, String>);
 
     fn get_state(&self) -> &SuffixItemState;
+    fn get_state_transition_timestamps(&self) -> &HashMap<MessengerStateTransitionTimestamps, TimeStamp>;
     fn get_commit_actions(&self) -> &HashMap<String, AllowedActionsMapItem>;
     fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem>;
     fn get_safepoint(&self) -> &Option<u64>;
@@ -59,6 +62,42 @@ pub enum SuffixItemState {
     Complete(SuffixItemCompleteStateReason),
 }
 
+type TimeStamp = String;
+
+/// Internal timings from messenger for a candidate received.
+/// These timings help in debugging the time taken between various state changes.
+#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq, EnumString, Display, Hash)]
+pub enum MessengerStateTransitionTimestamps {
+    /// Set when SuffixItemState::AwaitingDecision
+    #[strum(serialize = "messengerCandidateReceivedTimestamp")]
+    CandidateReceived,
+    /// Set when SuffixItemState::ReadyToProcess
+    #[strum(serialize = "messengerDecisionReceivedTimestamp")]
+    DecisionReceived,
+    /// Set when SuffixItemState::Processing
+    #[strum(serialize = "messengerStartOnCommitActionsTimestamp")]
+    StartOnCommitActions,
+    // /// Not required for timings
+    #[strum(disabled)]
+    InProgressOnCommitActions,
+    /// Set when SuffixItemState::Complete
+    /// Irrespective of the reason for completion, we just capture the timing based on the final state.
+    #[strum(serialize = "messengerEndOnCommitActionsTimestamp")]
+    EndOnCommitActions,
+}
+
+impl From<SuffixItemState> for MessengerStateTransitionTimestamps {
+    fn from(value: SuffixItemState) -> Self {
+        match value {
+            SuffixItemState::AwaitingDecision => MessengerStateTransitionTimestamps::CandidateReceived,
+            SuffixItemState::ReadyToProcess => MessengerStateTransitionTimestamps::DecisionReceived,
+            SuffixItemState::Processing => MessengerStateTransitionTimestamps::StartOnCommitActions,
+            SuffixItemState::PartiallyComplete => MessengerStateTransitionTimestamps::InProgressOnCommitActions,
+            SuffixItemState::Complete(_) => MessengerStateTransitionTimestamps::EndOnCommitActions,
+        }
+    }
+}
+
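
The strum attributes above drive the header names sent with each message. A small stand-in demonstrating the Display/EnumString behaviour (assumes the strum crate; only two variants shown, names copied from the enum above):

    use std::str::FromStr;

    use strum::{Display, EnumString};

    #[derive(Debug, PartialEq, Display, EnumString)]
    enum StateTimestamps {
        // Display renders the serialize string instead of the variant name.
        #[strum(serialize = "messengerCandidateReceivedTimestamp")]
        CandidateReceived,
        // Disabled variants are excluded from the string conversions.
        #[strum(disabled)]
        InProgressOnCommitActions,
    }

    fn main() {
        assert_eq!(StateTimestamps::CandidateReceived.to_string(), "messengerCandidateReceivedTimestamp");
        assert_eq!(
            StateTimestamps::from_str("messengerCandidateReceivedTimestamp").unwrap(),
            StateTimestamps::CandidateReceived
        );
        // The disabled variant cannot be parsed back from a string.
        assert!(StateTimestamps::from_str("InProgressOnCommitActions").is_err());
    }
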
 #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
 pub enum SuffixItemCompleteStateReason {
     /// When the decision is an abort
@@ -122,6 +161,8 @@ pub struct MessengerCandidate {
     decision: Option<Decision>,
     /// Suffix item state.
     state: SuffixItemState,
+    /// Suffix state transition timestamps.
+    state_transition_ts: HashMap<MessengerStateTransitionTimestamps, TimeStamp>,
     /// Filtered actions that need to be processed by the messenger
     allowed_actions_map: HashMap<String, AllowedActionsMapItem>,
     /// Any headers from decision to be used in on-commit actions
@@ -130,12 +171,17 @@ impl From<CandidateMessage> for MessengerCandidate {
     fn from(candidate: CandidateMessage) -> Self {
+        let state = SuffixItemState::AwaitingDecision;
+        let mut state_transition_ts: HashMap<MessengerStateTransitionTimestamps, TimeStamp> = HashMap::new();
+        let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();
+        state_transition_ts.insert(state.clone().into(), timestamp);
+
         MessengerCandidate {
             candidate,
             safepoint: None,
             decision: None,
-
-            state: SuffixItemState::AwaitingDecision,
+            state,
+            state_transition_ts,
             allowed_actions_map: HashMap::new(),
             headers: HashMap::new(),
         }
@@ -152,7 +198,10 @@ impl MessengerSuffixItemTrait for MessengerCandidate {
     }
 
     fn set_state(&mut self, state: SuffixItemState) {
-        self.state = state;
+        self.state = state.clone();
+
+        let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();
+        self.state_transition_ts.insert(state.into(), timestamp);
     }
 
     fn set_commit_action(&mut self, commit_actions: HashMap<String, AllowedActionsMapItem>) {
@@ -162,6 +211,9 @@
     fn get_state(&self) -> &SuffixItemState {
         &self.state
     }
+    fn get_state_transition_timestamps(&self) -> &HashMap<MessengerStateTransitionTimestamps, TimeStamp> {
+        &self.state_transition_ts
+    }
 
     fn get_safepoint(&self) -> &Option<u64> {
         &self.safepoint
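
set_state() above records a timestamp for every transition, keyed via the From<SuffixItemState> mapping. A simplified stand-in of that bookkeeping (local types, fixed timestamp for determinism; the real code derives the key with `.into()` and formats the current time):

    use std::collections::HashMap;

    #[derive(Clone, PartialEq, Debug)]
    enum State { AwaitingDecision, ReadyToProcess }

    // Stand-in for the From<SuffixItemState> mapping to header names.
    fn label(state: &State) -> &'static str {
        match state {
            State::AwaitingDecision => "messengerCandidateReceivedTimestamp",
            State::ReadyToProcess => "messengerDecisionReceivedTimestamp",
        }
    }

    struct Item {
        state: State,
        state_transition_ts: HashMap<&'static str, String>,
    }

    impl Item {
        fn set_state(&mut self, state: State, now_rfc3339: String) {
            // Record when this transition happened, then update the state.
            self.state_transition_ts.insert(label(&state), now_rfc3339);
            self.state = state;
        }
    }

    fn main() {
        let mut item = Item { state: State::AwaitingDecision, state_transition_ts: HashMap::new() };
        item.set_state(State::ReadyToProcess, "2024-11-12T00:00:00Z".to_string());
        assert_eq!(item.state, State::ReadyToProcess);
        assert!(item.state_transition_ts.contains_key("messengerDecisionReceivedTimestamp"));
    }
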
@@ -211,32 +263,54 @@ where
     }
 
     fn get_suffix_items_to_process(&self) -> Vec<ActionsMapWithVersion> {
-        let items = self
+        let current_prune_index = self.get_meta().prune_index;
+
+        let start_index = current_prune_index.unwrap_or(0);
+
+        // let start_ms = Instant::now();
+        let items: Vec<ActionsMapWithVersion> = self
             .messages
-            .iter()
+            // we know from start_index = prune_index, everything prior to this is already completed.
+            // This helps in taking a smaller slice out of the suffix to iterate over.
+            .range(start_index..)
             // Remove `None` items
             .flatten()
             // Filter only the items awaiting to be processed.
             .filter(|&x| x.item.get_state().eq(&SuffixItemState::ReadyToProcess))
             // Take while contiguous ones, whose safepoint is already processed.
-            .take_while(|&x| {
+            .filter(|&x| {
                 let Some(safepoint) = x.item.get_safepoint() else {
                     return false;
                 };
 
                 match self.get(*safepoint) {
-                    // If we find the suffix item from the safepoint, we need to ensure that it already in `Complete` state
+                    // If we find the suffix item from the safepoint, we need to ensure that it is already in `Complete` or `Processing` state
                     Ok(Some(safepoint_item)) => {
-                        matches!(safepoint_item.item.get_state(), SuffixItemState::Complete(..))
+                        matches!(safepoint_item.item.get_state(), SuffixItemState::Processing | SuffixItemState::Complete(..))
                     }
                     // If we couldn't find the item in suffix, it could be because it was pruned and it is safe to assume that we can consider it.
                     _ => true,
                 }
             })
-            .map(|x| ActionsMapWithVersion {
-                version: x.item_ver,
-                actions: x.item.get_commit_actions().clone(),
-                headers: x.item.get_headers().clone(),
+            // add timing-related headers.
+            .map(|x| {
+                let mut headers = x.item.get_headers().clone();
+                // Add the state timestamps
+                let state_timestamps_headers = x
+                    .item
+                    .get_state_transition_timestamps()
+                    // .clone()
+                    .iter()
+                    .map(|(state, ts)| (state.to_string(), ts.clone()))
+                    .collect::<HashMap<String, String>>();
+
+                headers.extend(state_timestamps_headers);
+
+                ActionsMapWithVersion {
+                    version: x.item_ver,
+                    actions: x.item.get_commit_actions().clone(),
+                    headers,
+                }
             })
             .collect();
 
@@ -288,7 +362,6 @@ where
             "[Update prune index] Calculating prune index in suffix slice between index {start_index} <-> {end_index}. Current prune index version {current_prune_index:?}.",
         );
 
-        // 1. Get the last contiguous item that is completed.
         let safe_prune_version = self
            .messages
            .range(start_index..=end_index)
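
For reference, the behavioural difference behind the take_while to filter change above: take_while stops at the first item failing the predicate, while filter keeps scanning, so later ready items are no longer blocked by a single not-yet-ready one. A standalone illustration (not taken from the patch):

    fn main() {
        // Whether each suffix position is "ready" to process.
        let ready = [true, true, false, true];

        let with_take_while: Vec<usize> = ready.iter().enumerate()
            .take_while(|(_, ok)| **ok)
            .map(|(i, _)| i)
            .collect();
        let with_filter: Vec<usize> = ready.iter().enumerate()
            .filter(|(_, ok)| **ok)
            .map(|(i, _)| i)
            .collect();

        // take_while halts at index 2; filter also picks up index 3.
        assert_eq!(with_take_while, vec![0, 1]);
        assert_eq!(with_filter, vec![0, 1, 3]);
    }
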