From 362ed6be9d98d1226840ca859b98452336ef404e Mon Sep 17 00:00:00 2001 From: ralongit Date: Wed, 25 Oct 2023 19:10:40 +0300 Subject: [PATCH] Version 0.0.2 - Ignore kubernetes system events that modify only the status of the resources. - I added a test for it - Log to console internal logs instead of shipping/printing them. - Modify event message for cluster level resources(ignore namespace). - Disable sender debug mode - Reorder informer cache sync - Better error handling --- README.md | 4 +- common/parser.go | 12 +++-- common/sender.go | 1 - main.go | 3 +- mockLogzioListener/listener.go | 2 +- resources/clusterResources.go | 29 +++++++--- resources/resourceInformer.go | 86 ++++++++++++++++++++++++------ resources/resourceInformer_test.go | 71 ++++++++++++++++++++++-- 8 files changed, 174 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index ad32f6a..fcbb9cb 100644 --- a/README.md +++ b/README.md @@ -23,5 +23,7 @@ The [tests.yml](https://github.com/logzio/logzio-k8s-events/blob/master/.github/ ![Architecture](./architecture.svg) ## Change log + - **0.0.2**: + - Ignore internal event changes. - **0.0.1**: - - Initial release. + - Initial release. \ No newline at end of file diff --git a/common/parser.go b/common/parser.go index 245719c..88a16b0 100644 --- a/common/parser.go +++ b/common/parser.go @@ -65,20 +65,26 @@ func IsValidList(arrayFieldI []interface{}) (listField []interface{}, isValidArr // ParseEventMessage parses event messages from the kubernetes event log func ParseEventMessage(eventType string, resourceName string, resourceKind string, resourceNamespace string, newResourceVersion string, oldResourceVersions ...string) (msg string) { + // Support cluster level resources + namespacePart := "" + if resourceNamespace != "" { + namespacePart = " in namespace: " + resourceNamespace + } if eventType == EventTypeModified { if len(oldResourceVersions) > 0 { oldResourceVersion := oldResourceVersions[0] - msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was updated from version: %s to new version: %s.", resourceName, resourceKind, resourceNamespace, oldResourceVersion, newResourceVersion) + msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was updated from version: %s to new version: %s.", resourceName, resourceKind, namespacePart, oldResourceVersion, newResourceVersion) } } else if eventType == EventTypeDeleted { - msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s with version: %s was deleted.", resourceName, resourceKind, resourceNamespace, newResourceVersion) + msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s with version: %s was deleted.", resourceName, resourceKind, namespacePart, newResourceVersion) } else if eventType == EventTypeAdded { - msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was added with version: %s.", resourceName, resourceKind, resourceNamespace, newResourceVersion) + msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was added with version: %s.", resourceName, resourceKind, namespacePart, newResourceVersion) } else { log.Printf("[ERROR] Failed to parse resource event log message. Unknown eventType: %s.\n", eventType) } + return msg } diff --git a/common/sender.go b/common/sender.go index 40d3cfb..31e7127 100644 --- a/common/sender.go +++ b/common/sender.go @@ -24,7 +24,6 @@ func ConfigureLogzioSender() { // Creating a new logz.io logger with specified configuration LogzioSender, err = logzio.New( LogzioToken, - logzio.SetDebug(os.Stderr), logzio.SetUrl(LogzioListener), logzio.SetDrainDuration(time.Second*5), logzio.SetDrainDiskThreshold(99), diff --git a/main.go b/main.go index a0aebf5..3451db0 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "log" "main.go/common" "main.go/resources" ) @@ -10,7 +11,7 @@ func main() { common.ConfigureLogzioSender() // Configure logz.io logger // Sending a log message indicating the start of K8S Events Logz.io Integration - common.SendLog("Starting K8S Events Logz.io Integration.") + log.Printf("Starting K8S Events Logz.io Integration.") // Configuring dynamic client for kubernetes cluster common.DynamicClient = common.ConfigureClusterDynamicClient() diff --git a/mockLogzioListener/listener.go b/mockLogzioListener/listener.go index 6edd317..4c9dac9 100644 --- a/mockLogzioListener/listener.go +++ b/mockLogzioListener/listener.go @@ -46,7 +46,7 @@ func (h *ListenerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { json.Unmarshal(reqBody, &requestBody) b, err := json.Marshal(requestBody) - fmt.Printf("%s", b) + log.Printf("%s", b) if err != nil { http.Error(w, fmt.Sprintf("Bad Request\nRequest:\n%v", requestBody), http.StatusBadRequest) return diff --git a/resources/clusterResources.go b/resources/clusterResources.go index 2699921..49c8a02 100644 --- a/resources/clusterResources.go +++ b/resources/clusterResources.go @@ -5,6 +5,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "log" "main.go/common" @@ -154,7 +155,11 @@ func GetDeployment(deploymentName string, namespace string) (relatedDeployment a deploymentsClient := common.K8sClient.AppsV1().Deployments(namespace) deployment, err := deploymentsClient.Get(context.Background(), deploymentName, metav1.GetOptions{}) if err != nil { - log.Printf("[ERROR] Error getting Deployment: %s \nError: %v", deploymentName, err) + if errors.IsNotFound(err) { + + } else { + log.Printf("[ERROR] Failed to get Deployment: %s in namespace %s\nError: %v", deploymentName, namespace, err) + } return } if reflect.ValueOf(deployment).IsValid() { @@ -170,7 +175,11 @@ func GetDaemonSet(daemonSetName string, namespace string) (relatedDaemonSet apps daemonSetsClient := common.K8sClient.AppsV1().DaemonSets(namespace) daemonSet, err := daemonSetsClient.Get(context.Background(), daemonSetName, metav1.GetOptions{}) if err != nil { - log.Printf("[ERROR] Error getting DaemonSet: %s \nError: %v", daemonSetName, err) + if errors.IsNotFound(err) { + + } else { + log.Printf("[ERROR] Failed to get DaemonSet: %s in namespace %s\nError: %v", daemonSetName, namespace, err) + } return } if reflect.ValueOf(daemonSet).IsValid() { @@ -186,7 +195,12 @@ func GetStatefulSet(statefulSetName string, namespace string) (relatedStatefulSe statefulSetsClient := common.K8sClient.AppsV1().StatefulSets(namespace) statefulSet, err := statefulSetsClient.Get(context.Background(), statefulSetName, metav1.GetOptions{}) if err != nil { - log.Printf("[ERROR] Error getting statefulSet: %s \nError: %v", statefulSetName, err) + if errors.IsNotFound(err) { + + } else { + log.Printf("[ERROR] Failed to get StatefulSet: %s in namespace %s\nError: %v", statefulSetName, namespace, err) + } + return } if reflect.ValueOf(statefulSet).IsValid() { @@ -202,9 +216,12 @@ func GetClusterRoleBinding(clusterRoleBindingName string) (relatedClusterRoleBin clusterRoleBindingsClient := common.K8sClient.RbacV1().ClusterRoleBindings() clusterRoleBinding, err := clusterRoleBindingsClient.Get(context.Background(), clusterRoleBindingName, metav1.GetOptions{}) if err != nil { - // Handle error by common the error and returning an empty list of related ClusterRoleBindings. - log.Printf("[ERROR] Error getting clusterRoleBinding: %v", err) - return + if errors.IsNotFound(err) { + + } else { + log.Printf("[ERROR] Failed to get ClusterRoleBinding: %s\nError: %v", clusterRoleBindingName, err) + return + } } if reflect.ValueOf(clusterRoleBinding).IsValid() { diff --git a/resources/resourceInformer.go b/resources/resourceInformer.go index b890d59..a2f487e 100644 --- a/resources/resourceInformer.go +++ b/resources/resourceInformer.go @@ -43,6 +43,50 @@ func createResourceInformer(resourceGVR schema.GroupVersionResource, clusterClie return resourceInformer } +// deleteInternalFields deletes internal fields from a Kubernetes object +func deleteInternalFields(obj *unstructured.Unstructured) { + if meta, ok := obj.Object["metadata"].(map[string]interface{}); ok { + if _, ok := meta["managedFields"]; ok { + delete(meta, "managedFields") + } + if _, ok := meta["resourceVersion"]; ok { + delete(meta, "resourceVersion") + } + if annotations, ok := meta["annotations"].(map[string]interface{}); ok { + if _, ok := annotations["deployment.kubernetes.io/revision"]; ok { + delete(annotations, "deployment.kubernetes.io/revision") + // If annotations is empty, delete it + if len(annotations) == 0 { + delete(meta, "annotations") + } + } + } + } + if _, ok := obj.Object["status"]; ok { + delete(obj.Object, "status") + } +} + +// IgnoreInternalChanges determines whether the only changes between two Kubernetes objects are internal. +func IgnoreInternalChanges(oldObj, newObj interface{}) bool { + oldUnst, ok1 := oldObj.(*unstructured.Unstructured) + newUnst, ok2 := newObj.(*unstructured.Unstructured) + + if ok1 && ok2 { + oldCopy := oldUnst.DeepCopy() + newCopy := newUnst.DeepCopy() + + deleteInternalFields(oldCopy) + deleteInternalFields(newCopy) + newJson, _ := json.Marshal(newCopy) + oldJson, _ := json.Marshal(oldCopy) + + return string(oldJson) == string(newJson) + } + + return false +} + // addInformerEventHandler adds event handlers to the informer. // It handles add, update, and delete events. func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) { @@ -50,6 +94,7 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) { synced := false mux := &sync.RWMutex{} + _, err := resourceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ // Handle add event AddFunc: func(obj interface{}) { @@ -74,12 +119,16 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) { return } - event = map[string]interface{}{ - "oldObject": oldObj, - "newObject": newObj, - "eventType": common.EventTypeModified, + if IgnoreInternalChanges(oldObj, newObj) { + return // ignore internal cluster updates + } else { + event = map[string]interface{}{ + "oldObject": oldObj, + "newObject": newObj, + "eventType": common.EventTypeModified, + } + go StructResourceLog(event) } - go StructResourceLog(event) }, // Handle delete event @@ -102,22 +151,25 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) { if err != nil { msg := fmt.Sprintf("[ERROR] Failed to add event handler for informer.\nERROR:\n%v", err) common.SendLog(msg) - return } ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() + // Start the informer go resourceInformer.Run(ctx.Done()) - isSynced := cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced) + // Wait for all caches to sync + cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced) + + // Set synced to true after all informers have synced mux.Lock() - synced = isSynced + synced = true mux.Unlock() // If the informer failed to sync, log the error and terminate the program - if !isSynced { + if !resourceInformer.HasSynced() { log.Fatal("Informer event handler failed to sync.") } @@ -151,16 +203,16 @@ func AddEventHandlers() { resourceGVR := schema.GroupVersionResource{Group: resourceGroup, Version: "v1", Resource: resourceType} // Attempt to create an informer for the resource - common.SendLog(fmt.Sprintf("Attempting to create informer for resource API: '%s'", resourceAPI)) + log.Printf("Attempting to create informer for resource API: '%s'", resourceAPI) resourceInformer := createResourceInformer(resourceGVR, common.DynamicClient) if resourceInformer != nil { // If the informer was successfully created, attempt to add an event handler to it - common.SendLog(fmt.Sprintf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI)) + log.Printf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI) eventHandlerSync.Add(resourceIndex) go addInformerEventHandler(resourceInformer) { defer eventHandlerSync.Done() - common.SendLog(fmt.Sprintf("Finished adding event handler to informer for resource API: '%s'", resourceAPI)) + log.Printf("Finished adding event handler to informer for resource API: '%s'", resourceAPI) } } else { // If the informer could not be created, log the failure @@ -178,11 +230,11 @@ func EventObject(rawObject map[string]interface{}) (resourceEventObject common.K rawObjUnstructured.Object = rawObject unstructuredObjectJSON, err := rawObjUnstructured.MarshalJSON() if err != nil { - fmt.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err) + log.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err) } err = json.Unmarshal(unstructuredObjectJSON, &resourceEventObject) if err != nil { - fmt.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err) + log.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err) } return resourceEventObject @@ -196,14 +248,14 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE jsonString, err := json.Marshal(event) if err != nil { - fmt.Printf("Failed to marshal structure event log.\nERROR:\n%v", err) + log.Printf("Failed to marshal structure event log.\nERROR:\n%v", err) return } err = json.Unmarshal(jsonString, logEvent) if err != nil { // event log. - fmt.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err) + log.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err) return } eventType := event["eventType"].(string) @@ -221,9 +273,9 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE msg = common.ParseEventMessage(eventType, oldResourceName, resourceKind, oldResourceNamespace, newResourceVersion, oldResourceVersion) } + // Get cluster related resources clusterRelatedResources := GetClusterRelatedResources(resourceKind, resourceName, resourceNamespace) - // If the cluster related resources are valid, add them to the event if reflect.ValueOf(clusterRelatedResources).IsValid() { event["relatedClusterServices"] = clusterRelatedResources diff --git a/resources/resourceInformer_test.go b/resources/resourceInformer_test.go index aed77a5..af7c181 100644 --- a/resources/resourceInformer_test.go +++ b/resources/resourceInformer_test.go @@ -2,8 +2,8 @@ package resources import ( "encoding/json" - "fmt" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" @@ -72,12 +72,12 @@ func TestEventObject(t *testing.T) { return } - fmt.Printf("NEW RAW OBJECT: %v\n", logEvent.NewObject) + log.Printf("NEW RAW OBJECT: %v\n", logEvent.NewObject) // Get the new event object eventObject := logEvent.KubernetesEvent - fmt.Printf("NewObject before EventObject: %v\n", newObject) + log.Printf("NewObject before EventObject: %v\n", newObject) eventObject = EventObject(newObject) - fmt.Printf("KubernetesEvent after EventObject: %v\n", eventObject) + log.Printf("KubernetesEvent after EventObject: %v\n", eventObject) if eventObject.Kind != "Deployment" { t.Errorf("Failed to create event object, expected kind Deployment, got %s", eventObject.Kind) @@ -122,3 +122,66 @@ func TestStructResourceLog(t *testing.T) { t.Errorf("Failed to structure resource log") } } + +// TestIgnoreInternalChanges tests the function IgnoreInternalChanges +func TestIgnoreInternalChanges(t *testing.T) { + // Create two identical Kubernetes objects + oldObj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "test", + "annotations": map[string]interface{}{ + "deployment.kubernetes.io/revision": "1", + }, + "resourceVersion": "1000", + "managedFields": []interface{}{}, + }, + "status": map[string]interface{}{ + "conditions": []interface{}{ + map[string]interface{}{ + "type": "Ready", + "status": "True", + }, + }, + }, + }, + } + + newObj := oldObj.DeepCopy() + + // Change the status field and internal fields of newObj + newObj.Object["status"] = map[string]interface{}{ + "conditions": []interface{}{ + map[string]interface{}{ + "type": "Ready", + "status": "False", + }, + }, + } + newObj.Object["metadata"].(map[string]interface{})["annotations"].(map[string]interface{})["deployment.kubernetes.io/revision"] = "2" + newObj.Object["metadata"].(map[string]interface{})["resourceVersion"] = "1001" + newObj.Object["metadata"].(map[string]interface{})["managedFields"] = []interface{}{ + map[string]interface{}{ + "manager": "kubectl", + "operation": "Update", + "apiVersion": "v1", + "fieldsType": "FieldsV1", + "fieldsV1": map[string]interface{}{ + "f:metadata": map[string]interface{}{ + "f:annotations": map[string]interface{}{ + ".": map[string]interface{}{}, + "f:deployment.kubernetes.io/revision": map[string]interface{}{}, + }, + }, + }, + }, + } + + // Pass the objects to IgnoreInternalChanges + shouldIgnore := IgnoreInternalChanges(oldObj, newObj) + + // Check that the function correctly ignored the status change and internal field changes + if !shouldIgnore { + t.Errorf("IgnoreInternalChanges did not ignore internal changes") + } +}