Skip to content

Commit

Permalink
rbd: make use of both listSnapshots and listChildren
Browse files Browse the repository at this point in the history
Currently, CephCSI only uses listSnaps to determine
number of snapshots on a RBD image and uses snapshot
names as child image names to flatten them.
But child images may have different name(in case of
group snapshot) or they maybe in trash
(deleted k8s VolSnapshot with alive restored PVC).

The above problems are avoid by making use of both
snap and child image lists.

Signed-off-by: Rakshith R <[email protected]>
  • Loading branch information
Rakshith-R committed Dec 3, 2024
1 parent 6523ca3 commit 94b9f0b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 30 deletions.
23 changes: 19 additions & 4 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
// are more than the `minSnapshotOnImage` Add a task to flatten all the
// temporary cloned images.
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, err := rbdVol.listSnapshots()
snaps, children, err := rbdVol.ListSnapAndChildren()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -589,9 +589,17 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
len(snaps),
rbdVol,
maxSnapshotsOnImage)

if len(children) == 0 {
// if none of the child images(are in trash) exist, we can't flatten them.
// return ResourceExhausted error message as we have reached the hard limit.
log.DebugLog(ctx, "child images of image %q cannot be flatten", rbdVol)

return status.Errorf(codes.ResourceExhausted, "rbd image %q has %d snapshots but child images cannot be flattened", rbdVol, len(snaps))
}
err = flattenClonedRbdImages(
ctx,
snaps,
children,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
Expand All @@ -610,13 +618,20 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
len(snaps),
rbdVol,
minSnapshotsOnImageToStartFlatten)
if len(children) == 0 {
// if none of the child images(are in trash) exist, we can't flatten them.
// return nil since we have only reach the soft limit.
log.DebugLog(ctx, "child images of image %q cannot be flatten", rbdVol)

return nil
}
// If we start flattening all the snapshots at one shot the volume
// creation time will be affected,so we will flatten only the extra
// snapshots.
snaps = snaps[minSnapshotsOnImageToStartFlatten-1:]
children = children[:minSnapshotsOnImageToStartFlatten]
err = flattenClonedRbdImages(
ctx,
snaps,
children,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
Expand Down
6 changes: 5 additions & 1 deletion internal/rbd/group_controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,16 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
errList := make([]error, 0)
for _, volume := range volumes {
err = volume.PrepareVolumeForSnapshot(ctx, creds)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
errList = append(errList, err)
}
}
if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to prepare volumes for snapshot: %v", errList)
}

// create a temporary VolumeGroup with a different name
vg, err = mgr.CreateVolumeGroup(ctx, vgName)
Expand Down
40 changes: 15 additions & 25 deletions internal/rbd/rbd_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ type trashSnapInfo struct {

func flattenClonedRbdImages(
ctx context.Context,
snaps []librbd.SnapInfo,
children []string,
pool, monitors, rbdImageName string,
cr *util.Credentials,
) error {
Expand All @@ -803,26 +803,9 @@ func flattenClonedRbdImages(

return err
}
var origNameList []trashSnapInfo
for _, snapInfo := range snaps {
// check if the snapshot belongs to trash namespace.
isTrash, retErr := rv.isTrashSnap(snapInfo.Id)
if retErr != nil {
return retErr
}

if isTrash {
// get original snap name for the snapshot in trash namespace
origSnapName, retErr := rv.getOrigSnapName(snapInfo.Id)
if retErr != nil {
return retErr
}
origNameList = append(origNameList, trashSnapInfo{origSnapName})
}
}

for _, snapName := range origNameList {
rv.RbdImageName = snapName.origSnapName
for _, childName := range children {
rv.RbdImageName = childName
err = rv.flattenRbdImage(ctx, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
log.ErrorLog(ctx, "failed to flatten %s; err %v", rv, err)
Expand Down Expand Up @@ -2048,19 +2031,26 @@ func (ri *rbdImage) DisableDeepFlatten() error {
return image.UpdateFeatures(librbd.FeatureDeepFlatten, false)
}

func (ri *rbdImage) listSnapshots() ([]librbd.SnapInfo, error) {
// ListSnapAndChildren returns list of names of snapshots and child images.
func (ri *rbdImage) ListSnapAndChildren() ([]librbd.SnapInfo, []string, error) {
image, err := ri.open()
if err != nil {
return nil, err
return nil, nil, err
}
defer image.Close()

snapInfoList, err := image.GetSnapshotNames()
snaps, err := image.GetSnapshotNames()
if err != nil {
return nil, err
return nil, nil, err
}

// ListChildren() returns pools, images, err.
_, children, err := image.ListChildren()
if err != nil {
return nil, nil, err
}

return snapInfoList, nil
return snaps, children, nil
}

// isTrashSnap returns true if the snapshot belongs to trash namespace.
Expand Down

0 comments on commit 94b9f0b

Please sign in to comment.