Skip to content

Commit

Permalink
chore: introduce config for inbound service and commit based on the c…
Browse files Browse the repository at this point in the history
…onfig size
  • Loading branch information
gk-kindred committed Nov 11, 2024
1 parent 5e0cb0b commit 38496b7
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn main() {
kafka_config,
allowed_actions,
channel_buffers: None,
commit_size: None,
};

messenger_with_kafka(config).await.unwrap();
Expand Down
10 changes: 7 additions & 3 deletions packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::MessengerServiceResult,
services::MessengerInboundService,
services::{MessengerInboundService, MessengerInboundServiceConfig},
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
};
Expand Down Expand Up @@ -93,6 +93,10 @@ pub struct Configuration {
pub allowed_actions: HashMap<String, Vec<String>>,
/// Channel buffer size for the internal channels between threads
pub channel_buffers: Option<ChannelBuffers>,
/// Commit size to decide how often the certifier topic can be committed by the consumer.
/// The more ofter the commit is done has inverse impact on the latency.
/// Defaults to 1_000.
pub commit_size: Option<u32>,
}

pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
Expand All @@ -112,8 +116,8 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu

// START - Inbound service
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());

let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, config.allowed_actions);
let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size);
let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config);
// END - Inbound service

// START - Publish service
Expand Down
88 changes: 65 additions & 23 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use log::{debug, error, info, warn};

use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage};
use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait};
use talos_suffix::{Suffix, SuffixTrait};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::sync::mpsc::{self};

Expand All @@ -16,6 +16,25 @@ use crate::{
utlis::get_allowed_commit_actions,
};

#[derive(Debug)]
pub struct MessengerInboundServiceConfig {
/// commit size decides when the offsets can be committed.
/// When the number of feedbacks is greater than the commit_size, a commit is issued.
/// Default value is 1_000. Updating this value can impact the latency.
commit_size: u32,
/// The allowed on_commit actions
allowed_actions: HashMap<String, Vec<String>>,
}

impl MessengerInboundServiceConfig {
pub fn new(allowed_actions: HashMap<String, Vec<String>>, commit_size: Option<u32>) -> Self {
Self {
allowed_actions,
commit_size: commit_size.unwrap_or(1_000),
}
}
}

pub struct MessengerInboundService<M>
where
M: MessageReciever<Message = ChannelMessage> + Send + Sync + 'static,
Expand All @@ -24,7 +43,7 @@ where
pub tx_actions_channel: mpsc::Sender<MessengerCommitActions>,
pub rx_feedback_channel: mpsc::Receiver<MessengerChannelFeedback>,
pub suffix: Suffix<MessengerCandidate>,
pub allowed_actions: HashMap<String, Vec<String>>,
pub config: MessengerInboundServiceConfig,
}

impl<M> MessengerInboundService<M>
Expand All @@ -36,14 +55,14 @@ where
tx_actions_channel: mpsc::Sender<MessengerCommitActions>,
rx_feedback_channel: mpsc::Receiver<MessengerChannelFeedback>,
suffix: Suffix<MessengerCandidate>,
allowed_actions: HashMap<String, Vec<String>>,
config: MessengerInboundServiceConfig,
) -> Self {
Self {
message_receiver,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions,
config,
}
}
/// Get next versions with their commit actions to process.
Expand Down Expand Up @@ -81,30 +100,44 @@ where
Ok(())
}

pub(crate) fn commit_offset_and_prune_suffix(&mut self) {
fn commit_offset(&mut self) {
if let Some(index) = self.suffix.get_meta().prune_index {
let prune_item_option = self.suffix.messages.get(index);

if let Some(Some(prune_item)) = prune_item_option {
let commit_offset = prune_item.item_ver + 1;
debug!("[Commit] Updating tpl to version .. {commit_offset}");
let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

let _ = self.message_receiver.commit();
}
}
}

pub(crate) fn suffix_pruning(&mut self) {
// Update the prune index in suffix if applicable.
let SuffixMeta {
prune_index,
prune_start_threshold,
..
} = self.suffix.get_meta();
// let SuffixMeta {
// prune_index,
// prune_start_threshold,
// ..
// } = self.suffix.get_meta();
// let prune_index = self.suffix.get_meta().prune_index;

// If there is a prune_index, it is safe to assume, all messages prioir to this are decided + on_commit actions are actioned.
// Therefore, it is safe to commit till that offset/version.
if prune_index.gt(prune_start_threshold) {
if let Some(index) = prune_index {
let prune_item_option = self.suffix.messages.get(*index);
// if prune_index.gt(prune_start_threshold) {
// if let Some(index) = prune_index {
// let prune_item_option = self.suffix.messages.get(*index);

if let Some(Some(prune_item)) = prune_item_option {
let commit_offset = prune_item.item_ver + 1;
debug!("[Commit] Updating tpl to version .. {commit_offset}");
let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);
// if let Some(Some(prune_item)) = prune_item_option {
// let commit_offset = prune_item.item_ver + 1;
// debug!("[Commit] Updating tpl to version .. {commit_offset}");
// let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

let _ = self.message_receiver.commit();
}
}
}
// let _ = self.message_receiver.commit();
// }
// }
// }

// Check prune eligibility by looking at the prune meta info.
if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
Expand Down Expand Up @@ -187,6 +220,8 @@ where

async fn run(&mut self) -> MessengerServiceResult {
info!("Running Messenger service");

let mut feedback_count: u32 = 0;
loop {
tokio::select! {
// Receive feedback from publisher.
Expand All @@ -203,6 +238,13 @@ where
},
}

feedback_count+=1;

if feedback_count.ge(&self.config.commit_size){
self.commit_offset();
feedback_count = 0;
}

// Update the prune index and commit
// let SuffixMeta {
// prune_index,
Expand All @@ -213,7 +255,7 @@ where
// // NOTE: Pruning and committing offset adds to latency if done more frequently.
// // The more frequent this method is called has direct impact on the latency.
// if prune_index.gt(prune_start_threshold) {
self.commit_offset_and_prune_suffix();
self.suffix_pruning();
// };

}
Expand All @@ -232,7 +274,7 @@ where

if let Some(item_to_update) = self.suffix.get_mut(version){
if let Some(commit_actions) = &item_to_update.item.candidate.on_commit {
let filter_actions = get_allowed_commit_actions(commit_actions, &self.allowed_actions);
let filter_actions = get_allowed_commit_actions(commit_actions, &self.config.allowed_actions);
if filter_actions.is_empty() {
// There are on_commit actions, but not the ones required by messenger
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions));
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_messenger_core/src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod inbound_service;

pub use inbound_service::MessengerInboundService;
pub use inbound_service::{MessengerInboundService, MessengerInboundServiceConfig};

0 comments on commit 38496b7

Please sign in to comment.