Skip to content

Commit

Permalink
Code migrations for controller-runtime upgrade 0.14.6 -> 0.16.3
Browse files Browse the repository at this point in the history
Signed-off-by: Edgar Hernández <[email protected]>
  • Loading branch information
israel-hdez committed Apr 10, 2024
1 parent c5eef34 commit bc39887
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 32 deletions.
8 changes: 4 additions & 4 deletions controllers/predictor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,20 +592,20 @@ func (pr *PredictorReconciler) SetupWithManager(mgr ctrl.Manager, eventStream *m
watchInferenceServices bool, sourcePluginEvents <-chan event.GenericEvent) error {
builder := ctrl.NewControllerManagedBy(mgr).
For(&api.Predictor{}).
Watches(&src.Channel{Source: eventStream.MMEvents}, &handler.EnqueueRequestForObject{})
WatchesRawSource(&src.Channel{Source: eventStream.MMEvents}, &handler.EnqueueRequestForObject{})

if sourcePluginEvents != nil {
builder.Watches(&src.Channel{Source: sourcePluginEvents}, &handler.EnqueueRequestForObject{})
builder.WatchesRawSource(&src.Channel{Source: sourcePluginEvents}, &handler.EnqueueRequestForObject{})
}

if watchInferenceServices {
builder = builder.Watches(&src.Kind{Type: &v1beta1.InferenceService{}}, prefixName(InferenceServiceCRSourceId))
builder = builder.Watches(&v1beta1.InferenceService{}, prefixName(InferenceServiceCRSourceId))
}
return builder.Complete(pr)
}

func prefixName(prefix string) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
return handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
// Prepend prefix
return []reconcile.Request{{
NamespacedName: types.NamespacedName{
Expand Down
13 changes: 6 additions & 7 deletions controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
Expand Down Expand Up @@ -447,7 +446,7 @@ func (r *ServiceReconciler) setupForNamespaceScope(builder *bld.Builder) {
UpdateFunc: func(event event.UpdateEvent) bool { return filter(event.ObjectNew) },
DeleteFunc: func(event event.DeleteEvent) bool { return false },
})).
Watches(&source.Kind{Type: &corev1.ConfigMap{}},
Watches(&corev1.ConfigMap{},
config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request {
if _, _, changed := r.getMMService(r.ControllerDeployment.Namespace, r.ConfigProvider, true); changed {
r.Log.Info("Triggering service reconciliation after config change")
Expand All @@ -458,8 +457,8 @@ func (r *ServiceReconciler) setupForNamespaceScope(builder *bld.Builder) {

// Enable ServiceMonitor watch if ServiceMonitorCRDExists
if r.ServiceMonitorCRDExists {
builder.Watches(&source.Kind{Type: &monitoringv1.ServiceMonitor{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
builder.Watches(&monitoringv1.ServiceMonitor{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
if o.GetName() == serviceMonitorName && o.GetNamespace() == r.ControllerDeployment.Namespace {
return []reconcile.Request{{NamespacedName: r.ControllerDeployment}}
}
Expand All @@ -470,7 +469,7 @@ func (r *ServiceReconciler) setupForNamespaceScope(builder *bld.Builder) {

func (r *ServiceReconciler) setupForClusterScope(builder *bld.Builder) {
builder.For(&corev1.Namespace{}).
Watches(&source.Kind{Type: &corev1.ConfigMap{}},
Watches(&corev1.ConfigMap{},
config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request {
list := &corev1.NamespaceList{}
if err := r.Client.List(context.TODO(), list); err != nil {
Expand All @@ -492,8 +491,8 @@ func (r *ServiceReconciler) setupForClusterScope(builder *bld.Builder) {

// Enable ServiceMonitor watch if ServiceMonitorCRDExists
if r.ServiceMonitorCRDExists {
builder.Watches(&source.Kind{Type: &monitoringv1.ServiceMonitor{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
builder.Watches(&monitoringv1.ServiceMonitor{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
if o.GetName() == serviceMonitorName {
return []reconcile.Request{{
NamespacedName: types.NamespacedName{Name: o.GetNamespace()},
Expand Down
28 changes: 14 additions & 14 deletions controllers/servingruntime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
For(&kserveapi.ServingRuntime{}).
Owns(&appsv1.Deployment{}).
// watch the user configmap and reconcile all runtimes when it changes
Watches(&source.Kind{Type: &corev1.ConfigMap{}},
Watches(&corev1.ConfigMap{},
config.ConfigWatchHandler(r.ConfigMapName, func() []reconcile.Request {
return r.requestsForRuntimes("", func(namespace string) bool {
mme, err := modelMeshEnabled2(context.TODO(), namespace,
Expand All @@ -615,22 +615,22 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
})
}, r.ConfigProvider, &r.Client)).
// watch predictors and reconcile the corresponding runtime(s) it could be assigned to
Watches(&source.Kind{Type: &api.Predictor{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
Watches(&api.Predictor{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
return r.runtimeRequestsForPredictor(o.(*api.Predictor), "Predictor")
}))

if r.ClusterScope {
// watch namespaces to check the modelmesh-enabled flag
builder = builder.Watches(&source.Kind{Type: &corev1.Namespace{}}, handler.EnqueueRequestsFromMapFunc(
func(o client.Object) []reconcile.Request {
builder = builder.Watches(&corev1.Namespace{}, handler.EnqueueRequestsFromMapFunc(
func(_ context.Context, o client.Object) []reconcile.Request {
return r.requestsForRuntimes(o.GetName(), nil)
}))
}

if watchInferenceServices {
builder = builder.Watches(&source.Kind{Type: &v1beta1.InferenceService{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
builder = builder.Watches(&v1beta1.InferenceService{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
if p, _ := predictor_source.BuildBasePredictorFromInferenceService(o.(*v1beta1.InferenceService)); p != nil {
return r.runtimeRequestsForPredictor(p, "InferenceService")
}
Expand All @@ -639,26 +639,26 @@ func (r *ServingRuntimeReconciler) SetupWithManager(mgr ctrl.Manager,
}

if r.EnableCSRWatch {
builder = builder.Watches(&source.Kind{Type: &kserveapi.ClusterServingRuntime{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
builder = builder.Watches(&kserveapi.ClusterServingRuntime{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
return r.clusterServingRuntimeRequests(o.(*kserveapi.ClusterServingRuntime))
}))
}

if r.EnableSecretWatch {
builder = builder.Watches(&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
builder = builder.Watches(&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, o client.Object) []reconcile.Request {
return r.storageSecretRequests(o.(*corev1.Secret))
}))
}

if sourcePluginEvents != nil {
builder.Watches(&source.Channel{Source: sourcePluginEvents},
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
builder.WatchesRawSource(&source.Channel{Source: sourcePluginEvents},
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
nn, src := predictor_source.ResolveSource(types.NamespacedName{
Name: o.GetName(), Namespace: o.GetNamespace()}, PredictorCRSourceId)
if registry, ok := r.RegistryMap[src]; ok {
if p, _ := registry.Get(context.TODO(), nn); p != nil {
if p, _ := registry.Get(ctx, nn); p != nil {
return r.runtimeRequestsForPredictor(p, registry.GetSourceName())
}
}
Expand Down
5 changes: 3 additions & 2 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/yaml"

kserveapi "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
Expand Down Expand Up @@ -101,8 +102,8 @@ var _ = BeforeSuite(func() {
// +kubebuilder:scaffold:scheme

k8sManager, err = ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
MetricsBindAddress: "0", //This disables the metrics server
Scheme: scheme.Scheme,
Metrics: server.Options{BindAddress: "0"}, //This disables the metrics server
})
Expect(err).ToNot(HaveOccurred())

Expand Down
12 changes: 9 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"

monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"

Expand Down Expand Up @@ -219,14 +221,18 @@ func main() {

mgrOpts := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Metrics: server.Options{BindAddress: metricsAddr},
WebhookServer: webhook.NewServer(webhook.Options{Port: 9443}),
HealthProbeBindAddress: probeAddr,
}

if !clusterScopeMode {
// Set manager to operate scoped to our namespace
mgrOpts.Namespace = ControllerNamespace
mgrOpts.Cache = cache.Options{
DefaultNamespaces: map[string]cache.Config{
ControllerNamespace: {},
},
}
}

if enableLeaderElection {
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func (cp *ConfigProvider) ReloadConfigMap(ctx context.Context, c client.Client,
// Handler used by controllers which depend on the user configuration
func ConfigWatchHandler(configMapName types.NamespacedName, f func() []reconcile.Request,
cp *ConfigProvider, kclient *client.Client) handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
// Ignore ConfigMaps we don't care about
if o.GetName() == configMapName.Name && o.GetNamespace() == configMapName.Namespace {
err := cp.ReloadConfigMap(context.TODO(), *kclient, configMapName)
err := cp.ReloadConfigMap(ctx, *kclient, configMapName)
if err != nil {
configLog.Error(err, "Unable to reload user configuration")
}
Expand Down

0 comments on commit bc39887

Please sign in to comment.