Skip to content

Commit

Permalink
Adjustments to error handling and stat command (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
zveinn authored Oct 10, 2024
1 parent fbbdfd2 commit 7c04ee8
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 69 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
./hperf
dist
hperf
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ NOTE: Be careful not to re-use the ID's if you care about fetching results at a

```bash
# get test results
./hperf get --hosts 1.1.1.{1...100} --id [my_test_id]
./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id]
# save test results
./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] --output /tmp/file

# listen in on a running test
./hperf listen --hosts 1.1.1.{1...100} --id [my_test_id]
Expand Down
25 changes: 23 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net/http"
"os"
"runtime/debug"
"slices"
"strconv"
Expand Down Expand Up @@ -206,7 +207,7 @@ func handleWSConnection(ctx context.Context, c *shared.Config, host string, id i
case shared.ListTests:
go parseTestList(signal.TestList)
case shared.GetTest:
go receiveJSONDataPoint(signal.Data)
go receiveJSONDataPoint(signal.Data, c)
case shared.Err:
go PrintErrorString(signal.Error)
case shared.Done:
Expand All @@ -231,7 +232,7 @@ func PrintError(err error) {
fmt.Println(ErrorStyle.Render("ERROR: ", err.Error()))
}

func receiveJSONDataPoint(data []byte) {
func receiveJSONDataPoint(data []byte, c *shared.Config) {
responseLock.Lock()
defer responseLock.Unlock()

Expand Down Expand Up @@ -430,6 +431,26 @@ func GetTest(ctx context.Context, c shared.Config) (err error) {
}
})

if c.Output != "" {
f, err := os.Create(c.Output)
if err != nil {
return err
}
for i := range responseDPS {
outb, err := json.Marshal(responseDPS[i])
if err != nil {
PrintError(err)
continue
}
_, err = f.Write(append(outb, []byte{10}...))
if err != nil {
return err
}
}

return nil
}

printDataPointHeaders(responseDPS[0].Type)
for i := range responseDPS {
dp := responseDPS[i]
Expand Down
5 changes: 5 additions & 0 deletions cmd/hperf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ var (
Name: "id",
Usage: "specify custom ID per test",
}
outputFlag = cli.StringFlag{
Name: "output",
Usage: "set output file path/name",
}
saveTestFlag = cli.BoolTFlag{
Name: "save",
EnvVar: "HPERF_SAVE",
Expand Down Expand Up @@ -234,6 +238,7 @@ func parseConfig(ctx *cli.Context) (*shared.Config, error) {
TestID: ctx.String(testIDFlag.Name),
RestartOnError: ctx.BoolT(restartOnErrorFlag.Name),
Hosts: hosts,
Output: ctx.String(outputFlag.Name),
}

if ctx.String("id") == "" {
Expand Down
3 changes: 3 additions & 0 deletions cmd/hperf/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var statTestsCMD = cli.Command{
hostsFlag,
portFlag,
testIDFlag,
outputFlag,
},
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
Expand All @@ -44,6 +45,8 @@ FLAGS:
EXAMPLES:
1. Print stats by ID for hosts '10.10.10.1' and '10.10.10.2':
{{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id
2. Save stats by ID for hosts '10.10.10.1' and '10.10.10.2':
{{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --output /tmp/output-file
`,
}

Expand Down
139 changes: 75 additions & 64 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"os"
"runtime"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -38,6 +39,7 @@ import (

"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/minio/hperf/shared"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
Expand Down Expand Up @@ -69,24 +71,31 @@ type test struct {

Readers []*netPerfReader
errors []shared.TError
errMap map[string]struct{}
errIndex atomic.Int32
DPS []shared.DP
M sync.Mutex

DataFile *os.File
DataFileIndex int
cons map[string]*websocket.Conn
}

func (t *test) AddError(err error) {
func (t *test) AddError(err error, id string) {
t.M.Lock()
defer t.M.Unlock()
if err == nil {
return
}
_, ok := t.errMap[id]
if ok {
return
}
if t.Config.Debug {
fmt.Println("ERR:", err)
}
t.M.Lock()
defer t.M.Unlock()
t.errors = append(t.errors, shared.TError{Error: err.Error(), Created: time.Now()})
t.errMap[id] = struct{}{}
}

func RunServer(ctx context.Context, address string, storagePath string) (err error) {
Expand Down Expand Up @@ -359,6 +368,8 @@ func newTest(c *shared.Config) (t *test, err error) {
defer testLock.Unlock()

t = new(test)
t.errMap = make(map[string]struct{})
t.cons = make(map[string]*websocket.Conn)
t.Started = time.Now()
t.Config = *c
t.DPS = make([]shared.DP, 0)
Expand Down Expand Up @@ -477,7 +488,7 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) {
go startPerformanceReader(test, test.Readers[i])
}

var paginator DataPointPaginator
listenToLiveTests(con, signal)
for {
if test.ctx.Err() != nil {
return
Expand All @@ -490,34 +501,24 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) {
if signal.Config.Debug {
fmt.Println("Duration: ", signal.Config.TestID, time.Since(start).Seconds())
}

generateDataPoints(test)
if con != nil {
_, paginator = sendDataResponseToWebsocket(con, test, paginator)
}
_ = sendAndSaveData(test)
}
}

func listenToLiveTests(con *websocket.Conn, s shared.WebsocketSignal) {
var paginator DataPointPaginator
var err error
paginator.After = time.Now()
for {
time.Sleep(1 * time.Second)
for i := range tests {
if time.Since(tests[i].Started).Seconds() > float64(tests[i].Config.Duration) {
continue
}
if s.Config.TestID != "" && tests[i].ID != s.Config.TestID {
continue
}
if s.Config.Debug {
fmt.Println("Listen:", tests[i].ID, "DPS:", len(tests[i].DPS), "ERR:", len(tests[i].errors))
}
err, paginator = sendDataResponseToWebsocket(con, tests[i], paginator)
if err != nil {
return
}
uid := uuid.NewString()

for i := range tests {
if s.Config.TestID != "" && tests[i].ID != s.Config.TestID {
continue
}
if s.Config.Debug {
fmt.Println("Listen:", tests[i].ID, "DPS:", len(tests[i].DPS), "ERR:", len(tests[i].errors))
}

tests[i].cons[uid] = con
}
}

Expand All @@ -527,45 +528,67 @@ type DataPointPaginator struct {
After time.Time
}

func sendDataResponseToWebsocket(con *websocket.Conn, t *test, lastPaginator DataPointPaginator) (err error, Paginator DataPointPaginator) {
func sendAndSaveData(t *test) (err error) {
defer func() {
r := recover()
if r != nil {
log.Println(r, string(debug.Stack()))
}
}()

wss := new(shared.WebsocketSignal)
wss.SType = shared.Stats
dataResponse := new(shared.DataReponseToClient)

if t.DataFile == nil && t.Config.Save {
newTestFile(t)
}

for i := range t.DPS {
if i <= lastPaginator.DPIndex {
continue
}
if !lastPaginator.After.IsZero() {
if t.DPS[i].Created.Before(lastPaginator.After) {
continue
dataResponse.DPS = append(dataResponse.DPS, t.DPS[i])
if t.Config.Save {
fileb, err := json.Marshal(t.DPS[i])
if err != nil {
t.AddError(err, "datapoint-marshaling")
}
t.DataFile.Write(append(fileb, []byte{10}...))
}
dataResponse.DPS = append(dataResponse.DPS, t.DPS[i])
Paginator.DPIndex = i
t.DPS = slices.Delete(t.DPS, i, i+1)
}

for i := range t.errors {
if i <= lastPaginator.ErrIndex {
continue
}
if !lastPaginator.After.IsZero() {
if t.errors[i].Created.Before(lastPaginator.After) {
continue
dataResponse.Errors = append(dataResponse.Errors, t.errors[i])
if t.Config.Save {
fileb, err := json.Marshal(t.errors[i])
if err != nil {
t.AddError(err, "error-marshaling")
}
t.DataFile.Write(append(fileb, []byte{10}...))
}
dataResponse.Errors = append(dataResponse.Errors, t.errors[i])
Paginator.ErrIndex = i
t.M.Lock()
t.errors = slices.Delete(t.errors, i, i+1)
t.M.Unlock()
}

t.M.Lock()
t.errMap = make(map[string]struct{})
t.M.Unlock()

wss.DataPoint = dataResponse
err = con.WriteJSON(wss)
if err != nil {
if t.Config.Debug {
fmt.Println("Unable to send data point:", err)
for i := range t.cons {
if t.cons[i] == nil {
continue
}

err = t.cons[i].WriteJSON(wss)
if err != nil {
if t.Config.Debug {
fmt.Println("Unable to send data point:", err)
}
t.cons[i].Close()
delete(t.cons, i)
continue
}
con.Close()
con = nil
}
return
}
Expand Down Expand Up @@ -609,18 +632,6 @@ func generateDataPoints(t *test) {
r.m.Unlock()

t.DPS = append(t.DPS, d)

if t.Config.Save {
fileb, err := json.Marshal(d)
if err != nil {
t.AddError(err)
}
if t.DataFile == nil {
newTestFile(t)
}
t.DataFile.Write(append(fileb, []byte{10}...))
}

}
return
}
Expand Down Expand Up @@ -727,7 +738,7 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) {
route = "/http"
body = AR
default:
t.AddError(fmt.Errorf("Unknown test type: %d", t.Config.TestType))
t.AddError(fmt.Errorf("Unknown test type: %d", t.Config.TestType), "unknown-signal")
}

req, err = http.NewRequestWithContext(
Expand All @@ -748,12 +759,12 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) {
if errors.Is(err, context.Canceled) {
return
}
t.AddError(err)
t.AddError(err, "network-error")
return
}

if resp.StatusCode != http.StatusOK {
t.AddError(fmt.Errorf("Status code was %d, expected 200 from host %s", resp.StatusCode, r.addr))
t.AddError(fmt.Errorf("Status code was %d, expected 200 from host %s", resp.StatusCode, r.addr), "invalid-status-code")
return
}

Expand Down
1 change: 1 addition & 0 deletions shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type Config struct {
Save bool `json:"Save"`
Insecure bool `json:"Insecure"`
TestType TestType `json:"TestType"`
Output string `json:"Output"`
// AllowLocalInterface bool `json:"AllowLocalInterfaces"`

// Client Only
Expand Down

0 comments on commit 7c04ee8

Please sign in to comment.