From 6546901a8492500adda6d4fcfbc7d7f014734d47 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 26 Sep 2023 15:34:54 -0500 Subject: [PATCH] Add support for fully async acknowledgments in source coordination (#3384) Signed-off-by: Taylor Gray --- .../model/configuration/PluginSetting.java | 3 +- .../dataprepper/model/event/Event.java | 1 + .../source/coordinator/SourceCoordinator.java | 20 +++- .../LeaseBasedSourceCoordinator.java | 52 +++++++++-- .../LeaseBasedSourceCoordinatorTest.java | 91 +++++++++++++++++-- .../worker/NoSearchContextWorker.java | 10 +- .../source/opensearch/worker/PitWorker.java | 11 +-- .../opensearch/worker/ScrollWorker.java | 11 +-- .../opensearch/worker/WorkerCommonUtils.java | 33 ++++--- .../worker/NoSearchContextWorkerTest.java | 13 +-- .../opensearch/worker/PitWorkerTest.java | 19 ++-- .../opensearch/worker/ScrollWorkerTest.java | 15 +-- .../plugins/source/s3/ScanObjectWorker.java | 22 ++--- .../source/s3/S3ScanObjectWorkerTest.java | 20 ++-- 14 files changed, 213 insertions(+), 108 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java index ab7e16455e..eae5362001 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java @@ -257,8 +257,7 @@ public Long getLongOrDefault(final String attribute, final long defaultValue) { throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute)); } - private void checkObjectType(final String attribute, final Object object, final Class type) { - if (object != null && !(type.isAssignableFrom(object.getClass()))){ + private void checkObjectType(final String attribute, final Object object, final Class type) {if (object != null && 
!(type.isAssignableFrom(object.getClass()))){ throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute)); } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index db97e1a327..133abfdb2f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -127,6 +127,7 @@ public interface Event extends Serializable { * Returns formatted parts of the input string replaced by their values in the event or the values from the result * of a Data Prepper expression * @param format input format + * @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions * @return returns a string with no formatted parts, returns null if no value is found * @throws RuntimeException if the input string is not properly formatted * @since 2.1 diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java index 6e640e4682..e8dc9da25d 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/SourceCoordinator.java @@ -36,8 +36,8 @@ public interface SourceCoordinator { * If the global state map is not needed, then it can be ignored. Updating the global state map will save it, so the next time the supplier function is run, * it will contain the most recent state from the previous supplier function run. * @return {@link SourcePartition} with the details about how to process this partition. 
Will repeatedly return the partition until - * {@link SourceCoordinator#completePartition(String)} - * or {@link SourceCoordinator#closePartition(String, Duration, int)} are called by the source, + * {@link SourceCoordinator#completePartition(String, Boolean)} + * or {@link SourceCoordinator#closePartition(String, Duration, int, Boolean)} are called by the source, * or until the partition ownership times out. * @since 2.2 */ @@ -49,9 +49,10 @@ public interface SourceCoordinator { * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException if the partition is not owned by this instance of SourceCoordinator * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition can not be completed * @param partitionKey - The partition key that uniquely identifies the partition of work that was fully processed + * @param fromAcknowledgmentsCallback - Whether this method is being called from an acknowledgment callback or not * @since 2.2 */ - void completePartition(final String partitionKey); + void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback); /** * Should be called by the source when it has processed all that it can up to this point in time for a given partition, @@ -62,12 +63,13 @@ public interface SourceCoordinator { * @param reopenAfter - The duration from the current time to wait before this partition should be processed further at a later date * @param maxClosedCount - The number of times to allow this partition to be closed. 
Will mark the partition as completed if the partition has been closed this many times or more * in the past + * @param fromAcknowledgmentsCallback - Whether this method is being called from an acknowledgment callback or not * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException if the partition is not owned by this instance of SourceCoordinator * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition can not be closed * @since 2.2 */ - void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount); + void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback); /** * Should be called by the source when it has completed some work for a partition, and needs to save its progress before continuing work on the partition. @@ -94,4 +96,14 @@ public interface SourceCoordinator { * @since 2.2 */ void giveUpPartitions(); + + + /** + * Should be called by the source when acknowledgments are enabled to keep ownership of the partition for acknowledgmentTimeout amount of time + * before another instance of Data Prepper can pick it up for processing. 
Allows the source to acquire another partition immediately for processing + * @param partitionKey - the partition to update for ack timeout + * @param ackowledgmentTimeout - the amount of time that this partition can be completed by the acknowledgment callback before another instance of Data Prepper + * can pick it up for processing + */ + void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout); } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java index c920cf59a6..be282d0ca5 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java @@ -40,6 +40,10 @@ public class LeaseBasedSourceCoordinator implements SourceCoordinator { private static final Logger LOG = LoggerFactory.getLogger(LeaseBasedSourceCoordinator.class); private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String COMPLETE_ACTION = "complete"; + private static final String CLOSE_ACTION = "close"; + private static final String SAVE_STATE_ACTION = "saveState"; + static final String PARTITION_CREATION_SUPPLIER_INVOCATION_COUNT = "partitionCreationSupplierInvocations"; static final String NO_PARTITIONS_ACQUIRED_COUNT = "noPartitionsAcquired"; static final String PARTITION_CREATED_COUNT = "partitionsCreatedCount"; @@ -121,9 +125,9 @@ public LeaseBasedSourceCoordinator(final Class partitionProgressStateClass, this.partitionsGivenUpCounter = pluginMetrics.counter(PARTITION_OWNERSHIP_GIVEN_UP_COUNT); this.partitionNotFoundErrorCounter = pluginMetrics.counter(PARTITION_NOT_FOUND_ERROR_COUNT); this.partitionNotOwnedErrorCounter = 
pluginMetrics.counter(PARTITION_NOT_OWNED_ERROR_COUNT); - this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "saveState"); - this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "close"); - this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "complete"); + this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION); + this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION); + this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION); } @Override @@ -202,10 +206,10 @@ private void createPartitions(final List partitionIdentifie } @Override - public void completePartition(final String partitionKey) { + public void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback) { validateIsInitialized(); - final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "complete"); + final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, COMPLETE_ACTION, fromAcknowledgmentsCallback); validatePartitionOwnership(itemToUpdate); itemToUpdate.setPartitionOwner(null); @@ -220,17 +224,19 @@ public void completePartition(final String partitionKey) { throw e; } - partitionManager.removeActivePartition(); + if (!fromAcknowledgmentsCallback) { + partitionManager.removeActivePartition(); + } LOG.info("Partition key {} was completed by owner {}.", partitionKey, ownerId); partitionsCompletedCounter.increment(); } @Override - public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount) { + public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback) { validateIsInitialized(); - final 
SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "close"); + final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, CLOSE_ACTION, fromAcknowledgmentsCallback); validatePartitionOwnership(itemToUpdate); itemToUpdate.setPartitionOwner(null); @@ -262,7 +268,9 @@ public void closePartition(final String partitionKey, final Duration reopenAfter partitionsClosedCounter.increment(); } - partitionManager.removeActivePartition(); + if (!fromAcknowledgmentsCallback) { + partitionManager.removeActivePartition(); + } LOG.info("Partition key {} was closed by owner {}. The resulting status of the partition is now {}", partitionKey, ownerId, itemToUpdate.getSourcePartitionStatus()); } @@ -271,7 +279,7 @@ public void closePartition(final String partitionKey, final Duration reopenAfter public void saveProgressStateForPartition(final String partitionKey, final S partitionProgressState) { validateIsInitialized(); - final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "save state"); + final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, SAVE_STATE_ACTION); validatePartitionOwnership(itemToUpdate); itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT)); @@ -290,6 +298,20 @@ public void saveProgressStateForPartition(final String partitionKe saveProgressStateInvocationSuccessCounter.increment(); } + @Override + public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout) { + validateIsInitialized(); + + final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "update for ack wait"); + validatePartitionOwnership(itemToUpdate); + + itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout)); + + sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate); + + 
partitionManager.removeActivePartition(); + } + @Override public void giveUpPartitions() { @@ -380,6 +402,10 @@ private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final St ); } + return getSourcePartitionStoreItem(partitionKey, action); + } + + private SourcePartitionStoreItem getSourcePartitionStoreItem(final String partitionKey, final String action) { final Optional optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionKey); if (optionalPartitionItem.isEmpty()) { @@ -441,4 +467,10 @@ private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionS LOG.warn("There was an error saving global state after creating partitions.", e); } } + + private SourcePartitionStoreItem getItemWithAction(final String partitionKey, final String action, final Boolean fromAcknowledgmentsCallback) { + // The validation against activePartition in partition manager needs to be skipped when called from acknowledgments callback + // because otherwise it will fail the validation since it is actively working on a different partition when ack is received + return fromAcknowledgmentsCallback ? 
getSourcePartitionStoreItem(partitionKey, action) : validateAndGetSourcePartitionStoreItem(partitionKey, action); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java index 1bcb39d7e7..2a95299593 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java @@ -440,7 +440,7 @@ void completePartition_with_partitionKey_that_is_not_owned_throws_partition_not_ given(partitionManager.getActivePartition()).willReturn(Optional.of(sourcePartition)); - assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().completePartition(UUID.randomUUID().toString())); + assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().completePartition(UUID.randomUUID().toString(), false)); verify(partitionNotOwnedErrorCounter).increment(); @@ -469,7 +469,7 @@ void completePartition_with_owned_partition_key_and_no_store_item_throws_Partiti given(partitionManager.getActivePartition()).willReturn(Optional.of(sourcePartition)); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.empty()); - assertThrows(PartitionNotFoundException.class, () -> createObjectUnderTest().completePartition(sourcePartition.getPartitionKey())); + assertThrows(PartitionNotFoundException.class, () -> createObjectUnderTest().completePartition(sourcePartition.getPartitionKey(), false)); verify(partitionNotFoundErrorCounter).increment(); @@ -500,7 +500,7 @@ void completePartition_with_owned_partition_key_and_existing_store_item_with_inv 
given(sourcePartitionStoreItem.getSourcePartitionKey()).willReturn(sourcePartition.getPartitionKey()); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); - assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().completePartition(sourcePartition.getPartitionKey())); + assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().completePartition(sourcePartition.getPartitionKey(), false)); verify(partitionManager).removeActivePartition(); @@ -536,7 +536,7 @@ void completePartition_with_owned_partition_and_existing_store_item_with_valid_o if (updatedItemSuccessfully) { doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); - createObjectUnderTest().completePartition(sourcePartition.getPartitionKey()); + createObjectUnderTest().completePartition(sourcePartition.getPartitionKey(), false); verify(sourcePartitionStoreItem).setSourcePartitionStatus(SourcePartitionStatus.COMPLETED); verify(sourcePartitionStoreItem).setReOpenAt(null); @@ -549,7 +549,7 @@ void completePartition_with_owned_partition_and_existing_store_item_with_valid_o verifyNoInteractions(completePartitionUpdateErrorCounter); } else { doThrow(PartitionUpdateException.class).when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); - assertThrows(PartitionUpdateException.class, () -> createObjectUnderTest().completePartition(sourcePartition.getPartitionKey())); + assertThrows(PartitionUpdateException.class, () -> createObjectUnderTest().completePartition(sourcePartition.getPartitionKey(), false)); verify(completePartitionUpdateErrorCounter).increment(); verifyNoInteractions(partitionsCompletedCounter); @@ -569,6 +569,27 @@ void completePartition_with_owned_partition_and_existing_store_item_with_valid_o closePartitionUpdateErrorCounter); } + @Test + void 
completePartition_with_fromAcknowledgmentCallback_true_does_not_interact_with_partition_manager() throws UnknownHostException { + final SourcePartition sourcePartition = SourcePartition.builder(String.class) + .withPartitionKey(UUID.randomUUID().toString()) + .withPartitionState(null) + .build(); + + given(sourcePartitionStoreItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName()); + + given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); + doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); + createObjectUnderTest().completePartition(sourcePartition.getPartitionKey(), true); + + verify(sourcePartitionStoreItem).setSourcePartitionStatus(SourcePartitionStatus.COMPLETED); + verify(sourcePartitionStoreItem).setReOpenAt(null); + verify(sourcePartitionStoreItem).setPartitionOwnershipTimeout(null); + verify(sourcePartitionStoreItem).setPartitionOwner(null); + + verifyNoInteractions(partitionManager); + } + @Test void closePartition_with_partitionKey_that_is_not_owned_throws_partition_not_owned_exception() { final SourcePartition sourcePartition = SourcePartition.builder(String.class) @@ -578,7 +599,7 @@ void closePartition_with_partitionKey_that_is_not_owned_throws_partition_not_own given(partitionManager.getActivePartition()).willReturn(Optional.of(sourcePartition)); - assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().closePartition(UUID.randomUUID().toString(), Duration.ofMinutes(2), 1)); + assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().closePartition(UUID.randomUUID().toString(), Duration.ofMinutes(2), 1, false)); verify(partitionNotOwnedErrorCounter).increment(); @@ -607,7 +628,7 @@ void closePartition_with_owned_partition_key_and_no_store_item_throws_PartitionN 
given(partitionManager.getActivePartition()).willReturn(Optional.of(sourcePartition)); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.empty()); - assertThrows(PartitionNotFoundException.class, () -> createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), 1)); + assertThrows(PartitionNotFoundException.class, () -> createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), 1, false)); verify(partitionNotFoundErrorCounter).increment(); @@ -638,7 +659,7 @@ void closePartition_with_owned_partition_key_and_existing_store_item_with_invali given(sourcePartitionStoreItem.getSourcePartitionKey()).willReturn(sourcePartition.getPartitionKey()); given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); - assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), 1)); + assertThrows(PartitionNotOwnedException.class, () -> createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), 1, false)); verify(partitionManager).removeActivePartition(); @@ -683,7 +704,7 @@ void closePartition_with_owned_partition_and_existing_store_item_with_valid_owne if (updatedItemSuccessfully) { doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); - createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), maxClosedCount); + createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), maxClosedCount, false); verify(sourcePartitionStoreItem).setPartitionOwnershipTimeout(null); verify(sourcePartitionStoreItem).setPartitionOwner(null); @@ -703,7 +724,7 @@ void 
closePartition_with_owned_partition_and_existing_store_item_with_valid_owne verify(partitionManager).removeActivePartition(); } else { doThrow(PartitionUpdateException.class).when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); - assertThrows(PartitionUpdateException.class, () -> createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), maxClosedCount)); + assertThrows(PartitionUpdateException.class, () -> createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), maxClosedCount, false)); if (closedCount >= maxClosedCount) { verify(completePartitionUpdateErrorCounter).increment(); verifyNoInteractions(closePartitionUpdateErrorCounter); @@ -725,6 +746,30 @@ void closePartition_with_owned_partition_and_existing_store_item_with_valid_owne saveStatePartitionUpdateErrorCounter); } + @Test + void closePartition_with_fromAcknowledgmentCallback_true_does_not_interact_with_partition_manager() throws UnknownHostException { + final SourcePartition sourcePartition = SourcePartition.builder(String.class) + .withPartitionKey(UUID.randomUUID().toString()) + .withPartitionState(null) + .build(); + + final long closedCount = 1; + + given(sourcePartitionStoreItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName()); + given(sourcePartitionStoreItem.getSourcePartitionStatus()).willReturn(SourcePartitionStatus.COMPLETED); + given(sourcePartitionStoreItem.getClosedCount()).willReturn(closedCount); + + given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); + doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); + createObjectUnderTest().closePartition(sourcePartition.getPartitionKey(), Duration.ofMinutes(2), 1, true); + + 
verify(sourcePartitionStoreItem).setSourcePartitionStatus(SourcePartitionStatus.COMPLETED); + verify(sourcePartitionStoreItem).setPartitionOwnershipTimeout(null); + verify(sourcePartitionStoreItem).setPartitionOwner(null); + + verifyNoInteractions(partitionManager); + } + @Test void savePartitionProgressState_with_partitionKey_that_is_not_owned_throws_partition_not_owned_exception() { final SourcePartition sourcePartition = SourcePartition.builder(String.class) @@ -864,6 +909,32 @@ void saveProgressStateForPartition_with_owned_partition_and_existing_store_item_ completePartitionUpdateErrorCounter); } + @Test + void updatePartitionForAckWait_updates_partition_ownership_and_removes_active_partition_from_partition_manager() throws UnknownHostException { + final SourcePartition sourcePartition = SourcePartition.builder(String.class) + .withPartitionKey(UUID.randomUUID().toString()) + .withPartitionState(null) + .build(); + + final Instant beforeSave = Instant.now(); + + given(partitionManager.getActivePartition()).willReturn(Optional.of(sourcePartition)); + given(sourcePartitionStoreItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName()); + given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, sourcePartition.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem)); + + doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(sourcePartitionStoreItem); + + final Duration ackTimeout = Duration.ofSeconds(10); + createObjectUnderTest().updatePartitionForAcknowledgmentWait(sourcePartition.getPartitionKey(), ackTimeout); + + final ArgumentCaptor argumentCaptorForPartitionOwnershipTimeout = ArgumentCaptor.forClass(Instant.class); + verify(sourcePartitionStoreItem).setPartitionOwnershipTimeout(argumentCaptorForPartitionOwnershipTimeout.capture()); + final Instant newPartitionOwnershipTimeout = argumentCaptorForPartitionOwnershipTimeout.getValue(); + 
assertThat(newPartitionOwnershipTimeout.isAfter(beforeSave.plus(ackTimeout)), equalTo(true)); + + verify(partitionManager).removeActivePartition(); + } + @Test void giveUpPartitions_with_nonInitialized_store_does_nothing_and_returns() { final SourceCoordinator objectUnderTest = new LeaseBasedSourceCoordinator<>(String.class, sourceCoordinationStore, sourceCoordinationConfig, partitionManager, sourceIdentifierWithPartitionPrefix, pluginMetrics); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index 5db3e5dc87..121be0a0b5 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; -import org.apache.commons.lang3.tuple.Pair; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -30,7 +29,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; @@ -90,15 +88,15 @@ public void run() { noAvailableIndicesCount = 0; try { - final Pair> acknowledgementSet = createAcknowledgmentSet( + final AcknowledgementSet acknowledgementSet = 
createAcknowledgmentSet( acknowledgementSetManager, openSearchSourceConfiguration, sourceCoordinator, indexPartition.get()); - openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet)); - completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), + completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, indexPartition.get(), sourceCoordinator); openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); @@ -108,7 +106,7 @@ public void run() { sourceCoordinator.giveUpPartitions(); } catch (final IndexNotFoundException e) { LOG.warn("{}, marking index as complete and continuing processing", e.getMessage()); - sourceCoordinator.completePartition(indexPartition.get().getPartitionKey()); + sourceCoordinator.completePartition(indexPartition.get().getPartitionKey(), false); } catch (final Exception e) { LOG.error("Unknown exception while processing index '{}', moving on to another index:", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 6fff83dd73..ee49aee262 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker; -import org.apache.commons.lang3.tuple.Pair; import 
org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -34,7 +33,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; @@ -103,19 +101,18 @@ public void run() { try { - final Pair> acknowledgementSet = createAcknowledgmentSet( + final AcknowledgementSet acknowledgementSet = createAcknowledgmentSet( acknowledgementSetManager, openSearchSourceConfiguration, sourceCoordinator, indexPartition.get()); - openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet)); - completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), + completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, indexPartition.get(), sourceCoordinator); openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); - LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey()); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("PitWorker received an exception from the source coordinator. 
There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -131,7 +128,7 @@ public void run() { } } catch (final IndexNotFoundException e){ LOG.warn("{}, marking index as complete and continuing processing", e.getMessage()); - sourceCoordinator.completePartition(indexPartition.get().getPartitionKey()); + sourceCoordinator.completePartition(indexPartition.get().getPartitionKey(), false); } catch (final RuntimeException e) { LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index 0176e48356..ce34521205 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker; -import org.apache.commons.lang3.tuple.Pair; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION; import static 
org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; @@ -98,19 +96,18 @@ public void run() { noAvailableIndicesCount = 0; try { - final Pair> acknowledgementSet = createAcknowledgmentSet( + final AcknowledgementSet acknowledgementSet = createAcknowledgmentSet( acknowledgementSetManager, openSearchSourceConfiguration, sourceCoordinator, indexPartition.get()); - openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet)); - completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), + completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, indexPartition.get(), sourceCoordinator); openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); - LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey()); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("ScrollWorker received an exception from the source coordinator. 
There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -126,7 +123,7 @@ public void run() { } } catch (final IndexNotFoundException e){ LOG.warn("{}, marking index as complete and continuing processing", e.getMessage()); - sourceCoordinator.completePartition(indexPartition.get().getPartitionKey()); + sourceCoordinator.completePartition(indexPartition.get().getPartitionKey(), false); } catch (final RuntimeException e) { LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java index 2069fcb023..2bde6c0370 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java @@ -5,20 +5,17 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; -import org.apache.commons.lang3.tuple.Pair; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static com.google.common.math.LongMath.pow; import static com.google.common.primitives.Longs.min; @@ -26,51 +23,53 @@ public class WorkerCommonUtils { private static final Random RANDOM = new Random(); + private static final Logger LOG = LoggerFactory.getLogger(WorkerCommonUtils.class); static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60); - static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; + static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2); static final Duration STARTING_BACKOFF = Duration.ofMillis(500); static final Duration MAX_BACKOFF = Duration.ofSeconds(60); static final int BACKOFF_RATE = 2; static final Duration MAX_JITTER = Duration.ofSeconds(2); static final Duration MIN_JITTER = Duration.ofSeconds(-2); - static Pair> createAcknowledgmentSet(final AcknowledgementSetManager acknowledgementSetManager, + static AcknowledgementSet createAcknowledgmentSet(final AcknowledgementSetManager acknowledgementSetManager, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final SourcePartition indexPartition) { AcknowledgementSet acknowledgementSet = null; - CompletableFuture completableFuture = new CompletableFuture<>(); - if (openSearchSourceConfiguration.isAcknowledgmentsEnabled()) { acknowledgementSet = acknowledgementSetManager.create((result) -> { if (result == true) { sourceCoordinator.closePartition( indexPartition.getPartitionKey(), openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount()); + 
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount(), + true); + + LOG.info("Received acknowledgment of completion from sink for index {}", indexPartition.getPartitionKey()); } - completableFuture.complete(result); - }, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS)); + }, ACKNOWLEDGEMENT_SET_TIMEOUT); } - return Pair.of(acknowledgementSet, completableFuture); + return acknowledgementSet; } static void completeIndexPartition(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final AcknowledgementSet acknowledgementSet, - final CompletableFuture completableFuture, final SourcePartition indexPartition, - final SourceCoordinator sourceCoordinator) throws ExecutionException, InterruptedException, TimeoutException { + final SourceCoordinator sourceCoordinator) { if (openSearchSourceConfiguration.isAcknowledgmentsEnabled()) { + sourceCoordinator.updatePartitionForAcknowledgmentWait(indexPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); acknowledgementSet.complete(); - completableFuture.get(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS, TimeUnit.SECONDS); } else { sourceCoordinator.closePartition( indexPartition.getPartitionKey(), openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount()); + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount(), + false); + LOG.info("Completed processing of index {}", indexPartition.getPartitionKey()); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index 24af77aeac..2397aa87b0 100644 --- 
a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -46,6 +46,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -58,7 +59,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT; @ExtendWith(MockitoExtension.class) public class NoSearchContextWorkerTest { @@ -153,8 +154,8 @@ void run_when_search_without_search_context_throws_index_not_found_exception_com assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); - verify(sourceCoordinator).completePartition(partitionKey); - verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + verify(sourceCoordinator).completePartition(partitionKey, false); + verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt(), anyBoolean()); verifyNoInteractions(documentsProcessedCounter); verifyNoInteractions(indicesProcessedCounter); @@ -193,7 +194,7 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); 
doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, false); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -239,7 +240,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa Consumer consumer = invocation.getArgument(0); consumer.accept(true); return acknowledgementSet; - }).when(acknowledgementSetManager).create(any(Consumer.class), eq(Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS))); + }).when(acknowledgementSetManager).create(any(Consumer.class), eq(ACKNOWLEDGEMENT_SET_TIMEOUT)); when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(true); final SourcePartition sourcePartition = mock(SourcePartition.class); @@ -270,7 +271,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, true); final Future future = executorService.submit(() -> createObjectUnderTest().run()); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index 69a5ca6991..7784f7ddff 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; 
import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -64,7 +65,7 @@ import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.EXTEND_KEEP_ALIVE_TIME; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.STARTING_KEEP_ALIVE; -import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT; @ExtendWith(MockitoExtension.class) public class PitWorkerTest { @@ -173,7 +174,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, false); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -233,7 +234,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa Consumer consumer = invocation.getArgument(0); consumer.accept(true); return acknowledgementSet; - }).when(acknowledgementSetManager).create(any(Consumer.class), eq(Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS))); + }).when(acknowledgementSetManager).create(any(Consumer.class), eq(ACKNOWLEDGEMENT_SET_TIMEOUT)); when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(true); final SourcePartition sourcePartition = mock(SourcePartition.class); @@ -272,8 +273,9 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); 
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + doNothing().when(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, true); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -359,7 +361,7 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, false); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -377,6 +379,7 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class)); verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); + verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class)); verify(documentsProcessedCounter, times(3)).increment(); verify(indicesProcessedCounter).increment(); @@ -407,7 +410,7 @@ void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitE verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); verify(sourceCoordinator).giveUpPartitions(); - verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt(), anyBoolean()); verifyNoInteractions(documentsProcessedCounter); verifyNoInteractions(indicesProcessedCounter); @@ -438,8 +441,8 
@@ void run_completes_partitions_when_createPit_throws_IndexNotFoundException() thr assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); - verify(sourceCoordinator).completePartition(partitionKey); - verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + verify(sourceCoordinator).completePartition(partitionKey, false); + verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt(), anyBoolean()); verifyNoInteractions(documentsProcessedCounter); verifyNoInteractions(indicesProcessedCounter); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index b458a26d1b..63f88c272c 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -49,6 +49,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -63,7 +64,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker.SCROLL_TIME_PER_BATCH; -import static 
org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT; @ExtendWith(MockitoExtension.class) public class ScrollWorkerTest { @@ -172,7 +173,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, false); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -227,7 +228,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a Consumer consumer = invocation.getArgument(0); consumer.accept(true); return acknowledgementSet; - }).when(acknowledgementSetManager).create(any(Consumer.class), eq(Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS))); + }).when(acknowledgementSetManager).create(any(Consumer.class), eq(ACKNOWLEDGEMENT_SET_TIMEOUT)); when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(true); final SourcePartition sourcePartition = mock(SourcePartition.class); @@ -267,7 +268,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); + Duration.ZERO, 1, true); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -336,7 +337,7 @@ void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLim verifyNoMoreInteractions(searchAccessor); verify(sourceCoordinator).giveUpPartitions(); - verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); 
+ verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt(), eq(false)); verifyNoInteractions(documentsProcessedCounter); verifyNoInteractions(indicesProcessedCounter); @@ -371,8 +372,8 @@ void run_completes_partitions_createScroll_throws_IndexNotFoundException() throw assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); verifyNoMoreInteractions(searchAccessor); - verify(sourceCoordinator).completePartition(partitionKey); - verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + verify(sourceCoordinator).completePartition(partitionKey, false); + verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt(), anyBoolean()); verifyNoInteractions(documentsProcessedCounter); verifyNoInteractions(indicesProcessedCounter); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 149bc80e03..5fe8e98a0c 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -28,10 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Function; /** @@ -45,7 +41,7 @@ public class ScanObjectWorker implements Runnable{ private static final int STANDARD_BACKOFF_MILLIS = 30_000; private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; - static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; + static final Duration 
ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2); static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; private final S3Client s3Client; @@ -141,18 +137,16 @@ private void startProcessingObject(final int waitTimeMillis) { try { List waitingForAcknowledgements = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; - CompletableFuture completableFuture = new CompletableFuture<>(); if (endToEndAcknowledgementsEnabled) { acknowledgementSet = acknowledgementSetManager.create((result) -> { acknowledgementSetCallbackCounter.increment(); // Delete only if this is positive acknowledgement if (result == true) { - sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey(), true); waitingForAcknowledgements.forEach(s3ObjectDeleteWorker::deleteS3Object); } - completableFuture.complete(result); - }, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS)); + }, ACKNOWLEDGEMENT_SET_TIMEOUT); } @@ -161,22 +155,18 @@ private void startProcessingObject(final int waitTimeMillis) { if (endToEndAcknowledgementsEnabled) { deleteObjectRequest.ifPresent(waitingForAcknowledgements::add); + sourceCoordinator.updatePartitionForAcknowledgmentWait(objectToProcess.get().getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); acknowledgementSet.complete(); - completableFuture.get(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS, TimeUnit.SECONDS); } else { - sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey(), false); deleteObjectRequest.ifPresent(s3ObjectDeleteWorker::deleteS3Object); } } catch (final NoSuchKeyException e) { LOG.warn("Object {} from bucket {} could not be found, marking this object as complete and continuing processing", objectKey, bucket); - sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey()); + 
sourceCoordinator.completePartition(objectToProcess.get().getPartitionKey(), false); } catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) { LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage()); sourceCoordinator.giveUpPartitions(); - } catch (final ExecutionException | TimeoutException e) { - LOG.error("Exception occurred while waiting for acknowledgement.", e); - } catch (final InterruptedException e) { - LOG.error("S3 Scan worker thread interrupted while processing S3 object.", e); } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java index 31319f9925..3eba8d587d 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java @@ -48,11 +48,13 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.source.s3.ScanObjectWorker.ACKNOWLEDGEMENT_SET_TIMEOUT; @ExtendWith(MockitoExtension.class) class S3ScanObjectWorkerTest { @@ -150,7 +152,7 @@ void partition_from_getNextPartition_is_processed_correctly() 
throws IOException final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); - doNothing().when(sourceCoordinator).completePartition(anyString()); + doNothing().when(sourceCoordinator).completePartition(anyString(), eq(false)); createObjectUnderTest().runWithoutInfiniteLoop(); @@ -179,7 +181,7 @@ void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Ob final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(acknowledgementSet), eq(sourceCoordinator), eq(partitionKey)); - doNothing().when(sourceCoordinator).completePartition(anyString()); + doNothing().when(sourceCoordinator).completePartition(anyString(), eq(true)); final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); @@ -191,7 +193,8 @@ void buildDeleteObjectRequest_should_be_invoked_after_processing_when_deleteS3Ob scanObjectWorker.runWithoutInfiniteLoop(); - verify(sourceCoordinator).completePartition(partitionKey); + verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); + verify(sourceCoordinator).completePartition(partitionKey, true); verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, objectKey); verify(acknowledgementSet).complete(); verify(counter).increment(); @@ -220,13 +223,14 @@ void buildDeleteObjectRequest_should_not_be_invoked_after_processing_when_delete final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); - doNothing().when(sourceCoordinator).completePartition(anyString()); + 
doNothing().when(sourceCoordinator).completePartition(anyString(), eq(false)); final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); scanObjectWorker.runWithoutInfiniteLoop(); - verify(sourceCoordinator).completePartition(partitionKey); + verify(sourceCoordinator).completePartition(partitionKey, false); + verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class)); verifyNoInteractions(s3ObjectDeleteWorker); verifyNoInteractions(acknowledgementSetManager); @@ -254,14 +258,14 @@ void deleteS3Object_should_not_be_invoked_after_processing_when_deleteS3Objects_ final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); doNothing().when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); - doNothing().when(sourceCoordinator).completePartition(anyString()); + doNothing().when(sourceCoordinator).completePartition(anyString(), eq(false)); final ScanObjectWorker scanObjectWorker = createObjectUnderTest(); scanObjectWorker.runWithoutInfiniteLoop(); verifyNoInteractions(acknowledgementSetManager); - verify(sourceCoordinator).completePartition(partitionKey); + verify(sourceCoordinator).completePartition(partitionKey, false); verifyNoInteractions(s3ObjectDeleteWorker); final S3ObjectReference processedObject = objectReferenceArgumentCaptor.getValue(); @@ -289,7 +293,7 @@ void partitionIsCompleted_when_NoObjectKeyException_is_thrown_from_process_objec final ArgumentCaptor objectReferenceArgumentCaptor = ArgumentCaptor.forClass(S3ObjectReference.class); doThrow(NoSuchKeyException.class).when(s3ObjectHandler).parseS3Object(objectReferenceArgumentCaptor.capture(), eq(null), eq(sourceCoordinator), eq(partitionKey)); - doNothing().when(sourceCoordinator).completePartition(partitionKey); + doNothing().when(sourceCoordinator).completePartition(partitionKey, false); createObjectUnderTest().runWithoutInfiniteLoop();