Skip to content

Commit

Permalink
Add CPU benchmark task (#2066)
Browse files Browse the repository at this point in the history
* Add CPU benchmark task

* Add more docs and remove anonymous struct

* Fix typo

* Change zbus client getter to be public
  • Loading branch information
AbdelrahmanElawady authored Oct 16, 2023
1 parent e92dcd7 commit 1d41a6f
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 7 deletions.
3 changes: 3 additions & 0 deletions cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func action(cli *cli.Context) error {
iperfTest := perf.NewIperfTest()
perfMon.AddTask(&iperfTest)

cpuBenchmarkTask := perf.NewCPUBenchmarkTask()
perfMon.AddTask(&cpuBenchmarkTask)

if err = perfMon.Run(ctx); err != nil {
return errors.Wrap(err, "failed to run the scheduler")
}
Expand Down
72 changes: 72 additions & 0 deletions pkg/perf/cpubench_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package perf

import (
"context"
"encoding/json"
"fmt"
"os/exec"

"github.com/threefoldtech/zos/pkg/stubs"
)

const cpuBenchmarkTaskID = "CPUBenchmark"
const cpuBenchmarkCronSchedule = "0 0 */6 * * *"

// CPUBenchmarkTask defines CPU benchmark task data.
type CPUBenchmarkTask struct {
// taskID is a unique string ID for the task.
taskID string
// schedule is a 6 field cron schedule (unlike unix cron).
schedule string
}

// CPUBenchmarkResult holds CPU benchmark results with the workloads number during the benchmark.
type CPUBenchmarkResult struct {
SingleThreaded float64 `json:"single"`
MultiThreaded float64 `json:"multi"`
Threads int `json:"threads"`
Workloads int `json:"workloads"`
}

var _ Task = (*CPUBenchmarkTask)(nil)

// NewCPUBenchmarkTask returns a new CPU benchmark task.
func NewCPUBenchmarkTask() CPUBenchmarkTask {
return CPUBenchmarkTask{
taskID: cpuBenchmarkTaskID,
schedule: cpuBenchmarkCronSchedule,
}
}

// ID returns task ID.
func (c *CPUBenchmarkTask) ID() string {
return c.taskID
}

// Cron returns task cron schedule.
func (c *CPUBenchmarkTask) Cron() string {
return c.schedule
}

// Run executes the CPU benchmark.
func (c *CPUBenchmarkTask) Run(ctx context.Context) (interface{}, error) {
cpubenchOut, err := exec.CommandContext(ctx, "cpubench", "-j").CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to execute cpubench command: %w", err)
}
cpuBenchmarkResult := CPUBenchmarkResult{}
err = json.Unmarshal(cpubenchOut, &cpuBenchmarkResult)
if err != nil {
return nil, fmt.Errorf("failed to parse cpubench output: %w", err)
}
client := GetZbusClient(ctx)
statistics := stubs.NewStatisticsStub(client)

workloads, err := statistics.Workloads(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get workloads number: %w", err)
}

cpuBenchmarkResult.Workloads = workloads
return cpuBenchmarkResult, nil
}
31 changes: 25 additions & 6 deletions pkg/perf/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"github.com/rs/zerolog/log"

"github.com/pkg/errors"
"github.com/threefoldtech/zbus"
"github.com/threefoldtech/zos/pkg/utils"
)

// PerformanceMonitor holds the module data
type PerformanceMonitor struct {
scheduler *gocron.Scheduler
pool *redis.Pool
tasks []Task
scheduler *gocron.Scheduler
pool *redis.Pool
zbusClient zbus.Client
tasks []Task
}

// NewPerformanceMonitor returns PerformanceMonitor instance
Expand All @@ -25,13 +27,18 @@ func NewPerformanceMonitor(redisAddr string) (*PerformanceMonitor, error) {
if err != nil {
return nil, errors.Wrap(err, "failed creating new redis pool")
}
zbusClient, err := zbus.NewRedisClient(redisAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to zbus")
}

scheduler := gocron.NewScheduler(time.UTC)

return &PerformanceMonitor{
scheduler: scheduler,
pool: redisPool,
tasks: []Task{},
scheduler: scheduler,
pool: redisPool,
zbusClient: zbusClient,
tasks: []Task{},
}, nil
}

Expand Down Expand Up @@ -61,6 +68,7 @@ func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error {

// Run adds the tasks to the corn queue and start the scheduler
func (pm *PerformanceMonitor) Run(ctx context.Context) error {
ctx = withZbusClient(ctx, pm.zbusClient)
for _, task := range pm.tasks {
if _, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error {
return pm.runTask(ctx, task)
Expand All @@ -86,3 +94,14 @@ func (pm *PerformanceMonitor) Run(ctx context.Context) error {
pm.scheduler.StartAsync()
return nil
}

type zbusClient struct{}

func withZbusClient(ctx context.Context, client zbus.Client) context.Context {
return context.WithValue(ctx, zbusClient{}, client)
}

// GetZbusClient gets zbus client from the given context
func GetZbusClient(ctx context.Context) zbus.Client {
return ctx.Value(zbusClient{}).(zbus.Client)
}
8 changes: 8 additions & 0 deletions pkg/primitives/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,11 @@ func (s *statsStream) Current() (gridtypes.Capacity, error) {
func (s *statsStream) Total() gridtypes.Capacity {
return s.stats.Total()
}

func (s *statsStream) Workloads() (int, error) {
capacity, err := s.stats.storage.Capacity()
if err != nil {
return 0, err
}
return capacity.Workloads, nil
}
1 change: 1 addition & 0 deletions pkg/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type Statistics interface {
ReservedStream(ctx context.Context) <-chan gridtypes.Capacity
Current() (gridtypes.Capacity, error)
Total() gridtypes.Capacity
Workloads() (int, error)
}
17 changes: 17 additions & 0 deletions pkg/stubs/statistics_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,20 @@ func (s *StatisticsStub) Total(ctx context.Context) (ret0 gridtypes.Capacity) {
}
return
}

func (s *StatisticsStub) Workloads(ctx context.Context) (ret0 int, ret1 error) {
args := []interface{}{}
result, err := s.client.RequestContext(ctx, s.module, s.object, "Workloads", args...)
if err != nil {
panic(err)
}
result.PanicOnError()
ret1 = result.CallError()
loader := zbus.Loader{
&ret0,
}
if err := result.Unmarshal(&loader); err != nil {
panic(err)
}
return
}
1 change: 0 additions & 1 deletion pkg/stubs/storage_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package stubs

import (
"context"

zbus "github.com/threefoldtech/zbus"
pkg "github.com/threefoldtech/zos/pkg"
gridtypes "github.com/threefoldtech/zos/pkg/gridtypes"
Expand Down

0 comments on commit 1d41a6f

Please sign in to comment.