Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
rawdaGastan committed Sep 13, 2023
2 parents f5d77ac + a79f0c7 commit 52dafa2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 36 deletions.
7 changes: 3 additions & 4 deletions cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,11 @@ func action(cli *cli.Context) error {
return nil, errors.Wrapf(err, "failed to unmarshal payload: %v", payload)
}

if taskName == "" {
return perfMon.GetAll()
}

return perfMon.Get(taskName)
})
bus.WithHandler("zos.perf.get_all", func(ctx context.Context, payload []byte) (interface{}, error) {
return perfMon.GetAll()
})

// answer calls for dmi
go func() {
Expand Down
52 changes: 35 additions & 17 deletions pkg/perf/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ const (
moduleName = "perf"
)

var (
ErrResultNotFound = errors.New("result not found")
)

// TaskResult the result test schema
type TaskResult struct {
TaskName string `json:"task_name"`
Name string `json:"name"`
Timestamp uint64 `json:"timestamp"`
Result interface{} `json:"result"`
}

// generateKey is helper method to add moduleName as prefix for the taskName
func generateKey(taskName string) string {
return fmt.Sprintf("%s.%s", moduleName, taskName)
}
Expand All @@ -34,22 +39,23 @@ func (pm *PerformanceMonitor) setCache(ctx context.Context, result TaskResult) e
conn := pm.pool.Get()
defer conn.Close()

_, err = conn.Do("SET", generateKey(result.TaskName), data)
_, err = conn.Do("SET", generateKey(result.Name), data)
return err
}

// Get gets data from redis
func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) {
// get directly gets result for some key
func get(conn redis.Conn, key string) (TaskResult, error) {
var res TaskResult

conn := pm.pool.Get()
defer conn.Close()

data, err := conn.Do("GET", generateKey(taskName))
data, err := conn.Do("GET", key)
if err != nil {
return res, errors.Wrap(err, "failed to get the result")
}

if data == nil {
return res, ErrResultNotFound
}

err = json.Unmarshal(data.([]byte), &res)
if err != nil {
return res, errors.Wrap(err, "failed to unmarshal data from json")
Expand All @@ -58,6 +64,14 @@ func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) {
return res, nil
}

// Get gets data from redis
func (pm *PerformanceMonitor) Get(taskName string) (TaskResult, error) {
conn := pm.pool.Get()
defer conn.Close()
return get(conn, generateKey(taskName))
}

// GetAll gets the results for all the tests with moduleName as prefix
func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) {
var res []TaskResult

Expand All @@ -79,18 +93,10 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) {
}

for _, key := range keys {
value, err := conn.Do("GET", key)
result, err := get(conn, key)
if err != nil {
continue
}

var result TaskResult

err = json.Unmarshal(value.([]byte), &result)
if err != nil {
return res, errors.Wrap(err, "failed to unmarshal data from json")
}

res = append(res, result)
}

Expand All @@ -101,3 +107,15 @@ func (pm *PerformanceMonitor) GetAll() ([]TaskResult, error) {
}
return res, nil
}

// exists check if a key exists
func (pm *PerformanceMonitor) exists(key string) (bool, error) {
conn := pm.pool.Get()
defer conn.Close()

ok, err := redis.Bool(conn.Do("EXISTS", generateKey(key)))
if err != nil {
return false, errors.Wrapf(err, "error checking if key %s exists", generateKey(key))
}
return ok, nil
}
47 changes: 32 additions & 15 deletions pkg/perf/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,46 @@ func (pm *PerformanceMonitor) AddTask(task Task) {
pm.tasks = append(pm.tasks, task)
}

// runTask runs the task and store its result
func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error {
res, err := task.Run(ctx)
if err != nil {
return errors.Wrapf(err, "failed to run task: %s", task.ID())
}

err = pm.setCache(ctx, TaskResult{
Name: task.ID(),
Timestamp: uint64(time.Now().Unix()),
Result: res,
})
if err != nil {
return errors.Wrap(err, "failed to set cache")
}

return nil
}

// Run adds the tasks to the corn queue and start the scheduler
func (pm *PerformanceMonitor) Run(ctx context.Context) error {
for _, task := range pm.tasks {
_, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error {
res, err := task.Run(ctx)
if err != nil {
return errors.Wrapf(err, "failed to run task: %s", task.ID())
}

err = pm.setCache(ctx, TaskResult{
TaskName: task.ID(),
Timestamp: uint64(time.Now().Unix()),
Result: res,
})
if err != nil {
return errors.Wrap(err, "failed to set cache")
}

return nil
return pm.runTask(ctx, task)
})
if err != nil {
return errors.Wrapf(err, "failed to schedule the task: %s", task.ID())
}

ok, err := pm.exists(task.ID())
if err != nil {
return errors.Wrapf(err, "failed to find key %s", task.ID())
}

if !ok {
if err := pm.runTask(ctx, task); err != nil {
return errors.Wrapf(err, "failed to run task: %s", task.ID())
}
}

}

pm.scheduler.StartAsync()
Expand Down

0 comments on commit 52dafa2

Please sign in to comment.