diff --git a/Cargo.lock b/Cargo.lock index 2cd83656..4e13e3f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2592,8 +2592,10 @@ dependencies = [ "strum 0.25.0", "talos_certifier", "talos_certifier_adapters", + "talos_common_utils", "talos_messenger_core", "talos_rdkafka_utils", + "talos_suffix", "thiserror", "time", "tokio", diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index 42e12cc5..edd7668e 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -1,18 +1,8 @@ -use talos_certifier::ports::MessageReciever; -use talos_certifier_adapters::KafkaConsumer; use talos_common_utils::env_var; -use talos_messenger_actions::kafka::{context::MessengerProducerContext, producer::KafkaProducer, service::KafkaActionService}; -use talos_messenger_core::{ - services::MessengerInboundService, - suffix::MessengerCandidate, - talos_messenger_service::TalosMessengerService, - utlis::{create_whitelist_actions_from_str, ActionsParserConfig}, -}; +use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration}; +use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig}; use talos_rdkafka_utils::kafka_config::KafkaConfig; -use talos_suffix::{core::SuffixConfig, Suffix}; -use tokio::sync::mpsc; - -use messenger_using_kafka::kafka_producer::MessengerKafkaPublisher; +use talos_suffix::core::SuffixConfig; #[tokio::main] async fn main() { @@ -21,64 +11,34 @@ async fn main() { // 0. Create required items. // a. Create Kafka consumer let mut kafka_config = KafkaConfig::from_env(None); - kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID"); + // kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID"); kafka_config.extend( None, Some( [ + ("group.id".to_string(), env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID")), ("enable.auto.commit".to_string(), "false".to_string()), ("auto.offset.reset".to_string(), "earliest".to_string()), - // ("fetch.wait.max.ms".to_string(), "600".to_string()), - // ("socket.keepalive.enable".to_string(), "true".to_string()), - // ("acks".to_string(), "0".to_string()), ] .into(), ), ); - let kafka_consumer = KafkaConsumer::new(&kafka_config); - - // b. Subscribe to topic. - kafka_consumer.subscribe().await.unwrap(); - - let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(10_000); - let (tx_actions_channel, rx_actions_channel) = mpsc::channel(10_000); let suffix_config = SuffixConfig { capacity: 400_000, prune_start_threshold: Some(2_000), min_size_after_prune: None, }; - let suffix: Suffix = Suffix::with_config(suffix_config); let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST"); let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default()); - let inbound_service = MessengerInboundService { - message_receiver: kafka_consumer, - tx_actions_channel, - rx_feedback_channel, - suffix, + let config = Configuration { + suffix_config: Some(suffix_config), + kafka_config, allowed_actions, + channel_buffers: None, }; - let tx_feedback_channel_clone = tx_feedback_channel.clone(); - let custom_context = MessengerProducerContext { - tx_feedback_channel: tx_feedback_channel_clone, - }; - let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context); - let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer }; - - let publish_service = KafkaActionService { - publisher: messenger_kafka_publisher.into(), - rx_actions_channel, - tx_feedback_channel, - }; - - // inbound_service.run().await.unwrap(); - - let messenger_service = TalosMessengerService { - services: vec![Box::new(inbound_service), Box::new(publish_service)], - }; - - messenger_service.run().await.unwrap(); + messenger_with_kafka(config).await.unwrap(); } diff --git a/examples/messenger_using_kafka/src/kafka_producer.rs b/examples/messenger_using_kafka/src/kafka_producer.rs deleted file mode 100644 index e7f83202..00000000 --- a/examples/messenger_using_kafka/src/kafka_producer.rs +++ /dev/null @@ -1,48 +0,0 @@ -use ahash::HashMap; -use async_trait::async_trait; -use log::info; -use rdkafka::producer::ProducerContext; -use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer}; -use talos_messenger_core::core::{MessengerPublisher, PublishActionType}; - -pub struct MessengerKafkaPublisher { - pub publisher: KafkaProducer, -} - -#[async_trait] -impl MessengerPublisher for MessengerKafkaPublisher -where - C: ProducerContext> + 'static, -{ - type Payload = KafkaAction; - type AdditionalData = u32; - fn get_publish_type(&self) -> PublishActionType { - PublishActionType::Kafka - } - - async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap, additional_data: Self::AdditionalData) -> () { - info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}"); - - let mut bytes: Vec = Vec::new(); - serde_json::to_writer(&mut bytes, &payload.value).unwrap(); - - let payload_str = std::str::from_utf8(&bytes).unwrap(); - info!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}"); - - let delivery_opaque = MessengerProducerDeliveryOpaque { - version, - total_publish_count: additional_data, - }; - - self.publisher - .publish_to_topic( - &payload.topic, - payload.partition, - payload.key.as_deref(), - payload_str, - headers, - Box::new(delivery_opaque), - ) - .unwrap(); - } -} diff --git a/examples/messenger_using_kafka/src/lib.rs b/examples/messenger_using_kafka/src/lib.rs deleted file mode 100644 index 54b94503..00000000 --- a/examples/messenger_using_kafka/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod kafka_producer; diff --git a/packages/talos_messenger_actions/Cargo.toml b/packages/talos_messenger_actions/Cargo.toml index 8cf78062..6c9598b9 100644 --- a/packages/talos_messenger_actions/Cargo.toml +++ b/packages/talos_messenger_actions/Cargo.toml @@ -36,9 +36,12 @@ indexmap = { version = "2.0.0", features = ["rayon"] } ahash = "0.8.3" talos_certifier = { path = "../talos_certifier", version = "0.2.7-dev" } -talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters", version = "0.2.7-dev" } -talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils", version = "0.2.7-dev" } -talos_messenger_core = { path = "../../packages/talos_messenger_core", version = "0.2.7-dev" } +talos_suffix = { path = "../talos_suffix", version = "0.2.7-dev" } +talos_certifier_adapters = { path = "../talos_certifier_adapters", version = "0.2.7-dev" } +talos_common_utils = { path = "../talos_common_utils", version = "0.2.7-dev" } +talos_rdkafka_utils = { path = "../talos_rdkafka_utils", version = "0.2.7-dev" } +talos_messenger_core = { path = "../talos_messenger_core", version = "0.2.7-dev" } + [dev-dependencies] mockall = { version = "0.11.3" } diff --git a/packages/talos_messenger_actions/src/kafka/context.rs b/packages/talos_messenger_actions/src/kafka/context.rs index 7f187904..dc4178ae 100644 --- a/packages/talos_messenger_actions/src/kafka/context.rs +++ b/packages/talos_messenger_actions/src/kafka/context.rs @@ -15,7 +15,9 @@ pub struct MessengerProducerContext { pub tx_feedback_channel: mpsc::Sender, } -impl ClientContext for MessengerProducerContext {} +impl ClientContext for MessengerProducerContext { + const ENABLE_REFRESH_OAUTH_TOKEN: bool = false; +} impl ProducerContext for MessengerProducerContext { type DeliveryOpaque = Box; diff --git a/packages/talos_messenger_actions/src/lib.rs b/packages/talos_messenger_actions/src/lib.rs index b17877c5..6df96259 100644 --- a/packages/talos_messenger_actions/src/lib.rs +++ b/packages/talos_messenger_actions/src/lib.rs @@ -1 +1,2 @@ pub mod kafka; +pub mod messenger_with_kafka; diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs new file mode 100644 index 00000000..59062220 --- /dev/null +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -0,0 +1,138 @@ +use ahash::HashMap; +use async_trait::async_trait; +use log::debug; +use rdkafka::producer::ProducerContext; +use talos_certifier::ports::MessageReciever; +use talos_certifier_adapters::KafkaConsumer; +use talos_messenger_core::{ + core::{MessengerPublisher, PublishActionType}, + errors::MessengerServiceResult, + services::MessengerInboundService, + suffix::MessengerCandidate, + talos_messenger_service::TalosMessengerService, +}; +use talos_rdkafka_utils::kafka_config::KafkaConfig; +use talos_suffix::{core::SuffixConfig, Suffix}; +use tokio::sync::mpsc; + +use crate::kafka::{ + context::{MessengerProducerContext, MessengerProducerDeliveryOpaque}, + models::KafkaAction, + producer::KafkaProducer, + service::KafkaActionService, +}; + +pub struct MessengerKafkaPublisher { + pub publisher: KafkaProducer, +} + +#[async_trait] +impl MessengerPublisher for MessengerKafkaPublisher +where + C: ProducerContext> + 'static, +{ + type Payload = KafkaAction; + type AdditionalData = u32; + fn get_publish_type(&self) -> PublishActionType { + PublishActionType::Kafka + } + + async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap, additional_data: Self::AdditionalData) -> () { + debug!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}"); + + let mut bytes: Vec = Vec::new(); + serde_json::to_writer(&mut bytes, &payload.value).unwrap(); + + let payload_str = std::str::from_utf8(&bytes).unwrap(); + debug!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}"); + + let delivery_opaque = MessengerProducerDeliveryOpaque { + version, + total_publish_count: additional_data, + }; + + self.publisher + .publish_to_topic( + &payload.topic, + payload.partition, + payload.key.as_deref(), + payload_str, + headers, + Box::new(delivery_opaque), + ) + .unwrap(); + } +} + +pub struct ChannelBuffers { + pub actions_channel: u32, + pub feedback_channel: u32, +} + +impl Default for ChannelBuffers { + fn default() -> Self { + Self { + actions_channel: 10_000, + feedback_channel: 10_000, + } + } +} + +pub struct Configuration { + /// Configuration required for the In memory suffix + pub suffix_config: Option, + /// Configuration required for the Kafka producer and consumer + pub kafka_config: KafkaConfig, + /// Map of permitted on_commit actions + pub allowed_actions: HashMap>, + /// Channel buffer size for the internal channels between threads + pub channel_buffers: Option, +} + +pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult { + let kafka_consumer = KafkaConsumer::new(&config.kafka_config); + + // Subscribe to topic. + kafka_consumer.subscribe().await.unwrap(); + + let ChannelBuffers { + actions_channel, + feedback_channel, + } = config.channel_buffers.unwrap_or_default(); + + let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(feedback_channel as usize); + let (tx_actions_channel, rx_actions_channel) = mpsc::channel(actions_channel as usize); + let tx_feedback_channel_clone = tx_feedback_channel.clone(); + + // START - Inbound service + let suffix: Suffix = Suffix::with_config(config.suffix_config.unwrap_or_default()); + + let inbound_service = MessengerInboundService { + message_receiver: kafka_consumer, + tx_actions_channel, + rx_feedback_channel, + suffix, + allowed_actions: config.allowed_actions, + }; + // END - Inbound service + + // START - Publish service + let custom_context = MessengerProducerContext { + tx_feedback_channel: tx_feedback_channel_clone, + }; + let kafka_producer = KafkaProducer::with_context(&config.kafka_config, custom_context); + let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer }; + + let publish_service = KafkaActionService { + publisher: messenger_kafka_publisher.into(), + rx_actions_channel, + tx_feedback_channel, + }; + + // END - Publish service + let messenger_service = TalosMessengerService { + services: vec![Box::new(inbound_service), Box::new(publish_service)], + }; + + messenger_service.run().await +}