Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
ARUHA-834 Fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
antban committed May 31, 2017
1 parent f9b41de commit 8ea76cd
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* Subscription client that uses topology as an object node and separates changeable data to a separate zk node.
* The structure of zk data is like this:
* <pre>
*- nakadi
* - nakadi
* +- locks
* | |- subscription_{subscription_id} // Node that is being used to guarantee consistency of subscription data
* |
Expand Down Expand Up @@ -142,10 +142,10 @@ protected byte[] createTopologyAndOffsets(final Collection<SubscriptionCursorWit
}

@Override
public void updatePartitionsConfiguration(final Partition[] partitions) throws NakadiRuntimeException {
public void updatePartitionsConfiguration(final Partition[] partitions) throws NakadiRuntimeException,
SubscriptionNotInitializedException, OldSubscriptionFormatException {
final Topology newTopology = readTopology().withUpdatedPartitions(partitions);
try {
final Topology topology = readTopology();
final Topology newTopology = topology.withUpdatedPartitions(partitions);
getCurator().setData().forPath(
getSubscriptionPath(NODE_TOPOLOGY),
objectMapper.writeValueAsBytes(newTopology));
Expand All @@ -154,7 +154,8 @@ public void updatePartitionsConfiguration(final Partition[] partitions) throws N
}
}

private Topology readTopology() throws Exception {
private Topology readTopology() throws NakadiRuntimeException, OldSubscriptionFormatException,
SubscriptionNotInitializedException {
try {
final byte[] data = getCurator().getData().forPath(getSubscriptionPath(NODE_TOPOLOGY));
final Topology result = objectMapper.readValue(data, Topology.class);
Expand All @@ -164,19 +165,15 @@ private Topology readTopology() throws Exception {
throw new OldSubscriptionFormatException();
} catch (KeeperException.NoNodeException ex) {
throw new SubscriptionNotInitializedException(getSubscriptionId());
} catch (final Exception ex) {
throw new NakadiRuntimeException(ex);
}
}

@Override
public Partition[] listPartitions() throws NakadiRuntimeException, SubscriptionNotInitializedException,
OldSubscriptionFormatException {
try {
return readTopology().getPartitions();
} catch (final SubscriptionNotInitializedException | OldSubscriptionFormatException e) {
throw e;
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
return readTopology().getPartitions();
}

protected String getOffsetPath(final EventTypePartition etp) {
Expand All @@ -194,14 +191,11 @@ public SubscriptionCursorWithoutToken getOffset(final EventTypePartition key) th
}

@Override
public void transfer(final String sessionId, final Collection<EventTypePartition> partitions) {
public void transfer(final String sessionId, final Collection<EventTypePartition> partitions)
throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException {
getLog().info("session " + sessionId + " releases partitions " + partitions);
final Topology topology;
try {
topology = readTopology();
} catch (final Exception e) {
throw new NakadiRuntimeException(e);
}
final Topology topology = readTopology();

final List<Partition> changeSet = new ArrayList<>();
for (final EventTypePartition etp : partitions) {
final Partition candidate = Stream.of(topology.getPartitions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void incrementTopology() {
}

@Override
public Partition[] listPartitions() {
public Partition[] listPartitions() throws NakadiRuntimeException {
getLog().info("fetching partitions information");

final List<String> zkPartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public interface ZkSubscriptionClient {
/**
* Updates specified partitions in zk.
*/
void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException;
void updatePartitionsConfiguration(Partition[] partitions) throws NakadiRuntimeException,
SubscriptionNotInitializedException, OldSubscriptionFormatException;

/**
* Returns session list in zk related to this subscription.
Expand Down Expand Up @@ -113,7 +114,8 @@ List<Boolean> commitOffsets(List<SubscriptionCursorWithoutToken> cursors,
* @param sessionId Someone who actually tries to transfer data.
* @param partitions topic ids and partition ids of transferred data.
*/
void transfer(String sessionId, Collection<EventTypePartition> partitions);
void transfer(String sessionId, Collection<EventTypePartition> partitions)
throws NakadiRuntimeException, OldSubscriptionFormatException, SubscriptionNotInitializedException;

/**
* Retrieves subscription data like partitions and sessions from ZK under lock.
Expand Down

0 comments on commit 8ea76cd

Please sign in to comment.