From 7dca1794dbefca87b57c0166f6f7851581836161 Mon Sep 17 00:00:00 2001 From: kongfei605 Date: Sun, 29 Sep 2024 19:17:50 +0800 Subject: [PATCH] chore: report global labels with heartbeat (#1066) * chore: report global labels with heartbeat * chore: sort import packages * fix: lint error --- heartbeat/heartbeat.go | 50 ++++++++++++++++++- inputs/apache/apache.go | 2 +- .../runtime/compiler/errors/errors.go | 5 +- .../internal/tailer/logstream/filestream.go | 2 +- .../internal/tailer/logstream/logstream.go | 2 +- 5 files changed, 55 insertions(+), 6 deletions(-) diff --git a/heartbeat/heartbeat.go b/heartbeat/heartbeat.go index 9de0937e..4cc99829 100644 --- a/heartbeat/heartbeat.go +++ b/heartbeat/heartbeat.go @@ -8,18 +8,33 @@ import ( "log" "net" "net/http" + "os" + osExec "os/exec" "runtime" "strconv" "strings" "time" + cpuUtil "github.com/shirou/gopsutil/v3/cpu" + "flashcat.cloud/categraf/config" "flashcat.cloud/categraf/inputs/system" - cpuUtil "github.com/shirou/gopsutil/v3/cpu" + "flashcat.cloud/categraf/pkg/cmdx" ) const collinterval = 3 +type ( + HeartbeatResponse struct { + Data UpdateInfo `json:"dat"` + Msg string `json:"err"` + } + UpdateInfo struct { + NewVersion string `json:"new_version"` + UpdateURL string `json:"download_url"` + } +) + func Work() { conf := config.Config.Heartbeat @@ -114,6 +129,7 @@ func work(ps *system.SystemPS, client *http.Client) { "cpu_util": cpuUsagePercent, "mem_util": memUsagePercent, "unixtime": time.Now().UnixMilli(), + "global_labels": config.GlobalLabels(), "host_ip": hostIP, } @@ -191,6 +207,38 @@ func work(ps *system.SystemPS, client *http.Client) { if debug() { log.Println("D! heartbeat response:", string(bs), "status code:", res.StatusCode) } + + hr := HeartbeatResponse{} + err = json.Unmarshal(bs, &hr) + if err != nil { + log.Println("W! failed to unmarshal heartbeat response:", err) + return + } + if len(hr.Data.NewVersion) != 0 && len(hr.Data.UpdateURL) != 0 && hr.Data.NewVersion != shortVersion && hr.Data.NewVersion != config.Version { + var ( + out bytes.Buffer + stderr bytes.Buffer + ) + exe, err := os.Executable() + if err != nil { + log.Println("E! failed to get current executable:", err) + return + } + cmd := osExec.Command(exe, "-update", "-update_url", hr.Data.UpdateURL) + cmd.Stdout = &out + cmd.Stderr = &stderr + err, timeout := cmdx.RunTimeout(cmd, time.Second*300) + if timeout { + log.Printf("E! exec %s timeout", cmd.String()) + return + } + if err != nil { + log.Println("E! failed to update categraf:", err, "stderr:", stderr.String(), "stdout:", + out.String(), "command:", cmd.String()) + return + } + log.Printf("update categraf(%s) from %s success, new version: %s", version(), hr.Data.UpdateURL, hr.Data.NewVersion) + } } func memUsage(ps *system.SystemPS) float64 { diff --git a/inputs/apache/apache.go b/inputs/apache/apache.go index b10462b2..4d8fdf2d 100644 --- a/inputs/apache/apache.go +++ b/inputs/apache/apache.go @@ -82,7 +82,7 @@ func (ins *Instance) Init() error { e, err := exporter.New(logger, &ins.Config) if err != nil { - return fmt.Errorf("could not instantiate mongodb lag exporter: %w", err) + return fmt.Errorf("could not instantiate mongodb lag exporter: %v", err) } ins.e = e diff --git a/inputs/mtail/internal/runtime/compiler/errors/errors.go b/inputs/mtail/internal/runtime/compiler/errors/errors.go index 76882472..3eb08e7c 100644 --- a/inputs/mtail/internal/runtime/compiler/errors/errors.go +++ b/inputs/mtail/internal/runtime/compiler/errors/errors.go @@ -7,8 +7,9 @@ import ( "fmt" "strings" - "flashcat.cloud/categraf/inputs/mtail/internal/runtime/compiler/position" "github.com/pkg/errors" + + "flashcat.cloud/categraf/inputs/mtail/internal/runtime/compiler/position" ) type compileError struct { @@ -26,7 +27,7 @@ type ErrorList []*compileError // Add appends an error at a position to the list of errors. func (p *ErrorList) Add(pos *position.Position, msg string) { if pos == nil { - pos = &position.Position{"", -1, -1, -1} + pos = &position.Position{Filename: "", Line: -1, Startcol: -1, Endcol: -1} } *p = append(*p, &compileError{*pos, msg}) } diff --git a/inputs/mtail/internal/tailer/logstream/filestream.go b/inputs/mtail/internal/tailer/logstream/filestream.go index 9b358d6f..0b58326b 100644 --- a/inputs/mtail/internal/tailer/logstream/filestream.go +++ b/inputs/mtail/internal/tailer/logstream/filestream.go @@ -137,7 +137,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake // common change pattern anyway. newfi, serr := os.Stat(fs.pathname) if serr != nil { - log.Printf("stream(%s): stat error: %v", serr) + log.Printf("stream(%s): stat error: %v", fs.pathname, serr) // If this is a NotExist error, then we should wrap up this // goroutine. The Tailer will create a new logstream if the // file is in the middle of a rotation and gets recreated diff --git a/inputs/mtail/internal/tailer/logstream/logstream.go b/inputs/mtail/internal/tailer/logstream/logstream.go index 59837d36..6a881c84 100644 --- a/inputs/mtail/internal/tailer/logstream/logstream.go +++ b/inputs/mtail/internal/tailer/logstream/logstream.go @@ -69,7 +69,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st if err != nil { return nil, err } - log.Println("Parsed url as %v", u) + log.Printf("Parsed url as %v", u) path := pathname switch u.Scheme {