From 8ea76cd9dd69fede5e55d4fb8b8a40da3d9c3129 Mon Sep 17 00:00:00 2001 From: antban Date: Wed, 31 May 2017 10:11:45 +0200 Subject: [PATCH] ARUHA-834 Fixes after review --- .../zk/NewZkSubscriptionClient.java | 32 ++++++++----------- .../zk/OldZkSubscriptionClient.java | 2 +- .../subscription/zk/ZkSubscriptionClient.java | 6 ++-- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index d55072f518..23d833a05a 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -22,7 +22,7 @@ * Subscription client that uses topology as an object node and separates changeable data to a separate zk node. * The structure of zk data is like this: *
- *- nakadi
+ * - nakadi
  * +- locks
  * | |- subscription_{subscription_id}      // Node that is being used to guarantee consistency of subscription data
  * |
@@ -142,10 +142,10 @@ protected byte[] createTopologyAndOffsets(final Collection partitions) {
+    public void transfer(final String sessionId, final Collection partitions)
+            throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException {
         getLog().info("session " + sessionId + " releases partitions " + partitions);
-        final Topology topology;
-        try {
-            topology = readTopology();
-        } catch (final Exception e) {
-            throw new NakadiRuntimeException(e);
-        }
+        final Topology topology = readTopology();
+
         final List changeSet = new ArrayList<>();
         for (final EventTypePartition etp : partitions) {
             final Partition candidate = Stream.of(topology.getPartitions())
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java
index 19e4f484b2..38f9e2f621 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java
@@ -126,7 +126,7 @@ private void incrementTopology() {
     }
 
     @Override
-    public Partition[] listPartitions() {
+    public Partition[] listPartitions() throws NakadiRuntimeException {
         getLog().info("fetching partitions information");
 
         final List zkPartitions;
diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java
index 369adb399d..40aef85bf4 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/ZkSubscriptionClient.java
@@ -49,7 +49,8 @@ public interface ZkSubscriptionClient {
     /**
      * Updates specified partitions in zk.
      */
-    void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException;
+    void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException,
+            SubscriptionNotInitializedException, OldSubscriptionFormatException;
 
     /**
      * Returns session list in zk related to this subscription.
@@ -113,7 +114,8 @@ List commitOffsets(List cursors,
      * @param sessionId  Someone who actually tries to transfer data.
      * @param partitions topic ids and partition ids of transferred data.
      */
-    void transfer(String sessionId, Collection partitions);
+    void transfer(String sessionId, Collection partitions)
+            throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException;
 
     /**
      * Retrieves subscription data like partitions and sessions from ZK under lock.