Skip to content

Commit

Permalink
Overrode SingleStreamTracker#orphanedInitialPositionInStream() to
Browse files Browse the repository at this point in the history
match that of the provided `StreamConfig`.
  • Loading branch information
stair-aws committed Jul 25, 2023
1 parent eccd6cf commit ce2604e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,3 @@ public class StreamConfig {
private final InitialPositionInStreamExtended initialPositionInStreamExtended;
private String consumerArn;
}


Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public class SingleStreamTracker implements StreamTracker {

private final List<StreamConfig> streamConfigs;

/**
* Cached reference to {@link StreamConfig#initialPositionInStreamExtended()}
* to avoid unnecessary getter invocations.
*/
@EqualsAndHashCode.Exclude
@ToString.Exclude
private final InitialPositionInStreamExtended initialPositionInStream;

public SingleStreamTracker(String streamName) {
this(StreamIdentifier.singleStreamInstance(streamName));
}
Expand Down Expand Up @@ -72,6 +80,7 @@ public SingleStreamTracker(
public SingleStreamTracker(@NonNull StreamIdentifier streamIdentifier, @NonNull StreamConfig streamConfig) {
this.streamIdentifier = streamIdentifier;
this.streamConfigs = Collections.singletonList(streamConfig);
this.initialPositionInStream = streamConfig.initialPositionInStreamExtended();
}

@Override
Expand All @@ -84,6 +93,11 @@ public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy()
return NO_LEASE_DELETION;
}

@Override
public InitialPositionInStreamExtended orphanedStreamInitialPositionInStream() {
return initialPositionInStream;
}

@Override
public boolean isMultiStream() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.kinesis.processor;

import java.util.Date;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -48,6 +50,22 @@ public void testInitialPositionConstructor() {
validate(tracker, expectedPosition);
}

@Test
public void testConsistencyOfInitialPositionInStream() {
for (final InitialPositionInStream position : InitialPositionInStream.values()) {
final InitialPositionInStreamExtended expectedPosition;
if (position == InitialPositionInStream.AT_TIMESTAMP) {
expectedPosition = InitialPositionInStreamExtended.newInitialPositionAtTimestamp(new Date());
} else {
expectedPosition = InitialPositionInStreamExtended.newInitialPosition(position);
}

final StreamTracker tracker = new SingleStreamTracker(STREAM_NAME, expectedPosition);

assertEquals(expectedPosition, tracker.orphanedStreamInitialPositionInStream());
}
}

private static void validate(StreamTracker tracker) {
validate(tracker, StreamTracker.DEFAULT_POSITION_IN_STREAM);
}
Expand Down

0 comments on commit ce2604e

Please sign in to comment.