If Processor stopped, should not put records any more #10
Comments
Hello @blling, thanks for raising the issue! There is certainly room for improvement in the synchronization during reassignments. 😄 Your issue made me look into it in more detail. In case a Processor stops as a result of an exception during processing, the processor queue will fill up and block. No new messages are processed, and therefore no offsets for that partition should be altered. Thanks, Lars
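To illustrate the "queue will fill up and block" behaviour described above, here is a minimal sketch using a plain `ArrayBlockingQueue`. It is not verteiler code: `FullQueueSketch` and `fillUntilRejected` are hypothetical names, and a queue that nobody drains merely stands in for a Processor that has died. A timed `offer` is used instead of `put` so that the example terminates instead of blocking forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of verteiler.
final class FullQueueSketch {

    // Returns how many records were accepted before the bounded queue refused one.
    static int fillUntilRejected(int capacity, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                // Nothing drains the queue, which models a stopped processor.
                // A plain put() would block here indefinitely once the queue is full.
                if (!queue.offer("record-" + i, 10, TimeUnit.MILLISECONDS)) {
                    break;
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return accepted;
    }
}
```

With a capacity of 3, only three records are accepted; every later hand-over waits and then fails, so nothing beyond the queue capacity reaches the stopped processor.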
@larsp I have made some failure tests (#11) for discussion. I think: a. b.
Thanks for providing the tests. I will look into it 👍
Thanks again for providing the tests @blling
Usually I would expect the processing code to be stable, so that it does not throw any unchecked exceptions. If such an exception happens, it results in an error log, which ideally should trigger an alert in your application monitoring setup. I would argue that this exception is probably something lower level from the file system or the database, so one could assume that other consumers/processors are in the same situation and will eventually run into the same problem. Let me think about it for a while and see if stopping the consumer on processing errors is something which should be added.
In the The test will pass once "exactly once" has been enabled: Consumer: Since (in my experience) exactly once comes with a performance penalty, I would try to make sure my message stream and processing are idempotent: consuming a message multiple times leads to the same result as consuming it once. Hope that helps. Thanks, Lars
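As a rough illustration of the idempotency idea above, the sketch below skips any record whose offset has already been applied for its partition, so a redelivered message does not change the result. `IdempotentCounterSketch` and its methods are hypothetical names, not verteiler or Kafka API. It also assumes everything lives in memory; a real setup would have to persist the state and the last applied offset together.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of idempotent processing, not part of verteiler.
final class IdempotentCounterSketch {

    // Highest offset already applied, per partition.
    private final Map<Integer, Long> lastOffsetByPartition = new HashMap<>();
    private long total = 0;

    // Applies the record unless it is a redelivery; returns true if state changed.
    boolean apply(int partition, long offset, long amount) {
        Long last = lastOffsetByPartition.get(partition);
        if (last != null && offset <= last) {
            return false; // already processed, consuming it again has no effect
        }
        total += amount;
        lastOffsetByPartition.put(partition, offset);
        return true;
    }

    long total() {
        return total;
    }
}
```

Applying the same partition and offset twice leaves `total()` unchanged after the first call, which is the property the comment above relies on when a message is consumed more than once.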
If the `Processor` is stopped, `BlockingQueueConsumer` should not put records into the `Processor` at all.
https://github.com/datanerds-io/verteiler/blob/develop/verteiler/src/main/java/io/datanerds/verteiler/BlockingQueueConsumer.java#L113
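One possible shape of the requested behaviour, as a sketch only: the hand-over checks a stop flag before each attempt and uses a timed `offer`, so the caller gets a `false` back instead of blocking on a stopped processor. `StoppableProcessorSketch`, `tryHandOver`, `stop` and `queued` are hypothetical names and do not reflect the actual `Processor` or `BlockingQueueConsumer` API in verteiler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration, not the actual verteiler Processor.
final class StoppableProcessorSketch {

    private final BlockingQueue<String> queue;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    StoppableProcessorSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void stop() {
        stopped.set(true);
    }

    int queued() {
        return queue.size();
    }

    // Hands a record over unless the processor is stopped or the timeout expires.
    boolean tryHandOver(String record, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (!stopped.get()) {
                // Short timed offer so the stop flag is re-checked regularly.
                if (queue.offer(record, 5, TimeUnit.MILLISECONDS)) {
                    return true;
                }
                if (System.nanoTime() - deadline >= 0) {
                    return false; // queue stayed full for the whole timeout
                }
            }
            return false; // processor stopped, record is not handed over
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

The caller can then avoid advancing the offset for a record when `tryHandOver` returns `false`. Note that the flag check and the `offer` are not atomic, so a record can still slip in if `stop()` is called between the two; closing that window would need additional synchronization.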