-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
performance monitor package #2046
Changes from 3 commits
1e99a71
7bd548d
51fc43d
1483702
91ce438
9369efe
21b359f
a79f0c7
fef5daf
b17cb6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3,6 +3,7 @@ package noded | |||||||||||||||||
import ( | ||||||||||||||||||
"context" | ||||||||||||||||||
"crypto/ed25519" | ||||||||||||||||||
"encoding/json" | ||||||||||||||||||
"fmt" | ||||||||||||||||||
"os/exec" | ||||||||||||||||||
"strings" | ||||||||||||||||||
|
@@ -19,6 +20,7 @@ import ( | |||||||||||||||||
"github.com/threefoldtech/zos/pkg/environment" | ||||||||||||||||||
"github.com/threefoldtech/zos/pkg/events" | ||||||||||||||||||
"github.com/threefoldtech/zos/pkg/monitord" | ||||||||||||||||||
"github.com/threefoldtech/zos/pkg/perf" | ||||||||||||||||||
"github.com/threefoldtech/zos/pkg/registrar" | ||||||||||||||||||
"github.com/threefoldtech/zos/pkg/stubs" | ||||||||||||||||||
"github.com/threefoldtech/zos/pkg/utils" | ||||||||||||||||||
|
@@ -193,6 +195,27 @@ func action(cli *cli.Context) error { | |||||||||||||||||
return hypervisor, nil | ||||||||||||||||||
}) | ||||||||||||||||||
|
||||||||||||||||||
log.Info().Msg("start perf scheduler") | ||||||||||||||||||
perfMon, err := perf.NewPerformanceMonitor("unix://var/run/redis.sock") | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return errors.Wrap(err, "failed to create a new perfMon") | ||||||||||||||||||
} | ||||||||||||||||||
defer perfMon.Close() | ||||||||||||||||||
|
||||||||||||||||||
err = perfMon.Run(ctx) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
log.Error().Err(err).Msg("failed to run the scheduler") | ||||||||||||||||||
} | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Also i think it's better if we return the error not just log |
||||||||||||||||||
bus.WithHandler("zos.perf.get", func(ctx context.Context, payload []byte) (interface{}, error) { | ||||||||||||||||||
var taskName string | ||||||||||||||||||
err := json.Unmarshal(payload, &taskName) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return nil, errors.Wrapf(err, "failed to unmarshal payload: %v", payload) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
return perfMon.GetCache(taskName) | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the API need to support returning all available tests, or specific one by name. so this function need to support not having a Also the |
||||||||||||||||||
}) | ||||||||||||||||||
|
||||||||||||||||||
// answer calls for dmi | ||||||||||||||||||
go func() { | ||||||||||||||||||
if err := bus.Run(ctx); err != nil { | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# Perf | ||
|
||
Perf is a performance test module that is scheduled to run and cache those tests results in redis which can be retrieved later over RMB call. | ||
|
||
Perf tests are monitored by `noded` service from zos modules. | ||
|
||
### Usage Example | ||
|
||
1. Create a task that implement `Task` interface | ||
|
||
```go | ||
type LoggingTask struct { | ||
TaskID string | ||
Schedule string // should be in cron syntax with seconds ("* 0 * * * *") | ||
} | ||
|
||
func (t *LoggingTask) ID() string { | ||
return t.TaskID | ||
} | ||
|
||
func (t *LoggingTask) Cron() string { | ||
return t.Schedule | ||
} | ||
|
||
// a simple task that returns a string with the current time | ||
func (t *LoggingTask) Run(ctx context.Context) (interface{}, error) { | ||
result := fmt.Sprintf("time is: %v", time.Now()) | ||
return result, nil | ||
} | ||
``` | ||
|
||
2. Add the created task to scheduler | ||
|
||
```go | ||
perfMon.AddTask(&perf.LoggingTask{ | ||
TaskID: "LoggingTask", | ||
Schedule: "* 0 * * * *", // when minutes is 0 (aka: every hour) | ||
}) | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package perf | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
// TaskResult the result test schema | ||
type TaskResult struct { | ||
TaskName string `json:"task_name"` | ||
RunTimestamp string `json:"run_timestamp"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please change this to an actual unix timpestamp uint64. Storing time as a string is the most inefficient way to store time. |
||
Result interface{} `json:"result"` | ||
} | ||
|
||
// setCache set result in redis | ||
func (pm *PerformanceMonitor) setCache(ctx context.Context, taskName string, result TaskResult) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do u pass taskName when the TaskResult already have this? |
||
data, err := json.Marshal(result) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to marshal data to JSON") | ||
} | ||
|
||
_, err = pm.redisConn.Do("SET", taskName, data) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using the task name alone as a key is not a good idea. The same redis instance is used by other apps on the system to store and queue data. Hence you need to create some sort of a Having a prefix also makes it easier for range scan. For example if u wanna return ALL tests u can scan KEYS with |
||
return err | ||
} | ||
|
||
// GetCache get data from redis | ||
func (pm *PerformanceMonitor) GetCache(taskName string) (TaskResult, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As commented before, the GetCache is not correct. The performance framework internal details (how tests are run or how results are retrieved) is only an implementation details. The PerformanceMonitor struct then API should be In other words, why a user of this framework know that the result is cached for example |
||
var res TaskResult | ||
|
||
data, err := pm.redisConn.Do("GET", taskName) | ||
if err != nil { | ||
return res, errors.Wrap(err, "failed to get the cached result") | ||
} | ||
|
||
err = json.Unmarshal(data.([]byte), &res) | ||
if err != nil { | ||
return res, errors.Wrap(err, "failed to unmarshal data from json") | ||
} | ||
|
||
return res, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package perf | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/go-co-op/gocron" | ||
"github.com/gomodule/redigo/redis" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/threefoldtech/zos/pkg/utils" | ||
) | ||
|
||
// PerformanceMonitor holds the module data | ||
type PerformanceMonitor struct { | ||
scheduler *gocron.Scheduler | ||
redisConn redis.Conn | ||
tasks []Task | ||
} | ||
|
||
// NewPerformanceMonitor returns PerformanceMonitor instance | ||
func NewPerformanceMonitor(redisAddr string) (*PerformanceMonitor, error) { | ||
redisPool, err := utils.NewRedisPool(redisAddr) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed creating new redis pool") | ||
} | ||
|
||
redisConn := redisPool.Get() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not how u use the redis Pool. The idea of a Pool of connection that u Get a connection from the pool, use it and then close it immediately when you are done using it. Closing a connection will return the connection to the pool (the pool might not close the connection) but keep it alive for the next one to need a connection (by calling the pool.Get()) So what u are doing is breaking the pooling contract by hording a connection forever. This is also really bad because if the connection is interrupted (or lost because of any reason) there is NO way that u renew the connection. The pool on the other hand tests the connection before it give u one when u do a Get, and renew it if needed. In summary, what u need to do is to keep the pool on you structure (not a connection) and in you setCache and Get methods what u need to do is con := pm.pool.Get()
defer con.Clonse()
// use the con to get or set |
||
scheduler := gocron.NewScheduler(time.UTC) | ||
|
||
return &PerformanceMonitor{ | ||
scheduler: scheduler, | ||
redisConn: redisConn, | ||
tasks: []Task{}, | ||
}, nil | ||
} | ||
|
||
// AddTask a simple helper method to add new tasks | ||
func (pm *PerformanceMonitor) AddTask(task Task) { | ||
pm.tasks = append(pm.tasks, task) | ||
} | ||
|
||
// Run adds the tasks to the corn queue and start the scheduler | ||
func (pm *PerformanceMonitor) Run(ctx context.Context) error { | ||
for _, task := range pm.tasks { | ||
_, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error { | ||
res, err := task.Run(ctx) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to run task: %s", task.ID()) | ||
} | ||
|
||
err = pm.setCache(ctx, task.ID(), TaskResult{ | ||
TaskName: task.ID(), | ||
RunTimestamp: time.Now().Format("2006-01-02 15:04:05"), | ||
Result: res, | ||
}) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to set cache") | ||
} | ||
|
||
return nil | ||
}) | ||
if err != nil { | ||
return errors.Wrapf(err, "failed to schedule the task: %s", task.ID()) | ||
} | ||
} | ||
|
||
pm.scheduler.StartAsync() | ||
return nil | ||
} | ||
|
||
// Close closes the redis connection | ||
func (pm *PerformanceMonitor) Close() { | ||
pm.redisConn.Close() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package perf | ||
|
||
import ( | ||
"context" | ||
) | ||
|
||
type Task interface { | ||
ID() string | ||
Cron() string | ||
Run(ctx context.Context) (interface{}, error) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't be hard coded, specially that we already have a command line argument for that