Skip to content

Commit

Permalink
feat: Handle processing action error and marking items complete
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Oct 9, 2023
1 parent 03c2be3 commit b0462d8
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 95 deletions.
33 changes: 24 additions & 9 deletions packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_executor::block_on;
use log::info;
use log::{error, info};
use rdkafka::{producer::ProducerContext, ClientContext, Message};
use talos_messenger_core::core::MessengerChannelFeedback;
use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError};
use tokio::sync::mpsc;

#[derive(Debug)]
Expand All @@ -10,6 +10,7 @@ pub struct MessengerProducerDeliveryOpaque {
pub total_publish_count: u32,
}

/// Context type for which rdkafka's `ProducerContext` is implemented in this file.
/// The delivery handling sends a `MessengerChannelFeedback` (success or error) for each
/// message through `tx_feedback_channel`.
#[derive(Debug, Clone)]
pub struct MessengerProducerContext {
/// Sending half of the tokio mpsc channel used to report delivery feedback.
pub tx_feedback_channel: mpsc::Sender<MessengerChannelFeedback>,
}
Expand All @@ -27,15 +28,29 @@ impl ProducerContext for MessengerProducerContext {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the receiving thread has errored. In such a scenario, the publisher thread would also shut down.
let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(
version,
"kafka".to_string(),
delivery_opaque.total_publish_count,
)));
if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");
};
}
Err(err) => {
Err((publish_error, borrowed_message)) => {
error!(
"[Messenger Producer Context] Error for version={:?} \nerror={:?}",
delivery_opaque.version,
publish_error.to_string()
);
let messenger_error = MessengerActionError {
kind: talos_messenger_core::errors::MessengerActionErrorKind::Publishing,
reason: publish_error.to_string(),
data: format!("version={version} message={:#?}", borrowed_message.detach()),
};
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the receiving thread has errored. In such a scenario, the publisher thread would also shut down.
let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(version, err.0.to_string())));
if let Err(send_error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(
version,
"kafka".to_string(),
Box::new(messenger_error),
))) {
error!("[Messenger Producer Context] Error sending error feedback for version={version} with \npublish_error={publish_error:?} \nchannel send_error={send_error:?}");
};
}
}
}
Expand Down
33 changes: 30 additions & 3 deletions packages/talos_messenger_actions/src/kafka/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,41 @@ use ahash::HashMap;
use serde::{Deserialize, Serialize}; // 1.0.130
use serde_json::{self};

/// Serde default provider: yields the `text/plain` encoding name as an owned `String`.
fn default_text_plain_encoding() -> String {
    String::from("text/plain")
}

/// Serde default provider: yields the `application/json` encoding name as an owned `String`.
fn default_application_json_encoding() -> String {
    String::from("application/json")
}

/// A key/value pair where both the key and the value carry their own encoding name.
// NOTE(review): presumably models a single Kafka message header; it is not referenced
// anywhere else in the visible code — confirm intended usage.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct MessengerKafkaActionHeader {
/// Encoding name describing how `key` is encoded.
pub key_encoding: String,
/// Header key.
pub key: String,
/// Encoding name describing how `value` is encoded.
pub value_encoding: String,
/// Header value.
pub value: String,
}

/// Description of a single Kafka publish action.
/// (De)serialised with camelCase field names (e.g. `keyEncoding`, `valueEncoding`).
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct KafkaAction {
// TODO: GK - Add additional Kafka producer related props here (e.g. `version: u32`).
/// Cluster value for this action. Defaults to an empty string when the field is absent.
// NOTE(review): presumably identifies the target Kafka cluster — confirm.
#[serde(default)]
pub cluster: String,
/// Topic to publish the payload
pub topic: String,
/// Key encoding to be used. Defaults to `text/plain`.
#[serde(default = "default_text_plain_encoding")]
pub key_encoding: String,
/// Key for the message to publish.
pub key: Option<String>,
/// Optional if the message should be published to a specific partition.
pub partition: Option<i32>,
/// Optional headers while publishing.
pub headers: Option<HashMap<String, String>>,
/// Value encoding to be used. Defaults to `application/json`.
#[serde(default = "default_application_json_encoding")]
pub value_encoding: String,
/// Payload to publish.
pub value: serde_json::Value,
}
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use talos_messenger_core::{

use super::models::KafkaAction;

#[derive(Debug)]
pub struct KafkaActionService<M: MessengerPublisher<Payload = KafkaAction> + Send + Sync> {
pub publisher: M,
pub rx_actions_channel: mpsc::Receiver<MessengerCommitActions>,
Expand Down
8 changes: 4 additions & 4 deletions packages/talos_messenger_core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use strum::{Display, EnumIter, EnumString};

use crate::errors::MessengerServiceResult;
use crate::errors::{MessengerActionError, MessengerServiceResult};

#[derive(Debug, Display, Serialize, Deserialize, EnumString, EnumIter, Clone, Eq, PartialEq)]
pub enum CommitActionType {
Expand Down Expand Up @@ -34,13 +34,13 @@ pub trait MessengerSystemService {
async fn stop(&self) -> MessengerServiceResult;
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// A version together with the map of commit actions attached to it.
pub struct MessengerCommitActions {
/// Version the commit actions are associated with.
pub version: u64,
/// Commit actions as a map from a string key to a JSON value.
// NOTE(review): keys presumably match the action key reported back in the feedback
// channel (e.g. "kafka") — confirm against the producer of this struct.
pub commit_actions: HashMap<String, Value>,
}

pub enum MessengerChannelFeedback {
Error(u64, String),
Success(u64, String, u32),
Error(u64, String, Box<MessengerActionError>),
Success(u64, String),
}
9 changes: 5 additions & 4 deletions packages/talos_messenger_core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use thiserror::Error as ThisError;
pub type MessengerServiceResult = Result<(), MessengerServiceError>;

#[derive(Debug, PartialEq, Clone)]
pub enum ActionErrorKind {
pub enum MessengerActionErrorKind {
Deserialisation,
Publishing,
}
#[derive(Debug, ThisError, PartialEq, Clone)]
#[error("Action Error {kind:?} with reason={reason} for data={data:?}")]
pub struct ActionError {
pub kind: ActionErrorKind,
#[error("Messenger action error {kind:?} with reason={reason} for data={data:?}")]
pub struct MessengerActionError {
pub kind: MessengerActionErrorKind,
pub reason: String,
pub data: String,
}
Expand Down
100 changes: 56 additions & 44 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
// 1. Kafka - Get candidate message
// a. Store inmemory.
// 2. Kafka - Get decision message.
// a. Update the store.
// 3. Handle `On Commit` part of the message
// a. Can there be anything other than publishing to kafka?
// b. What if the topic doesn't exist?
// c. Any validation required on what is being published?
// d. Publish T(k) only if all prior items are published or if safepoint of T(k) is published?
// e. If there are multiple messages to be published, should they be done serially?:-
// i. If to the same topic
// ii. If to another topic
// 4. After a message was published:-
// a. Mark that item as processed.
// b. Prune the store if contiguous items are processed.

use ahash::{HashMap, HashMapExt};
use async_trait::async_trait;
use log::{debug, info, warn};
use log::{debug, error, info, warn};

use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage};
use talos_suffix::{Suffix, SuffixTrait};
Expand Down Expand Up @@ -74,37 +58,63 @@ where
Ok(())
}

/// Marks the item for `version` as `Complete(reason)` once all of its actions are complete.
///
/// After the state change the prune index is refreshed. If the suffix then reports a safe
/// prune index, the suffix is pruned up to it and the message_receiver is asked to commit
/// offset `version + 1`.
pub(crate) fn check_and_update_all_actions_complete(&mut self, version: u64, reason: SuffixItemCompleteStateReason) {
    // Anything other than `Ok(true)` (not complete yet, or a lookup error) leaves the item untouched.
    let all_actions_done = matches!(self.suffix.are_all_actions_complete_for_version(version), Ok(true));
    if !all_actions_done {
        return;
    }

    self.suffix.set_item_state(version, SuffixItemState::Complete(reason));

    // Refresh the prune bookkeeping now that this version is complete.
    self.suffix.update_prune_index_from_version(version);
    debug!("[Actions] All actions for version {version} completed!");

    // Prune and commit only when the prune meta info yields an index.
    if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
        // The results of pruning and of the offset update are discarded.
        let _ = self.suffix.prune_till_index(index_to_prune);

        let commit_offset = version + 1;
        debug!("[Commit] Updating tpl to version .. {commit_offset}");
        let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

        self.message_receiver.commit_async();
    }
}
/// Handles failure feedback for the action `action_key` of the item at `version`.
///
/// Only items in the `Processing` or `PartiallyComplete` state are touched: the item is set to
/// `PartiallyComplete`, the action's count is incremented, and the item is then checked for
/// overall completion with the `ErrorProcessing` reason.
pub(crate) fn handle_action_failed(&mut self, version: u64, action_key: &str) {
    let item_state = self.suffix.get_item_state(version);

    // Items in any other state (or missing items) are ignored.
    let is_in_progress = matches!(item_state, Some(SuffixItemState::Processing | SuffixItemState::PartiallyComplete));
    if !is_in_progress {
        return;
    }

    self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);
    self.suffix.increment_item_action_count(version, action_key);
    self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::ErrorProcessing);

    debug!(
        "[Action] State version={version} changed from {item_state:?} => {:?}",
        self.suffix.get_item_state(version)
    );
}
///
/// Handles the feedback received from other services when they have successfully processed the action.
/// Will update the individual action for the count and completed flag and also update state of the suffix item.
///
pub(crate) async fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) {
pub(crate) fn handle_action_success(&mut self, version: u64, action_key: &str) {
let item_state = self.suffix.get_item_state(version);
match item_state {
Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

self.suffix.update_item_action(version, action_key, total_count);
if self.suffix.are_all_item_actions_completed(version) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));

// Pruning of suffix.
self.suffix.update_prune_index_from_version(version);

debug!("[Actions] All actions in Version {version} completed!");
// Check prune eligibility by looking at the prune meta info.
if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
// Call prune method on suffix.
let _ = self.suffix.prune_till_index(index_to_prune);

let commit_offset = version + 1;
debug!("[Commit] Updating tpl to version .. {commit_offset}");
let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

self.message_receiver.commit_async();
}
}
self.suffix.increment_item_action_count(version, action_key);
self.check_and_update_all_actions_complete(version, SuffixItemCompleteStateReason::Processed);
debug!(
"[Action] State version={version} changed from {item_state:?} => {:?}",
self.suffix.get_item_state(version)
Expand Down Expand Up @@ -183,12 +193,14 @@ where
// Receive feedback from publisher.
Some(feedback) = self.rx_feedback_channel.recv() => {
match feedback {
// TODO: GK - What to do when we have error on publishing? Retry??
MessengerChannelFeedback::Error(_, _) => panic!("Implement the error feedback"),
MessengerChannelFeedback::Success(version, key, total_count) => {
info!("Successfully received version={version} count={total_count}");
MessengerChannelFeedback::Error(version, key, message_error) => {
error!("Failed to process version={version} with error={message_error:?}");
self.handle_action_failed(version, &key);

self.handle_item_actioned_success(version, &key, total_count).await;
},
MessengerChannelFeedback::Success(version, key) => {
info!("Successfully processed version={version} with action_key={key}");
self.handle_action_success(version, &key);
},
}
// Process the next items with commit actions
Expand Down
35 changes: 11 additions & 24 deletions packages/talos_messenger_core/src/suffix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait MessengerSuffixTrait<T: MessengerSuffixItemTrait>: SuffixTrait<T> {
/// Updates the decision for a version.
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D);
/// Updates the action for a version using the action_key for lookup.
fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32);
fn increment_item_action_count(&mut self, version: u64, action_key: &str);

/// Checks if all versions prior to this version are already completed, and updates the prune index.
/// If the prune index was updated, returns the new prune_index, else returns None.
Expand All @@ -64,39 +64,30 @@ pub enum SuffixItemCompleteStateReason {
NoCommitActions,
/// When there are commit actions, but they are not required to be handled in messenger
NoRelavantCommitActions,
//TODO: GK - Mark as error?
/// When there is an error?
// Error(String),
/// When all commit actions are completed.
Processed,
/// Error in processing
ErrorProcessing,
}

// #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
// pub struct AllowedActionsMapValueMeta {
// pub total_count: u32,
// pub completed_count: u32,
// }
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct AllowedActionsMapItem {
payload: Value,
count: u32,
is_completed: bool,
total_count: u32,
}

impl AllowedActionsMapItem {
pub fn new(payload: Value) -> Self {
pub fn new(payload: Value, total_count: u32) -> Self {
AllowedActionsMapItem {
payload,
count: 0,
is_completed: false,
total_count,
}
}
pub fn update_count(&mut self) {
self.count += 1;
}

pub fn mark_completed(&mut self) {
self.is_completed = true;
pub fn increment_count(&mut self) {
self.count += 1;
}

pub fn get_payload(&self) -> &Value {
Expand All @@ -108,7 +99,7 @@ impl AllowedActionsMapItem {
}

pub fn is_completed(&self) -> bool {
self.is_completed
self.total_count > 0 && self.total_count == self.count
}
}

Expand Down Expand Up @@ -250,14 +241,10 @@ where
}
}

fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32) {
fn increment_item_action_count(&mut self, version: u64, action_key: &str) {
if let Some(item_to_update) = self.get_mut(version) {
if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) {
action.update_count();

if action.get_count() == total_count {
action.mark_completed();
}
action.increment_count();
} else {
warn!("Could not update the action as item with version={version} does not have action_key={action_key}! ");
}
Expand Down
Loading

0 comments on commit b0462d8

Please sign in to comment.