diff --git a/src/main/java/org/zalando/nakadi/service/CursorsService.java b/src/main/java/org/zalando/nakadi/service/CursorsService.java index 8945741205..05302e07ad 100644 --- a/src/main/java/org/zalando/nakadi/service/CursorsService.java +++ b/src/main/java/org/zalando/nakadi/service/CursorsService.java @@ -1,7 +1,6 @@ package org.zalando.nakadi.service; import com.google.common.collect.ImmutableList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -33,6 +32,8 @@ import org.zalando.nakadi.repository.db.SubscriptionDbRepository; import org.zalando.nakadi.security.Client; import org.zalando.nakadi.service.subscription.model.Partition; +import org.zalando.nakadi.service.subscription.state.StartingState; +import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; @@ -131,12 +132,18 @@ public List getSubscriptionCursors(final String subscription, "subscription." + subscriptionId + ".get_cursors"); final ImmutableList.Builder cursorsListBuilder = ImmutableList.builder(); + Partition[] partitions; try { - for (final Partition p : zkSubscriptionClient.listPartitions()) { - cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey())); - } - } catch (SubscriptionNotInitializedException ex) { - return Collections.emptyList(); + partitions = zkSubscriptionClient.listPartitions(); + } catch (final OldSubscriptionFormatException ex) { + zkSubscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure( + subscription, timelineService, cursorConverter, zkSubscriptionClient)); + partitions = zkSubscriptionClient.listPartitions(); + } catch (final SubscriptionNotInitializedException ex) { + partitions = new Partition[]{}; + } + for (final Partition p : partitions) { + cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey())); } return cursorsListBuilder.build(); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java index df160898e5..e18ec004c2 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/SubscriptionService.java @@ -52,8 +52,8 @@ import org.zalando.nakadi.service.CursorOperationsService; import org.zalando.nakadi.service.Result; import org.zalando.nakadi.service.subscription.state.StartingState; +import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException; import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory; -import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient; import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode; import org.zalando.nakadi.service.timeline.TimelineService; @@ -225,7 +225,7 @@ private ZkSubscriptionNode getZkSubscriptionNode( final Subscription subscription, final ZkSubscriptionClient subscriptionClient) { try { return subscriptionClient.getZkSubscriptionNodeLocked(); - } catch (SubscriptionNotInitializedException ex) { + } catch (OldSubscriptionFormatException ex) { subscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure( subscription, timelineService, converter, subscriptionClient)); return subscriptionClient.getZkSubscriptionNodeLocked(); diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 00669d9ffe..6d66374937 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -29,6 +29,7 @@ import org.zalando.nakadi.exceptions.NakadiRuntimeException; import org.zalando.nakadi.exceptions.ServiceUnavailableException; import org.zalando.nakadi.exceptions.UnableProcessException; +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; import org.zalando.nakadi.exceptions.runtime.OperationInterruptedException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.RequestInProgressException; @@ -102,7 +103,7 @@ public final void runLocked(final Runnable function) { if (releaseException != null) { throw releaseException; } - } catch (final NakadiRuntimeException e) { + } catch (final NakadiRuntimeException | MyNakadiRuntimeException1 e) { throw e; } catch (final Exception e) { throw new NakadiRuntimeException(e); @@ -207,15 +208,22 @@ public final void unregisterSession(final Session session) { public final Session[] listSessions() { getLog().info("fetching sessions information"); final List sessions = new ArrayList<>(); + final List zkSessions; try { - for (final String sessionId : getCurator().getChildren().forPath(getSubscriptionPath("/sessions"))) { + zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); + } catch (final KeeperException.NoNodeException e) { + throw new SubscriptionNotInitializedException(getSubscriptionId()); + } catch (Exception ex) { + throw new NakadiRuntimeException(ex); + } + + try { + for (final String sessionId : zkSessions) { final int weight = Integer.parseInt(new String(getCurator().getData() .forPath(getSubscriptionPath("/sessions/" + sessionId)), UTF_8)); sessions.add(new Session(sessionId, weight)); } return sessions.toArray(new Session[sessions.size()]); - } catch (final KeeperException.NoNodeException e) { - throw new SubscriptionNotInitializedException(getSubscriptionId()); } catch (final Exception e) { throw new NakadiRuntimeException(e); } @@ -350,7 +358,8 @@ public final ZKSubscription subscribeForTopologyChanges(final Runnable onTopolog } @Override - public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException { + public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException, + OldSubscriptionFormatException { final ZkSubscriptionNode subscriptionNode = new ZkSubscriptionNode(); try { if (null == getCurator().checkExists().forPath(getSubscriptionPath(""))) { @@ -366,11 +375,8 @@ public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws Subscriptio subscriptionNode.setPartitions(listPartitions()); subscriptionNode.setSessions(listSessions()); }); - } catch (final NakadiRuntimeException nre) { - final Exception cause = nre.getException(); - if (!(cause instanceof KeeperException.NoNodeException)) { - throw new NakadiRuntimeException(cause); - } + } catch (final NakadiRuntimeException ex) { + // this line intentionally left to have the same behavior as it was before getLog().info("No data about provided subscription {} in ZK", getSubscriptionPath("")); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index b537976b5a..d55072f518 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -155,18 +155,25 @@ public void updatePartitionsConfiguration(final Partition[] partitions) throws N } private Topology readTopology() throws Exception { - final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY)); - final Topology result = objectMapper.readValue(data, Topology.class); - getLog().info("Topology is {}", result); - return result; + try { + final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY)); + final Topology result = objectMapper.readValue(data, Topology.class); + getLog().info("Topology is {}", result); + return result; + } catch (final IOException ex) { + throw new OldSubscriptionFormatException(); + } catch (KeeperException.NoNodeException ex) { + throw new SubscriptionNotInitializedException(getSubscriptionId()); + } } @Override - public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException { + public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException, + OldSubscriptionFormatException { try { return readTopology().getPartitions(); - } catch (final KeeperException.NoNodeException e) { - throw new SubscriptionNotInitializedException(getSubscriptionId()); + } catch (final SubscriptionNotInitializedException | OldSubscriptionFormatException e) { + throw e; } catch (final Exception e) { throw new NakadiRuntimeException(e); } diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldSubscriptionFormatException.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldSubscriptionFormatException.java new file mode 100644 index 0000000000..2a9d5f4782 --- /dev/null +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldSubscriptionFormatException.java @@ -0,0 +1,6 @@ +package org.zalando.nakadi.service.subscription.zk; + +import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1; + +public class OldSubscriptionFormatException extends MyNakadiRuntimeException1 { +} diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java index d3fad539f7..19e4f484b2 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/OldZkSubscriptionClient.java @@ -128,9 +128,18 @@ private void incrementTopology() { @Override public Partition[] listPartitions() { getLog().info("fetching partitions information"); + + final List zkPartitions; + try { + zkPartitions = getCurator().getChildren().forPath(getSubscriptionPath("/topics")); + } catch (KeeperException.NoNodeException ex) { + throw new SubscriptionNotInitializedException(getSubscriptionId()); + } catch (final Exception e) { + throw new NakadiRuntimeException(e); + } try { final List partitions = new ArrayList<>(); - for (final String topic : getCurator().getChildren().forPath(getSubscriptionPath("/topics"))) { + for (final String topic : zkPartitions) { for (final String partition : getCurator().getChildren().forPath(getSubscriptionPath("/topics/" + topic))) { partitions.add(deserializeNode( @@ -141,8 +150,6 @@ public Partition[] listPartitions() { } } return partitions.toArray(new Partition[partitions.size()]); - } catch (final KeeperException.NoNodeException e) { - throw new SubscriptionNotInitializedException(getSubscriptionId()); } catch (final Exception e) { throw new NakadiRuntimeException(e); }