Skip to content

Commit

Permalink
fix: do not allow discovery api call for helm templating to stop the …
Browse files Browse the repository at this point in the history
…agent (#222)

* do not allow discovery api call for helm templating to stop the agent

* bump default number of reconciles to 20

* update logic

* fix lint
  • Loading branch information
floreks authored Jun 17, 2024
1 parent 72d9760 commit 7243acc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 36 deletions.
23 changes: 12 additions & 11 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,40 @@ import (
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/controller/stacks"

"github.com/samber/lo"
"golang.org/x/net/context"
"k8s.io/client-go/rest"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/controller/namespaces"
"github.com/pluralsh/deployment-operator/pkg/controller/pipelinegates"
"github.com/pluralsh/deployment-operator/pkg/controller/restore"
"github.com/pluralsh/deployment-operator/pkg/controller/service"
"github.com/samber/lo"
"golang.org/x/net/context"
"k8s.io/client-go/rest"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
)

func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) {
r, err := time.ParseDuration(opt.refreshInterval)
if err != nil {
setupLog.Error(err, "unable to get refresh interval")
setupLog.Error("unable to get refresh interval", "error", err)
os.Exit(1)
}

t, err := time.ParseDuration(opt.processingTimeout)
if err != nil {
setupLog.Error(err, "unable to get processing timeout")
setupLog.Errorw("unable to get processing timeout", "error", err)
os.Exit(1)
}

mgr, err := controller.NewControllerManager(ctx, opt.maxConcurrentReconciles, t, r, lo.ToPtr(true), opt.consoleUrl, opt.deployToken, opt.clusterId)
if err != nil {
setupLog.Error(err, "unable to create manager")
setupLog.Errorw("unable to create manager", "error", err)
os.Exit(1)
}

sr, err := service.NewServiceReconciler(ctx, mgr.GetClient(), config, r, opt.restoreNamespace)
if err != nil {
setupLog.Error(err, "unable to create service reconciler")
setupLog.Errorw("unable to create service reconciler", "error", err)
os.Exit(1)
}
mgr.AddController(&controller.Controller{
Expand All @@ -49,7 +50,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient
})
gr, err := pipelinegates.NewGateReconciler(mgr.GetClient(), k8sClient, config, r, opt.clusterId)
if err != nil {
setupLog.Error(err, "unable to create gate reconciler")
setupLog.Errorw("unable to create gate reconciler", "error", err)
os.Exit(1)
}
mgr.AddController(&controller.Controller{
Expand All @@ -74,7 +75,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient

namespace, err := utils.GetOperatorNamespace()
if err != nil {
setupLog.Error(err, "unable to get operator namespace")
setupLog.Errorw("unable to get operator namespace", "error", err)
os.Exit(1)
}

Expand All @@ -85,7 +86,7 @@ func runAgent(opt *options, config *rest.Config, ctx context.Context, k8sClient
Queue: s.StackQueue,
})
if err := mgr.Start(); err != nil {
setupLog.Error(err, "unable to start controller manager")
setupLog.Errorw("unable to start controller manager", "error", err)
os.Exit(1)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func newOptions() *options {
flag.StringVar(&o.metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&o.probeAddr, "health-probe-bind-address", ":9001", "The address the probe endpoint binds to.")
flag.BoolVar(&o.enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.IntVar(&o.maxConcurrentReconciles, "max-concurrent-reconciles", 10, "Maximum number of concurrent reconciles which can be run.")
flag.IntVar(&o.maxConcurrentReconciles, "max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.")
flag.IntVar(&o.resyncSeconds, "resync-seconds", 300, "Resync duration in seconds.")
flag.StringVar(&o.refreshInterval, "refresh-interval", "2m", "Refresh interval duration.")
flag.StringVar(&o.processingTimeout, "processing-timeout", "1m", "Maximum amount of time to spend trying to process queue item.")
Expand Down
24 changes: 24 additions & 0 deletions internal/helpers/poll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package helpers

import (
"context"
"time"

"k8s.io/apimachinery/pkg/util/wait"
)

// BackgroundPollUntilContextCancel spawns a new goroutine that runs the condition function on interval.
// If syncFirstRun is set to true, it will execute the condition function synchronously first and then start
// polling. Since error is returned synchronously, the only way to check for it is to use syncFirstRun.
// Background poller does not sync errors. It can be stopped externally by cancelling the provided context.
func BackgroundPollUntilContextCancel(ctx context.Context, interval time.Duration, immediate, syncFirstRun bool, condition wait.ConditionWithContextFunc) (err error) {
if syncFirstRun {
_, err = condition(ctx)
}

go func() {
_ = wait.PollUntilContextCancel(ctx, interval, immediate, condition)
}()

return err
}
44 changes: 20 additions & 24 deletions pkg/controller/service/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,11 @@ import (
"time"

console "github.com/pluralsh/console-client-go"
clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/applier"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors"
"github.com/pluralsh/deployment-operator/pkg/manifests"
manis "github.com/pluralsh/deployment-operator/pkg/manifests"
"github.com/pluralsh/deployment-operator/pkg/manifests/template"
"github.com/pluralsh/deployment-operator/pkg/ping"
"github.com/pluralsh/deployment-operator/pkg/websocket"
"github.com/pluralsh/polly/algorithms"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -34,6 +22,19 @@ import (
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clienterrors "github.com/pluralsh/deployment-operator/internal/errors"
"github.com/pluralsh/deployment-operator/internal/helpers"
"github.com/pluralsh/deployment-operator/internal/utils"
"github.com/pluralsh/deployment-operator/pkg/applier"
"github.com/pluralsh/deployment-operator/pkg/client"
"github.com/pluralsh/deployment-operator/pkg/controller"
plrlerrors "github.com/pluralsh/deployment-operator/pkg/errors"
"github.com/pluralsh/deployment-operator/pkg/manifests"
manis "github.com/pluralsh/deployment-operator/pkg/manifests"
"github.com/pluralsh/deployment-operator/pkg/manifests/template"
"github.com/pluralsh/deployment-operator/pkg/ping"
"github.com/pluralsh/deployment-operator/pkg/websocket"
)

func init() {
Expand Down Expand Up @@ -106,19 +107,14 @@ func NewServiceReconciler(ctx context.Context, consoleClient client.Client, conf
if err != nil {
return nil, err
}
if err := CapabilitiesAPIVersions(discoveryClient); err != nil {
return nil, err
}

go func() {
//nolint:all
_ = wait.PollImmediateInfinite(time.Minute*5, func() (done bool, err error) {
if err := CapabilitiesAPIVersions(discoveryClient); err != nil {
logger.Error(err, "can't fetch API versions")
}
return false, nil
})
}()
_ = helpers.BackgroundPollUntilContextCancel(ctx, 5*time.Minute, true, true, func(_ context.Context) (done bool, err error) {
if err := CapabilitiesAPIVersions(discoveryClient); err != nil {
logger.Error(err, "can't fetch API versions")
}

return false, nil
})

return &ServiceReconciler{
ConsoleClient: consoleClient,
Expand Down

0 comments on commit 7243acc

Please sign in to comment.