Skip to content

Commit

Permalink
chore: prune logic only when above the threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Nov 11, 2024
1 parent 111b266 commit 518a4a7
Showing 1 changed file with 19 additions and 42 deletions.
61 changes: 19 additions & 42 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use log::{debug, error, info, warn};

use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage};
use talos_suffix::{Suffix, SuffixTrait};
use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait};
use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tokio::sync::mpsc::{self};

Expand Down Expand Up @@ -115,30 +115,6 @@ where
}

pub(crate) fn suffix_pruning(&mut self) {
// Update the prune index in suffix if applicable.
// let SuffixMeta {
// prune_index,
// prune_start_threshold,
// ..
// } = self.suffix.get_meta();
// let prune_index = self.suffix.get_meta().prune_index;

// If there is a prune_index, it is safe to assume, all messages prioir to this are decided + on_commit actions are actioned.
// Therefore, it is safe to commit till that offset/version.
// if prune_index.gt(prune_start_threshold) {
// if let Some(index) = prune_index {
// let prune_item_option = self.suffix.messages.get(*index);

// if let Some(Some(prune_item)) = prune_item_option {
// let commit_offset = prune_item.item_ver + 1;
// debug!("[Commit] Updating tpl to version .. {commit_offset}");
// let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

// let _ = self.message_receiver.commit();
// }
// }
// }

// Check prune eligibility by looking at the prune meta info.
if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
// error!("Pruning till index {index_to_prune}");
Expand Down Expand Up @@ -240,23 +216,6 @@ where

feedback_count+=1;

if feedback_count.ge(&self.config.commit_size){
self.commit_offset();
feedback_count = 0;
}

// Update the prune index and commit
// let SuffixMeta {
// prune_index,
// prune_start_threshold,
// ..
// } = self.suffix.get_meta();

// // NOTE: Pruning and committing offset adds to latency if done more frequently.
// // The more frequent this method is called has direct impact on the latency.
// if prune_index.gt(prune_start_threshold) {
self.suffix_pruning();
// };

}
// 1. Consume message.
Expand Down Expand Up @@ -328,6 +287,24 @@ where
}

}

// NOTE: Pruning and committing offset adds to latency if done more frequently.

if feedback_count.ge(&self.config.commit_size) {
self.commit_offset();
feedback_count = 0;
}

// Update the prune index and commit
let SuffixMeta {
prune_index,
prune_start_threshold,
..
} = self.suffix.get_meta();

if prune_index.gt(prune_start_threshold) {
self.suffix_pruning();
};
}
}
}

0 comments on commit 518a4a7

Please sign in to comment.