Add support for fully async acknowledgments in source coordination #3384

Merged
@@ -257,8 +257,7 @@ public Long getLongOrDefault(final String attribute, final long defaultValue) {
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}

- private <T> void checkObjectType(final String attribute, final Object object, final Class<T> type) {
- if (object != null && !(type.isAssignableFrom(object.getClass()))){
+ private <T> void checkObjectType(final String attribute, final Object object, final Class<T> type) {if (object != null && !(type.isAssignableFrom(object.getClass()))){
throw new IllegalArgumentException(String.format(UNEXPECTED_ATTRIBUTE_TYPE_MSG, object.getClass(), attribute));
}
}
@@ -127,6 +127,7 @@ public interface Event extends Serializable {
* Returns formatted parts of the input string replaced by their values in the event or the values from the result
* of a Data Prepper expression
* @param format input format
+ * @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions
* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
@@ -36,8 +36,8 @@ public interface SourceCoordinator<T> {
* If the global state map is not needed, then it can be ignored. Updating the global state map will save it, so the next time the supplier function is run,
* it will contain the most recent state from the previous supplier function run.
* @return {@link SourcePartition} with the details about how to process this partition. Will repeatedly return the partition until
- * {@link SourceCoordinator#completePartition(String)}
- * or {@link SourceCoordinator#closePartition(String, Duration, int)} are called by the source,
+ * {@link SourceCoordinator#completePartition(String, Boolean)}
+ * or {@link SourceCoordinator#closePartition(String, Duration, int, Boolean)} are called by the source,
* or until the partition ownership times out.
* @since 2.2
*/
@@ -49,9 +49,10 @@ public interface SourceCoordinator<T> {
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException if the partition is not owned by this instance of SourceCoordinator
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition can not be completed
* @param partitionKey - The partition key that uniquely identifies the partition of work that was fully processed
+ * @param fromAcknowledgmentsCallback - Whether this method is being called from an acknowledgment callback or not
* @since 2.2
*/
- void completePartition(final String partitionKey);
+ void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback);

Member

Is there an object we can use here instead? Is there an acknowledgments handle of any sort? That would be ideal: if data from it is needed in the future, we won't have to make yet another change to the interface.

Member Author

I don't think there's anything that makes sense to pass for acknowledgments other than the boolean. As for passing an object, it could make sense to introduce a request model class, as in

void completePartition(final CompletePartitionRequest request)

and to do the same for the rest of the SourceCoordinator methods. I'm not sure that's worth a large refactor, though.

Member

I only suggested it in case there was an existing model. I'm fine with keeping this as it is.
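
The request-object alternative raised in this thread was not adopted in the PR. Purely as an illustration, here is a minimal sketch of what such a class might look like. Only the name `CompletePartitionRequest` comes from the comment above; the fields, factory methods, and getters are assumptions and are not Data Prepper code.

```java
import java.util.Objects;

// Hypothetical request object; the PR kept the plain Boolean parameter instead.
final class CompletePartitionRequest {
    private final String partitionKey;
    private final boolean fromAcknowledgmentsCallback;

    private CompletePartitionRequest(final String partitionKey, final boolean fromAcknowledgmentsCallback) {
        this.partitionKey = Objects.requireNonNull(partitionKey, "partitionKey must not be null");
        this.fromAcknowledgmentsCallback = fromAcknowledgmentsCallback;
    }

    // Completion requested directly by the source while it still works on the partition.
    static CompletePartitionRequest forSource(final String partitionKey) {
        return new CompletePartitionRequest(partitionKey, false);
    }

    // Completion requested later, from an acknowledgment callback.
    static CompletePartitionRequest forAcknowledgmentsCallback(final String partitionKey) {
        return new CompletePartitionRequest(partitionKey, true);
    }

    String getPartitionKey() {
        return partitionKey;
    }

    boolean isFromAcknowledgmentsCallback() {
        return fromAcknowledgmentsCallback;
    }
}
```

With a shape like this, further acknowledgment-related data could be added as fields without touching the interface signature again, which is the concern the first comment raises.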


/**
* Should be called by the source when it has processed all that it can up to this point in time for a given partition,
@@ -62,12 +63,13 @@ public interface SourceCoordinator<T> {
* @param reopenAfter - The duration from the current time to wait before this partition should be processed further at a later date
* @param maxClosedCount - The number of times to allow this partition to be closed. Will mark the partition as completed if the partition has been closed this many times or more
* in the past
+ * @param fromAcknowledgmentsCallback - Whether this method is being called from an acknowledgment callback or not
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException if the partition key could not be found in the distributed store
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException if the partition is not owned by this instance of SourceCoordinator
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition can not be closed
* @since 2.2
*/
- void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount);
+ void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback);

/**
* Should be called by the source when it has completed some work for a partition, and needs to save its progress before continuing work on the partition.
@@ -94,4 +96,14 @@ public interface SourceCoordinator<T> {
* @since 2.2
*/
void giveUpPartitions();


+ /**
+ * Should be called by the source when acknowledgments are enabled, to keep ownership of the partition for the acknowledgment timeout
+ * before another instance of Data Prepper can pick it up for processing. Allows the source to acquire another partition immediately for processing.
+ * @param partitionKey - the partition to update for ack timeout
+ * @param ackowledgmentTimeout - the amount of time within which this partition can be completed by the acknowledgment callback before another instance of Data Prepper
+ * can pick it up for processing
+ */
+ void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout);
}
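
To make the intended call order concrete, the sketch below shows how a source might combine the new method with the new `completePartition` flag. It is an illustration under stated assumptions, not code from this PR: `AckCoordinator` is a cut-down stand-in for `SourceCoordinator`, `RecordingCoordinator` and `AsyncAckSourceSketch` are hypothetical, and leaving a negative acknowledgment to the ownership timeout is an assumed policy.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for the two SourceCoordinator methods used here (signatures taken from the diff).
interface AckCoordinator {
    void completePartition(String partitionKey, Boolean fromAcknowledgmentsCallback);
    void updatePartitionForAcknowledgmentWait(String partitionKey, Duration acknowledgmentTimeout);
}

// Hypothetical fake that records calls so the order can be inspected.
final class RecordingCoordinator implements AckCoordinator {
    final List<String> calls = new ArrayList<>();

    @Override
    public void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback) {
        calls.add("complete:" + partitionKey + ":" + fromAcknowledgmentsCallback);
    }

    @Override
    public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration acknowledgmentTimeout) {
        calls.add("ackWait:" + partitionKey);
    }
}

final class AsyncAckSourceSketch {
    // Called once the source has written all events for the partition to the buffer.
    // Returns the callback that the acknowledgment machinery would invoke later.
    static Consumer<Boolean> handOff(final AckCoordinator coordinator,
                                     final String partitionKey,
                                     final Duration ackTimeout) {
        // Extend ownership for the ack window; the source is then free to acquire another partition.
        coordinator.updatePartitionForAcknowledgmentWait(partitionKey, ackTimeout);

        return acknowledged -> {
            if (Boolean.TRUE.equals(acknowledged)) {
                // The source may already be on a different partition, so flag the callback origin.
                coordinator.completePartition(partitionKey, true);
            }
            // Assumption: on a negative ack nothing is done and the ownership timeout simply expires.
        };
    }
}
```

The point of the split is that the source thread never blocks waiting for acknowledgments: it hands the partition off and moves on, and completion happens whenever the callback fires.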
@@ -40,6 +40,10 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private static final Logger LOG = LoggerFactory.getLogger(LeaseBasedSourceCoordinator.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

+ private static final String COMPLETE_ACTION = "complete";
+ private static final String CLOSE_ACTION = "close";
+ private static final String SAVE_STATE_ACTION = "saveState";

static final String PARTITION_CREATION_SUPPLIER_INVOCATION_COUNT = "partitionCreationSupplierInvocations";
static final String NO_PARTITIONS_ACQUIRED_COUNT = "noPartitionsAcquired";
static final String PARTITION_CREATED_COUNT = "partitionsCreatedCount";
@@ -121,9 +125,9 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.partitionsGivenUpCounter = pluginMetrics.counter(PARTITION_OWNERSHIP_GIVEN_UP_COUNT);
this.partitionNotFoundErrorCounter = pluginMetrics.counter(PARTITION_NOT_FOUND_ERROR_COUNT);
this.partitionNotOwnedErrorCounter = pluginMetrics.counter(PARTITION_NOT_OWNED_ERROR_COUNT);
- this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "saveState");
- this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "close");
- this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, "complete");
+ this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION);
+ this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION);
+ this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION);
}

@Override
@@ -202,10 +206,10 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
}

@Override
- public void completePartition(final String partitionKey) {
+ public void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback) {
validateIsInitialized();

- final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "complete");
+ final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, COMPLETE_ACTION, fromAcknowledgmentsCallback);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwner(null);
@@ -220,17 +224,19 @@ public void completePartition(final String partitionKey) {
throw e;
}

- partitionManager.removeActivePartition();
+ if (!fromAcknowledgmentsCallback) {
+ partitionManager.removeActivePartition();
+ }

LOG.info("Partition key {} was completed by owner {}.", partitionKey, ownerId);
partitionsCompletedCounter.increment();
}

@Override
- public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount) {
+ public void closePartition(final String partitionKey, final Duration reopenAfter, final int maxClosedCount, final Boolean fromAcknowledgmentsCallback) {
validateIsInitialized();

- final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "close");
+ final SourcePartitionStoreItem itemToUpdate = getItemWithAction(partitionKey, CLOSE_ACTION, fromAcknowledgmentsCallback);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwner(null);
@@ -262,7 +268,9 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
partitionsClosedCounter.increment();
}

- partitionManager.removeActivePartition();
+ if (!fromAcknowledgmentsCallback) {
+ partitionManager.removeActivePartition();
+ }

LOG.info("Partition key {} was closed by owner {}. The resulting status of the partition is now {}", partitionKey, ownerId, itemToUpdate.getSourcePartitionStatus());
}
@@ -271,7 +279,7 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
public <S extends T> void saveProgressStateForPartition(final String partitionKey, final S partitionProgressState) {
validateIsInitialized();

- final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "save state");
+ final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, SAVE_STATE_ACTION);
validatePartitionOwnership(itemToUpdate);

itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(DEFAULT_LEASE_TIMEOUT));
@@ -290,6 +298,20 @@ public <S extends T> void saveProgressStateForPartition(final String partitionKe
saveProgressStateInvocationSuccessCounter.increment();
}

+ @Override
+ public void updatePartitionForAcknowledgmentWait(final String partitionKey, final Duration ackowledgmentTimeout) {
+ validateIsInitialized();
+
+ final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "update for ack wait");
+ validatePartitionOwnership(itemToUpdate);
+
+ itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout));
+
+ sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);
+
+ partitionManager.removeActivePartition();
+ }
+
@Override
public void giveUpPartitions() {

@@ -380,6 +402,10 @@ private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final St
);
}

+ return getSourcePartitionStoreItem(partitionKey, action);
+ }
+
+ private SourcePartitionStoreItem getSourcePartitionStoreItem(final String partitionKey, final String action) {
final Optional<SourcePartitionStoreItem> optionalPartitionItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, partitionKey);

if (optionalPartitionItem.isEmpty()) {
@@ -441,4 +467,10 @@ private void giveUpAndSaveGlobalStateForPartitionCreation(final SourcePartitionS
LOG.warn("There was an error saving global state after creating partitions.", e);
}
}

+ private SourcePartitionStoreItem getItemWithAction(final String partitionKey, final String action, final Boolean fromAcknowledgmentsCallback) {
+ // The validation against activePartition in partition manager needs to be skipped when called from acknowledgments callback
+ // because otherwise it will fail the validation since it is actively working on a different partition when ack is received
+ return fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, action) : validateAndGetSourcePartitionStoreItem(partitionKey, action);
+ }
}
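
The reasoning in the `getItemWithAction` comment can be seen in a small standalone model. This is a simplified sketch, not the real implementation: `ActivePartitionCheckSketch` is hypothetical, the store lookup is replaced by returning the key, and `IllegalStateException` stands in for whatever exception the real validation throws.

```java
// Simplified model of the active-partition check that getItemWithAction bypasses for callbacks.
final class ActivePartitionCheckSketch {
    private String activePartitionKey; // null when no partition is active

    void setActivePartition(final String partitionKey) {
        this.activePartitionKey = partitionKey;
    }

    // Returns the key as a stand-in for the store item the real method would fetch.
    String getItemWithAction(final String partitionKey, final String action, final boolean fromAcknowledgmentsCallback) {
        if (!fromAcknowledgmentsCallback) {
            // Normal path: the partition being acted on must be the one currently being processed.
            validateActivePartition(partitionKey, action);
        }
        // Callback path: skip the check, because the source has usually moved on to another partition.
        return partitionKey;
    }

    private void validateActivePartition(final String partitionKey, final String action) {
        if (!partitionKey.equals(activePartitionKey)) {
            throw new IllegalStateException(String.format(
                    "Unable to %s partition %s: it is not the active partition", action, partitionKey));
        }
    }
}
```

If the source is now working on partition `p2` and an acknowledgment arrives for `p1`, the callback path still succeeds, while the same call without the flag is rejected.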