Skip to content

Commit

Permalink
Fix memory leak of command inputs (#2262)
Browse files Browse the repository at this point in the history
Fix memory leak of command inputs
  • Loading branch information
Quinn-With-Two-Ns authored Oct 15, 2024
1 parent 92692b4 commit af7b5b7
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public void createScheduleActivityTaskCommand() {
.setCommandType(CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK)
.setScheduleActivityTaskCommandAttributes(parameters.getAttributes())
.build());
parameters = null; // avoiding retaining large input for the duration of the activity
}

private void setStartedCommandEventId() {
Expand Down Expand Up @@ -455,7 +456,6 @@ private void createRequestCancelActivityTaskCommand() {
RequestCancelActivityTaskCommandAttributes.newBuilder()
.setScheduledEventId(getInitialCommandEventId()))
.build());
parameters = null; // avoid retaining large input for the duration of the activity
}

public static class FailureResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@

package io.temporal.internal.statemachines;

import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;

import io.temporal.api.command.v1.Command;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.workflow.Functions;
import javax.annotation.Nullable;

class EntityStateMachineInitialCommand<State, ExplicitEvent, Data>
extends EntityStateMachineBase<State, ExplicitEvent, Data> {

private CancellableCommand command;

private long initialCommandEventId;

public EntityStateMachineInitialCommand(
Expand All @@ -56,7 +57,17 @@ protected final void addCommand(Command command) {
}

protected final void cancelCommand() {
command.cancel();
if (command != null) {
command.cancel();
}
}

public WorkflowStateMachines.HandleEventStatus handleEvent(
HistoryEvent event, boolean hasNextEvent) {
if (isCommandEvent(event)) {
command = null;
}
return super.handleEvent(event, hasNextEvent);
}

protected long getInitialCommandEventId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,20 @@ public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
ActivityOutput<Optional<Payloads>> output =
executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);

// Avoid passing the input to the output handle as it causes the input to be retained for the
// duration of the operation.
Type resultType = input.getResultType();
Class<T> resultClass = input.getResultClass();
return new ActivityOutput<>(
output.getActivityId(),
output
.getResult()
.handle(
(r, f) -> {
if (f == null) {
return input.getResultType() != Void.TYPE
return resultType != Void.TYPE
? dataConverterWithActivityContext.fromPayloads(
0, r, input.getResultClass(), input.getResultType())
0, r, resultClass, resultType)
: null;
} else {
throw dataConverterWithActivityContext.failureToException(
Expand Down Expand Up @@ -412,13 +416,16 @@ public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> inp
null,
serializedResult);

// Avoid passing the input to the output handle as it causes the input to be retained for the
// duration of the operation.
Type resultType = input.getResultType();
Class<R> resultClass = input.getResultClass();
Promise<R> result =
serializedResult.handle(
(r, f) -> {
if (f == null) {
return input.getResultClass() != Void.TYPE
? dataConverterWithActivityContext.fromPayloads(
0, r, input.getResultClass(), input.getResultType())
return resultClass != Void.TYPE
? dataConverterWithActivityContext.fromPayloads(0, r, resultClass, resultType)
: null;
} else {
throw dataConverterWithActivityContext.failureToException(
Expand Down Expand Up @@ -708,11 +715,14 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
return null;
});

// Avoid passing the input to the output handle as it causes the input to be retained for the
// duration of the operation.
Type resultType = input.getResultType();
Class<R> resultClass = input.getResultClass();
Promise<R> result =
resultPromise.thenApply(
(b) ->
dataConverterWithChildWorkflowContext.fromPayloads(
0, b, input.getResultClass(), input.getResultType()));
dataConverterWithChildWorkflowContext.fromPayloads(0, b, resultClass, resultType));
return new ChildWorkflowOutput<>(result, executionPromise);
}

Expand Down

0 comments on commit af7b5b7

Please sign in to comment.