Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CPU benchmark task #2066

Merged
merged 4 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ func action(cli *cli.Context) error {
iperfTest := perf.NewIperfTest()
perfMon.AddTask(&iperfTest)

cpuBenchmarkTask := perf.NewCPUBenchmarkTask()
perfMon.AddTask(&cpuBenchmarkTask)

if err = perfMon.Run(ctx); err != nil {
return errors.Wrap(err, "failed to run the scheduler")
}
Expand Down
72 changes: 72 additions & 0 deletions pkg/perf/cpubench_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package perf

import (
"context"
"encoding/json"
"fmt"
"os/exec"

"github.com/threefoldtech/zos/pkg/stubs"
)

const cpuBenchmarkTaskID = "CPUBenchmark"
const cpuBenchmarkCronSchedule = "0 0 */6 * * *"

// CPUBenchmarkTask defines CPU benchmark task data.
type CPUBenchmarkTask struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add more explanation on the fields, e.g taskID is that a sequential string?a uuid?or else?
schedule: i'm assuming that's a cron string? let's clear the ambiguity please

// taskID is a unique string ID for the task.
taskID string
// schedule is a 6 field cron schedule (unlike unix cron).
schedule string
}

// CPUBenchmarkResult holds CPU benchmark results with the workloads number during the benchmark.
type CPUBenchmarkResult struct {
Copy link
Collaborator

@xmonader xmonader Oct 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the Single/Multi process or a thread? can we add a few more info and even change the name to a include the full name (or that's because of the json marshalling?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change the name of the fields but the data returned from zos.perf.get_all will still be single, multi and threads, is that fine?

SingleThreaded float64 `json:"single"`
MultiThreaded float64 `json:"multi"`
Threads int `json:"threads"`
Workloads int `json:"workloads"`
}

var _ Task = (*CPUBenchmarkTask)(nil)

// NewCPUBenchmarkTask returns a new CPU benchmark task.
func NewCPUBenchmarkTask() CPUBenchmarkTask {
return CPUBenchmarkTask{
taskID: cpuBenchmarkTaskID,
schedule: cpuBenchmarkCronSchedule,
}
}

// ID returns task ID.
func (c *CPUBenchmarkTask) ID() string {
return c.taskID
}

// Cron returns task cron schedule.
func (c *CPUBenchmarkTask) Cron() string {
return c.schedule
}

// Run executes the CPU benchmark.
func (c *CPUBenchmarkTask) Run(ctx context.Context) (interface{}, error) {
cpubenchOut, err := exec.CommandContext(ctx, "cpubench", "-j").CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to execute cpubench command: %w", err)
}
cpuBenchmarkResult := CPUBenchmarkResult{}
err = json.Unmarshal(cpubenchOut, &cpuBenchmarkResult)
if err != nil {
return nil, fmt.Errorf("failed to parse cpubench output: %w", err)
}
client := GetZbusClient(ctx)
statistics := stubs.NewStatisticsStub(client)

workloads, err := statistics.Workloads(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get workloads number: %w", err)
}

cpuBenchmarkResult.Workloads = workloads
return cpuBenchmarkResult, nil
}
31 changes: 25 additions & 6 deletions pkg/perf/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"github.com/rs/zerolog/log"

"github.com/pkg/errors"
"github.com/threefoldtech/zbus"
"github.com/threefoldtech/zos/pkg/utils"
)

// PerformanceMonitor holds the module data
type PerformanceMonitor struct {
scheduler *gocron.Scheduler
pool *redis.Pool
tasks []Task
scheduler *gocron.Scheduler
pool *redis.Pool
zbusClient zbus.Client
tasks []Task
}

// NewPerformanceMonitor returns PerformanceMonitor instance
Expand All @@ -25,13 +27,18 @@ func NewPerformanceMonitor(redisAddr string) (*PerformanceMonitor, error) {
if err != nil {
return nil, errors.Wrap(err, "failed creating new redis pool")
}
zbusClient, err := zbus.NewRedisClient(redisAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to zbus")
}

scheduler := gocron.NewScheduler(time.UTC)

return &PerformanceMonitor{
scheduler: scheduler,
pool: redisPool,
tasks: []Task{},
scheduler: scheduler,
pool: redisPool,
zbusClient: zbusClient,
tasks: []Task{},
}, nil
}

Expand Down Expand Up @@ -61,6 +68,7 @@ func (pm *PerformanceMonitor) runTask(ctx context.Context, task Task) error {

// Run adds the tasks to the corn queue and start the scheduler
func (pm *PerformanceMonitor) Run(ctx context.Context) error {
ctx = withZbusClient(ctx, pm.zbusClient)
for _, task := range pm.tasks {
if _, err := pm.scheduler.CronWithSeconds(task.Cron()).Do(func() error {
return pm.runTask(ctx, task)
Expand All @@ -86,3 +94,14 @@ func (pm *PerformanceMonitor) Run(ctx context.Context) error {
pm.scheduler.StartAsync()
return nil
}

type zbusClient struct{}

func withZbusClient(ctx context.Context, client zbus.Client) context.Context {
return context.WithValue(ctx, zbusClient{}, client)
}

// GetZbusClient gets zbus client from the given context
func GetZbusClient(ctx context.Context) zbus.Client {
return ctx.Value(zbusClient{}).(zbus.Client)
}
8 changes: 8 additions & 0 deletions pkg/primitives/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,11 @@ func (s *statsStream) Current() (gridtypes.Capacity, error) {
func (s *statsStream) Total() gridtypes.Capacity {
return s.stats.Total()
}

func (s *statsStream) Workloads() (int, error) {
capacity, err := s.stats.storage.Capacity()
if err != nil {
return 0, err
}
return capacity.Workloads, nil
}
1 change: 1 addition & 0 deletions pkg/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type Statistics interface {
ReservedStream(ctx context.Context) <-chan gridtypes.Capacity
Current() (gridtypes.Capacity, error)
Total() gridtypes.Capacity
Workloads() (int, error)
}
17 changes: 17 additions & 0 deletions pkg/stubs/statistics_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,20 @@ func (s *StatisticsStub) Total(ctx context.Context) (ret0 gridtypes.Capacity) {
}
return
}

func (s *StatisticsStub) Workloads(ctx context.Context) (ret0 int, ret1 error) {
args := []interface{}{}
result, err := s.client.RequestContext(ctx, s.module, s.object, "Workloads", args...)
if err != nil {
panic(err)
}
result.PanicOnError()
ret1 = result.CallError()
loader := zbus.Loader{
&ret0,
}
if err := result.Unmarshal(&loader); err != nil {
panic(err)
}
return
}
1 change: 0 additions & 1 deletion pkg/stubs/storage_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package stubs

import (
"context"

zbus "github.com/threefoldtech/zbus"
pkg "github.com/threefoldtech/zos/pkg"
gridtypes "github.com/threefoldtech/zos/pkg/gridtypes"
Expand Down
Loading