Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #670 from zalando/ARUHA-834
Browse files Browse the repository at this point in the history
ARUHA-834 Fix bugs related to ARUHA-753
  • Loading branch information
antban authored Jun 1, 2017
2 parents b9c758a + 8ea76cd commit ddbfc69
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 49 deletions.
19 changes: 13 additions & 6 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.zalando.nakadi.service;

import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -33,6 +32,8 @@
import org.zalando.nakadi.repository.db.SubscriptionDbRepository;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.subscription.model.Partition;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
Expand Down Expand Up @@ -131,12 +132,18 @@ public List<SubscriptionCursorWithoutToken> getSubscriptionCursors(final String
subscription, "subscription." + subscriptionId + ".get_cursors");
final ImmutableList.Builder<SubscriptionCursorWithoutToken> cursorsListBuilder = ImmutableList.builder();

Partition[] partitions;
try {
for (final Partition p : zkSubscriptionClient.listPartitions()) {
cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey()));
}
} catch (SubscriptionNotInitializedException ex) {
return Collections.emptyList();
partitions = zkSubscriptionClient.listPartitions();
} catch (final OldSubscriptionFormatException ex) {
zkSubscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure(
subscription, timelineService, cursorConverter, zkSubscriptionClient));
partitions = zkSubscriptionClient.listPartitions();
} catch (final SubscriptionNotInitializedException ex) {
partitions = new Partition[]{};
}
for (final Partition p : partitions) {
cursorsListBuilder.add(zkSubscriptionClient.getOffset(p.getKey()));
}
return cursorsListBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import org.zalando.nakadi.service.CursorOperationsService;
import org.zalando.nakadi.service.Result;
import org.zalando.nakadi.service.subscription.state.StartingState;
import org.zalando.nakadi.service.subscription.zk.OldSubscriptionFormatException;
import org.zalando.nakadi.service.subscription.zk.SubscriptionClientFactory;
import org.zalando.nakadi.service.subscription.zk.SubscriptionNotInitializedException;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClient;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionNode;
import org.zalando.nakadi.service.timeline.TimelineService;
Expand Down Expand Up @@ -225,7 +225,7 @@ private ZkSubscriptionNode getZkSubscriptionNode(
final Subscription subscription, final ZkSubscriptionClient subscriptionClient) {
try {
return subscriptionClient.getZkSubscriptionNodeLocked();
} catch (SubscriptionNotInitializedException ex) {
} catch (OldSubscriptionFormatException ex) {
subscriptionClient.runLocked(() -> StartingState.initializeSubscriptionStructure(
subscription, timelineService, converter, subscriptionClient));
return subscriptionClient.getZkSubscriptionNodeLocked();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ void offsetChanged(final EventTypePartition key) {
reconfigureKafkaConsumer(true);
}

if (commitResult.committedCount > 0) {
if (commitResult.committedCount > 0) {
committedEvents += commitResult.committedCount;
this.lastCommitMillis = System.currentTimeMillis();
streamToOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.zalando.nakadi.exceptions.NakadiRuntimeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnableProcessException;
import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;
import org.zalando.nakadi.exceptions.runtime.OperationInterruptedException;
import org.zalando.nakadi.exceptions.runtime.OperationTimeoutException;
import org.zalando.nakadi.exceptions.runtime.RequestInProgressException;
Expand Down Expand Up @@ -102,7 +103,7 @@ public final void runLocked(final Runnable function) {
if (releaseException != null) {
throw releaseException;
}
} catch (final NakadiRuntimeException e) {
} catch (final NakadiRuntimeException | MyNakadiRuntimeException1 e) {
throw e;
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
Expand Down Expand Up @@ -207,15 +208,22 @@ public final void unregisterSession(final Session session) {
public final Session[] listSessions() {
getLog().info("fetching sessions information");
final List<Session> sessions = new ArrayList<>();
final List<String> zkSessions;
try {
for (final String sessionId : getCurator().getChildren().forPath(getSubscriptionPath("/sessions"))) {
zkSessions = getCurator().getChildren().forPath(getSubscriptionPath("/sessions"));
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (Exception ex) {
throw new NakadiRuntimeException(ex);
}

try {
for (final String sessionId : zkSessions) {
final int weight = Integer.parseInt(new String(getCurator().getData()
.forPath(getSubscriptionPath("/sessions/" + sessionId)), UTF_8));
sessions.add(new Session(sessionId, weight));
}
return sessions.toArray(new Session[sessions.size()]);
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
Expand Down Expand Up @@ -350,7 +358,8 @@ public final ZKSubscription subscribeForTopologyChanges(final Runnable onTopolog
}

@Override
public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException {
public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws SubscriptionNotInitializedException,
OldSubscriptionFormatException {
final ZkSubscriptionNode subscriptionNode = new ZkSubscriptionNode();
try {
if (null == getCurator().checkExists().forPath(getSubscriptionPath(""))) {
Expand All @@ -366,11 +375,8 @@ public final ZkSubscriptionNode getZkSubscriptionNodeLocked() throws Subscriptio
subscriptionNode.setPartitions(listPartitions());
subscriptionNode.setSessions(listSessions());
});
} catch (final NakadiRuntimeException nre) {
final Exception cause = nre.getException();
if (!(cause instanceof KeeperException.NoNodeException)) {
throw new NakadiRuntimeException(cause);
}
} catch (final NakadiRuntimeException ex) {
// this line intentionally left to have the same behavior as it was before
getLog().info("No data about provided subscription {} in ZK", getSubscriptionPath(""));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* Subscription client that uses topology as an object node and separates changeable data to a separate zk node.
* The structure of zk data is like this:
* <pre>
*- nakadi
* - nakadi
* +- locks
* | |- subscription_{subscription_id} // Node that is being used to guarantee consistency of subscription data
* |
Expand Down Expand Up @@ -142,10 +142,10 @@ protected byte[] createTopologyAndOffsets(final Collection<SubscriptionCursorWit
}

@Override
public void updatePartitionsConfiguration(final Partition[] partitions) throws NakadiRuntimeException {
public void updatePartitionsConfiguration(final Partition[] partitions) throws NakadiRuntimeException,
SubscriptionNotInitializedException, OldSubscriptionFormatException {
final Topology newTopology = readTopology().withUpdatedPartitions(partitions);
try {
final Topology topology = readTopology();
final Topology newTopology = topology.withUpdatedPartitions(partitions);
getCurator().setData().forPath(
getSubscriptionPath(NODE_TOPOLOGY),
objectMapper.writeValueAsBytes(newTopology));
Expand All @@ -154,24 +154,28 @@ public void updatePartitionsConfiguration(final Partition[] partitions) throws N
}
}

private Topology readTopology() throws Exception {
final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY));
final Topology result = objectMapper.readValue(data, Topology.class);
getLog().info("Topology is {}", result);
return result;
}

@Override
public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException {
private Topology readTopology() throws NakadiRuntimeException, OldSubscriptionFormatException,
SubscriptionNotInitializedException {
try {
return readTopology().getPartitions();
} catch (final KeeperException.NoNodeException e) {
final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY));
final Topology result = objectMapper.readValue(data, Topology.class);
getLog().info("Topology is {}", result);
return result;
} catch (final IOException ex) {
throw new OldSubscriptionFormatException();
} catch (KeeperException.NoNodeException ex) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
} catch (final Exception ex) {
throw new NakadiRuntimeException(ex);
}
}

@Override
public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException,
OldSubscriptionFormatException {
return readTopology().getPartitions();
}

protected String getOffsetPath(final EventTypePartition etp) {
return getSubscriptionPath("/offsets/" + etp.getEventType() + "/" + etp.getPartition());
}
Expand All @@ -187,14 +191,11 @@ public SubscriptionCursorWithoutToken getOffset(final EventTypePartition key) th
}

@Override
public void transfer(final String sessionId, final Collection<EventTypePartition> partitions) {
public void transfer(final String sessionId, final Collection<EventTypePartition> partitions)
throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException {
getLog().info("session " + sessionId + " releases partitions " + partitions);
final Topology topology;
try {
topology = readTopology();
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
final Topology topology = readTopology();

final List<Partition> changeSet = new ArrayList<>();
for (final EventTypePartition etp : partitions) {
final Partition candidate = Stream.of(topology.getPartitions())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.zalando.nakadi.service.subscription.zk;

import org.zalando.nakadi.exceptions.runtime.MyNakadiRuntimeException1;

public class OldSubscriptionFormatException extends MyNakadiRuntimeException1 {
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,20 @@ private void incrementTopology() {
}

@Override
public Partition[] listPartitions() {
public Partition[] listPartitions() throws NakadiRuntimeException {
getLog().info("fetching partitions information");

final List<String> zkPartitions;
try {
zkPartitions = getCurator().getChildren().forPath(getSubscriptionPath("/topics"));
} catch (KeeperException.NoNodeException ex) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
try {
final List<Partition> partitions = new ArrayList<>();
for (final String topic : getCurator().getChildren().forPath(getSubscriptionPath("/topics"))) {
for (final String topic : zkPartitions) {
for (final String partition : getCurator().getChildren().forPath(getSubscriptionPath("/topics/"
+ topic))) {
partitions.add(deserializeNode(
Expand All @@ -141,8 +150,6 @@ public Partition[] listPartitions() {
}
}
return partitions.toArray(new Partition[partitions.size()]);
} catch (final KeeperException.NoNodeException e) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public interface ZkSubscriptionClient {
/**
* Updates specified partitions in zk.
*/
void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException;
void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException,
SubscriptionNotInitializedException, OldSubscriptionFormatException;

/**
* Returns session list in zk related to this subscription.
Expand Down Expand Up @@ -113,7 +114,8 @@ List<Boolean> commitOffsets(List<SubscriptionCursorWithoutToken> cursors,
* @param sessionId Someone who actually tries to transfer data.
* @param partitions topic ids and partition ids of transferred data.
*/
void transfer(String sessionId, Collection<EventTypePartition> partitions);
void transfer(String sessionId, Collection<EventTypePartition> partitions)
throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException;

/**
* Retrieves subscription data like partitions and sessions from ZK under lock.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.nakadi.view;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.concurrent.Immutable;
import javax.validation.constraints.NotNull;
Expand All @@ -22,6 +23,7 @@ public String getEventType() {
return eventType;
}

@JsonIgnore
public EventTypePartition getEventTypePartition() {
return new EventTypePartition(eventType, getPartition());
}
Expand Down

0 comments on commit ddbfc69

Please sign in to comment.