Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #873 from zalando/ARUHA-1664-time-lag-stats
Browse files Browse the repository at this point in the history
ARUHA-1664: Subscription time lag stats
  • Loading branch information
v-stepanov authored May 25, 2018
2 parents 81bd9f9 + dbd04d9 commit 322819c
Show file tree
Hide file tree
Showing 22 changed files with 476 additions and 49 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

## [2.7.0] - 2018-05-25

### Added
- Extended subscription statistics endpoint with time-lag information

## [2.6.7] - 2018-05-15

### Fixed
Expand Down
10 changes: 10 additions & 0 deletions docs/_data/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,11 @@ paths:
description: exposes statistics of specified subscription
parameters:
- $ref: '#/parameters/SubscriptionId'
- name: show_time_lag
in: query
description: show consumer time lag
type: boolean
default: false
responses:
'200':
description: Ok
Expand Down Expand Up @@ -2703,6 +2708,11 @@ definitions:
The amount of events in this partition that are not yet consumed within this subscription.
The property may be absent at the moment when no events were yet consumed from the partition in this
subscription (In case of `read_from` is `BEGIN` or `END`)
consumer_lag_seconds:
type: number
description: |
Subscription consumer lag for this partition in seconds. Measured as the age of the oldest event of
this partition that is not yet consumed within this subscription.
stream_id:
type: string
description: the id of the stream that consumes data from this partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static java.util.stream.IntStream.rangeClosed;
import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.IsEqual.equalTo;
Expand Down Expand Up @@ -280,10 +281,11 @@ public void userJourneyHila() throws InterruptedException, IOException {

// as we didn't commit, there should be still 4 unconsumed events
jsonRequestSpec()
.get("/subscriptions/{sid}/stats", subscription.getId())
.get("/subscriptions/{sid}/stats?show_time_lag=true", subscription.getId())
.then()
.statusCode(OK.value())
.body("items[0].partitions[0].unconsumed_events", equalTo(4));
.body("items[0].partitions[0].unconsumed_events", equalTo(4))
.body("items[0].partitions[0].consumer_lag_seconds", greaterThanOrEqualTo(0));

// commit cursor of latest event
final StreamBatch lastBatch = batches.get(batches.size() - 1);
Expand All @@ -293,10 +295,11 @@ public void userJourneyHila() throws InterruptedException, IOException {

// now there should be 0 unconsumed events
jsonRequestSpec()
.get("/subscriptions/{sid}/stats", subscription.getId())
.get("/subscriptions/{sid}/stats?show_time_lag=true", subscription.getId())
.then()
.statusCode(OK.value())
.body("items[0].partitions[0].unconsumed_events", equalTo(0));
.body("items[0].partitions[0].unconsumed_events", equalTo(0))
.body("items[0].partitions[0].consumer_lag_seconds", equalTo(0));

// get cursors
jsonRequestSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public void testGetSubscriptionStat() throws Exception {
"0",
"assigned",
15L,
null,
client.getSessionId(),
AUTO)))
);
Expand All @@ -340,6 +341,7 @@ public void testGetSubscriptionStat() throws Exception {
"0",
"assigned",
5L,
null,
client.getSessionId(),
AUTO)))
);
Expand Down Expand Up @@ -367,6 +369,7 @@ public void testGetSubscriptionStatWhenDirectAssignment() throws Exception {
"0",
"assigned",
0L,
null,
client.getSessionId(),
DIRECT
))))));
Expand Down Expand Up @@ -400,6 +403,7 @@ public void testSubscriptionStatsMultiET() throws IOException {
"0",
"assigned",
1L,
null,
client.getSessionId(),
AUTO
))))))
Expand All @@ -409,6 +413,7 @@ public void testSubscriptionStatsMultiET() throws IOException {
"0",
"assigned",
2L,
null,
client.getSessionId(),
AUTO
))))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.zalando.nakadi.exceptions.runtime.CursorConversionException;
import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException;
import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException;
import org.zalando.nakadi.exceptions.runtime.LimitReachedException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.NoEventTypeException;
import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException;
Expand Down Expand Up @@ -157,6 +158,13 @@ public ResponseEntity<Problem> handleServiceTemporarilyUnavailableException(
return Responses.create(Response.Status.SERVICE_UNAVAILABLE, exception.getMessage(), request);
}

@ExceptionHandler(LimitReachedException.class)
public ResponseEntity<Problem> handleLimitReachedException(
final ServiceTemporarilyUnavailableException exception, final NativeWebRequest request) {
LOG.warn(exception.getMessage());
return Responses.create(MoreStatus.TOO_MANY_REQUESTS, exception.getMessage(), request);
}

@ExceptionHandler(DbWriteOperationsBlockedException.class)
public ResponseEntity<Problem> handleDbWriteOperationsBlockedException(
final DbWriteOperationsBlockedException exception, final NativeWebRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,27 @@
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.ItemsWrapper;
import org.zalando.nakadi.domain.SubscriptionEventTypeStats;
import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException;
import org.zalando.nakadi.exceptions.runtime.InconsistentStateException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.WebResult;
import org.zalando.nakadi.service.subscription.SubscriptionService;
import org.zalando.nakadi.service.FeatureToggleService;
import org.zalando.nakadi.service.subscription.SubscriptionService.StatsMode;
import org.zalando.problem.Problem;
import org.zalando.problem.spring.web.advice.Responses;

import javax.annotation.Nullable;
import javax.ws.rs.core.Response;
import java.util.Set;

import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API;
import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY;


@RestController
Expand Down Expand Up @@ -58,8 +63,8 @@ public ResponseEntity<?> listSubscriptions(
final NativeWebRequest request) {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

return WebResult.wrap(() ->
subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset),
return WebResult.wrap(
() -> subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset),
request);
}

Expand All @@ -82,11 +87,13 @@ public ResponseEntity<?> deleteSubscription(@PathVariable("id") final String sub

@RequestMapping(value = "/{id}/stats", method = RequestMethod.GET)
public ItemsWrapper<SubscriptionEventTypeStats> getSubscriptionStats(
@PathVariable("id") final String subscriptionId)
@PathVariable("id") final String subscriptionId,
@RequestParam(value = "show_time_lag", required = false, defaultValue = "false") final boolean showTimeLag)
throws NakadiException, InconsistentStateException, ServiceTemporarilyUnavailableException {
featureToggleService.checkFeatureOn(HIGH_LEVEL_API);

return subscriptionService.getSubscriptionStat(subscriptionId, true);
final StatsMode statsMode = showTimeLag ? StatsMode.TIMELAG : StatsMode.NORMAL;
return subscriptionService.getSubscriptionStat(subscriptionId, statsMode);
}

@ExceptionHandler(NakadiException.class)
Expand All @@ -103,6 +110,13 @@ public ResponseEntity<Problem> handleFeatureTurnedOff(final FeatureNotAvailableE
return Responses.create(Problem.valueOf(NOT_IMPLEMENTED, ex.getMessage()), request);
}

@ExceptionHandler(ErrorGettingCursorTimeLagException.class)
public ResponseEntity<Problem> handleTimeLagException(final ErrorGettingCursorTimeLagException ex,
final NativeWebRequest request) {
LOG.debug(ex.getMessage(), ex);
return Responses.create(Problem.valueOf(UNPROCESSABLE_ENTITY, ex.getMessage()), request);
}

@ExceptionHandler(InconsistentStateException.class)
public ResponseEntity<Problem> handleInconsistentState(final InconsistentStateException ex,
final NativeWebRequest request) {
Expand All @@ -116,7 +130,7 @@ public ResponseEntity<Problem> handleInconsistentState(final InconsistentStateEx

@ExceptionHandler(ServiceTemporarilyUnavailableException.class)
public ResponseEntity<Problem> handleServiceTemporarilyUnavailable(final ServiceTemporarilyUnavailableException ex,
final NativeWebRequest request) {
final NativeWebRequest request) {
LOG.debug(ex.getMessage(), ex);
return Responses.create(
Problem.valueOf(
Expand All @@ -125,4 +139,11 @@ public ResponseEntity<Problem> handleServiceTemporarilyUnavailable(final Service
request);
}

@ExceptionHandler(TimeLagStatsTimeoutException.class)
public ResponseEntity<Problem> handleTimeLagStatsTimeoutException(final TimeLagStatsTimeoutException e,
final NativeWebRequest request) {
LOG.warn(e.getMessage());
return Responses.create(Response.Status.REQUEST_TIMEOUT, e.getMessage(), request);
}

}
8 changes: 7 additions & 1 deletion src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ public class ConsumedEvent {

private final byte[] event;
private final NakadiCursor position;
private final long timestamp;

public ConsumedEvent(final byte[] event, final NakadiCursor position) {
public ConsumedEvent(final byte[] event, final NakadiCursor position, final long timestamp) {
this.event = event;
this.position = position;
this.timestamp = timestamp;
}

public byte[] getEvent() {
Expand All @@ -22,6 +24,10 @@ public NakadiCursor getPosition() {
return position;
}

public long getTimestamp() {
return timestamp;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public String getDescription() {
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Long unconsumedEvents;

@JsonInclude(JsonInclude.Include.NON_NULL)
private final Long consumerLagSeconds;

private final String streamId;

@JsonInclude(JsonInclude.Include.NON_NULL)
Expand All @@ -65,11 +68,13 @@ public Partition(
@JsonProperty("partition") final String partition,
@JsonProperty("state") final String state,
@JsonProperty("unconsumed_events") @Nullable final Long unconsumedEvents,
@JsonProperty("consumer_lag_seconds") @Nullable final Long consumerLagSeconds,
@JsonProperty("stream_id") final String streamId,
@JsonProperty("assignment_type") @Nullable final AssignmentType assignmentType) {
this.partition = partition;
this.state = state;
this.unconsumedEvents = unconsumedEvents;
this.consumerLagSeconds = consumerLagSeconds;
this.streamId = streamId;
this.assignmentType = assignmentType;
}
Expand All @@ -87,6 +92,11 @@ public Long getUnconsumedEvents() {
return unconsumedEvents;
}

@Nullable
public Long getConsumerLagSeconds() {
return consumerLagSeconds;
}

public String getStreamId() {
return streamId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.zalando.nakadi.exceptions;

import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;

public class ErrorGettingCursorTimeLagException extends MyNakadiRuntimeException1 {

private final NakadiCursor failedCursor;

public ErrorGettingCursorTimeLagException(final NakadiCursor failedCursor,
final Throwable cause) {
super("Error occurred when getting subscription time lag as as subscription cursor is wrong or expired: " +
failedCursor.toString(), cause);
this.failedCursor = failedCursor;
}

public NakadiCursor getFailedCursor() {
return failedCursor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class InconsistentStateException extends MyNakadiRuntimeException1 {

public InconsistentStateException(final String msg, final Exception cause) {
public InconsistentStateException(final String msg, final Throwable cause) {
super(msg, cause);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.zalando.nakadi.exceptions.runtime;

public class LimitReachedException extends MyNakadiRuntimeException1 {

public LimitReachedException(final String msg, final Throwable cause) {
super(msg, cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.zalando.nakadi.exceptions.runtime;

public class TimeLagStatsTimeoutException extends MyNakadiRuntimeException1 {

public TimeLagStatsTimeoutException(final String msg, final Throwable cause) {
super(msg, cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public List<ConsumedEvent> readEvents() {
for (final ConsumerRecord<byte[], byte[]> record : records) {
final KafkaCursor cursor = new KafkaCursor(record.topic(), record.partition(), record.offset());
final Timeline timeline = timelineMap.get(new TopicPartition(record.topic(), record.partition()));
result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline)));
result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline), record.timestamp()));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.zalando.nakadi.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.exceptions.InternalNakadiException;
Expand All @@ -11,9 +13,12 @@
import java.util.List;
import java.util.Objects;

@Component
public class NakadiCursorComparator implements Comparator<NakadiCursor> {

private final EventTypeCache eventTypeCache;

@Autowired
public NakadiCursorComparator(final EventTypeCache eventTypeCache) {
this.eventTypeCache = eventTypeCache;
}
Expand Down
Loading

0 comments on commit 322819c

Please sign in to comment.