Skip to content

Commit

Permalink
feat: watch kubernetes events (#296)
Browse files Browse the repository at this point in the history
* feat: watch kubernetes events

[skip ci]

* chore: update getSourceFromEvent test

* feat: impl consuming of the events

[skip ci]

* refactor: save changes immediately and then consume the collected in
interval

* wip: setup a way to scrape selected configs by ids

* chore: bump sigs.k8s.io/controller-runtime

* chore: save results from targetted scrape

- go mod tidy on hack
- bug fix. KetOne can return empty object without any error.

* chore: fix github workflows

- Update versions in lint workflow
- Update go version in Docker image

* fix: Docker image

* chore: bump go version in .github/lint.yml

* chore: bump duty

* chore: remove Involved object definition.

* refactor: scrape uses config directly instead of a config index

* feat: new format for exclusions

[skip ci]

* chore: bump commons

* chore: go mod tidy on hack

* refactor: use mapstructure to decode events instead of json

* feat: directly use the kubernetes scraper and remove TargettedScraper
interface

* chore: create different method to generate event from map.

This is because mapstructure doesn't handle the conversion of timestamp
(string) to time.Time type

* refactor: move KubernetsEvent to v1 so Filter() can take event object as
an argument

* feat: enable retry when watching events

* chore: bump gomplate

* chore: cleanup
  • Loading branch information
adityathebe authored Jan 25, 2024
1 parent 8a8b510 commit 21f5489
Show file tree
Hide file tree
Showing 20 changed files with 1,972 additions and 695 deletions.
18 changes: 10 additions & 8 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
.bin/
.config-db/
.DS_Store
.env
.github/
.idea/
.releaserc
.vscode/

build

build/
chart/
CONTRIBUTING.md
SECURITY.md
README.md
PROJECT

cover.out
.releaserc
Dockerfile
PROJECT
README.md
SECURITY.md
test.test
169 changes: 152 additions & 17 deletions api/v1/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package v1

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/flanksource/commons/collections"
"github.com/flanksource/duty/types"
"github.com/flanksource/gomplate/v3"
"github.com/flanksource/mapstructure"
coreV1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// SeverityKeywords is used to identify the severity
Expand All @@ -15,11 +21,88 @@ type SeverityKeywords struct {
Error []string `json:"error,omitempty"`
}

type KubernetesEvent struct {
// Exclusions is a list of keywords that'll be used to exclude
// event objects based on the reason.
Exclusions []string `json:"exclusions,omitempty"`
SeverityKeywords SeverityKeywords `json:"severityKeywords,omitempty"`
type KubernetesEventExclusions struct {
Names []string `json:"name,omitempty" yaml:"name,omitempty"`
Namespaces []string `json:"namespace,omitempty" yaml:"namespace,omitempty"`
Reasons []string `json:"reason,omitempty" yaml:"reason,omitempty"`
}

// Filter returns true if the given input matches any of the exclusions.
func (t *KubernetesEventExclusions) Filter(event KubernetesEvent) bool {
if event.InvolvedObject.Name != "" && len(t.Names) != 0 {
if collections.MatchItems(event.InvolvedObject.Name, t.Names...) {
return true
}
}

if event.InvolvedObject.Namespace != "" && len(t.Namespaces) != 0 {
if collections.MatchItems(event.InvolvedObject.Namespace, t.Namespaces...) {
return true
}
}

if event.Reason != "" && len(t.Reasons) != 0 {
if collections.MatchItems(event.Reason, t.Reasons...) {
return true
}
}

return false
}

type KubernetesEventConfig struct {
Exclusions KubernetesEventExclusions `json:"exclusions,omitempty"`
SeverityKeywords SeverityKeywords `json:"severityKeywords,omitempty"`
}

type KubernetesExclusionConfig struct {
Names []string `json:"name" yaml:"name"`
Kinds []string `json:"kind" yaml:"kind"`
Namespaces []string `json:"namespace" yaml:"namespace"`
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
}

// List returns the union of the exclusions.
func (t *KubernetesExclusionConfig) List() []string {
result := append(t.Names, t.Kinds...)
result = append(result, t.Namespaces...)
return result
}

// Filter returns true if the given input matches any of the exclusions.
func (t *KubernetesExclusionConfig) Filter(name, namespace, kind string, labels map[string]string) bool {
if name != "" && len(t.Names) != 0 {
if collections.MatchItems(name, t.Names...) {
return true
}
}

if namespace != "" && len(t.Namespaces) != 0 {
if collections.MatchItems(namespace, t.Namespaces...) {
return true
}
}

if kind != "" && len(t.Kinds) != 0 {
if collections.MatchItems(kind, t.Kinds...) {
return true
}
}

if len(labels) != 0 {
for k, v := range t.Labels {
qVal, ok := labels[k]
if !ok {
continue
}

if collections.MatchItems(qVal, v) {
return true
}
}
}

return false
}

type KubernetesRelationshipLookup struct {
Expand Down Expand Up @@ -67,18 +150,18 @@ type KubernetesRelationship struct {

type Kubernetes struct {
BaseScraper `json:",inline"`
ClusterName string `json:"clusterName,omitempty"`
Namespace string `json:"namespace,omitempty"`
UseCache bool `json:"useCache,omitempty"`
AllowIncomplete bool `json:"allowIncomplete,omitempty"`
Scope string `json:"scope,omitempty"`
Since string `json:"since,omitempty"`
Selector string `json:"selector,omitempty"`
FieldSelector string `json:"fieldSelector,omitempty"`
MaxInflight int64 `json:"maxInflight,omitempty"`
Exclusions []string `json:"exclusions,omitempty"`
Kubeconfig *types.EnvVar `json:"kubeconfig,omitempty"`
Event KubernetesEvent `json:"event,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
Namespace string `json:"namespace,omitempty"`
UseCache bool `json:"useCache,omitempty"`
AllowIncomplete bool `json:"allowIncomplete,omitempty"`
Scope string `json:"scope,omitempty"`
Since string `json:"since,omitempty"`
Selector string `json:"selector,omitempty"`
FieldSelector string `json:"fieldSelector,omitempty"`
MaxInflight int64 `json:"maxInflight,omitempty"`
Exclusions KubernetesExclusionConfig `json:"exclusions,omitempty"`
Kubeconfig *types.EnvVar `json:"kubeconfig,omitempty"`
Event KubernetesEventConfig `json:"event,omitempty"`

// Relationships specify the fields to use to relate Kubernetes objects.
Relationships []KubernetesRelationship `json:"relationships,omitempty"`
Expand Down Expand Up @@ -128,3 +211,55 @@ func (r ResourceSelector) String() string {
}
return s
}

type InvolvedObject coreV1.ObjectReference

// KubernetesEvent represents a Kubernetes KubernetesEvent object
type KubernetesEvent struct {
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
Source map[string]string `json:"source,omitempty"`
Metadata *metav1.ObjectMeta `json:"metadata,omitempty" mapstructure:"metadata"`
InvolvedObject *InvolvedObject `json:"involvedObject,omitempty"`
}

func (t *KubernetesEvent) GetUID() string {
return string(t.Metadata.UID)
}

func (t *KubernetesEvent) AsMap() (map[string]any, error) {
eventJSON, err := json.Marshal(t)
if err != nil {
return nil, fmt.Errorf("failed to marshal event object: %v", err)
}

var result map[string]any
if err := json.Unmarshal(eventJSON, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal event object: %v", err)
}

return result, nil
}

func (t *KubernetesEvent) FromObj(obj any) error {
conf := mapstructure.DecoderConfig{
TagName: "json", // Need to set this to json because when `obj` is v1.Event there's no mapstructure struct tag.
Result: t,
}

decoder, err := mapstructure.NewDecoder(&conf)
if err != nil {
return err
}

return decoder.Decode(obj)
}

func (t *KubernetesEvent) FromObjMap(obj any) error {
b, err := json.Marshal(obj)
if err != nil {
return err
}

return json.Unmarshal(b, t)
}
98 changes: 98 additions & 0 deletions api/v1/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package v1

import (
"testing"
)

func TestKubernetesConfigExclusions_Filter(t *testing.T) {
type args struct {
name string
namespace string
kind string
labels map[string]string
}
tests := []struct {
name string
config KubernetesExclusionConfig
args args
shouldExclude bool
}{
{
name: "exclusion by name",
config: KubernetesExclusionConfig{
Names: []string{"junit-*"},
},
args: args{
name: "junit-123",
},
shouldExclude: true,
},
{
name: "exclusion by namespace",
config: KubernetesExclusionConfig{
Namespaces: []string{"*-canaries"},
},
args: args{
namespace: "customer-canaries",
},
shouldExclude: true,
},
{
name: "exclusion by kind",
config: KubernetesExclusionConfig{
Kinds: []string{"*Chart"},
},
args: args{
kind: "HelmChart",
},
shouldExclude: true,
},
{
name: "exclusion by labels | exact match",
config: KubernetesExclusionConfig{
Labels: map[string]string{
"prod": "env",
},
},
args: args{
labels: map[string]string{
"prod": "env",
},
},
shouldExclude: true,
},
{
name: "exclusion by labels | one matches",
config: KubernetesExclusionConfig{
Labels: map[string]string{
"prod": "env",
"is-billed": "true",
"trace-enabled": "true",
},
},
args: args{
labels: map[string]string{
"prod": "env",
"trace-enabled": "false",
},
},
shouldExclude: true,
},
{
name: "no exclusions",
config: KubernetesExclusionConfig{},
args: args{
namespace: "default",
name: "test-foo",
},
shouldExclude: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.config.Filter(tt.args.name, tt.args.namespace, tt.args.kind, tt.args.labels); got != tt.shouldExclude {
t.Errorf("KubernetesConfigExclusions.Filter() = %v, want %v", got, tt.shouldExclude)
}
})
}
}
Loading

0 comments on commit 21f5489

Please sign in to comment.