diff --git a/deploy/helm/templates/rbac/cluster_pod_required_role.yaml b/deploy/helm/templates/rbac/cluster_pod_required_role.yaml
index 8b6a99a156d..b1a0a8e37d1 100644
--- a/deploy/helm/templates/rbac/cluster_pod_required_role.yaml
+++ b/deploy/helm/templates/rbac/cluster_pod_required_role.yaml
@@ -25,6 +25,8 @@ rules:
   - events
   verbs:
   - create
+  - get
+  - update
 - apiGroups:
   - ""
   resources:
diff --git a/pkg/kbagent/service/command.go b/pkg/kbagent/service/command.go
index 9f2f07a0302..a9f69fbc2c0 100644
--- a/pkg/kbagent/service/command.go
+++ b/pkg/kbagent/service/command.go
@@ -22,6 +22,7 @@ package service
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"io"
 	"os"
 	"os/exec"
@@ -66,12 +67,12 @@ func runCommand(ctx context.Context, action *proto.ExecAction, parameters map[st
 	if err != nil {
 		var exitErr *exec.ExitError
 		if errors.As(err, &exitErr) {
-			stderrMsg := result.stderr.String()
-			if len(stderrMsg) > 0 {
-				err = errors.Wrapf(proto.ErrFailed, "exec exit %d and stderr: %s", exitErr.ExitCode(), stderrMsg)
-			} else {
-				err = errors.Wrapf(proto.ErrFailed, "exec exit %d but stderr is blank", exitErr.ExitCode())
+			errMsg := fmt.Sprintf("exit code: %d", exitErr.ExitCode())
+			if stderrMsg := result.stderr.String(); len(stderrMsg) > 0 {
+				errMsg += fmt.Sprintf(", stderr: %s", stderrMsg)
 			}
+			// Wrap, not Wrapf: errMsg embeds raw stderr and must not be parsed as a format string.
+			return nil, errors.Wrap(proto.ErrFailed, errMsg)
 		}
 		return nil, err
 	}
diff --git a/pkg/kbagent/util/event.go b/pkg/kbagent/util/event.go
index ece0e1d8ed8..3c2cde98e6e 100644
--- a/pkg/kbagent/util/event.go
+++ b/pkg/kbagent/util/event.go
@@ -22,14 +22,15 @@ package util
 import (
 	"context"
 	"fmt"
+	"hash/fnv"
 	"time"
 
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/client-go/kubernetes"
 	ctlruntime "sigs.k8s.io/controller-runtime"
 
@@ -37,16 +38,17 @@ import (
 )
 
 const (
-	sendEventMaxAttempts   = 30
-	sendEventRetryInterval = 10 * time.Second
+	maxRetryAttempts = 30
+	retryInterval    = 10 * time.Second
 )
 
 func SendEventWithMessage(logger *logr.Logger, reason string, message string, sync bool) error {
 	send := func() error {
-		event := createEvent(reason, message)
-		err := sendEvent(event)
+		err := createOrUpdateEvent(reason, message)
 		if logger != nil && err != nil {
-			logger.Error(err, fmt.Sprintf("send event failed, reason: %s, message: %s", reason, message))
+			logger.Error(err, "failed to send event",
+				"reason", reason,
+				"message", message)
 		}
 		return err
 	}
@@ -59,10 +61,13 @@ func SendEventWithMessage(logger *logr.Logger, reason string, message string, sy
 	return nil
 }
 
-func createEvent(reason string, message string) *corev1.Event {
+// newEvent builds the Event reported for the given reason and message. Its name
+// comes from generateEventName and its Count starts at 1.
+func newEvent(reason string, message string) *corev1.Event {
+	now := metav1.Now()
 	return &corev1.Event{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      fmt.Sprintf("%s.%s", podName(), rand.String(16)),
+			Name:      generateEventName(reason, message),
 			Namespace: namespace(),
 		},
 		InvolvedObject: corev1.ObjectReference{
@@ -78,29 +83,51 @@ func createEvent(reason string, message string) *corev1.Event {
 			Component: proto.ProbeEventSourceComponent,
 			Host:      nodeName(),
 		},
-		FirstTimestamp:      metav1.Now(),
-		LastTimestamp:       metav1.Now(),
+		FirstTimestamp:      now,
+		LastTimestamp:       now,
 		EventTime:           metav1.NowMicro(),
 		ReportingController: proto.ProbeEventReportingController,
 		ReportingInstance:   podName(),
 		Action:              reason,
 		Type:                "Normal",
+		Count:               1,
 	}
 }
 
-func sendEvent(event *corev1.Event) error {
+// createOrUpdateEvent records one occurrence of the (reason, message) pair. If an
+// event with the generated name already exists, its Count and LastTimestamp are
+// bumped and it is updated; if it is not found, a new event is created. A failed
+// attempt is retried after retryInterval, up to maxRetryAttempts attempts in total.
+func createOrUpdateEvent(reason, message string) error {
 	clientSet, err := getK8sClientSet()
 	if err != nil {
 		return err
 	}
-	for i := 0; i < sendEventMaxAttempts; i++ {
-		_, err = clientSet.CoreV1().Events(namespace()).Create(context.Background(), event, metav1.CreateOptions{})
+	eventsClient := clientSet.CoreV1().Events(namespace())
+	eventName := generateEventName(reason, message)
+
+	var event *corev1.Event
+	for i := 0; i < maxRetryAttempts; i++ {
+		event, err = eventsClient.Get(context.Background(), eventName, metav1.GetOptions{})
 		if err == nil {
-			return nil
+			// update
+			event.Count++
+			event.LastTimestamp = metav1.Now()
+			_, err = eventsClient.Update(context.Background(), event, metav1.UpdateOptions{})
+			if err == nil {
+				return nil
+			}
+		} else if k8serrors.IsNotFound(err) {
+			// create
+			event = newEvent(reason, message)
+			_, err = eventsClient.Create(context.Background(), event, metav1.CreateOptions{})
+			if err == nil {
+				return nil
+			}
 		}
-		time.Sleep(sendEventRetryInterval)
+		time.Sleep(retryInterval)
 	}
-	return err
+	return errors.Wrapf(err, "failed to handle event after %d attempts", maxRetryAttempts)
 }
 
 func getK8sClientSet() (*kubernetes.Clientset, error) {
@@ -114,3 +141,12 @@ func getK8sClientSet() (*kubernetes.Clientset, error) {
 	}
 	return clientSet, nil
 }
+
+// generateEventName returns "<pod name>.<hex FNV-1a hash>", where the 32-bit hash
+// covers the pod name, reason and message, so a repeated reason and message map to
+// the same event name as long as podName() is unchanged.
+func generateEventName(reason, message string) string {
+	hash := fnv.New32a()
+	hash.Write([]byte(fmt.Sprintf("%s.%s.%s", podName(), reason, message)))
+	return fmt.Sprintf("%s.%x", podName(), hash.Sum32())
+}