Skip to content

Commit

Permalink
chore: refactor inbound service code into smaller fns
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Sep 24, 2023
1 parent 8a254e2 commit 1dad90e
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 235 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,6 @@ Temporary Items
tasklist.md

docs/.*.bkp

# Temporary testing bin
packages/talos_messenger_actions/src/bin/test**.rs
58 changes: 0 additions & 58 deletions packages/talos_messenger_actions/src/bin/kafka_action_testing.rs

This file was deleted.

22 changes: 8 additions & 14 deletions packages/talos_messenger_actions/src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,21 @@ impl<C: ProducerContext + 'static> KafkaProducer<C> {
value: &str,
headers: Option<HashMap<String, String>>,
delivery_opaque: C::DeliveryOpaque,
// TODO: GK - Update error type here.
) -> Result<(), SystemServiceError> {
) -> Result<(), MessagePublishError> {
let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value).key(key);

let record = match headers {
Some(x) => record.headers(build_kafka_headers(x)),
None => record,
};

Ok(self
.producer
.send(record)
// .send(record, Timeout::After(Duration::from_secs(1)))
// .await
.map_err(|(kafka_error, record)| MessagePublishError {
reason: kafka_error.to_string(),
data: Some(format!(
"Topic={:?} partition={:?} key={:?} headers={:?} payload={:?}",
self.topic, record.partition, record.key, record.headers, record.payload
)),
})?)
self.producer.send(record).map_err(|(kafka_error, record)| MessagePublishError {
reason: kafka_error.to_string(),
data: Some(format!(
"Topic={:?} partition={:?} key={:?} headers={:?} payload={:?}",
self.topic, record.partition, record.key, record.headers, record.payload
)),
})
}
}

Expand Down
177 changes: 88 additions & 89 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
core::{MessengerChannelFeedback, MessengerCommitActions, MessengerSystemService},
errors::{MessengerServiceError, MessengerServiceResult},
suffix::{MessengerCandidate, MessengerSuffixItemTrait, MessengerSuffixTrait, SuffixItemCompleteStateReason, SuffixItemState},
utlis::get_filtered_commit_actions,
utlis::get_allowed_commit_actions,
};

pub struct MessengerInboundService<M>
Expand All @@ -44,25 +44,22 @@ impl<M> MessengerInboundService<M>
where
M: MessageReciever<Message = ChannelMessage> + Send + Sync + 'static,
{
/// Get next items to process.
/// Get next versions with their commit actions to process.
///
async fn process_next_actions(&mut self) -> MessengerServiceResult {
let items_to_process = self.suffix.get_suffix_items_to_process();

// {

error!(
"Items to process count... {:#?}",
items_to_process.iter().map(|x| x.version).collect::<Vec<u64>>()
);
// }

for item in items_to_process {
let ver = item.version;

let commit_actions = HashMap::new();
let payload_to_send = MessengerCommitActions {
version: ver,
commit_actions: item.actions.iter().fold(commit_actions, |mut acc, (key, value)| {
commit_actions: item.actions.iter().fold(HashMap::new(), |mut acc, (key, value)| {
acc.insert(key.to_string(), value.payload.clone());
acc
}),
Expand All @@ -81,6 +78,52 @@ where

Ok(())
}

///
/// Handles the feedback received from other services when they have successfully processed the action.
/// Will update the individual action for the count and completed flag and also update state of the suffix item.
///
pub(crate) fn handle_item_actioned_success(&mut self, version: u64, action_type: String, total_count: u32) {
let item_state = self.suffix.get_item_state(version);
match item_state {
Some(SuffixItemState::Processing) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

// TODO: GK - Put setter function for this and avoid repeating it.
if let Some(item_to_update) = self.suffix.get_mut(version) {
if let Some(action_completed) = item_to_update.item.allowed_actions_map.get_mut(&action_type) {
action_completed.count += 1;
action_completed.is_completed = action_completed.count == total_count;
}

if item_to_update.item.allowed_actions_map.iter().all(|(_, x)| x.is_completed) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));
}
}
}
Some(SuffixItemState::PartiallyComplete) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

// TODO: GK - Put setter function for this.
if let Some(item_to_update) = self.suffix.get_mut(version) {
if let Some(action_completed) = item_to_update.item.allowed_actions_map.get_mut(&action_type) {
action_completed.count += 1;

if action_completed.count == total_count {
action_completed.is_completed = true;
}
}

if item_to_update.item.allowed_actions_map.iter().all(|(_, x)| x.is_completed) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));
}
}
}
_ => (),
};
}
}

#[async_trait]
Expand All @@ -103,61 +146,59 @@ where
tokio::select! {
// 1. Consume message.
Ok(Some(msg)) = self.message_receiver.consume_message() => {
// if let Ok(Some(msg)) = res {

// 2. Add/update to suffix.
match msg {
// 2.1 For CM - Install messages on the version
ChannelMessage::Candidate(message) => {
let version = message.version;
if message.version > 0 {
// insert item to suffix
let _ = self.suffix.insert(version, message.into());

if let Some(item_to_update) = self.suffix.get_mut(version){
if let Some(commit_actions) = &item_to_update.item.candidate.on_commit {
// 2. Add/update to suffix.
match msg {
// 2.1 For CM - Install messages on the version
ChannelMessage::Candidate(message) => {
let version = message.version;
if message.version > 0 {
// insert item to suffix
let _ = self.suffix.insert(version, message.into());

let filter_actions = get_filtered_commit_actions(commit_actions, &self.allowed_actions);
if let Some(item_to_update) = self.suffix.get_mut(version){
if let Some(commit_actions) = &item_to_update.item.candidate.on_commit {

if filter_actions.is_empty() {
// There are on_commit actions, but not the ones required by messenger
let filter_actions = get_allowed_commit_actions(commit_actions, &self.allowed_actions);

item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions));
} else {
if filter_actions.is_empty() {
// There are on_commit actions, but not the ones required by messenger

item_to_update.item.set_commit_action(filter_actions);
}
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions));
} else {
// No on_commit actions
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions));

item_to_update.item.set_commit_action(filter_actions);
}
error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.state, item_to_update.item.allowed_actions_map);
};
} else {
// No on_commit actions
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions));

} else {
warn!("Version 0 will not be inserted into suffix.")
}
}
error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.state, item_to_update.item.allowed_actions_map);
};

},
// 2.2 For DM - Update the decision with outcome + safepoint.
ChannelMessage::Decision(decision_version, decision_message) => {
let version = decision_message.get_candidate_version();
info!("[Decision Message] Version received = {} and {}", decision_version, version);
} else {
warn!("Version 0 will not be inserted into suffix.")
}

self.suffix.update_item_decision(version, decision_version, &decision_message);
},
// 2.2 For DM - Update the decision with outcome + safepoint.
ChannelMessage::Decision(decision_version, decision_message) => {
let version = decision_message.get_candidate_version();
info!("[Decision Message] Version received = {} and {}", decision_version, version);

self.process_next_actions().await?
self.suffix.update_item_decision(version, decision_version, &decision_message);

self.process_next_actions().await?

// TODO: GK - Calculate the safe offset to commit.

// TODO: GK - Prune suffix.
// TODO: GK - Calculate the safe offset to commit.

},
}
// TODO: GK - Prune suffix.

},
}

// }
}
// Next condition - Commit, get processed/published info.

Expand All @@ -171,51 +212,9 @@ where
MessengerChannelFeedback::Success(version, key, total_count) => {
info!("Successfully received version={version} count={total_count}");

let item_state = self.suffix.get_item_state(version);
match item_state{
Some(SuffixItemState::Processing) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

// TODO: GK - Put setter function for this and avoid repeating it.
if let Some(item_to_update) = self.suffix.get_mut(version) {
if let Some(action_completed) = item_to_update.item.allowed_actions_map.get_mut(&key) {
action_completed.count += 1;
action_completed.is_completed = action_completed.count == total_count;

}

if item_to_update.item.allowed_actions_map.iter().all(|(_, x)| x.is_completed) {
self.suffix.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));

}
}

}
Some(SuffixItemState::PartiallyComplete) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

// TODO: GK - Put setter function for this.
if let Some(item_to_update) = self.suffix.get_mut(version) {
if let Some(action_completed) = item_to_update.item.allowed_actions_map.get_mut(&key) {
action_completed.count += 1;

if action_completed.count == total_count {
action_completed.is_completed = true;

}
}

if item_to_update.item.allowed_actions_map.iter().all(|(_, x)| x.is_completed) {
self.suffix.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));

}
}

},
_ =>(),
};
self.handle_item_actioned_success(version, key, total_count);

error!("State change for version={version} from {item_state:?} => {:?}", self.suffix.get_item_state(version));
// error!("State change for version={version} from {item_state:?} => {:?}", self.suffix.get_item_state(version));

// self.suffix.messages.iter().flatten().for_each(|item|
// error!("version={} decision={:?} state={:?} action_state={:#?}", item.item_ver, item.item.decision, item.item.get_state(), item.item.commit_actions.iter().map(|x| (x.1.count, x.1.is_completed)).collect::<Vec<(u32, bool)>>())
Expand Down
Loading

0 comments on commit 1dad90e

Please sign in to comment.