From 613be22ac4962387b3765d4a4e9458f00ca73d91 Mon Sep 17 00:00:00 2001 From: Marco Lehmann Date: Mon, 4 Feb 2019 13:21:48 +0100 Subject: [PATCH] Moved filter to constructor --- .../nakadi/service/subscription/StreamParameters.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index f76bd17333..f690ee99fd 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -61,7 +61,7 @@ private StreamParameters( .filter(timeout -> timeout > 0 && timeout <= EventStreamConfig.MAX_STREAM_TIMEOUT) .orElse((long) EventStreamConfig.generateDefaultStreamTimeout())); this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10); - this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit(); + this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit().filter(v -> v != 0); this.partitions = userParameters.getPartitions(); this.consumingClient = consumingClient; @@ -78,12 +78,12 @@ public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { return streamLimitEvents.map(v -> Math.max(0, Math.min(limit, v - sentSoFar))).orElse(limit); } - public boolean isStreamLimitReached(final long commitedEvents) { - return streamLimitEvents.map(v -> v <= commitedEvents).orElse(false); + public boolean isStreamLimitReached(final long committedEvents) { + return streamLimitEvents.map(v -> v <= committedEvents).orElse(false); } public boolean isKeepAliveLimitReached(final IntStream keepAlive) { - return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it) && it > 0).orElse(false); + return batchKeepAliveIterations.map(it -> keepAlive.allMatch(v -> v >= it)).orElse(false); } public Client getConsumingClient() {