-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Does KafkaConsumer.stopConsuming actually allow to process all in-flight messages? #1128
Comments
It's not really clear to me what you want to accomplish, but looks like your issue is not specific to FS2 Kafka, but how Kafka works. Let me explain 👇🏽
According to this, the topic is not created, or the messages are not produced (it's not clear to me). Either way, without messages, you can't play with offsets. In any case, I think you don't need to use Hope this helps to clarify how to proceed and sorry for the late response 😅 |
Hi Alan, thank you for your reply. The topic does exist and it may have messages. I just want all the messages of that topic from beginning to end. The last message in that topic does not have any indication on it that implies that it is the last message. This is why I created this complicated mess to figure out whether the consumer has read anything (by querying the current positions) and stop the consumption. Some background: The use case is a topic that stores the state of a computation. Whenever the result of the computation is produced to Kafka, so is the state (within the same transaction). When the application is restarted, then it needs to get the last state in order to continue where it left off. So it wants the last message of the (compacted) state topic. But state may consist of several states (with a different key each) that need to be combined, so all those messages are read. During the restoration of the state, no new state is written to the topic, but after restoring the state and continuing the computation, new state messages are written. Because I don't know when the application is about to shut down (this may be caused by an exception), I can't reliably write a marker message. But maybe there is a simpler solution to this problem, other than relying on PS: To "solve" the problem, I added a delay before calling |
Ahhh! Huh... 🫠 Tricky (but interesting) scenario. So, the number of different states (different keys) is not known beforehand, right? |
Right - there are cases where we don't know. In most cases however, we know these keys, but they may differ from the actual messages in the topic, because the configuration can change in between restarts of the application. |
And last question, when your consumer |
Because if you can guarantee your instance is the only one doing stuff with that topic, and assuming you are in full control of the topic (your application is the only one writing/reading messages), you can do the following:
But, I'm assuming a couple of things that can't be true 😅 In any case, this solution is 100% deterministic and will work well. At least is better that adding a delay that can fail for multiple reasons alien to you (network latencies, instance CPU resources if you're in Kubernetes...). |
This is an excellent idea. I hope I can try it in the coming weeks. Thanks for the help :) |
The ScalaDoc of
KafkaConsumer.stopConsuming
says that "All currently running streams will continue to run until all in-flight messages will be processed" and "streams will be completed when all fetched messages will be processed". According to it, the question of the title is answered with "yes". However, I stumbled upon a situation that sometimes does not work how I would expect it.My goal is to have a finite stream that reads all currently existing messages from a Kafka topic (that is not produced to in that moment). I can't rely on the offset of the messages itself, since there may be some number of commit and abort markers at the end of the partition that are unavailable to me - so the last actual (committed) message may have an offset that is a bit behind the end offset of the topic. I thought that requesting the actual position and stopping consumption would solve the problem, something like this:
This version would not work with an empty topic, but it allows me to reproduce the issue more reliably compared to a concurrent stream that checks the assignment positions. Please excuse the temporary
println
s that obscure the code a bit. Sometimes, the output looks like this (as expected):But at other times, the last two lines of the output (and the last chunk of the stream) is missing. Maybe Kafka reports updated positions before it sends the records over? Or maybe the
KafkaConsumerActor
actually received the messages, but the stream is stopped prematurely (possibly due to a race condition)? I can't really see anything wrong in KafkaConsumer, but then again the code is more complex than I can handle, so maybe I'm overlooking something.Are my assumptions about how it should work wrong? Or could there be an issue with how
stopConsuming
works? Maybe there is a more reliable way to have a finite stream of all available messages?The text was updated successfully, but these errors were encountered: