diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index 72454a7c33..1003787656 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -1240,6 +1240,7 @@ paths: - $ref: '#/parameters/BatchFlushTimeout' - $ref: '#/parameters/StreamTimeout' - $ref: '#/parameters/StreamKeepAliveLimit' + - $ref: '#/parameters/CommitTimeout' - name: X-Flow-Id in: header description: | @@ -1382,6 +1383,17 @@ paths: default: 0 minimum: 0 maximum: 4200 + commit_timeout: + description: | + Maximum amount of seconds that nakadi will be waiting for commit after sending a batch to a client. + In case if commit does not come within this timeout, nakadi will initialize stream termination, no + new data will be sent. Partitions from this stream will be assigned to other streams. + Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds. + type: number + format: int32 + default: 60 + maximum: 60 + minimum: 0 - $ref: '#/parameters/SubscriptionId' - name: X-Flow-Id in: header @@ -1972,11 +1984,11 @@ definitions: For concrete examples of what will be enforced by Nakadi see the objects BusinessEvent and DataChangeEvent below. schema: - oneOf: + oneOf: - $ref: '#/definitions/BusinessEvent' - $ref: '#/definitions/DataChangeEvent' - $ref: '#/definitions/UndefinedEvent' - + EventMetadata: type: object description: | @@ -2073,9 +2085,9 @@ definitions: type: string required: - eid - - occurred_at - - + - occurred_at + + BusinessEvent: description: | A Business Event. @@ -2230,7 +2242,7 @@ definitions: CursorDistanceResult: allOf: - $ref: '#/definitions/CursorDistanceQuery' - - type: object + - type: object properties: distance: type: number @@ -3166,6 +3178,21 @@ parameters: required: false default: 0 + CommitTimeout: + name: commit_timeout + in: query + description: | + Maximum amount of seconds that nakadi will be waiting for commit after sending a batch to a client. + In case if commit does not come within this timeout, nakadi will initialize stream termination, no + new data will be sent. Partitions from this stream will be assigned to other streams. + Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds. + type: number + format: int32 + default: 60 + maximum: 60 + minimum: 0 + required: false + MaxUncommittedEvents: name: max_uncommitted_events in: query diff --git a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java index 4704e396a5..142870192d 100644 --- a/src/main/java/org/zalando/nakadi/config/NakadiSettings.java +++ b/src/main/java/org/zalando/nakadi/config/NakadiSettings.java @@ -14,7 +14,7 @@ public class NakadiSettings { private final int defaultTopicReplicaFactor; private final long defaultTopicRetentionMs; private final long defaultTopicRotationMs; - private final long defaultCommitTimeoutSeconds; + private final long maxCommitTimeout; private final long kafkaPollTimeoutMs; private final long kafkaSendTimeoutMs; private final long timelineWaitTimeoutMs; @@ -30,7 +30,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo @Value("${nakadi.topic.default.replicaFactor}") final int defaultTopicReplicaFactor, @Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs, @Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs, - @Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds, + @Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout, @Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs, @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs, @Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs, @@ -45,7 +45,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo this.defaultTopicReplicaFactor = defaultTopicReplicaFactor; this.defaultTopicRetentionMs = defaultTopicRetentionMs; this.defaultTopicRotationMs = defaultTopicRotationMs; - this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds; + this.maxCommitTimeout = maxCommitTimeout; this.kafkaPollTimeoutMs = kafkaPollTimeoutMs; this.kafkaSendTimeoutMs = kafkaSendTimeoutMs; this.eventMaxBytes = eventMaxBytes; @@ -76,8 +76,8 @@ public long getDefaultTopicRotationMs() { return defaultTopicRotationMs; } - public long getDefaultCommitTimeoutSeconds() { - return defaultCommitTimeoutSeconds; + public long getMaxCommitTimeout() { + return maxCommitTimeout; } public long getKafkaPollTimeoutMs() { diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java index 506f5541a4..39d97df525 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionStreamController.java @@ -158,7 +158,7 @@ public StreamingResponseBody streamEvents( final Client client) { final StreamParameters streamParameters = StreamParameters.of(userParameters, - nakadiSettings.getDefaultCommitTimeoutSeconds(), client); + nakadiSettings.getMaxCommitTimeout(), client); return stream(subscriptionId, request, response, client, streamParameters); } @@ -174,13 +174,14 @@ public StreamingResponseBody streamEvents( @Nullable @RequestParam(value = "stream_timeout", required = false) final Long streamTimeout, @Nullable @RequestParam(value = "stream_keep_alive_limit", required = false) final Integer streamKeepAliveLimit, + @Nullable @RequestParam(value = "commit_timeout", required = false) final Long commitTimeout, final HttpServletRequest request, final HttpServletResponse response, final Client client) { final UserStreamParameters userParameters = new UserStreamParameters(batchLimit, streamLimit, batchTimeout, - streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of()); + streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of(), commitTimeout); final StreamParameters streamParameters = StreamParameters.of(userParameters, - nakadiSettings.getDefaultCommitTimeoutSeconds(), client); + nakadiSettings.getMaxCommitTimeout(), client); return stream(subscriptionId, request, response, client, streamParameters); } diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 7c5481588e..58b1fd1309 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -177,7 +177,7 @@ public void resetCursors(final String subscriptionId, final List c zkClient, subscription, timelineService, cursorConverter)); // add 1 second to commit timeout in order to give time to finish reset if there is uncommitted events if (!cursors.isEmpty()) { - final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getDefaultCommitTimeoutSeconds()) + + final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()) + TimeUnit.SECONDS.toMillis(1); zkClient.resetCursors( cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()), diff --git a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java index 3f55488190..3dfb62df51 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/StreamParameters.java @@ -45,8 +45,10 @@ public class StreamParameters { private final List partitions; - private StreamParameters(final UserStreamParameters userParameters, final long commitTimeoutMillis, - final Client consumingClient) throws WrongStreamParametersException { + private StreamParameters( + final UserStreamParameters userParameters, + final long maxCommitTimeout, + final Client consumingClient) throws WrongStreamParametersException { this.batchLimitEvents = userParameters.getBatchLimit().orElse(1); if (batchLimitEvents <= 0) { @@ -61,8 +63,15 @@ private StreamParameters(final UserStreamParameters userParameters, final long c this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10); this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit(); this.partitions = userParameters.getPartitions(); - this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeoutMillis); this.consumingClient = consumingClient; + + final long commitTimeout = userParameters.getCommitTimeoutSeconds().orElse(maxCommitTimeout); + if (commitTimeout > maxCommitTimeout) { + throw new WrongStreamParametersException("commit_timeout can not be more than " + maxCommitTimeout); + } else if (commitTimeout < 0) { + throw new WrongStreamParametersException("commit_timeout can not be less than 0"); + } + this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeout == 0 ? maxCommitTimeout : commitTimeout); } public long getMessagesAllowedToSend(final long limit, final long sentSoFar) { @@ -86,9 +95,9 @@ public List getPartitions() { } public static StreamParameters of(final UserStreamParameters userStreamParameters, - final long commitTimeoutSeconds, + final long maxCommitTimeoutSeconds, final Client client) throws WrongStreamParametersException { - return new StreamParameters(userStreamParameters, commitTimeoutSeconds, client); + return new StreamParameters(userStreamParameters, maxCommitTimeoutSeconds, client); } } diff --git a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java index 879e960d68..0a00b09891 100644 --- a/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java +++ b/src/main/java/org/zalando/nakadi/view/UserStreamParameters.java @@ -25,6 +25,8 @@ public class UserStreamParameters { private final List partitions; + private final Optional commitTimeoutSeconds; + @JsonCreator public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer batchLimit, @JsonProperty("stream_limit") @Nullable final Long streamLimit, @@ -32,7 +34,8 @@ public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer @JsonProperty("stream_timeout") @Nullable final Long streamTimeout, @JsonProperty("stream_keep_alive_limit") @Nullable final Integer streamKeepAliveLimit, @JsonProperty("max_uncommitted_events") @Nullable final Integer maxUncommittedEvents, - @JsonProperty("partitions") @Nullable final List partitions) { + @JsonProperty("partitions") @Nullable final List partitions, + @JsonProperty("commit_timeout") @Nullable final Long commitTimeoutSeconds) { this.batchLimit = Optional.ofNullable(batchLimit); this.streamLimit = Optional.ofNullable(streamLimit); this.batchFlushTimeout = Optional.ofNullable(batchFlushTimeout); @@ -40,6 +43,7 @@ public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer this.streamKeepAliveLimit = Optional.ofNullable(streamKeepAliveLimit); this.maxUncommittedEvents = Optional.ofNullable(maxUncommittedEvents); this.partitions = partitions == null ? ImmutableList.of() : partitions; + this.commitTimeoutSeconds = Optional.ofNullable(commitTimeoutSeconds); } public Optional getBatchLimit() { @@ -69,4 +73,8 @@ public Optional getMaxUncommittedEvents() { public List getPartitions() { return partitions; } + + public Optional getCommitTimeoutSeconds() { + return commitTimeoutSeconds; + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index c0b8d35814..f1e8cd7b4c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -48,7 +48,7 @@ nakadi: warnMessage: "Compaction warning" stream: timeoutMs: 31536000000 # 1 year :-P - default.commitTimeout: 60 # 1 minute + max.commitTimeout: 60 # 1 minute maxConnections: 5 maxStreamMemoryBytes: 50000000 # ~50 MB kafka: @@ -129,8 +129,7 @@ spring: nakadi: stream: maxStreamMemoryBytes: 10_000 # ~10 Kb - default: - commitTimeout: 5 # seconds + max.commitTimeout: 5 # seconds subscription: maxPartitions: 30 features.defaultFeatures: diff --git a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java index 538bf75942..d876610c46 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/StreamParametersTest.java @@ -96,7 +96,7 @@ public static StreamParameters createStreamParameters(final int batchLimitEvents final Client client) throws WrongStreamParametersException { final UserStreamParameters userParams = new UserStreamParameters(batchLimitEvents, streamLimitEvents, batchTimeoutSeconds, streamTimeoutSeconds, batchKeepAliveIterations, maxUncommittedMessages, - ImmutableList.of()); + ImmutableList.of(), commitTimeoutSeconds); return StreamParameters.of(userParams, commitTimeoutSeconds, client); } }