From b03ecea78715733555aa2cf7383872e7c4ea67ab Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 23 Oct 2018 14:14:29 +0200 Subject: [PATCH 1/3] ARUHA-1978 Be able to specify commit timeout less than 60 seconds --- docs/_data/nakadi-event-bus-api.yaml | 39 ++++++++++++++++--- .../zalando/nakadi/config/NakadiSettings.java | 10 ++--- .../SubscriptionStreamController.java | 7 ++-- .../nakadi/service/CursorsService.java | 2 +- .../subscription/StreamParameters.java | 17 +++++--- .../nakadi/view/UserStreamParameters.java | 10 ++++- src/main/resources/application.yml | 5 +-- .../subscription/StreamParametersTest.java | 2 +- 8 files changed, 67 insertions(+), 25 deletions(-) diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index 72454a7c33..ee3882cf90 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -1240,6 +1240,7 @@ paths: - $ref: '#/parameters/BatchFlushTimeout' - $ref: '#/parameters/StreamTimeout' - $ref: '#/parameters/StreamKeepAliveLimit' + - $ref: '#/parameters/CommitTimeout' - name: X-Flow-Id in: header description: | @@ -1382,6 +1383,17 @@ paths: default: 0 minimum: 0 maximum: 4200 + commit_timeout: + description: | + Maximum amount of seconds that nakadi will be waiting for commit after sending a batch to a client. + In case if commit is not coming within this timeout, nakadi will initialize stream termination, no + new data will be sent. Partitions from this stream will be assigned to other streams. + Setting commit_timeout to 0 is equal to setting it to maximum allowed value - 60 sesconds. + type: number + format: int32 + default: 60 + maximum: 60 + minimum: 0 - $ref: '#/parameters/SubscriptionId' - name: X-Flow-Id in: header @@ -1972,11 +1984,11 @@ definitions: For concrete examples of what will be enforced by Nakadi see the objects BusinessEvent and DataChangeEvent below. schema: - oneOf: + oneOf: - $ref: '#/definitions/BusinessEvent' - $ref: '#/definitions/DataChangeEvent' - $ref: '#/definitions/UndefinedEvent' - + EventMetadata: type: object description: | @@ -2073,9 +2085,9 @@ definitions: type: string required: - eid - - occurred_at - - + - occurred_at + + BusinessEvent: description: | A Business Event. @@ -2230,7 +2242,7 @@ definitions: CursorDistanceResult: allOf: - $ref: '#/definitions/CursorDistanceQuery' - - type: object + - type: object properties: distance: type: number @@ -3166,6 +3178,21 @@ parameters: required: false default: 0 + CommitTimeout: + name: commit_timeout + in: query + description: | + Maximum amount of seconds that nakadi will be waiting for commit after sending a batch to a client. + In case if commit is not coming within this timeout, nakadi will initialize stream termination, no + new data will be sent. Partitions from this stream will be assigned to other streams. + Setting commit_timeout to 0 is equal to setting it to maximum allowed value - 60 sesconds. + type: number + format: int32 + default: 60 + maximum: 60 + minimum: 0 + required: false + MaxUncommittedEvents: name: max_uncommitted_events in: query diff --git a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java index 4704e396a5..142870192d 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java @@ -14,7 +14,7 @@ public class NakadiSettings { private final int defaultTopicReplicaFactor; private final long defaultTopicRetentionMs; private final long defaultTopicRotationMs; - private final long defaultCommitTimeoutSeconds; + private final long maxCommitTimeout; private final long kafkaPollTimeoutMs; private final long kafkaSendTimeoutMs; private final long timelineWaitTimeoutMs; @@ -30,7 +30,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo @Value("${nakadi.topic.default.replicaFactor}") final int defaultTopicReplicaFactor, @Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs, @Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs, - @Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds, + @Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout, @Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs, @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs, @Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs, @@ -45,7 +45,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo this.defaultTopicReplicaFactor = defaultTopicReplicaFactor; this.defaultTopicRetentionMs = defaultTopicRetentionMs; this.defaultTopicRotationMs = defaultTopicRotationMs; - this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds; + this.maxCommitTimeout = maxCommitTimeout; this.kafkaPollTimeoutMs = kafkaPollTimeoutMs; this.kafkaSendTimeoutMs = kafkaSendTimeoutMs; this.eventMaxBytes = eventMaxBytes; @@ -76,8 +76,8 @@ public long getDefaultTopicRotationMs() { return defaultTopicRotationMs; } - public long getDefaultCommitTimeoutSeconds() { - return defaultCommitTimeoutSeconds; + public long getMaxCommitTimeout() { + return maxCommitTimeout; } public long getKafkaPollTimeoutMs() { diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java index 506f5541a4..39d97df525 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java @@ -158,7 +158,7 @@ public StreamingResponseBody streamEvents( final Client client) { final StreamParameters streamParameters = StreamParameters.of(userParameters, - nakadiSettings.getDefaultCommitTimeoutSeconds(), client); + nakadiSettings.getMaxCommitTimeout(), client); return stream(subscriptionId, request, response, client, streamParameters); } @@ -174,13 +174,14 @@ public StreamingResponseBody streamEvents( @Nullable @RequestParam(value = "stream_timeout", required = false) final Long streamTimeout, @Nullable @RequestParam(value = "stream_keep_alive_limit", required = false) final Integer streamKeepAliveLimit, + @Nullable @RequestParam(value = "commit_timeout", required = false) final Long commitTimeout, final HttpServletRequest request, final HttpServletResponse response, final Client client) { final UserStreamParameters userParameters = new UserStreamParameters(batchLimit, streamLimit, batchTimeout, - streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of()); + streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of(), commitTimeout); final StreamParameters streamParameters = StreamParameters.of(userParameters, - nakadiSettings.getDefaultCommitTimeoutSeconds(), client); + nakadiSettings.getMaxCommitTimeout(), client); return stream(subscriptionId, request, response, client, streamParameters); } diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 7c5481588e..58b1fd1309 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -177,7 +177,7 @@ public void resetCursors(final String subscriptionId, final List c zkClient, subscription, timelineService, cursorConverter)); // add 1 second to commit timeout in order to give time to finish reset if there is uncommitted events if (!cursors.isEmpty()) { - final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getDefaultCommitTimeoutSeconds()) + + final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()) + TimeUnit.SECONDS.toMillis(1); zkClient.resetCursors( cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()), diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index 3f55488190..ed3f5b460d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -45,8 +45,10 @@ public class StreamParameters { private final List partitions; - private StreamParameters(final UserStreamParameters userParameters, final long commitTimeoutMillis, - final Client consumingClient) throws WrongStreamParametersException { + private StreamParameters( + final UserStreamParameters userParameters, + final long maxCommitTimeout, + final Client consumingClient) throws WrongStreamParametersException { this.batchLimitEvents = userParameters.getBatchLimit().orElse(1); if (batchLimitEvents <= 0) { @@ -61,8 +63,13 @@ private StreamParameters(final UserStreamParameters userParameters, final long c this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10); this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit(); this.partitions = userParameters.getPartitions(); - this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeoutMillis); this.consumingClient = consumingClient; + + final long commitTimeout = userParameters.getCommitTimeoutSeconds().orElse(maxCommitTimeout); + if (commitTimeout > maxCommitTimeout) { + throw new WrongStreamParametersException("commit_timeout can not be more than " + maxCommitTimeout); + } + this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeout <= 0 ? maxCommitTimeout : commitTimeout); } public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { @@ -86,9 +93,9 @@ public List getPartitions() { } public static StreamParameters of(final UserStreamParameters userStreamParameters, - final long commitTimeoutSeconds, + final long maxCommitTimeoutSeconds, final Client client) throws WrongStreamParametersException { - return new StreamParameters(userStreamParameters, commitTimeoutSeconds, client); + return new StreamParameters(userStreamParameters, maxCommitTimeoutSeconds, client); } } diff --git a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java index 879e960d68..48c44249a1 100644 --- a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java +++ b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java @@ -25,6 +25,8 @@ public class UserStreamParameters { private final List partitions; + private final Optional commitTimeoutSeconds; + @JsonCreator public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer batchLimit, @JsonProperty("stream_limit") @Nullable final Long streamLimit, @@ -32,7 +34,8 @@ public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer @JsonProperty("stream_timeout") @Nullable final Long streamTimeout, @JsonProperty("stream_keep_alive_limit") @Nullable final Integer streamKeepAliveLimit, @JsonProperty("max_uncommitted_events") @Nullable final Integer maxUncommittedEvents, - @JsonProperty("partitions") @Nullable final List partitions) { + @JsonProperty("partitions") @Nullable final List partitions, + @JsonProperty("commit_timeout") final Long commitTimeoutSeconds) { this.batchLimit = Optional.ofNullable(batchLimit); this.streamLimit = Optional.ofNullable(streamLimit); this.batchFlushTimeout = Optional.ofNullable(batchFlushTimeout); @@ -40,6 +43,7 @@ public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer this.streamKeepAliveLimit = Optional.ofNullable(streamKeepAliveLimit); this.maxUncommittedEvents = Optional.ofNullable(maxUncommittedEvents); this.partitions = partitions == null ? ImmutableList.of() : partitions; + this.commitTimeoutSeconds = Optional.ofNullable(commitTimeoutSeconds); } public Optional getBatchLimit() { @@ -69,4 +73,8 @@ public Optional getMaxUncommittedEvents() { public List getPartitions() { return partitions; } + + public Optional getCommitTimeoutSeconds() { + return commitTimeoutSeconds; + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 37ccbe82a4..ce32b7e743 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -55,7 +55,7 @@ nakadi: warnMessage: "Compaction warning" stream: timeoutMs: 31536000000 # 1 year :-P - default.commitTimeout: 60 # 1 minute + max.commitTimeout: 60 # 1 minute maxConnections: 5 maxStreamMemoryBytes: 50000000 # ~50 MB kafka: @@ -136,8 +136,7 @@ spring: nakadi: stream: maxStreamMemoryBytes: 10_000 # ~10 Kb - default: - commitTimeout: 5 # seconds + max.commitTimeout: 5 # seconds subscription: maxPartitions: 30 features.defaultFeatures: diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java index 538bf75942..d876610c46 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java @@ -96,7 +96,7 @@ public static StreamParameters createStreamParameters(final int batchLimitEvents final Client client) throws WrongStreamParametersException { final UserStreamParameters userParams = new UserStreamParameters(batchLimitEvents, streamLimitEvents, batchTimeoutSeconds, streamTimeoutSeconds, batchKeepAliveIterations, maxUncommittedMessages, - ImmutableList.of()); + ImmutableList.of(), commitTimeoutSeconds); return StreamParameters.of(userParams, commitTimeoutSeconds, client); } } From 1e92714db87b44061f36b38498dfc237c7b73263 Mon Sep 17 00:00:00 2001 From: dsorokin Date: Mon, 29 Oct 2018 09:47:21 +0100 Subject: [PATCH 2/3] ARUHA-1978 Fixes after review --- docs/_data/nakadi-event-bus-api.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index ee3882cf90..1003787656 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -1386,9 +1386,9 @@ paths: commit_timeout: description: | Maximum amount of seconds that nakadi will be waiting for commit after sending a batch to a client. - In case if commit is not coming within this timeout, nakadi will initialize stream termination, no + In case if commit does not come within this timeout, nakadi will initialize stream termination, no new data will be sent. Partitions from this stream will be assigned to other streams. - Setting commit_timeout to 0 is equal to setting it to maximum allowed value - 60 sesconds. + Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds. type: number format: int32 default: 60 @@ -3183,9 +3183,9 @@ parameters: in: query description: | Maximum amount of seconds that nakadi will be waiting for commit after sending a batch to a client. - In case if commit is not coming within this timeout, nakadi will initialize stream termination, no + In case if commit does not come within this timeout, nakadi will initialize stream termination, no new data will be sent. Partitions from this stream will be assigned to other streams. - Setting commit_timeout to 0 is equal to setting it to maximum allowed value - 60 sesconds. + Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds. type: number format: int32 default: 60 From 290ae46566226369eb3c54792776b62be3aa7dbd Mon Sep 17 00:00:00 2001 From: dsorokin Date: Tue, 30 Oct 2018 14:34:36 +0100 Subject: [PATCH 3/3] ARUHA-1978 Fix bug with negative commit timeout --- .../zalando/nakadi/service/subscription/StreamParameters.java | 4 +++- .../java/org/zalando/nakadi/view/UserStreamParameters.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index ed3f5b460d..3dfb62df51 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -68,8 +68,10 @@ private StreamParameters( final long commitTimeout = userParameters.getCommitTimeoutSeconds().orElse(maxCommitTimeout); if (commitTimeout > maxCommitTimeout) { throw new WrongStreamParametersException("commit_timeout can not be more than " + maxCommitTimeout); + } else if (commitTimeout < 0) { + throw new WrongStreamParametersException("commit_timeout can not be less than 0"); } - this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeout <= 0 ? maxCommitTimeout : commitTimeout); + this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeout == 0 ? maxCommitTimeout : commitTimeout); } public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { diff --git a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java index 48c44249a1..0a00b09891 100644 --- a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java +++ b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java @@ -35,7 +35,7 @@ public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer @JsonProperty("stream_keep_alive_limit") @Nullable final Integer streamKeepAliveLimit, @JsonProperty("max_uncommitted_events") @Nullable final Integer maxUncommittedEvents, @JsonProperty("partitions") @Nullable final List partitions, - @JsonProperty("commit_timeout") final Long commitTimeoutSeconds) { + @JsonProperty("commit_timeout") @Nullable final Long commitTimeoutSeconds) { this.batchLimit = Optional.ofNullable(batchLimit); this.streamLimit = Optional.ofNullable(streamLimit); this.batchFlushTimeout = Optional.ofNullable(batchFlushTimeout);