Add support for Update/Upsert/Delete operations in OpenSearch Sink #3424
Signed-off-by: Krishna Kondaka <[email protected]>
@@ -74,6 +76,10 @@ private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {
    if (anyDocument == null)
        return OPERATION_OVERHEAD;

    if (anyDocument instanceof JsonNode) {
        return OPERATION_OVERHEAD + ((JsonNode)anyDocument).toString().length();
At this point, we are serializing the JsonNode into a string. So is there any real gain in keeping the JsonNode? It seems we can simplify by using the string and then converting to a JsonNode when needed for the update condition.
We need the getJsonNode() API to pass the document to the update operation, and I don't think reconstructing it from a JSON string would be efficient. We also need toJsonString() to get the size. There are different ways to do this, but I think we will need both.
if (StringUtils.equalsIgnoreCase(eventAction, BulkAction.CREATE.toString())) {
Let's require the exact case. Ignoring case could lead to user errors and inconsistencies. It's also too easy for us to accidentally break support for variably cased input.
OK. It is how the existing code is. Hopefully, it won't break if I make it exact case match.
OK. If that is the existing behavior, we can keep it for now. Maybe in 3.0 we can consolidate all to use only correct case.
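The trade-off in this thread can be illustrated with a small, self-contained sketch. The nested `Action` enum and its lowercase option strings are hypothetical stand-ins for the sink's `BulkAction`, not the project's actual code.

```java
import java.util.Arrays;
import java.util.Optional;

class ActionCaseMatchingSketch {
    // Hypothetical stand-in for BulkAction; the lowercase option strings are assumed.
    enum Action {
        CREATE("create"), INDEX("index"), UPDATE("update"), UPSERT("upsert"), DELETE("delete");

        private final String optionValue;

        Action(final String optionValue) {
            this.optionValue = optionValue;
        }
    }

    // Exact-case lookup: only the canonical spelling matches.
    static Optional<Action> fromExact(final String value) {
        return Arrays.stream(Action.values())
                .filter(a -> a.optionValue.equals(value))
                .findFirst();
    }

    // Case-insensitive lookup, mirroring the existing equalsIgnoreCase behavior.
    static Optional<Action> fromIgnoreCase(final String value) {
        return Arrays.stream(Action.values())
                .filter(a -> a.optionValue.equalsIgnoreCase(value))
                .findFirst();
    }

    public static void main(final String[] args) {
        System.out.println(fromExact("Create").isPresent());      // false
        System.out.println(fromIgnoreCase("Create").isPresent()); // true
        System.out.println(fromExact("create").isPresent());      // true
    }
}
```

Keeping both lookups side by side makes it easy to switch to exact matching in a later major version without touching callers.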
bulkOperation = new BulkOperation.Builder()
        .create(createOperationBuilder.build())
        .build();
String eventAction = action;
This is significant logic. Can we pull it out into its own class? It will be easier to test this way and make this method more manageable.
OK, will make it a different method.
if (StringUtils.equals(action, BulkAction.UPDATE.toString()) ||
        StringUtils.equals(action, BulkAction.UPSERT.toString())) {
    final UpdateOperation.Builder<Object> updateOperationBuilder = (action.toLowerCase() == BulkAction.UPSERT.toString()) ?
            new UpdateOperation.Builder<>()
                    .index(indexName)
                    .document(jsonNode)
                    .upsert(jsonNode) :
            new UpdateOperation.Builder<>()
                    .index(indexName)
                    .document(jsonNode);
    docId.ifPresent(updateOperationBuilder::id);
    routing.ifPresent(updateOperationBuilder::routing);
    bulkOperation = new BulkOperation.Builder()
            .update(updateOperationBuilder.build())
            .build();
    return bulkOperation;
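One detail worth noting in the snippet above: the ternary compares strings with `==`, which in Java tests object identity rather than content, so it may evaluate to false even when the text matches. The following is a minimal sketch of the difference; the class name and the `"upsert"` constant are assumptions for illustration (the constant is assumed to equal `BulkAction.UPSERT.toString()`), not the project's code.

```java
class UpsertCheckSketch {
    // Hypothetical option string, assumed to match BulkAction.UPSERT.toString().
    static final String UPSERT = "upsert";

    // Reference comparison: true only when both operands are the same object.
    static boolean isUpsertByReference(final String action) {
        return action.toLowerCase() == UPSERT;
    }

    // Value comparison: true whenever the characters match.
    static boolean isUpsertByValue(final String action) {
        return UPSERT.equals(action);
    }

    public static void main(final String[] args) {
        // Built at run time, so it is a different object from the interned literal.
        final String action = new String("upsert");
        System.out.println(isUpsertByReference(action)); // false
        System.out.println(isUpsertByValue(action));     // true
    }
}
```

Whether the reference comparison happens to succeed depends on how the action string was created, which is why a value comparison such as `equals` is the safer choice here.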
Just curious, did you do any manual testing of these two? Just to verify that UPSERT will create or update, and that UPDATE will update certain fields?
I have added integration test cases. Please take a look and let me know.
public Builder withAction(final String action) {
    checkArgument(EnumUtils.isValidEnumIgnoreCase(BulkAction.class, action), "action must be one of the following: " + BulkAction.values());
    // Removed validation because action may have expressions
    this.action = action;
    return this;
}
What will be the user experience if the action that is passed is invalid? Can we at least validate that it is either a valid expression with (
Line 36 in d3a027a
Boolean isValidExpressionStatement(final String statement);
I am doing it at the run time after evaluating the expression. See
if (BulkAction.fromOptionValue(eventAction) == null) {
LOG.error("Unknown action {}, skipping the event", eventAction);
invalidActionErrorsCounter.increment();
continue;
}
The isValidExpressionStatement check may not be accurate: if the key field is passed as /xyz, it is seen as a valid expression even when there is no xyz field. Do you think config-time validation is still a good idea?
/xyz, xyz, etc. are not valid expressions currently. But even if they were, it's still better to have the validation. We should do config-time validation to check the following:
private boolean isValidAction() {
return isValidExpression(action) || isValidEnum(action);
}
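A self-contained sketch of the check proposed above follows. The set of literal action names and the injected expression checker are assumptions made so the example runs on its own; in the sink the checker would presumably delegate to the project's expression evaluator.

```java
import java.util.Locale;
import java.util.Set;
import java.util.function.Predicate;

class ActionConfigValidationSketch {
    // Assumed literal action names; the real values come from BulkAction.
    static final Set<String> KNOWN_ACTIONS = Set.of("create", "index", "update", "upsert", "delete");

    static boolean isValidEnum(final String action) {
        return action != null && KNOWN_ACTIONS.contains(action.toLowerCase(Locale.ROOT));
    }

    // The expression check is injected so this sketch does not depend on a real evaluator.
    static boolean isValidAction(final String action, final Predicate<String> isValidExpression) {
        return isValidEnum(action) || (action != null && isValidExpression.test(action));
    }

    public static void main(final String[] args) {
        // Hypothetical checker: treats any "${...}" string as a valid expression.
        final Predicate<String> checker = s -> s.startsWith("${") && s.endsWith("}");
        System.out.println(isValidAction("Upsert", checker));
        System.out.println(isValidAction("${/my_action}", checker));
        System.out.println(isValidAction("bogus", checker));
    }
}
```

Config-time validation of this kind rejects obvious mistakes early, while the run-time check still covers expressions that evaluate to an unknown action.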
}

@Test
public void testBulkActionUpsertWithActions() throws IOException, InterruptedException {
Can we add one test for upsert to verify that if document does not exist it is created? Looks like we only have a test for upsert with existing document
ok
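The behavior the requested test should verify can be modeled with a small in-memory sketch. This is only an illustration of update versus upsert semantics using a `HashMap` as a stand-in index; it does not call OpenSearch, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class UpsertSemanticsSketch {
    // In-memory stand-in for an index: document id -> document fields.
    private final Map<String, Map<String, Object>> index = new HashMap<>();

    // Update: merges fields into an existing document; reports failure if the id is absent.
    boolean update(final String id, final Map<String, Object> fields) {
        final Map<String, Object> existing = index.get(id);
        if (existing == null) {
            return false;
        }
        existing.putAll(fields);
        return true;
    }

    // Upsert: merges into the existing document, or creates it when the id is absent.
    void upsert(final String id, final Map<String, Object> fields) {
        index.computeIfAbsent(id, key -> new HashMap<>()).putAll(fields);
    }

    Map<String, Object> get(final String id) {
        return index.get(id);
    }

    public static void main(final String[] args) {
        final UpsertSemanticsSketch sketch = new UpsertSemanticsSketch();
        System.out.println(sketch.update("1", Map.of("name", "a"))); // false: nothing to update
        sketch.upsert("1", Map.of("name", "a"));                     // creates the document
        sketch.upsert("1", Map.of("value", 2));                      // merges into it
        System.out.println(sketch.get("1").size());                  // 2
    }
}
```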
for (final Map<String, Object> actionMap: actions) {
    String action = (String)actionMap.get("type");
    if (action != null) {
        checkArgument((EnumUtils.isValidEnumIgnoreCase(BulkAction.class, action) || JacksonEvent.isValidFormatExpressions(action, expressionEvaluator)), "action must be one of the following: " + BulkAction.values());
Thanks for adding this! It would be nice to throw a more specific exception when the expression is invalid versus when the value is not one of the enum values. Up to you whether this is worth adding.
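One way to separate the two failure modes is sketched below. The nested `Action` enum and the two injected predicates are hypothetical placeholders, not the project's API. The sketch also uses `Arrays.toString`, because concatenating an array such as `BulkAction.values()` directly into a string yields the array's type and hash code rather than the list of names.

```java
import java.util.Arrays;
import java.util.function.Predicate;

class ActionErrorMessageSketch {
    // Hypothetical stand-in for BulkAction.
    enum Action { CREATE, INDEX, UPDATE, UPSERT, DELETE }

    // Throws a distinct message for a malformed expression versus an unknown literal action.
    static void validate(final String action,
                         final Predicate<String> looksLikeExpression,
                         final Predicate<String> isValidExpression) {
        if (looksLikeExpression.test(action)) {
            if (!isValidExpression.test(action)) {
                throw new IllegalArgumentException("action \"" + action + "\" is not a valid expression");
            }
            return;
        }
        final boolean known = Arrays.stream(Action.values())
                .anyMatch(a -> a.name().equalsIgnoreCase(action));
        if (!known) {
            // Arrays.toString renders the enum names, e.g. [CREATE, INDEX, ...].
            throw new IllegalArgumentException(
                    "action must be one of the following: " + Arrays.toString(Action.values()));
        }
    }

    public static void main(final String[] args) {
        try {
            validate("bogus", s -> s.startsWith("${"), s -> false);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```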
…3424)
* Add support for Update/Upsert/Delete operations in OpenSearch Sink
* Fixed tests and removed unused imports
* Updated documentation
* Added test cases to improve code coverage
* Addressed review comments
* Fixed check style errors
* Added another test for upsert action without prior create action
* Added check for valid action strings at config time
Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
(cherry picked from commit 36b0b9c)
…3424) (#3446)
* Add support for Update/Upsert/Delete operations in OpenSearch Sink
* Fixed tests and removed unused imports
* Updated documentation
* Added test cases to improve code coverage
* Addressed review comments
* Fixed check style errors
* Added another test for upsert action without prior create action
* Added check for valid action strings at config time
Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
(cherry picked from commit 36b0b9c)
Co-authored-by: kkondaka <[email protected]>
Description
Add support for Update/Upsert/Delete operations in OpenSearch Sink
Actions can be specified with type = Create/Index/Update/Upsert/Delete and with condition
Adds support for expressions in "action" or actions "type".
Resolves #3109
Issues Resolved
Resolves #3109
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.