Skip to content

Commit

Permalink
feat: minor refactor and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Sep 25, 2023
1 parent 1dad90e commit 1fe0908
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 111 deletions.
22 changes: 12 additions & 10 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use log::info;
use log::{error, info};
use tokio::sync::mpsc;

use talos_messenger_core::{
Expand Down Expand Up @@ -38,17 +38,19 @@ where

// TODO: GK - Make this block generic in next ticket to iterator in loop by PublishActionType
{
let Some(kafka_actions) = get_actions_deserialised::<Vec<KafkaAction>>(&version, publish_actions_for_type, &self.publisher.get_publish_type().to_string()) else {
continue;
};
let total_len = kafka_actions.len() as u32;
for k_action in kafka_actions {
info!("Received message for version={version} and publish_action={k_action:#?}");
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {

let total_len = actions.len() as u32;
for action in actions {
// Send to Kafka
self.publisher.send(version, action, total_len ).await;

// if kafka commit action send to Kafka publisher
self.publisher.send(version, k_action, total_len ).await;

}
},
Err(err) => {
error!("Failed to deserialise for version={version} key={} for data={:?} with error={:?}",&self.publisher.get_publish_type(), err.data, err.reason )
},
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions packages/talos_messenger_core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@ use thiserror::Error as ThisError;

pub type MessengerServiceResult = Result<(), MessengerServiceError>;

#[derive(Debug, PartialEq, Clone)]
pub enum ActionErrorKind {
Deserialisation,
}
#[derive(Debug, ThisError, PartialEq, Clone)]
#[error("Action Error {kind:?} with reason={reason} for data={data:?}")]
pub struct ActionError {
pub kind: ActionErrorKind,
pub reason: String,
pub data: String,
}

#[derive(Debug, PartialEq, Clone)]
pub enum MessengerServiceErrorKind {
System,
Expand Down
3 changes: 3 additions & 0 deletions packages/talos_messenger_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ pub mod services;
pub mod suffix;
pub mod talos_messenger_service;
pub mod utlis;

#[cfg(test)]
mod tests;
54 changes: 14 additions & 40 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
let payload_to_send = MessengerCommitActions {
version: ver,
commit_actions: item.actions.iter().fold(HashMap::new(), |mut acc, (key, value)| {
acc.insert(key.to_string(), value.payload.clone());
acc.insert(key.to_string(), value.get_payload().clone());
acc
}),
};
Expand All @@ -83,46 +83,24 @@ where
/// Handles the feedback received from other services when they have successfully processed the action.
/// Will update the individual action for the count and completed flag and also update state of the suffix item.
///
pub(crate) fn handle_item_actioned_success(&mut self, version: u64, action_type: String, total_count: u32) {
pub(crate) fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) {
let item_state = self.suffix.get_item_state(version);
match item_state {
Some(SuffixItemState::Processing) => {
Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

// TODO: GK - Put setter function for this and avoid repeating it.
if let Some(item_to_update) = self.suffix.get_mut(version) {
if let Some(action_completed) = item_to_update.item.allowed_actions_map.get_mut(&action_type) {
action_completed.count += 1;
action_completed.is_completed = action_completed.count == total_count;
}

if item_to_update.item.allowed_actions_map.iter().all(|(_, x)| x.is_completed) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));
}
}
}
Some(SuffixItemState::PartiallyComplete) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

// TODO: GK - Put setter function for this.
if let Some(item_to_update) = self.suffix.get_mut(version) {
if let Some(action_completed) = item_to_update.item.allowed_actions_map.get_mut(&action_type) {
action_completed.count += 1;

if action_completed.count == total_count {
action_completed.is_completed = true;
}
}

if item_to_update.item.allowed_actions_map.iter().all(|(_, x)| x.is_completed) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));
}
self.suffix.update_action(version, action_key, total_count);
if self.suffix.all_actions_completed(version) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));
}
}
_ => (),
};
error!(
"State change for version={version} from {item_state:?} => {:?}",
self.suffix.get_item_state(version)
);
}
}

Expand Down Expand Up @@ -151,30 +129,27 @@ where
match msg {
// 2.1 For CM - Install messages on the version
ChannelMessage::Candidate(message) => {

let version = message.version;
if message.version > 0 {
// insert item to suffix
let _ = self.suffix.insert(version, message.into());

if let Some(item_to_update) = self.suffix.get_mut(version){
if let Some(commit_actions) = &item_to_update.item.candidate.on_commit {

let filter_actions = get_allowed_commit_actions(commit_actions, &self.allowed_actions);

if filter_actions.is_empty() {
// There are on_commit actions, but not the ones required by messenger

item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions));
} else {

item_to_update.item.set_commit_action(filter_actions);
}
} else {
// No on_commit actions
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions));

}
error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.state, item_to_update.item.allowed_actions_map);
error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.get_state(), item_to_update.item.get_commit_actions());
};

} else {
Expand Down Expand Up @@ -212,9 +187,8 @@ where
MessengerChannelFeedback::Success(version, key, total_count) => {
info!("Successfully received version={version} count={total_count}");

self.handle_item_actioned_success(version, key, total_count);
self.handle_item_actioned_success(version, &key, total_count);

// error!("State change for version={version} from {item_state:?} => {:?}", self.suffix.get_item_state(version));

// self.suffix.messages.iter().flatten().for_each(|item|
// error!("version={} decision={:?} state={:?} action_state={:#?}", item.item_ver, item.item.decision, item.item.get_state(), item.item.commit_actions.iter().map(|x| (x.1.count, x.1.is_completed)).collect::<Vec<(u32, bool)>>())
Expand Down
147 changes: 105 additions & 42 deletions packages/talos_messenger_core/src/suffix.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,46 @@
use ahash::{HashMap, HashMapExt};
use log::error;
use log::{error, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;
use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait};
use talos_suffix::{core::SuffixMeta, Suffix, SuffixItem, SuffixTrait};

pub trait MessengerSuffixItemTrait {
fn set_state(&mut self, state: SuffixItemState);
fn set_safepoint(&mut self, safepoint: Option<u64>);
fn set_commit_action(&mut self, commit_actions: HashMap<String, AllowedActionsMapItem>);
fn set_decision(&mut self, decision: Decision);

fn get_state(&self) -> &SuffixItemState;
fn get_commit_actions(&self) -> &HashMap<String, AllowedActionsMapItem>;
fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem>;
fn get_safepoint(&self) -> &Option<u64>;

fn is_abort(&self) -> Option<bool>;
}

pub trait MessengerSuffixTrait<T: MessengerSuffixItemTrait>: SuffixTrait<T> {
// Setters
fn set_item_state(&mut self, version: u64, process_state: SuffixItemState);

// Getters
fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem<T>>;
fn get_item_state(&self, version: u64) -> Option<SuffixItemState>;
fn get_last_installed(&self, to_version: Option<u64>) -> Option<&SuffixItem<T>>;
// fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>;
fn get_suffix_meta(&self) -> &SuffixMeta;
fn installed_all_prior_decided_items(&self, version: u64) -> bool;

fn get_suffix_items_to_process(&self) -> Vec<ActionsMapWithVersion>;
// updates
fn update_prune_index(&mut self, version: u64);
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D);
fn update_action(&mut self, version: u64, action_key: &str, total_count: u32);

fn all_actions_completed(&self, version: u64) -> bool;
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum SuffixItemState {
AwaitingDecision,
Expand Down Expand Up @@ -37,9 +72,38 @@ pub enum SuffixItemCompleteStateReason {
// }
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct AllowedActionsMapItem {
pub payload: Value,
pub count: u32,
pub is_completed: bool,
payload: Value,
count: u32,
is_completed: bool,
}

impl AllowedActionsMapItem {
pub fn new(payload: Value) -> Self {
AllowedActionsMapItem {
payload,
count: 0,
is_completed: false,
}
}
pub fn update_count(&mut self) {
self.count += 1;
}

pub fn mark_completed(&mut self) {
self.is_completed = true;
}

pub fn get_payload(&self) -> &Value {
&self.payload
}

pub fn get_count(&self) -> u32 {
self.count
}

pub fn is_completed(&self) -> bool {
self.is_completed
}
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
Expand All @@ -51,14 +115,14 @@ pub struct ActionsMapWithVersion {
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct MessengerCandidate {
pub candidate: CandidateMessage,

pub safepoint: Option<u64>,

pub decision: Option<Decision>,

pub state: SuffixItemState,

pub allowed_actions_map: HashMap<String, AllowedActionsMapItem>,
/// Safepoint received for committed outcomes from certifier.
safepoint: Option<u64>,
/// Decision received from certifier.
decision: Option<Decision>,
/// Suffix item state.
state: SuffixItemState,
/// Filtered actions that need to be processed by the messenger
allowed_actions_map: HashMap<String, AllowedActionsMapItem>,
}

impl From<CandidateMessage> for MessengerCandidate {
Expand Down Expand Up @@ -106,37 +170,10 @@ impl MessengerSuffixItemTrait for MessengerCandidate {
fn is_abort(&self) -> Option<bool> {
Some(self.decision.clone()?.eq(&Decision::Aborted))
}
}

pub trait MessengerSuffixItemTrait {
fn set_state(&mut self, state: SuffixItemState);
fn get_state(&self) -> &SuffixItemState;

fn set_commit_action(&mut self, commit_actions: HashMap<String, AllowedActionsMapItem>);
fn get_commit_actions(&self) -> &HashMap<String, AllowedActionsMapItem>;

fn set_safepoint(&mut self, safepoint: Option<u64>);
fn get_safepoint(&self) -> &Option<u64>;
fn set_decision(&mut self, decision: Decision);
fn is_abort(&self) -> Option<bool>;
}

pub trait MessengerSuffixTrait<T: MessengerSuffixItemTrait>: SuffixTrait<T> {
// Setters
fn set_item_state(&mut self, version: u64, process_state: SuffixItemState);

// Getters
fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem<T>>;
fn get_item_state(&self, version: u64) -> Option<SuffixItemState>;
fn get_last_installed(&self, to_version: Option<u64>) -> Option<&SuffixItem<T>>;
// fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>;
fn get_suffix_meta(&self) -> &SuffixMeta;
fn installed_all_prior_decided_items(&self, version: u64) -> bool;

fn get_suffix_items_to_process(&self) -> Vec<ActionsMapWithVersion>;
// updates
fn update_prune_index(&mut self, version: u64);
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D);
fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem> {
self.allowed_actions_map.get_mut(action_key)
}
}

impl<T> MessengerSuffixTrait<T> for Suffix<T>
Expand Down Expand Up @@ -231,4 +268,30 @@ where
item_to_update.item.set_safepoint(decision_message.get_safepoint());
}
}

fn update_action(&mut self, version: u64, action_key: &str, total_count: u32) {
if let Some(item_to_update) = self.get_mut(version) {
if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) {
action.update_count();

if action.get_count() == total_count {
action.mark_completed();
}
} else {
warn!("Could not update the action as item with version={version} does not have action_key={action_key}! ");
}
} else {
warn!("Could not update the action as item with version={version} was not found! ");
}
}

fn all_actions_completed(&self, version: u64) -> bool {
if let Ok(Some(item)) = self.get(version) {
item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed())
} else {
warn!("could not find item for version={version}");
// TODO: GK - handle this in another way for future?
true
}
}
}
1 change: 1 addition & 0 deletions packages/talos_messenger_core/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod utils;
Loading

0 comments on commit 1fe0908

Please sign in to comment.