Skip to content

Commit

Permalink
chore: merge same events for kbagent (#8445)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ursasi authored Dec 2, 2024
1 parent 808f253 commit d9fda81
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 21 deletions.
2 changes: 2 additions & 0 deletions deploy/helm/templates/rbac/cluster_pod_required_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ rules:
- events
verbs:
- create
- get
- update
- apiGroups:
- ""
resources:
Expand Down
10 changes: 5 additions & 5 deletions pkg/kbagent/service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package service
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
Expand Down Expand Up @@ -66,12 +67,11 @@ func runCommand(ctx context.Context, action *proto.ExecAction, parameters map[st
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
stderrMsg := result.stderr.String()
if len(stderrMsg) > 0 {
err = errors.Wrapf(proto.ErrFailed, "exec exit %d and stderr: %s", exitErr.ExitCode(), stderrMsg)
} else {
err = errors.Wrapf(proto.ErrFailed, "exec exit %d but stderr is blank", exitErr.ExitCode())
errMsg := fmt.Sprintf("exit code: %d", exitErr.ExitCode())
if stderrMsg := result.stderr.String(); len(stderrMsg) > 0 {
errMsg += fmt.Sprintf(", stderr: %s", stderrMsg)
}
return nil, errors.Wrapf(proto.ErrFailed, errMsg)
}
return nil, err
}
Expand Down
59 changes: 43 additions & 16 deletions pkg/kbagent/util/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,33 @@ package util
import (
"context"
"fmt"
"hash/fnv"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
ctlruntime "sigs.k8s.io/controller-runtime"

"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
)

const (
sendEventMaxAttempts = 30
sendEventRetryInterval = 10 * time.Second
maxRetryAttempts = 30
retryInterval = 10 * time.Second
)

func SendEventWithMessage(logger *logr.Logger, reason string, message string, sync bool) error {
send := func() error {
event := createEvent(reason, message)
err := sendEvent(event)
err := createOrUpdateEvent(reason, message)
if logger != nil && err != nil {
logger.Error(err, fmt.Sprintf("send event failed, reason: %s, message: %s", reason, message))
logger.Error(err, "failed to send event",
"reason", reason,
"message", message)
}
return err
}
Expand All @@ -59,10 +61,11 @@ func SendEventWithMessage(logger *logr.Logger, reason string, message string, sy
return nil
}

func createEvent(reason string, message string) *corev1.Event {
func newEvent(reason string, message string) *corev1.Event {
now := metav1.Now()
return &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s.%s", podName(), rand.String(16)),
Name: generateEventName(reason, message),
Namespace: namespace(),
},
InvolvedObject: corev1.ObjectReference{
Expand All @@ -78,29 +81,47 @@ func createEvent(reason string, message string) *corev1.Event {
Component: proto.ProbeEventSourceComponent,
Host: nodeName(),
},
FirstTimestamp: metav1.Now(),
LastTimestamp: metav1.Now(),
FirstTimestamp: now,
LastTimestamp: now,
EventTime: metav1.NowMicro(),
ReportingController: proto.ProbeEventReportingController,
ReportingInstance: podName(),
Action: reason,
Type: "Normal",
Count: 1,
}
}

func sendEvent(event *corev1.Event) error {
func createOrUpdateEvent(reason, message string) error {
clientSet, err := getK8sClientSet()
if err != nil {
return err
}
for i := 0; i < sendEventMaxAttempts; i++ {
_, err = clientSet.CoreV1().Events(namespace()).Create(context.Background(), event, metav1.CreateOptions{})
eventsClient := clientSet.CoreV1().Events(namespace())
eventName := generateEventName(reason, message)

var event *corev1.Event
for i := 0; i < maxRetryAttempts; i++ {
event, err = eventsClient.Get(context.Background(), eventName, metav1.GetOptions{})
if err == nil {
return nil
// update
event.Count++
event.LastTimestamp = metav1.Now()
_, err = eventsClient.Update(context.Background(), event, metav1.UpdateOptions{})
if err == nil {
return nil
}
} else if k8serrors.IsNotFound(err) {
// create
event = newEvent(reason, message)
_, err = eventsClient.Create(context.Background(), event, metav1.CreateOptions{})
if err == nil {
return nil
}
}
time.Sleep(sendEventRetryInterval)
time.Sleep(retryInterval)
}
return err
return errors.Wrapf(err, "failed to handle event after %d attempts", maxRetryAttempts)
}

func getK8sClientSet() (*kubernetes.Clientset, error) {
Expand All @@ -114,3 +135,9 @@ func getK8sClientSet() (*kubernetes.Clientset, error) {
}
return clientSet, nil
}

// generateEventName derives a deterministic Event object name from the pod
// name, the event reason and the event message.
//
// The three values are joined as "<pod>.<reason>.<message>" and fed through a
// 32-bit FNV-1a hash; the result is "<pod>.<hash in lowercase hex>". Identical
// (pod, reason, message) triples therefore always yield the same name.
func generateEventName(reason, message string) string {
	pod := podName()
	hasher := fnv.New32a()
	// Format straight into the hasher; it receives the same bytes as hashing
	// the pre-formatted string would.
	_, _ = fmt.Fprintf(hasher, "%s.%s.%s", pod, reason, message)
	digest := hasher.Sum32()
	return fmt.Sprintf("%s.%x", pod, digest)
}

0 comments on commit d9fda81

Please sign in to comment.