Skip to content

Commit

Permalink
Version 0.0.2
Browse files Browse the repository at this point in the history
- Ignore kubernetes system events that modify only the status of the resources.
  - I added a test for it
- Log to console internal logs instead of shipping/printing them.
- Modify event message for cluster level resources(ignore namespace).
- Disable sender debug mode
- Reorder informer cache sync
- Better error handling
  • Loading branch information
ralongit committed Oct 25, 2023
1 parent 069c75c commit 362ed6b
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 34 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ The [tests.yml](https://github.com/logzio/logzio-k8s-events/blob/master/.github/
![Architecture](./architecture.svg)

## Change log
- **0.0.2**:
- Ignore internal event changes.
- **0.0.1**:
- Initial release.
- Initial release.
12 changes: 9 additions & 3 deletions common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,26 @@ func IsValidList(arrayFieldI []interface{}) (listField []interface{}, isValidArr

// ParseEventMessage parses event messages from the kubernetes event log
func ParseEventMessage(eventType string, resourceName string, resourceKind string, resourceNamespace string, newResourceVersion string, oldResourceVersions ...string) (msg string) {
// Support cluster level resources
namespacePart := ""
if resourceNamespace != "" {
namespacePart = " in namespace: " + resourceNamespace
}

if eventType == EventTypeModified {
if len(oldResourceVersions) > 0 {
oldResourceVersion := oldResourceVersions[0]
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was updated from version: %s to new version: %s.", resourceName, resourceKind, resourceNamespace, oldResourceVersion, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was updated from version: %s to new version: %s.", resourceName, resourceKind, namespacePart, oldResourceVersion, newResourceVersion)
}
} else if eventType == EventTypeDeleted {
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s with version: %s was deleted.", resourceName, resourceKind, resourceNamespace, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s with version: %s was deleted.", resourceName, resourceKind, namespacePart, newResourceVersion)

} else if eventType == EventTypeAdded {
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s in namespace: %s was added with version: %s.", resourceName, resourceKind, resourceNamespace, newResourceVersion)
msg = fmt.Sprintf("[EVENT] Resource: %s of kind: %s%s was added with version: %s.", resourceName, resourceKind, namespacePart, newResourceVersion)
} else {
log.Printf("[ERROR] Failed to parse resource event log message. Unknown eventType: %s.\n", eventType)
}

return msg
}

Expand Down
1 change: 0 additions & 1 deletion common/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func ConfigureLogzioSender() {
// Creating a new logz.io logger with specified configuration
LogzioSender, err = logzio.New(
LogzioToken,
logzio.SetDebug(os.Stderr),
logzio.SetUrl(LogzioListener),
logzio.SetDrainDuration(time.Second*5),
logzio.SetDrainDiskThreshold(99),
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"log"
"main.go/common"
"main.go/resources"
)
Expand All @@ -10,7 +11,7 @@ func main() {
common.ConfigureLogzioSender() // Configure logz.io logger

// Sending a log message indicating the start of K8S Events Logz.io Integration
common.SendLog("Starting K8S Events Logz.io Integration.")
log.Printf("Starting K8S Events Logz.io Integration.")

// Configuring dynamic client for kubernetes cluster
common.DynamicClient = common.ConfigureClusterDynamicClient()
Expand Down
2 changes: 1 addition & 1 deletion mockLogzioListener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (h *ListenerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
json.Unmarshal(reqBody, &requestBody)
b, err := json.Marshal(requestBody)

fmt.Printf("%s", b)
log.Printf("%s", b)
if err != nil {
http.Error(w, fmt.Sprintf("Bad Request\nRequest:\n%v", requestBody), http.StatusBadRequest)
return
Expand Down
29 changes: 23 additions & 6 deletions resources/clusterResources.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"log"
"main.go/common"
Expand Down Expand Up @@ -154,7 +155,11 @@ func GetDeployment(deploymentName string, namespace string) (relatedDeployment a
deploymentsClient := common.K8sClient.AppsV1().Deployments(namespace)
deployment, err := deploymentsClient.Get(context.Background(), deploymentName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting Deployment: %s \nError: %v", deploymentName, err)
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get Deployment: %s in namespace %s\nError: %v", deploymentName, namespace, err)
}
return
}
if reflect.ValueOf(deployment).IsValid() {
Expand All @@ -170,7 +175,11 @@ func GetDaemonSet(daemonSetName string, namespace string) (relatedDaemonSet apps
daemonSetsClient := common.K8sClient.AppsV1().DaemonSets(namespace)
daemonSet, err := daemonSetsClient.Get(context.Background(), daemonSetName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting DaemonSet: %s \nError: %v", daemonSetName, err)
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get DaemonSet: %s in namespace %s\nError: %v", daemonSetName, namespace, err)
}
return
}
if reflect.ValueOf(daemonSet).IsValid() {
Expand All @@ -186,7 +195,12 @@ func GetStatefulSet(statefulSetName string, namespace string) (relatedStatefulSe
statefulSetsClient := common.K8sClient.AppsV1().StatefulSets(namespace)
statefulSet, err := statefulSetsClient.Get(context.Background(), statefulSetName, metav1.GetOptions{})
if err != nil {
log.Printf("[ERROR] Error getting statefulSet: %s \nError: %v", statefulSetName, err)
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get StatefulSet: %s in namespace %s\nError: %v", statefulSetName, namespace, err)
}

return
}
if reflect.ValueOf(statefulSet).IsValid() {
Expand All @@ -202,9 +216,12 @@ func GetClusterRoleBinding(clusterRoleBindingName string) (relatedClusterRoleBin
clusterRoleBindingsClient := common.K8sClient.RbacV1().ClusterRoleBindings()
clusterRoleBinding, err := clusterRoleBindingsClient.Get(context.Background(), clusterRoleBindingName, metav1.GetOptions{})
if err != nil {
// Handle error by common the error and returning an empty list of related ClusterRoleBindings.
log.Printf("[ERROR] Error getting clusterRoleBinding: %v", err)
return
if errors.IsNotFound(err) {

} else {
log.Printf("[ERROR] Failed to get ClusterRoleBinding: %s\nError: %v", clusterRoleBindingName, err)
return
}
}

if reflect.ValueOf(clusterRoleBinding).IsValid() {
Expand Down
86 changes: 69 additions & 17 deletions resources/resourceInformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,58 @@ func createResourceInformer(resourceGVR schema.GroupVersionResource, clusterClie
return resourceInformer
}

// deleteInternalFields deletes internal fields from a Kubernetes object
func deleteInternalFields(obj *unstructured.Unstructured) {
if meta, ok := obj.Object["metadata"].(map[string]interface{}); ok {
if _, ok := meta["managedFields"]; ok {
delete(meta, "managedFields")
}
if _, ok := meta["resourceVersion"]; ok {
delete(meta, "resourceVersion")
}
if annotations, ok := meta["annotations"].(map[string]interface{}); ok {
if _, ok := annotations["deployment.kubernetes.io/revision"]; ok {
delete(annotations, "deployment.kubernetes.io/revision")
// If annotations is empty, delete it
if len(annotations) == 0 {
delete(meta, "annotations")
}
}
}
}
if _, ok := obj.Object["status"]; ok {
delete(obj.Object, "status")
}
}

// IgnoreInternalChanges determines whether the only changes between two Kubernetes objects are internal.
func IgnoreInternalChanges(oldObj, newObj interface{}) bool {
oldUnst, ok1 := oldObj.(*unstructured.Unstructured)
newUnst, ok2 := newObj.(*unstructured.Unstructured)

if ok1 && ok2 {
oldCopy := oldUnst.DeepCopy()
newCopy := newUnst.DeepCopy()

deleteInternalFields(oldCopy)
deleteInternalFields(newCopy)
newJson, _ := json.Marshal(newCopy)
oldJson, _ := json.Marshal(oldCopy)

return string(oldJson) == string(newJson)
}

return false
}

// addInformerEventHandler adds event handlers to the informer.
// It handles add, update, and delete events.
func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
var event map[string]interface{}
synced := false

mux := &sync.RWMutex{}

_, err := resourceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// Handle add event
AddFunc: func(obj interface{}) {
Expand All @@ -74,12 +119,16 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
return
}

event = map[string]interface{}{
"oldObject": oldObj,
"newObject": newObj,
"eventType": common.EventTypeModified,
if IgnoreInternalChanges(oldObj, newObj) {
return // ignore internal cluster updates
} else {
event = map[string]interface{}{
"oldObject": oldObj,
"newObject": newObj,
"eventType": common.EventTypeModified,
}
go StructResourceLog(event)
}
go StructResourceLog(event)

},
// Handle delete event
Expand All @@ -102,22 +151,25 @@ func addInformerEventHandler(resourceInformer cache.SharedIndexInformer) {
if err != nil {
msg := fmt.Sprintf("[ERROR] Failed to add event handler for informer.\nERROR:\n%v", err)
common.SendLog(msg)

return
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

// Start the informer
go resourceInformer.Run(ctx.Done())

isSynced := cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced)
// Wait for all caches to sync
cache.WaitForCacheSync(ctx.Done(), resourceInformer.HasSynced)

// Set synced to true after all informers have synced
mux.Lock()
synced = isSynced
synced = true
mux.Unlock()

// If the informer failed to sync, log the error and terminate the program
if !isSynced {
if !resourceInformer.HasSynced() {
log.Fatal("Informer event handler failed to sync.")
}

Expand Down Expand Up @@ -151,16 +203,16 @@ func AddEventHandlers() {
resourceGVR := schema.GroupVersionResource{Group: resourceGroup, Version: "v1", Resource: resourceType}

// Attempt to create an informer for the resource
common.SendLog(fmt.Sprintf("Attempting to create informer for resource API: '%s'", resourceAPI))
log.Printf("Attempting to create informer for resource API: '%s'", resourceAPI)
resourceInformer := createResourceInformer(resourceGVR, common.DynamicClient)
if resourceInformer != nil {
// If the informer was successfully created, attempt to add an event handler to it
common.SendLog(fmt.Sprintf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI))
log.Printf("Attempting to add event handler to informer for resource API: '%s'", resourceAPI)
eventHandlerSync.Add(resourceIndex)
go addInformerEventHandler(resourceInformer)
{
defer eventHandlerSync.Done()
common.SendLog(fmt.Sprintf("Finished adding event handler to informer for resource API: '%s'", resourceAPI))
log.Printf("Finished adding event handler to informer for resource API: '%s'", resourceAPI)
}
} else {
// If the informer could not be created, log the failure
Expand All @@ -178,11 +230,11 @@ func EventObject(rawObject map[string]interface{}) (resourceEventObject common.K
rawObjUnstructured.Object = rawObject
unstructuredObjectJSON, err := rawObjUnstructured.MarshalJSON()
if err != nil {
fmt.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err)
log.Printf("[ERROR] Failed to marshal unstructured event object.\nERROR:\n%v", err)
}
err = json.Unmarshal(unstructuredObjectJSON, &resourceEventObject)
if err != nil {
fmt.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err)
log.Printf("[ERROR] Failed to unmarshal unstructured event object.\nERROR:\n%v", err)
}

return resourceEventObject
Expand All @@ -196,14 +248,14 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE
jsonString, err := json.Marshal(event)
if err != nil {

fmt.Printf("Failed to marshal structure event log.\nERROR:\n%v", err)
log.Printf("Failed to marshal structure event log.\nERROR:\n%v", err)
return
}
err = json.Unmarshal(jsonString, logEvent)
if err != nil {

// event log.
fmt.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err)
log.Printf("Failed to unmarshal structure event log.\nERROR:\n%v", err)
return
}
eventType := event["eventType"].(string)
Expand All @@ -221,9 +273,9 @@ func StructResourceLog(event map[string]interface{}) (isStructured bool, parsedE
msg = common.ParseEventMessage(eventType, oldResourceName, resourceKind, oldResourceNamespace, newResourceVersion, oldResourceVersion)

}

// Get cluster related resources
clusterRelatedResources := GetClusterRelatedResources(resourceKind, resourceName, resourceNamespace)

// If the cluster related resources are valid, add them to the event
if reflect.ValueOf(clusterRelatedResources).IsValid() {
event["relatedClusterServices"] = clusterRelatedResources
Expand Down
Loading

0 comments on commit 362ed6b

Please sign in to comment.