Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #681 from zalando/aruha-858-negative-distance
Browse files Browse the repository at this point in the history
aruha-858 allow calculation of negative distances
  • Loading branch information
rcillo authored Jun 20, 2017
2 parents 9a2249a + 91995ab commit 3c7fe0d
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 30 deletions.
2 changes: 1 addition & 1 deletion api/nakadi-event-bus-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,7 @@ definitions:
format: int64
description: |
Number of events between two offsets. Initial offset is exclusive. It's only zero when both provided offsets
are equal. If the `final_cursor` is strictly smaller than the `initial_cursor` it results in client error.
are equal.
Cursor:
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,8 @@ public ResponseEntity<?> invalidCursorOperation(final InvalidCursorOperation e,

private String clientErrorMessage(final InvalidCursorOperation.Reason reason) {
switch (reason) {
case INVERTED_TIMELINE_ORDER: return "Inverted timelines. Final cursor must correspond to a newer " +
"timeline than initial cursor.";

case TIMELINE_NOT_FOUND: return "Timeline not found. It might happen in case the cursor refers to a " +
"timeline that has already expired.";
case INVERTED_OFFSET_ORDER: return "Inverted offsets. Final cursor offsets must be newer than initial " +
"cursor offsets";
case PARTITION_NOT_FOUND: return "Partition not found.";
case CURSORS_WITH_DIFFERENT_PARTITION: return "Cursors with different partition. Pairs of cursors should " +
"have matching partitions.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ public class InvalidCursorOperation extends MyNakadiRuntimeException1 {
private final Reason reason;

public enum Reason {
INVERTED_TIMELINE_ORDER,
TIMELINE_NOT_FOUND,
INVERTED_OFFSET_ORDER,
PARTITION_NOT_FOUND,
CURSORS_WITH_DIFFERENT_PARTITION
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.stream.Collectors;

import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.CURSORS_WITH_DIFFERENT_PARTITION;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.INVERTED_OFFSET_ORDER;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.INVERTED_TIMELINE_ORDER;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.PARTITION_NOT_FOUND;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.TIMELINE_NOT_FOUND;

Expand All @@ -44,7 +42,7 @@ public Long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs
if (!initialCursor.getPartition().equals(finalCursor.getPartition())) {
throw new InvalidCursorOperation(CURSORS_WITH_DIFFERENT_PARTITION);
} else if (initialCursor.getTimeline().getOrder() > finalCursor.getTimeline().getOrder()) {
throw new InvalidCursorOperation(INVERTED_TIMELINE_ORDER);
return - getDistanceDifferentTimelines(finalCursor, initialCursor);
}

if (initialCursor.getTimeline().getOrder() == finalCursor.getTimeline().getOrder()) {
Expand All @@ -56,9 +54,7 @@ public Long calculateDistance(final NakadiCursor initialCursor, final NakadiCurs

private long getDistanceSameTimeline(final NakadiCursor initialCursor, final NakadiCursor finalCursor) {
final long distance = numberOfEventsBeforeCursor(finalCursor) - numberOfEventsBeforeCursor(initialCursor);
if (distance < 0) {
throw new InvalidCursorOperation(INVERTED_OFFSET_ORDER);
}

return distance;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private SubscriptionEventTypeStats loadStats(
if (!lastPosition.getEventType().equals(eventType.getName())) {
continue;
}
Long distance;
final Long distance;
if (subscriptionNode.containsPartition(lastPosition.getEventTypePartition())) {
final NakadiCursor currentPosition;
final SubscriptionCursorWithoutToken offset =
Expand All @@ -267,12 +267,7 @@ private SubscriptionEventTypeStats loadStats(
try {
distance = cursorOperationsService.calculateDistance(currentPosition, lastPosition);
} catch (final InvalidCursorOperation ex) {
if (ex.getReason() == InvalidCursorOperation.Reason.INVERTED_TIMELINE_ORDER ||
ex.getReason() == InvalidCursorOperation.Reason.INVERTED_OFFSET_ORDER) {
distance = 0L;
} else {
throw new InconsistentStateException("Unexpected exception while calculating distance", ex);
}
throw new InconsistentStateException("Unexpected exception while calculating distance", ex);
}
} else {
distance = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.CURSORS_WITH_DIFFERENT_PARTITION;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.INVERTED_OFFSET_ORDER;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.INVERTED_TIMELINE_ORDER;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.PARTITION_NOT_FOUND;
import static org.zalando.nakadi.exceptions.runtime.InvalidCursorOperation.Reason.TIMELINE_NOT_FOUND;

Expand All @@ -46,22 +44,29 @@ public void whenCursorsAreInTheSameTimeline() throws Exception {
}

@Test
public void whenCursorsOffsetsAreInvertedThenException() throws Exception {
public void whenCursorsOffsetsAreInvertedThenNegativeDistance() throws Exception {
final NakadiCursor initialCursor = new NakadiCursor(timeline, "0", "0000000000000002");
final NakadiCursor finalCursor = new NakadiCursor(timeline, "0", "0000000000000001");

expectException(initialCursor, finalCursor, INVERTED_OFFSET_ORDER);
final Long distance = service.calculateDistance(initialCursor, finalCursor);

assertThat(distance, equalTo(-1L));
}

@Test
public void whenCursorTimelinesAreInvertedThenException() throws Exception {
final Timeline initialTimeline = mockTimeline(1);
final Timeline finalTimeline = mockTimeline(0);
public void whenCursorTimelinesAreInvertedThenNegativeDistance() throws Exception {
final Timeline initialTimeline = mockTimeline(2, 7);
final Timeline intermediaryTimeline = mockTimeline(1, 9L);
final Timeline finalTimeline = mockTimeline(0, 5);

final NakadiCursor initialCursor = new NakadiCursor(initialTimeline, "0", "0000000000000001");
final NakadiCursor finalCursor = new NakadiCursor(finalTimeline, "0", "0000000000000002");
final NakadiCursor finalCursor = new NakadiCursor(finalTimeline, "0", "0000000000000003");

mockActiveTimelines(initialTimeline, intermediaryTimeline, finalTimeline);

final Long distance = service.calculateDistance(initialCursor, finalCursor);

expectException(initialCursor, finalCursor, INVERTED_TIMELINE_ORDER);
assertThat(distance, equalTo(-14L));
}

@Test
Expand Down

0 comments on commit 3c7fe0d

Please sign in to comment.