Skip to content

Commit

Permalink
Add an error for file size exceeded to prevent system retries (#5725)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Oct 15, 2024
1 parent f2a0619 commit 66391ff
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type RemoteFileOutputReader struct {
maxPayloadSize int64
}

var ErrRemoteFileExceedsMaxSize = errors.New("remote file exceeds max size")

func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error) {
metadata, err := r.store.Head(ctx, r.outPath.GetErrorPath())
if err != nil {
Expand Down Expand Up @@ -81,7 +83,7 @@ func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error) {
}
if md.Exists() {
if md.Size() > r.maxPayloadSize {
return false, errors.Errorf("output file @[%s] is too large [%d] bytes, max allowed [%d] bytes", r.outPath.GetOutputPath(), md.Size(), r.maxPayloadSize)
return false, errors.Wrapf(ErrRemoteFileExceedsMaxSize, "output file @[%s] is too large [%d] bytes, max allowed [%d] bytes", r.outPath.GetOutputPath(), md.Size(), r.maxPayloadSize)
}
return true, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

regErrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -36,6 +37,32 @@ func (m MemoryMetadata) Etag() string {
return m.etag
}

func TestExistsTooBig(t *testing.T) {
ctx := context.TODO()
opath := &pluginsIOMock.OutputFilePaths{}
opath.OnGetErrorPath().Return("")
deckPath := "some.file"
opath.OnGetOutputPath().Return(storage.DataReference(deckPath))

t.Run("too large", func(t *testing.T) {
store := &storageMocks.ComposedProtobufStore{}
store.OnHead(ctx, "some.file").Return(MemoryMetadata{
exists: true,
size: 2,
}, nil)

r := RemoteFileOutputReader{
outPath: opath,
store: store,
maxPayloadSize: 1,
}

_, err := r.Exists(ctx)
assert.Error(t, err)
assert.True(t, regErrors.Is(err, ErrRemoteFileExceedsMaxSize))
})
}

func TestReadOrigin(t *testing.T) {
ctx := context.TODO()

Expand Down
9 changes: 9 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,15 @@ func (t *Handler) ValidateOutput(ctx context.Context, nodeID v1alpha1.NodeID, i
}
ok, err := r.Exists(ctx)
if err != nil {
if regErrors.Is(err, ioutils.ErrRemoteFileExceedsMaxSize) {
return &io.ExecutionError{
ExecutionError: &core.ExecutionError{
Code: "OutputSizeExceeded",
Message: fmt.Sprintf("Remote output size exceeds max, err: [%s]", err.Error()),
},
IsRecoverable: false,
}, nil
}
logger.Errorf(ctx, "Failed to check if the output file exists. Error: %s", err.Error())
return nil, err
}
Expand Down
39 changes: 39 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -1270,3 +1271,41 @@ func init() {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey,
contextutils.TaskIDKey)
}

func Test_task_Handle_ValidateOutputErr(t *testing.T) {
ctx := context.TODO()
nodeID := "n1"
execConfig := v1alpha1.ExecutionConfig{}

tk := &core.TaskTemplate{
Id: &core.Identifier{ResourceType: core.ResourceType_TASK, Project: "proj", Domain: "dom", Version: "ver"},
Type: "test",
Interface: &core.TypedInterface{
Outputs: &core.VariableMap{
Variables: map[string]*core.Variable{
"x": {
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_BOOLEAN,
},
},
},
},
},
},
}
taskID := &core.Identifier{}
tr := &nodeMocks.TaskReader{}
tr.OnGetTaskID().Return(taskID)
tr.OnReadMatch(mock.Anything).Return(tk, nil)

expectedErr := errors.Wrapf(ioutils.ErrRemoteFileExceedsMaxSize, "test file size exceeded")
r := &ioMocks.OutputReader{}
r.OnIsError(ctx).Return(false, nil)
r.OnExists(ctx).Return(true, expectedErr)

h := Handler{}
result, err := h.ValidateOutput(ctx, nodeID, nil, r, nil, execConfig, tr)
assert.NoError(t, err)
assert.False(t, result.IsRecoverable)
}

0 comments on commit 66391ff

Please sign in to comment.