diff --git a/packages/talos_messenger_actions/src/kafka/context.rs b/packages/talos_messenger_actions/src/kafka/context.rs index ce2a8c1c..7f187904 100644 --- a/packages/talos_messenger_actions/src/kafka/context.rs +++ b/packages/talos_messenger_actions/src/kafka/context.rs @@ -1,7 +1,7 @@ use futures_executor::block_on; -use log::info; +use log::{error, info}; use rdkafka::{producer::ProducerContext, ClientContext, Message}; -use talos_messenger_core::core::MessengerChannelFeedback; +use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError}; use tokio::sync::mpsc; #[derive(Debug)] @@ -10,6 +10,7 @@ pub struct MessengerProducerDeliveryOpaque { pub total_publish_count: u32, } +#[derive(Debug, Clone)] pub struct MessengerProducerContext { pub tx_feedback_channel: mpsc::Sender, } @@ -27,15 +28,29 @@ impl ProducerContext for MessengerProducerContext { Ok(msg) => { info!("Message {:?} {:?}", msg.key(), msg.offset()); // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown. - let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success( - version, - "kafka".to_string(), - delivery_opaque.total_publish_count, - ))); + if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) { + error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}"); + }; } - Err(err) => { + Err((publish_error, borrowed_message)) => { + error!( + "[Messenger Producer Context] Error for version={:?} \nerror={:?}", + delivery_opaque.version, + publish_error.to_string() + ); + let messenger_error = MessengerActionError { + kind: talos_messenger_core::errors::MessengerActionErrorKind::Publishing, + reason: publish_error.to_string(), + data: format!("version={version} message={:#?}", borrowed_message.detach()), + }; // Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown. - let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(version, err.0.to_string()))); + if let Err(send_error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error( + version, + "kafka".to_string(), + Box::new(messenger_error), + ))) { + error!("[Messenger Producer Context] Error sending error feedback for version={version} with \npublish_error={publish_error:?} \nchannel send_error={send_error:?}"); + }; } } } diff --git a/packages/talos_messenger_actions/src/kafka/models.rs b/packages/talos_messenger_actions/src/kafka/models.rs index 0c609874..a01f0f2e 100644 --- a/packages/talos_messenger_actions/src/kafka/models.rs +++ b/packages/talos_messenger_actions/src/kafka/models.rs @@ -2,14 +2,41 @@ use ahash::HashMap; use serde::{Deserialize, Serialize}; // 1.0.130 use serde_json::{self}; +fn default_text_plain_encoding() -> String { + "text/plain".to_string() +} + +fn default_application_json_encoding() -> String { + "application/json".to_string() +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct MessengerKafkaActionHeader { + pub key_encoding: String, + pub key: String, + pub value_encoding: String, + pub value: String, +} + #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] pub struct KafkaAction { - // TODO: GK - Add additional Kafka producer related props here. - // pub version: u32, - // pub cluster: String, + #[serde(default)] + pub cluster: String, + /// Topic to publish the payload pub topic: String, + /// Key encoding to be used. Defaults to `text/plain`. + #[serde(default = "default_text_plain_encoding")] + pub key_encoding: String, + /// Key for the message to publish. pub key: Option, + /// Optional if the message should be published to a specific partition. pub partition: Option, + /// Optional headers while publishing. pub headers: Option>, + /// Key encoding to be used. Defaults to `application/json`. + #[serde(default = "default_application_json_encoding")] + pub value_encoding: String, + /// Payload to publish. pub value: serde_json::Value, } diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index 7ecc0bb0..989eaff1 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -10,6 +10,7 @@ use talos_messenger_core::{ use super::models::KafkaAction; +#[derive(Debug)] pub struct KafkaActionService + Send + Sync> { pub publisher: M, pub rx_actions_channel: mpsc::Receiver, diff --git a/packages/talos_messenger_core/src/core.rs b/packages/talos_messenger_core/src/core.rs index aa65d063..024fcb52 100644 --- a/packages/talos_messenger_core/src/core.rs +++ b/packages/talos_messenger_core/src/core.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use strum::{Display, EnumIter, EnumString}; -use crate::errors::MessengerServiceResult; +use crate::errors::{MessengerActionError, MessengerServiceResult}; #[derive(Debug, Display, Serialize, Deserialize, EnumString, EnumIter, Clone, Eq, PartialEq)] pub enum CommitActionType { @@ -34,13 +34,13 @@ pub trait MessengerSystemService { async fn stop(&self) -> MessengerServiceResult; } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessengerCommitActions { pub version: u64, pub commit_actions: HashMap, } pub enum MessengerChannelFeedback { - Error(u64, String), - Success(u64, String, u32), + Error(u64, String, Box), + Success(u64, String), } diff --git a/packages/talos_messenger_core/src/errors.rs b/packages/talos_messenger_core/src/errors.rs index b44ae7ce..824acc84 100644 --- a/packages/talos_messenger_core/src/errors.rs +++ b/packages/talos_messenger_core/src/errors.rs @@ -3,13 +3,14 @@ use thiserror::Error as ThisError; pub type MessengerServiceResult = Result<(), MessengerServiceError>; #[derive(Debug, PartialEq, Clone)] -pub enum ActionErrorKind { +pub enum MessengerActionErrorKind { Deserialisation, + Publishing, } #[derive(Debug, ThisError, PartialEq, Clone)] -#[error("Action Error {kind:?} with reason={reason} for data={data:?}")] -pub struct ActionError { - pub kind: ActionErrorKind, +#[error("Messenger action error {kind:?} with reason={reason} for data={data:?}")] +pub struct MessengerActionError { + pub kind: MessengerActionErrorKind, pub reason: String, pub data: String, } diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 8050a07e..d5f19c09 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -1,22 +1,6 @@ -// 1. Kafka - Get candidate message -// a. Store inmemory. -// 2. Kafka - Get decision message. -// a. Update the store. -// 3. Handle `On Commit` part of the message -// a. Can there be anything other than publishing to kafka? -// b. what if the topic doesnt exist? -// c. Any validation required on what is being published? -// d. Publish T(k) only if all prioir items are published or if safepoint of T(k) is published? -// e. If there are multiple messages to be published, should they be done serially?:- -// i. If to the same topic -// ii. If to another topic -// 4. After a message was published:- -// a. Mark that item as processed. -// b. Prune the store if contiguous items are processed. - use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; use talos_suffix::{Suffix, SuffixTrait}; @@ -74,37 +58,63 @@ where Ok(()) } + /// Checks if all actions are completed and updates the state of the item to `Processed`. + /// Also, checks if the suffix can be pruned and the message_receiver can be committed. + pub(crate) fn check_and_update_all_actions_complete(&mut self, version: u64, reason: SuffixItemCompleteStateReason) { + match self.suffix.are_all_actions_complete_for_version(version) { + Ok(is_completed) if is_completed => { + self.suffix.set_item_state(version, SuffixItemState::Complete(reason)); + + // Pruning of suffix. + self.suffix.update_prune_index_from_version(version); + + debug!("[Actions] All actions for version {version} completed!"); + // Check prune eligibility by looking at the prune meta info. + if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { + // Call prune method on suffix. + let _ = self.suffix.prune_till_index(index_to_prune); + + let commit_offset = version + 1; + debug!("[Commit] Updating tpl to version .. {commit_offset}"); + let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); + + self.message_receiver.commit_async(); + } + } + _ => {} + } + } + /// + /// Handles the failed to process feedback received from other services + /// + pub(crate) fn handle_action_failed(&mut self, version: u64, action_key: &str) { + let item_state = self.suffix.get_item_state(version); + match item_state { + Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => { + self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete); + + self.suffix.increment_item_action_count(version, action_key); + self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::ErrorProcessing); + debug!( + "[Action] State version={version} changed from {item_state:?} => {:?}", + self.suffix.get_item_state(version) + ); + } + _ => (), + }; + } /// /// Handles the feedback received from other services when they have successfully processed the action. /// Will update the individual action for the count and completed flag and also update state of the suffix item. /// - pub(crate) async fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) { + pub(crate) fn handle_action_success(&mut self, version: u64, action_key: &str) { let item_state = self.suffix.get_item_state(version); match item_state { Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => { self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete); - self.suffix.update_item_action(version, action_key, total_count); - if self.suffix.are_all_item_actions_completed(version) { - self.suffix - .set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed)); - - // Pruning of suffix. - self.suffix.update_prune_index_from_version(version); - - debug!("[Actions] All actions in Version {version} completed!"); - // Check prune eligibility by looking at the prune meta info. - if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { - // Call prune method on suffix. - let _ = self.suffix.prune_till_index(index_to_prune); - - let commit_offset = version + 1; - debug!("[Commit] Updating tpl to version .. {commit_offset}"); - let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); - - self.message_receiver.commit_async(); - } - } + self.suffix.increment_item_action_count(version, action_key); + self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::Processed); debug!( "[Action] State version={version} changed from {item_state:?} => {:?}", self.suffix.get_item_state(version) @@ -183,12 +193,14 @@ where // Receive feedback from publisher. Some(feedback) = self.rx_feedback_channel.recv() => { match feedback { - // TODO: GK - What to do when we have error on publishing? Retry?? - MessengerChannelFeedback::Error(_, _) => panic!("Implement the error feedback"), - MessengerChannelFeedback::Success(version, key, total_count) => { - info!("Successfully received version={version} count={total_count}"); + MessengerChannelFeedback::Error(version, key, message_error) => { + error!("Failed to process version={version} with error={message_error:?}"); + self.handle_action_failed(version, &key); - self.handle_item_actioned_success(version, &key, total_count).await; + }, + MessengerChannelFeedback::Success(version, key) => { + info!("Successfully processed version={version} with action_key={key}"); + self.handle_action_success(version, &key); }, } // Process the next items with commit actions diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index bea02e04..a5a232ea 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -37,7 +37,7 @@ pub trait MessengerSuffixTrait: SuffixTrait { /// Updates the decision for a version. fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D); /// Updates the action for a version using the action_key for lookup. - fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32); + fn increment_item_action_count(&mut self, version: u64, action_key: &str); /// Checks if all versions prioir to this version are already completed, and updates the prune index. /// If the prune index was updated, returns the new prune_index, else returns None. @@ -64,39 +64,30 @@ pub enum SuffixItemCompleteStateReason { NoCommitActions, /// When there are commit actions, but they are not required to be handled in messenger NoRelavantCommitActions, - //TODO: GK - Mark as error? - /// When there is an error? - // Error(String), /// When all commit action has are completed. Processed, + /// Error in processing + ErrorProcessing, } -// #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] -// pub struct AllowedActionsMapValueMeta { -// pub total_count: u32, -// pub completed_count: u32, -// } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] pub struct AllowedActionsMapItem { payload: Value, count: u32, - is_completed: bool, + total_count: u32, } impl AllowedActionsMapItem { - pub fn new(payload: Value) -> Self { + pub fn new(payload: Value, total_count: u32) -> Self { AllowedActionsMapItem { payload, count: 0, - is_completed: false, + total_count, } } - pub fn update_count(&mut self) { - self.count += 1; - } - pub fn mark_completed(&mut self) { - self.is_completed = true; + pub fn increment_count(&mut self) { + self.count += 1; } pub fn get_payload(&self) -> &Value { @@ -108,7 +99,7 @@ impl AllowedActionsMapItem { } pub fn is_completed(&self) -> bool { - self.is_completed + self.total_count > 0 && self.total_count == self.count } } @@ -250,14 +241,10 @@ where } } - fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32) { + fn increment_item_action_count(&mut self, version: u64, action_key: &str) { if let Some(item_to_update) = self.get_mut(version) { if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) { - action.update_count(); - - if action.get_count() == total_count { - action.mark_completed(); - } + action.increment_count(); } else { warn!("Could not update the action as item with version={version} does not have action_key={action_key}! "); } diff --git a/packages/talos_messenger_core/src/utlis.rs b/packages/talos_messenger_core/src/utlis.rs index 44acf22a..88715c1b 100644 --- a/packages/talos_messenger_core/src/utlis.rs +++ b/packages/talos_messenger_core/src/utlis.rs @@ -4,7 +4,7 @@ use ahash::{HashMap, HashMapExt}; use serde::de::DeserializeOwned; use serde_json::Value; -use crate::{errors::ActionError, suffix::AllowedActionsMapItem}; +use crate::{errors::MessengerActionError, suffix::AllowedActionsMapItem}; #[derive(Debug, Clone, Copy)] pub struct ActionsParserConfig<'a> { @@ -80,7 +80,10 @@ pub fn get_allowed_commit_actions(on_commit_actions: &Value, allowed_actions: &H if let Some(action) = get_value_by_key(on_commit_actions, action_key) { for sub_action_key in sub_action_keys { if let Some(sub_action) = get_value_by_key(action, sub_action_key) { - filtered_actions.insert(sub_action_key.to_string(), AllowedActionsMapItem::new(sub_action.clone())); + filtered_actions.insert( + sub_action_key.to_string(), + AllowedActionsMapItem::new(sub_action.clone(), get_total_action_count(sub_action)), + ); } } } @@ -89,20 +92,26 @@ pub fn get_allowed_commit_actions(on_commit_actions: &Value, allowed_actions: &H filtered_actions } +pub fn get_total_action_count(action: &Value) -> u32 { + if let Some(actions_vec) = action.as_array() { + actions_vec.len() as u32 + } else { + 1 + } +} + /// Retrieves sub actions under publish by using a look key. -pub fn get_actions_deserialised(actions: &Value) -> Result { +pub fn get_actions_deserialised(actions: &Value) -> Result { match serde_json::from_value(actions.clone()) { Ok(res) => Ok(res), - Err(err) => Err(ActionError { - kind: crate::errors::ActionErrorKind::Deserialisation, + Err(err) => Err(MessengerActionError { + kind: crate::errors::MessengerActionErrorKind::Deserialisation, reason: format!("Deserialisation to type={} failed, with error={:?}", type_name::(), err), data: actions.to_string(), }), } } -pub fn parse_white_list() {} - ///// Retrieves the oncommit actions that are supported by the system. // fn get_allowed_commit_actions(version: &u64, on_commit_actions: &Value) -> Option { // let Some(publish_actions) = on_commit_actions.get("publish") else {