Skip to content

Commit

Permalink
[Disk Manager] return imageMeta/snapshotMeta by value in createImage/…
Browse files Browse the repository at this point in the history
…deleteImage methods of resources storage
  • Loading branch information
gy2411 committed Nov 26, 2024
1 parent cb5a2e8 commit e2f1b2d
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 64 deletions.
41 changes: 23 additions & 18 deletions cloud/disk_manager/internal/pkg/resources/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,28 +309,30 @@ func (s *storageYDB) createImage(
ctx context.Context,
session *persistence.Session,
image ImageMeta,
) (*ImageMeta, error) {
) (ImageMeta, error) {

var emptyImageMeta ImageMeta

tx, err := session.BeginRWTransaction(ctx)
if err != nil {
return nil, err
return emptyImageMeta, err
}

defer tx.Rollback(ctx)

// HACK: see NBS-974 for details.
snapshotExists, err := s.snapshotExists(ctx, tx, image.ID)
if err != nil {
return nil, err
return emptyImageMeta, err
}

if snapshotExists {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return emptyImageMeta, err
}

return nil, errors.NewNonCancellableErrorf(
return emptyImageMeta, errors.NewNonCancellableErrorf(
"image with id %v can't be created, because snapshot with id %v already exists",
image.ID,
image.ID,
Expand All @@ -339,7 +341,7 @@ func (s *storageYDB) createImage(

createRequest, err := proto.Marshal(image.CreateRequest)
if err != nil {
return nil, errors.NewNonRetriableErrorf(
return emptyImageMeta, errors.NewNonRetriableErrorf(
"failed to marshal create request for image with id %v: %w",
image.ID,
err,
Expand All @@ -358,27 +360,27 @@ func (s *storageYDB) createImage(
persistence.ValueParam("$id", persistence.UTF8Value(image.ID)),
)
if err != nil {
return nil, err
return emptyImageMeta, err
}

defer res.Close()

states, err := scanImageStates(ctx, res)
if err != nil {
return nil, err
return emptyImageMeta, err
}

if len(states) != 0 {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return emptyImageMeta, err
}

state := states[0]

if state.status >= imageStatusDeleting {
logging.Info(ctx, "can't create already deleting/deleted image with id %v", image.ID)
return nil, errors.NewSilentNonRetriableErrorf(
return emptyImageMeta, errors.NewSilentNonRetriableErrorf(
"can't create already deleting/deleted image with id %v",
image.ID,
)
Expand All @@ -389,10 +391,10 @@ func (s *storageYDB) createImage(
state.createTaskID == image.CreateTaskID &&
state.createdBy == image.CreatedBy {

return state.toImageMeta(), nil
return *state.toImageMeta(), nil
}

return nil, errors.NewNonCancellableErrorf(
return emptyImageMeta, errors.NewNonCancellableErrorf(
"image with different params already exists, old=%v, new=%v",
state,
image,
Expand Down Expand Up @@ -423,7 +425,10 @@ func (s *storageYDB) createImage(
case nil:
state.encryptionKeyHash = nil
default:
return nil, errors.NewNonRetriableErrorf("unknown key %s", key)
return emptyImageMeta, errors.NewNonRetriableErrorf(
"unknown key %s",
key,
)
}
} else {
state.encryptionMode = uint32(types.EncryptionMode_NO_ENCRYPTION)
Expand All @@ -442,15 +447,15 @@ func (s *storageYDB) createImage(
persistence.ValueParam("$states", persistence.ListValue(state.structValue())),
)
if err != nil {
return nil, err
return emptyImageMeta, err
}

err = tx.Commit(ctx)
if err != nil {
return nil, err
return emptyImageMeta, err
}

return state.toImageMeta(), nil
return *state.toImageMeta(), nil
}

func (s *storageYDB) imageCreated(
Expand Down Expand Up @@ -832,9 +837,9 @@ func (s *storageYDB) listImages(
func (s *storageYDB) CreateImage(
ctx context.Context,
image ImageMeta,
) (*ImageMeta, error) {
) (ImageMeta, error) {

var created *ImageMeta
var created ImageMeta

err := s.db.Execute(
ctx,
Expand Down
20 changes: 9 additions & 11 deletions cloud/disk_manager/internal/pkg/resources/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func TestImagesCreateImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

// Check idempotency.
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

err = storage.ImageCreated(ctx, image.ID, time.Now(), 0, 0)
require.NoError(t, err)
Expand All @@ -72,15 +72,14 @@ func TestImagesCreateImage(t *testing.T) {
// Check idempotency.
created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

require.EqualValues(t, "disk", created.SrcDiskID)

image.CreateTaskID = "other"
created, err = storage.CreateImage(ctx, image)
_, err = storage.CreateImage(ctx, image)
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
require.Nil(t, created)
}

func TestImagesDeleteImage(t *testing.T) {
Expand All @@ -106,7 +105,7 @@ func TestImagesDeleteImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

expected := image
expected.CreateRequest = nil
Expand Down Expand Up @@ -216,7 +215,7 @@ func TestImagesClearDeletedImages(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

_, err = storage.DeleteImage(ctx, image.ID, "delete", deletedAt)
require.NoError(t, err)
Expand All @@ -237,7 +236,7 @@ func TestImagesClearDeletedImages(t *testing.T) {

created, err = storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)
}

func TestImagesCreateImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
Expand All @@ -264,10 +263,9 @@ func TestImagesCreateImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
_, err = storage.CreateSnapshot(ctx, snapshot)
require.NoError(t, err)

created, err := storage.CreateImage(ctx, ImageMeta{ID: snapshot.ID})
_, err = storage.CreateImage(ctx, ImageMeta{ID: snapshot.ID})
require.Error(t, err)
require.True(t, errors.Is(err, errors.NewEmptyNonCancellableError()))
require.Nil(t, created)
}

func TestImagesDeleteImageShouldFailIfSnapshotAlreadyExists(t *testing.T) {
Expand Down Expand Up @@ -331,7 +329,7 @@ func TestImagesGetImage(t *testing.T) {

created, err := storage.CreateImage(ctx, image)
require.NoError(t, err)
require.NotNil(t, created)
require.Equal(t, image.ID, created.ID)

image.CreateRequest = nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func (s *StorageMock) ListDisks(
func (s *StorageMock) CreateImage(
ctx context.Context,
image resources.ImageMeta,
) (*resources.ImageMeta, error) {
) (resources.ImageMeta, error) {

args := s.Called(ctx, image)
return args.Get(0).(*resources.ImageMeta), args.Error(1)
return args.Get(0).(resources.ImageMeta), args.Error(1)
}

func (s *StorageMock) ImageCreated(
Expand Down Expand Up @@ -161,10 +161,10 @@ func (s *StorageMock) ListImages(
func (s *StorageMock) CreateSnapshot(
ctx context.Context,
snapshot resources.SnapshotMeta,
) (*resources.SnapshotMeta, error) {
) (resources.SnapshotMeta, error) {

args := s.Called(ctx, snapshot)
return args.Get(0).(*resources.SnapshotMeta), args.Error(1)
return args.Get(0).(resources.SnapshotMeta), args.Error(1)
}

func (s *StorageMock) SnapshotCreated(
Expand Down
41 changes: 23 additions & 18 deletions cloud/disk_manager/internal/pkg/resources/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,27 +296,29 @@ func (s *storageYDB) createSnapshot(
ctx context.Context,
session *persistence.Session,
snapshot SnapshotMeta,
) (*SnapshotMeta, error) {
) (SnapshotMeta, error) {

var emptySnapshotMeta SnapshotMeta

tx, err := session.BeginRWTransaction(ctx)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}
defer tx.Rollback(ctx)

// HACK: see NBS-974 for details.
imageExists, err := s.imageExists(ctx, tx, snapshot.ID)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}

if imageExists {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}

return nil, errors.NewNonCancellableErrorf(
return emptySnapshotMeta, errors.NewNonCancellableErrorf(
"snapshot with id %v can't be created, because image with id %v already exists",
snapshot.ID,
snapshot.ID,
Expand All @@ -325,7 +327,7 @@ func (s *storageYDB) createSnapshot(

createRequest, err := proto.Marshal(snapshot.CreateRequest)
if err != nil {
return nil, errors.NewNonRetriableErrorf(
return emptySnapshotMeta, errors.NewNonRetriableErrorf(
"failed to marshal create request for snapshot with id %v: %w",
snapshot.ID,
err,
Expand All @@ -344,26 +346,26 @@ func (s *storageYDB) createSnapshot(
persistence.ValueParam("$id", persistence.UTF8Value(snapshot.ID)),
)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}
defer res.Close()

states, err := scanSnapshotStates(ctx, res)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}

if len(states) != 0 {
err = tx.Commit(ctx)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}

state := states[0]

if state.status >= snapshotStatusDeleting {
logging.Info(ctx, "can't create already deleting/deleted snapshot with id %v", snapshot.ID)
return nil, errors.NewSilentNonRetriableErrorf(
return emptySnapshotMeta, errors.NewSilentNonRetriableErrorf(
"can't create already deleting/deleted snapshot with id %v",
snapshot.ID,
)
Expand All @@ -374,10 +376,10 @@ func (s *storageYDB) createSnapshot(
state.createTaskID == snapshot.CreateTaskID &&
state.createdBy == snapshot.CreatedBy {

return state.toSnapshotMeta(), nil
return *state.toSnapshotMeta(), nil
}

return nil, errors.NewNonCancellableErrorf(
return emptySnapshotMeta, errors.NewNonCancellableErrorf(
"snapshot with different params already exists, old=%v, new=%v",
state,
snapshot,
Expand Down Expand Up @@ -408,7 +410,10 @@ func (s *storageYDB) createSnapshot(
case nil:
state.encryptionKeyHash = nil
default:
return nil, errors.NewNonRetriableErrorf("unknown key %s", key)
return emptySnapshotMeta, errors.NewNonRetriableErrorf(
"unknown key %s",
key,
)
}
} else {
state.encryptionMode = uint32(types.EncryptionMode_NO_ENCRYPTION)
Expand All @@ -427,15 +432,15 @@ func (s *storageYDB) createSnapshot(
persistence.ValueParam("$states", persistence.ListValue(state.structValue())),
)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}

err = tx.Commit(ctx)
if err != nil {
return nil, err
return emptySnapshotMeta, err
}

return state.toSnapshotMeta(), nil
return *state.toSnapshotMeta(), nil
}

func (s *storageYDB) snapshotCreated(
Expand Down Expand Up @@ -808,9 +813,9 @@ func (s *storageYDB) listSnapshots(
func (s *storageYDB) CreateSnapshot(
ctx context.Context,
snapshot SnapshotMeta,
) (*SnapshotMeta, error) {
) (SnapshotMeta, error) {

var created *SnapshotMeta
var created SnapshotMeta

err := s.db.Execute(
ctx,
Expand Down
Loading

0 comments on commit e2f1b2d

Please sign in to comment.