Skip to content

Commit

Permalink
SAMZA-2797: Add flush to CoordinatorStreamSystemProducer stop to ensu…
Browse files Browse the repository at this point in the history
…re async sends are flushed prior to stop
  • Loading branch information
ajothomas committed Nov 21, 2023
1 parent 66495b6 commit 5c2df1c
Showing 1 changed file with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.coordinator.stream.CoordinatorStreamWriter.SOURCE;

/**
* A wrapper around a SystemProducer that provides helpful methods for dealing
* with the coordinator stream.
Expand Down Expand Up @@ -105,6 +107,7 @@ public void start() {
*/
public void stop() {
log.info("Stopping coordinator stream producer.");
systemProducer.flush(SOURCE);
systemProducer.stop();
systemAdmin.stop();
isStarted = false;
Expand Down

0 comments on commit 5c2df1c

Please sign in to comment.