Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for fully async acknowledgments in source coordination #3384

Merged

Conversation

graytaylor0
Copy link
Member

@graytaylor0 graytaylor0 commented Sep 25, 2023

Description

This changes adds a new function to the source coordinator interface

updatePartitionForAckWait(final String partitionKey, final Duration acknowledgmentTimeout)

This method will give ownership of acknowledgmentTimeout amount for the acknowledgment to be received. If the acknowledgment for the partition is not received in time, then another instance of Data Prepper will attempt to process it from the beginning. Calling the updatePartitionForAckWait is also necessary to allow the current node to pick up a new partition to process immediately.

Also added a paramter fromAcknowledgmentCallback to close/completePartition. Only the callback functions should set this to true when the partition is closed or completed by the callback.

  • Implemented the use of this change in both s3 scan and the opensearch source to have acknowledgment timeout of 2 hours (we could consider making this configurable in the future, but given that we have infinite timeout in Kafka, I was hoping we would consolidate on the user experience for acknowledgments timeout first) Instead of waiting to receive the acknowledgments from the sink, it will now be done completely asynchronously, meaning the source will continuously pick up the next partition and write it to the buffer.

Issues Resolved

Resolves #3381

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

* @param ackowledgmentTimeout - the amount of time that this partition can be completed by the acknowledgment callback before another instance of Data Prepper
* can pick it up for processing
*/
void updatePartitionForAckWait(final String partitionKey, final Duration ackowledgmentTimeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use full words in interface names. updatePartitionForAcknowledgementWait.

validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "complete");
final SourcePartitionStoreItem itemToUpdate = fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, "complete") : validateAndGetSourcePartitionStoreItem(partitionKey, "complete");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make "complete" a constant now that you are using it twice.

* @since 2.2
*/
void completePartition(final String partitionKey);
void completePartition(final String partitionKey, final Boolean fromAcknowledgmentsCallback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an object we can use here instead? Is there any acknowledgements handle of any sort? This would be ideal so that if data is needed from there in the future, we don't have to make yet another change to the interface.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's anything that makes sense to pass for acknowledgments other than the boolean. As far as objects in requests, it could make sense to keep the model class the same with

void completePartition(final CompletePartitionRequest)

and to do that with the rest of the SourceCoordinator methods as well. I'm not sure if that's worth a large refactor though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only suggested it if there is an existing model. I'm good keeping this as it is.

validateIsInitialized();

final SourcePartitionStoreItem itemToUpdate = validateAndGetSourcePartitionStoreItem(partitionKey, "close");
final SourcePartitionStoreItem itemToUpdate = fromAcknowledgmentsCallback ? getSourcePartitionStoreItem(partitionKey, "close") : validateAndGetSourcePartitionStoreItem(partitionKey, "close");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is very similar to what you have above in another method. It would be best to move this into a private method so that future maintainers/developers don't miss the other location when modifying the code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment on why validation is not needed if this is called from acknowledgement callback?

Signed-off-by: Taylor Gray <[email protected]>
@graytaylor0 graytaylor0 merged commit 6546901 into opensearch-project:main Sep 26, 2023
2 checks passed
@opensearch-trigger-bot
Copy link
Contributor

The backport to 2.4 failed:

The process '/usr/bin/git' failed with exit code 1

To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-2.4 2.4
# Navigate to the new working tree
cd .worktrees/backport-2.4
# Create a new branch
git switch --create backport/backport-3384-to-2.4
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 6546901a8492500adda6d4fcfbc7d7f014734d47
# Push it to GitHub
git push --set-upstream origin backport/backport-3384-to-2.4
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-2.4

Then, create a pull request where the base branch is 2.4 and the compare/head branch is backport/backport-3384-to-2.4.

graytaylor0 added a commit to graytaylor0/data-prepper that referenced this pull request Sep 27, 2023
asifsmohammed pushed a commit to asifsmohammed/data-prepper that referenced this pull request Sep 27, 2023
graytaylor0 added a commit that referenced this pull request Sep 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

S3 Scan with acknowledgments waits for acknowledgment before processing another object
3 participants