-
Notifications
You must be signed in to change notification settings - Fork 203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Address multi-threading issues with AWS Lambda Plugin #5194
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
…mbda sink Signed-off-by: Srikanth Govindarajan <[email protected]>
@@ -17,6 +17,6 @@ | |||
|
|||
@Documented | |||
@Retention(RetentionPolicy.RUNTIME) | |||
@Target({ElementType.TYPE}) | |||
@Target({ElementType.CONSTRUCTOR, ElementType.TYPE}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this.
@@ -26,5 +31,6 @@ public void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalR | |||
originalAcknowledgementSet.add(responseEvent); | |||
} | |||
} | |||
LOG.info("Successfully handled {} events in Aggregate response strategy", parsedEvents.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be too many log statements
@@ -149,93 +155,136 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) { | |||
return records; | |||
} | |||
|
|||
//lambda mutates event | |||
List<Record<Event>> resultRecords = new ArrayList<>(); | |||
reentrantLock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You shouldn't need this. This means multiple threads become serialized.
List<Event> parsedEvents = new ArrayList<>(); | ||
InputStream inputStream = PayloadValidator.validateAndGetInputStream(payload); | ||
responseCodec.parse(inputStream, record -> parsedEvents.add(record.getData())); | ||
LOG.info("Parsed successfully"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make debug/trace. This would result in too much logging. The metrics should indicate this.
resultRecords.add(originalRecords.get(i)); | ||
} | ||
}catch (Exception e){ | ||
LOG.info("SRI ERRRRRRRRRROR",e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove try catch here. If it fails, we need to know. Catching is not correct.
if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) { | ||
resultRecords.add(record); | ||
continue; | ||
} | ||
try { | ||
if (currentBufferPerBatch.getEventCount() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line and next few lines look same as processor code
Please rebase this PR and force push so that it doesn't have any conflicts. This will help make it clearer what we are reviewing. |
Description
Address multi-threading issues with AWS Lambda Plugin
Issues Resolved
Resolves #4700
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.