Skip to content

Commit

Permalink
Add ttl to all dynamo source coordination store items on creation, no…
Browse files Browse the repository at this point in the history
…t just when they are COMPLETED (#3121)

Add ttl to all dynamo source coordination store items on creation, not just when they are COMPLETED

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Aug 9, 2023
1 parent 94d7b10 commit 57bfc24
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
final Long closedCount,
final String partitionProgressState) {
final DynamoDbSourcePartitionItem newPartitionItem = new DynamoDbSourcePartitionItem();

if (Objects.nonNull(dynamoStoreSettings.getTtl())) {
newPartitionItem.setExpirationTime(Instant.now().plus(dynamoStoreSettings.getTtl()).getEpochSecond());
}
newPartitionItem.setSourceIdentifier(sourceIdentifier);
newPartitionItem.setSourceStatusCombinationKey(String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, sourcePartitionStatus));
newPartitionItem.setPartitionPriority(Instant.now().toString());
Expand Down Expand Up @@ -114,7 +118,7 @@ public void tryUpdateSourcePartitionItem(final SourcePartitionStoreItem updateIt
dynamoDbSourcePartitionItem.setPartitionPriority(updateItem.getPartitionOwnershipTimeout().toString());
}

if (SourcePartitionStatus.COMPLETED.equals(updateItem.getSourcePartitionStatus()) && Objects.nonNull(dynamoStoreSettings.getTtl())) {
if (Objects.nonNull(dynamoStoreSettings.getTtl())) {
dynamoDbSourcePartitionItem.setExpirationTime(Instant.now().plus(dynamoStoreSettings.getTtl()).getEpochSecond());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
final ArgumentCaptor<DynamoDbSourcePartitionItem> argumentCaptor = ArgumentCaptor.forClass(DynamoDbSourcePartitionItem.class);
given(dynamoDbClientWrapper.tryCreatePartitionItem(argumentCaptor.capture())).willReturn(true);

final Duration ttl = Duration.ofSeconds(30);
final Long nowPlusTtl = Instant.now().plus(ttl).getEpochSecond();
given(dynamoStoreSettings.getTtl()).willReturn(ttl);

final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState);

assertThat(result, equalTo(true));
Expand All @@ -118,6 +122,7 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
assertThat(createdItem.getPartitionProgressState(), equalTo(partitionProgressState));
assertThat(createdItem.getSourceStatusCombinationKey(), equalTo(sourceIdentifier + "|" + sourcePartitionStatus));
assertThat(createdItem.getPartitionPriority(), notNullValue());
assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));
}

@Test
Expand All @@ -128,6 +133,8 @@ void tryUpdateSourcePartitionItem_calls_dynamoClientWrapper_correctly_for_assign
given(updateItem.getSourceIdentifier()).willReturn(sourceIdentifier);
given(updateItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.ASSIGNED);

given(dynamoStoreSettings.getTtl()).willReturn(null);

doNothing().when(dynamoDbClientWrapper).tryUpdatePartitionItem((DynamoDbSourcePartitionItem) updateItem);

final Instant partitionOwnershipTimeout = Instant.now();
Expand All @@ -146,6 +153,8 @@ void tryUpdateSourcePartitionItem_calls_dynamoClientWrapper_correctly_for_closed
given(updateItem.getSourceIdentifier()).willReturn(sourceIdentifier);
given(updateItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.CLOSED);

given(dynamoStoreSettings.getTtl()).willReturn(null);

doNothing().when(dynamoDbClientWrapper).tryUpdatePartitionItem((DynamoDbSourcePartitionItem) updateItem);
final Instant reOpenAtTime = Instant.now();
given(updateItem.getReOpenAt()).willReturn(reOpenAtTime);
Expand All @@ -163,13 +172,14 @@ void tryUpdateSourcePartitionItem_calls_dynamoClientWrapper_correctly_for_comple
given(updateItem.getSourceIdentifier()).willReturn(sourceIdentifier);
given(updateItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.COMPLETED);

doNothing().when(dynamoDbClientWrapper).tryUpdatePartitionItem((DynamoDbSourcePartitionItem) updateItem);

final ArgumentCaptor<Long> argumentCaptor = ArgumentCaptor.forClass(Long.class);
final Duration ttl = Duration.ofSeconds(30);
final Long nowPlusTtl = Instant.now().plus(ttl).getEpochSecond();
given(dynamoStoreSettings.getTtl()).willReturn(ttl);
doNothing().when((DynamoDbSourcePartitionItem) updateItem).setExpirationTime(argumentCaptor.capture());

doNothing().when(dynamoDbClientWrapper).tryUpdatePartitionItem((DynamoDbSourcePartitionItem) updateItem);

createObjectUnderTest().tryUpdateSourcePartitionItem(updateItem);

final Long expirationTimeResult = argumentCaptor.getValue();
Expand Down

0 comments on commit 57bfc24

Please sign in to comment.