diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 8945741205..05302e07ad 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -1,7 +1,6 @@ package org.zalando.nakadi.service; import com.google.common.collect.ImmutableList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -33,6 +32,8 @@ import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.service.subscription.state.StartingState; +import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; @@ -131,12 +132,18 @@ public List getSubscriptionCursors(final String subscription, "subscription." + subscriptionId + ".get_cursors"); final ImmutableList.Builder cursorsListBuilder = ImmutableList.builder(); + Partition[] partitions; try { - for (final Partition p : zkSubscriptionClient.listPartitions()) { - cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey())); - } - } catch (SubscriptionNotInitializedException ex) { - return Collections.emptyList(); + partitions = zkSubscriptionClient.listPartitions(); + } catch (final OldSubscriptionFormatException ex) { + zkSubscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure( + subscription, timelineService, cursorConverter, zkSubscriptionClient)); + partitions = zkSubscriptionClient.listPartitions(); + } catch (final SubscriptionNotInitializedException ex) { + partitions = new Partition[]{}; + } + for (final Partition p : partitions) { + cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey())); } return cursorsListBuilder.build(); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index df160898e5..e18ec004c2 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -52,8 +52,8 @@ import org.zalando.nakadi.service.CursorOperationsService; import org.zalando.nakadi.service.Result; import org.zalando.nakadi.service.subscription.state.StartingState; +import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; -import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode; import org.zalando.nakadi.service.timeline.TimelineService; @@ -225,7 +225,7 @@ private ZkSubscriptionNode getZkSubscriptionNode( final Subscription subscription, final ZkSubscriptionClient subscriptionClient) { try { return subscriptionClient.getZkSubscriptionNodeLocked(); - } catch (SubscriptionNotInitializedException ex) { + } catch (OldSubscriptionFormatException ex) { subscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure( subscription, timelineService, converter, subscriptionClient)); return subscriptionClient.getZkSubscriptionNodeLocked(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index e8b07d9b09..239dac2474 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -496,7 +496,7 @@ void offsetChanged(final EventTypePartition key) { reconfigureKafkaConsumer(true); } - if (commitResult.committedCount > 0) { + if (commitResult.committedCount > 0) { committedEvents += commitResult.committedCount; this.lastCommitMillis = System.currentTimeMillis(); streamToOutput(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 00669d9ffe..6d66374937 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -29,6 +29,7 @@ import org.zalando.nakadi.exceptions.NakadiRuntimeException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.UnableProcessException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.exceptions.runtime.OperationInterruptedException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.RequestInProgressException; @@ -102,7 +103,7 @@ public final void runLocked(final Runnable function) { if (releaseException != null) { throw releaseException; } - } catch (final NakadiRuntimeException e) { + } catch (final NakadiRuntimeException | MyNakadiRuntimeException1 e) { throw e; } catch (final Exception e) { throw new NakadiRuntimeException(e); @@ -207,15 +208,22 @@ public final void unregisterSession(final Session session) { public final Session[] listSessions() { getLog().info("fetching sessions information"); final List sessions = new ArrayList<>(); + final List zkSessions; try { - for (final String sessionId : getCurator().getChildren().forPath(getSubscriptionPath("/sessions"))) { + zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); + } catch (final KeeperException.NoNodeException e) { + throw new SubscriptionNotInitializedException(getSubscriptionId()); + } catch (Exception ex) { + throw new NakadiRuntimeException(ex); + } + + try { + for (final String sessionId : zkSessions) { final int weight = Integer.parseInt(new String(getCurator().getData() .forPath(getSubscriptionPath("/sessions/" + sessionId)), UTF_8)); sessions.add(new Session(sessionId, weight)); } return sessions.toArray(new Session[sessions.size()]); - } catch (final KeeperException.NoNodeException e) { - throw new SubscriptionNotInitializedException(getSubscriptionId()); } catch (final Exception e) { throw new NakadiRuntimeException(e); } @@ -350,7 +358,8 @@ public final ZKSubscription subscribeForTopologyChanges(final Runnable onTopolog } @Override - public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException { + public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException, + OldSubscriptionFormatException { final ZkSubscriptionNode subscriptionNode = new ZkSubscriptionNode(); try { if (null == getCurator().checkExists().forPath(getSubscriptionPath(""))) { @@ -366,11 +375,8 @@ public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws Subscriptio subscriptionNode.setPartitions(listPartitions()); subscriptionNode.setSessions(listSessions()); }); - } catch (final NakadiRuntimeException nre) { - final Exception cause = nre.getException(); - if (!(cause instanceof KeeperException.NoNodeException)) { - throw new NakadiRuntimeException(cause); - } + } catch (final NakadiRuntimeException ex) { + // this line intentionally left to have the same behavior as it was before getLog().info("No data about provided subscription {} in ZK", getSubscriptionPath("")); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index b537976b5a..23d833a05a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -22,7 +22,7 @@ * Subscription client that uses topology as an object node and separates changeable data to a separate zk node. * The structure of zk data is like this: *
- *- nakadi
+ * - nakadi
  * +- locks
  * | |- subscription_{subscription_id}      // Node that is being used to guarantee consistency of subscription data
  * |
@@ -142,10 +142,10 @@ protected byte[] createTopologyAndOffsets(final Collection partitions) {
+    public void transfer(final String sessionId, final Collection partitions)
+            throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException {
         getLog().info("session " + sessionId + " releases partitions " + partitions);
-        final Topology topology;
-        try {
-            topology = readTopology();
-        } catch (final Exception e) {
-            throw new NakadiRuntimeException(e);
-        }
+        final Topology topology = readTopology();
+
         final List changeSet = new ArrayList<>();
         for (final EventTypePartition etp : partitions) {
             final Partition candidate = Stream.of(topology.getPartitions())
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldSubscriptionFormatException.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldSubscriptionFormatException.java
new file mode 100644
index 0000000000..2a9d5f4782
--- /dev/null
+++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldSubscriptionFormatException.java
@@ -0,0 +1,6 @@
+package org.zalando.nakadi.service.subscription.zk;
+
+import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
+
+public class OldSubscriptionFormatException extends MyNakadiRuntimeException1 {
+}
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java
index d3fad539f7..38f9e2f621 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java
@@ -126,11 +126,20 @@ private void incrementTopology() {
     }
 
     @Override
-    public Partition[] listPartitions() {
+    public Partition[] listPartitions() throws NakadiRuntimeException {
         getLog().info("fetching partitions information");
+
+        final List zkPartitions;
+        try {
+            zkPartitions = getCurator().getChildren().forPath(getSubscriptionPath("/topics"));
+        } catch (KeeperException.NoNodeException ex) {
+            throw new SubscriptionNotInitializedException(getSubscriptionId());
+        } catch (final Exception e) {
+            throw new NakadiRuntimeException(e);
+        }
         try {
             final List partitions = new ArrayList<>();
-            for (final String topic : getCurator().getChildren().forPath(getSubscriptionPath("/topics"))) {
+            for (final String topic : zkPartitions) {
                 for (final String partition : getCurator().getChildren().forPath(getSubscriptionPath("/topics/"
                         + topic))) {
                     partitions.add(deserializeNode(
@@ -141,8 +150,6 @@ public Partition[] listPartitions() {
                 }
             }
             return partitions.toArray(new Partition[partitions.size()]);
-        } catch (final KeeperException.NoNodeException e) {
-            throw new SubscriptionNotInitializedException(getSubscriptionId());
         } catch (final Exception e) {
             throw new NakadiRuntimeException(e);
         }
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java
index 369adb399d..40aef85bf4 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java
@@ -49,7 +49,8 @@ public interface ZkSubscriptionClient {
     /**
      * Updates specified partitions in zk.
      */
-    void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException;
+    void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException,
+            SubscriptionNotInitializedException, OldSubscriptionFormatException;
 
     /**
      * Returns session list in zk related to this subscription.
@@ -113,7 +114,8 @@ List commitOffsets(List cursors,
      * @param sessionId  Someone who actually tries to transfer data.
      * @param partitions topic ids and partition ids of transferred data.
      */
-    void transfer(String sessionId, Collection partitions);
+    void transfer(String sessionId, Collection partitions)
+            throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException;
 
     /**
      * Retrieves subscription data like partitions and sessions from ZK under lock.
diff --git a/src/main/java/org/zalando/nakadi/view/SubscriptionCursorWithoutToken.java b/src/main/java/org/zalando/nakadi/view/SubscriptionCursorWithoutToken.java
index e073996ba9..e9d2626edf 100644
--- a/src/main/java/org/zalando/nakadi/view/SubscriptionCursorWithoutToken.java
+++ b/src/main/java/org/zalando/nakadi/view/SubscriptionCursorWithoutToken.java
@@ -1,5 +1,6 @@
 package org.zalando.nakadi.view;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import javax.annotation.concurrent.Immutable;
 import javax.validation.constraints.NotNull;
@@ -22,6 +23,7 @@ public String getEventType() {
         return eventType;
     }
 
+    @JsonIgnore
     public EventTypePartition getEventTypePartition() {
         return new EventTypePartition(eventType, getPartition());
     }