Decide on how to use poll and flush in producer #10

Closed
timmc-edx opened this issue Jul 20, 2022 · 5 comments

@timmc-edx

timmc-edx commented Jul 20, 2022

We currently use poll() on the producer, but it wasn't clear whether we should use that, or how (or with what timeout), and when we should use flush() instead/too.

producer.poll() # wait indefinitely for the above event to either be delivered or fail

Also investigate:

  • enable.idempotence=true
  • acks=all

Resources:

@timmc-edx timmc-edx added the event-bus Work related to the Event Bus. label Jul 20, 2022
@timmc-edx timmc-edx moved this to Todo in Arch-BOM Jul 20, 2022
@timmc-edx timmc-edx moved this from Todo to In Progress in Arch-BOM Jul 25, 2022
@timmc-edx timmc-edx self-assigned this Jul 25, 2022
@timmc-edx

timmc-edx commented Jul 27, 2022

OK, this is my new understanding of librdkafka producers:

  • Messages we produce will actually be accumulated into batches before (asynchronous) delivery to the broker, with batch size and timing depending on our producer settings. Batching is fundamental to Kafka's performance.
  • Incoming acks to those messages accumulate asynchronously in a buffer, and do not immediately trigger callbacks.
  • The only job of poll() is to wait for acks and trigger callbacks; it doesn't affect delivery to the broker at all. poll(0) checks whether anything is already in that buffer and triggers the corresponding callbacks if so. Passing a non-zero timeout makes poll() first block until there's at least one message.
    • Question: Are callbacks made on the same thread as poll()?
  • Something needs to call poll() regularly to ensure callbacks are triggered.
  • flush() both ensures that the outgoing message batch is sent and that all acks have been received (and callbacks triggered, with them).
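
To make the model above concrete, here is a toy simulation of it. This is only a sketch of the behaviour as described in the bullets: `ToyProducer` and `background_send` are made-up names, not part of librdkafka or any real client, and the real library does the sending on its own background thread.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, Tuple

DeliveryCallback = Callable[[Optional[str], str], None]


class ToyProducer:
    """Toy stand-in for a librdkafka producer; not the real client API."""

    def __init__(self) -> None:
        # Messages queued for the next batch, not yet sent.
        self._outgoing: List[Tuple[str, DeliveryCallback]] = []
        # Acks that have arrived but whose callbacks have not run yet.
        self._acks: Deque[Tuple[str, DeliveryCallback]] = deque()

    def produce(self, message: str, on_delivery: DeliveryCallback) -> None:
        # Only queues the message; nothing is sent and no callback runs.
        self._outgoing.append((message, on_delivery))

    def background_send(self) -> None:
        # Simulates the background work: send the batch, buffer the acks.
        self._acks.extend(self._outgoing)
        self._outgoing.clear()

    def poll(self) -> int:
        # Like poll(0): run callbacks for acks already buffered, never block.
        served = 0
        while self._acks:
            message, callback = self._acks.popleft()
            callback(None, message)  # err=None stands for "delivered"
            served += 1
        return served

    def flush(self) -> int:
        # Force the pending batch out, then serve every buffered ack.
        self.background_send()
        self.poll()
        return len(self._outgoing)  # messages still queued


delivered: List[str] = []
producer = ToyProducer()
producer.produce("event-1", lambda err, msg: delivered.append(msg))
after_produce = producer.poll()   # nothing acked yet, so no callbacks run
producer.background_send()
buffered_only = list(delivered)   # ack is buffered, callback still not run
after_send = producer.poll()      # now the callback fires
producer.produce("event-2", lambda err, msg: delivered.append(msg))
remaining = producer.flush()      # sends and serves in one call
```

The point of the toy is that the callback for `event-1` runs only when `poll()` is called after the ack has arrived, while `flush()` covers both halves at once.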

Implications:

@timmc-edx timmc-edx changed the title Decide on whether to use poll or flush in producer Decide on how to use poll and flush in producer Jul 27, 2022
timmc-edx added a commit that referenced this issue Jul 27, 2022
…12)

I believe a nullary call to poll uses a default timeout of -1 (wait
indefinitely), but we really just want to make sure that pending callbacks
are triggered for the acks that have been buffered in the background, from
previous events. poll(0) will not block if the buffer is empty.

For testing we need to call flush(-1), so add sync=True as an option.

Documentation for `rd_kafka_poll`:
https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079

This addresses part of #10
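
For anyone skimming, the change described in this commit message could look roughly like the sketch below. The helper name `send_event` and the `RecordingProducer` test double are made up for illustration; the producer is passed in and is assumed to expose `produce`, `poll`, and `flush` the way the confluent-kafka Python client does. This is not the repository's actual code.

```python
from typing import Any, Callable, List, Tuple


def send_event(
    producer: Any,
    topic: str,
    value: bytes,
    on_delivery: Callable[[Any, Any], None],
    sync: bool = False,
) -> None:
    """Queue one event, then serve delivery callbacks (hypothetical helper)."""
    producer.produce(topic, value=value, on_delivery=on_delivery)
    if sync:
        # Block until everything queued has been delivered or has failed.
        producer.flush(-1)
    else:
        # Serve callbacks for acks already buffered from earlier events.
        # With a timeout of 0 this returns immediately if there are none.
        producer.poll(0)


class RecordingProducer:
    """Test double that records calls; stands in for a real producer."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []

    def produce(self, topic: str, value: bytes = b"", on_delivery: Any = None) -> None:
        self.calls.append(("produce", topic, value))

    def poll(self, timeout: float) -> int:
        self.calls.append(("poll", timeout))
        return 0

    def flush(self, timeout: float) -> int:
        self.calls.append(("flush", timeout))
        return 0


def ignore_report(err: Any, msg: Any) -> None:
    # Placeholder delivery callback for the demonstration.
    return None


fake = RecordingProducer()
send_event(fake, "demo-topic", b"payload", ignore_report)
send_event(fake, "demo-topic", b"payload", ignore_report, sync=True)
```

The default path never blocks the caller, and `sync=True` is there for tests that need to know the event was really delivered before asserting on it.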
Repository owner moved this from In Progress to Done in Arch-BOM Jul 27, 2022
whuang1202 pushed a commit that referenced this issue Aug 10, 2022
…; deduplicating config; cache producers (#12)

I believe a nullary call to poll uses a default timeout of -1 (wait
indefinitely), but we really just want to make sure that pending callbacks
are triggered for the acks that have been buffered in the background, from
previous events. poll(0) will not block if the buffer is empty.

For testing we need to call flush(-1), so add sync=True as an option.

Documentation for `rd_kafka_poll`:
https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079

This addresses part of #10

Configuration now lives in config.py rather than inside the send-to-event-bus function, so it can be used more widely.
Note that setting names now carry the EVENT_BUS_KAFKA prefix.

Producers are now cached so that data is not lost (one test breaks as a result).
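
A rough sketch of what shared configuration plus a cached producer might look like. Everything here is an assumption for illustration: the `SETTINGS` dict, the setting name `EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS`, and the helper names are hypothetical, and `StubProducer` replaces the real client so the example runs without a broker. The commit message only says that caching avoids data loss; one plausible reading is that a single long-lived producer keeps its queued messages and buffered acks around until they are served.

```python
from functools import lru_cache
from typing import Any, Dict

# Hypothetical application settings; only the EVENT_BUS_KAFKA prefix
# is taken from the commit message.
SETTINGS: Dict[str, Any] = {
    "EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
}


class StubProducer:
    """Stand-in for a real producer so the sketch runs without a broker."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config


def load_producer_config() -> Dict[str, Any]:
    # Translate prefixed application settings into client configuration keys.
    return {"bootstrap.servers": SETTINGS["EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS"]}


@lru_cache(maxsize=None)
def get_producer() -> StubProducer:
    # Built once; later calls return the same instance.
    return StubProducer(load_producer_config())


first = get_producer()
second = get_producer()
```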
@timmc-edx

enable.idempotence=true and acks=all turn out to be the default values, and are what we want.
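
Defaults can differ between client libraries and versions, so anyone who would rather not rely on them can set both properties explicitly. A minimal sketch, where the broker address is a placeholder and `producer_config` is just an illustrative name for the dict you would hand to the producer constructor:

```python
# Explicit producer configuration; the property names are the ones
# discussed in this issue, the broker address is a placeholder.
producer_config = {
    "bootstrap.servers": "localhost:9092",
    "enable.idempotence": True,  # avoid duplicates caused by retries
    "acks": "all",               # wait for all in-sync replicas to ack
}
```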

@mparada-suva

[...] Passing a non-zero timeout makes poll() first block until there's at least one message.

Hi! Do you know of a reference for the point you mentioned? We are seeing this kind of problem and would like to investigate how to mitigate it.

@mparada-suva

@timmc-edx

timmc-edx commented Nov 22, 2023

Yep, that looks right. You can see what we ended up doing, along with the reasoning (note that we call poll from two places.)

tiopramayudi added a commit to caraml-dev/merlin that referenced this issue Jan 29, 2024
# Description
We've seen a gradual increase in memory usage when model observability is
enabled for a model.

Our first hypothesis was that asyncio causes this, because we pass the
prediction input and output to an async function. Because that async
function also publishes to kafka, we reduced the sampling rate to 0 to
isolate the asyncio overhead. With the sampling rate at 0, memory usage
is stable and there is no gradual increase.

Since the first hypothesis did not hold, our new hypothesis is that the
increase comes from publishing the data to kafka, so we ran a memory
profiler on the model.

PS: We use memray as profiler https://github.com/bloomberg/memray
 
<img width="1135" alt="Screen Shot 2024-01-29 at 09 47 21"
src="https://github.com/caraml-dev/merlin/assets/2369255/2f4e6aaf-2279-4052-97d2-f5515d743f76">

<img width="1777" alt="Screen Shot 2024-01-29 at 09 49 11"
src="https://github.com/caraml-dev/merlin/assets/2369255/2ac87035-6f8a-4d4a-8c5f-f216f7904bf5">

<img width="1789" alt="Screen Shot 2024-01-29 at 09 48 52"
src="https://github.com/caraml-dev/merlin/assets/2369255/556bf0bf-d90e-4720-af53-34029d2c45ec">

We see that memory usage keeps increasing and that producing messages
to kafka contributes to it.

# Modifications
To solve this problem, the kafka producer must call `poll` after
publishing a message. This is necessary so that the producer's `ack`
buffer is drained and memory usage does not gradually increase. Ref:
[1](openedx/event-bus-kafka#10),
[2](https://github.com/confluentinc/librdkafka/wiki/FAQ#when-and-how-should-i-call-rd_kafka_poll)

After the changes
<img width="1139" alt="Screen Shot 2024-01-29 at 09 49 02"
src="https://github.com/caraml-dev/merlin/assets/2369255/beab944d-fe05-445c-865a-524ef40630d0">

<img width="1786" alt="Screen Shot 2024-01-29 at 09 49 19"
src="https://github.com/caraml-dev/merlin/assets/2369255/0d7a79ad-36c7-4154-92af-e00c6a893b8e">

# Checklist
- [x] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes
