diff --git a/cmds/modules/noded/main.go b/cmds/modules/noded/main.go index 1ece7b26c..86e487a40 100644 --- a/cmds/modules/noded/main.go +++ b/cmds/modules/noded/main.go @@ -250,12 +250,11 @@ func action(cli *cli.Context) error { return nil, errors.Wrapf(err, "failed to unmarshal payload: %v", payload) } - if taskName == "" { - return perfMon.GetAll() - } - return perfMon.Get(taskName) }) + bus.WithHandler("zos.perf.get_all", func(ctx context.Context, payload []byte) (interface{}, error) { + return perfMon.GetAll() + }) // answer calls for dmi go func() { diff --git a/pkg/perf/cache.go b/pkg/perf/cache.go index 352603a1a..98ba81367 100644 --- a/pkg/perf/cache.go +++ b/pkg/perf/cache.go @@ -13,13 +13,18 @@ const ( moduleName = "perf" ) +var ( + ErrResultNotFound = errors.New("result not found") +) + // TaskResult the result test schema type TaskResult struct { - TaskName string `json:"task_name"` + Name string `json:"name"` Timestamp uint64 `json:"timestamp"` Result interface{} `json:"result"` } +// generateKey is helper method to add moduleName as prefix for the taskName func generateKey(taskName string) string { return fmt.Sprintf("%s.%s", moduleName, taskName) } @@ -34,22 +39,23 @@ func (pm *PerformanceMonitor) setCache(ctx context.Context, result TaskResult) e conn := pm.pool.Get() defer conn.Close() - _, err = conn.Do("SET", generateKey(result.TaskName), data) + _, err = conn.Do("SET", generateKey(result.Name), data) return err } -// Get gets data from redis -func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) { +// get directly gets result for some key +func get(conn redis.Conn, key string) (TaskResult, error) { var res TaskResult - conn := pm.pool.Get() - defer conn.Close() - - data, err := conn.Do("GET", generateKey(taskName)) + data, err := conn.Do("GET", key) if err != nil { return res, errors.Wrap(err, "failed to get the result") } + if data == nil { + return res, ErrResultNotFound + } + err = json.Unmarshal(data.([]byte), &res) if err != nil { return res, errors.Wrap(err, "failed to unmarshal data from json") @@ -58,6 +64,14 @@ func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) { return res, nil } +// Get gets data from redis +func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) { + conn := pm.pool.Get() + defer conn.Close() + return get(conn, generateKey(taskName)) +} + +// GetAll gets the results for all the tests with moduleName as prefix func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { var res []TaskResult @@ -79,18 +93,10 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { } for _, key := range keys { - value, err := conn.Do("GET", key) + result, err := get(conn, key) if err != nil { continue } - - var result TaskResult - - err = json.Unmarshal(value.([]byte), &result) - if err != nil { - return res, errors.Wrap(err, "failed to unmarshal data from json") - } - res = append(res, result) } @@ -101,3 +107,15 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { } return res, nil } + +// exists check if a key exists +func (pm *PerformanceMonitor) exists(key string) (bool, error) { + conn := pm.pool.Get() + defer conn.Close() + + ok, err := redis.Bool(conn.Do("EXISTS", generateKey(key))) + if err != nil { + return false, errors.Wrapf(err, "error checking if key %s exists", generateKey(key)) + } + return ok, nil +} diff --git a/pkg/perf/monitor.go b/pkg/perf/monitor.go index 01bb57704..9afe28f28 100644 --- a/pkg/perf/monitor.go +++ b/pkg/perf/monitor.go @@ -39,29 +39,46 @@ func (pm *PerformanceMonitor) AddTask(task Task) { pm.tasks = append(pm.tasks, task) } +// runTask runs the task and store its result +func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error { + res, err := task.Run(ctx) + if err != nil { + return errors.Wrapf(err, "failed to run task: %s", task.ID()) + } + + err = pm.setCache(ctx, TaskResult{ + Name: task.ID(), + Timestamp: uint64(time.Now().Unix()), + Result: res, + }) + if err != nil { + return errors.Wrap(err, "failed to set cache") + } + + return nil +} + // Run adds the tasks to the corn queue and start the scheduler func (pm *PerformanceMonitor) Run(ctx context.Context) error { for _, task := range pm.tasks { _, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error { - res, err := task.Run(ctx) - if err != nil { - return errors.Wrapf(err, "failed to run task: %s", task.ID()) - } - - err = pm.setCache(ctx, TaskResult{ - TaskName: task.ID(), - Timestamp: uint64(time.Now().Unix()), - Result: res, - }) - if err != nil { - return errors.Wrap(err, "failed to set cache") - } - - return nil + return pm.runTask(ctx, task) }) if err != nil { return errors.Wrapf(err, "failed to schedule the task: %s", task.ID()) } + + ok, err := pm.exists(task.ID()) + if err != nil { + return errors.Wrapf(err, "failed to find key %s", task.ID()) + } + + if !ok { + if err := pm.runTask(ctx, task); err != nil { + return errors.Wrapf(err, "failed to run task: %s", task.ID()) + } + } + } pm.scheduler.StartAsync()