Skip to content

Commit

Permalink
feat: support component-level stop and start capabilities (cherry-pick
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Nov 19, 2024
1 parent c4e3433 commit 98ba122
Show file tree
Hide file tree
Showing 15 changed files with 531 additions and 67 deletions.
22 changes: 22 additions & 0 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ type SpecificOpsRequest struct {
// +listMapKey=componentName
VolumeExpansionList []VolumeExpansion `json:"volumeExpansion,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Lists Components to be started. If empty, all components will be started.
//
// +optional
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.start"
// +kubebuilder:validation:MaxItems=1024
// +patchMergeKey=componentName
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=componentName
StartList []ComponentOps `json:"start,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Lists Components to be stopped. If empty, all components will be stopped.
//
// +optional
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="forbidden to update spec.stop"
// +kubebuilder:validation:MaxItems=1024
// +patchMergeKey=componentName
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=componentName
StopList []ComponentOps `json:"stop,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"componentName"`

// Lists Components to be restarted.
//
// +optional
Expand Down
10 changes: 10 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4723,6 +4723,48 @@ spec:
required:
- componentName
type: object
start:
description: Lists Components to be started. If empty, all components
will be started.
items:
description: ComponentOps specifies the Component to be operated
on.
properties:
componentName:
description: Specifies the name of the Component.
type: string
required:
- componentName
type: object
maxItems: 1024
type: array
x-kubernetes-list-map-keys:
- componentName
x-kubernetes-list-type: map
x-kubernetes-validations:
- message: forbidden to update spec.start
rule: self == oldSelf
stop:
description: Lists Components to be stopped. If empty, all components
will be stopped.
items:
description: ComponentOps specifies the Component to be operated
on.
properties:
componentName:
description: Specifies the name of the Component.
type: string
required:
- componentName
type: object
maxItems: 1024
type: array
x-kubernetes-list-map-keys:
- componentName
x-kubernetes-list-type: map
x-kubernetes-validations:
- message: forbidden to update spec.stop
rule: self == oldSelf
switchover:
description: Lists Switchover objects, each specifying a Component
to perform the switchover operation.
Expand Down
9 changes: 9 additions & 0 deletions controllers/apps/operations/ops_comp_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,12 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
}
return appsv1alpha1.OpsSucceedPhase, 0, nil
}

func hasIntersectionCompOpsList[T ComponentOpsInterface, S ComponentOpsInterface](currCompOpsMap map[string]T, list []S) bool {
for _, comp := range list {
if _, ok := currCompOpsMap[comp.GetComponentName()]; ok {
return true
}
}
return false
}
5 changes: 3 additions & 2 deletions controllers/apps/operations/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ func (r restartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clie
return fmt.Errorf("status.startTimestamp can not be null")
}
// abort earlier running vertical scaling opsRequest.
r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
// abort earlier running 'Restart' opsRequest.
if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.RestartType},
func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) {
return true, nil
return hasIntersectionCompOpsList(r.compOpsHelper.componentOpsSet, earlierOps.Spec.RestartList), nil
}); err != nil {
return err
}
r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
componentKindList := []client.ObjectList{
&appv1.StatefulSetList{},
&workloads.InstanceSetList{},
Expand Down
16 changes: 12 additions & 4 deletions controllers/apps/operations/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,20 @@ var _ = Describe("Restart OpsRequest", func() {
})
})

func createRestartOpsObj(clusterName, restartOpsName string) *appsv1alpha1.OpsRequest {
func createRestartOpsObj(clusterName, restartOpsName string, componentNames ...string) *appsv1alpha1.OpsRequest {
ops := testapps.NewOpsRequestObj(restartOpsName, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.RestartType)
ops.Spec.RestartList = []appsv1alpha1.ComponentOps{
{ComponentName: consensusComp},
{ComponentName: statelessComp},
if len(componentNames) == 0 {
ops.Spec.RestartList = []appsv1alpha1.ComponentOps{
{ComponentName: consensusComp},
{ComponentName: statelessComp},
}
} else {
for _, compName := range componentNames {
ops.Spec.RestartList = append(ops.Spec.RestartList, appsv1alpha1.ComponentOps{
ComponentName: compName,
})
}
}
opsRequest := testapps.CreateOpsRequest(ctx, testCtx, ops)
opsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase
Expand Down
47 changes: 33 additions & 14 deletions controllers/apps/operations/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ type StartOpsHandler struct{}
var _ OpsHandler = StartOpsHandler{}

func init() {
stopBehaviour := OpsBehaviour{
FromClusterPhases: []appsv1alpha1.ClusterPhase{appsv1alpha1.StoppedClusterPhase},
ToClusterPhase: appsv1alpha1.UpdatingClusterPhase,
QueueByCluster: true,
OpsHandler: StartOpsHandler{},
startBehaviour := OpsBehaviour{
FromClusterPhases: append(appsv1alpha1.GetClusterUpRunningPhases(), appsv1alpha1.UpdatingClusterPhase,
appsv1alpha1.StoppedClusterPhase, appsv1alpha1.StoppingClusterPhase),
ToClusterPhase: appsv1alpha1.UpdatingClusterPhase,
QueueByCluster: true,
OpsHandler: StartOpsHandler{},
}

opsMgr := GetOpsManager()
opsMgr.RegisterOps(appsv1alpha1.StartType, stopBehaviour)
opsMgr.RegisterOps(appsv1alpha1.StartType, startBehaviour)
}

// ActionStartedCondition the started condition when handling the start request.
Expand All @@ -55,15 +56,33 @@ func (start StartOpsHandler) ActionStartedCondition(reqCtx intctrlutil.RequestCt
func (start StartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
var (
cluster = opsRes.Cluster
startComp = func(compSpec *appsv1alpha1.ClusterComponentSpec) {
compSpec.Stop = nil
}
startList = opsRes.OpsRequest.Spec.StartList
)
for i := range cluster.Spec.ComponentSpecs {
startComp(&cluster.Spec.ComponentSpecs[i])
compOpsHelper := newComponentOpsHelper(startList)
// abort earlier running opsRequests.
if err := abortEarlierOpsRequestWithSameKind(reqCtx, cli, opsRes, []appsv1alpha1.OpsType{appsv1alpha1.StopType},
func(earlierOps *appsv1alpha1.OpsRequest) (bool, error) {
if len(startList) == 0 {
// start all components
return true, nil
}
return len(earlierOps.Spec.StopList) == 0 || hasIntersectionCompOpsList(compOpsHelper.componentOpsSet, earlierOps.Spec.StopList), nil
}); err != nil {
return err
}
startComp := func(compSpec *appsv1alpha1.ClusterComponentSpec, clusterCompName string) {
if len(startList) > 0 {
if _, ok := compOpsHelper.componentOpsSet[clusterCompName]; !ok {
return
}
}
compSpec.Stop = nil
}
for i, v := range cluster.Spec.ComponentSpecs {
startComp(&cluster.Spec.ComponentSpecs[i], v.Name)
}
for i := range cluster.Spec.ShardingSpecs {
startComp(&cluster.Spec.ShardingSpecs[i].Template)
for i, v := range cluster.Spec.ShardingSpecs {
startComp(&cluster.Spec.ShardingSpecs[i].Template, v.Name)
}
return cli.Update(reqCtx.Ctx, cluster)
}
Expand All @@ -84,7 +103,7 @@ func (start StartOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli
}
return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus)
}
compOpsHelper := newComponentOpsHelper([]appsv1alpha1.ComponentOps{})
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.StartList)
return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "start", handleComponentProgress)
}

Expand Down
98 changes: 89 additions & 9 deletions controllers/apps/operations/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package operations
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/utils/pointer"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -38,6 +39,7 @@ var _ = Describe("Start OpsRequest", func() {
clusterDefinitionName = "cluster-definition-for-ops-" + randomStr
clusterVersionName = "clusterversion-for-ops-" + randomStr
clusterName = "cluster-for-ops-" + randomStr
clusterDefName = "test-clusterdef-" + randomStr
)

cleanEnv := func() {
Expand All @@ -48,7 +50,7 @@ var _ = Describe("Start OpsRequest", func() {
By("clean resources")

// delete cluster(and all dependent sub-resources), clusterversion and clusterdef
testapps.ClearClusterResources(&testCtx)
testapps.ClearClusterResourcesWithRemoveFinalizerOption(&testCtx)

// delete rest resources
inNS := client.InNamespace(testCtx.DefaultNamespace)
Expand All @@ -63,17 +65,32 @@ var _ = Describe("Start OpsRequest", func() {
AfterEach(cleanEnv)

Context("Test OpsRequest", func() {
createStartOpsRequest := func(opsRes *OpsResource, startCompNames ...string) *appsv1alpha1.OpsRequest {
By("create Start opsRequest")
ops := testapps.NewOpsRequestObj("start-ops-"+testCtx.GetRandomStr(), testCtx.DefaultNamespace,
clusterName, appsv1alpha1.StartType)
var startList []appsv1alpha1.ComponentOps
for _, startCompName := range startCompNames {
startList = append(startList, appsv1alpha1.ComponentOps{
ComponentName: startCompName,
})
}
ops.Spec.StartList = startList
opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
// set ops phase to Pending
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase
return ops
}

It("Test start OpsRequest", func() {
By("init operations resources ")
reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
opsRes, _, _ := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)
testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp)
testapps.MockInstanceSetComponent(&testCtx, clusterName, statelessComp)
testapps.MockInstanceSetComponent(&testCtx, clusterName, statefulComp)
By("create Start opsRequest")
ops := testapps.NewOpsRequestObj("start-ops-"+randomStr, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.StartType)
opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
By("create 'Start' opsRequest")
createStartOpsRequest(opsRes)

By("test start action and reconcile function")
Expect(opsutil.UpdateClusterOpsAnnotations(ctx, k8sClient, opsRes.Cluster, nil)).Should(Succeed())
Expand All @@ -83,18 +100,81 @@ var _ = Describe("Start OpsRequest", func() {
})).ShouldNot(HaveOccurred())

// set ops phase to Pending
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsPendingPhase
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase)
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsCreatingPhase))
// do start action
_, err = GetOpsManager().Do(reqCtx, k8sClient, opsRes)
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
for _, v := range opsRes.Cluster.Spec.ComponentSpecs {
Expect(v.Stop).Should(BeNil())
}
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err == nil).Should(BeTrue())
})
It("Test start specific components OpsRequest", func() {
By("init operations resources with topology")
opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName)
// mock components is stopped
Expect(testapps.ChangeObj(&testCtx, opsRes.Cluster, func(pobj *appsv1alpha1.Cluster) {
for i := range pobj.Spec.ComponentSpecs {
pobj.Spec.ComponentSpecs[i].Stop = pointer.Bool(true)
}
})).Should(Succeed())

By("create 'Start' opsRequest for specific components")
createStartOpsRequest(opsRes, defaultCompName)

By("mock 'Start' OpsRequest to Creating phase")
reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase)

By("test start action")
startHandler := StartOpsHandler{}
err := startHandler.Action(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())

By("verify components are being started")
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(opsRes.Cluster), func(g Gomega, pobj *appsv1alpha1.Cluster) {
for _, v := range pobj.Spec.ComponentSpecs {
if v.Name == defaultCompName {
Expect(v.Stop).Should(BeNil())
} else {
Expect(v.Stop).ShouldNot(BeNil())
Expect(*v.Stop).Should(BeTrue())
}
}
})).Should(Succeed())

By("mock components start successfully")
testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, defaultCompName)
testapps.MockInstanceSetStatus(testCtx, opsRes.Cluster, defaultCompName)

By("test reconcile")
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())

By("verify ops request completed")
Eventually(testapps.GetOpsRequestPhase(&testCtx,
client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(appsv1alpha1.OpsSucceedPhase))
})

It("Test abort running 'Stop' opsRequest", func() {
By("init operations resources with topology")
opsRes, _, _ := initOperationsResourcesWithTopology(clusterDefName, clusterDefName, clusterName)
reqCtx := intctrlutil.RequestCtx{Ctx: ctx}

By("create 'Stop' opsRequest for all components")
stopOps := createStopOpsRequest(opsRes, defaultCompName)
runAction(reqCtx, opsRes, appsv1alpha1.OpsCreatingPhase)

By("create a start opsRequest")
createStartOpsRequest(opsRes, defaultCompName)
startHandler := StartOpsHandler{}
err := startHandler.Action(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())

By("expect the 'Stop' OpsRequest to be Aborted")
Eventually(testapps.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(stopOps))).Should(Equal(appsv1alpha1.OpsAbortedPhase))
})
})
})
Loading

0 comments on commit 98ba122

Please sign in to comment.