diff --git a/.env.example b/.env.example index c857df0f..4ae6888e 100644 --- a/.env.example +++ b/.env.example @@ -65,5 +65,8 @@ BANK_REPLICATOR_KAFKA_GROUP_ID="talos-replicator-dev" BANK_STATEMAP_INSTALLER_MAX_RETRY=5 BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2 +# ### Talos Messenger Env variables (start) ############################# # Messenger environment variables -TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev" \ No newline at end of file +TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev" +TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka" +# ### Talos Messenger Env variables (end) ############################# \ No newline at end of file diff --git a/examples/agent_client/examples/agent_client.rs b/examples/agent_client/examples/agent_client.rs index c051c053..85cfca46 100644 --- a/examples/agent_client/examples/agent_client.rs +++ b/examples/agent_client/examples/agent_client.rs @@ -275,18 +275,15 @@ async fn get_params() -> Result { } } - if stop_type.is_none() { - Err("Parameter --volume is required".into()) - } else if target_rate.is_none() { - Err("Parameter --rate is required".into()) - } else { - Ok(LaunchParams { - target_rate: target_rate.unwrap(), - stop_type: stop_type.unwrap(), - threads: threads.unwrap(), - collect_metrics: collect_metrics.unwrap(), - }) - } + let stop_type = stop_type.ok_or("Parameter --volume is required")?; + let target_rate = target_rate.ok_or("Parameter --rate is required")?; + + Ok(LaunchParams { + target_rate, + stop_type, + threads: threads.unwrap(), + collect_metrics: collect_metrics.unwrap(), + }) } struct RequestGenerator {} diff --git a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs index 6e215c43..b2896fc7 100644 --- a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs +++ b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs @@ -263,22 +263,18 @@ async fn get_params() -> Result { } } - if stop_type.is_none() { - Err("Parameter --volume is required".into()) - } else if accounts.is_none() { - Err("Parameter --accounts is required".into()) - } else if target_rate.is_none() { - Err("Parameter --rate is required".into()) - } else { - Ok(LaunchParams { - target_rate: target_rate.unwrap(), - stop_type: stop_type.unwrap(), - threads: threads.unwrap(), - accounts: accounts.unwrap(), - scaling_config: scaling_config.unwrap_or(HashMap::new()), - metric_print_raw: metric_print_raw.is_some(), - }) - } + let stop_type = stop_type.ok_or("Parameter --volume is required")?; + let target_rate = target_rate.ok_or("Parameter --rate is required")?; + let accounts = accounts.ok_or("Parameter --accounts is required")?; + + Ok(LaunchParams { + target_rate, + stop_type, + threads: threads.unwrap(), + accounts, + scaling_config: scaling_config.unwrap_or_default(), + metric_print_raw: metric_print_raw.is_some(), + }) } struct TransferRequestGenerator { diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index d6e1ccff..42e12cc5 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -1,12 +1,13 @@ -use ahash::{HashMap, HashMapExt}; use talos_certifier::ports::MessageReciever; use talos_certifier_adapters::KafkaConsumer; use talos_common_utils::env_var; -use talos_messenger_actions::kafka::{ - producer::{KafkaProducer, MessengerKafkaProducerContext}, - service::KafkaActionService, +use talos_messenger_actions::kafka::{context::MessengerProducerContext, producer::KafkaProducer, service::KafkaActionService}; +use talos_messenger_core::{ + services::MessengerInboundService, + suffix::MessengerCandidate, + talos_messenger_service::TalosMessengerService, + utlis::{create_whitelist_actions_from_str, ActionsParserConfig}, }; -use talos_messenger_core::{services::MessengerInboundService, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService}; use talos_rdkafka_utils::kafka_config::KafkaConfig; use talos_suffix::{core::SuffixConfig, Suffix}; use tokio::sync::mpsc; @@ -49,30 +50,26 @@ async fn main() { }; let suffix: Suffix = Suffix::with_config(suffix_config); - let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new(); - // TODO: GK - Set through env - whitelisted_actions.insert("publish", vec!["kafka"]); + let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST"); + let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default()); let inbound_service = MessengerInboundService { message_receiver: kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, - allowed_actions: whitelisted_actions, + allowed_actions, }; - // TODO: GK - create topic should be part of publish. - kafka_config.topic = "test.messenger.topic".to_string(); - let tx_feedback_channel_clone = tx_feedback_channel.clone(); - let custom_context = MessengerKafkaProducerContext { + let custom_context = MessengerProducerContext { tx_feedback_channel: tx_feedback_channel_clone, }; let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context); let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer }; let publish_service = KafkaActionService { - publisher: messenger_kafka_publisher, + publisher: messenger_kafka_publisher.into(), rx_actions_channel, tx_feedback_channel, }; diff --git a/examples/messenger_using_kafka/src/kafka_producer.rs b/examples/messenger_using_kafka/src/kafka_producer.rs index 75f53899..f7d902ee 100644 --- a/examples/messenger_using_kafka/src/kafka_producer.rs +++ b/examples/messenger_using_kafka/src/kafka_producer.rs @@ -1,10 +1,7 @@ use async_trait::async_trait; use log::info; use rdkafka::producer::ProducerContext; -use talos_messenger_actions::kafka::{ - models::KafkaAction, - producer::{KafkaProducer, MessengerProducerDeliveryOpaque}, -}; +use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer}; use talos_messenger_core::core::{MessengerPublisher, PublishActionType}; // use talos_messenger::{ // core::{MessengerPublisher, PublishActionType}, diff --git a/packages/talos_certifier/src/ports/message.rs b/packages/talos_certifier/src/ports/message.rs index 508ab001..9264d0a4 100644 --- a/packages/talos_certifier/src/ports/message.rs +++ b/packages/talos_certifier/src/ports/message.rs @@ -16,7 +16,7 @@ pub trait MessageReciever: SharedPortTraits { async fn subscribe(&self) -> Result<(), SystemServiceError>; async fn commit(&self) -> Result<(), SystemServiceError>; fn commit_async(&self) -> Option>>; - fn update_savepoint(&mut self, offset: i64) -> Result<(), Box>; + fn update_offset_to_commit(&mut self, offset: i64) -> Result<(), Box>; async fn update_savepoint_async(&mut self, offset: i64) -> Result<(), SystemServiceError>; async fn unsubscribe(&self); } diff --git a/packages/talos_certifier/src/services/message_receiver_service.rs b/packages/talos_certifier/src/services/message_receiver_service.rs index 4b9cd345..ee3072de 100644 --- a/packages/talos_certifier/src/services/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/message_receiver_service.rs @@ -84,7 +84,7 @@ impl SystemService for MessageReceiverService { //** commit message _ = self.commit_interval.tick() => { let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed); - self.receiver.update_savepoint(offset)?; + self.receiver.update_offset_to_commit(offset)?; self.receiver.commit_async(); } } diff --git a/packages/talos_certifier/src/services/tests/message_receiver_service.rs b/packages/talos_certifier/src/services/tests/message_receiver_service.rs index 81a920eb..702c6a8f 100644 --- a/packages/talos_certifier/src/services/tests/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/tests/message_receiver_service.rs @@ -47,7 +47,7 @@ impl MessageReciever for MockReciever { fn commit_async(&self) -> Option>> { None } - fn update_savepoint(&mut self, _version: i64) -> Result<(), Box> { + fn update_offset_to_commit(&mut self, _version: i64) -> Result<(), Box> { Ok(()) } async fn update_savepoint_async(&mut self, _version: i64) -> Result<(), SystemServiceError> { diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index 7f149333..5a7a7ef1 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -206,7 +206,7 @@ impl MessageReciever for KafkaConsumer { } } - fn update_savepoint(&mut self, offset: i64) -> Result<(), Box> { + fn update_offset_to_commit(&mut self, offset: i64) -> Result<(), Box> { // let partition = self.tpl.; let tpl = self.tpl.elements_for_topic(&self.topic); if !tpl.is_empty() { diff --git a/packages/talos_cohort_replicator/src/core.rs b/packages/talos_cohort_replicator/src/core.rs index e349c2ef..fb913910 100644 --- a/packages/talos_cohort_replicator/src/core.rs +++ b/packages/talos_cohort_replicator/src/core.rs @@ -141,7 +141,7 @@ where pub(crate) async fn commit(&mut self) { if let Some(version) = self.next_commit_offset { - self.receiver.update_savepoint(version as i64).unwrap(); + self.receiver.update_offset_to_commit(version as i64).unwrap(); self.receiver.commit_async(); self.next_commit_offset = None; } diff --git a/packages/talos_messenger_actions/src/kafka/context.rs b/packages/talos_messenger_actions/src/kafka/context.rs new file mode 100644 index 00000000..7f187904 --- /dev/null +++ b/packages/talos_messenger_actions/src/kafka/context.rs @@ -0,0 +1,57 @@ +use futures_executor::block_on; +use log::{error, info}; +use rdkafka::{producer::ProducerContext, ClientContext, Message}; +use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError}; +use tokio::sync::mpsc; + +#[derive(Debug)] +pub struct MessengerProducerDeliveryOpaque { + pub version: u64, + pub total_publish_count: u32, +} + +#[derive(Debug, Clone)] +pub struct MessengerProducerContext { + pub tx_feedback_channel: mpsc::Sender, +} + +impl ClientContext for MessengerProducerContext {} +impl ProducerContext for MessengerProducerContext { + type DeliveryOpaque = Box; + + fn delivery(&self, delivery_result: &rdkafka::producer::DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque) { + let result = delivery_result.as_ref(); + + let version = delivery_opaque.version; + + match result { + Ok(msg) => { + info!("Message {:?} {:?}", msg.key(), msg.offset()); + // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown. + if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) { + error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}"); + }; + } + Err((publish_error, borrowed_message)) => { + error!( + "[Messenger Producer Context] Error for version={:?} \nerror={:?}", + delivery_opaque.version, + publish_error.to_string() + ); + let messenger_error = MessengerActionError { + kind: talos_messenger_core::errors::MessengerActionErrorKind::Publishing, + reason: publish_error.to_string(), + data: format!("version={version} message={:#?}", borrowed_message.detach()), + }; + // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown. + if let Err(send_error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error( + version, + "kafka".to_string(), + Box::new(messenger_error), + ))) { + error!("[Messenger Producer Context] Error sending error feedback for version={version} with \npublish_error={publish_error:?} \nchannel send_error={send_error:?}"); + }; + } + } + } +} diff --git a/packages/talos_messenger_actions/src/kafka/mod.rs b/packages/talos_messenger_actions/src/kafka/mod.rs index 3d5b6a49..1f5827d2 100644 --- a/packages/talos_messenger_actions/src/kafka/mod.rs +++ b/packages/talos_messenger_actions/src/kafka/mod.rs @@ -1,3 +1,4 @@ +pub mod context; pub mod models; pub mod producer; pub mod service; diff --git a/packages/talos_messenger_actions/src/kafka/models.rs b/packages/talos_messenger_actions/src/kafka/models.rs index 4e617bb6..a01f0f2e 100644 --- a/packages/talos_messenger_actions/src/kafka/models.rs +++ b/packages/talos_messenger_actions/src/kafka/models.rs @@ -2,12 +2,41 @@ use ahash::HashMap; use serde::{Deserialize, Serialize}; // 1.0.130 use serde_json::{self}; +fn default_text_plain_encoding() -> String { + "text/plain".to_string() +} + +fn default_application_json_encoding() -> String { + "application/json".to_string() +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct MessengerKafkaActionHeader { + pub key_encoding: String, + pub key: String, + pub value_encoding: String, + pub value: String, +} + #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] pub struct KafkaAction { - // TODO: GK - Add additional Kafka producer related props here. + #[serde(default)] + pub cluster: String, + /// Topic to publish the payload pub topic: String, + /// Key encoding to be used. Defaults to `text/plain`. + #[serde(default = "default_text_plain_encoding")] + pub key_encoding: String, + /// Key for the message to publish. pub key: Option, + /// Optional if the message should be published to a specific partition. pub partition: Option, + /// Optional headers while publishing. pub headers: Option>, + /// Key encoding to be used. Defaults to `application/json`. + #[serde(default = "default_application_json_encoding")] + pub value_encoding: String, + /// Payload to publish. pub value: serde_json::Value, } diff --git a/packages/talos_messenger_actions/src/kafka/producer.rs b/packages/talos_messenger_actions/src/kafka/producer.rs index 7e1b8e83..20378460 100644 --- a/packages/talos_messenger_actions/src/kafka/producer.rs +++ b/packages/talos_messenger_actions/src/kafka/producer.rs @@ -1,11 +1,10 @@ use std::collections::HashMap; use async_trait::async_trait; -use log::{debug, info}; +use log::debug; use rdkafka::{ config::{FromClientConfig, FromClientConfigAndContext}, producer::{BaseRecord, DefaultProducerContext, ProducerContext, ThreadedProducer}, - ClientContext, Message, }; use talos_certifier::{ errors::SystemServiceError, @@ -13,50 +12,8 @@ use talos_certifier::{ }; use talos_certifier_adapters::kafka::utils::build_kafka_headers; use talos_rdkafka_utils::kafka_config::KafkaConfig; -use tokio::sync::mpsc; - -use futures_executor::block_on; -use talos_messenger_core::core::MessengerChannelFeedback; - -#[derive(Debug)] -pub struct MessengerProducerDeliveryOpaque { - pub version: u64, - pub total_publish_count: u32, -} - -pub struct MessengerKafkaProducerContext { - pub tx_feedback_channel: mpsc::Sender, -} - -impl ClientContext for MessengerKafkaProducerContext {} -impl ProducerContext for MessengerKafkaProducerContext { - type DeliveryOpaque = Box; - - fn delivery(&self, delivery_result: &rdkafka::producer::DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque) { - let result = delivery_result.as_ref(); - - let version = delivery_opaque.version; - - match result { - Ok(msg) => { - info!("Message {:?} {:?}", msg.key(), msg.offset()); - // TODO: GK - what to do on error? Panic? - let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success( - version, - "kafka".to_string(), - delivery_opaque.total_publish_count, - ))); - } - Err(err) => { - // TODO: GK - what to do on error? Panic? - let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(version, err.0.to_string()))); - } - } - } -} // Kafka Producer -// #[derive(Clone)] pub struct KafkaProducer { producer: ThreadedProducer, topic: String, diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index c6362d9e..5ec0c328 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use async_trait::async_trait; use log::{error, info}; use tokio::sync::mpsc; @@ -10,8 +12,9 @@ use talos_messenger_core::{ use super::models::KafkaAction; -pub struct KafkaActionService + Send + Sync> { - pub publisher: M, +#[derive(Debug)] +pub struct KafkaActionService + Send + Sync + 'static> { + pub publisher: Arc, pub rx_actions_channel: mpsc::Receiver, pub tx_feedback_channel: mpsc::Sender, } @@ -31,20 +34,18 @@ where Some(actions) = self.rx_actions_channel.recv() => { let MessengerCommitActions {version, commit_actions } = actions; - let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) else { - // If publish is not present, continue the loop. - continue; - }; - - // TODO: GK - Make this block generic in next ticket to iterator in loop by PublishActionType - { + if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){ match get_actions_deserialised::>(publish_actions_for_type) { Ok(actions) => { let total_len = actions.len() as u32; + for action in actions { - // Send to Kafka - self.publisher.send(version, action, total_len ).await; + let publisher = self.publisher.clone(); + // Publish the message + tokio::spawn(async move { + publisher.send(version, action, total_len ).await; + }); } }, diff --git a/packages/talos_messenger_core/src/core.rs b/packages/talos_messenger_core/src/core.rs index aa65d063..024fcb52 100644 --- a/packages/talos_messenger_core/src/core.rs +++ b/packages/talos_messenger_core/src/core.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use strum::{Display, EnumIter, EnumString}; -use crate::errors::MessengerServiceResult; +use crate::errors::{MessengerActionError, MessengerServiceResult}; #[derive(Debug, Display, Serialize, Deserialize, EnumString, EnumIter, Clone, Eq, PartialEq)] pub enum CommitActionType { @@ -34,13 +34,13 @@ pub trait MessengerSystemService { async fn stop(&self) -> MessengerServiceResult; } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessengerCommitActions { pub version: u64, pub commit_actions: HashMap, } pub enum MessengerChannelFeedback { - Error(u64, String), - Success(u64, String, u32), + Error(u64, String, Box), + Success(u64, String), } diff --git a/packages/talos_messenger_core/src/errors.rs b/packages/talos_messenger_core/src/errors.rs index b44ae7ce..824acc84 100644 --- a/packages/talos_messenger_core/src/errors.rs +++ b/packages/talos_messenger_core/src/errors.rs @@ -3,13 +3,14 @@ use thiserror::Error as ThisError; pub type MessengerServiceResult = Result<(), MessengerServiceError>; #[derive(Debug, PartialEq, Clone)] -pub enum ActionErrorKind { +pub enum MessengerActionErrorKind { Deserialisation, + Publishing, } #[derive(Debug, ThisError, PartialEq, Clone)] -#[error("Action Error {kind:?} with reason={reason} for data={data:?}")] -pub struct ActionError { - pub kind: ActionErrorKind, +#[error("Messenger action error {kind:?} with reason={reason} for data={data:?}")] +pub struct MessengerActionError { + pub kind: MessengerActionErrorKind, pub reason: String, pub data: String, } diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 1fb04381..d5f19c09 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -1,22 +1,6 @@ -// 1. Kafka - Get candidate message -// a. Store inmemory. -// 2. Kafka - Get decision message. -// a. Update the store. -// 3. Handle `On Commit` part of the message -// a. Can there be anything other than publishing to kafka? -// b. what if the topic doesnt exist? -// c. Any validation required on what is being published? -// d. Publish T(k) only if all prioir items are published or if safepoint of T(k) is published? -// e. If there are multiple messages to be published, should they be done serially?:- -// i. If to the same topic -// ii. If to another topic -// 4. After a message was published:- -// a. Mark that item as processed. -// b. Prune the store if contiguous items are processed. - use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; use talos_suffix::{Suffix, SuffixTrait}; @@ -37,7 +21,7 @@ where pub tx_actions_channel: mpsc::Sender, pub rx_feedback_channel: mpsc::Receiver, pub suffix: Suffix, - pub allowed_actions: HashMap<&'static str, Vec<&'static str>>, + pub allowed_actions: HashMap>, } impl MessengerInboundService @@ -74,38 +58,63 @@ where Ok(()) } + /// Checks if all actions are completed and updates the state of the item to `Processed`. + /// Also, checks if the suffix can be pruned and the message_receiver can be committed. + pub(crate) fn check_and_update_all_actions_complete(&mut self, version: u64, reason: SuffixItemCompleteStateReason) { + match self.suffix.are_all_actions_complete_for_version(version) { + Ok(is_completed) if is_completed => { + self.suffix.set_item_state(version, SuffixItemState::Complete(reason)); + + // Pruning of suffix. + self.suffix.update_prune_index_from_version(version); + + debug!("[Actions] All actions for version {version} completed!"); + // Check prune eligibility by looking at the prune meta info. + if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { + // Call prune method on suffix. + let _ = self.suffix.prune_till_index(index_to_prune); + + let commit_offset = version + 1; + debug!("[Commit] Updating tpl to version .. {commit_offset}"); + let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); + + self.message_receiver.commit_async(); + } + } + _ => {} + } + } + /// + /// Handles the failed to process feedback received from other services + /// + pub(crate) fn handle_action_failed(&mut self, version: u64, action_key: &str) { + let item_state = self.suffix.get_item_state(version); + match item_state { + Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => { + self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete); + + self.suffix.increment_item_action_count(version, action_key); + self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::ErrorProcessing); + debug!( + "[Action] State version={version} changed from {item_state:?} => {:?}", + self.suffix.get_item_state(version) + ); + } + _ => (), + }; + } /// /// Handles the feedback received from other services when they have successfully processed the action. /// Will update the individual action for the count and completed flag and also update state of the suffix item. /// - pub(crate) async fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) { + pub(crate) fn handle_action_success(&mut self, version: u64, action_key: &str) { let item_state = self.suffix.get_item_state(version); match item_state { Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => { self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete); - self.suffix.update_item_action(version, action_key, total_count); - if self.suffix.are_all_item_actions_completed(version) { - self.suffix - .set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed)); - - // Pruning of suffix. - self.suffix.update_prune_index_from_version(version); - - debug!("[Actions] All actions in Version {version} completed!"); - // Check prune eligibility by looking at the prune meta info. - if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { - // Call prune method on suffix. - let _ = self.suffix.prune_till_index(index_to_prune); - - // TODO: GK - Calculate the safe offset to commit. - let commit_offset = version + 1; - debug!("[Commit] Updating tpl to version .. {commit_offset}"); - let _ = self.message_receiver.update_savepoint(commit_offset as i64); - - self.message_receiver.commit_async(); - } - } + self.suffix.increment_item_action_count(version, action_key); + self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::Processed); debug!( "[Action] State version={version} changed from {item_state:?} => {:?}", self.suffix.get_item_state(version) @@ -184,12 +193,14 @@ where // Receive feedback from publisher. Some(feedback) = self.rx_feedback_channel.recv() => { match feedback { - // TODO: GK - What to do when we have error on publishing? Retry?? - MessengerChannelFeedback::Error(_, _) => panic!("Implement the error feedback"), - MessengerChannelFeedback::Success(version, key, total_count) => { - info!("Successfully received version={version} count={total_count}"); + MessengerChannelFeedback::Error(version, key, message_error) => { + error!("Failed to process version={version} with error={message_error:?}"); + self.handle_action_failed(version, &key); - self.handle_item_actioned_success(version, &key, total_count).await; + }, + MessengerChannelFeedback::Success(version, key) => { + info!("Successfully processed version={version} with action_key={key}"); + self.handle_action_success(version, &key); }, } // Process the next items with commit actions diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index bc18633d..a5a232ea 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fmt::Debug; use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait}; -use talos_suffix::{core::SuffixMeta, Suffix, SuffixItem, SuffixTrait}; +use talos_suffix::{core::SuffixResult, Suffix, SuffixTrait}; pub trait MessengerSuffixItemTrait: Debug + Clone { fn set_state(&mut self, state: SuffixItemState); @@ -26,10 +26,6 @@ pub trait MessengerSuffixTrait: SuffixTrait { fn set_item_state(&mut self, version: u64, process_state: SuffixItemState); // Getters - /// Get suffix meta - fn get_meta(&self) -> &SuffixMeta; - /// Get suffix item as mutable reference. - fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem>; /// Checks if suffix ready to prune /// @@ -41,14 +37,14 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// Updates the decision for a version. fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D); /// Updates the action for a version using the action_key for lookup. - fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32); + fn increment_item_action_count(&mut self, version: u64, action_key: &str); /// Checks if all versions prioir to this version are already completed, and updates the prune index. /// If the prune index was updated, returns the new prune_index, else returns None. fn update_prune_index_from_version(&mut self, version: u64) -> Option; /// Checks if all commit actions are completed for the version - fn are_all_item_actions_completed(&self, version: u64) -> bool; + fn are_all_actions_complete_for_version(&self, version: u64) -> SuffixResult; } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] @@ -68,39 +64,30 @@ pub enum SuffixItemCompleteStateReason { NoCommitActions, /// When there are commit actions, but they are not required to be handled in messenger NoRelavantCommitActions, - //TODO: GK - Mark as error? - /// When there is an error? - // Error(String), /// When all commit action has are completed. Processed, + /// Error in processing + ErrorProcessing, } -// #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] -// pub struct AllowedActionsMapValueMeta { -// pub total_count: u32, -// pub completed_count: u32, -// } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] pub struct AllowedActionsMapItem { payload: Value, count: u32, - is_completed: bool, + total_count: u32, } impl AllowedActionsMapItem { - pub fn new(payload: Value) -> Self { + pub fn new(payload: Value, total_count: u32) -> Self { AllowedActionsMapItem { payload, count: 0, - is_completed: false, + total_count, } } - pub fn update_count(&mut self) { - self.count += 1; - } - pub fn mark_completed(&mut self) { - self.is_completed = true; + pub fn increment_count(&mut self) { + self.count += 1; } pub fn get_payload(&self) -> &Value { @@ -112,7 +99,7 @@ impl AllowedActionsMapItem { } pub fn is_completed(&self) -> bool { - self.is_completed + self.total_count > 0 && self.total_count == self.count } } @@ -190,12 +177,6 @@ impl MessengerSuffixTrait for Suffix where T: MessengerSuffixItemTrait, { - // TODO: GK - Elevate this to core suffix - fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem> { - let index = self.index_from_head(version)?; - self.messages.get_mut(index)?.as_mut() - } - fn set_item_state(&mut self, version: u64, process_state: SuffixItemState) { if let Some(item_to_update) = self.get_mut(version) { item_to_update.item.set_state(process_state) @@ -210,11 +191,6 @@ where } } - // TODO: GK - Elevate this to core suffix - fn get_meta(&self) -> &SuffixMeta { - &self.meta - } - fn get_suffix_items_to_process(&self) -> Vec { let items = self .messages @@ -265,14 +241,10 @@ where } } - fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32) { + fn increment_item_action_count(&mut self, version: u64, action_key: &str) { if let Some(item_to_update) = self.get_mut(version) { if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) { - action.update_count(); - - if action.get_count() == total_count { - action.mark_completed(); - } + action.increment_count(); } else { warn!("Could not update the action as item with version={version} does not have action_key={action_key}! "); } @@ -313,13 +285,12 @@ where Some(index) } - fn are_all_item_actions_completed(&self, version: u64) -> bool { + fn are_all_actions_complete_for_version(&self, version: u64) -> SuffixResult { if let Ok(Some(item)) = self.get(version) { - item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed()) + Ok(item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed())) } else { warn!("could not find item for version={version}"); - // TODO: GK - handle this in another way for future? - true + Err(talos_suffix::errors::SuffixError::ItemNotFound(version, None)) } } } diff --git a/packages/talos_messenger_core/src/tests/utils.rs b/packages/talos_messenger_core/src/tests/utils.rs index f0ca8427..aa160ba3 100644 --- a/packages/talos_messenger_core/src/tests/utils.rs +++ b/packages/talos_messenger_core/src/tests/utils.rs @@ -1,7 +1,25 @@ use ahash::{HashMap, HashMapExt}; use serde_json::{json, Value}; -use crate::utlis::{get_actions_deserialised, get_allowed_commit_actions}; +use crate::utlis::{create_whitelist_actions_from_str, get_actions_deserialised, get_allowed_commit_actions, ActionsParserConfig}; + +// Start - testing create_whitelist_actions_from_str function +#[test] +fn test_fn_create_whitelist_actions_from_str() { + let config = ActionsParserConfig { + case_sensitive: false, + key_value_delimiter: ":", + }; + + let actions_str = "foo:test, foo:test2, bar,FOO:test3"; + + let action_map = create_whitelist_actions_from_str(actions_str, &config); + + assert_eq!(action_map.len(), 2); + assert_eq!(action_map.get("foo").unwrap().len(), 3); + assert!(action_map.contains_key("bar")); +} +// End - testing create_whitelist_actions_from_str function // Start - testing get_allowed_commit_actions function #[test] @@ -38,25 +56,28 @@ fn test_fn_get_allowed_commit_actions_allowed_actions_negative_scenarios() { // When allowed action is supported type by the messenger, but the sub actions are not provided allowed_actions.clear(); - allowed_actions.insert("publish", vec![]); + allowed_actions.insert("publish".to_string(), vec![]); let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); assert!(result.is_empty()); // When allowed action is supported type by the messenger, but the sub actions are not supported allowed_actions.clear(); - allowed_actions.insert("publish", vec!["sqs", "sns"]); + allowed_actions.insert("publish".to_string(), vec!["sqs".to_string(), "sns".to_string()]); let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); assert!(result.is_empty()); // When allowed action is non supported type by the messenger, with empty sub type allowed_actions.clear(); - allowed_actions.insert("random", vec![]); + allowed_actions.insert("random".to_string(), vec![]); let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); assert!(result.is_empty()); // When allowed action is non supported type by the messenger, but has valid sub actions allowed_actions.clear(); - allowed_actions.insert("random", vec!["sqs", "sns", "kafka", "mqtt"]); + allowed_actions.insert( + "random".to_string(), + vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()], + ); let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); assert!(result.is_empty()); } @@ -64,7 +85,10 @@ fn test_fn_get_allowed_commit_actions_allowed_actions_negative_scenarios() { #[test] fn test_fn_get_allowed_commit_actions_on_commit_action_negative_scenarios() { let mut allowed_actions = HashMap::new(); - allowed_actions.insert("publish", vec!["sqs", "sns", "kafka", "mqtt"]); + allowed_actions.insert( + "publish".to_string(), + vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()], + ); // When on_commit_actions are not present let on_commit_actions = serde_json::json!({}); @@ -153,7 +177,10 @@ fn test_fn_get_allowed_commit_actions_positive_scenario() { } }); - allowed_actions.insert("publish", vec!["sqs", "sns", "kafka", "mqtt"]); + allowed_actions.insert( + "publish".to_string(), + vec!["sqs".to_string(), "sns".to_string(), "kafka".to_string(), "mqtt".to_string()], + ); let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); assert_eq!(result.len(), 2); } diff --git a/packages/talos_messenger_core/src/utlis.rs b/packages/talos_messenger_core/src/utlis.rs index f65cfb67..88715c1b 100644 --- a/packages/talos_messenger_core/src/utlis.rs +++ b/packages/talos_messenger_core/src/utlis.rs @@ -4,7 +4,66 @@ use ahash::{HashMap, HashMapExt}; use serde::de::DeserializeOwned; use serde_json::Value; -use crate::{errors::ActionError, suffix::AllowedActionsMapItem}; +use crate::{errors::MessengerActionError, suffix::AllowedActionsMapItem}; + +#[derive(Debug, Clone, Copy)] +pub struct ActionsParserConfig<'a> { + pub case_sensitive: bool, + pub key_value_delimiter: &'a str, +} + +impl Default for ActionsParserConfig<'_> { + fn default() -> Self { + Self { + case_sensitive: Default::default(), + key_value_delimiter: ":", + } + } +} + +/// Builds the list of items from a comma separated string. +/// +/// +/// `ActionsParserConfig` can be passed to specify if +/// - The key and value should be case sensitive. (defaults to false) +/// - Key-Value delimiter. (defaults to ':') +pub fn create_whitelist_actions_from_str(whitelist_str: &str, config: &ActionsParserConfig) -> HashMap> { + whitelist_str.split(',').fold(HashMap::new(), |mut acc_map, item| { + // case sensitive check. + let item_cased = if !config.case_sensitive { item.to_lowercase() } else { item.to_string() }; + // Has a key-value pair + if let Some((key, value)) = item_cased.trim().split_once(config.key_value_delimiter) { + let key_to_check = if !config.case_sensitive { key.to_lowercase() } else { key.to_owned() }; + let key_to_check = key_to_check.trim(); + // update existing entry + if let Some(map_item) = acc_map.get_mut(key_to_check) { + // Check for duplicate before inserting + let value_trimmed = value.trim().to_owned(); + if !map_item.contains(&value_trimmed) { + map_item.push(value_trimmed) + } + } + // insert new entry + else { + // Empty value will not be inserted + if !value.is_empty() { + acc_map.insert(key.to_owned(), vec![value.to_owned()]); + } + } + } + // just key type. + else { + let insert_key = if config.case_sensitive { + item_cased.to_lowercase() + } else { + item_cased.to_owned() + }; + let key_to_check = insert_key.trim(); + acc_map.insert(key_to_check.trim().to_owned(), vec![]); + } + acc_map + }) +} /// Retrieves the serde_json::Value for a given key pub fn get_value_by_key<'a>(value: &'a Value, key: &str) -> Option<&'a Value> { @@ -14,17 +73,17 @@ pub fn get_value_by_key<'a>(value: &'a Value, key: &str) -> Option<&'a Value> { /// Create a Hashmap of all the actions that require to be actioned by the messenger. /// Key for the map is the Action type. eg: "kafka", "mqtt" ..etc /// Value for the map contains the payload and some meta information like items actioned, and is completed flag -pub fn get_allowed_commit_actions( - on_commit_actions: &Value, - allowed_actions: &HashMap<&'static str, Vec<&'static str>>, -) -> HashMap { +pub fn get_allowed_commit_actions(on_commit_actions: &Value, allowed_actions: &HashMap>) -> HashMap { let mut filtered_actions = HashMap::new(); allowed_actions.iter().for_each(|(action_key, sub_action_keys)| { if let Some(action) = get_value_by_key(on_commit_actions, action_key) { for sub_action_key in sub_action_keys { if let Some(sub_action) = get_value_by_key(action, sub_action_key) { - filtered_actions.insert(sub_action_key.to_string(), AllowedActionsMapItem::new(sub_action.clone())); + filtered_actions.insert( + sub_action_key.to_string(), + AllowedActionsMapItem::new(sub_action.clone(), get_total_action_count(sub_action)), + ); } } } @@ -33,12 +92,20 @@ pub fn get_allowed_commit_actions( filtered_actions } +pub fn get_total_action_count(action: &Value) -> u32 { + if let Some(actions_vec) = action.as_array() { + actions_vec.len() as u32 + } else { + 1 + } +} + /// Retrieves sub actions under publish by using a look key. -pub fn get_actions_deserialised(actions: &Value) -> Result { +pub fn get_actions_deserialised(actions: &Value) -> Result { match serde_json::from_value(actions.clone()) { Ok(res) => Ok(res), - Err(err) => Err(ActionError { - kind: crate::errors::ActionErrorKind::Deserialisation, + Err(err) => Err(MessengerActionError { + kind: crate::errors::MessengerActionErrorKind::Deserialisation, reason: format!("Deserialisation to type={} failed, with error={:?}", type_name::(), err), data: actions.to_string(), }), @@ -52,7 +119,6 @@ pub fn get_actions_deserialised(actions: &Value) -> Result< // return None; // }; -// // TODO: GK - In future we will need to check if there are other type that we are interested in, and not just Kafka // match get_sub_actions::>(version, publish_actions, "kafka") { // Some(kafka_actions) if !kafka_actions.is_empty() => Some(OnCommitActions::Publish(Some(PublishActions::Kafka(kafka_actions)))), // _ => None, diff --git a/packages/talos_rdkafka_utils/src/kafka_config.rs b/packages/talos_rdkafka_utils/src/kafka_config.rs index 5a5b87a7..4ed80217 100644 --- a/packages/talos_rdkafka_utils/src/kafka_config.rs +++ b/packages/talos_rdkafka_utils/src/kafka_config.rs @@ -109,7 +109,7 @@ impl KafkaConfig { self.setup_auth(&mut client_config, base_config); - log::warn!("p: client_config = {:?}", client_config); + log::debug!("p: client_config = {:?}", client_config); client_config } diff --git a/packages/talos_suffix/src/core.rs b/packages/talos_suffix/src/core.rs index 9ed1830c..a9bf2a4b 100644 --- a/packages/talos_suffix/src/core.rs +++ b/packages/talos_suffix/src/core.rs @@ -56,6 +56,8 @@ pub type SuffixItemType = T; pub trait SuffixTrait { fn get(&self, version: u64) -> SuffixResult>>; + fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem>; + fn get_meta(&self) -> &SuffixMeta; fn insert(&mut self, version: u64, message: SuffixItemType) -> SuffixResult<()>; fn update_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>; fn prune_till_index(&mut self, index: usize) -> SuffixResult>>>; diff --git a/packages/talos_suffix/src/suffix.rs b/packages/talos_suffix/src/suffix.rs index 38e62887..c3cf3500 100644 --- a/packages/talos_suffix/src/suffix.rs +++ b/packages/talos_suffix/src/suffix.rs @@ -124,8 +124,9 @@ where pub fn retrieve_all_some_vec_items(&self) -> Vec<(usize, u64, Option)> { self.messages .iter() + .flatten() .enumerate() - .filter_map(|(i, x)| x.is_some().then(|| (i, x.as_ref().unwrap().item_ver, x.as_ref().unwrap().decision_ver))) + .map(|(i, x)| (i, x.item_ver, x.decision_ver)) .collect() } @@ -228,8 +229,17 @@ where Ok(suffix_item) } + fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem> { + let index = self.index_from_head(version)?; + self.messages.get_mut(index)?.as_mut() + } + + fn get_meta(&self) -> &SuffixMeta { + &self.meta + } + fn insert(&mut self, version: u64, message: T) -> SuffixResult<()> { - // // The very first item inserted on the suffix will automatically be made head of the suffix. + // The very first item inserted on the suffix will automatically be made head of the suffix. if self.meta.head == 0 { self.update_head(version); } @@ -255,12 +265,6 @@ where self.meta.last_insert_vers = version; } - // info!( - // "All some items on suffix insert where head ={}.... {:?}", - // self.meta.head, - // self.retrieve_all_some_vec_items() - // ); - Ok(()) }