From 8d42a25a8b362449ae696f1b4a935919cb926d73 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Tue, 8 May 2018 16:26:32 +0200 Subject: [PATCH 01/24] ARUHA-1664: added debug loggin of events timestamp; --- .../nakadi/repository/kafka/NakadiKafkaConsumer.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java index 85d6bb05d7..6e672af0ec 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java @@ -4,6 +4,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.repository.EventConsumer; @@ -17,6 +21,8 @@ public class NakadiKafkaConsumer implements EventConsumer.LowLevelConsumer { + private static final Logger LOG = LoggerFactory.getLogger(NakadiKafkaConsumer.class); + private final Consumer kafkaConsumer; private final long pollTimeout; private final Map timelineMap; @@ -57,6 +63,10 @@ public List readEvents() { } final ArrayList result = new ArrayList<>(records.count()); for (final ConsumerRecord record : records) { + + final DateTime eventTimestamp = new DateTime(record.timestamp(), DateTimeZone.UTC); + LOG.info("[EVENT_TIMESTAMP] offset: {}, timestamp: {}", record.offset(), eventTimestamp.toString()); + final KafkaCursor cursor = new KafkaCursor(record.topic(), record.partition(), record.offset()); final Timeline timeline = timelineMap.get(new TopicPartition(record.topic(), record.partition())); result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline))); From 5a8328f27b313445057c39a511dd543f963fe204 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Tue, 8 May 2018 16:28:29 +0200 Subject: [PATCH 02/24] ARUHA-1664: added topic and partition info; --- .../zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java index 6e672af0ec..48c31f6863 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java @@ -65,7 +65,8 @@ public List readEvents() { for (final ConsumerRecord record : records) { final DateTime eventTimestamp = new DateTime(record.timestamp(), DateTimeZone.UTC); - LOG.info("[EVENT_TIMESTAMP] offset: {}, timestamp: {}", record.offset(), eventTimestamp.toString()); + LOG.info("[EVENT_TIMESTAMP] topic: {}, partition: {}, offset: {}, timestamp: {}", + record.topic(), record.partition(), record.offset(), eventTimestamp.toString()); final KafkaCursor cursor = new KafkaCursor(record.topic(), record.partition(), record.offset()); final Timeline timeline = timelineMap.get(new TopicPartition(record.topic(), record.partition())); From f385623dafd53557d9b4bc8dd388cbf07cb1cdfd Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Tue, 8 May 2018 19:05:27 +0200 Subject: [PATCH 03/24] ARUHA-1664: first version of time lag implementation; --- docs/_data/nakadi-event-bus-api.yaml | 4 ++ .../nakadi/webservice/hila/HilaAT.java | 5 ++ .../zalando/nakadi/domain/ConsumedEvent.java | 8 ++- .../domain/SubscriptionEventTypeStats.java | 10 +++ .../repository/kafka/NakadiKafkaConsumer.java | 7 +-- .../subscription/SubscriptionService.java | 61 ++++++++++++++++--- .../SubscriptionControllerTest.java | 2 +- .../kafka/NakadiKafkaConsumerTest.java | 6 +- .../nakadi/service/EventStreamTest.java | 20 +++--- .../subscription/state/PartitionDataTest.java | 14 ++--- 10 files changed, 103 insertions(+), 34 deletions(-) diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index 752630849c..db3829966c 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -2703,6 +2703,10 @@ definitions: The amount of events in this partition that are not yet consumed within this subscription. The property may be absent at the moment when no events were yet consumed from the partition in this subscription (In case of `read_from` is `BEGIN` or `END`) + time_lag: + type: number + description: | + The age (in seconds) of the oldest event of this partition of subscription that was not consumed. stream_id: type: string description: the id of the stream that consumes data from this partition diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index c04ecfaf7b..deb1c2f281 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -322,6 +322,7 @@ public void testGetSubscriptionStat() throws Exception { "0", "assigned", 15L, + null, client.getSessionId(), AUTO))) ); @@ -340,6 +341,7 @@ public void testGetSubscriptionStat() throws Exception { "0", "assigned", 5L, + null, client.getSessionId(), AUTO))) ); @@ -367,6 +369,7 @@ public void testGetSubscriptionStatWhenDirectAssignment() throws Exception { "0", "assigned", 0L, + null, client.getSessionId(), DIRECT )))))); @@ -400,6 +403,7 @@ public void testSubscriptionStatsMultiET() throws IOException { "0", "assigned", 1L, + null, client.getSessionId(), AUTO )))))) @@ -409,6 +413,7 @@ public void testSubscriptionStatsMultiET() throws IOException { "0", "assigned", 2L, + null, client.getSessionId(), AUTO )))))); diff --git a/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java b/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java index 2e37ce2f14..fb63694935 100644 --- a/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java +++ b/src/main/java/org/zalando/nakadi/domain/ConsumedEvent.java @@ -8,10 +8,12 @@ public class ConsumedEvent { private final byte[] event; private final NakadiCursor position; + private final long timestamp; - public ConsumedEvent(final byte[] event, final NakadiCursor position) { + public ConsumedEvent(final byte[] event, final NakadiCursor position, final long timestamp) { this.event = event; this.position = position; + this.timestamp = timestamp; } public byte[] getEvent() { @@ -22,6 +24,10 @@ public NakadiCursor getPosition() { return position; } + public long getTimestamp() { + return timestamp; + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java index 212864546a..b715a75b60 100644 --- a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java +++ b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java @@ -56,6 +56,9 @@ public String getDescription() { @JsonInclude(JsonInclude.Include.NON_NULL) private final Long unconsumedEvents; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Long timeLag; + private final String streamId; @JsonInclude(JsonInclude.Include.NON_NULL) @@ -65,11 +68,13 @@ public Partition( @JsonProperty("partition") final String partition, @JsonProperty("state") final String state, @JsonProperty("unconsumed_events") @Nullable final Long unconsumedEvents, + @JsonProperty("time_lag") @Nullable final Long timeLag, @JsonProperty("stream_id") final String streamId, @JsonProperty("assignment_type") @Nullable final AssignmentType assignmentType) { this.partition = partition; this.state = state; this.unconsumedEvents = unconsumedEvents; + this.timeLag = timeLag; this.streamId = streamId; this.assignmentType = assignmentType; } @@ -87,6 +92,11 @@ public Long getUnconsumedEvents() { return unconsumedEvents; } + @Nullable + public Long getTimeLag() { + return timeLag; + } + public String getStreamId() { return streamId; } diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java index 48c31f6863..8ce5b63617 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java @@ -63,14 +63,9 @@ public List readEvents() { } final ArrayList result = new ArrayList<>(records.count()); for (final ConsumerRecord record : records) { - - final DateTime eventTimestamp = new DateTime(record.timestamp(), DateTimeZone.UTC); - LOG.info("[EVENT_TIMESTAMP] topic: {}, partition: {}, offset: {}, timestamp: {}", - record.topic(), record.partition(), record.offset(), eventTimestamp.toString()); - final KafkaCursor cursor = new KafkaCursor(record.topic(), record.partition(), record.offset()); final Timeline timeline = timelineMap.get(new TopicPartition(record.topic(), record.partition())); - result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline))); + result.add(new ConsumedEvent(record.value(), cursor.toNakadiCursor(timeline), record.timestamp())); } return result; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 440dc1d2ba..a6c098528f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -1,6 +1,9 @@ package org.zalando.nakadi.service.subscription; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,6 +12,8 @@ import org.springframework.stereotype.Component; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; +import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.CursorError; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.ItemsWrapper; @@ -23,6 +28,7 @@ import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.InvalidCursorException; +import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.Try; @@ -36,11 +42,13 @@ import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException; import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException; +import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.EventTypeRepository; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; +import org.zalando.nakadi.service.CursorsService; import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.Result; @@ -55,14 +63,17 @@ import javax.annotation.Nullable; import javax.ws.rs.core.Response; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; @Component @@ -272,6 +283,34 @@ private List loadPartitionEndStatistics(final Collection return topicPartitions; } + private Map> getTimeLags(final Collection committedPositions) { + return committedPositions.stream() + .collect(Collectors.toMap( + c -> new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()), + this::getNextEventLag + )); + } + + private Optional getNextEventLag(final NakadiCursor cursor) { + try { + final EventConsumer consumer = timelineService.createEventConsumer("]|[o/7a", ImmutableList.of(cursor)); + final List events = consumer.readEvents(); + if (events.isEmpty()) { + return Optional.of(Duration.ZERO); + } + final ConsumedEvent firstEvent = events.iterator().next(); + final Duration duration = Duration.ofMillis(new Date().getTime() - firstEvent.getTimestamp()); + return Optional.of(duration); + + } catch (NakadiException e) { + e.printStackTrace(); + return Optional.empty(); + } catch (InvalidCursorException e) { + e.printStackTrace(); + return Optional.empty(); + } + } + private Map> getTimelinesByRepository(final Collection eventTypes) { return eventTypes.stream() .map(timelineService::getActiveTimeline) @@ -291,12 +330,15 @@ private List loadStats( final List result = new ArrayList<>(eventTypes.size()); final Collection committedPositions = getCommittedPositions(subscriptionNode, client); final List stats = loadPartitionEndStatistics(eventTypes); + + final Map> timeLags = getTimeLags(committedPositions); + for (final EventType eventType : eventTypes) { final List statsForEventType = stats.stream() .filter(s -> s.getTimeline().getEventType().equals(eventType.getName())) .collect(Collectors.toList()); result.add(getEventTypeStats(subscriptionNode, eventType.getName(), statsForEventType, - committedPositions)); + committedPositions, timeLags)); } return result; } @@ -314,14 +356,19 @@ private List loadLightStats(final Collection subscriptionNode, final String eventTypeName, final List stats, - final Collection committedPositions) { + final Collection committedPositions, + final Map> timeLags) { final List resultPartitions = new ArrayList<>(stats.size()); for (final PartitionBaseStatistics stat : stats) { final String partition = stat.getPartition(); final NakadiCursor lastPosition = ((PartitionEndStatistics) stat).getLast(); final Long distance = computeDistance(committedPositions, lastPosition); - resultPartitions.add(getPartitionStats(subscriptionNode, eventTypeName, partition, distance)); + final Long timeLag = timeLags.get(new EventTypePartition(eventTypeName, partition)) + .map(Duration::getSeconds) + .orElse(null); + + resultPartitions.add(getPartitionStats(subscriptionNode, eventTypeName, partition, distance, timeLag)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); return new SubscriptionEventTypeStats(eventTypeName, resultPartitions); @@ -330,8 +377,8 @@ private SubscriptionEventTypeStats getEventTypeStats(final Optional subscriptionNode, final EventType eventType) { final List resultPartitions = new ArrayList<>(); - for (final String partition: getPartitionsList(eventType)) { - resultPartitions.add(getPartitionStats(subscriptionNode, eventType.getName(), partition, null)); + for (final String partition : getPartitionsList(eventType)) { + resultPartitions.add(getPartitionStats(subscriptionNode, eventType.getName(), partition, null, null)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); return new SubscriptionEventTypeStats(eventType.getName(), resultPartitions); @@ -339,13 +386,13 @@ private SubscriptionEventTypeStats getEventTypeLightStats(final Optional subscriptionNode, final String eventTypeName, final String partition, - final Long distance) { + final Long distance, final Long timeLag) { final Partition.State state = getState(subscriptionNode, eventTypeName, partition); final String streamId = getStreamId(subscriptionNode, eventTypeName, partition); final SubscriptionEventTypeStats.Partition.AssignmentType assignmentType = getAssignmentType(subscriptionNode, eventTypeName, partition); return new SubscriptionEventTypeStats.Partition(partition, state.getDescription(), - distance, streamId, assignmentType); + distance, timeLag, streamId, assignmentType); } private Collection getCommittedPositions(final Optional subscriptionNode, diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index b6b046a86c..b1e473f0c2 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -262,7 +262,7 @@ public void whenGetSubscriptionStatThenOk() throws Exception { Collections.singletonList(new SubscriptionEventTypeStats( TIMELINE.getEventType(), Collections.singletonList( - new SubscriptionEventTypeStats.Partition("0", "assigned", 10L, "xz", AUTO))) + new SubscriptionEventTypeStats.Partition("0", "assigned", 10L, null, "xz", AUTO))) ); getSubscriptionStats(subscription.getId()) diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java index d36fff77f9..4e87c05de2 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumerTest.java @@ -137,12 +137,14 @@ public void whenReadEventsThenGetRightEvents() { assertThat("The event we read first should have the same data as first mocked ConsumerRecord", consumedEvents.get(0), equalTo(new ConsumedEvent(event1, - new KafkaCursor(TOPIC, PARTITION, event1Offset).toNakadiCursor(timeline)))); + new KafkaCursor(TOPIC, PARTITION, event1Offset).toNakadiCursor(timeline), + 0))); assertThat("The event we read second should have the same data as second mocked ConsumerRecord", consumedEvents.get(1), equalTo(new ConsumedEvent(event2, - new KafkaCursor(TOPIC, PARTITION, event2Offset).toNakadiCursor(timeline)))); + new KafkaCursor(TOPIC, PARTITION, event2Offset).toNakadiCursor(timeline), + 0))); assertThat("The kafka poll should be called with timeout we defined", pollTimeoutCaptor.getValue(), equalTo(POLL_TIMEOUT)); diff --git a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java index 25f7c16df7..3cb06e8390 100644 --- a/src/test/java/org/zalando/nakadi/service/EventStreamTest.java +++ b/src/test/java/org/zalando/nakadi/service/EventStreamTest.java @@ -319,7 +319,7 @@ public void whenReadingEventsTheOrderIsCorrect() throws NakadiException, IOExcep .boxed() .map(index -> new ConsumedEvent( ("event" + index).getBytes(UTF_8), NakadiCursor.of(TIMELINE, "0", - KafkaCursor.toNakadiOffset(index)))) + KafkaCursor.toNakadiOffset(index)), 0)) .collect(Collectors.toList())); final EventStream eventStream = @@ -360,12 +360,12 @@ public void whenReadFromMultiplePartitionsThenGroupedInBatchesAccordingToPartiti final ByteArrayOutputStream out = new ByteArrayOutputStream(); final LinkedList events = new LinkedList<>(); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"))); - events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"))); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "0", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "1", "000000000000000000"), 0)); + events.add(new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, "2", "000000000000000000"), 0)); final EventStream eventStream = new EventStream(predefinedConsumer(events), out, config, mock(BlacklistService.class), cursorConverter, @@ -394,7 +394,7 @@ private static NakadiKafkaConsumer endlessDummyConsumerForPartition(final String final NakadiKafkaConsumer nakadiKafkaConsumer = mock(NakadiKafkaConsumer.class); when(nakadiKafkaConsumer.readEvents()) .thenReturn(Collections.singletonList( - new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "0")))); + new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "0"), 0))); return nakadiKafkaConsumer; } @@ -406,7 +406,7 @@ private static NakadiKafkaConsumer nCountDummyConsumerForPartition(final int eve if (eventsToCreate.get() > 0) { eventsToCreate.set(eventsToCreate.get() - 1); return Collections.singletonList( - new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "000000000000000000"))); + new ConsumedEvent(DUMMY, NakadiCursor.of(TIMELINE, partition, "000000000000000000"), 0)); } else { return Collections.emptyList(); } @@ -524,7 +524,7 @@ public void testWriteStreamInfoWhenPresent() { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final SubscriptionCursor cursor = new SubscriptionCursor("11", "000000000000000012", "event-type", "token-id"); final ArrayList events = Lists.newArrayList( - new ConsumedEvent("{\"a\":\"b\"}".getBytes(), mock(NakadiCursor.class))); + new ConsumedEvent("{\"a\":\"b\"}".getBytes(), mock(NakadiCursor.class), 0)); try { writerProvider.getWriter().writeSubscriptionBatch(baos, cursor, events, Optional.of("something")); diff --git a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java index af2324ff42..055c36f819 100644 --- a/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java +++ b/src/test/java/org/zalando/nakadi/service/subscription/state/PartitionDataTest.java @@ -60,7 +60,7 @@ public void onNewOffsetsShouldSupportCommitInFuture() { public void normalOperationShouldNotReconfigureKafkaConsumer() { final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (long i = 0; i < 100; ++i) { - pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1))); + pd.addEvent(new ConsumedEvent(("test_" + i).getBytes(), createCursor(100L + i + 1), 0)); } // Now say to it that it was sent pd.takeEventsToStream(currentTimeMillis(), 1000, 0L, false); @@ -80,7 +80,7 @@ public void keepAliveCountShouldIncreaseOnEachEmptyCall() { pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(i + 1, pd.getKeepAliveInARow()); } - pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L))); + pd.addEvent(new ConsumedEvent("".getBytes(), createCursor(101L), 0)); assertEquals(100, pd.getKeepAliveInARow()); pd.takeEventsToStream(currentTimeMillis(), 10, 0L, false); assertEquals(0, pd.getKeepAliveInARow()); @@ -95,7 +95,7 @@ public void eventsShouldBeStreamedOnTimeout() { final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), currentTime); for (int i = 0; i < 100; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1), 0)); } List data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNull(data); @@ -108,7 +108,7 @@ public void eventsShouldBeStreamedOnTimeout() { assertEquals(100, data.size()); for (int i = 100; i < 200; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1), 0)); } data = pd.takeEventsToStream(currentTime, 1000, timeout, false); assertNull(data); @@ -126,7 +126,7 @@ public void eventsShouldBeStreamedOnBatchSize() { final long timeout = TimeUnit.SECONDS.toMillis(1); final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 100; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i + 100L + 1), 0)); } assertNull(pd.takeEventsToStream(currentTimeMillis(), 1000, timeout, false)); final List eventsToStream = pd.takeEventsToStream(currentTimeMillis(), 99, timeout, false); @@ -139,7 +139,7 @@ public void eventsShouldBeStreamedOnStreamTimeout() { final long timeout = TimeUnit.SECONDS.toMillis(100); final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 10; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i), 0)); } assertEquals(10, pd.takeEventsToStream(currentTimeMillis(), 100, timeout, true).size()); } @@ -149,7 +149,7 @@ public void noEmptyBatchShouldBeStreamedOnStreamTimeoutWhenNoEvents() { final long timeout = TimeUnit.SECONDS.toMillis(100); final PartitionData pd = new PartitionData(COMP, null, createCursor(100L), System.currentTimeMillis()); for (int i = 0; i < 10; ++i) { - pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i))); + pd.addEvent(new ConsumedEvent("test".getBytes(), createCursor(i), 0)); } assertNull(pd.takeEventsToStream(currentTimeMillis(), 0, timeout, true)); } From 9352377a74044b05554c71083df8e853365528b9 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 10:15:33 +0200 Subject: [PATCH 04/24] ARUHA-1664: working on time lag implementation; --- .../repository/kafka/NakadiKafkaConsumer.java | 2 - .../service/NakadiCursorComparator.java | 1 + .../subscription/SubscriptionService.java | 68 +++++++++++++------ .../service/timeline/TimelineService.java | 11 +-- .../SubscriptionControllerTest.java | 2 +- .../service/SubscriptionServiceTest.java | 2 +- 6 files changed, 59 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java index 8ce5b63617..82012bd8f5 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java @@ -4,8 +4,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; diff --git a/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java b/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java index 1737d09183..dce822ab9e 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java @@ -12,6 +12,7 @@ import java.util.Objects; public class NakadiCursorComparator implements Comparator { + private final EventTypeCache eventTypeCache; public NakadiCursorComparator(final EventTypeCache eventTypeCache) { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index a6c098528f..36483d6d0a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -2,8 +2,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; +import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,7 +12,6 @@ import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; import org.zalando.nakadi.domain.ConsumedEvent; -import org.zalando.nakadi.domain.CursorError; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.ItemsWrapper; @@ -42,14 +40,15 @@ import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException; import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException; -import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.EventTypeRepository; +import org.zalando.nakadi.repository.MultiTimelineEventConsumer; import org.zalando.nakadi.repository.TopicRepository; +import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; -import org.zalando.nakadi.service.CursorsService; import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.Result; import org.zalando.nakadi.service.subscription.model.Partition; @@ -73,9 +72,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Function; +import java.util.UUID; import java.util.stream.Collectors; +import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; + @Component public class SubscriptionService { @@ -92,6 +93,7 @@ public class SubscriptionService { private final NakadiKpiPublisher nakadiKpiPublisher; private final FeatureToggleService featureToggleService; private final String subLogEventType; + private final EventTypeCache eventTypeCache; @Autowired public SubscriptionService(final SubscriptionDbRepository subscriptionRepository, @@ -103,6 +105,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository final CursorOperationsService cursorOperationsService, final NakadiKpiPublisher nakadiKpiPublisher, final FeatureToggleService featureToggleService, + final EventTypeCache eventTypeCache, @Value("${nakadi.kpi.event-types.nakadiSubscriptionLog}") final String subLogEventType) { this.subscriptionRepository = subscriptionRepository; this.subscriptionClientFactory = subscriptionClientFactory; @@ -113,6 +116,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository this.cursorOperationsService = cursorOperationsService; this.nakadiKpiPublisher = nakadiKpiPublisher; this.featureToggleService = featureToggleService; + this.eventTypeCache = eventTypeCache; this.subLogEventType = subLogEventType; } @@ -283,25 +287,37 @@ private List loadPartitionEndStatistics(final Collection return topicPartitions; } - private Map> getTimeLags(final Collection committedPositions) { + private Map> getTimeLags(final Collection committedPositions, + final List endPositions) { + + final NakadiCursorComparator cursorComparator = new NakadiCursorComparator(eventTypeCache); + final MultiTimelineEventConsumer consumer = timelineService.createMultiTimelineEventConsumer( + "time-lag-checker-" + UUID.randomUUID().toString()); + return committedPositions.stream() .collect(Collectors.toMap( c -> new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()), - this::getNextEventLag + c -> { + final boolean isAtTail = endPositions.stream() + .filter(endStats -> endStats.getLast().getEventType().equals(c.getEventType()) + && endStats.getLast().getPartition().equals(c.getPartition())) + .findAny() + .map(endPos -> cursorComparator.compare(c, endPos.getLast()) >= 0) + .orElse(false); + + if (isAtTail) { + return Optional.of(Duration.ZERO); + } else { + return getNextEventTimeLag(c, consumer); + } + } )); } - private Optional getNextEventLag(final NakadiCursor cursor) { + private Optional getNextEventTimeLag(final NakadiCursor cursor, + final MultiTimelineEventConsumer consumer) { try { - final EventConsumer consumer = timelineService.createEventConsumer("]|[o/7a", ImmutableList.of(cursor)); - final List events = consumer.readEvents(); - if (events.isEmpty()) { - return Optional.of(Duration.ZERO); - } - final ConsumedEvent firstEvent = events.iterator().next(); - final Duration duration = Duration.ofMillis(new Date().getTime() - firstEvent.getTimestamp()); - return Optional.of(duration); - + consumer.reassign(ImmutableList.of(cursor)); } catch (NakadiException e) { e.printStackTrace(); return Optional.empty(); @@ -309,6 +325,20 @@ private Optional getNextEventLag(final NakadiCursor cursor) { e.printStackTrace(); return Optional.empty(); } + + final ConsumedEvent nextEvent = executeWithRetry( + () -> { + final List events = consumer.readEvents(); + return events.isEmpty() ? null : events.iterator().next(); + }, + new RetryForSpecifiedTimeStrategy(1000) + .withResultsThatForceRetry((ConsumedEvent) null)); + + if (nextEvent == null) { + return Optional.of(Duration.ZERO); + } + final Duration duration = Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); + return Optional.of(duration); } private Map> getTimelinesByRepository(final Collection eventTypes) { @@ -331,7 +361,7 @@ private List loadStats( final Collection committedPositions = getCommittedPositions(subscriptionNode, client); final List stats = loadPartitionEndStatistics(eventTypes); - final Map> timeLags = getTimeLags(committedPositions); + final Map> timeLags = getTimeLags(committedPositions, stats); for (final EventType eventType : eventTypes) { final List statsForEventType = stats.stream() diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index 1c819ecbc2..ebb2bdff1b 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -230,10 +230,13 @@ public TopicRepository getTopicRepository(final Timeline timeline) public EventConsumer createEventConsumer(@Nullable final String clientId, final List positions) throws NakadiException, InvalidCursorException { - final MultiTimelineEventConsumer result = new MultiTimelineEventConsumer( - clientId, this, timelineSync, new NakadiCursorComparator(eventTypeCache)); - result.reassign(positions); - return result; + final MultiTimelineEventConsumer consumer = createMultiTimelineEventConsumer(clientId); + consumer.reassign(positions); + return consumer; + } + + public MultiTimelineEventConsumer createMultiTimelineEventConsumer(@Nullable final String clientId) { + return new MultiTimelineEventConsumer(clientId, this, timelineSync, new NakadiCursorComparator(eventTypeCache)); } public EventConsumer.ReassignableEventConsumer createEventConsumer(@Nullable final String clientId) { diff --git a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java index b1e473f0c2..ae6858498c 100644 --- a/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java +++ b/src/test/java/org/zalando/nakadi/controller/SubscriptionControllerTest.java @@ -122,7 +122,7 @@ public SubscriptionControllerTest() throws Exception { final NakadiKpiPublisher nakadiKpiPublisher = mock(NakadiKpiPublisher.class); final SubscriptionService subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, null, - cursorConverter, cursorOperationsService, nakadiKpiPublisher, featureToggleService, + cursorConverter, cursorOperationsService, nakadiKpiPublisher, featureToggleService, null, "subscription_log_et"); final SubscriptionController controller = new SubscriptionController(featureToggleService, subscriptionService); final ApplicationService applicationService = mock(ApplicationService.class); diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java index 535c023333..c3520dfe1c 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionServiceTest.java @@ -42,7 +42,7 @@ public SubscriptionServiceTest() throws Exception { subscriptionService = new SubscriptionService(subscriptionRepository, zkSubscriptionClientFactory, timelineService, eventTypeRepository, subscriptionValidationService, cursorConverter, - cursorOperationsService, nakadiKpiPublisher, featureToggleService, SUBSCRIPTION_LOG_ET); + cursorOperationsService, nakadiKpiPublisher, featureToggleService, null, SUBSCRIPTION_LOG_ET); } @Test From 0268b172ac4e7b77d2066767bfc57367eed0f9e4 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 11:46:31 +0200 Subject: [PATCH 05/24] ARUHA-1664: added errors handling; refactoring; ranaming; --- .../controller/SubscriptionController.java | 13 ++++- .../domain/SubscriptionEventTypeStats.java | 10 ++-- .../ErrorGettingCursorTimeLagException.java | 19 +++++++ .../subscription/SubscriptionService.java | 55 ++++++++++--------- 4 files changed, 63 insertions(+), 34 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 075b0a7a39..7203741c10 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -14,6 +14,7 @@ import org.springframework.web.context.request.NativeWebRequest; import org.zalando.nakadi.domain.ItemsWrapper; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; @@ -82,11 +83,12 @@ public ResponseEntity deleteSubscription(@PathVariable("id") final String sub @RequestMapping(value = "/{id}/stats", method = RequestMethod.GET) public ItemsWrapper getSubscriptionStats( - @PathVariable("id") final String subscriptionId) + @PathVariable("id") final String subscriptionId, + @RequestParam(value = "show_time_lag", required = false, defaultValue = "false") final boolean showTimeLag) throws NakadiException, InconsistentStateException, ServiceTemporarilyUnavailableException { featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return subscriptionService.getSubscriptionStat(subscriptionId, true); + return subscriptionService.getSubscriptionStat(subscriptionId, true, showTimeLag); } @ExceptionHandler(NakadiException.class) @@ -103,6 +105,13 @@ public ResponseEntity handleFeatureTurnedOff(final FeatureNotAvailableE return Responses.create(Problem.valueOf(NOT_IMPLEMENTED, ex.getMessage()), request); } + @ExceptionHandler(ErrorGettingCursorTimeLagException.class) + public ResponseEntity handleTimeLagException(final ErrorGettingCursorTimeLagException ex, + final NativeWebRequest request) { + LOG.debug(ex.getMessage(), ex); + return Responses.create(Problem.valueOf(SERVICE_UNAVAILABLE, ex.getMessage()), request); + } + @ExceptionHandler(InconsistentStateException.class) public ResponseEntity handleInconsistentState(final InconsistentStateException ex, final NativeWebRequest request) { diff --git a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java index b715a75b60..476ca930cf 100644 --- a/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java +++ b/src/main/java/org/zalando/nakadi/domain/SubscriptionEventTypeStats.java @@ -57,7 +57,7 @@ public String getDescription() { private final Long unconsumedEvents; @JsonInclude(JsonInclude.Include.NON_NULL) - private final Long timeLag; + private final Long consumerLagSeconds; private final String streamId; @@ -68,13 +68,13 @@ public Partition( @JsonProperty("partition") final String partition, @JsonProperty("state") final String state, @JsonProperty("unconsumed_events") @Nullable final Long unconsumedEvents, - @JsonProperty("time_lag") @Nullable final Long timeLag, + @JsonProperty("consumer_lag_seconds") @Nullable final Long consumerLagSeconds, @JsonProperty("stream_id") final String streamId, @JsonProperty("assignment_type") @Nullable final AssignmentType assignmentType) { this.partition = partition; this.state = state; this.unconsumedEvents = unconsumedEvents; - this.timeLag = timeLag; + this.consumerLagSeconds = consumerLagSeconds; this.streamId = streamId; this.assignmentType = assignmentType; } @@ -93,8 +93,8 @@ public Long getUnconsumedEvents() { } @Nullable - public Long getTimeLag() { - return timeLag; + public Long getConsumerLagSeconds() { + return consumerLagSeconds; } public String getStreamId() { diff --git a/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java b/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java new file mode 100644 index 0000000000..4600545b19 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java @@ -0,0 +1,19 @@ +package org.zalando.nakadi.exceptions; + +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; + +public class ErrorGettingCursorTimeLagException extends MyNakadiRuntimeException1 { + + private final NakadiCursor failedCursor; + + public ErrorGettingCursorTimeLagException(final NakadiCursor failedCursor, + final Throwable cause) { + super("Error getting time lag for cursor: " + failedCursor.toString(), cause); + this.failedCursor = failedCursor; + } + + public NakadiCursor getFailedCursor() { + return failedCursor; + } +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 36483d6d0a..3b829120fb 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -1,6 +1,7 @@ package org.zalando.nakadi.service.subscription; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; import org.json.JSONObject; @@ -24,6 +25,7 @@ import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; @@ -176,7 +178,7 @@ public Result listSubscriptions(@Nullable final String owningApplication, @Nulla new PaginationWrapper<>(subscriptions, paginationLinks); if (showStatus) { final List items = paginationWrapper.getItems(); - items.forEach(s -> s.setStatus(createSubscriptionStat(s, false))); + items.forEach(s -> s.setStatus(createSubscriptionStat(s, false, false))); } return Result.ok(paginationWrapper); } catch (final ServiceTemporarilyUnavailableException e) { @@ -229,7 +231,8 @@ public Result deleteSubscription(final String subscriptionId) throws DbWri } public ItemsWrapper getSubscriptionStat(final String subscriptionId, - final boolean includeDistance) + final boolean includeDistance, + final boolean showTimeLag) throws InconsistentStateException, NoSuchSubscriptionException, ServiceTemporarilyUnavailableException { final Subscription subscription; try { @@ -237,19 +240,21 @@ public ItemsWrapper getSubscriptionStat(final String } catch (final ServiceTemporarilyUnavailableException ex) { throw new InconsistentStateException(ex.getMessage()); } - final List subscriptionStat = createSubscriptionStat(subscription, includeDistance); + final List subscriptionStat = createSubscriptionStat(subscription, includeDistance, + showTimeLag); return new ItemsWrapper<>(subscriptionStat); } private List createSubscriptionStat(final Subscription subscription, - final boolean includeDistance) + final boolean includeDistance, + final boolean showTimeLag) throws InconsistentStateException, ServiceTemporarilyUnavailableException { final List eventTypes = getEventTypesForSubscription(subscription); final ZkSubscriptionClient subscriptionClient = createZkSubscriptionClient(subscription); final Optional zkSubscriptionNode = subscriptionClient.getZkSubscriptionNode(); if (includeDistance) { - return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient); + return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, showTimeLag); } else { return loadLightStats(eventTypes, zkSubscriptionNode); } @@ -287,8 +292,8 @@ private List loadPartitionEndStatistics(final Collection return topicPartitions; } - private Map> getTimeLags(final Collection committedPositions, - final List endPositions) { + private Map getTimeLags(final Collection committedPositions, + final List endPositions) { final NakadiCursorComparator cursorComparator = new NakadiCursorComparator(eventTypeCache); final MultiTimelineEventConsumer consumer = timelineService.createMultiTimelineEventConsumer( @@ -306,7 +311,7 @@ private Map> getTimeLags(final Collection .orElse(false); if (isAtTail) { - return Optional.of(Duration.ZERO); + return Duration.ZERO; } else { return getNextEventTimeLag(c, consumer); } @@ -314,16 +319,11 @@ private Map> getTimeLags(final Collection )); } - private Optional getNextEventTimeLag(final NakadiCursor cursor, - final MultiTimelineEventConsumer consumer) { + private Duration getNextEventTimeLag(final NakadiCursor cursor, final MultiTimelineEventConsumer consumer) { try { consumer.reassign(ImmutableList.of(cursor)); - } catch (NakadiException e) { - e.printStackTrace(); - return Optional.empty(); - } catch (InvalidCursorException e) { - e.printStackTrace(); - return Optional.empty(); + } catch (final NakadiException | InvalidCursorException e) { + throw new ErrorGettingCursorTimeLagException(cursor, e); } final ConsumedEvent nextEvent = executeWithRetry( @@ -335,10 +335,10 @@ private Optional getNextEventTimeLag(final NakadiCursor cursor, .withResultsThatForceRetry((ConsumedEvent) null)); if (nextEvent == null) { - return Optional.of(Duration.ZERO); + return Duration.ZERO; + } else { + return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); } - final Duration duration = Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); - return Optional.of(duration); } private Map> getTimelinesByRepository(final Collection eventTypes) { @@ -355,13 +355,15 @@ private List getPartitionsList(final EventType eventType) { private List loadStats( final Collection eventTypes, final Optional subscriptionNode, - final ZkSubscriptionClient client) + final ZkSubscriptionClient client, final boolean showTimeLag) throws ServiceTemporarilyUnavailableException, InconsistentStateException { final List result = new ArrayList<>(eventTypes.size()); final Collection committedPositions = getCommittedPositions(subscriptionNode, client); final List stats = loadPartitionEndStatistics(eventTypes); - final Map> timeLags = getTimeLags(committedPositions, stats); + final Map timeLags = showTimeLag ? + getTimeLags(committedPositions, stats) : + ImmutableMap.of(); for (final EventType eventType : eventTypes) { final List statsForEventType = stats.stream() @@ -387,18 +389,17 @@ private SubscriptionEventTypeStats getEventTypeStats(final Optional stats, final Collection committedPositions, - final Map> timeLags) { + final Map timeLags) { final List resultPartitions = new ArrayList<>(stats.size()); for (final PartitionBaseStatistics stat : stats) { final String partition = stat.getPartition(); final NakadiCursor lastPosition = ((PartitionEndStatistics) stat).getLast(); final Long distance = computeDistance(committedPositions, lastPosition); - final Long timeLag = timeLags.get(new EventTypePartition(eventTypeName, partition)) + final Long lagSeconds = Optional.ofNullable(timeLags.get(new EventTypePartition(eventTypeName, partition))) .map(Duration::getSeconds) .orElse(null); - - resultPartitions.add(getPartitionStats(subscriptionNode, eventTypeName, partition, distance, timeLag)); + resultPartitions.add(getPartitionStats(subscriptionNode, eventTypeName, partition, distance, lagSeconds)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); return new SubscriptionEventTypeStats(eventTypeName, resultPartitions); @@ -416,13 +417,13 @@ private SubscriptionEventTypeStats getEventTypeLightStats(final Optional subscriptionNode, final String eventTypeName, final String partition, - final Long distance, final Long timeLag) { + final Long distance, final Long lagSeconds) { final Partition.State state = getState(subscriptionNode, eventTypeName, partition); final String streamId = getStreamId(subscriptionNode, eventTypeName, partition); final SubscriptionEventTypeStats.Partition.AssignmentType assignmentType = getAssignmentType(subscriptionNode, eventTypeName, partition); return new SubscriptionEventTypeStats.Partition(partition, state.getDescription(), - distance, timeLag, streamId, assignmentType); + distance, lagSeconds, streamId, assignmentType); } private Collection getCommittedPositions(final Optional subscriptionNode, From 0811346cdf32d543d3570ec41c7bace08710ed97 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 12:12:34 +0200 Subject: [PATCH 06/24] ARUHA-1664: refactoring; --- .../subscription/SubscriptionService.java | 69 +------------- .../SubscriptionTimeLagService.java | 95 +++++++++++++++++++ 2 files changed, 99 insertions(+), 65 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 3b829120fb..00ef6dab3d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -1,9 +1,7 @@ package org.zalando.nakadi.service.subscription; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,7 +10,6 @@ import org.springframework.stereotype.Component; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; -import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.domain.EventType; import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.domain.ItemsWrapper; @@ -25,10 +22,8 @@ import org.zalando.nakadi.domain.SubscriptionBase; import org.zalando.nakadi.domain.SubscriptionEventTypeStats; import org.zalando.nakadi.domain.Timeline; -import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.InternalNakadiException; import org.zalando.nakadi.exceptions.InvalidCursorException; -import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.NoSuchEventTypeException; import org.zalando.nakadi.exceptions.NoSuchSubscriptionException; import org.zalando.nakadi.exceptions.Try; @@ -43,14 +38,11 @@ import org.zalando.nakadi.exceptions.runtime.TooManyPartitionsException; import org.zalando.nakadi.exceptions.runtime.WrongInitialCursorsException; import org.zalando.nakadi.repository.EventTypeRepository; -import org.zalando.nakadi.repository.MultiTimelineEventConsumer; import org.zalando.nakadi.repository.TopicRepository; -import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.service.CursorConverter; import org.zalando.nakadi.service.CursorOperationsService; import org.zalando.nakadi.service.FeatureToggleService; -import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.NakadiKpiPublisher; import org.zalando.nakadi.service.Result; import org.zalando.nakadi.service.subscription.model.Partition; @@ -69,16 +61,12 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; -import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; - @Component public class SubscriptionService { @@ -95,7 +83,7 @@ public class SubscriptionService { private final NakadiKpiPublisher nakadiKpiPublisher; private final FeatureToggleService featureToggleService; private final String subLogEventType; - private final EventTypeCache eventTypeCache; + private final SubscriptionTimeLagService subscriptionTimeLagService; @Autowired public SubscriptionService(final SubscriptionDbRepository subscriptionRepository, @@ -107,7 +95,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository final CursorOperationsService cursorOperationsService, final NakadiKpiPublisher nakadiKpiPublisher, final FeatureToggleService featureToggleService, - final EventTypeCache eventTypeCache, + final SubscriptionTimeLagService subscriptionTimeLagService, @Value("${nakadi.kpi.event-types.nakadiSubscriptionLog}") final String subLogEventType) { this.subscriptionRepository = subscriptionRepository; this.subscriptionClientFactory = subscriptionClientFactory; @@ -118,7 +106,7 @@ public SubscriptionService(final SubscriptionDbRepository subscriptionRepository this.cursorOperationsService = cursorOperationsService; this.nakadiKpiPublisher = nakadiKpiPublisher; this.featureToggleService = featureToggleService; - this.eventTypeCache = eventTypeCache; + this.subscriptionTimeLagService = subscriptionTimeLagService; this.subLogEventType = subLogEventType; } @@ -292,55 +280,6 @@ private List loadPartitionEndStatistics(final Collection return topicPartitions; } - private Map getTimeLags(final Collection committedPositions, - final List endPositions) { - - final NakadiCursorComparator cursorComparator = new NakadiCursorComparator(eventTypeCache); - final MultiTimelineEventConsumer consumer = timelineService.createMultiTimelineEventConsumer( - "time-lag-checker-" + UUID.randomUUID().toString()); - - return committedPositions.stream() - .collect(Collectors.toMap( - c -> new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()), - c -> { - final boolean isAtTail = endPositions.stream() - .filter(endStats -> endStats.getLast().getEventType().equals(c.getEventType()) - && endStats.getLast().getPartition().equals(c.getPartition())) - .findAny() - .map(endPos -> cursorComparator.compare(c, endPos.getLast()) >= 0) - .orElse(false); - - if (isAtTail) { - return Duration.ZERO; - } else { - return getNextEventTimeLag(c, consumer); - } - } - )); - } - - private Duration getNextEventTimeLag(final NakadiCursor cursor, final MultiTimelineEventConsumer consumer) { - try { - consumer.reassign(ImmutableList.of(cursor)); - } catch (final NakadiException | InvalidCursorException e) { - throw new ErrorGettingCursorTimeLagException(cursor, e); - } - - final ConsumedEvent nextEvent = executeWithRetry( - () -> { - final List events = consumer.readEvents(); - return events.isEmpty() ? null : events.iterator().next(); - }, - new RetryForSpecifiedTimeStrategy(1000) - .withResultsThatForceRetry((ConsumedEvent) null)); - - if (nextEvent == null) { - return Duration.ZERO; - } else { - return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); - } - } - private Map> getTimelinesByRepository(final Collection eventTypes) { return eventTypes.stream() .map(timelineService::getActiveTimeline) @@ -362,7 +301,7 @@ private List loadStats( final List stats = loadPartitionEndStatistics(eventTypes); final Map timeLags = showTimeLag ? - getTimeLags(committedPositions, stats) : + subscriptionTimeLagService.getTimeLags(committedPositions, stats) : ImmutableMap.of(); for (final EventType eventType : eventTypes) { diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java new file mode 100644 index 0000000000..a5079fbc0b --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -0,0 +1,95 @@ +package org.zalando.nakadi.service.subscription; + +import com.google.common.collect.ImmutableList; +import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.domain.PartitionEndStatistics; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; +import org.zalando.nakadi.exceptions.InvalidCursorException; +import org.zalando.nakadi.exceptions.NakadiException; +import org.zalando.nakadi.repository.MultiTimelineEventConsumer; +import org.zalando.nakadi.repository.db.EventTypeCache; +import org.zalando.nakadi.service.NakadiCursorComparator; +import org.zalando.nakadi.service.timeline.TimelineService; + +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; + +@Component +public class SubscriptionTimeLagService { + + private static final int EVENT_FETCH_WAIT_TIME = 1000; + + private final TimelineService timelineService; + private final EventTypeCache eventTypeCache; + + @Autowired + public SubscriptionTimeLagService(final TimelineService timelineService, + final EventTypeCache eventTypeCache) { + this.timelineService = timelineService; + this.eventTypeCache = eventTypeCache; + } + + public Map getTimeLags(final Collection committedPositions, + final List endPositions) + throws ErrorGettingCursorTimeLagException { + + final NakadiCursorComparator cursorComparator = new NakadiCursorComparator(eventTypeCache); + final MultiTimelineEventConsumer consumer = timelineService.createMultiTimelineEventConsumer( + "time-lag-checker-" + UUID.randomUUID().toString()); + + return committedPositions.stream() + .collect(Collectors.toMap( + c -> new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()), + c -> { + final boolean isAtTail = endPositions.stream() + .filter(endStats -> endStats.getLast().getEventType().equals(c.getEventType()) + && endStats.getLast().getPartition().equals(c.getPartition())) + .findAny() + .map(endPos -> cursorComparator.compare(c, endPos.getLast()) >= 0) + .orElse(false); + + if (isAtTail) { + return Duration.ZERO; + } else { + return getNextEventTimeLag(c, consumer); + } + } + )); + } + + private Duration getNextEventTimeLag(final NakadiCursor cursor, final MultiTimelineEventConsumer consumer) + throws ErrorGettingCursorTimeLagException { + try { + consumer.reassign(ImmutableList.of(cursor)); + } catch (final NakadiException | InvalidCursorException e) { + throw new ErrorGettingCursorTimeLagException(cursor, e); + } + + final ConsumedEvent nextEvent = executeWithRetry( + () -> { + final List events = consumer.readEvents(); + return events.isEmpty() ? null : events.iterator().next(); + }, + new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME) + .withResultsThatForceRetry((ConsumedEvent) null)); + + if (nextEvent == null) { + return Duration.ZERO; + } else { + return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); + } + } + +} From 56df799e8324b0669061bf60bb99c1df7de9643a Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 14:42:46 +0200 Subject: [PATCH 07/24] ARUHA-1664: trying multithreading; --- .../SubscriptionTimeLagService.java | 71 +++++++++++++------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index a5079fbc0b..ff8ae59b3a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -11,6 +11,7 @@ import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.repository.MultiTimelineEventConsumer; import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.service.NakadiCursorComparator; @@ -19,17 +20,23 @@ import java.time.Duration; import java.util.Collection; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; @Component public class SubscriptionTimeLagService { - private static final int EVENT_FETCH_WAIT_TIME = 1000; + private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; + private static final int COMPLETE_TIMEOUT_MS = 60000; private final TimelineService timelineService; private final EventTypeCache eventTypeCache; @@ -49,24 +56,46 @@ public Map getTimeLags(final Collection new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()), - c -> { - final boolean isAtTail = endPositions.stream() - .filter(endStats -> endStats.getLast().getEventType().equals(c.getEventType()) - && endStats.getLast().getPartition().equals(c.getPartition())) - .findAny() - .map(endPos -> cursorComparator.compare(c, endPos.getLast()) >= 0) - .orElse(false); - - if (isAtTail) { - return Duration.ZERO; - } else { - return getNextEventTimeLag(c, consumer); - } - } - )); + final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 10, 100, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>()); + + Map> futures = new HashMap<>(); + Map result = new HashMap<>(); + + for (NakadiCursor c : committedPositions) { + final EventTypePartition etp = new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()); + final boolean isAtTail = endPositions.stream() + .filter(endStats -> endStats.getLast().getEventType().equals(c.getEventType()) + && endStats.getLast().getPartition().equals(c.getPartition())) + .findAny() + .map(endPos -> cursorComparator.compare(c, endPos.getLast()) >= 0) + .orElse(false); + + if (isAtTail) { + result.put(etp, Duration.ZERO); + } else { + futures.put(etp, executor.submit(() -> getNextEventTimeLag(c, consumer))); + } + } + + executor.shutdown(); + try { + final boolean finished = executor.awaitTermination(COMPLETE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (!finished) { + throw new MyNakadiRuntimeException1("timeout!!!"); + } + futures.forEach((partition, future) -> { + try { + result.put(partition, future.get()); + } catch (InterruptedException | ExecutionException e) { + throw new MyNakadiRuntimeException1("wtf!!!"); + } + }); + return result; + + } catch (InterruptedException e) { + throw new MyNakadiRuntimeException1("interrupted!!!"); + } } private Duration getNextEventTimeLag(final NakadiCursor cursor, final MultiTimelineEventConsumer consumer) @@ -82,7 +111,7 @@ private Duration getNextEventTimeLag(final NakadiCursor cursor, final MultiTimel final List events = consumer.readEvents(); return events.isEmpty() ? null : events.iterator().next(); }, - new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME) + new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) .withResultsThatForceRetry((ConsumedEvent) null)); if (nextEvent == null) { From 75548fd1dfb647a66c48b3f950693e6c7104472c Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 14:51:21 +0200 Subject: [PATCH 08/24] ARUHA-1664: trying multithreading; --- .../SubscriptionTimeLagService.java | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index ff8ae59b3a..27cee8131c 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -12,7 +12,7 @@ import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; -import org.zalando.nakadi.repository.MultiTimelineEventConsumer; +import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; @@ -53,8 +53,6 @@ public Map getTimeLags(final Collection()); @@ -74,7 +72,7 @@ public Map getTimeLags(final Collection getNextEventTimeLag(c, consumer))); + futures.put(etp, executor.submit(() -> getNextEventTimeLag(c))); } } @@ -98,27 +96,27 @@ public Map getTimeLags(final Collection { + final List events = consumer.readEvents(); + return events.isEmpty() ? null : events.iterator().next(); + }, + new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) + .withResultsThatForceRetry((ConsumedEvent) null)); + + if (nextEvent == null) { + return Duration.ZERO; + } else { + return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); + } } catch (final NakadiException | InvalidCursorException e) { throw new ErrorGettingCursorTimeLagException(cursor, e); } - - final ConsumedEvent nextEvent = executeWithRetry( - () -> { - final List events = consumer.readEvents(); - return events.isEmpty() ? null : events.iterator().next(); - }, - new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) - .withResultsThatForceRetry((ConsumedEvent) null)); - - if (nextEvent == null) { - return Duration.ZERO; - } else { - return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); - } } } From fcaf89589dc6243fc853802776408b092c0cbda8 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 15:32:37 +0200 Subject: [PATCH 09/24] ARUHA-1664: added extra logging; --- .../SubscriptionTimeLagService.java | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index 27cee8131c..ee025fedc4 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -2,6 +2,8 @@ import com.google.common.collect.ImmutableList; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.ConsumedEvent; @@ -16,6 +18,7 @@ import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; +import org.zalando.nakadi.util.TimeLogger; import java.time.Duration; import java.util.Collection; @@ -35,53 +38,66 @@ @Component public class SubscriptionTimeLagService { + private static final Logger LOG = LoggerFactory.getLogger(SubscriptionTimeLagService.class); + + private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; private static final int COMPLETE_TIMEOUT_MS = 60000; private final TimelineService timelineService; - private final EventTypeCache eventTypeCache; + private final NakadiCursorComparator cursorComparator; @Autowired public SubscriptionTimeLagService(final TimelineService timelineService, final EventTypeCache eventTypeCache) { this.timelineService = timelineService; - this.eventTypeCache = eventTypeCache; + this.cursorComparator = new NakadiCursorComparator(eventTypeCache); } public Map getTimeLags(final Collection committedPositions, final List endPositions) throws ErrorGettingCursorTimeLagException { - final NakadiCursorComparator cursorComparator = new NakadiCursorComparator(eventTypeCache); + TimeLogger.startMeasure("TIME_LAG", "putting of execution"); - final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 10, 100, TimeUnit.MILLISECONDS, + final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); Map> futures = new HashMap<>(); Map result = new HashMap<>(); - for (NakadiCursor c : committedPositions) { - final EventTypePartition etp = new EventTypePartition(c.getTimeline().getEventType(), c.getPartition()); + for (final NakadiCursor cursor : committedPositions) { + final EventTypePartition partition = new EventTypePartition(cursor.getTimeline().getEventType(), + cursor.getPartition()); final boolean isAtTail = endPositions.stream() - .filter(endStats -> endStats.getLast().getEventType().equals(c.getEventType()) - && endStats.getLast().getPartition().equals(c.getPartition())) + .map(PartitionEndStatistics::getLast) + .filter(last -> last.getEventType().equals(cursor.getEventType()) + && last.getPartition().equals(cursor.getPartition())) .findAny() - .map(endPos -> cursorComparator.compare(c, endPos.getLast()) >= 0) + .map(last -> cursorComparator.compare(cursor, last) >= 0) .orElse(false); if (isAtTail) { - result.put(etp, Duration.ZERO); + result.put(partition, Duration.ZERO); } else { - futures.put(etp, executor.submit(() -> getNextEventTimeLag(c))); + futures.put(partition, executor.submit(() -> getNextEventTimeLag(cursor))); } } + TimeLogger.startMeasure("TIME_LAG", "shutdown"); + executor.shutdown(); + + TimeLogger.startMeasure("TIME_LAG", "waiting to finish"); + try { final boolean finished = executor.awaitTermination(COMPLETE_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!finished) { throw new MyNakadiRuntimeException1("timeout!!!"); } + + TimeLogger.startMeasure("TIME_LAG", "extracting results from futures"); + futures.forEach((partition, future) -> { try { result.put(partition, future.get()); @@ -89,6 +105,9 @@ public Map getTimeLags(final Collection getTimeLags(final Collection(EVENT_FETCH_WAIT_TIME_MS) .withResultsThatForceRetry((ConsumedEvent) null)); + final long diff = System.currentTimeMillis() - start; + LOG.info("[PARTITION_TIME_LAG] " + cursor.getEventType() + ":" + cursor.getPartition() + " " + diff); + if (nextEvent == null) { return Duration.ZERO; } else { From 2b2739e4295360ddc750fa10f658db5a129287a8 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 16:02:55 +0200 Subject: [PATCH 10/24] ARUHA-1664: added extra logging; --- .../subscription/SubscriptionTimeLagService.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index ee025fedc4..f7933517f6 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -28,9 +28,9 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; @@ -60,8 +60,7 @@ public Map getTimeLags(final Collection()); + final ExecutorService executor = Executors.newFixedThreadPool(10); Map> futures = new HashMap<>(); Map result = new HashMap<>(); @@ -78,17 +77,18 @@ public Map getTimeLags(final Collection getNextEventTimeLag(cursor))); } } - TimeLogger.startMeasure("TIME_LAG", "shutdown"); + TimeLogger.addMeasure("shutdown"); executor.shutdown(); - TimeLogger.startMeasure("TIME_LAG", "waiting to finish"); + TimeLogger.addMeasure("waiting to finish"); try { final boolean finished = executor.awaitTermination(COMPLETE_TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -96,7 +96,7 @@ public Map getTimeLags(final Collection { try { From 547070108070ac1e1d46a62d34c8da0bc1499f5b Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 9 May 2018 17:17:38 +0200 Subject: [PATCH 11/24] ARUHA-1664: fixed error handling; --- .../controller/SubscriptionController.java | 3 +- .../ErrorGettingCursorTimeLagException.java | 5 +- .../SubscriptionTimeLagService.java | 55 ++++++++----------- 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 7203741c10..9a105266cf 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -31,6 +31,7 @@ import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED; import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static org.zalando.nakadi.service.FeatureToggleService.Feature.HIGH_LEVEL_API; +import static org.zalando.problem.MoreStatus.UNPROCESSABLE_ENTITY; @RestController @@ -109,7 +110,7 @@ public ResponseEntity handleFeatureTurnedOff(final FeatureNotAvailableE public ResponseEntity handleTimeLagException(final ErrorGettingCursorTimeLagException ex, final NativeWebRequest request) { LOG.debug(ex.getMessage(), ex); - return Responses.create(Problem.valueOf(SERVICE_UNAVAILABLE, ex.getMessage()), request); + return Responses.create(Problem.valueOf(UNPROCESSABLE_ENTITY, ex.getMessage()), request); } @ExceptionHandler(InconsistentStateException.class) diff --git a/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java b/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java index 4600545b19..fe442242a0 100644 --- a/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java +++ b/src/main/java/org/zalando/nakadi/exceptions/ErrorGettingCursorTimeLagException.java @@ -6,10 +6,11 @@ public class ErrorGettingCursorTimeLagException extends MyNakadiRuntimeException1 { private final NakadiCursor failedCursor; - + public ErrorGettingCursorTimeLagException(final NakadiCursor failedCursor, final Throwable cause) { - super("Error getting time lag for cursor: " + failedCursor.toString(), cause); + super("Error occurred when getting subscription time lag as as subscription cursor is wrong or expired: " + + failedCursor.toString(), cause); this.failedCursor = failedCursor; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index f7933517f6..58d83b273f 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -2,8 +2,6 @@ import com.google.common.collect.ImmutableList; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.ConsumedEvent; @@ -13,12 +11,12 @@ import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; +import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; -import org.zalando.nakadi.util.TimeLogger; import java.time.Duration; import java.util.Collection; @@ -38,11 +36,9 @@ @Component public class SubscriptionTimeLagService { - private static final Logger LOG = LoggerFactory.getLogger(SubscriptionTimeLagService.class); - - private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; - private static final int COMPLETE_TIMEOUT_MS = 60000; + private static final int COMPLETE_TIMEOUT_MS = 30000; + private static final int LAG_CALCULATION_PARALLELISM = 10; private final TimelineService timelineService; private final NakadiCursorComparator cursorComparator; @@ -56,11 +52,9 @@ public SubscriptionTimeLagService(final TimelineService timelineService, public Map getTimeLags(final Collection committedPositions, final List endPositions) - throws ErrorGettingCursorTimeLagException { - - TimeLogger.startMeasure("TIME_LAG", "putting of execution"); + throws ErrorGettingCursorTimeLagException, InconsistentStateException { - final ExecutorService executor = Executors.newFixedThreadPool(10); + final ExecutorService executor = Executors.newFixedThreadPool(LAG_CALCULATION_PARALLELISM); Map> futures = new HashMap<>(); Map result = new HashMap<>(); @@ -77,46 +71,42 @@ public Map getTimeLags(final Collection getNextEventTimeLag(cursor))); } } - TimeLogger.addMeasure("shutdown"); - executor.shutdown(); - - TimeLogger.addMeasure("waiting to finish"); - try { final boolean finished = executor.awaitTermination(COMPLETE_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!finished) { - throw new MyNakadiRuntimeException1("timeout!!!"); + throw new InconsistentStateException("Timeout occurred when getting subscription time lag"); } - TimeLogger.addMeasure("extracting results from futures"); - futures.forEach((partition, future) -> { try { result.put(partition, future.get()); - } catch (InterruptedException | ExecutionException e) { - throw new MyNakadiRuntimeException1("wtf!!!"); + } catch (final InterruptedException e) { + throw new InconsistentStateException("Thread interrupted when getting subscription time lag", e); + } catch (final ExecutionException e) { + if (e.getCause() instanceof MyNakadiRuntimeException1) { + throw (MyNakadiRuntimeException1) e.getCause(); + } else { + throw new InconsistentStateException("Unexpected error occurred when getting subscription " + + "time lag", e); + } } }); - - LOG.info(TimeLogger.finishMeasure()); - return result; - } catch (InterruptedException e) { - throw new MyNakadiRuntimeException1("interrupted!!!"); + } catch (final InterruptedException e) { + throw new InconsistentStateException("Thread interrupted when getting subscription time lag", e); } } - private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException { - final long start = System.currentTimeMillis(); + private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException, + InconsistentStateException { try { final EventConsumer consumer = timelineService.createEventConsumer( "time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor)); @@ -129,15 +119,14 @@ private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGett new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) .withResultsThatForceRetry((ConsumedEvent) null)); - final long diff = System.currentTimeMillis() - start; - LOG.info("[PARTITION_TIME_LAG] " + cursor.getEventType() + ":" + cursor.getPartition() + " " + diff); - if (nextEvent == null) { return Duration.ZERO; } else { return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); } - } catch (final NakadiException | InvalidCursorException e) { + } catch (final NakadiException e) { + throw new InconsistentStateException("Unexpected error happened when getting consumer time lag", e); + } catch (final InvalidCursorException e) { throw new ErrorGettingCursorTimeLagException(cursor, e); } } From 5826210b486702801db291c641602c938ccea543 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 11 May 2018 12:16:01 +0200 Subject: [PATCH 12/24] ARUHA-1664: added tests; refactoring; --- .../nakadi/webservice/UserJourneyAT.java | 11 +- .../runtime/InconsistentStateException.java | 2 +- .../service/NakadiCursorComparator.java | 4 + .../SubscriptionTimeLagService.java | 51 ++++---- .../service/timeline/TimelineService.java | 11 +- .../SubscriptionTimLagServiceTest.java | 111 ++++++++++++++++++ 6 files changed, 150 insertions(+), 40 deletions(-) create mode 100644 src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java diff --git a/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java b/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java index cb1beea502..163a6958ed 100644 --- a/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java +++ b/src/acceptance-test/java/org/zalando/nakadi/webservice/UserJourneyAT.java @@ -31,6 +31,7 @@ import static java.util.stream.IntStream.rangeClosed; import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.IsEqual.equalTo; @@ -280,10 +281,11 @@ public void userJourneyHila() throws InterruptedException, IOException { // as we didn't commit, there should be still 4 unconsumed events jsonRequestSpec() - .get("/subscriptions/{sid}/stats", subscription.getId()) + .get("/subscriptions/{sid}/stats?show_time_lag=true", subscription.getId()) .then() .statusCode(OK.value()) - .body("items[0].partitions[0].unconsumed_events", equalTo(4)); + .body("items[0].partitions[0].unconsumed_events", equalTo(4)) + .body("items[0].partitions[0].consumer_lag_seconds", greaterThanOrEqualTo(0)); // commit cursor of latest event final StreamBatch lastBatch = batches.get(batches.size() - 1); @@ -293,10 +295,11 @@ public void userJourneyHila() throws InterruptedException, IOException { // now there should be 0 unconsumed events jsonRequestSpec() - .get("/subscriptions/{sid}/stats", subscription.getId()) + .get("/subscriptions/{sid}/stats?show_time_lag=true", subscription.getId()) .then() .statusCode(OK.value()) - .body("items[0].partitions[0].unconsumed_events", equalTo(0)); + .body("items[0].partitions[0].unconsumed_events", equalTo(0)) + .body("items[0].partitions[0].consumer_lag_seconds", equalTo(0)); // get cursors jsonRequestSpec() diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java index 07406da155..ba3bc8d709 100644 --- a/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/InconsistentStateException.java @@ -2,7 +2,7 @@ public class InconsistentStateException extends MyNakadiRuntimeException1 { - public InconsistentStateException(final String msg, final Exception cause) { + public InconsistentStateException(final String msg, final Throwable cause) { super(msg, cause); } diff --git a/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java b/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java index dce822ab9e..8f7e94e0cd 100644 --- a/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java +++ b/src/main/java/org/zalando/nakadi/service/NakadiCursorComparator.java @@ -1,5 +1,7 @@ package org.zalando.nakadi.service; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.exceptions.InternalNakadiException; @@ -11,10 +13,12 @@ import java.util.List; import java.util.Objects; +@Component public class NakadiCursorComparator implements Comparator { private final EventTypeCache eventTypeCache; + @Autowired public NakadiCursorComparator(final EventTypeCache eventTypeCache) { this.eventTypeCache = eventTypeCache; } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index 58d83b273f..c719b23bf1 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -12,9 +12,7 @@ import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; -import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.repository.EventConsumer; -import org.zalando.nakadi.repository.db.EventTypeCache; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; @@ -45,9 +43,9 @@ public class SubscriptionTimeLagService { @Autowired public SubscriptionTimeLagService(final TimelineService timelineService, - final EventTypeCache eventTypeCache) { + final NakadiCursorComparator cursorComparator) { this.timelineService = timelineService; - this.cursorComparator = new NakadiCursorComparator(eventTypeCache); + this.cursorComparator = cursorComparator; } public Map getTimeLags(final Collection committedPositions, @@ -56,21 +54,13 @@ public Map getTimeLags(final Collection> futures = new HashMap<>(); - Map result = new HashMap<>(); + final Map> futures = new HashMap<>(); + final Map result = new HashMap<>(); for (final NakadiCursor cursor : committedPositions) { final EventTypePartition partition = new EventTypePartition(cursor.getTimeline().getEventType(), cursor.getPartition()); - final boolean isAtTail = endPositions.stream() - .map(PartitionEndStatistics::getLast) - .filter(last -> last.getEventType().equals(cursor.getEventType()) - && last.getPartition().equals(cursor.getPartition())) - .findAny() - .map(last -> cursorComparator.compare(cursor, last) >= 0) - .orElse(false); - - if (isAtTail) { + if (isCursorAtTail(cursor, endPositions)) { result.put(partition, Duration.ZERO); } else { futures.put(partition, executor.submit(() -> getNextEventTimeLag(cursor))); @@ -83,25 +73,20 @@ public Map getTimeLags(final Collection { + for (final EventTypePartition partition : futures.keySet()) { + final Future future = futures.get(partition); try { result.put(partition, future.get()); - } catch (final InterruptedException e) { - throw new InconsistentStateException("Thread interrupted when getting subscription time lag", e); } catch (final ExecutionException e) { - if (e.getCause() instanceof MyNakadiRuntimeException1) { - throw (MyNakadiRuntimeException1) e.getCause(); - } else { - throw new InconsistentStateException("Unexpected error occurred when getting subscription " + - "time lag", e); - } + throw e.getCause(); } - }); + } return result; - } catch (final InterruptedException e) { - throw new InconsistentStateException("Thread interrupted when getting subscription time lag", e); + } catch (final InconsistentStateException | ErrorGettingCursorTimeLagException e) { + throw e; + } catch (final Throwable e) { + throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", e); } } @@ -131,4 +116,14 @@ private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGett } } + private boolean isCursorAtTail(final NakadiCursor cursor, final List endPositions) { + return endPositions.stream() + .map(PartitionEndStatistics::getLast) + .filter(last -> last.getEventType().equals(cursor.getEventType()) + && last.getPartition().equals(cursor.getPartition())) + .findAny() + .map(last -> cursorComparator.compare(cursor, last) >= 0) + .orElse(false); + } + } diff --git a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index ebb2bdff1b..1c819ecbc2 100644 --- a/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -230,13 +230,10 @@ public TopicRepository getTopicRepository(final Timeline timeline) public EventConsumer createEventConsumer(@Nullable final String clientId, final List positions) throws NakadiException, InvalidCursorException { - final MultiTimelineEventConsumer consumer = createMultiTimelineEventConsumer(clientId); - consumer.reassign(positions); - return consumer; - } - - public MultiTimelineEventConsumer createMultiTimelineEventConsumer(@Nullable final String clientId) { - return new MultiTimelineEventConsumer(clientId, this, timelineSync, new NakadiCursorComparator(eventTypeCache)); + final MultiTimelineEventConsumer result = new MultiTimelineEventConsumer( + clientId, this, timelineSync, new NakadiCursorComparator(eventTypeCache)); + result.reassign(positions); + return result; } public EventConsumer.ReassignableEventConsumer createEventConsumer(@Nullable final String clientId) { diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java new file mode 100644 index 0000000000..1221ffe475 --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java @@ -0,0 +1,111 @@ +package org.zalando.nakadi.service; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; +import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.EventTypePartition; +import org.zalando.nakadi.domain.NakadiCursor; +import org.zalando.nakadi.domain.PartitionEndStatistics; +import org.zalando.nakadi.domain.Storage; +import org.zalando.nakadi.domain.Timeline; +import org.zalando.nakadi.exceptions.ErrorGettingCursorTimeLagException; +import org.zalando.nakadi.exceptions.InvalidCursorException; +import org.zalando.nakadi.exceptions.NakadiException; +import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; +import org.zalando.nakadi.repository.EventConsumer; +import org.zalando.nakadi.service.subscription.SubscriptionTimeLagService; +import org.zalando.nakadi.service.timeline.TimelineService; + +import java.time.Duration; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SubscriptionTimLagServiceTest { + + private static final long FAKE_EVENT_TIMESTAMP = 478220400000L; + + private NakadiCursorComparator cursorComparator; + private SubscriptionTimeLagService timeLagService; + private TimelineService timelineService; + + @Before + public void setUp() throws Exception { + timelineService = mock(TimelineService.class); + + cursorComparator = mock(NakadiCursorComparator.class); + timeLagService = new SubscriptionTimeLagService(timelineService, cursorComparator); + } + + @Test + public void testTimeLagsForTailAndNotTailPositions() throws NakadiException, InvalidCursorException { + + final EventConsumer eventConsumer = mock(EventConsumer.class); + final Timeline timeline = mock(Timeline.class); + when(timeline.getStorage()).thenReturn(new Storage("", Storage.Type.KAFKA)); + when(eventConsumer.readEvents()).thenAnswer(invocation -> + ImmutableList.of(new ConsumedEvent(null, NakadiCursor.of(timeline, "", ""), FAKE_EVENT_TIMESTAMP))); + + when(timelineService.createEventConsumer(any(), any())).thenReturn(eventConsumer); + + final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); + + final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); + final NakadiCursor committedCursor2 = NakadiCursor.of(et1Timeline, "p2", "o2"); + + final PartitionEndStatistics endStats1 = mockEndStats(NakadiCursor.of(et1Timeline, "p1", "o1")); + final PartitionEndStatistics endStats2 = mockEndStats(NakadiCursor.of(et1Timeline, "p2", "o3")); + + // mock first committed cursor to be at the tail - the expected time lag should be 0 + when(cursorComparator.compare(committedCursor1, endStats1.getLast())).thenReturn(0); + + // mock second committed cursor to be lower than tail - the expected time lag should be > 0 + when(cursorComparator.compare(committedCursor2, endStats2.getLast())).thenReturn(-1); + + final Map timeLags = timeLagService.getTimeLags( + ImmutableList.of(committedCursor1, committedCursor2), + ImmutableList.of(endStats1, endStats2)); + + assertThat(timeLags.entrySet(), hasSize(2)); + assertThat(timeLags.get(new EventTypePartition("et1", "p1")), equalTo(Duration.ZERO)); + assertThat(timeLags.get(new EventTypePartition("et1", "p2")), greaterThan(Duration.ZERO)); + } + + + @Test(expected = InconsistentStateException.class) + @SuppressWarnings("unchecked") + public void whenNakadiExceptionThenInconsistentStateExceptionIsThrown() + throws NakadiException, InvalidCursorException { + when(timelineService.createEventConsumer(any(), any())).thenThrow(NakadiException.class); + + final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); + final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); + + timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of()); + } + + @Test(expected = ErrorGettingCursorTimeLagException.class) + @SuppressWarnings("unchecked") + public void whenInvalidCursorThenErrorGettingCursorTimeLagExceptionIsThrown() + throws NakadiException, InvalidCursorException { + when(timelineService.createEventConsumer(any(), any())).thenThrow(InvalidCursorException.class); + + final Timeline et1Timeline = new Timeline("et1", 0, new Storage("", Storage.Type.KAFKA), "t1", null); + final NakadiCursor committedCursor1 = NakadiCursor.of(et1Timeline, "p1", "o1"); + + timeLagService.getTimeLags(ImmutableList.of(committedCursor1), ImmutableList.of()); + } + + private PartitionEndStatistics mockEndStats(final NakadiCursor nakadiCursor) { + final PartitionEndStatistics endStats = mock(PartitionEndStatistics.class); + when(endStats.getLast()).thenReturn(nakadiCursor); + return endStats; + } +} From c72dfb41b8337fec112b9b4508576f6ec640eb28 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 11 May 2018 12:27:13 +0200 Subject: [PATCH 13/24] ARUHA-1664: updated API definition; --- docs/_data/nakadi-event-bus-api.yaml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index db3829966c..d90bbdfabf 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -1389,6 +1389,11 @@ paths: description: exposes statistics of specified subscription parameters: - $ref: '#/parameters/SubscriptionId' + - name: show_time_lag + in: query + description: show consumer time lag + type: boolean + default: false responses: '200': description: Ok @@ -2703,10 +2708,11 @@ definitions: The amount of events in this partition that are not yet consumed within this subscription. The property may be absent at the moment when no events were yet consumed from the partition in this subscription (In case of `read_from` is `BEGIN` or `END`) - time_lag: + consumer_lag_seconds: type: number description: | - The age (in seconds) of the oldest event of this partition of subscription that was not consumed. + Subscription consumer lag for this partition in seconds. Measured as the age of the oldest event of + this partition that is not yet consumed. stream_id: type: string description: the id of the stream that consumes data from this partition From 3dc873672098cd8f57c645c1b8a60f626a3f638a Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 11 May 2018 12:28:05 +0200 Subject: [PATCH 14/24] ARUHA-1664: updated changelog; --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bdc331eb2..9c284a81e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- Extended subscription statistics endpoint with time-lag information + ## [2.6.6] - 2018-05-08 ### Added From 6267933a3f4a38d94ee724a06a699f97a1323616 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 11 May 2018 12:31:14 +0200 Subject: [PATCH 15/24] ARUHA-1664: updated API description; --- docs/_data/nakadi-event-bus-api.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/_data/nakadi-event-bus-api.yaml b/docs/_data/nakadi-event-bus-api.yaml index d90bbdfabf..a7d02cdfbb 100644 --- a/docs/_data/nakadi-event-bus-api.yaml +++ b/docs/_data/nakadi-event-bus-api.yaml @@ -2712,7 +2712,7 @@ definitions: type: number description: | Subscription consumer lag for this partition in seconds. Measured as the age of the oldest event of - this partition that is not yet consumed. + this partition that is not yet consumed within this subscription. stream_id: type: string description: the id of the stream that consumes data from this partition From 135ece571b31113fcbcba4ac002794143b25636c Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 11 May 2018 12:32:56 +0200 Subject: [PATCH 16/24] ARUHA-1664: removed unneeded logger; --- .../zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java index 82012bd8f5..83d5b2b541 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java @@ -4,8 +4,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.repository.EventConsumer; @@ -19,8 +17,6 @@ public class NakadiKafkaConsumer implements EventConsumer.LowLevelConsumer { - private static final Logger LOG = LoggerFactory.getLogger(NakadiKafkaConsumer.class); - private final Consumer kafkaConsumer; private final long pollTimeout; private final Map timelineMap; From 36ff06ad6adb6c505d566d59419174244969d7e7 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Thu, 17 May 2018 14:31:57 +0200 Subject: [PATCH 17/24] ARUHA-1664: added closing of consumer; added timeout error; --- .../subscription/SubscriptionTimeLagService.java | 11 ++++++----- ...eTest.java => SubscriptionTimeLagServiceTest.java} | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) rename src/test/java/org/zalando/nakadi/service/{SubscriptionTimLagServiceTest.java => SubscriptionTimeLagServiceTest.java} (99%) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index c719b23bf1..a15ae3c84d 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -16,6 +16,7 @@ import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; +import java.io.IOException; import java.time.Duration; import java.util.Collection; import java.util.Date; @@ -92,9 +93,9 @@ public Map getTimeLags(final Collection { @@ -105,11 +106,11 @@ private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGett .withResultsThatForceRetry((ConsumedEvent) null)); if (nextEvent == null) { - return Duration.ZERO; + throw new InconsistentStateException("Timeout waiting for events when getting consumer time lag"); } else { return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); } - } catch (final NakadiException e) { + } catch (final NakadiException | IOException e) { throw new InconsistentStateException("Unexpected error happened when getting consumer time lag", e); } catch (final InvalidCursorException e) { throw new ErrorGettingCursorTimeLagException(cursor, e); diff --git a/src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java similarity index 99% rename from src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java rename to src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java index 1221ffe475..6c444166c9 100644 --- a/src/test/java/org/zalando/nakadi/service/SubscriptionTimLagServiceTest.java +++ b/src/test/java/org/zalando/nakadi/service/SubscriptionTimeLagServiceTest.java @@ -28,7 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SubscriptionTimLagServiceTest { +public class SubscriptionTimeLagServiceTest { private static final long FAKE_EVENT_TIMESTAMP = 478220400000L; From 8a2eb564af944f6c7640b1e388114ec94b155c15 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 18 May 2018 15:05:25 +0200 Subject: [PATCH 18/24] ARUHA-1664: replaced two bool values with enum; --- .../controller/SubscriptionController.java | 12 +++++---- .../subscription/SubscriptionService.java | 27 ++++++++++--------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 9a105266cf..36abbd10a1 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -19,9 +19,10 @@ import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.WebResult; import org.zalando.nakadi.service.subscription.SubscriptionService; -import org.zalando.nakadi.service.FeatureToggleService; +import org.zalando.nakadi.service.subscription.SubscriptionService.StatsMode; import org.zalando.problem.Problem; import org.zalando.problem.spring.web.advice.Responses; @@ -60,8 +61,8 @@ public ResponseEntity listSubscriptions( final NativeWebRequest request) { featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return WebResult.wrap(() -> - subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset), + return WebResult.wrap( + () -> subscriptionService.listSubscriptions(owningApplication, eventTypes, showStatus, limit, offset), request); } @@ -89,7 +90,8 @@ public ItemsWrapper getSubscriptionStats( throws NakadiException, InconsistentStateException, ServiceTemporarilyUnavailableException { featureToggleService.checkFeatureOn(HIGH_LEVEL_API); - return subscriptionService.getSubscriptionStat(subscriptionId, true, showTimeLag); + final StatsMode statsMode = showTimeLag ? StatsMode.TIMELAG : StatsMode.NORMAL; + return subscriptionService.getSubscriptionStat(subscriptionId, statsMode); } @ExceptionHandler(NakadiException.class) @@ -126,7 +128,7 @@ public ResponseEntity handleInconsistentState(final InconsistentStateEx @ExceptionHandler(ServiceTemporarilyUnavailableException.class) public ResponseEntity handleServiceTemporarilyUnavailable(final ServiceTemporarilyUnavailableException ex, - final NativeWebRequest request) { + final NativeWebRequest request) { LOG.debug(ex.getMessage(), ex); return Responses.create( Problem.valueOf( diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 00ef6dab3d..265ea1ece3 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -85,6 +85,12 @@ public class SubscriptionService { private final String subLogEventType; private final SubscriptionTimeLagService subscriptionTimeLagService; + public enum StatsMode { + LIGHT, + NORMAL, + TIMELAG + } + @Autowired public SubscriptionService(final SubscriptionDbRepository subscriptionRepository, final SubscriptionClientFactory subscriptionClientFactory, @@ -166,7 +172,7 @@ public Result listSubscriptions(@Nullable final String owningApplication, @Nulla new PaginationWrapper<>(subscriptions, paginationLinks); if (showStatus) { final List items = paginationWrapper.getItems(); - items.forEach(s -> s.setStatus(createSubscriptionStat(s, false, false))); + items.forEach(s -> s.setStatus(createSubscriptionStat(s, StatsMode.LIGHT))); } return Result.ok(paginationWrapper); } catch (final ServiceTemporarilyUnavailableException e) { @@ -219,8 +225,7 @@ public Result deleteSubscription(final String subscriptionId) throws DbWri } public ItemsWrapper getSubscriptionStat(final String subscriptionId, - final boolean includeDistance, - final boolean showTimeLag) + final StatsMode statsMode) throws InconsistentStateException, NoSuchSubscriptionException, ServiceTemporarilyUnavailableException { final Subscription subscription; try { @@ -228,23 +233,21 @@ public ItemsWrapper getSubscriptionStat(final String } catch (final ServiceTemporarilyUnavailableException ex) { throw new InconsistentStateException(ex.getMessage()); } - final List subscriptionStat = createSubscriptionStat(subscription, includeDistance, - showTimeLag); + final List subscriptionStat = createSubscriptionStat(subscription, statsMode); return new ItemsWrapper<>(subscriptionStat); } private List createSubscriptionStat(final Subscription subscription, - final boolean includeDistance, - final boolean showTimeLag) + final StatsMode statsMode) throws InconsistentStateException, ServiceTemporarilyUnavailableException { final List eventTypes = getEventTypesForSubscription(subscription); final ZkSubscriptionClient subscriptionClient = createZkSubscriptionClient(subscription); final Optional zkSubscriptionNode = subscriptionClient.getZkSubscriptionNode(); - if (includeDistance) { - return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, showTimeLag); - } else { + if (statsMode == StatsMode.LIGHT) { return loadLightStats(eventTypes, zkSubscriptionNode); + } else { + return loadStats(eventTypes, zkSubscriptionNode, subscriptionClient, statsMode); } } @@ -294,13 +297,13 @@ private List getPartitionsList(final EventType eventType) { private List loadStats( final Collection eventTypes, final Optional subscriptionNode, - final ZkSubscriptionClient client, final boolean showTimeLag) + final ZkSubscriptionClient client, final StatsMode statsMode) throws ServiceTemporarilyUnavailableException, InconsistentStateException { final List result = new ArrayList<>(eventTypes.size()); final Collection committedPositions = getCommittedPositions(subscriptionNode, client); final List stats = loadPartitionEndStatistics(eventTypes); - final Map timeLags = showTimeLag ? + final Map timeLags = statsMode == StatsMode.TIMELAG ? subscriptionTimeLagService.getTimeLags(committedPositions, stats) : ImmutableMap.of(); From 1cc59779d75530eda9bebbb1d2665536786d3a9d Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 23 May 2018 11:18:33 +0200 Subject: [PATCH 19/24] ARUHA-1664: reworked the multithreading; --- .../SubscriptionTimeLagService.java | 153 +++++++++++------- 1 file changed, 95 insertions(+), 58 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index a15ae3c84d..a50b0d5885 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -12,6 +12,7 @@ import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; @@ -24,11 +25,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; @@ -36,84 +39,58 @@ public class SubscriptionTimeLagService { private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; + private static final int SINGLE_PARTITION_TIMEOUT_MS = 5000; private static final int COMPLETE_TIMEOUT_MS = 30000; - private static final int LAG_CALCULATION_PARALLELISM = 10; + private static final int MAX_THREADS_PER_REQUEST = 10; + private static final int TIME_LAG_COMMON_POOL_SIZE = 400; private final TimelineService timelineService; private final NakadiCursorComparator cursorComparator; + private final ThreadPoolExecutor threadPool; @Autowired public SubscriptionTimeLagService(final TimelineService timelineService, final NakadiCursorComparator cursorComparator) { this.timelineService = timelineService; this.cursorComparator = cursorComparator; + this.threadPool = new ThreadPoolExecutor(0, TIME_LAG_COMMON_POOL_SIZE, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>()); } public Map getTimeLags(final Collection committedPositions, final List endPositions) throws ErrorGettingCursorTimeLagException, InconsistentStateException { - final ExecutorService executor = Executors.newFixedThreadPool(LAG_CALCULATION_PARALLELISM); - - final Map> futures = new HashMap<>(); - final Map result = new HashMap<>(); - - for (final NakadiCursor cursor : committedPositions) { - final EventTypePartition partition = new EventTypePartition(cursor.getTimeline().getEventType(), - cursor.getPartition()); - if (isCursorAtTail(cursor, endPositions)) { - result.put(partition, Duration.ZERO); - } else { - futures.put(partition, executor.submit(() -> getNextEventTimeLag(cursor))); - } - } - - executor.shutdown(); + final TimeLagRequestHandler timeLagHandler = new TimeLagRequestHandler(timelineService, threadPool); + final Map timeLags = new HashMap<>(); + final Map> futureTimeLags = new HashMap<>(); try { - final boolean finished = executor.awaitTermination(COMPLETE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - if (!finished) { - throw new InconsistentStateException("Timeout occurred when getting subscription time lag"); - } - for (final EventTypePartition partition : futures.keySet()) { - final Future future = futures.get(partition); - try { - result.put(partition, future.get()); - } catch (final ExecutionException e) { - throw e.getCause(); + for (final NakadiCursor cursor : committedPositions) { + if (isCursorAtTail(cursor, endPositions)) { + timeLags.put(cursor.getEventTypePartition(), Duration.ZERO); + } else { + final CompletableFuture timeLagFuture = timeLagHandler.getCursorTimeLagFuture(cursor); + futureTimeLags.put(cursor.getEventTypePartition(), timeLagFuture); } } - return result; + CompletableFuture + .allOf(futureTimeLags.values().toArray(new CompletableFuture[futureTimeLags.size()])) + .get(COMPLETE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (final InconsistentStateException | ErrorGettingCursorTimeLagException e) { - throw e; - } catch (final Throwable e) { - throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", e); - } - } - - private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException, - InconsistentStateException { - - try (final EventConsumer consumer = timelineService.createEventConsumer( - "time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor))) { - - final ConsumedEvent nextEvent = executeWithRetry( - () -> { - final List events = consumer.readEvents(); - return events.isEmpty() ? null : events.iterator().next(); - }, - new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) - .withResultsThatForceRetry((ConsumedEvent) null)); + for (final EventTypePartition partition : futureTimeLags.keySet()) { + timeLags.put(partition, futureTimeLags.get(partition).get()); + } + return timeLags; - if (nextEvent == null) { - throw new InconsistentStateException("Timeout waiting for events when getting consumer time lag"); + } catch (final ExecutionException e) { + if (e.getCause() instanceof MyNakadiRuntimeException1) { + throw (MyNakadiRuntimeException1) e.getCause(); } else { - return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); + throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", + e.getCause()); } - } catch (final NakadiException | IOException e) { - throw new InconsistentStateException("Unexpected error happened when getting consumer time lag", e); - } catch (final InvalidCursorException e) { - throw new ErrorGettingCursorTimeLagException(cursor, e); + } catch (final Throwable e) { + throw new InconsistentStateException("Unexpected error occurred when getting subscription time lag", e); } } @@ -127,4 +104,64 @@ private boolean isCursorAtTail(final NakadiCursor cursor, final List getCursorTimeLagFuture(final NakadiCursor cursor) + throws InterruptedException, TimeoutException { + + final CompletableFuture future = new CompletableFuture<>(); + if (semaphore.tryAcquire(SINGLE_PARTITION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + threadPool.submit(() -> { + try { + final Duration timeLag = getNextEventTimeLag(cursor); + future.complete(timeLag); + } catch (final Throwable e) { + future.completeExceptionally(e); + } finally { + semaphore.release(); + } + }); + } else { + throw new TimeoutException("Partition time lag timeout exceeded"); + } + return future; + } + + private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException, + InconsistentStateException { + + try (final EventConsumer consumer = timelineService.createEventConsumer( + "time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor))) { + + final ConsumedEvent nextEvent = executeWithRetry( + () -> { + final List events = consumer.readEvents(); + return events.isEmpty() ? null : events.iterator().next(); + }, + new RetryForSpecifiedTimeStrategy(EVENT_FETCH_WAIT_TIME_MS) + .withResultsThatForceRetry((ConsumedEvent) null)); + + if (nextEvent == null) { + throw new InconsistentStateException("Timeout waiting for events when getting consumer time lag"); + } else { + return Duration.ofMillis(new Date().getTime() - nextEvent.getTimestamp()); + } + } catch (final NakadiException | IOException e) { + throw new InconsistentStateException("Unexpected error happened when getting consumer time lag", e); + } catch (final InvalidCursorException e) { + throw new ErrorGettingCursorTimeLagException(cursor, e); + } + } + } + } From a633aa96ad52051c530efd2449de7bc0446a9232 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 23 May 2018 11:51:59 +0200 Subject: [PATCH 20/24] ARUHA-1664: increase max amount of threads per per request; --- .../nakadi/service/subscription/SubscriptionTimeLagService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index a50b0d5885..bb27da84e7 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -41,7 +41,7 @@ public class SubscriptionTimeLagService { private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; private static final int SINGLE_PARTITION_TIMEOUT_MS = 5000; private static final int COMPLETE_TIMEOUT_MS = 30000; - private static final int MAX_THREADS_PER_REQUEST = 10; + private static final int MAX_THREADS_PER_REQUEST = 20; private static final int TIME_LAG_COMMON_POOL_SIZE = 400; private final TimelineService timelineService; From 2cec912f45adb774dd8d17d7b70a6690f5436412 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Wed, 23 May 2018 17:21:46 +0200 Subject: [PATCH 21/24] ARUHA-1664: fixed checkstyle issues; --- .../service/subscription/SubscriptionTimeLagService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index bb27da84e7..88e654870b 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -110,13 +110,13 @@ private static class TimeLagRequestHandler { private final ThreadPoolExecutor threadPool; private final Semaphore semaphore; - public TimeLagRequestHandler(final TimelineService timelineService, final ThreadPoolExecutor threadPool) { + TimeLagRequestHandler(final TimelineService timelineService, final ThreadPoolExecutor threadPool) { this.timelineService = timelineService; this.threadPool = threadPool; this.semaphore = new Semaphore(MAX_THREADS_PER_REQUEST); } - public CompletableFuture getCursorTimeLagFuture(final NakadiCursor cursor) + CompletableFuture getCursorTimeLagFuture(final NakadiCursor cursor) throws InterruptedException, TimeoutException { final CompletableFuture future = new CompletableFuture<>(); @@ -140,7 +140,7 @@ public CompletableFuture getCursorTimeLagFuture(final NakadiCursor cur private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException, InconsistentStateException { - try (final EventConsumer consumer = timelineService.createEventConsumer( + try (EventConsumer consumer = timelineService.createEventConsumer( "time-lag-checker-" + UUID.randomUUID().toString(), ImmutableList.of(cursor))) { final ConsumedEvent nextEvent = executeWithRetry( From 62266e616f1d9e49e3657ceb47711f86720f0224 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Thu, 24 May 2018 12:04:24 +0200 Subject: [PATCH 22/24] ARUHA-1664: fixed timeouts; fixed exceptions handling; --- .../nakadi/controller/ExceptionHandling.java | 8 ++++++++ .../controller/SubscriptionController.java | 9 +++++++++ .../runtime/LimitReachedException.java | 9 +++++++++ .../runtime/TimeLagStatsTimeoutException.java | 9 +++++++++ .../SubscriptionTimeLagService.java | 18 ++++++++++++------ 5 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java create mode 100644 src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java diff --git a/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java b/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java index ea2629ec7a..dec7a1bc66 100644 --- a/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java +++ b/src/main/java/org/zalando/nakadi/controller/ExceptionHandling.java @@ -19,6 +19,7 @@ import org.zalando.nakadi.exceptions.runtime.CursorConversionException; import org.zalando.nakadi.exceptions.runtime.CursorsAreEmptyException; import org.zalando.nakadi.exceptions.runtime.DbWriteOperationsBlockedException; +import org.zalando.nakadi.exceptions.runtime.LimitReachedException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.exceptions.runtime.NoEventTypeException; import org.zalando.nakadi.exceptions.runtime.RepositoryProblemException; @@ -157,6 +158,13 @@ public ResponseEntity handleServiceTemporarilyUnavailableException( return Responses.create(Response.Status.SERVICE_UNAVAILABLE, exception.getMessage(), request); } + @ExceptionHandler(LimitReachedException.class) + public ResponseEntity handleLimitReachedException( + final ServiceTemporarilyUnavailableException exception, final NativeWebRequest request) { + LOG.warn(exception.getMessage()); + return Responses.create(MoreStatus.TOO_MANY_REQUESTS, exception.getMessage(), request); + } + @ExceptionHandler(DbWriteOperationsBlockedException.class) public ResponseEntity handleDbWriteOperationsBlockedException( final DbWriteOperationsBlockedException exception, final NativeWebRequest request) { diff --git a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java index 36abbd10a1..c53c0e2320 100644 --- a/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java +++ b/src/main/java/org/zalando/nakadi/controller/SubscriptionController.java @@ -19,6 +19,7 @@ import org.zalando.nakadi.exceptions.runtime.FeatureNotAvailableException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException; import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.WebResult; import org.zalando.nakadi.service.subscription.SubscriptionService; @@ -27,6 +28,7 @@ import org.zalando.problem.spring.web.advice.Responses; import javax.annotation.Nullable; +import javax.ws.rs.core.Response; import java.util.Set; import static javax.ws.rs.core.Response.Status.NOT_IMPLEMENTED; @@ -137,4 +139,11 @@ public ResponseEntity handleServiceTemporarilyUnavailable(final Service request); } + @ExceptionHandler(TimeLagStatsTimeoutException.class) + public ResponseEntity handleTimeLagStatsTimeoutException(final TimeLagStatsTimeoutException e, + final NativeWebRequest request) { + LOG.warn(e.getMessage()); + return Responses.create(Response.Status.REQUEST_TIMEOUT, e.getMessage(), request); + } + } diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java new file mode 100644 index 0000000000..176632da4a --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/LimitReachedException.java @@ -0,0 +1,9 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class LimitReachedException extends MyNakadiRuntimeException1 { + + public LimitReachedException(final String msg, final Throwable cause) { + super(msg, cause); + } + +} diff --git a/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java b/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java new file mode 100644 index 0000000000..9eec9d7a5e --- /dev/null +++ b/src/main/java/org/zalando/nakadi/exceptions/runtime/TimeLagStatsTimeoutException.java @@ -0,0 +1,9 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class TimeLagStatsTimeoutException extends MyNakadiRuntimeException1 { + + public TimeLagStatsTimeoutException(final String msg, final Throwable cause) { + super(msg, cause); + } + +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index 88e654870b..f3a76a9bc6 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -12,7 +12,9 @@ import org.zalando.nakadi.exceptions.InvalidCursorException; import org.zalando.nakadi.exceptions.NakadiException; import org.zalando.nakadi.exceptions.runtime.InconsistentStateException; +import org.zalando.nakadi.exceptions.runtime.LimitReachedException; import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; +import org.zalando.nakadi.exceptions.runtime.TimeLagStatsTimeoutException; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.service.NakadiCursorComparator; import org.zalando.nakadi.service.timeline.TimelineService; @@ -27,6 +29,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -39,8 +42,7 @@ public class SubscriptionTimeLagService { private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; - private static final int SINGLE_PARTITION_TIMEOUT_MS = 5000; - private static final int COMPLETE_TIMEOUT_MS = 30000; + private static final int SINGLE_PARTITION_PROCESSING_TIMEOUT_MS = 5000; private static final int MAX_THREADS_PER_REQUEST = 20; private static final int TIME_LAG_COMMON_POOL_SIZE = 400; @@ -59,7 +61,8 @@ public SubscriptionTimeLagService(final TimelineService timelineService, public Map getTimeLags(final Collection committedPositions, final List endPositions) - throws ErrorGettingCursorTimeLagException, InconsistentStateException { + throws ErrorGettingCursorTimeLagException, InconsistentStateException, LimitReachedException, + TimeLagStatsTimeoutException { final TimeLagRequestHandler timeLagHandler = new TimeLagRequestHandler(timelineService, threadPool); final Map timeLags = new HashMap<>(); @@ -75,13 +78,16 @@ public Map getTimeLags(final Collection getCursorTimeLagFuture(final NakadiCursor cursor) throws InterruptedException, TimeoutException { final CompletableFuture future = new CompletableFuture<>(); - if (semaphore.tryAcquire(SINGLE_PARTITION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + if (semaphore.tryAcquire(SINGLE_PARTITION_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { threadPool.submit(() -> { try { final Duration timeLag = getNextEventTimeLag(cursor); From 78aa0da2899ac0b0e442ecfc9b1c57a22db8e580 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Thu, 24 May 2018 14:51:27 +0200 Subject: [PATCH 23/24] ARUHA-1664: fixed timeout; --- .../subscription/SubscriptionTimeLagService.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java index f3a76a9bc6..d309a94ec8 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionTimeLagService.java @@ -42,7 +42,7 @@ public class SubscriptionTimeLagService { private static final int EVENT_FETCH_WAIT_TIME_MS = 1000; - private static final int SINGLE_PARTITION_PROCESSING_TIMEOUT_MS = 5000; + private static final int REQUEST_TIMEOUT_MS = 30000; private static final int MAX_THREADS_PER_REQUEST = 20; private static final int TIME_LAG_COMMON_POOL_SIZE = 400; @@ -78,7 +78,7 @@ public Map getTimeLags(final Collection getCursorTimeLagFuture(final NakadiCursor cursor) throws InterruptedException, TimeoutException { final CompletableFuture future = new CompletableFuture<>(); - if (semaphore.tryAcquire(SINGLE_PARTITION_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + if (semaphore.tryAcquire(getRemainingTimeoutMs(), TimeUnit.MILLISECONDS)) { threadPool.submit(() -> { try { final Duration timeLag = getNextEventTimeLag(cursor); @@ -143,6 +145,14 @@ CompletableFuture getCursorTimeLagFuture(final NakadiCursor cursor) return future; } + long getRemainingTimeoutMs() { + if (timeoutTimestampMs > System.currentTimeMillis()) { + return timeoutTimestampMs - System.currentTimeMillis(); + } else { + return 0; + } + } + private Duration getNextEventTimeLag(final NakadiCursor cursor) throws ErrorGettingCursorTimeLagException, InconsistentStateException { From dbd04d9e2d9f4eaf79f2f7762f282eec456941e1 Mon Sep 17 00:00:00 2001 From: v-stepanov Date: Fri, 25 May 2018 08:59:58 +0200 Subject: [PATCH 24/24] ARUHA-1664: added release version; --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc2170063d..9e3df0cab3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.7.0] - 2018-05-25 + ### Added - Extended subscription statistics endpoint with time-lag information