Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #969 from zalando/ARUHA-1978
Browse files Browse the repository at this point in the history
ARUHA-1978 Be able to specify commit timeout less than 60 seconds
  • Loading branch information
antban authored Nov 1, 2018
2 parents 3ae1fe9 + 3f168c7 commit 67e95a4
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 25 deletions.
39 changes: 33 additions & 6 deletions docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ paths:
- $ref: '#/parameters/BatchFlushTimeout'
- $ref: '#/parameters/StreamTimeout'
- $ref: '#/parameters/StreamKeepAliveLimit'
- $ref: '#/parameters/CommitTimeout'
- name: X-Flow-Id
in: header
description: |
Expand Down Expand Up @@ -1382,6 +1383,17 @@ paths:
default: 0
minimum: 0
maximum: 4200
commit_timeout:
description: |
Maximum number of seconds that nakadi will wait for a commit after sending a batch to a client.
If the commit does not arrive within this timeout, nakadi will initiate stream termination and no
new data will be sent. Partitions from this stream will be assigned to other streams.
Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds.
type: number
format: int32
default: 60
maximum: 60
minimum: 0
- $ref: '#/parameters/SubscriptionId'
- name: X-Flow-Id
in: header
Expand Down Expand Up @@ -1972,11 +1984,11 @@ definitions:
For concrete examples of what will be enforced by Nakadi see the objects BusinessEvent and
DataChangeEvent below.
schema:
oneOf:
oneOf:
- $ref: '#/definitions/BusinessEvent'
- $ref: '#/definitions/DataChangeEvent'
- $ref: '#/definitions/UndefinedEvent'

EventMetadata:
type: object
description: |
Expand Down Expand Up @@ -2073,9 +2085,9 @@ definitions:
type: string
required:
- eid
- occurred_at
- occurred_at


BusinessEvent:
description: |
A Business Event.
Expand Down Expand Up @@ -2230,7 +2242,7 @@ definitions:
CursorDistanceResult:
allOf:
- $ref: '#/definitions/CursorDistanceQuery'
- type: object
- type: object
properties:
distance:
type: number
Expand Down Expand Up @@ -3166,6 +3178,21 @@ parameters:
required: false
default: 0

CommitTimeout:
name: commit_timeout
in: query
description: |
Maximum number of seconds that nakadi will wait for a commit after sending a batch to a client.
If the commit does not arrive within this timeout, nakadi will initiate stream termination and no
new data will be sent. Partitions from this stream will be assigned to other streams.
Setting commit_timeout to 0 is equal to setting it to the maximum allowed value - 60 seconds.
type: number
format: int32
default: 60
maximum: 60
minimum: 0
required: false

MaxUncommittedEvents:
name: max_uncommitted_events
in: query
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/zalando/nakadi/config/NakadiSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class NakadiSettings {
private final int defaultTopicReplicaFactor;
private final long defaultTopicRetentionMs;
private final long defaultTopicRotationMs;
private final long defaultCommitTimeoutSeconds;
private final long maxCommitTimeout;
private final long kafkaPollTimeoutMs;
private final long kafkaSendTimeoutMs;
private final long timelineWaitTimeoutMs;
Expand All @@ -30,7 +30,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.topic.default.replicaFactor}") final int defaultTopicReplicaFactor,
@Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs,
@Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
@Value("${nakadi.stream.default.commitTimeout}") final long defaultCommitTimeoutSeconds,
@Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout,
@Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
@Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs,
Expand All @@ -45,7 +45,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultTopicReplicaFactor = defaultTopicReplicaFactor;
this.defaultTopicRetentionMs = defaultTopicRetentionMs;
this.defaultTopicRotationMs = defaultTopicRotationMs;
this.defaultCommitTimeoutSeconds = defaultCommitTimeoutSeconds;
this.maxCommitTimeout = maxCommitTimeout;
this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
this.eventMaxBytes = eventMaxBytes;
Expand Down Expand Up @@ -76,8 +76,8 @@ public long getDefaultTopicRotationMs() {
return defaultTopicRotationMs;
}

public long getDefaultCommitTimeoutSeconds() {
return defaultCommitTimeoutSeconds;
/**
 * Upper bound (in seconds) for a subscription stream's commit timeout, injected from the
 * {@code nakadi.stream.max.commitTimeout} property. User-supplied commit_timeout values
 * are validated against this limit, and 0 / absent values fall back to it.
 */
public long getMaxCommitTimeout() {
return maxCommitTimeout;
}

public long getKafkaPollTimeoutMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public StreamingResponseBody streamEvents(
final Client client) {

final StreamParameters streamParameters = StreamParameters.of(userParameters,
nakadiSettings.getDefaultCommitTimeoutSeconds(), client);
nakadiSettings.getMaxCommitTimeout(), client);

return stream(subscriptionId, request, response, client, streamParameters);
}
Expand All @@ -174,13 +174,14 @@ public StreamingResponseBody streamEvents(
@Nullable @RequestParam(value = "stream_timeout", required = false) final Long streamTimeout,
@Nullable @RequestParam(value = "stream_keep_alive_limit", required = false) final Integer
streamKeepAliveLimit,
@Nullable @RequestParam(value = "commit_timeout", required = false) final Long commitTimeout,
final HttpServletRequest request, final HttpServletResponse response, final Client client) {

final UserStreamParameters userParameters = new UserStreamParameters(batchLimit, streamLimit, batchTimeout,
streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of());
streamTimeout, streamKeepAliveLimit, maxUncommittedEvents, ImmutableList.of(), commitTimeout);

final StreamParameters streamParameters = StreamParameters.of(userParameters,
nakadiSettings.getDefaultCommitTimeoutSeconds(), client);
nakadiSettings.getMaxCommitTimeout(), client);

return stream(subscriptionId, request, response, client, streamParameters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void resetCursors(final String subscriptionId, final List<NakadiCursor> c
zkClient, subscription, timelineService, cursorConverter));
// add 1 second to the commit timeout in order to give time to finish the reset if there are uncommitted events
if (!cursors.isEmpty()) {
final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getDefaultCommitTimeoutSeconds()) +
final long timeout = TimeUnit.SECONDS.toMillis(nakadiSettings.getMaxCommitTimeout()) +
TimeUnit.SECONDS.toMillis(1);
zkClient.resetCursors(
cursors.stream().map(cursorConverter::convertToNoToken).collect(Collectors.toList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public class StreamParameters {

private final List<EventTypePartition> partitions;

private StreamParameters(final UserStreamParameters userParameters, final long commitTimeoutMillis,
final Client consumingClient) throws WrongStreamParametersException {
private StreamParameters(
final UserStreamParameters userParameters,
final long maxCommitTimeout,
final Client consumingClient) throws WrongStreamParametersException {

this.batchLimitEvents = userParameters.getBatchLimit().orElse(1);
if (batchLimitEvents <= 0) {
Expand All @@ -61,8 +63,15 @@ private StreamParameters(final UserStreamParameters userParameters, final long c
this.maxUncommittedMessages = userParameters.getMaxUncommittedEvents().orElse(10);
this.batchKeepAliveIterations = userParameters.getStreamKeepAliveLimit();
this.partitions = userParameters.getPartitions();
this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeoutMillis);
this.consumingClient = consumingClient;

final long commitTimeout = userParameters.getCommitTimeoutSeconds().orElse(maxCommitTimeout);
if (commitTimeout > maxCommitTimeout) {
throw new WrongStreamParametersException("commit_timeout can not be more than " + maxCommitTimeout);
} else if (commitTimeout < 0) {
throw new WrongStreamParametersException("commit_timeout can not be less than 0");
}
this.commitTimeoutMillis = TimeUnit.SECONDS.toMillis(commitTimeout == 0 ? maxCommitTimeout : commitTimeout);
}

public long getMessagesAllowedToSend(final long limit, final long sentSoFar) {
Expand All @@ -86,9 +95,9 @@ public List<EventTypePartition> getPartitions() {
}

/**
 * Static factory for {@link StreamParameters}.
 *
 * @param userStreamParameters    parameters supplied by the consuming client (may carry its own
 *                                commit_timeout, validated against the server maximum)
 * @param maxCommitTimeoutSeconds server-side maximum commit timeout in seconds; also used as the
 *                                effective value when the client passes 0 or nothing
 * @param client                  the consuming client
 * @throws WrongStreamParametersException if any user parameter is out of its allowed range
 */
public static StreamParameters of(final UserStreamParameters userStreamParameters,
                                  final long maxCommitTimeoutSeconds,
                                  final Client client) throws WrongStreamParametersException {
    return new StreamParameters(userStreamParameters, maxCommitTimeoutSeconds, client);
}

}
10 changes: 9 additions & 1 deletion src/main/java/org/zalando/nakadi/view/UserStreamParameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@ public class UserStreamParameters {

private final List<EventTypePartition> partitions;

private final Optional<Long> commitTimeoutSeconds;

/**
 * Jackson-bound constructor for the user-supplied subscription stream parameters (also assembled
 * from query parameters by the streaming controller). Every value is optional: absent numeric
 * parameters are wrapped in empty {@link Optional}s and defaulted downstream, and an absent
 * partitions list becomes an empty immutable list.
 *
 * @param commitTimeoutSeconds maximum seconds nakadi waits for a commit after sending a batch;
 *                             null means "use the server-side maximum" (resolved downstream)
 */
@JsonCreator
public UserStreamParameters(@JsonProperty("batch_limit") @Nullable final Integer batchLimit,
                            @JsonProperty("stream_limit") @Nullable final Long streamLimit,
                            @JsonProperty("batch_flush_timeout") @Nullable final Integer batchFlushTimeout,
                            @JsonProperty("stream_timeout") @Nullable final Long streamTimeout,
                            @JsonProperty("stream_keep_alive_limit") @Nullable final Integer streamKeepAliveLimit,
                            @JsonProperty("max_uncommitted_events") @Nullable final Integer maxUncommittedEvents,
                            @JsonProperty("partitions") @Nullable final List<EventTypePartition> partitions,
                            @JsonProperty("commit_timeout") @Nullable final Long commitTimeoutSeconds) {
    this.batchLimit = Optional.ofNullable(batchLimit);
    this.streamLimit = Optional.ofNullable(streamLimit);
    this.batchFlushTimeout = Optional.ofNullable(batchFlushTimeout);
    this.streamTimeout = Optional.ofNullable(streamTimeout);
    this.streamKeepAliveLimit = Optional.ofNullable(streamKeepAliveLimit);
    this.maxUncommittedEvents = Optional.ofNullable(maxUncommittedEvents);
    this.partitions = partitions == null ? ImmutableList.of() : partitions;
    this.commitTimeoutSeconds = Optional.ofNullable(commitTimeoutSeconds);
}

public Optional<Integer> getBatchLimit() {
Expand Down Expand Up @@ -69,4 +73,8 @@ public Optional<Integer> getMaxUncommittedEvents() {
/**
 * Partitions explicitly requested by the client; never null — an absent "partitions"
 * property is normalized to an empty immutable list in the constructor.
 */
public List<EventTypePartition> getPartitions() {
return partitions;
}

/**
 * Client-requested commit timeout in seconds ("commit_timeout" property), or empty when the
 * client did not specify one; validation against the server maximum happens downstream.
 */
public Optional<Long> getCommitTimeoutSeconds() {
return commitTimeoutSeconds;
}
}
5 changes: 2 additions & 3 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ nakadi:
warnMessage: "Compaction warning"
stream:
timeoutMs: 31536000000 # 1 year :-P
default.commitTimeout: 60 # 1 minute
max.commitTimeout: 60 # 1 minute
maxConnections: 5
maxStreamMemoryBytes: 50000000 # ~50 MB
kafka:
Expand Down Expand Up @@ -129,8 +129,7 @@ spring:
nakadi:
stream:
maxStreamMemoryBytes: 10_000 # ~10 Kb
default:
commitTimeout: 5 # seconds
max.commitTimeout: 5 # seconds
subscription:
maxPartitions: 30
features.defaultFeatures:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static StreamParameters createStreamParameters(final int batchLimitEvents
final Client client) throws WrongStreamParametersException {
final UserStreamParameters userParams = new UserStreamParameters(batchLimitEvents, streamLimitEvents,
batchTimeoutSeconds, streamTimeoutSeconds, batchKeepAliveIterations, maxUncommittedMessages,
ImmutableList.of());
ImmutableList.of(), commitTimeoutSeconds);
return StreamParameters.of(userParams, commitTimeoutSeconds, client);
}
}

0 comments on commit 67e95a4

Please sign in to comment.