Skip to content

Commit

Permalink
Make pods not require IsPodAvailable since that will break when a pod…
Browse files Browse the repository at this point in the history
… just started (it won't generate a second event)
  • Loading branch information
EraYaN committed Jun 26, 2023
1 parent 1f5875f commit 0622106
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 40 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ It accepts arguments in the following formats:
- `pod,pod-name` using the namespace from the `--namespace`, `-n` flag or `default`
- `pod-name` using the namespace from the `--namespace`, `-n` flag or `default` and the kind `pod`

For pods it waits until the pod is Ready (`k8s.io/kubectl/pkg/util/podutils.IsPodReady`) and Available (`k8s.io/kubectl/pkg/util/podutils.IsPodAvailable`).
For pods it waits until the pod is Ready (`k8s.io/kubectl/pkg/util/podutils.IsPodReady`).

For jobs it wait until the `Completed` condition is true.

For services it will wait until all pods that match the service selector are Ready and Available (like above).
For services it will wait until all pods that match the service selector are Ready (like above).

## Example

Expand Down
1 change: 1 addition & 0 deletions cmd/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func wait(cmd *cobra.Command, args []string) error {

opts := cache.Options{
Namespaces: namespaces,
SyncPeriod: WaitForConfigFlags.SyncPeriod,
}

conf, err := KubernetesConfigFlags.ToRESTConfig()
Expand Down
10 changes: 8 additions & 2 deletions flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type ConfigFlags struct {
PrintTree *bool
PrintCollapsedTree *bool

Timeout *time.Duration
Timeout *time.Duration
SyncPeriod *time.Duration
}

func NewConfigFlags() *ConfigFlags {
Expand All @@ -38,7 +39,8 @@ func NewConfigFlags() *ConfigFlags {
PrintTree: utilpointer.Bool(true),
PrintCollapsedTree: utilpointer.Bool(true),

Timeout: utilpointer.Duration(time.Duration(600 * time.Second)),
Timeout: utilpointer.Duration(time.Duration(600 * time.Second)),
SyncPeriod: utilpointer.Duration(time.Duration(90 * time.Second)),
}
}

Expand All @@ -47,6 +49,10 @@ func (f *ConfigFlags) AddFlags(flags *pflag.FlagSet) {
flags.DurationVarP(f.Timeout, "timeout", "t", *f.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h)")
}

if f.Timeout != nil {
flags.DurationVar(f.SyncPeriod, "sync-period", *f.SyncPeriod, "The length of time to pass to the cache to initiate a sync. (e.g. 1s, 2m, 3h)")
}

if f.PrintVersion != nil {
flags.BoolVarP(f.PrintVersion, "version", "v", *f.PrintVersion, "Display version info")
}
Expand Down
66 changes: 38 additions & 28 deletions pkg/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (w *Waitables) ProcessEventDeleteService(ctx context.Context, svc *corev1.S
}

func (w *Waitables) ProcessOldPodEvents(ctx context.Context, pod *corev1.Pod) (bool, error) {
if val, ok := w.UnprocessablePodEvents[pod.UID]; ok {
//log.Printf("Running UnprocessablePodEvents for %s/%s of type %v", pod.Namespace, pod.Name, val.EventType)
//defer delete(w.UnprocessablePodEvents, pod.UID)
if val, ok := w.LastPodEvents[pod.UID]; ok {
//log.Printf("Running LastPodEvents for %s/%s of type %v", pod.Namespace, pod.Name, val.EventType)
//defer delete(w.LastPodEvents, pod.UID)
if val.EventType == EventTypeAdd {
return w.ProcessEventAddPod(ctx, pod)
} else if val.EventType == EventTypeUpdate {
Expand All @@ -83,54 +83,64 @@ func (w *Waitables) ProcessOldPodEvents(ctx context.Context, pod *corev1.Pod) (b
}

func (w *Waitables) ProcessEventAddPod(ctx context.Context, pod *corev1.Pod) (bool, error) {
processed := false
if w.HasPod(pod.ObjectMeta) {
//log.Printf("Add %T %s %s", pod, pod.Namespace, pod.Name)
// if w.HasPod(pod.ObjectMeta) {
// log.Printf("Add %T %s %s", pod, pod.Namespace, pod.Name)
// }

if w.HasPodDirect(pod.ObjectMeta) {
w.SetPodReadyFromPod(pod)
processed = true
}

if podItems, ok := w.Services.GetPods(pod); ok {
for _, podItem := range podItems {
podItem.WithReadyFromPod(pod)
}
processed = true
}
if processed {
return true, nil
} else {
w.UnprocessablePodEvents[pod.UID] = Event{EventType: EventTypeAdd, Pod: pod}
}
return false, nil

w.LastPodEvents[pod.UID] = Event{EventType: EventTypeAdd, Pod: pod}

return w.HasPod(pod.ObjectMeta), nil
}

func (w *Waitables) ProcessEventUpdatePod(ctx context.Context, pod *corev1.Pod) (bool, error) {
if w.HasPod(pod.ObjectMeta) {
//log.Printf("Update %T %s %s", pod, pod.Namespace, pod.Name)
// if w.HasPod(pod.ObjectMeta) {
// log.Printf("Update %T %s %s", pod, pod.Namespace, pod.Name)
// }

if w.HasPodDirect(pod.ObjectMeta) {
w.SetPodReadyFromPod(pod)
return true, nil
} else if podItems, ok := w.Services.GetPods(pod); ok {
}

if podItems, ok := w.Services.GetPods(pod); ok {
for _, podItem := range podItems {
podItem.WithReadyFromPod(pod)
}
} else {
w.UnprocessablePodEvents[pod.UID] = Event{EventType: EventTypeUpdate, Pod: pod}
}
return false, nil

w.LastPodEvents[pod.UID] = Event{EventType: EventTypeUpdate, Pod: pod}

return w.HasPod(pod.ObjectMeta), nil
}

func (w *Waitables) ProcessEventDeletePod(ctx context.Context, pod *corev1.Pod) (bool, error) {
if w.HasPod(pod.ObjectMeta) {
//log.Printf("Delete %T %s %s", pod, pod.Namespace, pod.Name)
// if w.HasPod(pod.ObjectMeta) {
// log.Printf("Delete %T %s %s", pod, pod.Namespace, pod.Name)
// }

if w.HasPodDirect(pod.ObjectMeta) {
w.UnsetPodReady(pod)
return true, nil
} else if podItems, ok := w.Services.GetPods(pod); ok {
}

if podItems, ok := w.Services.GetPods(pod); ok {
for _, podItem := range podItems {
podItem.WithReady(false)
}
} else {
w.UnprocessablePodEvents[pod.UID] = Event{EventType: EventTypeDelete, Pod: pod}
w.Services.DeletePod(&pod.ObjectMeta)
}
return false, nil

w.LastPodEvents[pod.UID] = Event{EventType: EventTypeDelete, Pod: pod}

return w.HasPod(pod.ObjectMeta), nil
}

func (w *Waitables) ProcessEventAddJob(ctx context.Context, job *batchv1.Job) (bool, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/items/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"k8s.io/kubectl/pkg/util/podutils"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type NamespacedPodCollection map[string]PodCollection
Expand All @@ -47,7 +46,7 @@ func (i *PodItem) WithReady(ready bool) *PodItem {
}

func (i *PodItem) WithReadyFromPod(pod *corev1.Pod) *PodItem {
i.ready = podutils.IsPodReady(pod) && podutils.IsPodAvailable(pod, 2, metav1.Now())
i.ready = podutils.IsPodReady(pod)
return i
}

Expand Down
31 changes: 31 additions & 0 deletions pkg/items/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ func (i *ServiceItem) GetPod(pod ItemInterface) (*PodItem, bool) {
return val, ok
}

func (c ServiceItem) DeletePod(i ItemInterface) {
if c.namespace == i.GetNamespace() {
delete(c.children, i.GetName())
}
}

func (c ServiceCollection) DeletePod(i ItemInterface) {
for _, svc := range c {
svc.DeletePod(i)
}
}

func (c NamespacedServiceCollection) DeletePod(i ItemInterface) {
for ns, nssvcs := range c {
if ns == i.GetNamespace() {
nssvcs.DeletePod(i)
}
}
}

func (c NamespacedServiceCollection) EnsureNamespace(ns string) {
if _, ok := c[ns]; !ok {
c[ns] = ServiceCollection{}
Expand All @@ -83,6 +103,17 @@ func (c NamespacedServiceCollection) Contains(i ItemInterface) bool {
return ok
}

func (c NamespacedServiceCollection) ContainsPod(i ItemInterface) bool {
for _, items := range c {
for _, item := range items {
if item.children.Contains(i) {
return true
}
}
}
return false
}

func (c NamespacedServiceCollection) GetPods(i ItemInterface) ([]*PodItem, bool) {

pods := []*PodItem{}
Expand Down
16 changes: 10 additions & 6 deletions pkg/waitables.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Waitables struct {
tickerDone chan bool
tickerFinished chan bool

UnprocessablePodEvents map[types.UID]Event
LastPodEvents map[types.UID]Event

Services items.NamespacedServiceCollection
Pods items.NamespacedPodCollection
Expand Down Expand Up @@ -89,10 +89,14 @@ func (w *Waitables) addJob(namespace string, name string) *items.JobItem {
return w.Jobs[namespace][name]
}

func (w *Waitables) HasPod(meta metav1.ObjectMeta) bool {
func (w *Waitables) HasPodDirect(meta metav1.ObjectMeta) bool {
return w.Pods.Contains(&meta)
}

func (w *Waitables) HasPod(meta metav1.ObjectMeta) bool {
return w.HasPodDirect(meta) || w.Services.ContainsPod(&meta)
}

func (w *Waitables) HasService(meta metav1.ObjectMeta) bool {
return w.Services.Contains(&meta)
}
Expand Down Expand Up @@ -319,10 +323,10 @@ func (w *Waitables) WithCache(c cache.Cache) *Waitables {

func NewWaitables(c *flags.ConfigFlags) *Waitables {
w := &Waitables{
UnprocessablePodEvents: map[types.UID]Event{},
Services: items.NamespacedServiceCollection{},
Pods: items.NamespacedPodCollection{},
Jobs: items.NamespacedJobCollection{},
LastPodEvents: map[types.UID]Event{},
Services: items.NamespacedServiceCollection{},
Pods: items.NamespacedPodCollection{},
Jobs: items.NamespacedJobCollection{},

ticker: time.NewTicker(250 * time.Millisecond),
queuedPrints: 0,
Expand Down

0 comments on commit 0622106

Please sign in to comment.