diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a0919474e..9e3df0cab3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.7.0] - 2018-05-25 + +### Added +- Extended subscription statistics endpoint with time-lag information + ## [2.6.7] - 2018-05-15 ### Fixed diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index 752630849c..a7d02cdfbb 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -1389,6 +1389,11 @@ paths: description: exposes statistics of specified subscription parameters: - $ref: '#/parameters/SubscriptionId' + - name: show_time_lag + in: query + description: show consumer time lag + type: boolean + default: false responses: '200': description: Ok @@ -2703,6 +2708,11 @@ definitions: The amount of events in this partition that are not yet consumed within this subscription. The property may be absent at the moment when no events were yet consumed from the partition in this subscription (In case of `read_from` is `BEGIN` or `END`) + consumer_lag_seconds: + type: number + description: | + Subscription consumer lag for this partition in seconds. Measured as the age of the oldest event of + this partition that is not yet consumed within this subscription. stream_id: type: string description: the id of the stream that consumes data from this partition diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java index cb1beea502..163a6958ed 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java @@ -31,6 +31,7 @@ import static java.util.stream.IntStream.rangeClosed; import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.IsEqual.equalTo; @@ -280,10 +281,11 @@ public void userJourneyHila() throws InterruptedException, IOException { // as we didn't commit, there should be still 4 unconsumed events jsonRequestSpec() - .get("/subscriptions/{sid}/stats", subscription.getId()) + .get("/subscriptions/{sid}/stats?show_time_lag=true", subscription.getId()) .then() .statusCode(OK.value()) - .body("items[0].partitions[0].unconsumed_events", equalTo(4)); + .body("items[0].partitions[0].unconsumed_events", equalTo(4)) + .body("items[0].partitions[0].consumer_lag_seconds", greaterThanOrEqualTo(0)); // commit cursor of latest event final StreamBatch lastBatch = batches.get(batches.size() - 1); @@ -293,10 +295,11 @@ public void userJourneyHila() throws InterruptedException, IOException { // now there should be 0 unconsumed events jsonRequestSpec() - .get("/subscriptions/{sid}/stats", subscription.getId()) + .get("/subscriptions/{sid}/stats?show_time_lag=true", subscription.getId()) .then() .statusCode(OK.value()) - .body("items[0].partitions[0].unconsumed_events", equalTo(0)); + .body("items[0].partitions[0].unconsumed_events", equalTo(0)) + .body("items[0].partitions[0].consumer_lag_seconds", equalTo(0)); // get cursors jsonRequestSpec() diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index c04ecfaf7b..deb1c2f281 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -322,6 +322,7 @@ public void testGetSubscriptionStat() throws Exception { "0", "assigned", 15L, + null, client.getSessionId(), AUTO))) ); @@ -340,6 +341,7 @@ public void testGetSubscriptionStat() throws Exception { "0", "assigned", 5L, + null, client.getSessionId(), AUTO))) ); @@ -367,6 +369,7 @@ public void testGetSubscriptionStatWhenDirectAssignment() throws Exception { "0", "assigned", 0L, + null, client.getSessionId(), DIRECT )))))); @@ -400,6 +403,7 @@ public void testSubscriptionStatsMultiET() throws IOException { "0", "assigned", 1L, + null, client.getSessionId(), AUTO )))))) @@ -409,6 +413,7 @@ public void testSubscriptionStatsMultiET() throws IOException { "0", "assigned", 2L, + null, client.getSessionId(), AUTO )))))); diff --git a/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java b/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java index ea2629ec7a..dec7a1bc66 100644 --- a/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java +++ b/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java @@ -19,6 +19,7 @@ import org.zalando.nakadi.exceptions.runtime.CursorConversionException; import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException; import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException; +import org.zalando.nakadi.exceptions.runtime.LimitReachedException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.exceptions.runtime.NoEventTypeException; import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException; @@ -157,6 +158,13 @@ public ResponseEntity handleServiceTemporarilyUnavailableException( return Responses.create(Response.Status.SERVICE_UNAVAILABLE, exception.getMessage(), request); } + @ExceptionHandler(LimitReachedException.class) + public ResponseEntity handleLimitReachedException( + final ServiceTemporarilyUnavailableException exception, final NativeWebRequest request) { + LOG.warn(exception.getMessage()); + return Responses.create(MoreStatus.TOO_MANY_REQUESTS, exception.getMessage(), request); + } + @ExceptionHandler(DbWriteOperationsBlockedException.class) public ResponseEntity handleDbWriteOperationsBlockedException( final DbWriteOperationsBlockedException exception, final NativeWebRequest request) { diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 075b0a7a39..c53c0e2320 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -14,22 +14,27 @@ import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException; +import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.WebResult; import org.zalando.nakadi.service.subscription.SubscriptionService; -import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.subscription.SubscriptionService.StatsMode; import org.zalando.problem.Problem; import org.zalando.problem.spring.web.advice.Responses; import javax.annotation.Nullable; +import javax.ws.rs.core.Response; import java.util.Set; import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED; import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; +import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY; @RestController @@ -58,8 +63,8 @@ public ResponseEntity listSubscriptions( final NativeWebRequest request) { featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return WebResult.wrap(() -> - subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset), + return WebResult.wrap( + () -> subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset), request); } @@ -82,11 +87,13 @@ public ResponseEntity deleteSubscription(@PathVariable("id") final String sub @RequestMapping(value = "/{id}/stats", method = RequestMethod.GET) public ItemsWrapper getSubscriptionStats( - @PathVariable("id") final String subscriptionId) + @PathVariable("id") final String subscriptionId, + @RequestParam(value = "show_time_lag", required = false, defaultValue = "false") final boolean showTimeLag) throws NakadiException, InconsistentStateException, ServiceTemporarilyUnavailableException { featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return subscriptionService.getSubscriptionStat(subscriptionId, true); + final StatsMode statsMode = showTimeLag ? StatsMode.TIMELAG : StatsMode.NORMAL; + return subscriptionService.getSubscriptionStat(subscriptionId, statsMode); } @ExceptionHandler(NakadiException.class) @@ -103,6 +110,13 @@ public ResponseEntity handleFeatureTurnedOff(final FeatureNotAvailableE return Responses.create(Problem.valueOf(NOT_IMPLEMENTED, ex.getMessage()), request); } + @ExceptionHandler(ErrorGettingCursorTimeLagException.class) + public ResponseEntity handleTimeLagException(final ErrorGettingCursorTimeLagException ex, + final NativeWebRequest request) { + LOG.debug(ex.getMessage(), ex); + return Responses.create(Problem.valueOf(UNPROCESSABLE_ENTITY, ex.getMessage()), request); + } + @ExceptionHandler(InconsistentStateException.class) public ResponseEntity handleInconsistentState(final InconsistentStateException ex, final NativeWebRequest request) { @@ -116,7 +130,7 @@ public ResponseEntity handleInconsistentState(final InconsistentStateEx @ExceptionHandler(ServiceTemporarilyUnavailableException.class) public ResponseEntity handleServiceTemporarilyUnavailable(final ServiceTemporarilyUnavailableException ex, - final NativeWebRequest request) { + final NativeWebRequest request) { LOG.debug(ex.getMessage(), ex); return Responses.create( Problem.valueOf( @@ -125,4 +139,11 @@ public ResponseEntity handleServiceTemporarilyUnavailable(final Service request); } + @ExceptionHandler(TimeLagStatsTimeoutException.class) + public ResponseEntity handleTimeLagStatsTimeoutException(final TimeLagStatsTimeoutException e, + final NativeWebRequest request) { + LOG.warn(e.getMessage()); + return Responses.create(Response.Status.REQUEST_TIMEOUT, e.getMessage(), request); + } + } diff --git a/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java b/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java index 2e37ce2f14..fb63694935 100644 --- a/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java +++ b/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java @@ -8,10 +8,12 @@ public class ConsumedEvent { private final byte[] event; private final NakadiCursor position; + private final long timestamp; - public ConsumedEvent(final byte[] event, final NakadiCursor position) { + public ConsumedEvent(final byte[] event, final NakadiCursor position, final long timestamp) { this.event = event; this.position = position; + this.timestamp = timestamp; } public byte[] getEvent() { @@ -22,6 +24,10 @@ public NakadiCursor getPosition() { return position; } + public long getTimestamp() { + return timestamp; + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java index 212864546a..476ca930cf 100644 --- a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java +++ b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java @@ -56,6 +56,9 @@ public String getDescription() { @JsonInclude(JsonInclude.Include.NON_NULL) private final Long unconsumedEvents; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Long consumerLagSeconds; + private final String streamId; @JsonInclude(JsonInclude.Include.NON_NULL) @@ -65,11 +68,13 @@ public Partition( @JsonProperty("partition") final String partition, @JsonProperty("state") final String state, @JsonProperty("unconsumed_events") @Nullable final Long unconsumedEvents, + @JsonProperty("consumer_lag_seconds") @Nullable final Long consumerLagSeconds, @JsonProperty("stream_id") final String streamId, @JsonProperty("assignment_type") @Nullable final AssignmentType assignmentType) { this.partition = partition; this.state = state; this.unconsumedEvents = unconsumedEvents; + this.consumerLagSeconds = consumerLagSeconds; this.streamId = streamId; this.assignmentType = assignmentType; } @@ -87,6 +92,11 @@ public Long getUnconsumedEvents() { return unconsumedEvents; } + @Nullable + public Long getConsumerLagSeconds() { + return consumerLagSeconds; + } + public String getStreamId() { return streamId; } diff --git a/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java b/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java new file mode 100644 index 0000000000..fe442242a0 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java @@ -0,0 +1,20 @@ +package org.zalando.nakadi.exceptions; + +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; + +public class ErrorGettingCursorTimeLagException extends MyNakadiRuntimeException1 { + + private final NakadiCursor failedCursor; + + public ErrorGettingCursorTimeLagException(final NakadiCursor failedCursor, + final Throwable cause) { + super("Error occurred when getting subscription time lag as as subscription cursor is wrong or expired: " + + failedCursor.toString(), cause); + this.failedCursor = failedCursor; + } + + public NakadiCursor getFailedCursor() { + return failedCursor; + } +} diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java index 07406da155..ba3bc8d709 100644 --- a/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java @@ -2,7 +2,7 @@ public class InconsistentStateException extends MyNakadiRuntimeException1 { - public InconsistentStateException(final String msg, final Exception cause) { + public InconsistentStateException(final String msg, final Throwable cause) { super(msg, cause); } diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java new file mode 100644 index 0000000000..176632da4a --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java @@ -0,0 +1,9 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class LimitReachedException extends MyNakadiRuntimeException1 { + + public LimitReachedException(final String msg, final Throwable cause) { + super(msg, cause); + } + +} diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java new file mode 100644 index 0000000000..9eec9d7a5e --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java @@ -0,0 +1,9 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class TimeLagStatsTimeoutException extends MyNakadiRuntimeException1 { + + public TimeLagStatsTimeoutException(final String msg, final Throwable cause) { + super(msg, cause); + } + +} diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java index 85d6bb05d7..83d5b2b541 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java @@ -59,7 +59,7 @@ public List readEvents() { for (final ConsumerRecord record : records) { final KafkaCursor cursor = new KafkaCursor(record.topic(), record.partition(), record.offset()); final Timeline timeline = timelineMap.get(new TopicPartition(record.topic(), record.partition())); - result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline))); + result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline), record.timestamp())); } return result; } diff --git a/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java b/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java index 1737d09183..8f7e94e0cd 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.service; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.InternalNakadiException; @@ -11,9 +13,12 @@ import java.util.List; import java.util.Objects; +@Component public class NakadiCursorComparator implements Comparator { + private final EventTypeCache eventTypeCache; + @Autowired public NakadiCursorComparator(final EventTypeCache eventTypeCache) { this.eventTypeCache = eventTypeCache; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 70abc66fd9..2ce6518b86 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.service.subscription; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.json.JSONObject; import org.slf4j.Logger; @@ -55,6 +56,7 @@ import javax.annotation.Nullable; import javax.ws.rs.core.Response; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -81,6 +83,13 @@ public class SubscriptionService { private final NakadiKpiPublisher nakadiKpiPublisher; private final FeatureToggleService featureToggleService; private final String subLogEventType; + private final SubscriptionTimeLagService subscriptionTimeLagService; + + public enum StatsMode { + LIGHT, + NORMAL, + TIMELAG + } @Autowired public SubscriptionService(final SubscriptionDbRepository subscriptionRepository, @@ -92,6 +101,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository final CursorOperationsService cursorOperationsService, final NakadiKpiPublisher nakadiKpiPublisher, final FeatureToggleService featureToggleService, + final SubscriptionTimeLagService subscriptionTimeLagService, @Value("${nakadi.kpi.event-types.nakadiSubscriptionLog}") final String subLogEventType) { this.subscriptionRepository = subscriptionRepository; this.subscriptionClientFactory = subscriptionClientFactory; @@ -102,6 +112,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository this.cursorOperationsService = cursorOperationsService; this.nakadiKpiPublisher = nakadiKpiPublisher; this.featureToggleService = featureToggleService; + this.subscriptionTimeLagService = subscriptionTimeLagService; this.subLogEventType = subLogEventType; } @@ -161,7 +172,7 @@ public Result listSubscriptions(@Nullable final String owningApplication, @Nulla new PaginationWrapper<>(subscriptions, paginationLinks); if (showStatus) { final List items = paginationWrapper.getItems(); - items.forEach(s -> s.setStatus(createSubscriptionStat(s, false))); + items.forEach(s -> s.setStatus(createSubscriptionStat(s, StatsMode.LIGHT))); } return Result.ok(paginationWrapper); } catch (final ServiceTemporarilyUnavailableException e) { @@ -214,7 +225,7 @@ public Result deleteSubscription(final String subscriptionId) throws DbWri } public ItemsWrapper getSubscriptionStat(final String subscriptionId, - final boolean includeDistance) + final StatsMode statsMode) throws InconsistentStateException, NoSuchSubscriptionException, ServiceTemporarilyUnavailableException { final Subscription subscription; try { @@ -222,21 +233,21 @@ public ItemsWrapper getSubscriptionStat(final String } catch (final ServiceTemporarilyUnavailableException ex) { throw new InconsistentStateException(ex.getMessage()); } - final List subscriptionStat = createSubscriptionStat(subscription, includeDistance); + final List subscriptionStat = createSubscriptionStat(subscription, statsMode); return new ItemsWrapper<>(subscriptionStat); } private List createSubscriptionStat(final Subscription subscription, - final boolean includeDistance) + final StatsMode statsMode) throws InconsistentStateException, ServiceTemporarilyUnavailableException { final List eventTypes = getEventTypesForSubscription(subscription); final ZkSubscriptionClient subscriptionClient = createZkSubscriptionClient(subscription); final Optional zkSubscriptionNode = subscriptionClient.getZkSubscriptionNode(); - if (includeDistance) { - return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient); - } else { + if (statsMode == StatsMode.LIGHT) { return loadLightStats(eventTypes, zkSubscriptionNode); + } else { + return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, statsMode); } } @@ -286,17 +297,22 @@ private List getPartitionsList(final EventType eventType) { private List loadStats( final Collection eventTypes, final Optional subscriptionNode, - final ZkSubscriptionClient client) + final ZkSubscriptionClient client, final StatsMode statsMode) throws ServiceTemporarilyUnavailableException, InconsistentStateException { final List result = new ArrayList<>(eventTypes.size()); final Collection committedPositions = getCommittedPositions(subscriptionNode, client); final List stats = loadPartitionEndStatistics(eventTypes); + + final Map timeLags = statsMode == StatsMode.TIMELAG ? + subscriptionTimeLagService.getTimeLags(committedPositions, stats) : + ImmutableMap.of(); + for (final EventType eventType : eventTypes) { final List statsForEventType = stats.stream() .filter(s -> s.getTimeline().getEventType().equals(eventType.getName())) .collect(Collectors.toList()); result.add(getEventTypeStats(subscriptionNode, eventType.getName(), statsForEventType, - committedPositions)); + committedPositions, timeLags)); } return result; } @@ -314,14 +330,18 @@ private List loadLightStats(final Collection subscriptionNode, final String eventTypeName, final List stats, - final Collection committedPositions) { + final Collection committedPositions, + final Map timeLags) { final List resultPartitions = new ArrayList<>(stats.size()); for (final PartitionBaseStatistics stat : stats) { final String partition = stat.getPartition(); final NakadiCursor lastPosition = ((PartitionEndStatistics) stat).getLast(); final Long distance = computeDistance(committedPositions, lastPosition); - resultPartitions.add(getPartitionStats(subscriptionNode, eventTypeName, partition, distance)); + final Long lagSeconds = Optional.ofNullable(timeLags.get(new EventTypePartition(eventTypeName, partition))) + .map(Duration::getSeconds) + .orElse(null); + resultPartitions.add(getPartitionStats(subscriptionNode, eventTypeName, partition, distance, lagSeconds)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); return new SubscriptionEventTypeStats(eventTypeName, resultPartitions); @@ -338,7 +358,7 @@ private SubscriptionEventTypeStats getEventTypeLightStats(final Optional getPartitionsList(eventType)); for (final String partition : partitionsList) { - resultPartitions.add(getPartitionStats(subscriptionNode, eventType.getName(), partition, null)); + resultPartitions.add(getPartitionStats(subscriptionNode, eventType.getName(), partition, null, null)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); return new SubscriptionEventTypeStats(eventType.getName(), resultPartitions); @@ -346,13 +366,13 @@ private SubscriptionEventTypeStats getEventTypeLightStats(final Optional subscriptionNode, final String eventTypeName, final String partition, - final Long distance) { + final Long distance, final Long lagSeconds) { final Partition.State state = getState(subscriptionNode, eventTypeName, partition); final String streamId = getStreamId(subscriptionNode, eventTypeName, partition); final SubscriptionEventTypeStats.Partition.AssignmentType assignmentType = getAssignmentType(subscriptionNode, eventTypeName, partition); return new SubscriptionEventTypeStats.Partition(partition, state.getDescription(), - distance, streamId, assignmentType); + distance, lagSeconds, streamId, assignmentType); } private Collection getCommittedPositions(final Optional subscriptionNode, diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java new file mode 100644 index 0000000000..d309a94ec8 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -0,0 +1,183 @@ +package org.zalando.nakadi.service.subscription; + +import com.google.common.collect.ImmutableList; +import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.domain.PartitionEndStatistics; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; +import org.zalando.nakadi.exceptions.InvalidCursorException; +import org.zalando.nakadi.exceptions.NakadiException; +import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; +import org.zalando.nakadi.exceptions.runtime.LimitReachedException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; +import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException; +import org.zalando.nakadi.repository.EventConsumer; +import org.zalando.nakadi.service.NakadiCursorComparator; +import org.zalando.nakadi.service.timeline.TimelineService; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; + +@Component +public class SubscriptionTimeLagService { + + private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; + private static final int REQUEST_TIMEOUT_MS = 30000; + private static final int MAX_THREADS_PER_REQUEST = 20; + private static final int TIME_LAG_COMMON_POOL_SIZE = 400; + + private final TimelineService timelineService; + private final NakadiCursorComparator cursorComparator; + private final ThreadPoolExecutor threadPool; + + @Autowired + public SubscriptionTimeLagService(final TimelineService timelineService, + final NakadiCursorComparator cursorComparator) { + this.timelineService = timelineService; + this.cursorComparator = cursorComparator; + this.threadPool = new ThreadPoolExecutor(0, TIME_LAG_COMMON_POOL_SIZE, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>()); + } + + public Map getTimeLags(final Collection committedPositions, + final List endPositions) + throws ErrorGettingCursorTimeLagException, InconsistentStateException, LimitReachedException, + TimeLagStatsTimeoutException { + + final TimeLagRequestHandler timeLagHandler = new TimeLagRequestHandler(timelineService, threadPool); + final Map timeLags = new HashMap<>(); + final Map> futureTimeLags = new HashMap<>(); + try { + for (final NakadiCursor cursor : committedPositions) { + if (isCursorAtTail(cursor, endPositions)) { + timeLags.put(cursor.getEventTypePartition(), Duration.ZERO); + } else { + final CompletableFuture timeLagFuture = timeLagHandler.getCursorTimeLagFuture(cursor); + futureTimeLags.put(cursor.getEventTypePartition(), timeLagFuture); + } + } + CompletableFuture + .allOf(futureTimeLags.values().toArray(new CompletableFuture[futureTimeLags.size()])) + .get(timeLagHandler.getRemainingTimeoutMs(), TimeUnit.MILLISECONDS); + + for (final EventTypePartition partition : futureTimeLags.keySet()) { + timeLags.put(partition, futureTimeLags.get(partition).get()); + } + return timeLags; + } catch (final RejectedExecutionException e) { + throw new LimitReachedException("Time lag statistics thread pool exhausted", e); + } catch (final TimeoutException e) { + throw new TimeLagStatsTimeoutException("Timeout exceeded for time lag statistics", e); + } catch (final ExecutionException e) { + if (e.getCause() instanceof MyNakadiRuntimeException1) { + throw (MyNakadiRuntimeException1) e.getCause(); + } else { + throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", + e.getCause()); + } + } catch (final Throwable e) { + throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", e); + } + } + + private boolean isCursorAtTail(final NakadiCursor cursor, final List endPositions) { + return endPositions.stream() + .map(PartitionEndStatistics::getLast) + .filter(last -> last.getEventType().equals(cursor.getEventType()) + && last.getPartition().equals(cursor.getPartition())) + .findAny() + .map(last -> cursorComparator.compare(cursor, last) >= 0) + .orElse(false); + } + + private static class TimeLagRequestHandler { + + private final TimelineService timelineService; + private final ThreadPoolExecutor threadPool; + private final Semaphore semaphore; + private final long timeoutTimestampMs; + + TimeLagRequestHandler(final TimelineService timelineService, final ThreadPoolExecutor threadPool) { + this.timelineService = timelineService; + this.threadPool = threadPool; + this.semaphore = new Semaphore(MAX_THREADS_PER_REQUEST); + this.timeoutTimestampMs = System.currentTimeMillis() + REQUEST_TIMEOUT_MS; + } + + CompletableFuture getCursorTimeLagFuture(final NakadiCursor cursor) + throws InterruptedException, TimeoutException { + + final CompletableFuture future = new CompletableFuture<>(); + if (semaphore.tryAcquire(getRemainingTimeoutMs(), TimeUnit.MILLISECONDS)) { + threadPool.submit(() -> { + try { + final Duration timeLag = getNextEventTimeLag(cursor); + future.complete(timeLag); + } catch (final Throwable e) { + future.completeExceptionally(e); + } finally { + semaphore.release(); + } + }); + } else { + throw new TimeoutException("Partition time lag timeout exceeded"); + } + return future; + } + + long getRemainingTimeoutMs() { + if (timeoutTimestampMs > System.currentTimeMillis()) { + return timeoutTimestampMs - System.currentTimeMillis(); + } else { + return 0; + } + } + + private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException, + InconsistentStateException { + + try (EventConsumer consumer = timelineService.createEventConsumer( + "time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor))) { + + final ConsumedEvent nextEvent = executeWithRetry( + () -> { + final List events = consumer.readEvents(); + return events.isEmpty() ? null : events.iterator().next(); + }, + new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) + .withResultsThatForceRetry((ConsumedEvent) null)); + + if (nextEvent == null) { + throw new InconsistentStateException("Timeout waiting for events when getting consumer time lag"); + } else { + return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); + } + } catch (final NakadiException | IOException e) { + throw new InconsistentStateException("Unexpected error happened when getting consumer time lag", e); + } catch (final InvalidCursorException e) { + throw new ErrorGettingCursorTimeLagException(cursor, e); + } + } + } + +} diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index b6b046a86c..ae6858498c 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -122,7 +122,7 @@ public SubscriptionControllerTest() throws Exception { final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class); final SubscriptionService subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, null, - cursorConverter, cursorOperationsService, nakadiKpiPublisher, featureToggleService, + cursorConverter, cursorOperationsService, nakadiKpiPublisher, featureToggleService, null, "subscription_log_et"); final SubscriptionController controller = new SubscriptionController(featureToggleService, subscriptionService); final ApplicationService applicationService = mock(ApplicationService.class); @@ -262,7 +262,7 @@ public void whenGetSubscriptionStatThenOk() throws Exception { Collections.singletonList(new SubscriptionEventTypeStats( TIMELINE.getEventType(), Collections.singletonList( - new SubscriptionEventTypeStats.Partition("0", "assigned", 10L, "xz", AUTO))) + new SubscriptionEventTypeStats.Partition("0", "assigned", 10L, null, "xz", AUTO))) ); getSubscriptionStats(subscription.getId()) diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java index d36fff77f9..4e87c05de2 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java @@ -137,12 +137,14 @@ public void whenReadEventsThenGetRightEvents() { assertThat("The event we read first should have the same data as first mocked ConsumerRecord", consumedEvents.get(0), equalTo(new ConsumedEvent(event1, - new KafkaCursor(TOPIC, PARTITION, event1Offset).toNakadiCursor(timeline)))); + new KafkaCursor(TOPIC, PARTITION, event1Offset).toNakadiCursor(timeline), + 0))); assertThat("The event we read second should have the same data as second mocked ConsumerRecord", consumedEvents.get(1), equalTo(new ConsumedEvent(event2, - new KafkaCursor(TOPIC, PARTITION, event2Offset).toNakadiCursor(timeline)))); + new KafkaCursor(TOPIC, PARTITION, event2Offset).toNakadiCursor(timeline), + 0))); assertThat("The kafka poll should be called with timeout we defined", pollTimeoutCaptor.getValue(), equalTo(POLL_TIMEOUT)); diff --git a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java index 25f7c16df7..3cb06e8390 100644 --- a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java @@ -319,7 +319,7 @@ public void whenReadingEventsTheOrderIsCorrect() throws NakadiException, IOExcep .boxed() .map(index -> new ConsumedEvent( ("event" + index).getBytes(UTF_8), NakadiCursor.of(TIMELINE, "0", - KafkaCursor.toNakadiOffset(index)))) + KafkaCursor.toNakadiOffset(index)), 0)) .collect(Collectors.toList())); final EventStream eventStream = @@ -360,12 +360,12 @@ public void whenReadFromMultiplePartitionsThenGroupedInBatchesAccordingToPartiti final ByteArrayOutputStream out = new ByteArrayOutputStream(); final LinkedList events = new LinkedList<>(); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"))); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"), 0)); final EventStream eventStream = new EventStream(predefinedConsumer(events), out, config, mock(BlacklistService.class), cursorConverter, @@ -394,7 +394,7 @@ private static NakadiKafkaConsumer endlessDummyConsumerForPartition(final String final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class); when(nakadiKafkaConsumer.readEvents()) .thenReturn(Collections.singletonList( - new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "0")))); + new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "0"), 0))); return nakadiKafkaConsumer; } @@ -406,7 +406,7 @@ private static NakadiKafkaConsumer nCountDummyConsumerForPartition(final int eve if (eventsToCreate.get() > 0) { eventsToCreate.set(eventsToCreate.get() - 1); return Collections.singletonList( - new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "000000000000000000"))); + new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "000000000000000000"), 0)); } else { return Collections.emptyList(); } @@ -524,7 +524,7 @@ public void testWriteStreamInfoWhenPresent() { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final SubscriptionCursor cursor = new SubscriptionCursor("11", "000000000000000012", "event-type", "token-id"); final ArrayList events = Lists.newArrayList( - new ConsumedEvent("{\"a\":\"b\"}".getBytes(), mock(NakadiCursor.class))); + new ConsumedEvent("{\"a\":\"b\"}".getBytes(), mock(NakadiCursor.class), 0)); try { writerProvider.getWriter().writeSubscriptionBatch(baos, cursor, events, Optional.of("something")); diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java index 535c023333..c3520dfe1c 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java @@ -42,7 +42,7 @@ public SubscriptionServiceTest() throws Exception { subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, subscriptionValidationService, cursorConverter, - cursorOperationsService, nakadiKpiPublisher, featureToggleService, SUBSCRIPTION_LOG_ET); + cursorOperationsService, nakadiKpiPublisher, featureToggleService, null, SUBSCRIPTION_LOG_ET); } @Test diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java new file mode 100644 index 0000000000..6c444166c9 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java @@ -0,0 +1,111 @@ +package org.zalando.nakadi.service; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; +import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.domain.PartitionEndStatistics; +import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; +import org.zalando.nakadi.exceptions.InvalidCursorException; +import org.zalando.nakadi.exceptions.NakadiException; +import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; +import org.zalando.nakadi.repository.EventConsumer; +import org.zalando.nakadi.service.subscription.SubscriptionTimeLagService; +import org.zalando.nakadi.service.timeline.TimelineService; + +import java.time.Duration; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SubscriptionTimeLagServiceTest { + + private static final long FAKE_EVENT_TIMESTAMP = 478220400000L; + + private NakadiCursorComparator cursorComparator; + private SubscriptionTimeLagService timeLagService; + private TimelineService timelineService; + + @Before + public void setUp() throws Exception { + timelineService = mock(TimelineService.class); + + cursorComparator = mock(NakadiCursorComparator.class); + timeLagService = new SubscriptionTimeLagService(timelineService, cursorComparator); + } + + @Test + public void testTimeLagsForTailAndNotTailPositions() throws NakadiException, InvalidCursorException { + + final EventConsumer eventConsumer = mock(EventConsumer.class); + final Timeline timeline = mock(Timeline.class); + when(timeline.getStorage()).thenReturn(new Storage("", Storage.Type.KAFKA)); + when(eventConsumer.readEvents()).thenAnswer(invocation -> + ImmutableList.of(new ConsumedEvent(null, NakadiCursor.of(timeline, "", ""), FAKE_EVENT_TIMESTAMP))); + + when(timelineService.createEventConsumer(any(), any())).thenReturn(eventConsumer); + + final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); + + final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); + final NakadiCursor committedCursor2 = NakadiCursor.of(et1Timeline, "p2", "o2"); + + final PartitionEndStatistics endStats1 = mockEndStats(NakadiCursor.of(et1Timeline, "p1", "o1")); + final PartitionEndStatistics endStats2 = mockEndStats(NakadiCursor.of(et1Timeline, "p2", "o3")); + + // mock first committed cursor to be at the tail - the expected time lag should be 0 + when(cursorComparator.compare(committedCursor1, endStats1.getLast())).thenReturn(0); + + // mock second committed cursor to be lower than tail - the expected time lag should be > 0 + when(cursorComparator.compare(committedCursor2, endStats2.getLast())).thenReturn(-1); + + final Map timeLags = timeLagService.getTimeLags( + ImmutableList.of(committedCursor1, committedCursor2), + ImmutableList.of(endStats1, endStats2)); + + assertThat(timeLags.entrySet(), hasSize(2)); + assertThat(timeLags.get(new EventTypePartition("et1", "p1")), equalTo(Duration.ZERO)); + assertThat(timeLags.get(new EventTypePartition("et1", "p2")), greaterThan(Duration.ZERO)); + } + + + @Test(expected = InconsistentStateException.class) + @SuppressWarnings("unchecked") + public void whenNakadiExceptionThenInconsistentStateExceptionIsThrown() + throws NakadiException, InvalidCursorException { + when(timelineService.createEventConsumer(any(), any())).thenThrow(NakadiException.class); + + final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); + final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); + + timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of()); + } + + @Test(expected = ErrorGettingCursorTimeLagException.class) + @SuppressWarnings("unchecked") + public void whenInvalidCursorThenErrorGettingCursorTimeLagExceptionIsThrown() + throws NakadiException, InvalidCursorException { + when(timelineService.createEventConsumer(any(), any())).thenThrow(InvalidCursorException.class); + + final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); + final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); + + timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of()); + } + + private PartitionEndStatistics mockEndStats(final NakadiCursor nakadiCursor) { + final PartitionEndStatistics endStats = mock(PartitionEndStatistics.class); + when(endStats.getLast()).thenReturn(nakadiCursor); + return endStats; + } +} diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java index af2324ff42..055c36f819 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java @@ -60,7 +60,7 @@ public void onNewOffsetsShouldSupportCommitInFuture() { public void normalOperationShouldNotReconfigureKafkaConsumer() { final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (long i = 0; i < 100; ++i) { - pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1))); + pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1), 0)); } // Now say to it that it was sent pd.takeEventsToStream(currentTimeMillis(), 1000, 0L, false); @@ -80,7 +80,7 @@ public void keepAliveCountShouldIncreaseOnEachEmptyCall() { pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(i + 1, pd.getKeepAliveInARow()); } - pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L))); + pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L), 0)); assertEquals(100, pd.getKeepAliveInARow()); pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(0, pd.getKeepAliveInARow()); @@ -95,7 +95,7 @@ public void eventsShouldBeStreamedOnTimeout() { final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), currentTime); for (int i = 0; i < 100; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1), 0)); } List data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNull(data); @@ -108,7 +108,7 @@ public void eventsShouldBeStreamedOnTimeout() { assertEquals(100, data.size()); for (int i = 100; i < 200; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1), 0)); } data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNull(data); @@ -126,7 +126,7 @@ public void eventsShouldBeStreamedOnBatchSize() { final long timeout = TimeUnit.SECONDS.toMillis(1); final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 100; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1), 0)); } assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout, false)); final List eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout, false); @@ -139,7 +139,7 @@ public void eventsShouldBeStreamedOnStreamTimeout() { final long timeout = TimeUnit.SECONDS.toMillis(100); final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 10; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i), 0)); } assertEquals(10, pd.takeEventsToStream(currentTimeMillis(), 100, timeout, true).size()); } @@ -149,7 +149,7 @@ public void noEmptyBatchShouldBeStreamedOnStreamTimeoutWhenNoEvents() { final long timeout = TimeUnit.SECONDS.toMillis(100); final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 10; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i), 0)); } assertNull(pd.takeEventsToStream(currentTimeMillis(), 0, timeout, true)); }