Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Disk Manager] fix delete image idempotency: do not schedule retire base disks if image was not found in database #2497

Merged
merged 10 commits into from
Nov 28, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,16 @@ func TestImageServiceDeleteImage(t *testing.T) {
err = internal_client.WaitOperation(ctx, client, operation.Id)
require.NoError(t, err)

// Check idempotency.
reqCtx = testcommon.GetRequestContext(t, ctx)
operation, err = client.DeleteImage(reqCtx, &disk_manager.DeleteImageRequest{
ImageId: imageID,
})
require.NoError(t, err)
require.NotEmpty(t, operation)
err = internal_client.WaitOperation(ctx, client, operation.Id)
require.NoError(t, err)

testcommon.CheckConsistency(t, ctx)
}

Expand Down
73 changes: 38 additions & 35 deletions cloud/disk_manager/internal/pkg/resources/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,39 +309,37 @@ func (s *storageYDB) createImage(
ctx context.Context,
session *persistence.Session,
image ImageMeta,
) (*ImageMeta, error) {
) (ImageMeta, error) {

tx, err := session.BeginRWTransaction(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

defer tx.Rollback(ctx)

// HACK: see NBS-974 for details.
snapshotExists, err := s.snapshotExists(ctx, tx, image.ID)
if err != nil {
return nil, err
return ImageMeta{}, err
}

if snapshotExists {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

logging.Info(
ctx,
return ImageMeta{}, errors.NewNonCancellableErrorf(
"image with id %v can't be created, because snapshot with id %v already exists",
image.ID,
image.ID,
)
return nil, nil
}

createRequest, err := proto.Marshal(image.CreateRequest)
if err != nil {
return nil, errors.NewNonRetriableErrorf(
return ImageMeta{}, errors.NewNonRetriableErrorf(
"failed to marshal create request for image with id %v: %w",
image.ID,
err,
Expand All @@ -360,27 +358,27 @@ func (s *storageYDB) createImage(
persistence.ValueParam("$id", persistence.UTF8Value(image.ID)),
)
if err != nil {
return nil, err
return ImageMeta{}, err
}

defer res.Close()

states, err := scanImageStates(ctx, res)
if err != nil {
return nil, err
return ImageMeta{}, err
}

if len(states) != 0 {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

state := states[0]

if state.status >= imageStatusDeleting {
logging.Info(ctx, "can't create already deleting/deleted image with id %v", image.ID)
return nil, errors.NewSilentNonRetriableErrorf(
return ImageMeta{}, errors.NewSilentNonRetriableErrorf(
"can't create already deleting/deleted image with id %v",
image.ID,
)
Expand All @@ -391,11 +389,14 @@ func (s *storageYDB) createImage(
state.createTaskID == image.CreateTaskID &&
state.createdBy == image.CreatedBy {

return state.toImageMeta(), nil
return *state.toImageMeta(), nil
}

logging.Info(ctx, "image with different params already exists, old=%v, new=%v", state, image)
return nil, nil
return ImageMeta{}, errors.NewNonCancellableErrorf(
"image with different params already exists, old=%v, new=%v",
state,
image,
)
}

state := imageState{
Expand All @@ -422,7 +423,10 @@ func (s *storageYDB) createImage(
case nil:
state.encryptionKeyHash = nil
default:
return nil, errors.NewNonRetriableErrorf("unknown key %s", key)
return ImageMeta{}, errors.NewNonRetriableErrorf(
"unknown key %s",
key,
)
}
} else {
state.encryptionMode = uint32(types.EncryptionMode_NO_ENCRYPTION)
Expand All @@ -441,15 +445,15 @@ func (s *storageYDB) createImage(
persistence.ValueParam("$states", persistence.ListValue(state.structValue())),
)
if err != nil {
return nil, err
return ImageMeta{}, err
}

err = tx.Commit(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

return state.toImageMeta(), nil
return *state.toImageMeta(), nil
}

func (s *storageYDB) imageCreated(
Expand Down Expand Up @@ -572,13 +576,11 @@ func (s *storageYDB) deleteImage(
return nil, err
}

logging.Info(
ctx,
return nil, errors.NewNonCancellableErrorf(
"image with id %v can't be deleted, because snapshot with id %v already exists",
imageID,
imageID,
)
return nil, nil
}

res, err := tx.Execute(ctx, fmt.Sprintf(`
Expand All @@ -603,21 +605,22 @@ func (s *storageYDB) deleteImage(
return nil, err
}

var state imageState
if len(states) == 0 {
// Should be idempotent.
return nil, nil
}

if len(states) != 0 {
state = states[0]
state := states[0]

if state.status >= imageStatusDeleting {
// Image already marked as deleting/deleted.
if state.status >= imageStatusDeleting {
// Image already marked as deleting/deleted.

err = tx.Commit(ctx)
if err != nil {
return nil, err
}

return state.toImageMeta(), nil
err = tx.Commit(ctx)
if err != nil {
return nil, err
}

return state.toImageMeta(), nil
}

state.id = imageID
Expand Down Expand Up @@ -832,9 +835,9 @@ func (s *storageYDB) listImages(
func (s *storageYDB) CreateImage(
ctx context.Context,
image ImageMeta,
) (*ImageMeta, error) {
) (ImageMeta, error) {

var created *ImageMeta
var created ImageMeta

err := s.db.Execute(
ctx,
Expand Down
33 changes: 17 additions & 16 deletions cloud/disk_manager/internal/pkg/resources/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func TestImagesCreateImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

// Check idempotency.
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

err = storage.ImageCreated(ctx, image.ID, time.Now(), 0, 0)
require.NoError(t, err)
Expand All @@ -72,14 +72,14 @@ func TestImagesCreateImage(t *testing.T) {
// Check idempotency.
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

require.EqualValues(t, "disk", created.SrcDiskID)

image.CreateTaskID = "other"
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.Nil(t, created)
_, err = storage.CreateImage(ctx, image)
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
}

func TestImagesDeleteImage(t *testing.T) {
Expand All @@ -105,7 +105,7 @@ func TestImagesDeleteImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

expected := image
expected.CreateRequest = nil
Expand Down Expand Up @@ -170,13 +170,13 @@ func TestImagesDeleteNonexistentImage(t *testing.T) {
deletingAt := time.Now()
actual, err := storage.DeleteImage(ctx, image.ID, "delete", deletingAt)
require.NoError(t, err)
requireImagesAreEqual(t, image, *actual)
require.Nil(t, actual)

// Check idempotency.
deletingAt = deletingAt.Add(time.Second)
actual, err = storage.DeleteImage(ctx, image.ID, "delete", deletingAt)
require.NoError(t, err)
requireImagesAreEqual(t, image, *actual)
require.Nil(t, actual)

_, err = storage.CreateImage(ctx, image)
require.Error(t, err)
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestImagesClearDeletedImages(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

_, err = storage.DeleteImage(ctx, image.ID, "delete", deletedAt)
require.NoError(t, err)
Expand All @@ -236,7 +236,7 @@ func TestImagesClearDeletedImages(t *testing.T) {

created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)
}

func TestImagesCreateImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
Expand All @@ -263,9 +263,9 @@ func TestImagesCreateImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
_, err = storage.CreateSnapshot(ctx, snapshot)
require.NoError(t, err)

created, err := storage.CreateImage(ctx, ImageMeta{ID: snapshot.ID})
require.NoError(t, err)
require.Nil(t, created)
_, err = storage.CreateImage(ctx, ImageMeta{ID: snapshot.ID})
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
}

func TestImagesDeleteImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
Expand Down Expand Up @@ -293,7 +293,8 @@ func TestImagesDeleteImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
require.NoError(t, err)

created, err := storage.DeleteImage(ctx, snapshot.ID, "delete", time.Now())
require.NoError(t, err)
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
require.Nil(t, created)
}

Expand Down Expand Up @@ -328,7 +329,7 @@ func TestImagesGetImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

image.CreateRequest = nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func (s *StorageMock) ListDisks(
func (s *StorageMock) CreateImage(
ctx context.Context,
image resources.ImageMeta,
) (*resources.ImageMeta, error) {
) (resources.ImageMeta, error) {

args := s.Called(ctx, image)
return args.Get(0).(*resources.ImageMeta), args.Error(1)
return args.Get(0).(resources.ImageMeta), args.Error(1)
}

func (s *StorageMock) ImageCreated(
Expand Down Expand Up @@ -161,10 +161,10 @@ func (s *StorageMock) ListImages(
func (s *StorageMock) CreateSnapshot(
ctx context.Context,
snapshot resources.SnapshotMeta,
) (*resources.SnapshotMeta, error) {
) (resources.SnapshotMeta, error) {

args := s.Called(ctx, snapshot)
return args.Get(0).(*resources.SnapshotMeta), args.Error(1)
return args.Get(0).(resources.SnapshotMeta), args.Error(1)
}

func (s *StorageMock) SnapshotCreated(
Expand Down
Loading