-
Notifications
You must be signed in to change notification settings - Fork 84
/
pods.go
106 lines (92 loc) · 3.35 KB
/
pods.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package kubernetes
import (
"context"
"fmt"
"sort"
"sync"
"time"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apisv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
const resyncInterval = 1 * time.Minute
// PodInformer is a event handler for Pod events registered to, that builds a local list of valid and relevant pods
// and sends an event to the endpoint channel, triggering a resync of the targets.
func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) (err error) {
podEndpoints := sync.Map{}
log.Infof("Watching for Pods with labelselector %s in namespace %s", a.cniPodLabelSelector, a.cniPodNamespace)
factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, resyncInterval, informers.WithNamespace(a.cniPodNamespace),
informers.WithTweakListOptions(func(options *apisv1.ListOptions) { options.LabelSelector = a.cniPodLabelSelector }))
informer := factory.Core().V1().Pods().Informer()
factory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
return fmt.Errorf("timed out waiting for caches to sync")
}
// list warms the pod cache and verifies whether pods for given specs can be found, preventing to fail silently
var podList []*corev1.Pod
for {
podList, err = factory.Core().V1().Pods().Lister().List(labels.Everything())
if err == nil && len(podList) > 0 {
break
}
log.Errorf("Error listing Pods with labelselector %s in namespace %s: %v", a.cniPodNamespace, a.cniPodLabelSelector, err)
time.Sleep(resyncInterval)
}
for _, pod := range podList {
if !isPodTerminating(pod) && isPodRunning(pod) {
podEndpoints.Store(pod.Name, pod.Status.PodIP)
}
}
queueEndpoints(&podEndpoints, endpointChan)
// delta triggered updates
_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, newResource interface{}) {
pod, ok := newResource.(*corev1.Pod)
if !ok {
log.Printf("cannot cast object to corev1.Pod %v", newResource)
return
}
switch {
case isPodTerminating(pod):
name, exists := podEndpoints.LoadAndDelete(pod.Name)
if !exists {
return
}
log.Infof("Deleted pod: %s IP: %s", pod.Name, name)
queueEndpoints(&podEndpoints, endpointChan)
case isPodRunning(pod):
if _, isStored := podEndpoints.LoadOrStore(pod.Name, pod.Status.PodIP); isStored {
return
}
log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP)
queueEndpoints(&podEndpoints, endpointChan)
}
},
})
if err != nil {
log.Errorf("Error adding event handler to informer: %v", err)
}
<-ctx.Done()
return nil
}
func queueEndpoints(podEndpoints *sync.Map, endpointChan chan<- []string) {
podList := []string{}
podEndpoints.Range(func(key, value interface{}) bool {
podList = append(podList, value.(string))
return true
})
sort.StringSlice(podList).Sort()
endpointChan <- podList
}
// intermediate states à la kubectl https://github.com/kubernetes/kubernetes/blob/76cdb57ccfbfebc689fbce45f289add8a0562e07/pkg/printers/internalversion/printers.go#L839
func isPodTerminating(p *corev1.Pod) bool {
return p.DeletionTimestamp != nil
}
func isPodRunning(p *corev1.Pod) bool {
return len(p.Status.ContainerStatuses) > 0 &&
p.Status.ContainerStatuses[0].State.Running != nil &&
p.Status.PodIP != ""
}