diff --git a/go.mod b/go.mod
index 2efe4ded8..4ae3f5e0f 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/go-bindata/go-bindata v3.1.2+incompatible
 	github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
 	github.com/golang/mock v1.4.3
+	github.com/google/uuid v1.1.1
 	github.com/openshift/api v0.0.0-20200728181127-fc1d675671df
 	github.com/openshift/build-machinery-go v0.0.0-20200713135615-1f43d26dccc7
 	github.com/openshift/client-go v0.0.0-20200722173614-5a1b0aaeff15
diff --git a/pkg/cmd/operator/cmd.go b/pkg/cmd/operator/cmd.go
index 0f737e362..f42d7d154 100644
--- a/pkg/cmd/operator/cmd.go
+++ b/pkg/cmd/operator/cmd.go
@@ -17,11 +17,16 @@ limitations under the License.
 package cmd
 
 import (
+	"context"
 	"flag"
 	golog "log"
+	"os"
+	"os/signal"
+	"syscall"
 	"time"
 
 	"github.com/golang/glog"
+	"github.com/google/uuid"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
@@ -30,8 +35,12 @@ import (
 	controller "github.com/openshift/cloud-credential-operator/pkg/operator"
 	"github.com/openshift/cloud-credential-operator/pkg/util"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"sigs.k8s.io/controller-runtime/pkg/client/config"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
@@ -67,35 +76,95 @@ func NewOperator() *cobra.Command {
 				log.WithError(err).Fatal("unable to set up client config")
 			}
 
-			// Create a new Cmd to provide shared dependencies and start components
-			log.Info("setting up manager")
-			mgr, err := manager.New(cfg, manager.Options{
-				MetricsBindAddress:      ":2112",
-				LeaderElection:          true,
-				LeaderElectionNamespace: minterv1.CloudCredOperatorNamespace,
-				LeaderElectionID:        leaderElectionConfigMap,
-			})
-			if err != nil {
-				log.WithError(err).Fatal("unable to set up overall controller manager")
+			run := func(ctx context.Context) {
+				// Create a new Cmd to provide shared dependencies and start components
+				log.Info("setting up manager")
+				mgr, err := manager.New(cfg, manager.Options{
+					MetricsBindAddress: ":2112",
+				})
+				if err != nil {
+					log.WithError(err).Fatal("unable to set up overall controller manager")
+				}
+
+				log.Info("registering components")
+
+				// Setup Scheme for all resources
+				util.SetupScheme(mgr.GetScheme())
+
+				// Setup all Controllers
+				log.Info("setting up controller")
+				kubeconfigCommandLinePath := cmd.PersistentFlags().Lookup("kubeconfig").Value.String()
+				if err := controller.AddToManager(mgr, kubeconfigCommandLinePath); err != nil {
+					log.WithError(err).Fatal("unable to register controllers to the manager")
+				}
+
+				// Start the Cmd
+				log.Info("starting the cmd")
+				if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
+					log.WithError(err).Fatal("unable to run the manager")
+				}
 			}
 
-			log.Info("registering components")
-
-			// Setup Scheme for all resources
-			util.SetupScheme(mgr.GetScheme())
-
-			// Setup all Controllers
-			log.Info("setting up controller")
-			kubeconfigCommandLinePath := cmd.PersistentFlags().Lookup("kubeconfig").Value.String()
-			if err := controller.AddToManager(mgr, kubeconfigCommandLinePath); err != nil {
-				log.WithError(err).Fatal("unable to register controllers to the manager")
+			// Leader election code based on:
+			// https://github.com/kubernetes/kubernetes/blob/f7e3bcdec2e090b7361a61e21c20b3dbbb41b7f0/staging/src/k8s.io/client-go/examples/leader-election/main.go#L92-L154
+			// This gives us ReleaseOnCancel which is not presently exposed in controller-runtime.
+
+			// use a Go context so we can tell the leaderelection code when we want to step down
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			// listen for interrupts or the Linux SIGTERM signal and cancel
+			// our context, which the leader election code will observe and
+			// step down
+			ch := make(chan os.Signal, 1)
+			signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+			go func() {
+				<-ch
+				log.Info("received termination, signaling shutdown")
+				cancel()
+			}()
+
+			id := uuid.New().String()
+			leLog := log.WithField("id", id)
+			leLog.Info("generated leader election ID")
+
+			lock := &resourcelock.ConfigMapLock{
+				ConfigMapMeta: metav1.ObjectMeta{
+					Namespace: minterv1.CloudCredOperatorNamespace,
+					Name:      leaderElectionConfigMap,
+				},
+				Client: kubernetes.NewForConfigOrDie(cfg).CoreV1(),
+				LockConfig: resourcelock.ResourceLockConfig{
+					Identity: id,
+				},
 			}
 
-			// Start the Cmd
-			log.Info("starting the cmd")
-			if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
-				log.WithError(err).Fatal("unable to run the manager")
-			}
+			// start the leader election code loop
+			leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+				Lock:            lock,
+				ReleaseOnCancel: true,
+				LeaseDuration:   360 * time.Second,
+				RenewDeadline:   270 * time.Second,
+				RetryPeriod:     90 * time.Second,
+				Callbacks: leaderelection.LeaderCallbacks{
+					OnStartedLeading: func(ctx context.Context) {
+						run(ctx)
+					},
+					OnStoppedLeading: func() {
+						// we can do cleanup here if necessary
+						leLog.Infof("leader lost")
+						os.Exit(0)
+					},
+					OnNewLeader: func(identity string) {
+						if identity == id {
+							// We just became the leader
+							leLog.Info("became leader")
+							return
+						}
+						log.Infof("current leader: %s", identity)
+					},
+				},
+			})
 		},
 	}
 