diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index 3dfb62df51..f690ee99fd 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -61,7 +61,7 @@ private StreamParameters( .filter(timeout -> timeout > 0 && timeout <= EventStreamConfig.MAX_STREAM_TIMEOUT) .orElse((long) EventStreamConfig.generateDefaultStreamTimeout())); this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10); - this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit(); + this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit().filter(v -> v != 0); this.partitions = userParameters.getPartitions(); this.consumingClient = consumingClient; @@ -78,8 +78,8 @@ public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { return streamLimitEvents.map(v -> Math.max(0, Math.min(limit, v - sentSoFar))).orElse(limit); } - public boolean isStreamLimitReached(final long commitedEvents) { - return streamLimitEvents.map(v -> v <= commitedEvents).orElse(false); + public boolean isStreamLimitReached(final long committedEvents) { + return streamLimitEvents.map(v -> v <= committedEvents).orElse(false); } public boolean isKeepAliveLimitReached(final IntStream keepAlive) { diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java index d876610c46..a921fd1501 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java @@ -70,6 +70,14 @@ public void checkIsKeepAliveLimitReached() throws Exception { assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false)); } + @Test + public void checkIsKeepAliveLimitReachedIndefinitely() throws Exception { + final StreamParameters streamParameters = createStreamParameters(1, null, 0, null, 0, 0, 0, mock(Client.class)); + + assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 6, 12)), is(false)); + assertThat(streamParameters.isKeepAliveLimitReached(IntStream.of(5, 7, 4, 12)), is(false)); + } + @Test public void checkGetMessagesAllowedToSend() throws Exception { final StreamParameters streamParameters = createStreamParameters(1, 200L, 0, null, null, 0, 0,