Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: watch kubernetes events #296

Merged
merged 26 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0382676
feat: watch kubernetes events
adityathebe Sep 5, 2023
0c1e43b
chore: update getSourceFromEvent test
adityathebe Sep 5, 2023
7fd6d12
feat: impl consuming of the events
adityathebe Sep 5, 2023
f1f76f1
refactor: save changes immediately and then consume the collected in
adityathebe Sep 6, 2023
ddf06f3
wip: setup a way to scrape selected configs by ids
adityathebe Sep 8, 2023
c0be9d9
chore: bump sigs.k8s.io/controller-runtime
adityathebe Sep 11, 2023
99e0387
chore: save results from targetted scrape
adityathebe Sep 11, 2023
8322e53
chore: fix github workflows
adityathebe Sep 11, 2023
73ddb39
fix: Docker image
adityathebe Sep 11, 2023
02ae95d
chore: bump go version in .github/lint.yml
adityathebe Sep 11, 2023
dc11e30
chore: bump duty
adityathebe Sep 12, 2023
88ba276
chore: remove Involved object definition.
adityathebe Sep 12, 2023
91e4a5d
refactor: scrape uses config directly instead of a config index
adityathebe Sep 12, 2023
14eb68b
feat: new format for exclusions
adityathebe Sep 13, 2023
b762efa
chore: bump commons
adityathebe Sep 13, 2023
ee6a55e
chore: go mod tidy on hack
adityathebe Sep 13, 2023
3951c13
refactor: use mapstructure to decode events instead of json
adityathebe Sep 14, 2023
d181711
feat: directly use the kubernetes scraper and remove TargettedScraper
adityathebe Sep 14, 2023
2de4c0e
chore: create different method to generate event from map.
adityathebe Sep 14, 2023
1f5a7ca
refactor: move KubernetsEvent to v1 so Filter() can take event object as
adityathebe Sep 14, 2023
5deb67c
feat: enable retry when watching events
adityathebe Sep 14, 2023
5a50c1b
Merge branch 'main' into feat/watch-kubernetes-events
adityathebe Oct 5, 2023
c1ed6b0
chore: bump gomplate
adityathebe Oct 5, 2023
457a670
Merge branch 'main' into feat/watch-kubernetes-events
adityathebe Jan 23, 2024
86235d9
chore: cleanup
adityathebe Jan 23, 2024
68a00a6
Merge branch 'main' into feat/watch-kubernetes-events
adityathebe Jan 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/jobs"
"github.com/flanksource/config-db/query"
"github.com/flanksource/config-db/scrapers/kubernetes"

"github.com/flanksource/config-db/scrapers"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -100,6 +101,11 @@ func startScraperCron(configFiles []string) {
}
scrapers.AddToCron(_scraper)

for _, k := range _scraper.Spec.Kubernetes {
ctx := api.NewScrapeContext(context.Background(), _scraper)
go exitOnError(kubernetes.WatchEvents(ctx, k, kubernetesChangeEventConsumer), "error watching events")
}

fn := func() {
ctx := api.NewScrapeContext(context.Background(), _scraper)
if _, err := scrapers.RunScraper(ctx); err != nil {
Expand Down Expand Up @@ -130,3 +136,13 @@ func forward(e *echo.Echo, prefix string, target string) {
func init() {
ServerFlags(Serve.Flags())
}

func exitOnError(err error, description string) {
if err != nil {
logger.Fatalf("%s %v", description, err)
}
}

func kubernetesChangeEventConsumer(ctx *v1.ScrapeContext, changes []*v1.ChangeResult) {
return
}
82 changes: 42 additions & 40 deletions scrapers/kubernetes/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/utils"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// InvolvedObject represents a Kubernetes InvolvedObject object
Expand All @@ -23,12 +23,32 @@ type InvolvedObject struct {

// Event represents a Kubernetes Event object
type Event struct {
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
InvolvedObject *InvolvedObject `json:"involvedObject,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
Source map[string]any `json:"source,omitempty"`
Metadata *metav1.ObjectMeta `json:"metadata,omitempty"`
InvolvedObject *InvolvedObject `json:"involvedObject,omitempty"`
}

func (t *Event) FromObjMap(obj map[string]interface{}) error {
func (t *Event) GetUID() string {
return string(t.Metadata.UID)
}

func (t *Event) AsMap() (map[string]any, error) {
eventJSON, err := json.Marshal(t)
if err != nil {
return nil, fmt.Errorf("failed to marshal event object: %v", err)
}

var result map[string]any
if err := json.Unmarshal(eventJSON, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal event object: %v", err)
}

return result, nil
}

func (t *Event) FromObjMap(obj any) error {
eventJSON, err := json.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal event object: %v", err)
Expand All @@ -53,52 +73,34 @@ func getSeverityFromReason(reason string, errKeywords, warnKeywords []string) st
return ""
}

func getSourceFromEvent(obj *unstructured.Unstructured) string {
val, ok := obj.Object["source"].(map[string]any)
if !ok {
return ""
}

keyVals := make([]string, 0, len(val))
for k, v := range val {
func getSourceFromEvent(event Event) string {
keyVals := make([]string, 0, len(event.Source))
for k, v := range event.Source {
keyVals = append(keyVals, fmt.Sprintf("%s=%s", k, v))
}

sort.Slice(keyVals, func(i, j int) bool { return keyVals[i] < keyVals[j] })
return fmt.Sprintf("kubernetes/%s", strings.Join(keyVals, ","))
}

func getDetailsFromEvent(obj *unstructured.Unstructured) map[string]any {
details := make(map[string]any)

for k, v := range obj.Object {
switch k {
case "involvedObject":
continue
func getDetailsFromEvent(event Event) map[string]any {
details, err := event.AsMap()
if err != nil {
logger.Errorf("failed to convert event to map: %v", err)
return nil
}

case "metadata":
if metadata, ok := v.(map[string]any); ok {
delete(metadata, "managedFields")
}
}
delete(details, "involvedObject")

details[k] = v
if metadata, ok := details["metadata"].(map[string]any); ok {
delete(metadata, "managedFields")
}

return details
}

func getChangeFromEvent(obj *unstructured.Unstructured, severityKeywords v1.SeverityKeywords) *v1.ChangeResult {
eventCreatedAt := obj.GetCreationTimestamp().Time

var event Event
if err := event.FromObjMap(obj.Object); err != nil {
logger.Errorf("failed to parse event: %v", err)
return nil
}

func getChangeFromEvent(event Event, severityKeywords v1.SeverityKeywords) *v1.ChangeResult {
if event.InvolvedObject == nil {
logger.Debugf("event has no involved object: %v", event)
return nil
}

Expand All @@ -109,13 +111,13 @@ func getChangeFromEvent(obj *unstructured.Unstructured, severityKeywords v1.Seve

return &v1.ChangeResult{
ChangeType: event.Reason,
CreatedAt: &eventCreatedAt,
Details: getDetailsFromEvent(obj),
ExternalChangeID: string(obj.GetUID()),
CreatedAt: &event.Metadata.CreationTimestamp.Time,
Details: getDetailsFromEvent(event),
ExternalChangeID: event.GetUID(),
ExternalID: event.InvolvedObject.UID,
ConfigType: ConfigTypePrefix + event.InvolvedObject.Kind,
Severity: getSeverityFromReason(event.Reason, severityKeywords.Error, severityKeywords.Warn),
Source: getSourceFromEvent(obj),
Source: getSourceFromEvent(event),
Summary: event.Message,
}
}
39 changes: 8 additions & 31 deletions scrapers/kubernetes/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,33 @@ package kubernetes

import (
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func Test_getSourceFromEvent(t *testing.T) {
type args struct {
obj *unstructured.Unstructured
}
tests := []struct {
name string
args args
args Event
want string
}{
{
name: "simple", args: args{
&unstructured.Unstructured{
Object: map[string]interface{}{
"source": map[string]interface{}{
"component": "kubelet",
"host": "minikube",
},
},
name: "simple", args: Event{
Source: map[string]interface{}{
"component": "kubelet",
"host": "minikube",
},
},
want: "kubernetes/component=kubelet,host=minikube",
},
{
name: "empty", args: args{
&unstructured.Unstructured{
Object: map[string]interface{}{
"source": map[string]interface{}{},
},
},
name: "empty", args: Event{
Source: map[string]any{},
},
want: "kubernetes/",
},
{
name: "nil source", args: args{
&unstructured.Unstructured{
Object: map[string]interface{}{
"source": nil,
},
},
},
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getSourceFromEvent(tt.args.obj); got != tt.want {
if got := getSourceFromEvent(tt.args); got != tt.want {
t.Errorf("getSourceFromEvent() = %v, want %v", got, tt.want)
}
})
Expand Down
84 changes: 84 additions & 0 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package kubernetes

import (
"context"
"fmt"
"time"

"github.com/flanksource/commons/logger"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
// eventWatchTimeout is the timeout for watching events
eventWatchTimeout = time.Second * 10

// maxBufferSize is the maximum number of events that can be buffered before consuming.
maxBufferSize = 50

changesBuffer []*v1.ChangeResult
)

type consumerFunc func(ctx *v1.ScrapeContext, changesBuffer []*v1.ChangeResult)

// WatchEvents watches Kubernetes events for any config changes & fetches
// the referenced config items in batches.
func WatchEvents(ctx *v1.ScrapeContext, config v1.Kubernetes, consume consumerFunc) error {
logger.Infof("Watching kubernetes events: %v", config)

watcher, err := ctx.Kubernetes.CoreV1().Events(config.Namespace).Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to watch events: %w", err)
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
consumeChangeEvents(ctx, consume)
return nil

case <-time.After(eventWatchTimeout):
consumeChangeEvents(ctx, consume)

case watchEvent := <-watcher.ResultChan():
var event Event
if err := event.FromObjMap(watchEvent.Object); err != nil {
logger.Errorf("failed to unmarshal event (id=%s): %v", event.GetUID(), err)
continue
}

logger.Infof("New Event: reason=%s source=%s", event.Reason, event.Source)

if utils.MatchItems(event.Reason, config.Event.Exclusions...) {
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
continue
}

change := getChangeFromEvent(event, config.Event.SeverityKeywords)
if change == nil {
logger.Debugf("No change detected")
continue
}

changesBuffer = append(changesBuffer, change)
if len(changesBuffer) >= maxBufferSize {
consumeChangeEvents(ctx, consume)
}
}
}
}

// consumeChangeEvents fetches the configs referenced by the changes and saves them.
// It clears the buffer after.
func consumeChangeEvents(ctx *v1.ScrapeContext, consume consumerFunc) {
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("Consuming buffer. Len: %d", len(changesBuffer))
if len(changesBuffer) == 0 {
return
}

consume(ctx, changesBuffer)

changesBuffer = nil
}
13 changes: 9 additions & 4 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ func (kubernetes KubernetesScraper) Scrape(ctx *v1.ScrapeContext) v1.ScrapeResul
}

if obj.GetKind() == "Event" {
reason, _ := obj.Object["reason"].(string)
if utils.MatchItems(reason, config.Event.Exclusions...) {
logger.Debugf("excluding event object for reason [%s].", reason)
var event Event
if err := event.FromObjMap(obj.Object); err != nil {
logger.Errorf("failed to parse event: %v", err)
return nil
}

if utils.MatchItems(event.Reason, config.Event.Exclusions...) {
logger.Debugf("excluding event object for reason [%s].", event.Reason)
continue
}

change := getChangeFromEvent(obj, config.Event.SeverityKeywords)
change := getChangeFromEvent(event, config.Event.SeverityKeywords)
if change != nil {
changeResults = append(changeResults, v1.ScrapeResult{
Changes: []v1.ChangeResult{*change},
Expand Down