Decide on how to use poll and flush in producer #10
OK, this is my new understanding of librdkafka producers:
Implications:
…12) I believe a nullary call to poll uses a default timeout of -1 (wait indefinitely), but we really just want to make sure that pending callbacks are triggered for the acks that have been buffered in the background, from previous events. poll(0) will not block if the buffer is empty. For testing we need to call flush(-1), so add sync=True as an option. Documentation for `rd_kafka_poll`: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 This addresses part of #10
…; deduplicating config; cache producers (#12) I believe a nullary call to poll uses a default timeout of -1 (wait indefinitely), but we really just want to make sure that pending callbacks are triggered for the acks that have been buffered in the background, from previous events. poll(0) will not block if the buffer is empty. For testing we need to call flush(-1), so add sync=True as an option. Documentation for `rd_kafka_poll`: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 This addresses part of #10. Configuration is now in config.py rather than inside the send-to-event-bus function so it can be used more widely; note that setting names have changed to use the EVENT_BUS_KAFKA prefix. Producers are now cached so data is not lost (one test breaks as a result).
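A minimal sketch of the produce/poll/flush pattern the commit message describes, assuming a producer object that behaves like `confluent_kafka.Producer` (which exposes `produce(topic, value=..., on_delivery=...)`, `poll(timeout)` and `flush(timeout)`). The helper name `send_message` and its signature are hypothetical and are not taken from the event-bus-kafka code.

```python
from typing import Any, Callable, Optional

# Delivery callbacks in confluent-kafka-python receive (err, msg).
DeliveryCallback = Callable[[Any, Any], None]


def send_message(
    producer: Any,
    topic: str,
    value: bytes,
    on_delivery: Optional[DeliveryCallback] = None,
    sync: bool = False,
) -> None:
    """Produce one message, then serve any delivery reports already queued.

    Hypothetical helper; `producer` is assumed to behave like
    confluent_kafka.Producer.
    """
    # produce() only enqueues the message; the broker's ack comes back later
    # and waits as a delivery report until the application polls.
    producer.produce(topic, value=value, on_delivery=on_delivery)
    # poll(0) triggers callbacks for reports buffered from this or earlier
    # messages and returns immediately if there are none.
    producer.poll(0)
    if sync:
        # flush(-1) waits with no timeout until every outstanding message
        # has been delivered or has failed; handy in tests, slow in requests.
        producer.flush(-1)
```

The non-blocking `poll(0)` keeps the request path fast while still draining acks on every send; the blocking `flush(-1)` is opt-in via `sync=True`, matching the option the commit adds for tests.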
Hi! Do you know of a reference for this point you mentioned? We are seeing this problem and would like to investigate how to mitigate it.
Maybe I found it. Is this the reference? https://docs.confluent.io/platform/7.5/clients/librdkafka/html/classRdKafka_1_1Handle.html#a98d3b4ee48457ff13e4d5155e3fc5ea4
Yep, that looks right. You can see what we ended up doing, along with the reasoning (note that we call poll from two places).
# Description

We've seen a gradual increase in memory usage when model observability is enabled for a model. Our first hypothesis was that asyncio was responsible, because we pass the prediction input and output to an async function. To test this, we reduced the sampling rate to 0: since the async function being called also publishes to Kafka, we needed to isolate the asyncio overhead alone. With the sampling rate at 0, memory usage is stable, with no gradual increase.

Since the first hypothesis was wrong, our new hypothesis was that publishing the data to Kafka causes the growth, so we profiled the model's memory usage with memray (https://github.com/bloomberg/memray).

[Screenshots: memray memory profiles of the model before the change]

The profiles show memory usage steadily increasing, and producing messages to Kafka contributes to this.
# Modifications

To solve this problem, the Kafka producer must call `poll` after publishing a message. This drains the producer's `ack` (delivery report) buffer so memory usage does not keep growing. References: [1](openedx/event-bus-kafka#10), [2](https://github.com/confluentinc/librdkafka/wiki/FAQ#when-and-how-should-i-call-rd_kafka_poll)

After the changes:

[Screenshots: memray memory profiles of the model after the change]

# Checklist

- [x] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API changes
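To make the mechanism in that PR concrete, here is a toy, pure-Python model (not librdkafka itself) of the behavior it describes: each produced message leaves a delivery report in an in-memory queue that is only drained when the application polls. `ToyProducer` and `peak_backlog` are hypothetical names, and the model ignores batching, retries, and the real broker round-trip.

```python
from collections import deque
from typing import Deque, List


class ToyProducer:
    """Toy model of a producer's delivery-report queue (illustrative only)."""

    def __init__(self) -> None:
        # Reports waiting to be served; a real producer keeps these in
        # memory until the application calls poll() or flush().
        self.reports: Deque[str] = deque()
        self.delivered: List[str] = []

    def produce(self, value: str) -> None:
        # Pretend the broker acked immediately, so the ack becomes a report.
        self.reports.append(value)

    def poll(self, timeout: float = 0) -> int:
        # Serve every queued report; the timeout is ignored in this toy.
        served = 0
        while self.reports:
            self.delivered.append(self.reports.popleft())
            served += 1
        return served


def peak_backlog(messages: int, poll_after_produce: bool) -> int:
    """Return the largest number of unserved reports seen while producing."""
    producer = ToyProducer()
    peak = 0
    for i in range(messages):
        producer.produce(f"msg-{i}")
        if poll_after_produce:
            producer.poll(0)
        peak = max(peak, len(producer.reports))
    return peak
```

In this model `peak_backlog(1000, poll_after_produce=False)` returns 1000 (the backlog grows with every message), while `peak_backlog(1000, poll_after_produce=True)` returns 0, which mirrors the flat memory profile reported after the change.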
We currently use poll() on the producer, but it wasn't clear whether we should use it, how to use it (and with what timeout), and when we should use flush() instead of, or in addition to, poll().
event-bus-kafka/edx_event_bus_kafka/publishing/event_producer.py
Line 221 in f0f7386
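On the flush() side of the question, one common pattern is to reserve flush() for shutdown (and tests), give it a bounded timeout, and check its return value: in confluent-kafka-python, `Producer.flush(timeout)` returns the number of messages still in the queue. The helper `close_producer` and its 10-second default are hypothetical choices for illustration, not something this issue settles.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def close_producer(producer: Any, timeout: float = 10.0) -> int:
    """Flush outstanding messages before shutdown (hypothetical helper).

    `producer` is assumed to behave like confluent_kafka.Producer, whose
    flush(timeout) blocks for up to `timeout` seconds and returns the
    number of messages still queued.
    """
    remaining = producer.flush(timeout)
    if remaining > 0:
        # These messages were neither delivered nor failed within the
        # timeout and will be lost if the process exits now.
        logger.warning("%d message(s) still undelivered after flush", remaining)
    return remaining
```

Keeping the timeout finite means a broker outage cannot hang shutdown forever, at the cost of possibly dropping the messages that the warning reports.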
Also investigate:
- `enable.idempotence=true`
- `acks=all`
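Both settings are librdkafka producer configuration properties and can be passed in the config dict given to `confluent_kafka.Producer`. A minimal sketch, assuming a placeholder broker address and a hypothetical helper name `build_producer_config`. Per the librdkafka configuration docs, enabling idempotence sets `acks=all` automatically when the user has not set it, and producer creation fails if a user-supplied value is incompatible, so the two are usually set together.

```python
from typing import Dict


def build_producer_config(bootstrap_servers: str) -> Dict[str, str]:
    """Producer config with idempotence and full acks (sketch).

    The returned dict is intended for confluent_kafka.Producer(config).
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # Retries cannot create duplicates, and the original produce order
        # is preserved; librdkafka requires acks=all for this.
        "enable.idempotence": "true",
        # Wait for all in-sync replicas to acknowledge each write.
        "acks": "all",
    }
```

Usage would look like `Producer(build_producer_config("localhost:9092"))`, where `localhost:9092` stands in for the real bootstrap servers.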
Resources: