diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index 6d79fbd9e5..7c72297a6e 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -238,32 +238,22 @@ protected Map loadDataAsync(final Collection keys, public final Collection listSessions() throws SubscriptionNotInitializedException, NakadiRuntimeException, ServiceTemporarilyUnavailableException { getLog().info("fetching sessions information"); - final int numberOfCalls = 3; - for (int i = 0; i < numberOfCalls; i++) { - final List zkSessions; - final Map result; + for (int i = 0; i < 5; i++) { try { - zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); + final List sessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); + final Map result = loadDataAsync(sessions, + key -> getSubscriptionPath("/sessions/" + key), + this::deserializeSession); + if (result.size() == sessions.size()) { + return result.values(); + } } catch (final KeeperException.NoNodeException e) { throw new SubscriptionNotInitializedException(getSubscriptionId()); } catch (Exception ex) { throw new NakadiRuntimeException(ex); } - result = loadDataAsync(zkSessions, key -> getSubscriptionPath("/sessions/" + key), - this::deserializeSession); - if (result.size() == zkSessions.size()) { - return result.values(); - } - if (i == numberOfCalls-1) { - throw new ServiceTemporarilyUnavailableException("Failed to get all keys " + - zkSessions.stream() - .filter(v -> !result.containsKey(v)) - .map(String::valueOf) - .collect(Collectors.joining(", ")) - + " from ZK", null); - } } - return null; + throw new ServiceTemporarilyUnavailableException("Failed to get all keys from ZK", null); } @Override