diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index d2372b62..0b051e83 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer; use talos_messenger_core::{ core::{MessengerPublisher, PublishActionType}, errors::MessengerServiceResult, - services::MessengerInboundService, + services::{MessengerInboundService, MessengerInboundServiceConfig}, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService, }; @@ -112,8 +112,8 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu // START - Inbound service let suffix: Suffix = Suffix::with_config(config.suffix_config.unwrap_or_default()); - - let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, config.allowed_actions); + let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, None); + let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config); // END - Inbound service // START - Publish service diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 2f8ea6cc..62fc4bbd 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use log::{debug, error, info, warn}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; -use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait}; +use talos_suffix::{Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::sync::mpsc::{self}; @@ -16,6 +16,25 @@ use crate::{ utlis::get_allowed_commit_actions, }; +#[derive(Debug)] +pub struct MessengerInboundServiceConfig { + /// commit size decides when the offsets can be committed. + /// When the number of feedbacks is greater than the commit_size, a commit is issued. + /// Default value is 1_000. Updating this value can impact the latency. + commit_size: u32, + /// The allowed on_commit actions + allowed_actions: HashMap>, +} + +impl MessengerInboundServiceConfig { + pub fn new(allowed_actions: HashMap>, commit_size: Option) -> Self { + Self { + allowed_actions, + commit_size: commit_size.unwrap_or(1_000), + } + } +} + pub struct MessengerInboundService where M: MessageReciever + Send + Sync + 'static, @@ -24,7 +43,7 @@ where pub tx_actions_channel: mpsc::Sender, pub rx_feedback_channel: mpsc::Receiver, pub suffix: Suffix, - pub allowed_actions: HashMap>, + pub config: MessengerInboundServiceConfig, } impl MessengerInboundService @@ -36,14 +55,14 @@ where tx_actions_channel: mpsc::Sender, rx_feedback_channel: mpsc::Receiver, suffix: Suffix, - allowed_actions: HashMap>, + config: MessengerInboundServiceConfig, ) -> Self { Self { message_receiver, tx_actions_channel, rx_feedback_channel, suffix, - allowed_actions, + config, } } /// Get next versions with their commit actions to process. @@ -81,30 +100,44 @@ where Ok(()) } - pub(crate) fn commit_offset_and_prune_suffix(&mut self) { + fn commit_offset(&mut self) { + if let Some(index) = self.suffix.get_meta().prune_index { + let prune_item_option = self.suffix.messages.get(index); + + if let Some(Some(prune_item)) = prune_item_option { + let commit_offset = prune_item.item_ver + 1; + debug!("[Commit] Updating tpl to version .. {commit_offset}"); + let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); + + let _ = self.message_receiver.commit(); + } + } + } + + pub(crate) fn suffix_pruning(&mut self) { // Update the prune index in suffix if applicable. - let SuffixMeta { - prune_index, - prune_start_threshold, - .. - } = self.suffix.get_meta(); + // let SuffixMeta { + // prune_index, + // prune_start_threshold, + // .. + // } = self.suffix.get_meta(); // let prune_index = self.suffix.get_meta().prune_index; // If there is a prune_index, it is safe to assume, all messages prioir to this are decided + on_commit actions are actioned. // Therefore, it is safe to commit till that offset/version. - if prune_index.gt(prune_start_threshold) { - if let Some(index) = prune_index { - let prune_item_option = self.suffix.messages.get(*index); + // if prune_index.gt(prune_start_threshold) { + // if let Some(index) = prune_index { + // let prune_item_option = self.suffix.messages.get(*index); - if let Some(Some(prune_item)) = prune_item_option { - let commit_offset = prune_item.item_ver + 1; - debug!("[Commit] Updating tpl to version .. {commit_offset}"); - let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); + // if let Some(Some(prune_item)) = prune_item_option { + // let commit_offset = prune_item.item_ver + 1; + // debug!("[Commit] Updating tpl to version .. {commit_offset}"); + // let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); - let _ = self.message_receiver.commit(); - } - } - } + // let _ = self.message_receiver.commit(); + // } + // } + // } // Check prune eligibility by looking at the prune meta info. if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { @@ -187,6 +220,8 @@ where async fn run(&mut self) -> MessengerServiceResult { info!("Running Messenger service"); + + let mut feedback_count: u32 = 0; loop { tokio::select! { // Receive feedback from publisher. @@ -203,6 +238,13 @@ where }, } + feedback_count+=1; + + if feedback_count.ge(&self.config.commit_size){ + self.commit_offset(); + feedback_count = 0; + } + // Update the prune index and commit // let SuffixMeta { // prune_index, @@ -213,7 +255,7 @@ where // // NOTE: Pruning and committing offset adds to latency if done more frequently. // // The more frequent this method is called has direct impact on the latency. // if prune_index.gt(prune_start_threshold) { - self.commit_offset_and_prune_suffix(); + self.suffix_pruning(); // }; } @@ -232,7 +274,7 @@ where if let Some(item_to_update) = self.suffix.get_mut(version){ if let Some(commit_actions) = &item_to_update.item.candidate.on_commit { - let filter_actions = get_allowed_commit_actions(commit_actions, &self.allowed_actions); + let filter_actions = get_allowed_commit_actions(commit_actions, &self.config.allowed_actions); if filter_actions.is_empty() { // There are on_commit actions, but not the ones required by messenger item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions)); diff --git a/packages/talos_messenger_core/src/services/mod.rs b/packages/talos_messenger_core/src/services/mod.rs index 75e294e4..d3ae629a 100644 --- a/packages/talos_messenger_core/src/services/mod.rs +++ b/packages/talos_messenger_core/src/services/mod.rs @@ -1,3 +1,3 @@ mod inbound_service; -pub use inbound_service::MessengerInboundService; +pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig};