diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index 485298aa..e3931005 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -38,7 +38,7 @@ async fn main() { kafka_config, allowed_actions, channel_buffers: None, - commit_size: None, + commit_size: Some(2_000), }; messenger_with_kafka(config).await.unwrap(); diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 62fc4bbd..5952aedc 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -109,7 +109,7 @@ where debug!("[Commit] Updating tpl to version .. {commit_offset}"); let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); - let _ = self.message_receiver.commit(); + let _ = self.message_receiver.commit_async(); } } }