Skip to content

Commit

Permalink
chore: check the API version within the reconcile loop (#8616)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Dec 11, 2024
1 parent 03504a8 commit 9e26c53
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 29 deletions.
2 changes: 1 addition & 1 deletion controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.Cluster{}, &appsv1alpha1.Component{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.Cluster{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 4)),
Expand Down
6 changes: 5 additions & 1 deletion controllers/apps/clusterdefinition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (r *ClusterDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Re
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}

if !intctrlutil.ObjectAPIVersionSupported(clusterDef) {
return intctrlutil.Reconciled()
}

if res, err := intctrlutil.HandleCRDeletion(reqCtx, r, clusterDef,
clusterDefinitionFinalizerName, r.deletionHandler(reqCtx, clusterDef)); res != nil {
return *res, err
Expand Down Expand Up @@ -99,7 +103,7 @@ func (r *ClusterDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Re

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.ClusterDefinition{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.ClusterDefinition{}).
Complete(r)
}
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (r *ComponentReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterMgr
}

func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error {
b := intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.Component{}, &workloads.InstanceSet{}).
b := intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.Component{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error {
}

func (r *ComponentReconciler) setupWithMultiClusterManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error {
b := intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.Component{}, &workloads.InstanceSet{}).
b := intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.Component{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/componentdefinition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *ComponentDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.ComponentDefinition{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.ComponentDefinition{}).
Complete(r)
}
Expand Down
5 changes: 4 additions & 1 deletion controllers/apps/componentversion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@ func (r *ComponentVersionReconciler) Reconcile(ctx context.Context, req ctrl.Req
if err := r.Client.Get(rctx.Ctx, rctx.Req.NamespacedName, compVersion); err != nil {
return intctrlutil.CheckedRequeueWithError(err, rctx.Log, "")
}
if !intctrlutil.ObjectAPIVersionSupported(compVersion) {
return intctrlutil.Reconciled()
}

return r.reconcile(rctx, compVersion)
}

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentVersionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.ComponentVersion{}, &appsv1alpha1.ComponentDefinition{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.ComponentVersion{}).
Watches(&appsv1alpha1.ComponentDefinition{}, handler.EnqueueRequestsFromMapFunc(r.compatibleCompVersion)).
Complete(r)
Expand Down
4 changes: 4 additions & 0 deletions controllers/apps/transformer_cluster_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

type clusterInitTransformer struct {
Expand All @@ -38,5 +39,8 @@ func (t *clusterInitTransformer) Transform(ctx graph.TransformContext, dag *grap

// init dag
graphCli.Root(dag, transCtx.OrigCluster, transCtx.Cluster, model.ActionStatusPtr())
if !intctrlutil.ObjectAPIVersionSupported(transCtx.Cluster) {
return graph.ErrPrematureStop
}
return nil
}
4 changes: 4 additions & 0 deletions controllers/apps/transformer_component_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package apps
import (
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

type componentInitTransformer struct{}
Expand All @@ -38,5 +39,8 @@ func (t *componentInitTransformer) Transform(ctx graph.TransformContext, dag *gr
// init placement
transCtx.Context = intoContext(transCtx.Context, placement(transCtx.Component))

if !intctrlutil.ObjectAPIVersionSupported(transCtx.Component) {
return graph.ErrPrematureStop
}
return nil
}
5 changes: 3 additions & 2 deletions controllers/workloads/instanceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (r *InstanceSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)

res, err := kubebuilderx.NewController(ctx, r.Client, req, r.Recorder, logger).
Prepare(instanceset.NewTreeLoader()).
Do(instanceset.NewAPIVersionReconciler()).
Do(instanceset.NewFixMetaReconciler()).
Do(instanceset.NewDeletionReconciler()).
Do(instanceset.NewStatusReconciler()).
Expand Down Expand Up @@ -112,7 +113,7 @@ func (r *InstanceSetReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterM
func (r *InstanceSetReconciler) setupWithManager(mgr ctrl.Manager, ctx *handler.FinderContext) error {
itsFinder := handler.NewLabelFinder(&workloads.InstanceSet{}, instanceset.WorkloadsManagedByLabelKey, workloads.Kind, instanceset.WorkloadsInstanceLabelKey)
podHandler := handler.NewBuilder(ctx).AddFinder(itsFinder).Build()
return intctrlutil.NewControllerManagedBy(mgr, &workloads.InstanceSet{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&workloads.InstanceSet{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
Expand All @@ -132,7 +133,7 @@ func (r *InstanceSetReconciler) setupWithMultiClusterManager(mgr ctrl.Manager,
// TODO: modify handler.getObjectFromKey to support running Job in data clusters
jobHandler := handler.NewBuilder(ctx).AddFinder(delegatorFinder).Build()

b := intctrlutil.NewControllerManagedBy(mgr, &workloads.InstanceSet{}).
b := intctrlutil.NewControllerManagedBy(mgr).
For(&workloads.InstanceSet{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
Expand Down
43 changes: 43 additions & 0 deletions pkg/controller/instanceset/reconciler_api_version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package instanceset

import (
"github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

type apiVersionReconciler struct{}

func (r *apiVersionReconciler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebuilderx.CheckResult {
if tree.GetRoot() == nil {
return kubebuilderx.ConditionUnsatisfied
}
return kubebuilderx.ConditionSatisfied
}

func (r *apiVersionReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilderx.Result, error) {
if intctrlutil.ObjectAPIVersionSupported(tree.GetRoot()) {
return kubebuilderx.Continue, nil
}
return kubebuilderx.Commit, nil
}

func NewAPIVersionReconciler() kubebuilderx.Reconciler {
return &apiVersionReconciler{}
}

var _ kubebuilderx.Reconciler = &apiVersionReconciler{}
65 changes: 65 additions & 0 deletions pkg/controller/instanceset/reconciler_api_version_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd
This file is part of KubeBlocks project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package instanceset

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
viper "github.com/apecloud/kubeblocks/pkg/viperx"

"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx"
)

var _ = Describe("api version reconciler test", func() {
Context("PreCondition & Reconcile", func() {
It("should work well", func() {
By("PreCondition")
its := builder.NewInstanceSetBuilder(namespace, name).GetObject()
tree := kubebuilderx.NewObjectTree()
tree.SetRoot(its)
reconciler := NewAPIVersionReconciler()
Expect(reconciler.PreCondition(tree)).Should(Equal(kubebuilderx.ConditionSatisfied))

By("Reconcile without dual mode operator")
tree.SetRoot(its)
res, err := reconciler.Reconcile(tree)
Expect(err).Should(BeNil())
Expect(res).Should(Equal(kubebuilderx.Continue))

By("Reconcile with supported api version and using dual mode operator")
viper.Set(constant.DualOperatorsMode, true)
if its.Annotations == nil {
its.Annotations = make(map[string]string)
}
its.Annotations[constant.CRDAPIVersionAnnotationKey] = workloads.GroupVersion.String()
tree.SetRoot(its)
res, err = reconciler.Reconcile(tree)
Expect(err).Should(BeNil())
Expect(res).Should(Equal(kubebuilderx.Continue))

By("Reconcile without dual mode operator")
delete(its.Annotations, constant.CRDAPIVersionAnnotationKey)
tree.SetRoot(its)
res, err = reconciler.Reconcile(tree)
Expect(err).Should(BeNil())
Expect(res).Should(Equal(kubebuilderx.Continue))
})
})
})
36 changes: 15 additions & 21 deletions pkg/controllerutil/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,9 @@ var (
)
)

func NewControllerManagedBy(mgr manager.Manager, objs ...client.Object) *builder.Builder {
b := ctrl.NewControllerManagedBy(mgr).
func NewControllerManagedBy(mgr manager.Manager) *builder.Builder {
return ctrl.NewControllerManagedBy(mgr).
WithEventFilter(predicate.NewPredicateFuncs(namespacePredicateFilter))
if len(objs) > 0 {
b.WithEventFilter(predicate.NewPredicateFuncs(newAPIVersionPredicateFilter(objs)))
}
return b
}

func namespacePredicateFilter(object client.Object) bool {
Expand All @@ -125,20 +121,18 @@ func namespacePredicateFilter(object client.Object) bool {
return managedNamespaces.Has(object.GetNamespace())
}

func newAPIVersionPredicateFilter(objs []client.Object) func(client.Object) bool {
return func(obj client.Object) bool {
if !viper.GetBool(constant.DualOperatorsMode) {
return true
}
_, clusterObj := obj.(*appsv1alpha1.Cluster)
annotations := obj.GetAnnotations()
if annotations == nil {
return !clusterObj // for newly created clusters, let the new operator handle them first
}
apiVersion, ok := annotations[constant.CRDAPIVersionAnnotationKey]
if !ok {
return !clusterObj // for newly created clusters, let the new operator handle them first
}
return supportedCRDAPIVersions.Has(apiVersion)
func ObjectAPIVersionSupported(obj client.Object) bool {
if !viper.GetBool(constant.DualOperatorsMode) {
return true
}
_, clusterObj := obj.(*appsv1alpha1.Cluster)
annotations := obj.GetAnnotations()
if annotations == nil {
return !clusterObj // for newly created clusters, let the new operator handle them first
}
apiVersion, ok := annotations[constant.CRDAPIVersionAnnotationKey]
if !ok {
return !clusterObj // for newly created clusters, let the new operator handle them first
}
return supportedCRDAPIVersions.Has(apiVersion)
}

0 comments on commit 9e26c53

Please sign in to comment.