Skip to content

Commit

Permalink
feat: wrapper around the messenger service (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred authored Nov 24, 2023
1 parent 127d304 commit ab9dfbc
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 103 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 10 additions & 50 deletions examples/messenger_using_kafka/examples/messenger_using_kafka.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_common_utils::env_var;
use talos_messenger_actions::kafka::{context::MessengerProducerContext, producer::KafkaProducer, service::KafkaActionService};
use talos_messenger_core::{
services::MessengerInboundService,
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
utlis::{create_whitelist_actions_from_str, ActionsParserConfig},
};
use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration};
use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;

use messenger_using_kafka::kafka_producer::MessengerKafkaPublisher;
use talos_suffix::core::SuffixConfig;

#[tokio::main]
async fn main() {
Expand All @@ -21,64 +11,34 @@ async fn main() {
// 0. Create required items.
// a. Create Kafka consumer
let mut kafka_config = KafkaConfig::from_env(None);
kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID");
// kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID");
kafka_config.extend(
None,
Some(
[
("group.id".to_string(), env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID")),
("enable.auto.commit".to_string(), "false".to_string()),
("auto.offset.reset".to_string(), "earliest".to_string()),
// ("fetch.wait.max.ms".to_string(), "600".to_string()),
// ("socket.keepalive.enable".to_string(), "true".to_string()),
// ("acks".to_string(), "0".to_string()),
]
.into(),
),
);
let kafka_consumer = KafkaConsumer::new(&kafka_config);

// b. Subscribe to topic.
kafka_consumer.subscribe().await.unwrap();

let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(10_000);
let (tx_actions_channel, rx_actions_channel) = mpsc::channel(10_000);

let suffix_config = SuffixConfig {
capacity: 400_000,
prune_start_threshold: Some(2_000),
min_size_after_prune: None,
};
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(suffix_config);

let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST");
let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
let config = Configuration {
suffix_config: Some(suffix_config),
kafka_config,
allowed_actions,
channel_buffers: None,
};

let tx_feedback_channel_clone = tx_feedback_channel.clone();
let custom_context = MessengerProducerContext {
tx_feedback_channel: tx_feedback_channel_clone,
};
let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context);
let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

let publish_service = KafkaActionService {
publisher: messenger_kafka_publisher.into(),
rx_actions_channel,
tx_feedback_channel,
};

// inbound_service.run().await.unwrap();

let messenger_service = TalosMessengerService {
services: vec![Box::new(inbound_service), Box::new(publish_service)],
};

messenger_service.run().await.unwrap();
messenger_with_kafka(config).await.unwrap();
}
48 changes: 0 additions & 48 deletions examples/messenger_using_kafka/src/kafka_producer.rs

This file was deleted.

1 change: 0 additions & 1 deletion examples/messenger_using_kafka/src/lib.rs

This file was deleted.

9 changes: 6 additions & 3 deletions packages/talos_messenger_actions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ indexmap = { version = "2.0.0", features = ["rayon"] }
ahash = "0.8.3"

talos_certifier = { path = "../talos_certifier", version = "0.2.7-dev" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters", version = "0.2.7-dev" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils", version = "0.2.7-dev" }
talos_messenger_core = { path = "../../packages/talos_messenger_core", version = "0.2.7-dev" }
talos_suffix = { path = "../talos_suffix", version = "0.2.7-dev" }
talos_certifier_adapters = { path = "../talos_certifier_adapters", version = "0.2.7-dev" }
talos_common_utils = { path = "../talos_common_utils", version = "0.2.7-dev" }
talos_rdkafka_utils = { path = "../talos_rdkafka_utils", version = "0.2.7-dev" }
talos_messenger_core = { path = "../talos_messenger_core", version = "0.2.7-dev" }


[dev-dependencies]
mockall = { version = "0.11.3" }
Expand Down
4 changes: 3 additions & 1 deletion packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ pub struct MessengerProducerContext {
pub tx_feedback_channel: mpsc::Sender<MessengerChannelFeedback>,
}

impl ClientContext for MessengerProducerContext {}
impl ClientContext for MessengerProducerContext {
const ENABLE_REFRESH_OAUTH_TOKEN: bool = false;
}
impl ProducerContext for MessengerProducerContext {
type DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>;

Expand Down
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod kafka;
pub mod messenger_with_kafka;
138 changes: 138 additions & 0 deletions packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use ahash::HashMap;
use async_trait::async_trait;
use log::debug;
use rdkafka::producer::ProducerContext;
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::MessengerServiceResult,
services::MessengerInboundService,
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;

use crate::kafka::{
context::{MessengerProducerContext, MessengerProducerDeliveryOpaque},
models::KafkaAction,
producer::KafkaProducer,
service::KafkaActionService,
};

/// Kafka-backed implementation of the messenger's publisher side.
///
/// Wraps a [`KafkaProducer`] parameterised over a producer context `C`.
/// In this crate it is constructed with [`MessengerProducerContext`]
/// (see `messenger_with_kafka` below), whose delivery callbacks report
/// back over a feedback channel.
pub struct MessengerKafkaPublisher<C: ProducerContext + 'static> {
    pub publisher: KafkaProducer<C>,
}

#[async_trait]
impl<C> MessengerPublisher for MessengerKafkaPublisher<C>
where
    C: ProducerContext<DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>> + 'static,
{
    type Payload = KafkaAction;
    type AdditionalData = u32;

    /// This publisher always emits Kafka publish actions.
    fn get_publish_type(&self) -> PublishActionType {
        PublishActionType::Kafka
    }

    /// Serialises `payload.value` to JSON and publishes it to the
    /// topic/partition/key described by `payload`, attaching `version` and the
    /// total publish count (`additional_data`) as the delivery opaque so the
    /// producer context can correlate delivery feedback with the candidate.
    ///
    /// NOTE(review): serialisation and enqueue failures panic via `unwrap()`;
    /// consider surfacing them through the feedback channel instead.
    async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> () {
        debug!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

        // `serde_json::to_string` replaces the manual Vec + to_writer +
        // from_utf8 round-trip from the original; the produced JSON is
        // byte-identical and infallible UTF-8 by construction.
        let payload_str = serde_json::to_string(&payload.value).unwrap();
        debug!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}");

        let delivery_opaque = MessengerProducerDeliveryOpaque {
            version,
            total_publish_count: additional_data,
        };

        self.publisher
            .publish_to_topic(
                &payload.topic,
                payload.partition,
                payload.key.as_deref(),
                &payload_str,
                headers,
                Box::new(delivery_opaque),
            )
            .unwrap();
    }
}

/// Buffer sizes for the internal mpsc channels that connect the inbound
/// service to the publish service.
///
/// Small plain-data config type, so the common derives are provided
/// (the original derived nothing, which makes it awkward to log or copy).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelBuffers {
    /// Capacity of the channel carrying on-commit actions to the publisher.
    pub actions_channel: u32,
    /// Capacity of the channel carrying publish feedback back to the inbound service.
    pub feedback_channel: u32,
}

impl Default for ChannelBuffers {
    /// Both channels default to a 10 000-message buffer.
    fn default() -> Self {
        Self {
            actions_channel: 10_000,
            feedback_channel: 10_000,
        }
    }
}

/// Top-level configuration consumed by `messenger_with_kafka`.
pub struct Configuration {
    /// Configuration required for the in-memory suffix; `None` falls back to `SuffixConfig::default()`.
    pub suffix_config: Option<SuffixConfig>,
    /// Configuration required for the Kafka producer and consumer.
    pub kafka_config: KafkaConfig,
    /// Map of permitted on_commit actions.
    // NOTE(review): presumably keyed by action type with the whitelisted
    // sub-actions as values — confirm against create_whitelist_actions_from_str.
    pub allowed_actions: HashMap<String, Vec<String>>,
    /// Channel buffer size for the internal channels between threads;
    /// `None` uses `ChannelBuffers::default()` (10k each).
    pub channel_buffers: Option<ChannelBuffers>,
}

/// Wires up and runs the full Kafka-backed messenger: a consumer-driven
/// inbound service and a producer-backed publish service, connected by
/// bounded mpsc channels and executed together under
/// [`TalosMessengerService`].
///
/// # Panics
/// Panics if subscribing the Kafka consumer fails (see `unwrap` below).
pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {
    let kafka_consumer = KafkaConsumer::new(&config.kafka_config);

    // Subscribe to topic.
    // NOTE(review): a subscribe failure panics rather than returning an Err —
    // consider mapping it into MessengerServiceResult.
    kafka_consumer.subscribe().await.unwrap();

    let ChannelBuffers {
        actions_channel,
        feedback_channel,
    } = config.channel_buffers.unwrap_or_default();

    // actions: inbound -> publisher; feedback: publisher/producer-context -> inbound.
    let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(feedback_channel as usize);
    let (tx_actions_channel, rx_actions_channel) = mpsc::channel(actions_channel as usize);
    let tx_feedback_channel_clone = tx_feedback_channel.clone();

    // START - Inbound service
    let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());

    let inbound_service = MessengerInboundService {
        message_receiver: kafka_consumer,
        tx_actions_channel,
        rx_feedback_channel,
        suffix,
        allowed_actions: config.allowed_actions,
    };
    // END - Inbound service

    // START - Publish service
    // The producer context gets its own clone of the feedback sender so
    // delivery callbacks can report independently of the publish service.
    let custom_context = MessengerProducerContext {
        tx_feedback_channel: tx_feedback_channel_clone,
    };
    let kafka_producer = KafkaProducer::with_context(&config.kafka_config, custom_context);
    let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

    let publish_service = KafkaActionService {
        publisher: messenger_kafka_publisher.into(),
        rx_actions_channel,
        tx_feedback_channel,
    };

    // END - Publish service
    // Run both services to completion under the shared service harness.
    let messenger_service = TalosMessengerService {
        services: vec![Box::new(inbound_service), Box::new(publish_service)],
    };

    messenger_service.run().await
}

0 comments on commit ab9dfbc

Please sign in to comment.