Merge branch 'release-0.9' into support/v1alpha1-api-version-filter
leon-inf committed Nov 29, 2024
2 parents b2c2544 + d72a42b commit fc95641
Showing 96 changed files with 2,289 additions and 1,573 deletions.
10 changes: 10 additions & 0 deletions apis/apps/v1alpha1/opsrequest_types.go
@@ -477,11 +477,17 @@ type OpsRequestVolumeClaimTemplate struct {
Name string `json:"name"`
}

// +kubebuilder:validation:XValidation:rule="has(self.shards) ? (!has(self.scaleOut) && !has(self.scaleIn)) : true",message="shards field cannot be used together with scaleOut or scaleIn"

// HorizontalScaling defines the parameters of a horizontal scaling operation.
type HorizontalScaling struct {
// Specifies the name of the Component.
ComponentOps `json:",inline"`

// Specifies the desired number of shards for the component.
// This parameter is mutually exclusive with other parameters.
Shards *int32 `json:"shards,omitempty"`

// Deprecated: since v0.9, use scaleOut and scaleIn instead.
// Specifies the number of replicas for the component. Cannot be used with "scaleIn" and "scaleOut".
// +kubebuilder:deprecatedversion:warning="This field has been deprecated since 0.9.0"
@@ -1211,6 +1217,10 @@ type LastComponentConfiguration struct {
// +optional
Replicas *int32 `json:"replicas,omitempty"`

// Records the `shards` of the Component prior to any changes.
// +optional
Shards *int32 `json:"shards,omitempty"`

// Records the resources of the Component prior to any changes.
// +kubebuilder:pruning:PreserveUnknownFields
// +optional
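For orientation, a minimal OpsRequest that exercises the new `shards` field might look like the sketch below. The `shards` and `componentName` fields come from this diff; the resource name, `clusterName`, and `type` values are assumptions based on the surrounding v1alpha1 API, not part of this commit.

    # Sketch only: rescale a sharding component to 5 shards.
    # shards is mutually exclusive with scaleIn/scaleOut (enforced by the CEL rule above).
    apiVersion: apps.kubeblocks.io/v1alpha1
    kind: OpsRequest
    metadata:
      name: scale-shards-demo        # hypothetical name
    spec:
      clusterName: my-cluster        # hypothetical cluster
      type: HorizontalScaling
      horizontalScaling:
      - componentName: shard         # must name a sharding spec, per the webhook check
        shards: 5                    # desired shard count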
40 changes: 28 additions & 12 deletions apis/apps/v1alpha1/opsrequest_webhook.go
@@ -121,8 +121,8 @@ func (r *OpsRequest) Force() bool {
return r.Spec.Force && r.Spec.Type != StartType
}

// validateClusterPhase validates whether the current cluster state supports the OpsRequest
func (r *OpsRequest) validateClusterPhase(cluster *Cluster) error {
// ValidateClusterPhase validates whether the current cluster state supports the OpsRequest
func (r *OpsRequest) ValidateClusterPhase(cluster *Cluster) error {
opsBehaviour := OpsRequestBehaviourMapper[r.Spec.Type]
// if the OpsType has no cluster phases, ignore it
if len(opsBehaviour.FromClusterPhases) == 0 {
@@ -134,25 +134,32 @@ func (r *OpsRequest) validateClusterPhase(cluster *Cluster) error {
// validate whether existing the same type OpsRequest
var (
opsRequestValue string
opsRecorder []OpsRecorder
opsRecorders []OpsRecorder
ok bool
)
if opsRequestValue, ok = cluster.Annotations[opsRequestAnnotationKey]; ok {
// opsRequest annotation value in cluster to map
if err := json.Unmarshal([]byte(opsRequestValue), &opsRecorder); err != nil {
if err := json.Unmarshal([]byte(opsRequestValue), &opsRecorders); err != nil {
return err
}
}
// check if the opsRequest can be executed in the current cluster.
if slices.Contains(opsBehaviour.FromClusterPhases, cluster.Status.Phase) {
return nil
}
var opsRecord *OpsRecorder
for _, v := range opsRecorders {
if v.Name == r.Name {
opsRecord = &v
break
}
}
// check if this opsRequest needs to verify cluster phase before opsRequest starts running.
needCheck := len(opsRecorder) == 0 || (opsRecorder[0].Name == r.Name && opsRecorder[0].InQueue)
if !needCheck {
return nil
needCheck := len(opsRecorders) == 0 || (opsRecord != nil && !opsRecord.InQueue)
if needCheck {
return fmt.Errorf("OpsRequest.spec.type=%s is forbidden when Cluster.status.phase=%s", r.Spec.Type, cluster.Status.Phase)
}
return fmt.Errorf("OpsRequest.spec.type=%s is forbidden when Cluster.status.phase=%s", r.Spec.Type, cluster.Status.Phase)
return nil
}

// getCluster gets cluster with webhook client
@@ -189,11 +196,11 @@ func (r *OpsRequest) Validate(ctx context.Context,
cluster *Cluster,
needCheckClusterPhase bool) error {
if needCheckClusterPhase {
if err := r.validateClusterPhase(cluster); err != nil {
if err := r.ValidateClusterPhase(cluster); err != nil {
return err
}
}
return r.validateOps(ctx, k8sClient, cluster)
return r.ValidateOps(ctx, k8sClient, cluster)
}

// ValidateEntry OpsRequest webhook validate entry
@@ -215,8 +222,8 @@ func (r *OpsRequest) validateEntry(isCreate bool) error {
return r.Validate(ctx, k8sClient, cluster, isCreate)
}

// validateOps validates ops attributes
func (r *OpsRequest) validateOps(ctx context.Context,
// ValidateOps validates ops attributes
func (r *OpsRequest) ValidateOps(ctx context.Context,
k8sClient client.Client,
cluster *Cluster) error {
// Check whether the corresponding attribute is legal according to the operation type
@@ -456,6 +463,15 @@ func (r *OpsRequest) CountOfflineOrOnlineInstances(clusterName, componentName st
func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, compSpec ClusterComponentSpec, clusterName string, isSharding bool) error {
scaleIn := hScale.ScaleIn
scaleOut := hScale.ScaleOut
if hScale.Shards != nil {
if !isSharding {
return fmt.Errorf(`shards field cannot be used for the component "%s"`, hScale.ComponentName)
}
if scaleOut != nil || scaleIn != nil {
return fmt.Errorf(`shards field cannot be used together with scaleOut or scaleIn for the component "%s"`, hScale.ComponentName)
}
return nil
}
if hScale.Replicas != nil && (scaleIn != nil || scaleOut != nil) {
return fmt.Errorf(`"replicas" has been deprecated and cannot be used with "scaleOut" and "scaleIn"`)
}
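Taken together, the CEL rule and validateHorizontalScalingSpec reject any spec that combines `shards` with per-replica scaling. A sketch of an invalid fragment (names hypothetical; `replicaChanges` assumed from the existing scaleIn API, not shown in this diff):

    spec:
      type: HorizontalScaling
      horizontalScaling:
      - componentName: shard
        shards: 5
        scaleIn:               # invalid: conflicts with shards
          replicaChanges: 1    # admission fails: "shards field cannot be used together with scaleOut or scaleIn"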
10 changes: 10 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

16 changes: 16 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
@@ -3780,9 +3780,20 @@ spec:
minimum: 0
type: integer
type: object
shards:
description: |-
Specifies the desired number of shards for the component.
This parameter is mutually exclusive with other parameters.
format: int32
type: integer
required:
- componentName
type: object
x-kubernetes-validations:
- message: shards field cannot be used together with scaleOut or
scaleIn
rule: 'has(self.shards) ? (!has(self.scaleOut) && !has(self.scaleIn))
: true'
type: array
x-kubernetes-list-map-keys:
- componentName
@@ -8662,6 +8673,11 @@ spec:
- name
type: object
type: array
shards:
description: Records the `shards` of the Component prior
to any changes.
format: int32
type: integer
targetResources:
additionalProperties:
items:
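With the new status field, the shard count prior to the change is recorded under status.lastConfiguration, roughly as sketched below (component name and values hypothetical):

    status:
      lastConfiguration:
        components:
          shard:             # keyed by the sharding component name
            shards: 3        # shard count before the change, used for progress accounting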
50 changes: 44 additions & 6 deletions controllers/apps/operations/horizontal_scaling.go
@@ -83,10 +83,13 @@ func (hs horizontalScalingOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli
if currHorizontalScaling.Replicas != nil || v.Replicas != nil {
return true, nil
}
// if the earlier opsRequest is pending and not `Overwrite` operator, return false.
// if the earlier opsRequest is pending, return false.
if earlierOps.Status.Phase == appsv1alpha1.OpsPendingPhase {
return false, nil
}
if v.Shards != nil && currHorizontalScaling.Shards != nil {
return true, nil
}
// check if the instance to be taken offline was created by another opsRequest.
if err := hs.checkIntersectionWithEarlierOps(opsRes, earlierOps, currHorizontalScaling, v); err != nil {
return false, err
@@ -97,8 +100,22 @@
return err
}

// update shard count
for i := range opsRes.Cluster.Spec.ShardingSpecs {
sharding := &opsRes.Cluster.Spec.ShardingSpecs[i]
if compOps, ok := compOpsSet.componentOpsSet[sharding.Name]; ok {
horizontalScaling := compOps.(appsv1alpha1.HorizontalScaling)
if horizontalScaling.Shards != nil {
sharding.Shards = *horizontalScaling.Shards
}
}
}

if err := compOpsSet.updateClusterComponentsAndShardings(opsRes.Cluster, func(compSpec *appsv1alpha1.ClusterComponentSpec, obj ComponentOpsInterface) error {
horizontalScaling := obj.(appsv1alpha1.HorizontalScaling)
if horizontalScaling.Shards != nil {
return nil
}
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[obj.GetComponentName()]
if horizontalScaling.ScaleIn != nil && len(horizontalScaling.ScaleIn.OnlineInstancesToOffline) > 0 {
// check if the instances are online.
@@ -146,14 +163,18 @@ func (hs horizontalScalingOpsHandler) ReconcileAction(reqCtx intctrlutil.Request
opsRes *OpsResource,
pgRes *progressResource,
compStatus *appsv1alpha1.OpsRequestComponentStatus) (int32, int32, error) {
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[pgRes.compOps.GetComponentName()]
horizontalScaling := pgRes.compOps.(appsv1alpha1.HorizontalScaling)
pgRes.noWaitComponentCompleted = true
if horizontalScaling.Shards != nil {
// horizontal scaling for shard count.
return handleComponentProgressForScalingShards(reqCtx, cli, opsRes, pgRes, compStatus)
}
var err error
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[pgRes.compOps.GetComponentName()]
pgRes.createdPodSet, pgRes.deletedPodSet, err = hs.getCreateAndDeletePodSet(opsRes, lastCompConfiguration, *pgRes.clusterComponent, horizontalScaling, pgRes.fullComponentName)
if err != nil {
return 0, 0, err
}
pgRes.noWaitComponentCompleted = true
return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus)
}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
@@ -162,15 +183,26 @@ func (hs horizontalScalingOpsHandler) ReconcileAction(reqCtx intctrlutil.Request

// SaveLastConfiguration records last configuration to the OpsRequest.status.lastConfiguration
func (hs horizontalScalingOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
shardsMap := make(map[string]int32, len(opsRes.Cluster.Spec.ShardingSpecs))
for _, v := range opsRes.Cluster.Spec.ShardingSpecs {
shardsMap[v.Name] = v.Shards
}
getLastComponentInfo := func(compSpec appsv1alpha1.ClusterComponentSpec, comOps ComponentOpsInterface) appsv1alpha1.LastComponentConfiguration {
lastCompConfiguration := appsv1alpha1.LastComponentConfiguration{
horizontalScaling := comOps.(appsv1alpha1.HorizontalScaling)
if horizontalScaling.Shards != nil {
var lastCompConfiguration appsv1alpha1.LastComponentConfiguration
if shards, ok := shardsMap[comOps.GetComponentName()]; ok {
lastCompConfiguration.Shards = pointer.Int32(shards)
}
return lastCompConfiguration
}
return appsv1alpha1.LastComponentConfiguration{
Replicas: pointer.Int32(compSpec.Replicas),
Instances: compSpec.Instances,
OfflineInstances: compSpec.OfflineInstances,
}
return lastCompConfiguration
}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
compOpsHelper.saveLastConfigurations(opsRes, getLastComponentInfo)
return nil
}
@@ -217,6 +249,12 @@ func (hs horizontalScalingOpsHandler) getCreateAndDeletePodSet(opsRes *OpsResour

// Cancel this function defines the cancel horizontalScaling action.
func (hs horizontalScalingOpsHandler) Cancel(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
for _, v := range opsRes.OpsRequest.Spec.HorizontalScalingList {
if v.Shards != nil {
// This operation requires intervention by operations personnel.
return intctrlutil.NewErrorf(intctrlutil.ErrorIgnoreCancel, "does not support cancellation of shard count changes during horizontal scaling.")
}
}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
if err := compOpsHelper.cancelComponentOps(reqCtx.Ctx, cli, opsRes, func(lastConfig *appsv1alpha1.LastComponentConfiguration, comp *appsv1alpha1.ClusterComponentSpec) {
comp.Replicas = *lastConfig.Replicas
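For reference, the Action above writes the requested shard count straight into the owning Cluster; a sketch of the mutated fragment (names and values hypothetical):

    spec:
      shardingSpecs:
      - name: shard          # matched against horizontalScaling[].componentName
        shards: 5            # updated in place by the ops Action (previously 3)
        template: {}         # per-shard component template, unchanged by this ops

Note that, per the Cancel handler above, a shard-count change cannot be cancelled once it has been applied.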
101 changes: 100 additions & 1 deletion controllers/apps/operations/horizontal_scaling_test.go
@@ -462,7 +462,9 @@ var _ = Describe("HorizontalScaling OpsRequest", func() {
checkOpsRequestPhaseIsSucceed(reqCtx, opsRes)
})
createOpsAndToCreatingPhase := func(reqCtx intctrlutil.RequestCtx, opsRes *OpsResource, horizontalScaling appsv1alpha1.HorizontalScaling) *appsv1alpha1.OpsRequest {
horizontalScaling.ComponentName = consensusComp
if horizontalScaling.ComponentName == "" {
horizontalScaling.ComponentName = consensusComp
}
opsRes.OpsRequest = createHorizontalScaling(clusterName, horizontalScaling)
opsRes.OpsRequest.Spec.Force = true
// set ops phase to Pending
@@ -553,6 +555,103 @@
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(ops3))).Should(Equal(appsv1alpha1.OpsAbortedPhase))
Expect(opsRes.Cluster.Spec.GetComponentByName(consensusComp).Replicas).Should(BeEquivalentTo(4))
})

It("horizontal scaling for shards component", func() {
By("init operations resources")
opsRes, _, _ := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)
// add a sharding component
Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(cluster *appsv1alpha1.Cluster) {
cluster.Spec.ShardingSpecs = []appsv1alpha1.ShardingSpec{
{
Name: secondaryCompName,
Shards: int32(3),
Template: cluster.Spec.ComponentSpecs[0],
},
}
})).Should(Succeed())

By("add two shards by set shards to 5")
reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
_ = createOpsAndToCreatingPhase(reqCtx, opsRes, appsv1alpha1.HorizontalScaling{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: secondaryCompName},
Shards: pointer.Int32(5),
})
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest), func(g Gomega, ops *appsv1alpha1.OpsRequest) {
g.Expect(*ops.Status.LastConfiguration.Components[secondaryCompName].Shards).Should(BeEquivalentTo(3))
})).Should(Succeed())

By("expect component shards to 5")
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.Cluster), func(g Gomega, cluster *appsv1alpha1.Cluster) {
g.Expect(cluster.Spec.ShardingSpecs[0].Shards).Should(BeEquivalentTo(5))
})).Should(Succeed())

By("mock the new components")
createComponent := func(compName string) *appsv1alpha1.Component {
comp := testapps.NewComponentFactory(testCtx.DefaultNamespace, opsRes.Cluster.Name+"-"+compName, testapps.CompDefinitionName).
AddLabels(constant.AppInstanceLabelKey, opsRes.Cluster.Name).
AddLabels(constant.KBAppClusterUIDLabelKey, string(opsRes.Cluster.UID)).
AddLabels(constant.KBAppShardingNameLabelKey, secondaryCompName).
AddLabels(constant.KBAppComponentLabelKey, compName).
SetReplicas(3).
Create(&testCtx).
GetObject()
Expect(testapps.ChangeObjStatus(&testCtx, comp, func() {
comp.Status.Phase = appsv1alpha1.CreatingClusterCompPhase
})).Should(Succeed())
return comp
}
comp4 := createComponent(secondaryCompName + "-comp4")
comp5 := createComponent(secondaryCompName + "-comp5")
_, err := GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest), func(g Gomega, pobj *appsv1alpha1.OpsRequest) {
g.Expect(pobj.Status.Progress).Should(Equal("0/2"))
g.Expect(pobj.Status.Components[secondaryCompName].ProgressDetails).Should(HaveLen(2))
})).Should(Succeed())

By("expect ops phase to succeed when new components are running")
// mock components and cluster is running
Expect(testapps.ChangeObjStatus(&testCtx, comp4, func() {
comp4.Status.Phase = appsv1alpha1.RunningClusterCompPhase
})).Should(Succeed())
Expect(testapps.ChangeObjStatus(&testCtx, comp5, func() {
comp5.Status.Phase = appsv1alpha1.RunningClusterCompPhase
})).Should(Succeed())
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest), func(g Gomega, pobj *appsv1alpha1.OpsRequest) {
g.Expect(pobj.Status.Progress).Should(Equal("2/2"))
g.Expect(pobj.Status.Phase).Should(Equal(appsv1alpha1.OpsSucceedPhase))
})).Should(Succeed())

By("delete one shard by set shards to 4")
reqCtx = intctrlutil.RequestCtx{Ctx: ctx}
_ = createOpsAndToCreatingPhase(reqCtx, opsRes, appsv1alpha1.HorizontalScaling{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: secondaryCompName},
Shards: pointer.Int32(4),
})
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest), func(g Gomega, ops *appsv1alpha1.OpsRequest) {
g.Expect(*ops.Status.LastConfiguration.Components[secondaryCompName].Shards).Should(BeEquivalentTo(5))
})).Should(Succeed())

By("expect component shards to 5")
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.Cluster), func(g Gomega, cluster *appsv1alpha1.Cluster) {
g.Expect(cluster.Spec.ShardingSpecs[0].Shards).Should(BeEquivalentTo(4))
})).Should(Succeed())

By("expect ops phase to succeed when the component is deleted")
// Create 3 components to mock already existing components.
createComponent(secondaryCompName + "-comp1")
createComponent(secondaryCompName + "-comp2")
createComponent(secondaryCompName + "-comp3")
testapps.DeleteObject(&testCtx, client.ObjectKeyFromObject(comp5), &appsv1alpha1.Component{})
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest), func(g Gomega, pobj *appsv1alpha1.OpsRequest) {
g.Expect(pobj.Status.Progress).Should(Equal("1/1"))
g.Expect(pobj.Status.Phase).Should(Equal(appsv1alpha1.OpsSucceedPhase))
})).Should(Succeed())
})
})
})

