Skip to content

Commit

Permalink
Merge pull request #9 from logzio/ignore-status-only-changes
Browse files Browse the repository at this point in the history
Ignore internal changes Version 0.0.2
  • Loading branch information
ralongit authored Oct 26, 2023
2 parents 069c75c + 8104c3c commit 8ca0d99
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 37 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ The [tests.yml](https://github.com/logzio/logzio-k8s-events/blob/master/.github/
![Architecture](./architecture.svg)

## Change log
- **0.0.2**:
- Ignore internal event changes.
- **0.0.1**:
- Initial release.
- Initial release.
8 changes: 8 additions & 0 deletions common/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,11 @@ const (
DefaultListener = "https://listener.logz.io:8071"
DefaultLogType = "logzio-k8s-events"
)
const (
Metadata = "metadata"
ManagedFields = "managedFields"
ResourceVersion = "resourceVersion"
Annotations = "annotations"
DeploymentRevision = "deployment.kubernetes.io/revision"
Status = "status"
)
12 changes: 9 additions & 3 deletions common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,26 @@ func IsValidList(arrayFieldI []interface{}) (listField []interface{}, isValidArr

// ParseEventMessage parses event messages from the kubernetes event log
func ParseEventMessage(eventType string, resourceName string, resourceKind string, resourceNamespace string, newResourceVersion string, oldResourceVersions ...string) (msg string) {
// More accurate message for cluster level resources
inNamespaceMsg := ""
if resourceNamespace != "" {
inNamespaceMsg = " in namespace: " + resourceNamespace
}

if eventType == EventTypeModified {
if len(oldResourceVersions) > 0 {
oldResourceVersion := oldResourceVersions[0]
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was updated from version: %s to new version: %s.", resourceName, resourceKind, resourceNamespace, oldResourceVersion, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was updated from version: %s to new version: %s.", resourceName, resourceKind, inNamespaceMsg, oldResourceVersion, newResourceVersion)
}
} else if eventType == EventTypeDeleted {
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s with version: %s was deleted.", resourceName, resourceKind, resourceNamespace, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s with version: %s was deleted.", resourceName, resourceKind, inNamespaceMsg, newResourceVersion)

} else if eventType == EventTypeAdded {
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was added with version: %s.", resourceName, resourceKind, resourceNamespace, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was added with version: %s.", resourceName, resourceKind, inNamespaceMsg, newResourceVersion)
} else {
log.Printf("[ERROR] Failed to parse resource event log message. Unknown eventType: %s.\n", eventType)
}

return msg
}

Expand Down
1 change: 0 additions & 1 deletion common/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func ConfigureLogzioSender() {
// Creating a new logz.io logger with specified configuration
LogzioSender, err = logzio.New(
LogzioToken,
logzio.SetDebug(os.Stderr),
logzio.SetUrl(LogzioListener),
logzio.SetDrainDuration(time.Second*5),
logzio.SetDrainDiskThreshold(99),
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"log"
"main.go/common"
"main.go/resources"
)
Expand All @@ -10,7 +11,7 @@ func main() {
common.ConfigureLogzioSender() // Configure logz.io logger

// Sending a log message indicating the start of K8S Events Logz.io Integration
common.SendLog("Starting K8S Events Logz.io Integration.")
log.Printf("Starting K8S Events Logz.io Integration.")

// Configuring dynamic client for kubernetes cluster
common.DynamicClient = common.ConfigureClusterDynamicClient()
Expand Down
2 changes: 1 addition & 1 deletion mockLogzioListener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (h *ListenerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
json.Unmarshal(reqBody, &requestBody)
b, err := json.Marshal(requestBody)

fmt.Printf("%s", b)
log.Printf("%s", b)
if err != nil {
http.Error(w, fmt.Sprintf("Bad Request\nRequest:\n%v", requestBody), http.StatusBadRequest)
return
Expand Down
38 changes: 29 additions & 9 deletions resources/clusterResources.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"log"
"main.go/common"
Expand Down Expand Up @@ -154,8 +155,13 @@ func GetDeployment(deploymentName string, namespace string) (relatedDeployment a
deploymentsClient := common.K8sClient.AppsV1().Deployments(namespace)
deployment, err := deploymentsClient.Get(context.Background(), deploymentName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting Deployment: %s \nError: %v", deploymentName, err)
return
resourceNotFoundErr := errors.IsNotFound(err)
// Ignore errors of resource not found, as the resource may not exist in the cluster in deletion events.
if !resourceNotFoundErr {
log.Printf("[ERROR] Failed to get Deployment: %s in namespace %s\nError: %v", deploymentName, namespace, err)
return
}

}
if reflect.ValueOf(deployment).IsValid() {
relatedDeployment = *deployment
Expand All @@ -170,8 +176,13 @@ func GetDaemonSet(daemonSetName string, namespace string) (relatedDaemonSet apps
daemonSetsClient := common.K8sClient.AppsV1().DaemonSets(namespace)
daemonSet, err := daemonSetsClient.Get(context.Background(), daemonSetName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting DaemonSet: %s \nError: %v", daemonSetName, err)
return
resourceNotFoundErr := errors.IsNotFound(err)
// Ignore errors of resource not found, as the resource may no longer exist in the cluster in deletion events.
if !resourceNotFoundErr {
log.Printf("[ERROR] Failed to get DaemonSet: %s in namespace %s\nError: %v", daemonSetName, namespace, err)
return
}

}
if reflect.ValueOf(daemonSet).IsValid() {
relatedDaemonSet = *daemonSet
Expand All @@ -186,8 +197,13 @@ func GetStatefulSet(statefulSetName string, namespace string) (relatedStatefulSe
statefulSetsClient := common.K8sClient.AppsV1().StatefulSets(namespace)
statefulSet, err := statefulSetsClient.Get(context.Background(), statefulSetName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting statefulSet: %s \nError: %v", statefulSetName, err)
return
resourceNotFoundErr := errors.IsNotFound(err)
// Ignore errors of resource not found, as the resource may not exist in the cluster in deletion events.
if !resourceNotFoundErr {
log.Printf("[ERROR] Failed to get StatefulSet: %s in namespace %s\nError: %v", statefulSetName, namespace, err)
return
}

}
if reflect.ValueOf(statefulSet).IsValid() {
relatedStatefulSet = *statefulSet
Expand All @@ -202,9 +218,13 @@ func GetClusterRoleBinding(clusterRoleBindingName string) (relatedClusterRoleBin
clusterRoleBindingsClient := common.K8sClient.RbacV1().ClusterRoleBindings()
clusterRoleBinding, err := clusterRoleBindingsClient.Get(context.Background(), clusterRoleBindingName, metav1.GetOptions{})
if err != nil {
// Handle error by common the error and returning an empty list of related ClusterRoleBindings.
log.Printf("[ERROR] Error getting clusterRoleBinding: %v", err)
return
resourceNotFoundErr := errors.IsNotFound(err)
// Ignore errors of resource not found, as the resource may not exist in the cluster in deletion events.
if !resourceNotFoundErr {
log.Printf("[ERROR] Failed to get ClusterRoleBinding: %s\nError: %v", clusterRoleBindingName, err)
return
}

}

if reflect.ValueOf(clusterRoleBinding).IsValid() {
Expand Down
86 changes: 69 additions & 17 deletions resources/resourceInformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,58 @@ func createResourceInformer(resourceGVR schema.GroupVersionResource, clusterClie
return resourceInformer
}

// deleteInternalFields deletes internal fields from a Kubernetes object
func deleteInternalFields(obj *unstructured.Unstructured) {
if meta, ok := obj.Object[common.Metadata].(map[string]interface{}); ok {
if _, ok := meta[common.ManagedFields]; ok {
delete(meta, common.ManagedFields)
}
if _, ok := meta[common.ResourceVersion]; ok {
delete(meta, common.ResourceVersion)
}
if annotations, ok := meta[common.Annotations].(map[string]interface{}); ok {
if _, ok := annotations[common.DeploymentRevision]; ok {
delete(annotations, common.DeploymentRevision)
// If annotations is empty, delete it
if len(annotations) == 0 {
delete(meta, common.Annotations)
}
}
}
}
if _, ok := obj.Object[common.Status]; ok {
delete(obj.Object, common.Status)
}
}

// IgnoreInternalChanges determines whether the only changes between two Kubernetes objects are internal.
func IgnoreInternalChanges(oldObj, newObj interface{}) bool {
oldUnst, ok1 := oldObj.(*unstructured.Unstructured)
newUnst, ok2 := newObj.(*unstructured.Unstructured)

if ok1 && ok2 {
oldCopy := oldUnst.DeepCopy()
newCopy := newUnst.DeepCopy()

deleteInternalFields(oldCopy)
deleteInternalFields(newCopy)
newJson, _ := json.Marshal(newCopy)
oldJson, _ := json.Marshal(oldCopy)

return string(oldJson) == string(newJson)
}

return false
}

// addInformerEventHandler adds event handlers to the informer.
// It handles add, update, and delete events.
func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
var event map[string]interface{}
synced := false

mux := &sync.RWMutex{}

_, err := resourceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Handle add event
AddFunc: func(obj interface{}) {
Expand All @@ -74,12 +119,16 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
return
}

event = map[string]interface{}{
"oldObject": oldObj,
"newObject": newObj,
"eventType": common.EventTypeModified,
if IgnoreInternalChanges(oldObj, newObj) {
return // ignore internal cluster updates
} else {
event = map[string]interface{}{
"oldObject": oldObj,
"newObject": newObj,
"eventType": common.EventTypeModified,
}
go StructResourceLog(event)
}
go StructResourceLog(event)

},
// Handle delete event
Expand All @@ -102,22 +151,25 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
if err != nil {
msg := fmt.Sprintf("[ERROR] Failed to add event handler for informer.\nERROR:\n%v", err)
common.SendLog(msg)

return
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

// Start the informer
go resourceInformer.Run(ctx.Done())

isSynced := cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced)
// Wait for all caches to sync
cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced)

// Set synced to true after all informers have synced
mux.Lock()
synced = isSynced
synced = true
mux.Unlock()

// If the informer failed to sync, log the error and terminate the program
if !isSynced {
if !resourceInformer.HasSynced() {
log.Fatal("Informer event handler failed to sync.")
}

Expand Down Expand Up @@ -151,16 +203,16 @@ func AddEventHandlers() {
resourceGVR := schema.GroupVersionResource{Group: resourceGroup, Version: "v1", Resource: resourceType}

// Attempt to create an informer for the resource
common.SendLog(fmt.Sprintf("Attempting to create informer for resource API: '%s'", resourceAPI))
log.Printf("Attempting to create informer for resource API: '%s'", resourceAPI)
resourceInformer := createResourceInformer(resourceGVR, common.DynamicClient)
if resourceInformer != nil {
// If the informer was successfully created, attempt to add an event handler to it
common.SendLog(fmt.Sprintf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI))
log.Printf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI)
eventHandlerSync.Add(resourceIndex)
go addInformerEventHandler(resourceInformer)
{
defer eventHandlerSync.Done()
common.SendLog(fmt.Sprintf("Finished adding event handler to informer for resource API: '%s'", resourceAPI))
log.Printf("Finished adding event handler to informer for resource API: '%s'", resourceAPI)
}
} else {
// If the informer could not be created, log the failure
Expand All @@ -178,11 +230,11 @@ func EventObject(rawObject map[string]interface{}) (resourceEventObject common.K
rawObjUnstructured.Object = rawObject
unstructuredObjectJSON, err := rawObjUnstructured.MarshalJSON()
if err != nil {
fmt.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err)
log.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err)
}
err = json.Unmarshal(unstructuredObjectJSON, &resourceEventObject)
if err != nil {
fmt.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err)
log.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err)
}

return resourceEventObject
Expand All @@ -196,14 +248,14 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE
jsonString, err := json.Marshal(event)
if err != nil {

fmt.Printf("Failed to marshal structure event log.\nERROR:\n%v", err)
log.Printf("Failed to marshal structure event log.\nERROR:\n%v", err)
return
}
err = json.Unmarshal(jsonString, logEvent)
if err != nil {

// event log.
fmt.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err)
log.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err)
return
}
eventType := event["eventType"].(string)
Expand All @@ -221,9 +273,9 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE
msg = common.ParseEventMessage(eventType, oldResourceName, resourceKind, oldResourceNamespace, newResourceVersion, oldResourceVersion)

}

// Get cluster related resources
clusterRelatedResources := GetClusterRelatedResources(resourceKind, resourceName, resourceNamespace)

// If the cluster related resources are valid, add them to the event
if reflect.ValueOf(clusterRelatedResources).IsValid() {
event["relatedClusterServices"] = clusterRelatedResources
Expand Down
Loading

0 comments on commit 8ca0d99

Please sign in to comment.