Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore internal changes Version 0.0.2 #9

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ The [tests.yml](https://github.com/logzio/logzio-k8s-events/blob/master/.github/
![Architecture](./architecture.svg)

## Change log
- **0.0.2**:
- Ignore internal event changes.
- **0.0.1**:
- Initial release.
- Initial release.
12 changes: 9 additions & 3 deletions common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,26 @@ func IsValidList(arrayFieldI []interface{}) (listField []interface{}, isValidArr

// ParseEventMessage parses event messages from the kubernetes event log
func ParseEventMessage(eventType string, resourceName string, resourceKind string, resourceNamespace string, newResourceVersion string, oldResourceVersions ...string) (msg string) {
// Support cluster level resources
ralongit marked this conversation as resolved.
Show resolved Hide resolved
namespacePart := ""
ralongit marked this conversation as resolved.
Show resolved Hide resolved
if resourceNamespace != "" {
namespacePart = " in namespace: " + resourceNamespace
}

if eventType == EventTypeModified {
if len(oldResourceVersions) > 0 {
oldResourceVersion := oldResourceVersions[0]
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was updated from version: %s to new version: %s.", resourceName, resourceKind, resourceNamespace, oldResourceVersion, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was updated from version: %s to new version: %s.", resourceName, resourceKind, namespacePart, oldResourceVersion, newResourceVersion)
}
} else if eventType == EventTypeDeleted {
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s with version: %s was deleted.", resourceName, resourceKind, resourceNamespace, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s with version: %s was deleted.", resourceName, resourceKind, namespacePart, newResourceVersion)

} else if eventType == EventTypeAdded {
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was added with version: %s.", resourceName, resourceKind, resourceNamespace, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was added with version: %s.", resourceName, resourceKind, namespacePart, newResourceVersion)
} else {
log.Printf("[ERROR] Failed to parse resource event log message. Unknown eventType: %s.\n", eventType)
}

return msg
}

Expand Down
1 change: 0 additions & 1 deletion common/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func ConfigureLogzioSender() {
// Creating a new logz.io logger with specified configuration
LogzioSender, err = logzio.New(
LogzioToken,
logzio.SetDebug(os.Stderr),
logzio.SetUrl(LogzioListener),
logzio.SetDrainDuration(time.Second*5),
logzio.SetDrainDiskThreshold(99),
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"log"
"main.go/common"
"main.go/resources"
)
Expand All @@ -10,7 +11,7 @@ func main() {
common.ConfigureLogzioSender() // Configure logz.io logger

// Sending a log message indicating the start of K8S Events Logz.io Integration
common.SendLog("Starting K8S Events Logz.io Integration.")
log.Printf("Starting K8S Events Logz.io Integration.")

// Configuring dynamic client for kubernetes cluster
common.DynamicClient = common.ConfigureClusterDynamicClient()
Expand Down
2 changes: 1 addition & 1 deletion mockLogzioListener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (h *ListenerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
json.Unmarshal(reqBody, &requestBody)
b, err := json.Marshal(requestBody)

fmt.Printf("%s", b)
log.Printf("%s", b)
if err != nil {
http.Error(w, fmt.Sprintf("Bad Request\nRequest:\n%v", requestBody), http.StatusBadRequest)
return
Expand Down
29 changes: 23 additions & 6 deletions resources/clusterResources.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"log"
"main.go/common"
Expand Down Expand Up @@ -154,7 +155,11 @@ func GetDeployment(deploymentName string, namespace string) (relatedDeployment a
deploymentsClient := common.K8sClient.AppsV1().Deployments(namespace)
deployment, err := deploymentsClient.Get(context.Background(), deploymentName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting Deployment: %s \nError: %v", deploymentName, err)
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get Deployment: %s in namespace %s\nError: %v", deploymentName, namespace, err)
}
ralongit marked this conversation as resolved.
Show resolved Hide resolved
return
}
if reflect.ValueOf(deployment).IsValid() {
Expand All @@ -170,7 +175,11 @@ func GetDaemonSet(daemonSetName string, namespace string) (relatedDaemonSet apps
daemonSetsClient := common.K8sClient.AppsV1().DaemonSets(namespace)
daemonSet, err := daemonSetsClient.Get(context.Background(), daemonSetName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting DaemonSet: %s \nError: %v", daemonSetName, err)
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get DaemonSet: %s in namespace %s\nError: %v", daemonSetName, namespace, err)
}
return
}
if reflect.ValueOf(daemonSet).IsValid() {
Expand All @@ -186,7 +195,12 @@ func GetStatefulSet(statefulSetName string, namespace string) (relatedStatefulSe
statefulSetsClient := common.K8sClient.AppsV1().StatefulSets(namespace)
statefulSet, err := statefulSetsClient.Get(context.Background(), statefulSetName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting statefulSet: %s \nError: %v", statefulSetName, err)
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get StatefulSet: %s in namespace %s\nError: %v", statefulSetName, namespace, err)
}

return
}
if reflect.ValueOf(statefulSet).IsValid() {
Expand All @@ -202,9 +216,12 @@ func GetClusterRoleBinding(clusterRoleBindingName string) (relatedClusterRoleBin
clusterRoleBindingsClient := common.K8sClient.RbacV1().ClusterRoleBindings()
clusterRoleBinding, err := clusterRoleBindingsClient.Get(context.Background(), clusterRoleBindingName, metav1.GetOptions{})
if err != nil {
// Handle error by common the error and returning an empty list of related ClusterRoleBindings.
log.Printf("[ERROR] Error getting clusterRoleBinding: %v", err)
return
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get ClusterRoleBinding: %s\nError: %v", clusterRoleBindingName, err)
return
}
}

if reflect.ValueOf(clusterRoleBinding).IsValid() {
Expand Down
86 changes: 69 additions & 17 deletions resources/resourceInformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,58 @@ func createResourceInformer(resourceGVR schema.GroupVersionResource, clusterClie
return resourceInformer
}

// deleteInternalFields deletes internal fields from a Kubernetes object
func deleteInternalFields(obj *unstructured.Unstructured) {
if meta, ok := obj.Object["metadata"].(map[string]interface{}); ok {
if _, ok := meta["managedFields"]; ok {
delete(meta, "managedFields")
}
if _, ok := meta["resourceVersion"]; ok {
delete(meta, "resourceVersion")
}
if annotations, ok := meta["annotations"].(map[string]interface{}); ok {
if _, ok := annotations["deployment.kubernetes.io/revision"]; ok {
delete(annotations, "deployment.kubernetes.io/revision")
// If annotations is empty, delete it
if len(annotations) == 0 {
delete(meta, "annotations")
}
}
}
}
if _, ok := obj.Object["status"]; ok {
delete(obj.Object, "status")
ralongit marked this conversation as resolved.
Show resolved Hide resolved
}
}

// IgnoreInternalChanges determines whether the only changes between two Kubernetes objects are internal.
func IgnoreInternalChanges(oldObj, newObj interface{}) bool {
oldUnst, ok1 := oldObj.(*unstructured.Unstructured)
newUnst, ok2 := newObj.(*unstructured.Unstructured)

if ok1 && ok2 {
oldCopy := oldUnst.DeepCopy()
newCopy := newUnst.DeepCopy()

deleteInternalFields(oldCopy)
deleteInternalFields(newCopy)
newJson, _ := json.Marshal(newCopy)
oldJson, _ := json.Marshal(oldCopy)

return string(oldJson) == string(newJson)
}

return false
}

// addInformerEventHandler adds event handlers to the informer.
// It handles add, update, and delete events.
func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
var event map[string]interface{}
synced := false

mux := &sync.RWMutex{}

_, err := resourceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Handle add event
AddFunc: func(obj interface{}) {
Expand All @@ -74,12 +119,16 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
return
}

event = map[string]interface{}{
"oldObject": oldObj,
"newObject": newObj,
"eventType": common.EventTypeModified,
if IgnoreInternalChanges(oldObj, newObj) {
return // ignore internal cluster updates
} else {
event = map[string]interface{}{
"oldObject": oldObj,
"newObject": newObj,
"eventType": common.EventTypeModified,
}
go StructResourceLog(event)
}
go StructResourceLog(event)

},
// Handle delete event
Expand All @@ -102,22 +151,25 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
if err != nil {
msg := fmt.Sprintf("[ERROR] Failed to add event handler for informer.\nERROR:\n%v", err)
common.SendLog(msg)

return
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

// Start the informer
go resourceInformer.Run(ctx.Done())

isSynced := cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced)
// Wait for all caches to sync
cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced)

// Set synced to true after all informers have synced
mux.Lock()
synced = isSynced
synced = true
mux.Unlock()

// If the informer failed to sync, log the error and terminate the program
if !isSynced {
if !resourceInformer.HasSynced() {
log.Fatal("Informer event handler failed to sync.")
}

Expand Down Expand Up @@ -151,16 +203,16 @@ func AddEventHandlers() {
resourceGVR := schema.GroupVersionResource{Group: resourceGroup, Version: "v1", Resource: resourceType}

// Attempt to create an informer for the resource
common.SendLog(fmt.Sprintf("Attempting to create informer for resource API: '%s'", resourceAPI))
log.Printf("Attempting to create informer for resource API: '%s'", resourceAPI)
resourceInformer := createResourceInformer(resourceGVR, common.DynamicClient)
if resourceInformer != nil {
// If the informer was successfully created, attempt to add an event handler to it
common.SendLog(fmt.Sprintf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI))
log.Printf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI)
eventHandlerSync.Add(resourceIndex)
go addInformerEventHandler(resourceInformer)
{
defer eventHandlerSync.Done()
common.SendLog(fmt.Sprintf("Finished adding event handler to informer for resource API: '%s'", resourceAPI))
log.Printf("Finished adding event handler to informer for resource API: '%s'", resourceAPI)
}
} else {
// If the informer could not be created, log the failure
Expand All @@ -178,11 +230,11 @@ func EventObject(rawObject map[string]interface{}) (resourceEventObject common.K
rawObjUnstructured.Object = rawObject
unstructuredObjectJSON, err := rawObjUnstructured.MarshalJSON()
if err != nil {
fmt.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err)
log.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err)
}
err = json.Unmarshal(unstructuredObjectJSON, &resourceEventObject)
if err != nil {
fmt.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err)
log.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err)
}

return resourceEventObject
Expand All @@ -196,14 +248,14 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE
jsonString, err := json.Marshal(event)
if err != nil {

fmt.Printf("Failed to marshal structure event log.\nERROR:\n%v", err)
log.Printf("Failed to marshal structure event log.\nERROR:\n%v", err)
return
}
err = json.Unmarshal(jsonString, logEvent)
if err != nil {

// event log.
fmt.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err)
log.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err)
return
}
eventType := event["eventType"].(string)
Expand All @@ -221,9 +273,9 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE
msg = common.ParseEventMessage(eventType, oldResourceName, resourceKind, oldResourceNamespace, newResourceVersion, oldResourceVersion)

}

// Get cluster related resources
clusterRelatedResources := GetClusterRelatedResources(resourceKind, resourceName, resourceNamespace)

// If the cluster related resources are valid, add them to the event
if reflect.ValueOf(clusterRelatedResources).IsValid() {
event["relatedClusterServices"] = clusterRelatedResources
Expand Down
Loading
Loading