From 06221069b329a676200896d659cfb0d2ab117f60 Mon Sep 17 00:00:00 2001 From: Erwin de Haan Date: Mon, 26 Jun 2023 15:51:55 +0200 Subject: [PATCH] Make pods not require IsPodAvailable since that will break when a pod just started (it won't generate a second event) --- README.md | 4 +-- cmd/wait.go | 1 + flags/flags.go | 10 +++++-- pkg/handlers.go | 66 +++++++++++++++++++++++++------------------- pkg/items/pod.go | 3 +- pkg/items/service.go | 31 +++++++++++++++++++++ pkg/waitables.go | 16 +++++++---- 7 files changed, 91 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 96a7597..ae428ae 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,11 @@ It accepts arguments in the following formats: - `pod,pod-name` using the namespace from the `--namespace`, `-n` flag or `default` - `pod-name` using the namespace from the `--namespace`, `-n` flag or `default` and the kind `pod` -For pods it waits until the pod is Ready (`k8s.io/kubectl/pkg/util/podutils.IsPodReady`) and Available (`k8s.io/kubectl/pkg/util/podutils.IsPodAvailable`). +For pods it waits until the pod is Ready (`k8s.io/kubectl/pkg/util/podutils.IsPodReady`). For jobs it wait until the `Completed` condition is true. -For services it will wait until all pods that match the service selector are Ready and Available (like above). +For services it will wait until all pods that match the service selector are Ready (like above). ## Example diff --git a/cmd/wait.go b/cmd/wait.go index 6bf25fd..28e7d4f 100644 --- a/cmd/wait.go +++ b/cmd/wait.go @@ -92,6 +92,7 @@ func wait(cmd *cobra.Command, args []string) error { opts := cache.Options{ Namespaces: namespaces, + SyncPeriod: WaitForConfigFlags.SyncPeriod, } conf, err := KubernetesConfigFlags.ToRESTConfig() diff --git a/flags/flags.go b/flags/flags.go index 53355f3..f7f3bad 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -29,7 +29,8 @@ type ConfigFlags struct { PrintTree *bool PrintCollapsedTree *bool - Timeout *time.Duration + Timeout *time.Duration + SyncPeriod *time.Duration } func NewConfigFlags() *ConfigFlags { @@ -38,7 +39,8 @@ func NewConfigFlags() *ConfigFlags { PrintTree: utilpointer.Bool(true), PrintCollapsedTree: utilpointer.Bool(true), - Timeout: utilpointer.Duration(time.Duration(600 * time.Second)), + Timeout: utilpointer.Duration(time.Duration(600 * time.Second)), + SyncPeriod: utilpointer.Duration(time.Duration(90 * time.Second)), } } @@ -47,6 +49,10 @@ func (f *ConfigFlags) AddFlags(flags *pflag.FlagSet) { flags.DurationVarP(f.Timeout, "timeout", "t", *f.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h)") } + if f.Timeout != nil { + flags.DurationVar(f.SyncPeriod, "sync-period", *f.SyncPeriod, "The length of time to pass to the cache to initiate a sync. (e.g. 1s, 2m, 3h)") + } + if f.PrintVersion != nil { flags.BoolVarP(f.PrintVersion, "version", "v", *f.PrintVersion, "Display version info") } diff --git a/pkg/handlers.go b/pkg/handlers.go index d955c26..efe1bb2 100644 --- a/pkg/handlers.go +++ b/pkg/handlers.go @@ -68,9 +68,9 @@ func (w *Waitables) ProcessEventDeleteService(ctx context.Context, svc *corev1.S } func (w *Waitables) ProcessOldPodEvents(ctx context.Context, pod *corev1.Pod) (bool, error) { - if val, ok := w.UnprocessablePodEvents[pod.UID]; ok { - //log.Printf("Running UnprocessablePodEvents for %s/%s of type %v", pod.Namespace, pod.Name, val.EventType) - //defer delete(w.UnprocessablePodEvents, pod.UID) + if val, ok := w.LastPodEvents[pod.UID]; ok { + //log.Printf("Running LastPodEvents for %s/%s of type %v", pod.Namespace, pod.Name, val.EventType) + //defer delete(w.LastPodEvents, pod.UID) if val.EventType == EventTypeAdd { return w.ProcessEventAddPod(ctx, pod) } else if val.EventType == EventTypeUpdate { @@ -83,54 +83,64 @@ func (w *Waitables) ProcessOldPodEvents(ctx context.Context, pod *corev1.Pod) (b } func (w *Waitables) ProcessEventAddPod(ctx context.Context, pod *corev1.Pod) (bool, error) { - processed := false - if w.HasPod(pod.ObjectMeta) { - //log.Printf("Add %T %s %s", pod, pod.Namespace, pod.Name) + // if w.HasPod(pod.ObjectMeta) { + // log.Printf("Add %T %s %s", pod, pod.Namespace, pod.Name) + // } + + if w.HasPodDirect(pod.ObjectMeta) { w.SetPodReadyFromPod(pod) - processed = true } + if podItems, ok := w.Services.GetPods(pod); ok { for _, podItem := range podItems { podItem.WithReadyFromPod(pod) } - processed = true - } - if processed { - return true, nil - } else { - w.UnprocessablePodEvents[pod.UID] = Event{EventType: EventTypeAdd, Pod: pod} } - return false, nil + + w.LastPodEvents[pod.UID] = Event{EventType: EventTypeAdd, Pod: pod} + + return w.HasPod(pod.ObjectMeta), nil } func (w *Waitables) ProcessEventUpdatePod(ctx context.Context, pod *corev1.Pod) (bool, error) { - if w.HasPod(pod.ObjectMeta) { - //log.Printf("Update %T %s %s", pod, pod.Namespace, pod.Name) + // if w.HasPod(pod.ObjectMeta) { + // log.Printf("Update %T %s %s", pod, pod.Namespace, pod.Name) + // } + + if w.HasPodDirect(pod.ObjectMeta) { w.SetPodReadyFromPod(pod) - return true, nil - } else if podItems, ok := w.Services.GetPods(pod); ok { + } + + if podItems, ok := w.Services.GetPods(pod); ok { for _, podItem := range podItems { podItem.WithReadyFromPod(pod) } - } else { - w.UnprocessablePodEvents[pod.UID] = Event{EventType: EventTypeUpdate, Pod: pod} } - return false, nil + + w.LastPodEvents[pod.UID] = Event{EventType: EventTypeUpdate, Pod: pod} + + return w.HasPod(pod.ObjectMeta), nil } func (w *Waitables) ProcessEventDeletePod(ctx context.Context, pod *corev1.Pod) (bool, error) { - if w.HasPod(pod.ObjectMeta) { - //log.Printf("Delete %T %s %s", pod, pod.Namespace, pod.Name) + // if w.HasPod(pod.ObjectMeta) { + // log.Printf("Delete %T %s %s", pod, pod.Namespace, pod.Name) + // } + + if w.HasPodDirect(pod.ObjectMeta) { w.UnsetPodReady(pod) - return true, nil - } else if podItems, ok := w.Services.GetPods(pod); ok { + } + + if podItems, ok := w.Services.GetPods(pod); ok { for _, podItem := range podItems { podItem.WithReady(false) } - } else { - w.UnprocessablePodEvents[pod.UID] = Event{EventType: EventTypeDelete, Pod: pod} + w.Services.DeletePod(&pod.ObjectMeta) } - return false, nil + + w.LastPodEvents[pod.UID] = Event{EventType: EventTypeDelete, Pod: pod} + + return w.HasPod(pod.ObjectMeta), nil } func (w *Waitables) ProcessEventAddJob(ctx context.Context, job *batchv1.Job) (bool, error) { diff --git a/pkg/items/pod.go b/pkg/items/pod.go index bf4fc4a..a4fd385 100644 --- a/pkg/items/pod.go +++ b/pkg/items/pod.go @@ -20,7 +20,6 @@ import ( "k8s.io/kubectl/pkg/util/podutils" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type NamespacedPodCollection map[string]PodCollection @@ -47,7 +46,7 @@ func (i *PodItem) WithReady(ready bool) *PodItem { } func (i *PodItem) WithReadyFromPod(pod *corev1.Pod) *PodItem { - i.ready = podutils.IsPodReady(pod) && podutils.IsPodAvailable(pod, 2, metav1.Now()) + i.ready = podutils.IsPodReady(pod) return i } diff --git a/pkg/items/service.go b/pkg/items/service.go index 3e55105..08fdf07 100644 --- a/pkg/items/service.go +++ b/pkg/items/service.go @@ -72,6 +72,26 @@ func (i *ServiceItem) GetPod(pod ItemInterface) (*PodItem, bool) { return val, ok } +func (c ServiceItem) DeletePod(i ItemInterface) { + if c.namespace == i.GetNamespace() { + delete(c.children, i.GetName()) + } +} + +func (c ServiceCollection) DeletePod(i ItemInterface) { + for _, svc := range c { + svc.DeletePod(i) + } +} + +func (c NamespacedServiceCollection) DeletePod(i ItemInterface) { + for ns, nssvcs := range c { + if ns == i.GetNamespace() { + nssvcs.DeletePod(i) + } + } +} + func (c NamespacedServiceCollection) EnsureNamespace(ns string) { if _, ok := c[ns]; !ok { c[ns] = ServiceCollection{} @@ -83,6 +103,17 @@ func (c NamespacedServiceCollection) Contains(i ItemInterface) bool { return ok } +func (c NamespacedServiceCollection) ContainsPod(i ItemInterface) bool { + for _, items := range c { + for _, item := range items { + if item.children.Contains(i) { + return true + } + } + } + return false +} + func (c NamespacedServiceCollection) GetPods(i ItemInterface) ([]*PodItem, bool) { pods := []*PodItem{} diff --git a/pkg/waitables.go b/pkg/waitables.go index 549cceb..299db62 100644 --- a/pkg/waitables.go +++ b/pkg/waitables.go @@ -45,7 +45,7 @@ type Waitables struct { tickerDone chan bool tickerFinished chan bool - UnprocessablePodEvents map[types.UID]Event + LastPodEvents map[types.UID]Event Services items.NamespacedServiceCollection Pods items.NamespacedPodCollection @@ -89,10 +89,14 @@ func (w *Waitables) addJob(namespace string, name string) *items.JobItem { return w.Jobs[namespace][name] } -func (w *Waitables) HasPod(meta metav1.ObjectMeta) bool { +func (w *Waitables) HasPodDirect(meta metav1.ObjectMeta) bool { return w.Pods.Contains(&meta) } +func (w *Waitables) HasPod(meta metav1.ObjectMeta) bool { + return w.HasPodDirect(meta) || w.Services.ContainsPod(&meta) +} + func (w *Waitables) HasService(meta metav1.ObjectMeta) bool { return w.Services.Contains(&meta) } @@ -319,10 +323,10 @@ func (w *Waitables) WithCache(c cache.Cache) *Waitables { func NewWaitables(c *flags.ConfigFlags) *Waitables { w := &Waitables{ - UnprocessablePodEvents: map[types.UID]Event{}, - Services: items.NamespacedServiceCollection{}, - Pods: items.NamespacedPodCollection{}, - Jobs: items.NamespacedJobCollection{}, + LastPodEvents: map[types.UID]Event{}, + Services: items.NamespacedServiceCollection{}, + Pods: items.NamespacedPodCollection{}, + Jobs: items.NamespacedJobCollection{}, ticker: time.NewTicker(250 * time.Millisecond), queuedPrints: 0,