Skip to content

Commit

Permalink
feat: built whitelist actions using env variables
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Oct 4, 2023
1 parent 6502fad commit 34f9011
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 30 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,8 @@ BANK_REPLICATOR_KAFKA_GROUP_ID="talos-replicator-dev"
BANK_STATEMAP_INSTALLER_MAX_RETRY=5
BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2

# ### Talos Messenger Env variables (start) #############################
# Messenger environment variables
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"
# ### Talos Messenger Env variables (end) #############################
18 changes: 9 additions & 9 deletions examples/messenger_using_kafka/examples/messenger_using_kafka.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use ahash::{HashMap, HashMapExt};
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_common_utils::env_var;
use talos_messenger_actions::kafka::{context::MessengerProducerContext, producer::KafkaProducer, service::KafkaActionService};
use talos_messenger_core::{services::MessengerInboundService, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService};
use talos_messenger_core::{
services::MessengerInboundService,
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
utlis::{create_whitelist_actions_from_str, ActionsParserConfig},
};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -46,21 +50,17 @@ async fn main() {
};
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(suffix_config);

let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new();
// TODO: GK - Set through env
whitelisted_actions.insert("publish", vec!["kafka"]);
let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST");
let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions: whitelisted_actions,
allowed_actions,
};

// TODO: GK - create topic should be part of publish.
kafka_config.topic = "test.messenger.topic".to_string();

let tx_feedback_channel_clone = tx_feedback_channel.clone();
let custom_context = MessengerProducerContext {
tx_feedback_channel: tx_feedback_channel_clone,
Expand Down
4 changes: 2 additions & 2 deletions packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ impl ProducerContext for MessengerProducerContext {
match result {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());
// TODO: GK - what to do on error? Panic?
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(
version,
"kafka".to_string(),
delivery_opaque.total_publish_count,
)));
}
Err(err) => {
// TODO: GK - what to do on error? Panic?
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(version, err.0.to_string())));
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/talos_messenger_actions/src/kafka/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use serde_json::{self};
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct KafkaAction {
// TODO: GK - Add additional Kafka producer related props here.
// pub version: u32,
// pub cluster: String,
pub topic: String,
pub key: Option<String>,
pub partition: Option<i32>,
Expand Down
6 changes: 2 additions & 4 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ where
Some(actions) = self.rx_actions_channel.recv() => {
let MessengerCommitActions {version, commit_actions } = actions;

if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string())
// TODO: GK - Make this block generic in next ticket to iterator in loop by PublishActionType
{
if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {

let total_len = actions.len() as u32;
for action in actions {
// Send to Kafka
// Publish the message
self.publisher.send(version, action, total_len ).await;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
pub tx_actions_channel: mpsc::Sender<MessengerCommitActions>,
pub rx_feedback_channel: mpsc::Receiver<MessengerChannelFeedback>,
pub suffix: Suffix<MessengerCandidate>,
pub allowed_actions: HashMap<&'static str, Vec<&'static str>>,
pub allowed_actions: HashMap<String, Vec<String>>,
}

impl<M> MessengerInboundService<M>
Expand Down
41 changes: 34 additions & 7 deletions packages/talos_messenger_core/src/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
use ahash::{HashMap, HashMapExt};
use serde_json::{json, Value};

use crate::utlis::{get_actions_deserialised, get_allowed_commit_actions};
use crate::utlis::{create_whitelist_actions_from_str, get_actions_deserialised, get_allowed_commit_actions, ActionsParserConfig};

// Start - testing create_whitelist_actions_from_str function
#[test]
fn test_fn_create_whitelist_actions_from_str() {
let config = ActionsParserConfig {
case_sensitive: false,
key_value_delimiter: ":",
};

let actions_str = "foo:test, foo:test2, bar,FOO:test3";

let action_map = create_whitelist_actions_from_str(actions_str, &config);

assert_eq!(action_map.len(), 2);
assert_eq!(action_map.get("foo").unwrap().len(), 3);
assert!(action_map.contains_key("bar"));
}
// End - testing create_whitelist_actions_from_str function

// Start - testing get_allowed_commit_actions function
#[test]
Expand Down Expand Up @@ -38,33 +56,39 @@ fn test_fn_get_allowed_commit_actions_allowed_actions_negative_scenarios() {

// When allowed action is supported type by the messenger, but the sub actions are not provided
allowed_actions.clear();
allowed_actions.insert("publish", vec![]);
allowed_actions.insert("publish".to_string(), vec![]);
let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions);
assert!(result.is_empty());

// When allowed action is supported type by the messenger, but the sub actions are not supported
allowed_actions.clear();
allowed_actions.insert("publish", vec!["sqs", "sns"]);
allowed_actions.insert("publish".to_string(), vec!["sqs".to_string(), "sns".to_string()]);
let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions);
assert!(result.is_empty());

// When allowed action is non supported type by the messenger, with empty sub type
allowed_actions.clear();
allowed_actions.insert("random", vec![]);
allowed_actions.insert("random".to_string(), vec![]);
let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions);
assert!(result.is_empty());

// When allowed action is non supported type by the messenger, but has valid sub actions
allowed_actions.clear();
allowed_actions.insert("random", vec!["sqs", "sns", "kafka", "mqtt"]);
allowed_actions.insert(
"random".to_string(),
vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()],
);
let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions);
assert!(result.is_empty());
}

#[test]
fn test_fn_get_allowed_commit_actions_on_commit_action_negative_scenarios() {
let mut allowed_actions = HashMap::new();
allowed_actions.insert("publish", vec!["sqs", "sns", "kafka", "mqtt"]);
allowed_actions.insert(
"publish".to_string(),
vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()],
);

// When on_commit_actions are not present
let on_commit_actions = serde_json::json!({});
Expand Down Expand Up @@ -153,7 +177,10 @@ fn test_fn_get_allowed_commit_actions_positive_scenario() {
}
});

allowed_actions.insert("publish", vec!["sqs", "sns", "kafka", "mqtt"]);
allowed_actions.insert(
"publish".to_string(),
vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()],
);
let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions);
assert_eq!(result.len(), 2);
}
Expand Down
67 changes: 62 additions & 5 deletions packages/talos_messenger_core/src/utlis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,65 @@ use serde_json::Value;

use crate::{errors::ActionError, suffix::AllowedActionsMapItem};

#[derive(Debug, Clone, Copy)]
pub struct ActionsParserConfig<'a> {
pub case_sensitive: bool,
pub key_value_delimiter: &'a str,
}

impl Default for ActionsParserConfig<'_> {
fn default() -> Self {
Self {
case_sensitive: Default::default(),
key_value_delimiter: ":",
}
}
}

/// Builds the list of items from a comma separated string.
///
///
/// `ActionsParserConfig` can be passed to specify if
/// - The key and value should be case sensitive. (defaults to false)
/// - Key-Value delimiter. (defaults to ':')
pub fn create_whitelist_actions_from_str(whitelist_str: &str, config: &ActionsParserConfig) -> HashMap<String, Vec<String>> {
whitelist_str.split(',').fold(HashMap::new(), |mut acc_map, item| {
// case sensitive check.
let item_cased = if !config.case_sensitive { item.to_lowercase() } else { item.to_string() };
// Has a key-value pair
if let Some((key, value)) = item_cased.trim().split_once(config.key_value_delimiter) {
let key_to_check = if !config.case_sensitive { key.to_lowercase() } else { key.to_owned() };
let key_to_check = key_to_check.trim();
// update existing entry
if let Some(map_item) = acc_map.get_mut(key_to_check) {
// Check for duplicate before inserting
let value_trimmed = value.trim().to_owned();
if !map_item.contains(&value_trimmed) {
map_item.push(value_trimmed)
}
}
// insert new entry
else {
// Empty value will not be inserted
if !value.is_empty() {
acc_map.insert(key.to_owned(), vec![value.to_owned()]);
}
}
}
// just key type.
else {
let insert_key = if config.case_sensitive {
item_cased.to_lowercase()
} else {
item_cased.to_owned()
};
let key_to_check = insert_key.trim();
acc_map.insert(key_to_check.trim().to_owned(), vec![]);
}
acc_map
})
}

/// Retrieves the serde_json::Value for a given key
pub fn get_value_by_key<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
value.get(key)
Expand All @@ -14,10 +73,7 @@ pub fn get_value_by_key<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
/// Create a Hashmap of all the actions that require to be actioned by the messenger.
/// Key for the map is the Action type. eg: "kafka", "mqtt" ..etc
/// Value for the map contains the payload and some meta information like items actioned, and is completed flag
pub fn get_allowed_commit_actions(
on_commit_actions: &Value,
allowed_actions: &HashMap<&'static str, Vec<&'static str>>,
) -> HashMap<String, AllowedActionsMapItem> {
pub fn get_allowed_commit_actions(on_commit_actions: &Value, allowed_actions: &HashMap<String, Vec<String>>) -> HashMap<String, AllowedActionsMapItem> {
let mut filtered_actions = HashMap::new();

allowed_actions.iter().for_each(|(action_key, sub_action_keys)| {
Expand Down Expand Up @@ -45,14 +101,15 @@ pub fn get_actions_deserialised<T: DeserializeOwned>(actions: &Value) -> Result<
}
}

pub fn parse_white_list() {}

///// Retrieves the oncommit actions that are supported by the system.
// fn get_allowed_commit_actions(version: &u64, on_commit_actions: &Value) -> Option<OnCommitActions> {
// let Some(publish_actions) = on_commit_actions.get("publish") else {
// warn!("No publish actions found for version={version} in {on_commit_actions}");
// return None;
// };

// // TODO: GK - In future we will need to check if there are other type that we are interested in, and not just Kafka
// match get_sub_actions::<Vec<KafkaAction>>(version, publish_actions, "kafka") {
// Some(kafka_actions) if !kafka_actions.is_empty() => Some(OnCommitActions::Publish(Some(PublishActions::Kafka(kafka_actions)))),
// _ => None,
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_rdkafka_utils/src/kafka_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl KafkaConfig {

self.setup_auth(&mut client_config, base_config);

log::warn!("p: client_config = {:?}", client_config);
log::debug!("p: client_config = {:?}", client_config);

client_config
}
Expand Down

0 comments on commit 34f9011

Please sign in to comment.