From 7c04ee829047f714f5e92b153c83a41b43efc9a9 Mon Sep 17 00:00:00 2001 From: Sveinn Date: Thu, 10 Oct 2024 18:52:50 +0000 Subject: [PATCH] Adjustments to error handling and stat command (#16) --- .gitignore | 2 - README.md | 4 +- client/client.go | 25 ++++++++- cmd/hperf/main.go | 5 ++ cmd/hperf/stat.go | 3 + server/server.go | 139 +++++++++++++++++++++++++--------------------- shared/shared.go | 1 + 7 files changed, 110 insertions(+), 69 deletions(-) diff --git a/.gitignore b/.gitignore index 902caa3..1521c8b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1 @@ -./hperf dist -hperf diff --git a/README.md b/README.md index 5e7d9ed..c153ae6 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,9 @@ NOTE: Be careful not to re-use the ID's if you care about fetching results at a ```bash # get test results -./hperf get --hosts 1.1.1.{1...100} --id [my_test_id] +./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] +# save test results +./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] --output /tmp/file # listen in on a running test ./hperf listen --hosts 1.1.1.{1...100} --id [my_test_id] diff --git a/client/client.go b/client/client.go index 1f2ece7..2cf24f7 100644 --- a/client/client.go +++ b/client/client.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "net/http" + "os" "runtime/debug" "slices" "strconv" @@ -206,7 +207,7 @@ func handleWSConnection(ctx context.Context, c *shared.Config, host string, id i case shared.ListTests: go parseTestList(signal.TestList) case shared.GetTest: - go receiveJSONDataPoint(signal.Data) + go receiveJSONDataPoint(signal.Data, c) case shared.Err: go PrintErrorString(signal.Error) case shared.Done: @@ -231,7 +232,7 @@ func PrintError(err error) { fmt.Println(ErrorStyle.Render("ERROR: ", err.Error())) } -func receiveJSONDataPoint(data []byte) { +func receiveJSONDataPoint(data []byte, c *shared.Config) { responseLock.Lock() defer responseLock.Unlock() @@ -430,6 +431,26 @@ func GetTest(ctx context.Context, c shared.Config) (err error) { } }) + if c.Output != "" { + f, err := os.Create(c.Output) + if err != nil { + return err + } + for i := range responseDPS { + outb, err := json.Marshal(responseDPS[i]) + if err != nil { + PrintError(err) + continue + } + _, err = f.Write(append(outb, []byte{10}...)) + if err != nil { + return err + } + } + + return nil + } + printDataPointHeaders(responseDPS[0].Type) for i := range responseDPS { dp := responseDPS[i] diff --git a/cmd/hperf/main.go b/cmd/hperf/main.go index 528dd55..864e0d4 100644 --- a/cmd/hperf/main.go +++ b/cmd/hperf/main.go @@ -134,6 +134,10 @@ var ( Name: "id", Usage: "specify custom ID per test", } + outputFlag = cli.StringFlag{ + Name: "output", + Usage: "set output file path/name", + } saveTestFlag = cli.BoolTFlag{ Name: "save", EnvVar: "HPERF_SAVE", @@ -234,6 +238,7 @@ func parseConfig(ctx *cli.Context) (*shared.Config, error) { TestID: ctx.String(testIDFlag.Name), RestartOnError: ctx.BoolT(restartOnErrorFlag.Name), Hosts: hosts, + Output: ctx.String(outputFlag.Name), } if ctx.String("id") == "" { diff --git a/cmd/hperf/stat.go b/cmd/hperf/stat.go index 044a369..2fbd7bd 100644 --- a/cmd/hperf/stat.go +++ b/cmd/hperf/stat.go @@ -31,6 +31,7 @@ var statTestsCMD = cli.Command{ hostsFlag, portFlag, testIDFlag, + outputFlag, }, CustomHelpTemplate: `NAME: {{.HelpName}} - {{.Usage}} @@ -44,6 +45,8 @@ FLAGS: EXAMPLES: 1. Print stats by ID for hosts '10.10.10.1' and '10.10.10.2': {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id + 2. Save stats by ID for hosts '10.10.10.1' and '10.10.10.2': + {{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --output /tmp/output-file `, } diff --git a/server/server.go b/server/server.go index a4bef59..7c1420c 100644 --- a/server/server.go +++ b/server/server.go @@ -30,6 +30,7 @@ import ( "os" "runtime" "runtime/debug" + "slices" "strconv" "strings" "sync" @@ -38,6 +39,7 @@ import ( "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" + "github.com/google/uuid" "github.com/minio/hperf/shared" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/mem" @@ -69,24 +71,31 @@ type test struct { Readers []*netPerfReader errors []shared.TError + errMap map[string]struct{} errIndex atomic.Int32 DPS []shared.DP M sync.Mutex DataFile *os.File DataFileIndex int + cons map[string]*websocket.Conn } -func (t *test) AddError(err error) { +func (t *test) AddError(err error, id string) { + t.M.Lock() + defer t.M.Unlock() if err == nil { return } + _, ok := t.errMap[id] + if ok { + return + } if t.Config.Debug { fmt.Println("ERR:", err) } - t.M.Lock() - defer t.M.Unlock() t.errors = append(t.errors, shared.TError{Error: err.Error(), Created: time.Now()}) + t.errMap[id] = struct{}{} } func RunServer(ctx context.Context, address string, storagePath string) (err error) { @@ -359,6 +368,8 @@ func newTest(c *shared.Config) (t *test, err error) { defer testLock.Unlock() t = new(test) + t.errMap = make(map[string]struct{}) + t.cons = make(map[string]*websocket.Conn) t.Started = time.Now() t.Config = *c t.DPS = make([]shared.DP, 0) @@ -477,7 +488,7 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) { go startPerformanceReader(test, test.Readers[i]) } - var paginator DataPointPaginator + listenToLiveTests(con, signal) for { if test.ctx.Err() != nil { return @@ -490,34 +501,24 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) { if signal.Config.Debug { fmt.Println("Duration: ", signal.Config.TestID, time.Since(start).Seconds()) } + generateDataPoints(test) - if con != nil { - _, paginator = sendDataResponseToWebsocket(con, test, paginator) - } + _ = sendAndSaveData(test) } } func listenToLiveTests(con *websocket.Conn, s shared.WebsocketSignal) { - var paginator DataPointPaginator - var err error - paginator.After = time.Now() - for { - time.Sleep(1 * time.Second) - for i := range tests { - if time.Since(tests[i].Started).Seconds() > float64(tests[i].Config.Duration) { - continue - } - if s.Config.TestID != "" && tests[i].ID != s.Config.TestID { - continue - } - if s.Config.Debug { - fmt.Println("Listen:", tests[i].ID, "DPS:", len(tests[i].DPS), "ERR:", len(tests[i].errors)) - } - err, paginator = sendDataResponseToWebsocket(con, tests[i], paginator) - if err != nil { - return - } + uid := uuid.NewString() + + for i := range tests { + if s.Config.TestID != "" && tests[i].ID != s.Config.TestID { + continue } + if s.Config.Debug { + fmt.Println("Listen:", tests[i].ID, "DPS:", len(tests[i].DPS), "ERR:", len(tests[i].errors)) + } + + tests[i].cons[uid] = con } } @@ -527,45 +528,67 @@ type DataPointPaginator struct { After time.Time } -func sendDataResponseToWebsocket(con *websocket.Conn, t *test, lastPaginator DataPointPaginator) (err error, Paginator DataPointPaginator) { +func sendAndSaveData(t *test) (err error) { + defer func() { + r := recover() + if r != nil { + log.Println(r, string(debug.Stack())) + } + }() + wss := new(shared.WebsocketSignal) wss.SType = shared.Stats dataResponse := new(shared.DataReponseToClient) + if t.DataFile == nil && t.Config.Save { + newTestFile(t) + } + for i := range t.DPS { - if i <= lastPaginator.DPIndex { - continue - } - if !lastPaginator.After.IsZero() { - if t.DPS[i].Created.Before(lastPaginator.After) { - continue + dataResponse.DPS = append(dataResponse.DPS, t.DPS[i]) + if t.Config.Save { + fileb, err := json.Marshal(t.DPS[i]) + if err != nil { + t.AddError(err, "datapoint-marshaling") } + t.DataFile.Write(append(fileb, []byte{10}...)) } - dataResponse.DPS = append(dataResponse.DPS, t.DPS[i]) - Paginator.DPIndex = i + t.DPS = slices.Delete(t.DPS, i, i+1) } for i := range t.errors { - if i <= lastPaginator.ErrIndex { - continue - } - if !lastPaginator.After.IsZero() { - if t.errors[i].Created.Before(lastPaginator.After) { - continue + dataResponse.Errors = append(dataResponse.Errors, t.errors[i]) + if t.Config.Save { + fileb, err := json.Marshal(t.errors[i]) + if err != nil { + t.AddError(err, "error-marshaling") } + t.DataFile.Write(append(fileb, []byte{10}...)) } - dataResponse.Errors = append(dataResponse.Errors, t.errors[i]) - Paginator.ErrIndex = i + t.M.Lock() + t.errors = slices.Delete(t.errors, i, i+1) + t.M.Unlock() } + t.M.Lock() + t.errMap = make(map[string]struct{}) + t.M.Unlock() + wss.DataPoint = dataResponse - err = con.WriteJSON(wss) - if err != nil { - if t.Config.Debug { - fmt.Println("Unable to send data point:", err) + for i := range t.cons { + if t.cons[i] == nil { + continue + } + + err = t.cons[i].WriteJSON(wss) + if err != nil { + if t.Config.Debug { + fmt.Println("Unable to send data point:", err) + } + t.cons[i].Close() + delete(t.cons, i) + continue } - con.Close() - con = nil } return } @@ -609,18 +632,6 @@ func generateDataPoints(t *test) { r.m.Unlock() t.DPS = append(t.DPS, d) - - if t.Config.Save { - fileb, err := json.Marshal(d) - if err != nil { - t.AddError(err) - } - if t.DataFile == nil { - newTestFile(t) - } - t.DataFile.Write(append(fileb, []byte{10}...)) - } - } return } @@ -727,7 +738,7 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) { route = "/http" body = AR default: - t.AddError(fmt.Errorf("Unknown test type: %d", t.Config.TestType)) + t.AddError(fmt.Errorf("Unknown test type: %d", t.Config.TestType), "unknown-signal") } req, err = http.NewRequestWithContext( @@ -748,12 +759,12 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) { if errors.Is(err, context.Canceled) { return } - t.AddError(err) + t.AddError(err, "network-error") return } if resp.StatusCode != http.StatusOK { - t.AddError(fmt.Errorf("Status code was %d, expected 200 from host %s", resp.StatusCode, r.addr)) + t.AddError(fmt.Errorf("Status code was %d, expected 200 from host %s", resp.StatusCode, r.addr), "invalid-status-code") return } diff --git a/shared/shared.go b/shared/shared.go index 347c40e..8361b91 100644 --- a/shared/shared.go +++ b/shared/shared.go @@ -151,6 +151,7 @@ type Config struct { Save bool `json:"Save"` Insecure bool `json:"Insecure"` TestType TestType `json:"TestType"` + Output string `json:"Output"` // AllowLocalInterface bool `json:"AllowLocalInterfaces"` // Client Only