Skip to content

Commit

Permalink
chore: filter events to reconcile by CRD API version (#8522)
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf authored Nov 29, 2024
1 parent ea5914a commit 6931535
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 113 deletions.
10 changes: 2 additions & 8 deletions cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ const (
multiClusterContextsDisabledFlagKey flagName = "multi-cluster-contexts-disabled"

userAgentFlagKey flagName = "user-agent"

// dual-operators-mode indicates whether the operator runs in dual-operators mode.
// If it's true, the operator will degrade to a secondary operator and only manage the resources dedicated to releases prior to v1.0.
dualOperatorsModeFlag flagName = "dual-operators-mode"
)

var (
Expand Down Expand Up @@ -122,6 +118,7 @@ func init() {
viper.SetDefault(dptypes.CfgKeyWorkerServiceAccountAnnotations, "{}")
viper.SetDefault(dptypes.CfgKeyWorkerClusterRoleName, "kubeblocks-dataprotection-worker-role")
viper.SetDefault(dptypes.CfgDataProtectionReconcileWorkers, runtime.NumCPU())
viper.SetDefault(constant.DualOperatorsMode, false)
}

func main() {
Expand Down Expand Up @@ -152,8 +149,6 @@ func main() {

flag.String(userAgentFlagKey.String(), "", "User agent of the operator.")

flag.Bool(dualOperatorsModeFlag.String(), false, "Whether the operator runs in dual-operators mode.")

flag.String(constant.ManagedNamespacesFlag, "",
"The namespaces that the operator will manage, multiple namespaces are separated by commas.")

Expand Down Expand Up @@ -279,8 +274,7 @@ func main() {
client = multiClusterMgr.GetClient()
}

dualOperatorsMode := viper.GetBool(dualOperatorsModeFlag.viperName())
if !dualOperatorsMode {
if !viper.GetBool(constant.DualOperatorsMode) {
if err = (&dpcontrollers.ActionSetReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
151 changes: 77 additions & 74 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ const (
multiClusterContextsDisabledFlagKey flagName = "multi-cluster-contexts-disabled"

userAgentFlagKey flagName = "user-agent"

// dual-operators-mode indicates whether the operator runs in dual-operators mode.
// If it's true, the operator will degrade to a secondary operator and only manage the resources dedicated to releases prior to v1.0.
dualOperatorsModeFlag flagName = "dual-operators-mode"
)

var (
Expand Down Expand Up @@ -150,6 +146,7 @@ func init() {
viper.SetDefault(constant.FeatureGateComponentReplicasAnnotation, true)
viper.SetDefault(constant.FeatureGateInPlacePodVerticalScaling, false)
viper.SetDefault(constant.FeatureGateNoRSMEnv, false)
viper.SetDefault(constant.DualOperatorsMode, false)
}

type flagName string
Expand Down Expand Up @@ -191,8 +188,6 @@ func setupFlags() {

flag.String(userAgentFlagKey.String(), "", "User agent of the operator.")

flag.Bool(dualOperatorsModeFlag.String(), false, "Whether the operator runs in dual-operators mode.")

opts := zap.Options{
Development: false,
}
Expand Down Expand Up @@ -434,49 +429,53 @@ func main() {
os.Exit(1)
}

if err = (&appscontrollers.OpsDefinitionReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("ops-definition-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "OpsDefinition")
os.Exit(1)
}

if err = (&appscontrollers.OpsRequestReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("ops-request-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "OpsRequest")
os.Exit(1)
}

if err = (&configuration.ConfigConstraintReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("config-constraint-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ConfigConstraint")
os.Exit(1)
}

if err = (&configuration.ReconfigureReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("reconfigure-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ReconfigureRequest")
os.Exit(1)
}

if err = (&configuration.ConfigurationReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("configuration-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Configuration")
os.Exit(1)
if !viper.GetBool(constant.DualOperatorsMode) {
if err = (&appscontrollers.OpsDefinitionReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("ops-definition-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "OpsDefinition")
os.Exit(1)
}

if err = (&appscontrollers.OpsRequestReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("ops-request-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "OpsRequest")
os.Exit(1)
}
}

if !viper.GetBool(constant.DualOperatorsMode) {
if err = (&configuration.ConfigConstraintReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("config-constraint-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ConfigConstraint")
os.Exit(1)
}

if err = (&configuration.ReconfigureReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("reconfigure-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ReconfigureRequest")
os.Exit(1)
}

if err = (&configuration.ConfigurationReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("configuration-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Configuration")
os.Exit(1)
}
}

if err = (&appscontrollers.SystemAccountReconciler{
Expand All @@ -488,13 +487,15 @@ func main() {
os.Exit(1)
}

if err = (&k8scorecontrollers.EventReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("event-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Event")
os.Exit(1)
if !viper.GetBool(constant.DualOperatorsMode) {
if err = (&k8scorecontrollers.EventReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("event-controller"),
}).SetupWithManager(mgr, multiClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Event")
os.Exit(1)
}
}

if err = (&appscontrollers.ComponentClassReconciler{
Expand All @@ -506,22 +507,26 @@ func main() {
os.Exit(1)
}

if err = (&appscontrollers.ServiceDescriptorReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("service-descriptor-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ServiceDescriptor")
os.Exit(1)
if !viper.GetBool(constant.DualOperatorsMode) {
if err = (&appscontrollers.ServiceDescriptorReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("service-descriptor-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ServiceDescriptor")
os.Exit(1)
}
}

if err = (&appscontrollers.BackupPolicyTemplateReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("backup-policy-template-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BackupPolicyTemplate")
os.Exit(1)
if !viper.GetBool(constant.DualOperatorsMode) {
if err = (&appscontrollers.BackupPolicyTemplateReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("backup-policy-template-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BackupPolicyTemplate")
os.Exit(1)
}
}
}

Expand All @@ -536,9 +541,7 @@ func main() {
}
}

dualOperatorsMode := viper.GetBool(dualOperatorsModeFlag.viperName())

if !dualOperatorsMode && viper.GetBool(extensionsFlagKey.viperName()) {
if !viper.GetBool(constant.DualOperatorsMode) && viper.GetBool(extensionsFlagKey.viperName()) {
if err = (&extensionscontrollers.AddonReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -550,7 +553,7 @@ func main() {
}
}

if !dualOperatorsMode && viper.GetBool(experimentalFlagKey.viperName()) {
if !viper.GetBool(constant.DualOperatorsMode) && viper.GetBool(experimentalFlagKey.viperName()) {
if err = (&experimentalcontrollers.NodeCountScalerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/backuppolicytemplate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *BackupPolicyTemplateReconciler) compatibleBackupPolicyTemplate(ctx cont

// SetupWithManager sets up the controller with the Manager.
func (r *BackupPolicyTemplateReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.BackupPolicyTemplate{}, &appsv1alpha1.ComponentDefinition{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.BackupPolicyTemplate{}).
Watches(&appsv1alpha1.ComponentDefinition{}, handler.EnqueueRequestsFromMapFunc(r.compatibleBackupPolicyTemplate)).
Complete(r)
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/opsrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (r *OpsRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// SetupWithManager sets up the controller with the Manager.
func (r *OpsRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.Cluster{}, &workloadsv1alpha1.InstanceSet{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.OpsRequest{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 2)),
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/servicedescriptor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ServiceDescriptorReconciler) Reconcile(ctx context.Context, req ctrl.Re

// SetupWithManager sets up the controller with the Manager.
func (r *ServiceDescriptorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.ServiceDescriptor{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.ServiceDescriptor{}).
Complete(r)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/systemaccount_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (r *SystemAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// SetupWithManager sets up the controller with the Manager.
func (r *SystemAccountReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.Cluster{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&appsv1alpha1.Cluster{}).
Owns(&corev1.Secret{}).
Watches(&batchv1.Job{}, r.jobCompletionHandler()).
Expand Down
2 changes: 1 addition & 1 deletion controllers/experimental/nodecountscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *NodeCountScalerReconciler) Reconcile(ctx context.Context, req ctrl.Requ

// SetupWithManager sets up the controller with the Manager.
func (r *NodeCountScalerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return intctrlutil.NewControllerManagedBy(mgr, &appsv1alpha1.Cluster{}).
return intctrlutil.NewControllerManagedBy(mgr).
For(&experimental.NodeCountScaler{}).
Watches(&corev1.Node{}, &nodeScalingHandler{r.Client}).
Watches(&appsv1alpha1.Cluster{}, &clusterHandler{r.Client}).
Expand Down
6 changes: 6 additions & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,9 @@ const (
const InvalidContainerPort int32 = 0

const EmptyInsTemplateName = ""

// DualOperatorsMode indicates whether the operator runs in dual-operators mode.
// If it's true, the operator will degrade to a secondary operator and only manage the resources dedicated to releases prior to v1.0.
const (
DualOperatorsMode = "dual-operators-mode"
)
34 changes: 8 additions & 26 deletions pkg/controllerutil/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package controllerutil

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -92,19 +91,13 @@ var (
// unchanged:NodeCountScaler, Addon - the new operator will be responsible for these
// deleted:ClusterVersion, ComponentClassDefinition - nothing to do
// group changed:OpsRequest, OpsDefinition, ConfigConstraint, Configuration - nothing to do
// TODO:
// EventReconciler.Event
// configs & parameters
// data protections

managedNamespaces *sets.Set[string]
supportedCRDAPIVersions = sets.New[string](
// ClusterDefinition, ComponentDefinition, ComponentVersion, BackupPolicyTemplate
// ServiceDescriptor, Cluster, Component
// ClusterDefinition, ComponentDefinition, ComponentVersion, Cluster, Component
appsv1alpha1.GroupVersion.String(),
// InstanceSet
workloadsv1alpha1.GroupVersion.String(),
// TODO: corev1.Event
)
)

Expand Down Expand Up @@ -134,29 +127,18 @@ func namespacePredicateFilter(object client.Object) bool {

func newAPIVersionPredicateFilter(objs []client.Object) func(client.Object) bool {
return func(obj client.Object) bool {
if !viper.GetBool(constant.DualOperatorsMode) {
return true
}
_, clusterObj := obj.(*appsv1alpha1.Cluster)
annotations := obj.GetAnnotations()
if annotations == nil {
return true
return !clusterObj // for newly created clusters, let the new operator handle them first
}
apiVersion, ok := annotations[constant.CRDAPIVersionAnnotationKey]
if !ok {
return true
return !clusterObj // for newly created clusters, let the new operator handle them first
}
// as a fast path
if !supportedCRDAPIVersions.Has(apiVersion) {
return false
}
if len(objs) > 0 {
for _, o := range objs {
if o.GetObjectKind().GroupVersionKind().GroupKind() == obj.GetObjectKind().GroupVersionKind().GroupKind() {
return true
}
}
// has the api version set, but not in the object list?
// we cannot ignore the event silently, so panic here
panic(fmt.Sprintf("seen an event of an object with API version %s, "+
"but the object is not in the object list that controller expects, object: %v", apiVersion, obj))
}
return true
return supportedCRDAPIVersions.Has(apiVersion)
}
}

0 comments on commit 6931535

Please sign in to comment.