Skip to content

Commit

Permalink
Add an option for disabling the workqueue bucket rate limiter (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
tfujiwar authored Nov 29, 2024
1 parent d1f3427 commit eea49ea
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ spec:
{{- with .Values.flags.clientGoRateLimiterBurst }}
- "--client-go-rate-limiter-burst={{ . }}"
{{- end }}
{{- with .Values.flags.disableWorkqueueBucketRateLimiter }}
- "--disable-workqueue-bucket-rate-limiter={{ . }}"
{{- end }}
command:
- "/manager"
{{- if or .Values.metrics .Values.pprof }}
Expand Down
1 change: 1 addition & 0 deletions charts/gha-runner-scale-set-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,4 @@ flags:
## Defines the client-go rate limiter parameters.
# clientGoRateLimiterQPS: 20
# clientGoRateLimiterBurst: 30
# disableWorkqueueBucketRateLimiter: false
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -57,6 +58,7 @@ type AutoscalingListenerReconciler struct {
ListenerMetricsEndpoint string

MaxConcurrentReconciles int
WorkqueueRateLimiter workqueue.RateLimiter

ResourceBuilder
}
Expand Down Expand Up @@ -733,7 +735,10 @@ func (r *AutoscalingListenerReconciler) SetupWithManager(mgr ctrl.Manager) error
Watches(&rbacv1.Role{}, handler.EnqueueRequestsFromMapFunc(labelBasedWatchFunc)).
Watches(&rbacv1.RoleBinding{}, handler.EnqueueRequestsFromMapFunc(labelBasedWatchFunc)).
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
RateLimiter: r.WorkqueueRateLimiter,
}).
Complete(r)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -81,6 +82,7 @@ type AutoscalingRunnerSetReconciler struct {
UpdateStrategy UpdateStrategy
ActionsClient actions.MultiClient
MaxConcurrentReconciles int
WorkqueueRateLimiter workqueue.RateLimiter
ResourceBuilder
}

Expand Down Expand Up @@ -765,7 +767,10 @@ func (r *AutoscalingRunnerSetReconciler) SetupWithManager(mgr ctrl.Manager) erro
},
)).
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
RateLimiter: r.WorkqueueRateLimiter,
}).
Complete(r)
}

Expand Down
7 changes: 6 additions & 1 deletion controllers/actions.github.com/ephemeralrunner_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -54,6 +55,7 @@ type EphemeralRunnerReconciler struct {
Scheme *runtime.Scheme
ActionsClient actions.MultiClient
MaxConcurrentReconciles int
WorkqueueRateLimiter workqueue.RateLimiter
ResourceBuilder
}

Expand Down Expand Up @@ -837,7 +839,10 @@ func (r *EphemeralRunnerReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&v1alpha1.EphemeralRunner{}).
Owns(&corev1.Pod{}).
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
RateLimiter: r.WorkqueueRateLimiter,
}).
Complete(r)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -55,6 +56,7 @@ type EphemeralRunnerSetReconciler struct {
PublishMetrics bool

MaxConcurrentReconciles int
WorkqueueRateLimiter workqueue.RateLimiter

ResourceBuilder
}
Expand Down Expand Up @@ -578,7 +580,10 @@ func (r *EphemeralRunnerSetReconciler) SetupWithManager(mgr ctrl.Manager) error
For(&v1alpha1.EphemeralRunnerSet{}).
Owns(&v1alpha1.EphemeralRunner{}).
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
RateLimiter: r.WorkqueueRateLimiter,
}).
Complete(r)
}

Expand Down
15 changes: 15 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -112,6 +113,8 @@ func main() {

rateLimiterQPS int
rateLimiterBurst int

disableWorkqueueBucketRateLimiter bool
)
var c github.Config
err = envconfig.Process("github", &c)
Expand Down Expand Up @@ -161,6 +164,7 @@ func main() {
flag.IntVar(&maxConcurrentReconcilesForAutoscalingListener, "max-concurrent-reconciles-for-autoscaling-listener", 1, "The maximum number of concurrent reconciles for AutoscalingListener.")
flag.IntVar(&rateLimiterQPS, "client-go-rate-limiter-qps", 20, "The QPS value of client-go rate limiter.")
flag.IntVar(&rateLimiterBurst, "client-go-rate-limiter-burst", 30, "The burst value of client-go rate limiter.")
flag.BoolVar(&disableWorkqueueBucketRateLimiter, "disable-workqueue-bucket-rate-limiter", false, "Disable workqueue BucketRateLimiter.")
flag.Parse()

runnerPodDefaults.RunnerImagePullSecrets = runnerImagePullSecrets
Expand Down Expand Up @@ -265,6 +269,13 @@ func main() {
}

if autoScalingRunnerSetOnly {
var newWorkqueueRateLimiter func() workqueue.RateLimiter
if disableWorkqueueBucketRateLimiter {
newWorkqueueRateLimiter = workqueue.DefaultItemBasedRateLimiter
} else {
newWorkqueueRateLimiter = workqueue.DefaultControllerRateLimiter
}

if err := actionsgithubcom.SetupIndexers(mgr); err != nil {
log.Error(err, "unable to setup indexers")
os.Exit(1)
Expand Down Expand Up @@ -298,6 +309,7 @@ func main() {
UpdateStrategy: actionsgithubcom.UpdateStrategy(updateStrategy),
DefaultRunnerScaleSetListenerImagePullSecrets: autoScalerImagePullSecrets,
MaxConcurrentReconciles: maxConcurrentReconcilesForAutoscalingRunnerSet,
WorkqueueRateLimiter: newWorkqueueRateLimiter(),
ResourceBuilder: rb,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "AutoscalingRunnerSet")
Expand All @@ -310,6 +322,7 @@ func main() {
Scheme: mgr.GetScheme(),
ActionsClient: actionsMultiClient,
MaxConcurrentReconciles: maxConcurrentReconcilesForEphemeralRunner,
WorkqueueRateLimiter: newWorkqueueRateLimiter(),
ResourceBuilder: rb,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "EphemeralRunner")
Expand All @@ -323,6 +336,7 @@ func main() {
ActionsClient: actionsMultiClient,
PublishMetrics: metricsAddr != "0",
MaxConcurrentReconciles: maxConcurrentReconcilesForEphemeralRunnerSet,
WorkqueueRateLimiter: newWorkqueueRateLimiter(),
ResourceBuilder: rb,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "EphemeralRunnerSet")
Expand All @@ -336,6 +350,7 @@ func main() {
ListenerMetricsAddr: listenerMetricsAddr,
ListenerMetricsEndpoint: listenerMetricsEndpoint,
MaxConcurrentReconciles: maxConcurrentReconcilesForAutoscalingListener,
WorkqueueRateLimiter: newWorkqueueRateLimiter(),
ResourceBuilder: rb,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "AutoscalingListener")
Expand Down

0 comments on commit eea49ea

Please sign in to comment.