Skip to content

Commit

Permalink
feat: messenger performance optimisations to improve latency (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred authored Nov 11, 2024
1 parent c01bb1b commit 5d07b51
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 90 deletions.
9 changes: 8 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,11 @@ BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2
# Messenger environment variables
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"
# ### Talos Messenger Env variables (end) #############################
# ### Talos Messenger Env variables (end) #############################

#
# ### Configs used for topic creation (start) #############################
# KAFKA_CREATE_TOPIC_PARTITIONS=1
# KAFKA_CREATE_TOPIC_REPLICATION_COUNT=3
KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000, message.timestamp.type=LogAppendTime"
# ### Configs used for topic creation (end) #############################
34 changes: 31 additions & 3 deletions examples/certifier_kafka_pg/examples/kafka_create_topic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, KafkaDeployError, KafkaDeployStatus};
use std::collections::HashMap;

use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, CreateTopicConfigs, KafkaDeployError, KafkaDeployStatus};
use talos_common_utils::env_var_with_defaults;
use talos_rdkafka_utils::kafka_config::KafkaConfig;

#[tokio::main]
async fn main() -> Result<(), KafkaDeployError> {
println!("deploying kafka...");
println!("Creating kafka topic...");

let kafka_config = KafkaConfig::from_env(None);

let replication_factor = env_var_with_defaults!("KAFKA_CREATE_TOPIC_REPLICATION_COUNT", Option::<i32>, 3);
let num_partitions = env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", Option::<i32>, 1);

// Comma-separated `key=value` pairs, e.g. KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000, message.timestamp.type=LogAppendTime"
let config_option = env_var_with_defaults!("KAFKA_CREATE_TOPIC_CONFIGS", Option::<String>);

let mut config: HashMap<&str, &str> = HashMap::new();

let config_string = config_option.unwrap_or("".to_owned());
config_string.trim().split(',').for_each(|c| {
if let Some((k, v)) = c.trim().split_once('=') {
config.insert(k, v);
}
});

let topic_config = CreateTopicConfigs {
topic: kafka_config.topic.clone(),
config,
replication_factor,
num_partitions,
};

let status = create_topic().await?;
let status = create_topic(&kafka_config, topic_config).await?;

match status {
KafkaDeployStatus::TopicExists => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn main() {
kafka_config,
allowed_actions,
channel_buffers: None,
commit_size: Some(2_000),
};

messenger_with_kafka(config).await.unwrap();
Expand Down
47 changes: 32 additions & 15 deletions packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{collections::HashMap, time::Duration};

use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
Expand All @@ -7,7 +7,7 @@ use rdkafka::{
error::KafkaError,
types::RDKafkaErrorCode,
};
use talos_common_utils::env_var_with_defaults;

use talos_rdkafka_utils::kafka_config::KafkaConfig;
use thiserror::Error as ThisError;

Expand All @@ -24,26 +24,45 @@ pub enum KafkaDeployError {
KafkaError(#[from] KafkaError),
}

pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {
let kafka_config = KafkaConfig::from_env(None);
println!("kafka configs received from env... {kafka_config:#?}");
/// Parameters describing the Kafka topic that `create_topic` should create.
#[derive(Debug, Clone)]
pub struct CreateTopicConfigs<'a> {
    /// Name of the topic to create.
    pub topic: String,
    /// Topic specific configs, passed through as key/value pairs.
    ///
    /// see: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html
    pub config: HashMap<&'a str, &'a str>,
    /// Replication count for partitions in topic. Defaults to 3.
    pub replication_factor: Option<i32>,
    /// Number of partitions for the topic. Defaults to 1.
    pub num_partitions: Option<i32>,
}

/// Replication factor used when `CreateTopicConfigs::replication_factor` is `None`.
const DEFAULT_REPLICATION_FACTOR: i32 = 3;
/// Partition count used when `CreateTopicConfigs::num_partitions` is `None`.
///
/// Kept at 1 so it agrees with the default documented on the field and with the
/// fallback used by the `kafka_create_topic` example.
const DEFAULT_NUM_PARTITIONS: i32 = 1;

pub async fn create_topic(kafka_config: &KafkaConfig, topic_configs: CreateTopicConfigs<'_>) -> Result<KafkaDeployStatus, KafkaDeployError> {
println!("kafka brokers = {:?} and usename = {}", kafka_config.brokers, kafka_config.username);
println!("topic configs received from env = {topic_configs:#?}");
let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?;

let kafka_certification_topic = kafka_config.topic.to_string();
let timeout = Duration::from_secs(1);
let timeout = Duration::from_secs(5);
let metadata = consumer
.fetch_metadata(Some(&kafka_certification_topic), timeout)
.fetch_metadata(Some(&topic_configs.topic), timeout)
.expect("Fetching topic metadata failed");

if !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty() {
Ok(KafkaDeployStatus::TopicExists)
} else {
println!("Topic does not exist, creating...");

let config: Vec<(&str, &str)> = topic_configs.config.into_iter().collect();

let topic = NewTopic {
name: &kafka_certification_topic,
num_partitions: env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", i32, 1),
replication: TopicReplication::Fixed(1),
config: vec![("message.timestamp.type", "LogAppendTime")],
name: &topic_configs.topic,
num_partitions: topic_configs.num_partitions.unwrap_or(DEFAULT_NUM_PARTITIONS),
replication: TopicReplication::Fixed(topic_configs.replication_factor.unwrap_or(DEFAULT_REPLICATION_FACTOR)),
config,
};

let opts = AdminOptions::new().operation_timeout(Some(timeout));
Expand All @@ -52,9 +71,7 @@ pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {

let results = admin.create_topics(&[topic], &opts).await?;

results[0]
.as_ref()
.map_err(|e| KafkaDeployError::TopicCreation(kafka_certification_topic, e.1))?;
results[0].as_ref().map_err(|e| KafkaDeployError::TopicCreation(topic_configs.topic, e.1))?;

Ok(KafkaDeployStatus::TopicCreated)
}
Expand Down
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl ProducerContext for MessengerProducerContext {
match result {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());

// Safe to ignore the error check, as an error occurs only if the receiver is closed or dropped, which would happen if the receiving thread has errored. In such a scenario, the publisher thread would also shut down.
if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");
Expand Down
7 changes: 6 additions & 1 deletion packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures_util::future::join_all;
use log::{debug, error, info};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::sync::mpsc;

use talos_messenger_core::{
core::{MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService},
errors::MessengerServiceResult,
suffix::MessengerStateTransitionTimestamps,
utlis::get_actions_deserialised,
};

Expand Down Expand Up @@ -49,7 +51,10 @@ where

let publish_vec = actions.into_iter().map(|action| {
let publisher = self.publisher.clone();
let headers = headers_cloned.clone();
let mut headers = headers_cloned.clone();
let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();

headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp);
async move {
publisher.send(version, action, headers, total_len ).await;
}
Expand Down
16 changes: 7 additions & 9 deletions packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::MessengerServiceResult,
services::MessengerInboundService,
services::{MessengerInboundService, MessengerInboundServiceConfig},
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
};
Expand Down Expand Up @@ -93,6 +93,10 @@ pub struct Configuration {
pub allowed_actions: HashMap<String, Vec<String>>,
/// Channel buffer size for the internal channels between threads
pub channel_buffers: Option<ChannelBuffers>,
/// Commit size that decides how often the consumer commits the certifier topic.
/// Committing more frequently has an adverse impact on latency.
/// Defaults to 5_000.
pub commit_size: Option<u32>,
}

pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
Expand All @@ -112,14 +116,8 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu

// START - Inbound service
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions: config.allowed_actions,
};
let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size);
let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config);
// END - Inbound service

// START - Publish service
Expand Down
Loading

0 comments on commit 5d07b51

Please sign in to comment.