Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
ARUHA-834 Recreate subscription during /stat and /cursors calls
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed May 30, 2017
1 parent 601c87e commit f9b41de
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 28 deletions.
19 changes: 13 additions & 6 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.zalando.nakadi.service;

import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -33,6 +32,8 @@
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
Expand Down Expand Up @@ -131,12 +132,18 @@ public List<SubscriptionCursorWithoutToken> getSubscriptionCursors(final String
subscription, "subscription." + subscriptionId + ".get_cursors");
final ImmutableList.Builder<SubscriptionCursorWithoutToken> cursorsListBuilder = ImmutableList.builder();

Partition[] partitions;
try {
for (final Partition p : zkSubscriptionClient.listPartitions()) {
cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey()));
}
} catch (SubscriptionNotInitializedException ex) {
return Collections.emptyList();
partitions = zkSubscriptionClient.listPartitions();
} catch (final OldSubscriptionFormatException ex) {
zkSubscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure(
subscription, timelineService, cursorConverter, zkSubscriptionClient));
partitions = zkSubscriptionClient.listPartitions();
} catch (final SubscriptionNotInitializedException ex) {
partitions = new Partition[]{};
}
for (final Partition p : partitions) {
cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey()));
}
return cursorsListBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.Result;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode;
import org.zalando.nakadi.service.timeline.TimelineService;
Expand Down Expand Up @@ -225,7 +225,7 @@ private ZkSubscriptionNode getZkSubscriptionNode(
final Subscription subscription, final ZkSubscriptionClient subscriptionClient) {
try {
return subscriptionClient.getZkSubscriptionNodeLocked();
} catch (SubscriptionNotInitializedException ex) {
} catch (OldSubscriptionFormatException ex) {
subscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure(
subscription, timelineService, converter, subscriptionClient));
return subscriptionClient.getZkSubscriptionNodeLocked();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.OperationInterruptedException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.RequestInProgressException;
Expand Down Expand Up @@ -102,7 +103,7 @@ public final void runLocked(final Runnable function) {
if (releaseException != null) {
throw releaseException;
}
} catch (final NakadiRuntimeException e) {
} catch (final NakadiRuntimeException | MyNakadiRuntimeException1 e) {
throw e;
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
Expand Down Expand Up @@ -207,15 +208,22 @@ public final void unregisterSession(final Session session) {
public final Session[] listSessions() {
getLog().info("fetching sessions information");
final List<Session> sessions = new ArrayList<>();
final List<String> zkSessions;
try {
for (final String sessionId : getCurator().getChildren().forPath(getSubscriptionPath("/sessions"))) {
zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions"));
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (Exception ex) {
throw new NakadiRuntimeException(ex);
}

try {
for (final String sessionId : zkSessions) {
final int weight = Integer.parseInt(new String(getCurator().getData()
.forPath(getSubscriptionPath("/sessions/" + sessionId)), UTF_8));
sessions.add(new Session(sessionId, weight));
}
return sessions.toArray(new Session[sessions.size()]);
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
Expand Down Expand Up @@ -350,7 +358,8 @@ public final ZKSubscription subscribeForTopologyChanges(final Runnable onTopolog
}

@Override
public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException {
public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException,
OldSubscriptionFormatException {
final ZkSubscriptionNode subscriptionNode = new ZkSubscriptionNode();
try {
if (null == getCurator().checkExists().forPath(getSubscriptionPath(""))) {
Expand All @@ -366,11 +375,8 @@ public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws Subscriptio
subscriptionNode.setPartitions(listPartitions());
subscriptionNode.setSessions(listSessions());
});
} catch (final NakadiRuntimeException nre) {
final Exception cause = nre.getException();
if (!(cause instanceof KeeperException.NoNodeException)) {
throw new NakadiRuntimeException(cause);
}
} catch (final NakadiRuntimeException ex) {
// this line intentionally left to have the same behavior as it was before
getLog().info("No data about provided subscription {} in ZK", getSubscriptionPath(""));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,25 @@ public void updatePartitionsConfiguration(final Partition[] partitions) throws N
}

private Topology readTopology() throws Exception {
final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY));
final Topology result = objectMapper.readValue(data, Topology.class);
getLog().info("Topology is {}", result);
return result;
try {
final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY));
final Topology result = objectMapper.readValue(data, Topology.class);
getLog().info("Topology is {}", result);
return result;
} catch (final IOException ex) {
throw new OldSubscriptionFormatException();
} catch (KeeperException.NoNodeException ex) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
}
}

@Override
public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException {
public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException,
OldSubscriptionFormatException {
try {
return readTopology().getPartitions();
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final SubscriptionNotInitializedException | OldSubscriptionFormatException e) {
throw e;
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.zalando.nakadi.service.subscription.zk;

import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;

public class OldSubscriptionFormatException extends MyNakadiRuntimeException1 {
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,18 @@ private void incrementTopology() {
@Override
public Partition[] listPartitions() {
getLog().info("fetching partitions information");

final List<String> zkPartitions;
try {
zkPartitions = getCurator().getChildren().forPath(getSubscriptionPath("/topics"));
} catch (KeeperException.NoNodeException ex) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
try {
final List<Partition> partitions = new ArrayList<>();
for (final String topic : getCurator().getChildren().forPath(getSubscriptionPath("/topics"))) {
for (final String topic : zkPartitions) {
for (final String partition : getCurator().getChildren().forPath(getSubscriptionPath("/topics/"
+ topic))) {
partitions.add(deserializeNode(
Expand All @@ -141,8 +150,6 @@ public Partition[] listPartitions() {
}
}
return partitions.toArray(new Partition[partitions.size()]);
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
Expand Down

0 comments on commit f9b41de

Please sign in to comment.