Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates for this project #7

Open
mpopova-yottaa opened this issue May 4, 2015 · 11 comments
Open

Updates for this project #7

mpopova-yottaa opened this issue May 4, 2015 · 11 comments

Comments

@mpopova-yottaa
Copy link
Contributor

Hi, Krishna,
I've picked up your kafka-elasticsearch-standalone-consumer project as the base for Kafka-to-ES functionality I need to implement. As part of this work, I would like to contribute the changes/updates/fixes to your original project. I have pushed my initial changes into my fork already: https://github.com/ppine7/kafka-elasticsearch-standalone-consumer,
and I envision many more changes to come, according to the TODO items I've added to the code as well.
If you are Ok with adding these changes into the main project - I can create a Pull request. If you would rather keep your project as is , I can just keep working on my Fork.

Thanks !
Marina

@reachkrishnaraj
Copy link
Owner

Hi Marina,

Thanks a lot for working on this. I was aware of many of the issues you listed and have the fix in my local but was not able to commit.

Please goahead in committing your changes. I will merge the changes.

Also, can you send me across the TODO list and I can create tickets/issues for it.

Thanks.

@ppine7
Copy link
Collaborator

ppine7 commented May 4, 2015

Great, I've created a Pull request - please take a look!
I will try to compile the TODO list - mostly right now they are in comments int he code. The major TODO that would really speed this up would be, I think , to consolidate transform/prepare steps, to avoid storing events in multiple collections. I am looking for ways to optimize this application as much as possible because I need to process up to 10K events per second, so every small thing counts :)

thanks!
MArina

@mpopova-yottaa
Copy link
Contributor Author

Hi, Krishna,
I have pushed a few more changes into my fork - and the Pull request. Please review when you have a chance.

I was also thinking about further scaling of this application. The current requirement to have one instance of this consumer running as a separate JVM process per one partition, and has to have its own properties file and log file, is not scalable enough. It works great if you have 1, or a couple of partitions, but if you start scaling out to dozens and hundreds of partitions - this will not work anymore.

For my use case, I would like to have at least 20 partitions - and maintaining 20 different configs/ logs/ startup scripts is not viable anymore.
In light of that, I think the following tasks could be done as next steps:

  • modify the app to maintain a pool of Consumers - as many as there are partitions for a topic; A very rough approach could be:
    • in the properties file, specify all partitions that should be served by this app
    • in the main Service class (to be developed) - create a fixed executor thread pool with the number of threads equal to the number of partitions
    • convert ConsumerJob to be a Runnable
    • move Kafka and ES initialization from the ConsumerJob into the man Service
    • create and start one thread with the ConsumerJob per partition
    • main Service should be able to send shutdown (interrupt) signal to active ConsumerJobs on shutdown - and CosumerJobs have to check for this signal before each iteration run
  • also, it would be nice to add some kind of monitoring via JMX:
    • we could add a getStatus() method to the ConsumerJob class - that could return whatever metrics might be useful (current offset in works, number of failed messages so far, etc.)
    • main Service could keep references to all created ConsumerJobs and call this method - this could be exposed as a JMX method for monitoring
  • on the other front, I think having one 'index' name/type specified in the config file might not be enough in many cases. For example, often, the event data should be indexed into a specific index based on some event meta-info - for example, customerId or something like that. I think it would be useful to add one more generic interface , like IndexManager, with one method to start with : String getIndex(Object params ... ). In the basic form it would just return the hard-coded (from the config) index name. But for more complex use cases - users could implement their own IndexManagers with whatever logic they need

It is a lot of work, but I think it would take the Kafka-ES consumer to the next scalability level :)

thanks!
Marina

@mpopova-yottaa
Copy link
Contributor Author

sorry, I closed the issue by mistake :) - I'm re-opening it now

@reachkrishnaraj
Copy link
Owner

Hi @mpopova-yottaa : Very glad to see your updates and improvements for this project. Yes, I agree with all of the those. I want to work hand in hand with you. As a next step, I will try to break down the above improvements into small issues and we can start working on it.

Between, I am yet to merge your changes into the main branch. I reviewed it and see couple of places which needs fix and I will comment on those. I also wanted to do a quick test on the new changes. Have you been using the new changed in your fork in your consumer ?

Thank You once again !

@ppine7
Copy link
Collaborator

ppine7 commented May 26, 2015

Great!
Krishna, to answer your question: yes, I did test running the app form my fork - it works fine. Specific scenarios I've tested:
-- happy-path execution with no errors;
-- happy-path execution over multiple restarts of the app - to make sure offsets are stored/read in correctly after restart;
-- shutdown of the app - to make sure the new Shutdown hook works Ok and closes the ES and Kafka connections;
-- failure scenario - parsing failures and ES indexing errors - to make sure the app is not looping forever over the same events and keeps moving to next batch;

I have tested all this using the regular JVM startup mode - not 'jsvc' one - as I figured this would be the preferred way anyway. If you think it is important to keep both modes - we could remove the Main from Manifest.MF at all and require to specify it as part of the startup command. This way you could run either the Main or the Daemon version.

In addition to the list of TODO items I posted earlier, here are couple more thoughts:
I think it might make sense to consider using the High-Level consumer APIs instead of low-level ones. The reasons are:

  • the main reason to use low-level API is to explicitly control offsets per partitions. One case: when event processing or indexing fails for some events - you could theoretically not commit the current offset but keep it at the old value - to re-process the events again. In practice, though, this is not as simple - as very often these failures are not transient and will happen again and again unless you either fix the event format, or bring failed ES cluster up or something like that. It usually requires some investigation and manual fixing of things in order to re-process failed events. So, to avoid endless loops processing/failing the same messages - I have removed this logic already and am committing current offsets regardless of the failures. The key here is to preserve failed events in some other way - for example , in a separate failedEvents.log - and then push them back into Kafka for reprocessing when ready. Using High-Level consumer in this case would produce the same results, I believe.
  • Another reason is to control offset setting for a specific partition - independent of the other partitions. [High-level consumer does not , currently, allow committing offsets for a specific partition - it has the commitOffsets() method that will commit all current offsets of all partitions/consumer groups. = that's my understanding] This can be useful if we have multiple Consumer threads consuming events form multiple partitions (which is not the case in the current state of the app but will be the case once we add the multi-threading). However, even in that case, the failure handling should be still the same - store failed events into some secondary log - and, thus, auto-committing of offsets has exactly the same effect as manual commits.
  • Also, even though High-level consumer does not , currently, allow committing offsets for a specific partition - it looks like they are going to address this in the up-coming Kafka 0.9.0 version: http://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/%[email protected]%3E , where low-level and high-level APIs will merge. I believe we should move to the new Consumer API as soon as it is out and then decide whether we need the per-partition offset commits or not. And until then - just use the High-level consumer and avoid the headache of re-initializing Kafka, looking for Leader and such… My main reason for bringing all this up is because I have not tested the re-initialization logic in a real distributed cluster , under different failure scenarios, so not sure how reliable this logic is. If you did test this out and have good results - great!

So, these are just ideas , up for discussion.

Thanks for the great work on this project!
By the way, you can contact me directly at ppine7 at gmail dot com if it is more convenient.

Marina

@ppine7
Copy link
Collaborator

ppine7 commented May 26, 2015

Krishna, also , I think your idea to split the list of TODOs into smaller tasks is great!
For the immediate future, I am planning on working on the multi-threading of the Consumer, since for me this is a critical feature in order to use this app in production.

thanks!
Marina

@reachkrishnaraj
Copy link
Owner

Hi @mpopova-yottaa, @ppine7 : Thanks. I started working on these items. Can we use the new branch(name is: branch2.0) to push our changes so that we can keep the master branch intact of the working code until we work on our improvements ? We can start merging our changes in milestones. I will start creating the issues for individual improvements and I will also pick up couple. Thanks.

@ppine7
Copy link
Collaborator

ppine7 commented May 27, 2015

Sure, sounds good - lets use the new branch. I've added one minor change last night for the IndexHandler - but will modify it slightly to be a bit more generic: to use your approach of specifying the IndexHandler implementation class as a property in the .properties file.
Thanks!
Marina

@reachkrishnaraj
Copy link
Owner

Sure, i will goahead and review the pull request tonight. I am able to comment quickly on the below with the initial glance.

  1. The boolean flag to decide whether to commit the offset 1st has to be retained. The flag is also necessary to determine whether the process was started 1st time.

Also, I will comment on the questions you raised about High Level Consumer. Short answer for now: Its good to have the offset management in our code for few reasons, but as you mentioned, the reintialization part is not reliable and most the time I end up restarting the broker and consumer.

Apologies for the late replies, I will try to be on active on this going forward.

Thanks and your effort is much appreciated.

@ppine7
Copy link
Collaborator

ppine7 commented May 28, 2015

Thanks , Krishna, sounds great!

No worry about timing - I keep pushing changes fast because I really want to use this project in production very soon (in a couple of weeks) - but have to make sure it is scalable enough to handle multiple partitions and a load of a few thousand events per second.

As to the boolean flag: Yes, I agree - the flag is needed. Actually, it is still there (isStartingFirstTime) - and I use it - I just was setting it immediately to 'false' as soon as the offset is committed for the first time. I did that to fix a specific use case I ran into when it kept doing the "first time" logic multiple times if it so happened that no events were arriving from Kafka for some prolonged period of time right after the start.
I think it is hard to just see the changes at this point - as I made too many - sorry :) .
It might be easier to just checkout my fork and try to run it - the first time lookup/commit works well, I think.
Let me know if you see any issues with it - will be happy to fix.

And since we are on the subject of discussing new tasks/improvements - here is one more list of smaller / minor improvements I suggest:

— parameterize all necessary variables (like Kafka timeouts , etc.) - there are many hard-coded timeouts/sleep times in the KafkaClient
— cleanup dependencies - remove easymock, etc. that are not used
— cleanup unused/reserved for future use properties - it is better to keep the current properties file lean and clean , and add new properties as needed in the next versions
— package all dependencies into the target JAR file - to be able to distribute /deploy to different envs
— upgrade ES libs to 1.5
— add JUnit tests - use mockito or such to mock ES and Kafka

thanks!
Marina

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants