Skip to content

Commit

Permalink
Backward compatibility test
Browse files Browse the repository at this point in the history
  • Loading branch information
bgedik committed Nov 2, 2024
1 parent cb24656 commit 2fa02ec
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type earliestFileErrorReader struct {
- While the earliest file error reader handles the single error file scenario as well,
it is not set as the default, because its implementation depends on doing a listing operation
on remote storage. We do not want the listing overhead to be paid for the more common case of
on remote storage. We do not want the listing overhead to be paid for the more common case of
having a single error file.
- Under the multiple error aggregation scenario, it is possible that the error aggregation
is performed before all the errors are reported. For PyTorch plugin specifically, the
Expand All @@ -57,8 +57,8 @@ type earliestFileErrorReader struct {
where the pod that has the earliest error gets delayed in uploading its error file to
remote storage, and the pod that has a later error ends up completing first. If the
training operator's detection of job completion and Propeller's error aggregation happen so
fast that the pod with the earliest error has not yet uploaded its error to remote storage,
we may end up reporting the wrong error. This is highly unlikely in practice. The implementation
fast that the pod with the earliest error has not yet uploaded its error to remote storage,
we may end up reporting the wrong error. This is highly unlikely in practice. The implementation
we have here is significantly better than the prior behavior of reporting the latest written
error.pb file (as there was a race condition on overwriting error files), which is almost always
not the earliest error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,55 @@ func TestReadOrigin(t *testing.T) {
assert.Equal(t, timestamppb.New(time.Unix(99, 0)), executionError.Timestamp)
assert.False(t, executionError.IsRecoverable)
})

t.Run("multi-user-error-backward-compat", func(t *testing.T) {
outputPaths := &pluginsIOMock.OutputFilePaths{}
outputPaths.OnGetErrorPath().Return("s3://errors/error.pb")

store := &storageMocks.ComposedProtobufStore{}
store.OnReadProtobufMatch(mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
errorDoc := &core.ErrorDocument{
Error: &core.ContainerError{
Code: "red",
Message: "hi",
Kind: core.ContainerError_NON_RECOVERABLE,
Origin: core.ExecutionError_USER,
},
}
incomingErrorDoc := args.Get(2)
assert.NotNil(t, incomingErrorDoc)
casted := incomingErrorDoc.(*core.ErrorDocument)
casted.Error = errorDoc.Error
}).Return(nil)

store.OnList(ctx, storage.DataReference("s3://errors/error"), 1000, storage.NewCursorAtStart()).Return(
[]storage.DataReference{"error.pb"}, storage.NewCursorAtEnd(), nil)

store.OnHead(ctx, storage.DataReference("error.pb")).Return(MemoryMetadata{
exists: true,
}, nil)

maxPayloadSize := int64(0)
r, err := NewRemoteFileOutputReaderWithErrorAggregationStrategy(
ctx,
store,
outputPaths,
maxPayloadSize,
k8s.EarliestErrorAggregationStrategy,
)
assert.NoError(t, err)

hasError, err := r.IsError(ctx)
assert.NoError(t, err)
assert.True(t, hasError)

executionError, err := r.ReadError(ctx)
assert.NoError(t, err)
assert.Equal(t, core.ExecutionError_USER, executionError.Kind)
assert.Equal(t, "red", executionError.Code)
assert.Equal(t, "hi", executionError.Message)
assert.Equal(t, "", executionError.Worker)
assert.Nil(t, executionError.Timestamp)
assert.False(t, executionError.IsRecoverable)
})
}

0 comments on commit 2fa02ec

Please sign in to comment.