Skip to content

Commit

Permalink
[BUG] Handle Potential Indefinite Propeller Update Loops (#4755)
Browse files Browse the repository at this point in the history
* Keep the terminal phase on retry when the workflow is already in a terminal phase and updating the CRD fails with ErrWorkflowToLarge

Signed-off-by: Paul Dittamo <[email protected]>

* update error message

Signed-off-by: Paul Dittamo <[email protected]>

* Ensure no finalizers are set when retrying the update of a terminal workflow

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Jan 23, 2024
1 parent 73b1d40 commit 70d9c6f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
23 changes: 18 additions & 5 deletions flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,24 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo
// Workflow is too large, we will mark the workflow as failing and record it. This will automatically
// propagate the failure in the next round.
mutableW := w.DeepCopy()
mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "WorkflowTooLarge",
Message: "Workflow execution state is too large for Flyte to handle.",
})
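// Catch a potential indefinite update loop: if the mutated workflow has already terminated,
// finalize the copy as Failed (finalizers reset, completed label set) instead of marking it Failing.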
if mutatedWf.GetExecutionStatus().IsTerminated() {
ResetFinalizers(mutableW)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)
SetCompletedLabel(mutableW, time.Now())
msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase())
mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailed, msg, &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "WorkflowTooLarge",
Message: "Workflow execution state is too large for Flyte to handle.",
})
} else {
mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow size has breached threshold, aborting", &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "WorkflowTooLarge",
Message: "Workflow execution state is too large for Flyte to handle.",
})
}
if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil {
logger.Errorf(ctx, "Failed recording a large workflow as failed, reason: %s. Retrying...", e)
return nil, e
Expand Down
30 changes: 29 additions & 1 deletion flytepropeller/pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,36 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) {
}
s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil)
s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once()
s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Once()
s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool {
return w.Status.Phase == v1alpha1.WorkflowPhaseFailing
}), mock.Anything).Return(nil, nil).Once()
err := p.Handle(ctx, namespace, name)
assert.NoError(t, err)
})

// Subtest "too-large-terminal": the executor moves the workflow to the Failed phase, the first
// store Update is rejected with ErrWorkflowToLarge, and the follow-up Update must carry a
// workflow that is Failed, has no finalizer, and has the completed label.
t.Run("too-large-terminal", func(t *testing.T) {
scope := promutils.NewTestScope()
s := &mocks.FlyteWorkflow{}
exec := &mockExecutor{}
p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope)
// Minimal workflow object: only name, namespace and a spec ID are populated.
wf := &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "w1",
},
}
// The executor callback puts the workflow into the Failed phase and reports no error.
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseFailed, "done", nil)
return nil
}
s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(wf, nil)
// First Update expectation (used once): reject with a wrapped ErrWorkflowToLarge.
s.On("Update", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrWorkflowToLarge, "too large")).Once()
// Second Update expectation (used once): only matches a workflow that is in the Failed phase,
// has no finalizer, and carries the completed label.
s.On("Update", mock.Anything, mock.MatchedBy(func(w *v1alpha1.FlyteWorkflow) bool {
return w.Status.Phase == v1alpha1.WorkflowPhaseFailed && !HasFinalizer(w) && HasCompletedLabel(w)
}), mock.Anything).Return(nil, nil).Once()
err := p.Handle(ctx, namespace, name)
// Handle is expected to succeed once the second Update has been accepted.
assert.NoError(t, err)
})
Expand Down

0 comments on commit 70d9c6f

Please sign in to comment.