Skip to content

Commit

Permalink
cephfs: safeguard localClusterState struct from race conditions
Browse files Browse the repository at this point in the history
This commit uses atomic.Int64 and sync.Map with members of
localClusterState and safeguards clusterAdditionalInfo map
operations with a mutex.

Signed-off-by: Rakshith R <[email protected]>
  • Loading branch information
Rakshith-R committed Oct 5, 2023
1 parent cba5402 commit da3ef26
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 19 deletions.
6 changes: 3 additions & 3 deletions internal/cephfs/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar
func (s *subVolumeClient) supportsSubVolMetadata() bool {
newLocalClusterState(s.clusterID)

return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported
return clusterAdditionalInfo[s.clusterID].subVolMetadataState.Load() != unsupported
}

func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool {
var invalid fsAdmin.NotImplementedError
if err != nil && errors.As(err, &invalid) {
// In case the error is other than invalid command return error to the caller.
clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported
clusterAdditionalInfo[s.clusterID].subVolMetadataState.Store(unsupported)

return false
}
clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported
clusterAdditionalInfo[s.clusterID].subVolMetadataState.Store(supported)

return true
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cephfs/core/snapshot_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata
func (s *snapshotClient) supportsSubVolSnapMetadata() bool {
newLocalClusterState(s.clusterID)

return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported
return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Load() != unsupported
}

func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool {
var invalid fsAdmin.NotImplementedError
if err != nil && errors.As(err, &invalid) {
// In case the error is other than invalid command return error to
// the caller.
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Store(unsupported)

return false
}
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Store(supported)

return true
}
Expand Down
30 changes: 17 additions & 13 deletions internal/cephfs/core/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"path"
"strings"
"sync"
"sync/atomic"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
Expand All @@ -38,6 +40,7 @@ import (
// Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information.
var clusterAdditionalInfo = make(map[string]*localClusterState)
var clusterAdditionalInfoMutex sync.Mutex

// Subvolume holds subvolume information. This includes only the needed members
// from fsAdmin.SubVolumeInfo.
Expand Down Expand Up @@ -193,30 +196,31 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err
type operationState int64

const (
unknown operationState = iota
unknown int64 = iota
supported
unsupported
)

type localClusterState struct {
// set the enum value i.e., unknown, supported,
// unsupported as per the state of the cluster.
resizeState operationState
subVolMetadataState operationState
subVolSnapshotMetadataState operationState
resizeState atomic.Int64
subVolMetadataState atomic.Int64
subVolSnapshotMetadataState atomic.Int64
// A cluster can have multiple filesystem for that we need to have a map of
// subvolumegroups to check filesystem is created nor not.
// set true once a subvolumegroup is created
// for corresponding filesystem in a cluster.
subVolumeGroupsCreated map[string]bool
subVolumeGroupsCreated sync.Map
}

func newLocalClusterState(clusterID string) {
// verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false).
clusterAdditionalInfoMutex.Lock()
defer clusterAdditionalInfoMutex.Unlock()
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
clusterAdditionalInfo[clusterID] = &localClusterState{}
clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool)
}
}

Expand All @@ -232,7 +236,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
}

// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] {
if _, found := clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Load(s.FsName); found {
opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
if err != nil {
Expand All @@ -246,7 +250,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
return err
}
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Store(s.FsName, true)
}

opts := fsAdmin.SubVolumeOptions{
Expand All @@ -264,7 +268,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
if errors.Is(err, rados.ErrNotFound) {
// Reset the subVolumeGroupsCreated so that we can try again to create the
// subvolumegroup in next request if the error is Not Found.
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated.Delete(s.FsName)
}

return err
Expand Down Expand Up @@ -297,8 +301,8 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
newLocalClusterState(s.clusterID)
// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState == supported {
if clusterAdditionalInfo[s.clusterID].resizeState.Load() == unknown ||
clusterAdditionalInfo[s.clusterID].resizeState.Load() == supported {
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err)
Expand All @@ -307,7 +311,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
}
_, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true)
if err == nil {
clusterAdditionalInfo[s.clusterID].resizeState = supported
clusterAdditionalInfo[s.clusterID].resizeState.Store(supported)

return nil
}
Expand All @@ -319,7 +323,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er
return err
}
}
clusterAdditionalInfo[s.clusterID].resizeState = unsupported
clusterAdditionalInfo[s.clusterID].resizeState.Store(unsupported)
s.Size = bytesQuota

return s.CreateVolume(ctx)
Expand Down

0 comments on commit da3ef26

Please sign in to comment.