Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimise replicator commit logic #82

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use std::collections::HashMap;
use tokio::task::JoinHandle;

use crate::errors::SystemServiceError;

Expand All @@ -14,6 +15,7 @@ pub trait MessageReciever: SharedPortTraits {
async fn consume_message(&mut self) -> Result<Option<Self::Message>, MessageReceiverError>;
async fn subscribe(&self) -> Result<(), SystemServiceError>;
async fn commit(&self) -> Result<(), SystemServiceError>;
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>>;
async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError>;
async fn unsubscribe(&self);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use async_trait::async_trait;
use log::{error, info, warn};
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::Interval};

use crate::{
core::{ServiceResult, System, SystemService},
Expand All @@ -19,6 +19,7 @@ pub struct MessageReceiverService {
pub message_channel_tx: mpsc::Sender<ChannelMessage>,
pub commit_offset: Arc<AtomicI64>,
pub system: System,
pub commit_interval: Interval,
}

impl MessageReceiverService {
Expand All @@ -33,6 +34,7 @@ impl MessageReceiverService {
message_channel_tx,
system,
commit_offset,
commit_interval: tokio::time::interval(Duration::from_millis(10_000)),
}
}

Expand All @@ -45,7 +47,6 @@ impl MessageReceiverService {
#[async_trait]
impl SystemService for MessageReceiverService {
async fn run(&mut self) -> ServiceResult {
let mut interval = tokio::time::interval(Duration::from_millis(10_000));
tokio::select! {
// ** Consume Messages from Kafka
res = self.receiver.consume_message() => {
Expand Down Expand Up @@ -81,10 +82,10 @@ impl SystemService for MessageReceiverService {
}
}
//** commit message
_ = interval.tick() => {
_ = self.commit_interval.tick() => {
let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed);
self.receiver.update_savepoint(offset).await?;
self.receiver.commit().await?;
self.receiver.commit_async();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::{atomic::AtomicI64, Arc};

use async_trait::async_trait;
use tokio::sync::{broadcast, mpsc};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};

use crate::{
core::{System, SystemService},
Expand Down Expand Up @@ -41,6 +44,9 @@ impl MessageReciever for MockReciever {
async fn commit(&self) -> Result<(), SystemServiceError> {
Ok(())
}
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>> {
None
}
async fn update_savepoint(&mut self, _version: i64) -> Result<(), SystemServiceError> {
Ok(())
}
Expand Down
30 changes: 27 additions & 3 deletions packages/talos_certifier_adapters/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use log::debug;
Expand All @@ -19,6 +19,7 @@ use talos_certifier::{
};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use time::OffsetDateTime;
use tokio::task::JoinHandle;

use crate::{kafka::utils::get_message_headers, KafkaAdapterError};

Expand All @@ -27,7 +28,7 @@ use super::utils;
// Kafka Consumer Client
// #[derive(Debug, Clone)]
pub struct KafkaConsumer {
pub consumer: StreamConsumer<DefaultConsumerContext>,
pub consumer: Arc<StreamConsumer<DefaultConsumerContext>>,
pub topic: String,
pub tpl: TopicPartitionList,
}
Expand All @@ -38,7 +39,7 @@ impl KafkaConsumer {

let topic = config.topic.clone();
Self {
consumer,
consumer: Arc::new(consumer),
topic,
tpl: TopicPartitionList::new(),
}
Expand Down Expand Up @@ -182,6 +183,29 @@ impl MessageReciever for KafkaConsumer {
}
Ok(())
}

fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>> {
if self.tpl.count() > 0 {
let consumer_copy = Arc::clone(&self.consumer);
let tpl = self.tpl.clone();
let handle = tokio::task::spawn(async move {
consumer_copy.commit(&tpl, rdkafka::consumer::CommitMode::Async).map_err(|err| {
MessageReceiverError {
kind: MessageReceiverErrorKind::CommitError,
version: None,
reason: err.to_string(),
data: None,
}
.into()
})
});

Some(handle)
} else {
None
}
}

async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError> {
// let partition = self.tpl.;
let tpl = self.tpl.elements_for_topic(&self.topic);
Expand Down
16 changes: 12 additions & 4 deletions packages/talos_cohort_replicator/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ where
pub receiver: M,
pub suffix: S,
pub last_installing: u64,
pub next_commit_offset: Option<u64>,
_phantom: PhantomData<T>,
}

Expand All @@ -95,6 +96,7 @@ where
receiver,
suffix,
last_installing: 0,
next_commit_offset: None,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -138,14 +140,20 @@ where
get_statemap_from_suffix_items(items.into_iter())
}

pub(crate) async fn commit_till_last_installed(&mut self) {
pub(crate) async fn prepare_offset_for_commit(&mut self) {
if self.last_installing > 0 {
if let Some(last_installed) = self.suffix.get_last_installed(Some(self.last_installing)) {
let version = last_installed.decision_ver.unwrap();
self.receiver.update_savepoint(version as i64).await.unwrap();

self.receiver.commit().await.unwrap();
self.next_commit_offset = Some(version);
}
}
}

pub(crate) async fn commit(&mut self) {
if let Some(version) = self.next_commit_offset {
self.receiver.update_savepoint(version as i64).await.unwrap();
self.receiver.commit_async();
self.next_commit_offset = None;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ where
\n ");
}

replicator.commit_till_last_installed().await;
replicator.commit().await;
}
// Receive feedback from installer.
res = replicator_rx.recv() => {
Expand All @@ -110,6 +110,7 @@ where

// Prune suffix and update suffix head.
if replicator.suffix.get_suffix_meta().prune_index >= replicator.suffix.get_suffix_meta().prune_start_threshold {
replicator.prepare_offset_for_commit().await;
replicator.suffix.prune_till_version(version).unwrap();
}
total_items_installed += 1;
Expand Down