diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java index dcb559b451..7c72297a6e 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/AbstractZkSubscriptionClient.java @@ -14,11 +14,11 @@ import org.zalando.nakadi.domain.EventTypePartition; import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; -import org.zalando.nakadi.exceptions.runtime.UnableProcessException; import org.zalando.nakadi.exceptions.runtime.OperationInterruptedException; import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException; import org.zalando.nakadi.exceptions.runtime.RequestInProgressException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.exceptions.runtime.UnableProcessException; import org.zalando.nakadi.exceptions.runtime.ZookeeperException; import org.zalando.nakadi.service.subscription.model.Session; import org.zalando.nakadi.view.SubscriptionCursorWithoutToken; @@ -46,17 +46,17 @@ import static org.echocat.jomon.runtime.concurrent.Retryer.executeWithRetry; public abstract class AbstractZkSubscriptionClient implements ZkSubscriptionClient { + public static final int SECONDS_TO_WAIT_FOR_LOCK = 15; + protected static final String NODE_TOPOLOGY = "/topology"; private static final String STATE_INITIALIZED = "INITIALIZED"; private static final int COMMIT_CONFLICT_RETRY_TIMES = 5; - protected static final String NODE_TOPOLOGY = "/topology"; - public static final int SECONDS_TO_WAIT_FOR_LOCK = 15; private static final int MAX_ZK_RESPONSE_SECONDS = 5; private final String subscriptionId; private final CuratorFramework curatorFramework; - private InterProcessSemaphoreMutex lock; private final String resetCursorPath; private final Logger log; + private InterProcessSemaphoreMutex lock; public AbstractZkSubscriptionClient( final String subscriptionId, @@ -206,6 +206,8 @@ protected Map loadDataAsync(final Collection keys, synchronized (result) { result.put(key, value); } + } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) { + getLog().warn("Unable to get {} data from zk. Node not found ", zkKey); } else { getLog().error( "Failed to get {} data from zk. status code: {}", @@ -229,14 +231,6 @@ protected Map loadDataAsync(final Collection keys, Thread.currentThread().interrupt(); throw new ServiceTemporarilyUnavailableException("Failed to wait for zk response", ex); } - if (result.size() != keys.size()) { - throw new ServiceTemporarilyUnavailableException("Failed to wait for keys " + - keys.stream() - .filter(v -> !result.containsKey(v)) - .map(String::valueOf) - .collect(Collectors.joining(", ")) - + " to be in response", null); - } return result; } @@ -244,20 +238,22 @@ protected Map loadDataAsync(final Collection keys, public final Collection listSessions() throws SubscriptionNotInitializedException, NakadiRuntimeException, ServiceTemporarilyUnavailableException { getLog().info("fetching sessions information"); - final List zkSessions; - try { - zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); - } catch (final KeeperException.NoNodeException e) { - throw new SubscriptionNotInitializedException(getSubscriptionId()); - } catch (Exception ex) { - throw new NakadiRuntimeException(ex); + for (int i = 0; i < 5; i++) { + try { + final List sessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions")); + final Map result = loadDataAsync(sessions, + key -> getSubscriptionPath("/sessions/" + key), + this::deserializeSession); + if (result.size() == sessions.size()) { + return result.values(); + } + } catch (final KeeperException.NoNodeException e) { + throw new SubscriptionNotInitializedException(getSubscriptionId()); + } catch (Exception ex) { + throw new NakadiRuntimeException(ex); + } } - - return loadDataAsync( - zkSessions, - key -> getSubscriptionPath("/sessions/" + key), - this::deserializeSession - ).values(); + throw new ServiceTemporarilyUnavailableException("Failed to get all keys from ZK", null); } @Override diff --git a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java index 27d430889f..0b82c9da82 100644 --- a/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java +++ b/src/main/java/org/zalando/nakadi/service/subscription/zk/NewZkSubscriptionClient.java @@ -17,6 +17,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.google.common.base.Charsets.UTF_8; @@ -170,8 +171,21 @@ protected String getOffsetPath(final EventTypePartition etp) { public Map getOffsets( final Collection keys) throws NakadiRuntimeException, ServiceTemporarilyUnavailableException { - return loadDataAsync(keys, this::getOffsetPath, (etp, value) -> - new SubscriptionCursorWithoutToken(etp.getEventType(), etp.getPartition(), new String(value, UTF_8))); + final Map offSets = loadDataAsync(keys, + this::getOffsetPath, (etp, value) -> + new SubscriptionCursorWithoutToken(etp.getEventType(), etp.getPartition(), + new String(value, UTF_8))); + + if (offSets.size() != keys.size()) { + throw new ServiceTemporarilyUnavailableException("Failed to get all the keys " + + keys.stream() + .filter(v -> !offSets.containsKey(v)) + .map(String::valueOf) + .collect(Collectors.joining(", ")) + + " from ZK.", null); + } + + return offSets; } @Override