diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index 5f3dc57b54..72454a7c33 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -1442,7 +1442,12 @@ paths: - $ref: '#/parameters/SubscriptionId' - name: show_time_lag in: query - description: show consumer time lag + description: | + Shows consumer time lag as an optional field. + This option is a time consuming operation and Nakadi attempts to compute it in the best possible strategy. + In cases of failures, resulting in Nakadi being unable to compute it within a configurable timeout, + the field might either be partially present or not present (depending on the number of successful requests) + in the output. type: boolean default: false responses: diff --git a/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java b/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java index da288f2ff9..a41f58054f 100644 --- a/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java +++ b/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java @@ -17,7 +17,6 @@ import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.InvalidLimitException; import org.zalando.nakadi.exceptions.runtime.InvalidVersionNumberException; -import org.zalando.nakadi.exceptions.runtime.LimitReachedException; import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; @@ -25,7 +24,6 @@ import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.UnprocessableEntityException; -import org.zalando.problem.MoreStatus; import org.zalando.problem.Problem; import org.zalando.problem.spring.web.advice.ProblemHandling; import org.zalando.problem.spring.web.advice.Responses; @@ -117,13 +115,6 @@ public ResponseEntity handleInternalError(final NakadiBaseException exc return Responses.create(Response.Status.INTERNAL_SERVER_ERROR, exception.getMessage(), request); } - @ExceptionHandler(LimitReachedException.class) - public ResponseEntity handleLimitReachedException( - final ServiceTemporarilyUnavailableException exception, final NativeWebRequest request) { - LOG.warn(exception.getMessage()); - return Responses.create(MoreStatus.TOO_MANY_REQUESTS, exception.getMessage(), request); - } - @ExceptionHandler(DbWriteOperationsBlockedException.class) public ResponseEntity handleDbWriteOperationsBlockedException( final DbWriteOperationsBlockedException exception, final NativeWebRequest request) { diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 4e87708420..4bb9ccd1e6 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -23,7 +23,6 @@ import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.runtime.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; -import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException; import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.subscription.SubscriptionService; import org.zalando.nakadi.service.subscription.SubscriptionService.StatsMode; @@ -31,7 +30,6 @@ import org.zalando.problem.spring.web.advice.Responses; import javax.annotation.Nullable; -import javax.ws.rs.core.Response; import java.util.Set; import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED; @@ -122,12 +120,4 @@ public ResponseEntity handleServiceUnavailableResponses(final NakadiBas exception.getMessage()), request); } - - @ExceptionHandler(TimeLagStatsTimeoutException.class) - public ResponseEntity handleTimeLagStatsTimeoutException(final TimeLagStatsTimeoutException e, - final NativeWebRequest request) { - LOG.warn(e.getMessage()); - return Responses.create(Response.Status.REQUEST_TIMEOUT, e.getMessage(), request); - } - } diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java deleted file mode 100644 index 9c1f02b129..0000000000 --- a/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.zalando.nakadi.exceptions.runtime; - -public class LimitReachedException extends NakadiBaseException { - - public LimitReachedException(final String msg, final Throwable cause) { - super(msg, cause); - } - -} diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java deleted file mode 100644 index 2da65922dd..0000000000 --- a/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java +++ /dev/null @@ -1,9 +0,0 @@ -package org.zalando.nakadi.exceptions.runtime; - -public class TimeLagStatsTimeoutException extends NakadiBaseException { - - public TimeLagStatsTimeoutException(final String msg, final Throwable cause) { - super(msg, cause); - } - -} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 89b6ae3403..4b56a9abe9 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -37,6 +37,7 @@ import org.zalando.nakadi.exceptions.runtime.SubscriptionUpdateConflictException; import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException; import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException; + import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index 5f0a9ce351..11bc2423cc 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -2,6 +2,8 @@ import com.google.common.collect.ImmutableList; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.ConsumedEvent; @@ -11,9 +13,6 @@ import org.zalando.nakadi.exceptions.runtime.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; -import org.zalando.nakadi.exceptions.runtime.LimitReachedException; -import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; -import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; @@ -39,7 +38,7 @@ @Component public class SubscriptionTimeLagService { - + private static final Logger LOG = LoggerFactory.getLogger(SubscriptionTimeLagService.class); private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; private static final int REQUEST_TIMEOUT_MS = 30000; private static final int MAX_THREADS_PER_REQUEST = 20; @@ -59,9 +58,7 @@ public SubscriptionTimeLagService(final TimelineService timelineService, } public Map getTimeLags(final Collection committedPositions, - final List endPositions) - throws ErrorGettingCursorTimeLagException, InconsistentStateException, LimitReachedException, - TimeLagStatsTimeoutException { + final List endPositions) { final TimeLagRequestHandler timeLagHandler = new TimeLagRequestHandler(timelineService, threadPool); final Map timeLags = new HashMap<>(); @@ -82,21 +79,12 @@ public Map getTimeLags(final Collection endPositions) { diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java index df885b643d..3ad0c78d46 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java @@ -9,9 +9,6 @@ import org.zalando.nakadi.domain.PartitionEndStatistics; import org.zalando.nakadi.domain.Storage; import org.zalando.nakadi.domain.Timeline; -import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; -import org.zalando.nakadi.exceptions.runtime.ErrorGettingCursorTimeLagException; -import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.subscription.SubscriptionTimeLagService; @@ -24,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -79,28 +77,15 @@ public void testTimeLagsForTailAndNotTailPositions() throws InvalidCursorExcepti } - @Test(expected = InconsistentStateException.class) - @SuppressWarnings("unchecked") - public void whenNakadiRuntimeExceptionThenInconsistentStateExceptionIsThrown() - throws InvalidCursorException { - when(timelineService.createEventConsumer(any(), any())).thenThrow(NakadiRuntimeException.class); - - final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); - final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); - - timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of()); - } - - @Test(expected = ErrorGettingCursorTimeLagException.class) - @SuppressWarnings("unchecked") - public void whenInvalidCursorThenErrorGettingCursorTimeLagExceptionIsThrown() - throws InvalidCursorException { - when(timelineService.createEventConsumer(any(), any())).thenThrow(InvalidCursorException.class); - + @Test + public void whenNoSubscriptionThenReturnSizeZeroMap() { + when(timelineService.createEventConsumer(any(), any())).thenReturn(null); final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); - timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of()); + final Map result = timeLagService.getTimeLags + (ImmutableList.of(committedCursor1), ImmutableList.of()); + assertThat(result.size(), is(0)); } private PartitionEndStatistics mockEndStats(final NakadiCursor nakadiCursor) {