diff --git a/CHANGELOG.md b/CHANGELOG.md index 2398be1b29..c76b216502 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +## [2.2.9] - 2017-11-14 + +### Fixed +- Fixed displaying of streamId for /stats endpoint + ## [2.2.8] - 2017-11-01 ### Fixed diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index 7f71e8c02a..13abb08dc9 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -264,12 +264,14 @@ private List loadStats( } }) .orElse(null); + + final String state = subscriptionNode.guessState(stat.getTimeline().getEventType(), stat.getPartition()) + .getDescription(); + final String streamId = Optional.ofNullable(subscriptionNode.guessStream( + stat.getTimeline().getEventType(), stat.getPartition())).orElse(""); + resultPartitions.add(new SubscriptionEventTypeStats.Partition( - lastPosition.getPartition(), - subscriptionNode.guessState(stat.getPartition()).getDescription(), - distance, - Optional.ofNullable(subscriptionNode.guessStream(stat.getPartition())).orElse("") - )); + lastPosition.getPartition(), state, distance, streamId)); } resultPartitions.sort(Comparator.comparing(SubscriptionEventTypeStats.Partition::getPartition)); result.add(new SubscriptionEventTypeStats(eventType.getName(), resultPartitions)); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java index 327f04d16b..9f8f092ebd 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNode.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.service.subscription.zk; -import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.service.subscription.model.Partition; import org.zalando.nakadi.service.subscription.model.Session; @@ -39,23 +38,24 @@ public Session[] getSessions() { return sessions; } - public Partition.State guessState(final String partition) { - return getPartitionWithActiveSession(partition).map(Partition::getState).orElse(Partition.State.UNASSIGNED); + public Partition.State guessState(final String eventType, final String partition) { + return getPartitionWithActiveSession(eventType, partition) + .map(Partition::getState) + .orElse(Partition.State.UNASSIGNED); } - private Optional getPartitionWithActiveSession(final String partition) { + private Optional getPartitionWithActiveSession(final String eventType, final String partition) { return Stream.of(partitions) - .filter(p -> p.getPartition().equals(partition)) + .filter(p -> p.getPartition().equals(partition) && p.getEventType().equals(eventType)) .filter(p -> Stream.of(sessions).anyMatch(s -> s.getId().equalsIgnoreCase(p.getSession()))) .findAny(); } @Nullable - public String guessStream(final String partition) { - return getPartitionWithActiveSession(partition).map(Partition::getSession).orElse(null); + public String guessStream(final String eventType, final String partition) { + return getPartitionWithActiveSession(eventType, partition) + .map(Partition::getSession) + .orElse(null); } - public boolean containsPartition(final EventTypePartition eventTypePartition) { - return Stream.of(getPartitions()).anyMatch(p -> p.getKey().equals(eventTypePartition)); - } } diff --git a/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java new file mode 100644 index 0000000000..aa66aef18f --- /dev/null +++ b/src/test/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionNodeTest.java @@ -0,0 +1,50 @@ +package org.zalando.nakadi.service.subscription.zk; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; +import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.service.subscription.model.Session; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class ZkSubscriptionNodeTest { + + private ZkSubscriptionNode zkSubscriptionNode; + + @Before + public void before() { + final Partition[] partitions = ImmutableList.of( + new Partition("et1", "0", "stream1", null, Partition.State.ASSIGNED), + new Partition("et1", "1", "stream2", "stream4", Partition.State.REASSIGNING), + new Partition("et2", "0", "stream3", null, Partition.State.UNASSIGNED), + new Partition("et2", "1", null, null, null) + ).toArray(new Partition[4]); + + final Session[] sessions = ImmutableList.of( + new Session("stream1", 1), + new Session("stream2", 1), + new Session("stream3", 1), + new Session("stream4", 1) + ).toArray(new Session[4]); + + zkSubscriptionNode = new ZkSubscriptionNode(partitions, sessions); + } + + @Test + public void whenGuessStreamThenOk() { + assertThat(zkSubscriptionNode.guessStream("et1", "0"), equalTo("stream1")); + assertThat(zkSubscriptionNode.guessStream("et1", "1"), equalTo("stream2")); + assertThat(zkSubscriptionNode.guessStream("et2", "0"), equalTo("stream3")); + assertThat(zkSubscriptionNode.guessStream("et2", "1"), equalTo(null)); + } + + @Test + public void whenGuessStateThenOk() { + assertThat(zkSubscriptionNode.guessState("et1", "0"), equalTo(Partition.State.ASSIGNED)); + assertThat(zkSubscriptionNode.guessState("et1", "1"), equalTo(Partition.State.REASSIGNING)); + assertThat(zkSubscriptionNode.guessState("et2", "0"), equalTo(Partition.State.UNASSIGNED)); + assertThat(zkSubscriptionNode.guessState("et2", "1"), equalTo(Partition.State.UNASSIGNED)); + } +}