diff --git a/packages/talos_certifier/src/ports/message.rs b/packages/talos_certifier/src/ports/message.rs index 8b4848dc..bb274fa2 100644 --- a/packages/talos_certifier/src/ports/message.rs +++ b/packages/talos_certifier/src/ports/message.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use std::collections::HashMap; +use tokio::task::JoinHandle; use crate::errors::SystemServiceError; @@ -14,6 +15,7 @@ pub trait MessageReciever: SharedPortTraits { async fn consume_message(&mut self) -> Result, MessageReceiverError>; async fn subscribe(&self) -> Result<(), SystemServiceError>; async fn commit(&self) -> Result<(), SystemServiceError>; + fn commit_async(&self) -> Option>>; async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError>; async fn unsubscribe(&self); } diff --git a/packages/talos_certifier/src/services/message_receiver_service.rs b/packages/talos_certifier/src/services/message_receiver_service.rs index 874038a8..74e6e9fc 100644 --- a/packages/talos_certifier/src/services/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/message_receiver_service.rs @@ -5,7 +5,7 @@ use std::{ use async_trait::async_trait; use log::{error, info, warn}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::Interval}; use crate::{ core::{ServiceResult, System, SystemService}, @@ -19,6 +19,7 @@ pub struct MessageReceiverService { pub message_channel_tx: mpsc::Sender, pub commit_offset: Arc, pub system: System, + pub commit_interval: Interval, } impl MessageReceiverService { @@ -33,6 +34,7 @@ impl MessageReceiverService { message_channel_tx, system, commit_offset, + commit_interval: tokio::time::interval(Duration::from_millis(10_000)), } } @@ -45,7 +47,6 @@ impl MessageReceiverService { #[async_trait] impl SystemService for MessageReceiverService { async fn run(&mut self) -> ServiceResult { - let mut interval = tokio::time::interval(Duration::from_millis(10_000)); tokio::select! { // ** Consume Messages from Kafka res = self.receiver.consume_message() => { @@ -81,10 +82,10 @@ impl SystemService for MessageReceiverService { } } //** commit message - _ = interval.tick() => { + _ = self.commit_interval.tick() => { let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed); self.receiver.update_savepoint(offset).await?; - self.receiver.commit().await?; + self.receiver.commit_async(); } } diff --git a/packages/talos_certifier/src/services/tests/message_receiver_service.rs b/packages/talos_certifier/src/services/tests/message_receiver_service.rs index ea2a94a0..660e1d0a 100644 --- a/packages/talos_certifier/src/services/tests/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/tests/message_receiver_service.rs @@ -1,7 +1,10 @@ use std::sync::{atomic::AtomicI64, Arc}; use async_trait::async_trait; -use tokio::sync::{broadcast, mpsc}; +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, +}; use crate::{ core::{System, SystemService}, @@ -41,6 +44,9 @@ impl MessageReciever for MockReciever { async fn commit(&self) -> Result<(), SystemServiceError> { Ok(()) } + fn commit_async(&self) -> Option>> { + None + } async fn update_savepoint(&mut self, _version: i64) -> Result<(), SystemServiceError> { Ok(()) } diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index e6f6ad6f..ca7bc614 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use log::debug; @@ -19,6 +19,7 @@ use talos_certifier::{ }; use talos_rdkafka_utils::kafka_config::KafkaConfig; use time::OffsetDateTime; +use tokio::task::JoinHandle; use crate::{kafka::utils::get_message_headers, KafkaAdapterError}; @@ -27,7 +28,7 @@ use super::utils; // Kafka Consumer Client // #[derive(Debug, Clone)] pub struct KafkaConsumer { - pub consumer: StreamConsumer, + pub consumer: Arc>, pub topic: String, pub tpl: TopicPartitionList, } @@ -38,7 +39,7 @@ impl KafkaConsumer { let topic = config.topic.clone(); Self { - consumer, + consumer: Arc::new(consumer), topic, tpl: TopicPartitionList::new(), } @@ -182,6 +183,29 @@ impl MessageReciever for KafkaConsumer { } Ok(()) } + + fn commit_async(&self) -> Option>> { + if self.tpl.count() > 0 { + let consumer_copy = Arc::clone(&self.consumer); + let tpl = self.tpl.clone(); + let handle = tokio::task::spawn(async move { + consumer_copy.commit(&tpl, rdkafka::consumer::CommitMode::Async).map_err(|err| { + MessageReceiverError { + kind: MessageReceiverErrorKind::CommitError, + version: None, + reason: err.to_string(), + data: None, + } + .into() + }) + }); + + Some(handle) + } else { + None + } + } + async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError> { // let partition = self.tpl.; let tpl = self.tpl.elements_for_topic(&self.topic); diff --git a/packages/talos_cohort_replicator/src/core.rs b/packages/talos_cohort_replicator/src/core.rs index 6ce8b771..c9c8a976 100644 --- a/packages/talos_cohort_replicator/src/core.rs +++ b/packages/talos_cohort_replicator/src/core.rs @@ -81,6 +81,7 @@ where pub receiver: M, pub suffix: S, pub last_installing: u64, + pub next_commit_offset: Option, _phantom: PhantomData, } @@ -95,6 +96,7 @@ where receiver, suffix, last_installing: 0, + next_commit_offset: None, _phantom: PhantomData, } } @@ -138,14 +140,20 @@ where get_statemap_from_suffix_items(items.into_iter()) } - pub(crate) async fn commit_till_last_installed(&mut self) { + pub(crate) async fn prepare_offset_for_commit(&mut self) { if self.last_installing > 0 { if let Some(last_installed) = self.suffix.get_last_installed(Some(self.last_installing)) { let version = last_installed.decision_ver.unwrap(); - self.receiver.update_savepoint(version as i64).await.unwrap(); - - self.receiver.commit().await.unwrap(); + self.next_commit_offset = Some(version); } } } + + pub(crate) async fn commit(&mut self) { + if let Some(version) = self.next_commit_offset { + self.receiver.update_savepoint(version as i64).await.unwrap(); + self.receiver.commit_async(); + self.next_commit_offset = None; + } + } } diff --git a/packages/talos_cohort_replicator/src/services/replicator_service.rs b/packages/talos_cohort_replicator/src/services/replicator_service.rs index b7e4d261..95316543 100644 --- a/packages/talos_cohort_replicator/src/services/replicator_service.rs +++ b/packages/talos_cohort_replicator/src/services/replicator_service.rs @@ -90,7 +90,7 @@ where \n "); } - replicator.commit_till_last_installed().await; + replicator.commit().await; } // Receive feedback from installer. res = replicator_rx.recv() => { @@ -110,6 +110,7 @@ where // Prune suffix and update suffix head. if replicator.suffix.get_suffix_meta().prune_index >= replicator.suffix.get_suffix_meta().prune_start_threshold { + replicator.prepare_offset_for_commit().await; replicator.suffix.prune_till_version(version).unwrap(); } total_items_installed += 1;