Skip to content

Commit

Permalink
[Disk Manager] fix delete image idempotency: do not schedule retire b…
Browse files Browse the repository at this point in the history
…ase disks if image was not found in database (#2497)

* [Disk Manager] fix delete image idempotency: do not schedule retire base disks if image was not found in database

* [Disk Manager] add image deletion idempotency check in image service tests

* [Disk Manager] return nil imageMeta when delete image if there is no image in database; use ImageIDIsNotAcceptedError when create/delete image

* [Disk Manager] fixes

* [Disk Manager] fix test on deletion of non-existing image

* [Disk Manager] fixes

* [Disk Manager] do not use ImageIDIsNotAcceptedError when create/delete image; instead return non cancellable error for snapshot/image creation/deletion

* [Disk Manager] fix

* [Disk Manager] return imageMeta/snapshotMeta by value in createImage/deleteImage methods of resources storage

* [Disk Manager] fix
  • Loading branch information
gy2411 authored Nov 28, 2024
1 parent 13e8d01 commit e0243d9
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,16 @@ func TestImageServiceDeleteImage(t *testing.T) {
err = internal_client.WaitOperation(ctx, client, operation.Id)
require.NoError(t, err)

// Check idempotency.
reqCtx = testcommon.GetRequestContext(t, ctx)
operation, err = client.DeleteImage(reqCtx, &disk_manager.DeleteImageRequest{
ImageId: imageID,
})
require.NoError(t, err)
require.NotEmpty(t, operation)
err = internal_client.WaitOperation(ctx, client, operation.Id)
require.NoError(t, err)

testcommon.CheckConsistency(t, ctx)
}

Expand Down
73 changes: 38 additions & 35 deletions cloud/disk_manager/internal/pkg/resources/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,39 +309,37 @@ func (s *storageYDB) createImage(
ctx context.Context,
session *persistence.Session,
image ImageMeta,
) (*ImageMeta, error) {
) (ImageMeta, error) {

tx, err := session.BeginRWTransaction(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

defer tx.Rollback(ctx)

// HACK: see NBS-974 for details.
snapshotExists, err := s.snapshotExists(ctx, tx, image.ID)
if err != nil {
return nil, err
return ImageMeta{}, err
}

if snapshotExists {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

logging.Info(
ctx,
return ImageMeta{}, errors.NewNonCancellableErrorf(
"image with id %v can't be created, because snapshot with id %v already exists",
image.ID,
image.ID,
)
return nil, nil
}

createRequest, err := proto.Marshal(image.CreateRequest)
if err != nil {
return nil, errors.NewNonRetriableErrorf(
return ImageMeta{}, errors.NewNonRetriableErrorf(
"failed to marshal create request for image with id %v: %w",
image.ID,
err,
Expand All @@ -360,27 +358,27 @@ func (s *storageYDB) createImage(
persistence.ValueParam("$id", persistence.UTF8Value(image.ID)),
)
if err != nil {
return nil, err
return ImageMeta{}, err
}

defer res.Close()

states, err := scanImageStates(ctx, res)
if err != nil {
return nil, err
return ImageMeta{}, err
}

if len(states) != 0 {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

state := states[0]

if state.status >= imageStatusDeleting {
logging.Info(ctx, "can't create already deleting/deleted image with id %v", image.ID)
return nil, errors.NewSilentNonRetriableErrorf(
return ImageMeta{}, errors.NewSilentNonRetriableErrorf(
"can't create already deleting/deleted image with id %v",
image.ID,
)
Expand All @@ -391,11 +389,14 @@ func (s *storageYDB) createImage(
state.createTaskID == image.CreateTaskID &&
state.createdBy == image.CreatedBy {

return state.toImageMeta(), nil
return *state.toImageMeta(), nil
}

logging.Info(ctx, "image with different params already exists, old=%v, new=%v", state, image)
return nil, nil
return ImageMeta{}, errors.NewNonCancellableErrorf(
"image with different params already exists, old=%v, new=%v",
state,
image,
)
}

state := imageState{
Expand All @@ -422,7 +423,10 @@ func (s *storageYDB) createImage(
case nil:
state.encryptionKeyHash = nil
default:
return nil, errors.NewNonRetriableErrorf("unknown key %s", key)
return ImageMeta{}, errors.NewNonRetriableErrorf(
"unknown key %s",
key,
)
}
} else {
state.encryptionMode = uint32(types.EncryptionMode_NO_ENCRYPTION)
Expand All @@ -441,15 +445,15 @@ func (s *storageYDB) createImage(
persistence.ValueParam("$states", persistence.ListValue(state.structValue())),
)
if err != nil {
return nil, err
return ImageMeta{}, err
}

err = tx.Commit(ctx)
if err != nil {
return nil, err
return ImageMeta{}, err
}

return state.toImageMeta(), nil
return *state.toImageMeta(), nil
}

func (s *storageYDB) imageCreated(
Expand Down Expand Up @@ -572,13 +576,11 @@ func (s *storageYDB) deleteImage(
return nil, err
}

logging.Info(
ctx,
return nil, errors.NewNonCancellableErrorf(
"image with id %v can't be deleted, because snapshot with id %v already exists",
imageID,
imageID,
)
return nil, nil
}

res, err := tx.Execute(ctx, fmt.Sprintf(`
Expand All @@ -603,21 +605,22 @@ func (s *storageYDB) deleteImage(
return nil, err
}

var state imageState
if len(states) == 0 {
// Should be idempotent.
return nil, nil
}

if len(states) != 0 {
state = states[0]
state := states[0]

if state.status >= imageStatusDeleting {
// Image already marked as deleting/deleted.
if state.status >= imageStatusDeleting {
// Image already marked as deleting/deleted.

err = tx.Commit(ctx)
if err != nil {
return nil, err
}

return state.toImageMeta(), nil
err = tx.Commit(ctx)
if err != nil {
return nil, err
}

return state.toImageMeta(), nil
}

state.id = imageID
Expand Down Expand Up @@ -832,9 +835,9 @@ func (s *storageYDB) listImages(
func (s *storageYDB) CreateImage(
ctx context.Context,
image ImageMeta,
) (*ImageMeta, error) {
) (ImageMeta, error) {

var created *ImageMeta
var created ImageMeta

err := s.db.Execute(
ctx,
Expand Down
33 changes: 17 additions & 16 deletions cloud/disk_manager/internal/pkg/resources/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func TestImagesCreateImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

// Check idempotency.
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

err = storage.ImageCreated(ctx, image.ID, time.Now(), 0, 0)
require.NoError(t, err)
Expand All @@ -72,14 +72,14 @@ func TestImagesCreateImage(t *testing.T) {
// Check idempotency.
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

require.EqualValues(t, "disk", created.SrcDiskID)

image.CreateTaskID = "other"
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.Nil(t, created)
_, err = storage.CreateImage(ctx, image)
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
}

func TestImagesDeleteImage(t *testing.T) {
Expand All @@ -105,7 +105,7 @@ func TestImagesDeleteImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

expected := image
expected.CreateRequest = nil
Expand Down Expand Up @@ -170,13 +170,13 @@ func TestImagesDeleteNonexistentImage(t *testing.T) {
deletingAt := time.Now()
actual, err := storage.DeleteImage(ctx, image.ID, "delete", deletingAt)
require.NoError(t, err)
requireImagesAreEqual(t, image, *actual)
require.Nil(t, actual)

// Check idempotency.
deletingAt = deletingAt.Add(time.Second)
actual, err = storage.DeleteImage(ctx, image.ID, "delete", deletingAt)
require.NoError(t, err)
requireImagesAreEqual(t, image, *actual)
require.Nil(t, actual)

_, err = storage.CreateImage(ctx, image)
require.Error(t, err)
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestImagesClearDeletedImages(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

_, err = storage.DeleteImage(ctx, image.ID, "delete", deletedAt)
require.NoError(t, err)
Expand All @@ -236,7 +236,7 @@ func TestImagesClearDeletedImages(t *testing.T) {

created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)
}

func TestImagesCreateImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
Expand All @@ -263,9 +263,9 @@ func TestImagesCreateImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
_, err = storage.CreateSnapshot(ctx, snapshot)
require.NoError(t, err)

created, err := storage.CreateImage(ctx, ImageMeta{ID: snapshot.ID})
require.NoError(t, err)
require.Nil(t, created)
_, err = storage.CreateImage(ctx, ImageMeta{ID: snapshot.ID})
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
}

func TestImagesDeleteImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
Expand Down Expand Up @@ -293,7 +293,8 @@ func TestImagesDeleteImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
require.NoError(t, err)

created, err := storage.DeleteImage(ctx, snapshot.ID, "delete", time.Now())
require.NoError(t, err)
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
require.Nil(t, created)
}

Expand Down Expand Up @@ -328,7 +329,7 @@ func TestImagesGetImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

image.CreateRequest = nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func (s *StorageMock) ListDisks(
func (s *StorageMock) CreateImage(
ctx context.Context,
image resources.ImageMeta,
) (*resources.ImageMeta, error) {
) (resources.ImageMeta, error) {

args := s.Called(ctx, image)
return args.Get(0).(*resources.ImageMeta), args.Error(1)
return args.Get(0).(resources.ImageMeta), args.Error(1)
}

func (s *StorageMock) ImageCreated(
Expand Down Expand Up @@ -161,10 +161,10 @@ func (s *StorageMock) ListImages(
func (s *StorageMock) CreateSnapshot(
ctx context.Context,
snapshot resources.SnapshotMeta,
) (*resources.SnapshotMeta, error) {
) (resources.SnapshotMeta, error) {

args := s.Called(ctx, snapshot)
return args.Get(0).(*resources.SnapshotMeta), args.Error(1)
return args.Get(0).(resources.SnapshotMeta), args.Error(1)
}

func (s *StorageMock) SnapshotCreated(
Expand Down
Loading

0 comments on commit e0243d9

Please sign in to comment.