Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: messenger improve latency #108

Merged
merged 36 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
56f4a88
feat: add timing to state transitions in messenger
gk-kindred Oct 8, 2024
034485c
chore: make tokio select biased for inbound service
gk-kindred Oct 11, 2024
e43eb6c
chore: inbound service, process next actions even in candidate messag…
gk-kindred Oct 11, 2024
b40017e
chore: flip the arms in biased tokio::select for inbound service
gk-kindred Oct 18, 2024
544fe9c
chore: make arm for feedback to run only if there is Some feedback
gk-kindred Oct 21, 2024
e264eb2
chore: use talos version - biased arms flipped - print time spend in …
gk-kindred Oct 21, 2024
163e24e
chore: remove tokio::select and use mpsc channel try_recv
gk-kindred Oct 21, 2024
9a87c85
chore: comment out the printing of stats in inbound service for try_r…
gk-kindred Oct 21, 2024
b79569f
chore: remove the pruning and commit check for every feedback
gk-kindred Oct 21, 2024
d2af214
chore: use tokio select without biased and print useful metrics
gk-kindred Oct 22, 2024
a6ceff4
chore: optimize suffix operations - getting items to process and upda…
gk-kindred Oct 22, 2024
407b3d9
chore: reduce the prune_start_threshold
gk-kindred Oct 23, 2024
eb14fd3
chore: commit and prune logic kicks in after 10_000 completed actions
gk-kindred Oct 23, 2024
826ca48
chore: commit and prune logic kicks in when prune_index is above the …
gk-kindred Oct 23, 2024
91b9efe
chore: call process_next_actions in 10ms interval
gk-kindred Oct 23, 2024
e9fab0a
chore: reduce the interval to 5ms
gk-kindred Oct 23, 2024
004523f
chore: use take_while to update prune_index
gk-kindred Oct 28, 2024
05b62a9
chore: clean up
gk-kindred Nov 1, 2024
7708385
chore: pass topic creation configs to create topic bin
gk-kindred Nov 1, 2024
da92edf
chore: print topic configs passed to create_topic
gk-kindred Nov 3, 2024
100f869
chore: print time taken between publish and receive acks
gk-kindred Nov 4, 2024
8a5cb34
chore: print time taken for building the list of actions to process
gk-kindred Nov 6, 2024
c1b1f94
chore: update prune index at the end of processing next actions
gk-kindred Nov 6, 2024
ceac5b0
chore: print the timing from decision create to on_commit publish
gk-kindred Nov 6, 2024
4dcecb6
chore: print timestamps
gk-kindred Nov 6, 2024
b47f123
chore: update prune index only when completed
gk-kindred Nov 7, 2024
10d0886
chore: comment printing messages
gk-kindred Nov 7, 2024
d1ca56c
chore: final refactoring - part 1
gk-kindred Nov 11, 2024
3f46a8a
chore: final refactoring - part 2
gk-kindred Nov 11, 2024
f5461be
chore: prune check and commit more frequently
gk-kindred Nov 11, 2024
5e0cb0b
chore: reduce commit frequency but do prune check often
gk-kindred Nov 11, 2024
38496b7
chore: introduce config for inbound service and commit based on the c…
gk-kindred Nov 11, 2024
111b266
chore: set the commit_size to 2000
gk-kindred Nov 11, 2024
518a4a7
chore: prune logic only when above the threshold
gk-kindred Nov 11, 2024
88ca2cc
chore: update the default commit_size to 5K records
gk-kindred Nov 11, 2024
67140e1
chore: updates from review comments
gk-kindred Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,11 @@ BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2
# Messenger environment variables
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"
# ### Talos Messenger Env variables (end) #############################
# ### Talos Messenger Env variables (end) #############################

#
# ### Configs used for topic creation (start) #############################
# KAFKA_CREATE_TOPIC_PARTITIONS=1
# KAFKA_CREATE_TOPIC_REPLICATION_COUNT=3
KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000, message.timestamp.type=LogAppendTime"
# ### Configs used for topic creation (end) #############################
34 changes: 31 additions & 3 deletions examples/certifier_kafka_pg/examples/kafka_create_topic.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, KafkaDeployError, KafkaDeployStatus};
use std::collections::HashMap;

use talos_certifier_adapters::kafka::kafka_deploy::{create_topic, CreateTopicConfigs, KafkaDeployError, KafkaDeployStatus};
use talos_common_utils::env_var_with_defaults;
use talos_rdkafka_utils::kafka_config::KafkaConfig;

#[tokio::main]
async fn main() -> Result<(), KafkaDeployError> {
println!("deploying kafka...");
println!("Creating kafka topic...");

let kafka_config = KafkaConfig::from_env(None);

let replication_factor = env_var_with_defaults!("KAFKA_CREATE_TOPIC_REPLICATION_COUNT", Option::<i32>, 3);
let num_partitions = env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", Option::<i32>, 1);

// eg: KAFKA_CREATE_TOPIC_CONFIGS="retention.ms=3600000,"
let config_option = env_var_with_defaults!("KAFKA_CREATE_TOPIC_CONFIGS", Option::<String>);

let mut config: HashMap<&str, &str> = HashMap::new();

let config_string = config_option.unwrap_or("".to_owned());
config_string.trim().split(',').for_each(|c| {
if let Some((k, v)) = c.trim().split_once('=') {
config.insert(k, v);
}
});

let topic_config = CreateTopicConfigs {
topic: kafka_config.topic.clone(),
config,
replication_factor,
num_partitions,
};

let status = create_topic().await?;
let status = create_topic(&kafka_config, topic_config).await?;

match status {
KafkaDeployStatus::TopicExists => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn main() {
kafka_config,
allowed_actions,
channel_buffers: None,
commit_size: Some(2_000),
};

messenger_with_kafka(config).await.unwrap();
Expand Down
47 changes: 32 additions & 15 deletions packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{collections::HashMap, time::Duration};

use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
Expand All @@ -7,7 +7,7 @@ use rdkafka::{
error::KafkaError,
types::RDKafkaErrorCode,
};
use talos_common_utils::env_var_with_defaults;

use talos_rdkafka_utils::kafka_config::KafkaConfig;
use thiserror::Error as ThisError;

Expand All @@ -24,26 +24,45 @@ pub enum KafkaDeployError {
KafkaError(#[from] KafkaError),
}

pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {
let kafka_config = KafkaConfig::from_env(None);
println!("kafka configs received from env... {kafka_config:#?}");
#[derive(Debug, Clone)]
pub struct CreateTopicConfigs<'a> {
/// topic to create.
pub topic: String,
/// Topic specific configs.
///
/// see: https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html
pub config: HashMap<&'a str, &'a str>,
/// Replication count for partitions in topic. Defaults to 3.
pub replication_factor: Option<i32>,
/// Number of paritions for the topic. Defaults to 1.
pub num_partitions: Option<i32>,
}

const DEFAULT_REPLICATION_FACTOR: i32 = 3;
const DEFAULT_NUM_PARTITIONS: i32 = 3;

pub async fn create_topic(kafka_config: &KafkaConfig, topic_configs: CreateTopicConfigs<'_>) -> Result<KafkaDeployStatus, KafkaDeployError> {
println!("kafka brokers = {:?} and usename = {}", kafka_config.brokers, kafka_config.username);
println!("topic configs received from env = {topic_configs:#?}");
let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?;

let kafka_certification_topic = kafka_config.topic.to_string();
let timeout = Duration::from_secs(1);
let timeout = Duration::from_secs(5);
let metadata = consumer
.fetch_metadata(Some(&kafka_certification_topic), timeout)
.fetch_metadata(Some(&topic_configs.topic), timeout)
.expect("Fetching topic metadata failed");

if !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty() {
Ok(KafkaDeployStatus::TopicExists)
} else {
println!("Topic does not exist, creating...");

let config: Vec<(&str, &str)> = topic_configs.config.into_iter().collect();

let topic = NewTopic {
name: &kafka_certification_topic,
num_partitions: env_var_with_defaults!("KAFKA_CREATE_TOPIC_PARTITIONS", i32, 1),
replication: TopicReplication::Fixed(1),
config: vec![("message.timestamp.type", "LogAppendTime")],
name: &topic_configs.topic,
num_partitions: topic_configs.num_partitions.unwrap_or(DEFAULT_NUM_PARTITIONS),
replication: TopicReplication::Fixed(topic_configs.replication_factor.unwrap_or(DEFAULT_REPLICATION_FACTOR)),
config,
};

let opts = AdminOptions::new().operation_timeout(Some(timeout));
Expand All @@ -52,9 +71,7 @@ pub async fn create_topic() -> Result<KafkaDeployStatus, KafkaDeployError> {

let results = admin.create_topics(&[topic], &opts).await?;

results[0]
.as_ref()
.map_err(|e| KafkaDeployError::TopicCreation(kafka_certification_topic, e.1))?;
results[0].as_ref().map_err(|e| KafkaDeployError::TopicCreation(topic_configs.topic, e.1))?;

Ok(KafkaDeployStatus::TopicCreated)
}
Expand Down
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl ProducerContext for MessengerProducerContext {
match result {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());

// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");
Expand Down
7 changes: 6 additions & 1 deletion packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures_util::future::join_all;
use log::{debug, error, info};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::sync::mpsc;

use talos_messenger_core::{
core::{MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService},
errors::MessengerServiceResult,
suffix::MessengerStateTransitionTimestamps,
utlis::get_actions_deserialised,
};

Expand Down Expand Up @@ -49,7 +51,10 @@ where

let publish_vec = actions.into_iter().map(|action| {
let publisher = self.publisher.clone();
let headers = headers_cloned.clone();
let mut headers = headers_cloned.clone();
let timestamp = OffsetDateTime::now_utc().format(&Rfc3339).ok().unwrap();

headers.insert(MessengerStateTransitionTimestamps::EndOnCommitActions.to_string(), timestamp);
async move {
publisher.send(version, action, headers, total_len ).await;
}
Expand Down
16 changes: 7 additions & 9 deletions packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use talos_certifier_adapters::KafkaConsumer;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::MessengerServiceResult,
services::MessengerInboundService,
services::{MessengerInboundService, MessengerInboundServiceConfig},
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
};
Expand Down Expand Up @@ -93,6 +93,10 @@ pub struct Configuration {
pub allowed_actions: HashMap<String, Vec<String>>,
/// Channel buffer size for the internal channels between threads
pub channel_buffers: Option<ChannelBuffers>,
/// Commit size to decide how often the certifier topic can be committed by the consumer.
/// The more ofter the commit is done has inverse impact on the latency.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small typo

/// Defaults to 1_000.
pub commit_size: Option<u32>,
}

pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
Expand All @@ -112,14 +116,8 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu

// START - Inbound service
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions: config.allowed_actions,
};
let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size);
let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config);
// END - Inbound service

// START - Publish service
Expand Down
Loading
Loading