From 9369efe9ef64b4b79eedbb6fd673abe36efb9b82 Mon Sep 17 00:00:00 2001 From: Omar Abdulaziz Date: Mon, 11 Sep 2023 17:23:01 +0300 Subject: [PATCH 1/2] - add new handler for getAll method - reuse Get code --- cmds/modules/noded/main.go | 7 +++---- pkg/perf/cache.go | 14 +++----------- pkg/perf/monitor.go | 2 +- 3 files changed, 7 insertions(+), 16 deletions(-) diff --git a/cmds/modules/noded/main.go b/cmds/modules/noded/main.go index 02aacf1a3..cef315578 100644 --- a/cmds/modules/noded/main.go +++ b/cmds/modules/noded/main.go @@ -212,12 +212,11 @@ func action(cli *cli.Context) error { return nil, errors.Wrapf(err, "failed to unmarshal payload: %v", payload) } - if taskName == "" { - return perfMon.GetAll() - } - return perfMon.Get(taskName) }) + bus.WithHandler("zos.perf.get_all", func(ctx context.Context, payload []byte) (interface{}, error) { + return perfMon.GetAll() + }) // answer calls for dmi go func() { diff --git a/pkg/perf/cache.go b/pkg/perf/cache.go index 352603a1a..77900a144 100644 --- a/pkg/perf/cache.go +++ b/pkg/perf/cache.go @@ -15,7 +15,7 @@ const ( // TaskResult the result test schema type TaskResult struct { - TaskName string `json:"task_name"` + Name string `json:"name"` Timestamp uint64 `json:"timestamp"` Result interface{} `json:"result"` } @@ -34,7 +34,7 @@ func (pm *PerformanceMonitor) setCache(ctx context.Context, result TaskResult) e conn := pm.pool.Get() defer conn.Close() - _, err = conn.Do("SET", generateKey(result.TaskName), data) + _, err = conn.Do("SET", generateKey(result.Name), data) return err } @@ -79,18 +79,10 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { } for _, key := range keys { - value, err := conn.Do("GET", key) + result, err := pm.Get(key) if err != nil { continue } - - var result TaskResult - - err = json.Unmarshal(value.([]byte), &result) - if err != nil { - return res, errors.Wrap(err, "failed to unmarshal data from json") - } - res = append(res, result) } diff --git a/pkg/perf/monitor.go b/pkg/perf/monitor.go index 01bb57704..f6c89d8e6 100644 --- a/pkg/perf/monitor.go +++ b/pkg/perf/monitor.go @@ -49,7 +49,7 @@ func (pm *PerformanceMonitor) Run(ctx context.Context) error { } err = pm.setCache(ctx, TaskResult{ - TaskName: task.ID(), + Name: task.ID(), Timestamp: uint64(time.Now().Unix()), Result: res, }) From a79f0c791ed8a392c9c477a92cd2755c3243e262 Mon Sep 17 00:00:00 2001 From: Omar Abdulaziz Date: Tue, 12 Sep 2023 21:50:42 +0300 Subject: [PATCH 2/2] run tasks if it has no results stored, fix getAll method --- pkg/perf/cache.go | 40 +++++++++++++++++++++++++++++++------- pkg/perf/monitor.go | 47 ++++++++++++++++++++++++++++++--------------- 2 files changed, 65 insertions(+), 22 deletions(-) diff --git a/pkg/perf/cache.go b/pkg/perf/cache.go index 77900a144..98ba81367 100644 --- a/pkg/perf/cache.go +++ b/pkg/perf/cache.go @@ -13,6 +13,10 @@ const ( moduleName = "perf" ) +var ( + ErrResultNotFound = errors.New("result not found") +) + // TaskResult the result test schema type TaskResult struct { Name string `json:"name"` @@ -20,6 +24,7 @@ type TaskResult struct { Result interface{} `json:"result"` } +// generateKey is helper method to add moduleName as prefix for the taskName func generateKey(taskName string) string { return fmt.Sprintf("%s.%s", moduleName, taskName) } @@ -38,18 +43,19 @@ func (pm *PerformanceMonitor) setCache(ctx context.Context, result TaskResult) e return err } -// Get gets data from redis -func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) { +// get directly gets result for some key +func get(conn redis.Conn, key string) (TaskResult, error) { var res TaskResult - conn := pm.pool.Get() - defer conn.Close() - - data, err := conn.Do("GET", generateKey(taskName)) + data, err := conn.Do("GET", key) if err != nil { return res, errors.Wrap(err, "failed to get the result") } + if data == nil { + return res, ErrResultNotFound + } + err = json.Unmarshal(data.([]byte), &res) if err != nil { return res, errors.Wrap(err, "failed to unmarshal data from json") @@ -58,6 +64,14 @@ func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) { return res, nil } +// Get gets data from redis +func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) { + conn := pm.pool.Get() + defer conn.Close() + return get(conn, generateKey(taskName)) +} + +// GetAll gets the results for all the tests with moduleName as prefix func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { var res []TaskResult @@ -79,7 +93,7 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { } for _, key := range keys { - result, err := pm.Get(key) + result, err := get(conn, key) if err != nil { continue } @@ -93,3 +107,15 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) { } return res, nil } + +// exists check if a key exists +func (pm *PerformanceMonitor) exists(key string) (bool, error) { + conn := pm.pool.Get() + defer conn.Close() + + ok, err := redis.Bool(conn.Do("EXISTS", generateKey(key))) + if err != nil { + return false, errors.Wrapf(err, "error checking if key %s exists", generateKey(key)) + } + return ok, nil +} diff --git a/pkg/perf/monitor.go b/pkg/perf/monitor.go index f6c89d8e6..9afe28f28 100644 --- a/pkg/perf/monitor.go +++ b/pkg/perf/monitor.go @@ -39,29 +39,46 @@ func (pm *PerformanceMonitor) AddTask(task Task) { pm.tasks = append(pm.tasks, task) } +// runTask runs the task and store its result +func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error { + res, err := task.Run(ctx) + if err != nil { + return errors.Wrapf(err, "failed to run task: %s", task.ID()) + } + + err = pm.setCache(ctx, TaskResult{ + Name: task.ID(), + Timestamp: uint64(time.Now().Unix()), + Result: res, + }) + if err != nil { + return errors.Wrap(err, "failed to set cache") + } + + return nil +} + // Run adds the tasks to the corn queue and start the scheduler func (pm *PerformanceMonitor) Run(ctx context.Context) error { for _, task := range pm.tasks { _, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error { - res, err := task.Run(ctx) - if err != nil { - return errors.Wrapf(err, "failed to run task: %s", task.ID()) - } - - err = pm.setCache(ctx, TaskResult{ - Name: task.ID(), - Timestamp: uint64(time.Now().Unix()), - Result: res, - }) - if err != nil { - return errors.Wrap(err, "failed to set cache") - } - - return nil + return pm.runTask(ctx, task) }) if err != nil { return errors.Wrapf(err, "failed to schedule the task: %s", task.ID()) } + + ok, err := pm.exists(task.ID()) + if err != nil { + return errors.Wrapf(err, "failed to find key %s", task.ID()) + } + + if !ok { + if err := pm.runTask(ctx, task); err != nil { + return errors.Wrapf(err, "failed to run task: %s", task.ID()) + } + } + } pm.scheduler.StartAsync()