Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wrapper around the messenger service #95

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 10 additions & 50 deletions examples/messenger_using_kafka/examples/messenger_using_kafka.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_common_utils::env_var;
use talos_messenger_actions::kafka::{context::MessengerProducerContext, producer::KafkaProducer, service::KafkaActionService};
use talos_messenger_core::{
services::MessengerInboundService,
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
utlis::{create_whitelist_actions_from_str, ActionsParserConfig},
};
use talos_messenger_actions::messenger_with_kafka::{messenger_with_kafka, Configuration};
use talos_messenger_core::utlis::{create_whitelist_actions_from_str, ActionsParserConfig};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;

use messenger_using_kafka::kafka_producer::MessengerKafkaPublisher;
use talos_suffix::core::SuffixConfig;

#[tokio::main]
async fn main() {
Expand All @@ -21,64 +11,34 @@ async fn main() {
// 0. Create required items.
// a. Create Kafka consumer
let mut kafka_config = KafkaConfig::from_env(None);
kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID");
// kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID");
kafka_config.extend(
None,
Some(
[
("group.id".to_string(), env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID")),
("enable.auto.commit".to_string(), "false".to_string()),
("auto.offset.reset".to_string(), "earliest".to_string()),
// ("fetch.wait.max.ms".to_string(), "600".to_string()),
// ("socket.keepalive.enable".to_string(), "true".to_string()),
// ("acks".to_string(), "0".to_string()),
]
.into(),
),
);
let kafka_consumer = KafkaConsumer::new(&kafka_config);

// b. Subscribe to topic.
kafka_consumer.subscribe().await.unwrap();

let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(10_000);
let (tx_actions_channel, rx_actions_channel) = mpsc::channel(10_000);

let suffix_config = SuffixConfig {
capacity: 400_000,
prune_start_threshold: Some(2_000),
min_size_after_prune: None,
};
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(suffix_config);

let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST");
let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
let config = Configuration {
suffix_config: Some(suffix_config),
kafka_config,
allowed_actions,
channel_buffers: None,
};

let tx_feedback_channel_clone = tx_feedback_channel.clone();
let custom_context = MessengerProducerContext {
tx_feedback_channel: tx_feedback_channel_clone,
};
let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context);
let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

let publish_service = KafkaActionService {
publisher: messenger_kafka_publisher.into(),
rx_actions_channel,
tx_feedback_channel,
};

// inbound_service.run().await.unwrap();

let messenger_service = TalosMessengerService {
services: vec![Box::new(inbound_service), Box::new(publish_service)],
};

messenger_service.run().await.unwrap();
messenger_with_kafka(config).await.unwrap();
}
48 changes: 0 additions & 48 deletions examples/messenger_using_kafka/src/kafka_producer.rs

This file was deleted.

1 change: 0 additions & 1 deletion examples/messenger_using_kafka/src/lib.rs

This file was deleted.

9 changes: 6 additions & 3 deletions packages/talos_messenger_actions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ indexmap = { version = "2.0.0", features = ["rayon"] }
ahash = "0.8.3"

talos_certifier = { path = "../talos_certifier", version = "0.2.7-dev" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters", version = "0.2.7-dev" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils", version = "0.2.7-dev" }
talos_messenger_core = { path = "../../packages/talos_messenger_core", version = "0.2.7-dev" }
talos_suffix = { path = "../talos_suffix", version = "0.2.7-dev" }
talos_certifier_adapters = { path = "../talos_certifier_adapters", version = "0.2.7-dev" }
talos_common_utils = { path = "../talos_common_utils", version = "0.2.7-dev" }
talos_rdkafka_utils = { path = "../talos_rdkafka_utils", version = "0.2.7-dev" }
talos_messenger_core = { path = "../talos_messenger_core", version = "0.2.7-dev" }


[dev-dependencies]
mockall = { version = "0.11.3" }
Expand Down
4 changes: 3 additions & 1 deletion packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ pub struct MessengerProducerContext {
pub tx_feedback_channel: mpsc::Sender<MessengerChannelFeedback>,
}

impl ClientContext for MessengerProducerContext {}
impl ClientContext for MessengerProducerContext {
const ENABLE_REFRESH_OAUTH_TOKEN: bool = false;
}
impl ProducerContext for MessengerProducerContext {
type DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>;

Expand Down
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod kafka;
pub mod messenger_with_kafka;
138 changes: 138 additions & 0 deletions packages/talos_messenger_actions/src/messenger_with_kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use ahash::HashMap;
use async_trait::async_trait;
use log::debug;
use rdkafka::producer::ProducerContext;
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_messenger_core::{
core::{MessengerPublisher, PublishActionType},
errors::MessengerServiceResult,
services::MessengerInboundService,
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;

use crate::kafka::{
context::{MessengerProducerContext, MessengerProducerDeliveryOpaque},
models::KafkaAction,
producer::KafkaProducer,
service::KafkaActionService,
};

pub struct MessengerKafkaPublisher<C: ProducerContext + 'static> {
pub publisher: KafkaProducer<C>,
}

#[async_trait]
impl<C> MessengerPublisher for MessengerKafkaPublisher<C>
where
C: ProducerContext<DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>> + 'static,
{
type Payload = KafkaAction;
type AdditionalData = u32;
fn get_publish_type(&self) -> PublishActionType {
PublishActionType::Kafka
}

async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> () {
debug!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

let mut bytes: Vec<u8> = Vec::new();
serde_json::to_writer(&mut bytes, &payload.value).unwrap();

let payload_str = std::str::from_utf8(&bytes).unwrap();
debug!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}");

let delivery_opaque = MessengerProducerDeliveryOpaque {
version,
total_publish_count: additional_data,
};

self.publisher
.publish_to_topic(
&payload.topic,
payload.partition,
payload.key.as_deref(),
payload_str,
headers,
Box::new(delivery_opaque),
)
.unwrap();
}
}

pub struct ChannelBuffers {
pub actions_channel: u32,
pub feedback_channel: u32,
}

impl Default for ChannelBuffers {
fn default() -> Self {
Self {

Check warning on line 74 in packages/talos_messenger_actions/src/messenger_with_kafka.rs

View check run for this annotation

Codecov / codecov/patch

packages/talos_messenger_actions/src/messenger_with_kafka.rs#L73-L74

Added lines #L73 - L74 were not covered by tests
actions_channel: 10_000,
feedback_channel: 10_000,
}
}

Check warning on line 78 in packages/talos_messenger_actions/src/messenger_with_kafka.rs

View check run for this annotation

Codecov / codecov/patch

packages/talos_messenger_actions/src/messenger_with_kafka.rs#L78

Added line #L78 was not covered by tests
}

pub struct Configuration {
/// Configuration required for the In memory suffix
pub suffix_config: Option<SuffixConfig>,
/// Configuration required for the Kafka producer and consumer
pub kafka_config: KafkaConfig,
/// Map of permitted on_commit actions
pub allowed_actions: HashMap<String, Vec<String>>,
/// Channel buffer size for the internal channels between threads
pub channel_buffers: Option<ChannelBuffers>,
}

pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult {

Check warning on line 92 in packages/talos_messenger_actions/src/messenger_with_kafka.rs

View check run for this annotation

Codecov / codecov/patch

packages/talos_messenger_actions/src/messenger_with_kafka.rs#L92

Added line #L92 was not covered by tests
let kafka_consumer = KafkaConsumer::new(&config.kafka_config);

// Subscribe to topic.
kafka_consumer.subscribe().await.unwrap();

let ChannelBuffers {
actions_channel,
feedback_channel,
} = config.channel_buffers.unwrap_or_default();

let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(feedback_channel as usize);
let (tx_actions_channel, rx_actions_channel) = mpsc::channel(actions_channel as usize);
let tx_feedback_channel_clone = tx_feedback_channel.clone();

// START - Inbound service
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(config.suffix_config.unwrap_or_default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions: config.allowed_actions,
};
// END - Inbound service

// START - Publish service
let custom_context = MessengerProducerContext {
tx_feedback_channel: tx_feedback_channel_clone,
};
let kafka_producer = KafkaProducer::with_context(&config.kafka_config, custom_context);
let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

let publish_service = KafkaActionService {
publisher: messenger_kafka_publisher.into(),
rx_actions_channel,
tx_feedback_channel,
};

// END - Publish service
let messenger_service = TalosMessengerService {
services: vec![Box::new(inbound_service), Box::new(publish_service)],
};

messenger_service.run().await
}

Check warning on line 138 in packages/talos_messenger_actions/src/messenger_with_kafka.rs

View check run for this annotation

Codecov / codecov/patch

packages/talos_messenger_actions/src/messenger_with_kafka.rs#L138

Added line #L138 was not covered by tests