diff --git a/model/task/task.go b/model/task/task.go index e289da9d380..75d564e7430 100644 --- a/model/task/task.go +++ b/model/task/task.go @@ -30,6 +30,8 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "gonum.org/v1/gonum/graph" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" @@ -91,16 +93,16 @@ type Task struct { // Priority is a specifiable value that adds weight to the prioritization that task will be given in its // corresponding distro task queue. Priority int64 `bson:"priority" json:"priority"` - // PriorityRankValue is not persisted to the db, but stored in memory and passed to the task queue document. - // It is a mixture of Priority and the various task queue ranking factors that multiply on top of Priority. - PriorityRankValue int64 `bson:"priority_rank_value" json:"priority_rank_value"` - TaskGroup string `bson:"task_group" json:"task_group"` - TaskGroupMaxHosts int `bson:"task_group_max_hosts,omitempty" json:"task_group_max_hosts,omitempty"` - TaskGroupOrder int `bson:"task_group_order,omitempty" json:"task_group_order,omitempty"` - ResultsService string `bson:"results_service,omitempty" json:"results_service,omitempty"` - HasCedarResults bool `bson:"has_cedar_results,omitempty" json:"has_cedar_results,omitempty"` - ResultsFailed bool `bson:"results_failed,omitempty" json:"results_failed,omitempty"` - MustHaveResults bool `bson:"must_have_results,omitempty" json:"must_have_results,omitempty"` + // SortingValueBreakdown is not persisted to the db, but stored in memory and passed to the task queue document. + // It contains information on what factors led to the overall queue ranking value for the task. + SortingValueBreakdown SortingValueBreakdown `bson:"-" json:"sorting_value_breakdown"` + TaskGroup string `bson:"task_group" json:"task_group"` + TaskGroupMaxHosts int `bson:"task_group_max_hosts,omitempty" json:"task_group_max_hosts,omitempty"` + TaskGroupOrder int `bson:"task_group_order,omitempty" json:"task_group_order,omitempty"` + ResultsService string `bson:"results_service,omitempty" json:"results_service,omitempty"` + HasCedarResults bool `bson:"has_cedar_results,omitempty" json:"has_cedar_results,omitempty"` + ResultsFailed bool `bson:"results_failed,omitempty" json:"results_failed,omitempty"` + MustHaveResults bool `bson:"must_have_results,omitempty" json:"must_have_results,omitempty"` // only relevant if the task is running. the time of the last heartbeat // sent back by the agent LastHeartbeat time.Time `bson:"last_heartbeat" json:"last_heartbeat"` @@ -4037,3 +4039,100 @@ func (t *Task) FindAbortingAndResettingDependencies() ([]Task, error) { }) return FindAll(q) } + +// SortingValueBreakdown is the full breakdown of the final value used to sort on in the queue, +// with accompanying breakdowns of priority and rank value. +type SortingValueBreakdown struct { + TaskGroupLength int64 + TotalValue int64 + PriorityBreakdown PriorityBreakdown + RankValueBreakdown RankValueBreakdown +} + +// PriorityBreakdown contains information on how much various factors impacted the custom +// priority value that is used in the queue sorting value equation. +type PriorityBreakdown struct { + // InitialPriorityImpact represents how much of the total priority can be attributed to the + // original priority set on a task. 
+ InitialPriorityImpact int64 + // TaskGroupImpact represents how much of the total priority can be attributed to a + // task being in a task group. + TaskGroupImpact int64 + // GeneratorTaskImpact represents how much of the total priority can be attributed to a + // task having a generate.tasks command in it. + GeneratorTaskImpact int64 + // CommitQueueImpact represents how much of the total priority can be attributed to a task + // being in the commit queue. + CommitQueueImpact int64 +} + +// RankValueBreakdown contains information on how much various factors impacted the custom +// rank value that is used in the queue sorting value equation. +type RankValueBreakdown struct { + // CommitQueueImpact represents how much of the total rank value can be attributed to a task + // being in the commit queue. + CommitQueueImpact int64 + // NumDependentsImpact represents how much of the total rank value can be attributed to the number + // of tasks that are dependents of a task. + NumDependentsImpact int64 + // EstimatedRuntimeImpact represents how much of the total rank value can be attributed to the + // estimated runtime of a task. + EstimatedRuntimeImpact int64 + // MainlineWaitTimeImpact represents how much of the total rank value can be attributed to + // how long a mainline task has been waiting for dispatch. + MainlineWaitTimeImpact int64 + // StepbackImpact represents how much of the total rank value can be attributed to + // a task being activated by the stepback process. + StepbackImpact int64 + // PatchImpact represents how much of the total rank value can be attributed to a task + // being part of a patch. + PatchImpact int64 + // PatchWaitTimeImpact represents how much of the total rank value can be attributed to + // how long a patch task has been waiting for dispatch. + PatchWaitTimeImpact int64 +} + +const ( + priorityBreakdownAttributePrefix = "evergreen.priority_breakdown" + rankBreakdownAttributePrefix = "evergreen.rank_breakdown" + priorityScaledRankAttribute = "evergreen.priority_scaled_rank" +) + +// SetSortingValueBreakdownAttributes saves a full breakdown which compartmentalizes each factor that played a role in computing the +// overall value used to sort it in the queue, and creates a honeycomb trace with this data to enable dashboards/analysis. 
+func (t *Task) SetSortingValueBreakdownAttributes(ctx context.Context, breakdown SortingValueBreakdown) {
+	_, span := tracer.Start(ctx, "queue-factor-breakdown", trace.WithNewRoot())
+	defer span.End()
+	span.SetAttributes(
+		attribute.String(evergreen.DistroIDOtelAttribute, t.DistroId),
+		attribute.String(evergreen.TaskIDOtelAttribute, t.Id),
+		attribute.Int64(priorityScaledRankAttribute, breakdown.TotalValue),
+		// Priority values
+		attribute.Int64(fmt.Sprintf("%s.base_priority", priorityBreakdownAttributePrefix), breakdown.PriorityBreakdown.InitialPriorityImpact),
+		attribute.Int64(fmt.Sprintf("%s.task_group", priorityBreakdownAttributePrefix), breakdown.PriorityBreakdown.TaskGroupImpact),
+		attribute.Int64(fmt.Sprintf("%s.generate_task", priorityBreakdownAttributePrefix), breakdown.PriorityBreakdown.GeneratorTaskImpact),
+		attribute.Int64(fmt.Sprintf("%s.commit_queue", priorityBreakdownAttributePrefix), breakdown.PriorityBreakdown.CommitQueueImpact),
+		// Rank values
+		attribute.Int64(fmt.Sprintf("%s.commit_queue", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.CommitQueueImpact),
+		attribute.Int64(fmt.Sprintf("%s.patch", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.PatchImpact),
+		attribute.Int64(fmt.Sprintf("%s.patch_wait_time", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.PatchWaitTimeImpact),
+		attribute.Int64(fmt.Sprintf("%s.mainline_wait_time", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.MainlineWaitTimeImpact),
+		attribute.Int64(fmt.Sprintf("%s.stepback", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.StepbackImpact),
+		attribute.Int64(fmt.Sprintf("%s.num_dependents", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.NumDependentsImpact),
+		attribute.Int64(fmt.Sprintf("%s.estimated_runtime", rankBreakdownAttributePrefix), breakdown.RankValueBreakdown.EstimatedRuntimeImpact),
+		// Priority percentage values
+		attribute.Float64(fmt.Sprintf("%s.base_priority_pct", priorityBreakdownAttributePrefix), float64(breakdown.PriorityBreakdown.InitialPriorityImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.task_group_pct", priorityBreakdownAttributePrefix), float64(breakdown.PriorityBreakdown.TaskGroupImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.generate_task_pct", priorityBreakdownAttributePrefix), float64(breakdown.PriorityBreakdown.GeneratorTaskImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.commit_queue_pct", priorityBreakdownAttributePrefix), float64(breakdown.PriorityBreakdown.CommitQueueImpact)/float64(breakdown.TotalValue)*100),
+		// Rank value percentage values
+		attribute.Float64(fmt.Sprintf("%s.patch_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.PatchImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.patch_wait_time_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.PatchWaitTimeImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.commit_queue_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.CommitQueueImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.mainline_wait_time_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.MainlineWaitTimeImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.stepback_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.StepbackImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.num_dependents_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.NumDependentsImpact)/float64(breakdown.TotalValue)*100),
+		attribute.Float64(fmt.Sprintf("%s.estimated_runtime_pct", rankBreakdownAttributePrefix), float64(breakdown.RankValueBreakdown.EstimatedRuntimeImpact)/float64(breakdown.TotalValue)*100),
+	)
+	t.SortingValueBreakdown = breakdown
+}
diff --git a/model/task_queue.go b/model/task_queue.go
index 4f0ab146ba3..1535c2100cd 100644
--- a/model/task_queue.go
+++ b/model/task_queue.go
@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/evergreen-ci/evergreen/db"
+	"github.com/evergreen-ci/evergreen/model/task"
 	"github.com/mongodb/anser/bsonutil"
 	adb "github.com/mongodb/anser/db"
 	"github.com/mongodb/grip"
@@ -99,24 +100,24 @@ type TaskDep struct {
 }
 
 type TaskQueueItem struct {
-	Id                  string        `bson:"_id" json:"_id"`
-	IsDispatched        bool          `bson:"dispatched" json:"dispatched"`
-	DisplayName         string        `bson:"display_name" json:"display_name"`
-	Group               string        `bson:"group_name" json:"group_name"`
-	GroupMaxHosts       int           `bson:"group_max_hosts,omitempty" json:"group_max_hosts,omitempty"`
-	GroupIndex          int           `bson:"group_index,omitempty" json:"group_index,omitempty"`
-	Version             string        `bson:"version" json:"version"`
-	BuildVariant        string        `bson:"build_variant" json:"build_variant"`
-	RevisionOrderNumber int           `bson:"order" json:"order"`
-	Requester           string        `bson:"requester" json:"requester"`
-	Revision            string        `bson:"gitspec" json:"gitspec"`
-	Project             string        `bson:"project" json:"project"`
-	ExpectedDuration    time.Duration `bson:"exp_dur" json:"exp_dur"`
-	Priority            int64         `bson:"priority" json:"priority"`
-	PriorityRankValue   int64         `bson:"priority_rank_value" json:"priority_rank_value"`
-	Dependencies        []string      `bson:"dependencies" json:"dependencies"`
-	DependenciesMet     bool          `bson:"dependencies_met" json:"dependencies_met"`
-	ActivatedBy         string        `bson:"activated_by" json:"activated_by"`
+	Id                    string                     `bson:"_id" json:"_id"`
+	IsDispatched          bool                       `bson:"dispatched" json:"dispatched"`
+	DisplayName           string                     `bson:"display_name" json:"display_name"`
+	Group                 string                     `bson:"group_name" json:"group_name"`
+	GroupMaxHosts         int                        `bson:"group_max_hosts,omitempty" json:"group_max_hosts,omitempty"`
+	GroupIndex            int                        `bson:"group_index,omitempty" json:"group_index,omitempty"`
+	Version               string                     `bson:"version" json:"version"`
+	BuildVariant          string                     `bson:"build_variant" json:"build_variant"`
+	RevisionOrderNumber   int                        `bson:"order" json:"order"`
+	Requester             string                     `bson:"requester" json:"requester"`
+	Revision              string                     `bson:"gitspec" json:"gitspec"`
+	Project               string                     `bson:"project" json:"project"`
+	ExpectedDuration      time.Duration              `bson:"exp_dur" json:"exp_dur"`
+	Priority              int64                      `bson:"priority" json:"priority"`
+	SortingValueBreakdown task.SortingValueBreakdown `bson:"sorting_value_breakdown" json:"sorting_value_breakdown"`
+	Dependencies          []string                   `bson:"dependencies" json:"dependencies"`
+	DependenciesMet       bool                       `bson:"dependencies_met" json:"dependencies_met"`
+	ActivatedBy           string                     `bson:"activated_by" json:"activated_by"`
 }
 
 // must not no-lint these values
diff --git a/model/task_queue_service_test.go b/model/task_queue_service_test.go
index 8e072e81560..53a39a907e5 100644
--- a/model/task_queue_service_test.go
+++ b/model/task_queue_service_test.go
@@ -1962,23 +1962,23 @@ func (s *taskDAGDispatchServiceSuite) refreshTaskQueue(service *basicCachedDAGDi
 			dependencies = append(dependencies, d.TaskId)
 		}
 		taskQueue = append(taskQueue, TaskQueueItem{
-			Id:                  t.Id,
-			DisplayName:         t.DisplayName,
-			BuildVariant:
t.BuildVariant, - RevisionOrderNumber: t.RevisionOrderNumber, - Requester: t.Requester, - Revision: t.Revision, - Project: t.Project, - ExpectedDuration: t.ExpectedDuration, - Priority: t.Priority, - PriorityRankValue: t.PriorityRankValue, - Group: t.TaskGroup, - GroupMaxHosts: t.TaskGroupMaxHosts, - GroupIndex: t.TaskGroupOrder, - Version: t.Version, - ActivatedBy: t.ActivatedBy, - Dependencies: dependencies, - DependenciesMet: t.HasDependenciesMet(), + Id: t.Id, + DisplayName: t.DisplayName, + BuildVariant: t.BuildVariant, + RevisionOrderNumber: t.RevisionOrderNumber, + Requester: t.Requester, + Revision: t.Revision, + Project: t.Project, + ExpectedDuration: t.ExpectedDuration, + Priority: t.Priority, + SortingValueBreakdown: t.SortingValueBreakdown, + Group: t.TaskGroup, + GroupMaxHosts: t.TaskGroupMaxHosts, + GroupIndex: t.TaskGroupOrder, + Version: t.Version, + ActivatedBy: t.ActivatedBy, + Dependencies: dependencies, + DependenciesMet: t.HasDependenciesMet(), }) } err = service.rebuild(taskQueue) diff --git a/rest/data/scheduler.go b/rest/data/scheduler.go index 754db70503f..14a1bca5ed0 100644 --- a/rest/data/scheduler.go +++ b/rest/data/scheduler.go @@ -52,7 +52,7 @@ func CompareTasks(ctx context.Context, taskIds []string, useLegacy bool) ([]stri return nil, nil, errors.Errorf("distro '%s' not found", distroId) } taskPlan := scheduler.PrepareTasksForPlanning(d, tasks) - tasks = taskPlan.Export() + tasks = taskPlan.Export(ctx) } prioritizedIds := []string{} for _, t := range tasks { diff --git a/scheduler/distro_alias_test.go b/scheduler/distro_alias_test.go index c44ec605b91..8122199aa54 100644 --- a/scheduler/distro_alias_test.go +++ b/scheduler/distro_alias_test.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "testing" "github.com/evergreen-ci/evergreen" @@ -41,6 +42,9 @@ func TestDistroAliases(t *testing.T) { require.NoError(t, db.Clear(model.VersionCollection)) require.NoError(t, (&model.Version{Id: "foo"}).Insert()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + t.Run("VerifyPrimaryQueue", func(t *testing.T) { distroOne := &distro.Distro{ Id: "one", @@ -50,7 +54,7 @@ func TestDistroAliases(t *testing.T) { require.NoError(t, db.Clear(model.TaskQueuesCollection)) distroOne.PlannerSettings.Version = evergreen.PlannerVersionTunable - output, err := PrioritizeTasks(distroOne, tasks, TaskPlannerOptions{ID: "tunable-0"}) + output, err := PrioritizeTasks(ctx, distroOne, tasks, TaskPlannerOptions{ID: "tunable-0"}) require.NoError(t, err) require.Len(t, output, 2) require.Equal(t, "one", output[0].Id) @@ -68,7 +72,7 @@ func TestDistroAliases(t *testing.T) { require.NoError(t, db.Clear(model.TaskQueuesCollection)) distroOne.PlannerSettings.Version = evergreen.PlannerVersionLegacy - output, err := PrioritizeTasks(distroOne, tasks, TaskPlannerOptions{ID: "legacy-1"}) + output, err := PrioritizeTasks(ctx, distroOne, tasks, TaskPlannerOptions{ID: "legacy-1"}) require.NoError(t, err) require.Len(t, output, 2) require.Equal(t, "one", output[0].Id) @@ -96,7 +100,7 @@ func TestDistroAliases(t *testing.T) { require.NoError(t, db.Clear(model.TaskSecondaryQueuesCollection)) distroTwo.PlannerSettings.Version = evergreen.PlannerVersionTunable - output, err := PrioritizeTasks(distroTwo, tasks, TaskPlannerOptions{ID: "tunable-0", IsSecondaryQueue: true}) + output, err := PrioritizeTasks(ctx, distroTwo, tasks, TaskPlannerOptions{ID: "tunable-0", IsSecondaryQueue: true}) require.NoError(t, err) require.Len(t, output, 2) require.Equal(t, "one", output[0].Id) @@ 
-115,7 +119,7 @@ func TestDistroAliases(t *testing.T) { require.NoError(t, db.Clear(model.TaskSecondaryQueuesCollection)) distroTwo.PlannerSettings.Version = evergreen.PlannerVersionLegacy - output, err := PrioritizeTasks(distroTwo, tasks, TaskPlannerOptions{ID: "legacy-0", IsSecondaryQueue: true}) + output, err := PrioritizeTasks(ctx, distroTwo, tasks, TaskPlannerOptions{ID: "legacy-0", IsSecondaryQueue: true}) require.NoError(t, err) require.Len(t, output, 2) require.Equal(t, "one", output[0].Id) diff --git a/scheduler/planner.go b/scheduler/planner.go index 465bea614ee..4fa505cb9f2 100644 --- a/scheduler/planner.go +++ b/scheduler/planner.go @@ -1,6 +1,7 @@ package scheduler import ( + "context" "crypto/sha1" "fmt" "io" @@ -93,7 +94,7 @@ func (cache UnitCache) Export() TaskPlan { // in a Unit must be unique with regards to their ID. type Unit struct { tasks map[string]task.Task - cachedValue int64 + cachedValue task.SortingValueBreakdown id string distro *distro.Distro } @@ -195,78 +196,99 @@ type unitInfo struct { ContainsStepbackTask bool `json:"contains_stepback_task"` } -func (u *unitInfo) value() int64 { - var value int64 - - length := int64(len(u.TaskIDs)) - priority := 1 + (u.TotalPriority / length) - - if !u.ContainsNonGroupTasks { - // if all tasks in the unit are in a task group then - // we should give it a little bump, so that task - // groups tasks are sorted together even when they - // would also be scheduled in a version. - priority += length - } - if u.ContainsGenerateTask { - // give generators a boost so people don't have to wait twice. - priority = priority * u.Settings.GetGenerateTaskFactor() - } +// value computes a full SortingValueBreakdown, containing the final value by which the unit +// will be sorted in its queue (stored in TotalValue), and the numerical influences that +// the unit's properties had on computing that final value. Currently, the formula for +// computing this value is (custom_priority * custom_rankValue) + unit_length, where custom_priority +// and custom_rankValue are both derived from specific properties of the unit and various +// scheduler constants. +func (u *unitInfo) value() task.SortingValueBreakdown { + var breakdown task.SortingValueBreakdown + unitLength := int64(len(u.TaskIDs)) + breakdown.TaskGroupLength = unitLength + priority := u.computePriority(&breakdown) + rankValue := u.computeRankValue(&breakdown) + breakdown.TotalValue = priority*rankValue + breakdown.TaskGroupLength + return breakdown +} +// computeRankValue computes the custom rank value for this unit, which will later be multiplied with the +// computed priority to compute a final value by which the unit will be sorted on in the queue. It also +// modifies the RankValueBreakdown field of the passed in SortingValueBreakdown struct, which is used for +// statistically analyzing the queue's state. +func (u *unitInfo) computeRankValue(breakdown *task.SortingValueBreakdown) int64 { + unitLength := breakdown.TaskGroupLength if u.ContainsInPatch { - // give patches a bump, over non-patches. - value += priority * u.Settings.GetPatchFactor() - // patches that have spent more time in the queue - // should get worked on first (because people are - // waiting on the results), and because FIFO feels + // Give patches a bump over non-patches. Patches that have spent more time in the queue + // should get worked on first (because people are waiting on the results), and because FIFO feels // fair in this context. 
- value += priority * u.Settings.GetPatchTimeInQueueFactor() * int64(math.Floor(u.TimeInQueue.Minutes()/float64(length))) + breakdown.RankValueBreakdown.PatchImpact = u.Settings.GetPatchFactor() + breakdown.RankValueBreakdown.PatchWaitTimeImpact = u.Settings.GetPatchTimeInQueueFactor() * int64(math.Floor(u.TimeInQueue.Minutes()/float64(unitLength))) } else if u.ContainsInCommitQueue { - // give commit queue patches a boost over everything else - priority += 200 - value += priority * u.Settings.GetCommitQueueFactor() + // Give commit queue patches a boost over everything else + breakdown.RankValueBreakdown.CommitQueueImpact = u.Settings.GetCommitQueueFactor() } else { - // for mainline builds that are more recent, give them a bit + // For mainline builds that are more recent, give them a bit // of a bump, to avoid running older builds first. - avgLifeTime := u.TimeInQueue / time.Duration(length) - - var mainlinePriority int64 + avgLifeTime := u.TimeInQueue / time.Duration(unitLength) if avgLifeTime < time.Duration(7*24)*time.Hour { - mainlinePriority += u.Settings.GetMainlineTimeInQueueFactor() * int64((7*24*time.Hour - avgLifeTime).Hours()) + breakdown.RankValueBreakdown.MainlineWaitTimeImpact = u.Settings.GetMainlineTimeInQueueFactor() * int64((7*24*time.Hour - avgLifeTime).Hours()) } if u.ContainsStepbackTask { - mainlinePriority += u.Settings.GetStepbackTaskFactor() + breakdown.RankValueBreakdown.StepbackImpact = u.Settings.GetStepbackTaskFactor() } - - value += priority * mainlinePriority } - - // Start with the number of tasks so that units with more - // tasks get sorted above one-offs, and then add the priority - // setting as a base. - value += length - value += priority - - // The remaining values are normalized per tasks, to avoid - // situations where larger units are always prioritized above - // smaller groups. - // - // Additionally, all these values are multiplied by the - // priority, to avoid situations where the impact of changing - // priority is obviated by other factors. - // Increase the value for the number of dependents, so that // tasks (and units) which block other tasks run before tasks // that don't block other tasks. - value += int64(float64(priority) * u.Settings.GetNumDependentsFactor() * float64(u.NumDependents/length)) + breakdown.RankValueBreakdown.NumDependentsImpact = int64(u.Settings.GetNumDependentsFactor() * float64(u.NumDependents/unitLength)) // Increase the value for tasks with longer runtimes, given // that most of our workloads have different runtimes, and we // don't want to have longer makespans if longer running tasks // have to execute after shorter running tasks. - value += priority * u.Settings.GetExpectedRuntimeFactor() * int64(math.Floor(u.ExpectedRuntime.Minutes()/float64(length))) + breakdown.RankValueBreakdown.EstimatedRuntimeImpact = u.Settings.GetExpectedRuntimeFactor() * int64(math.Floor(u.ExpectedRuntime.Minutes()/float64(unitLength))) + + return 1 + breakdown.RankValueBreakdown.PatchImpact + + breakdown.RankValueBreakdown.PatchWaitTimeImpact + + breakdown.RankValueBreakdown.MainlineWaitTimeImpact + + breakdown.RankValueBreakdown.CommitQueueImpact + + breakdown.RankValueBreakdown.StepbackImpact + + breakdown.RankValueBreakdown.NumDependentsImpact + + breakdown.RankValueBreakdown.EstimatedRuntimeImpact +} - return value +// computePriority computes the custom priority value for this unit, which will later be multiplied with the +// custom rank value to compute a final value by which the unit will be sorted on in the queue. 
It also +// modifies the PriorityBreakdown field of the passed in SortingValueBreakdown struct, which is used for +// statistically analyzing the queue's state. +func (u *unitInfo) computePriority(breakdown *task.SortingValueBreakdown) int64 { + unitLength := breakdown.TaskGroupLength + initialPriority := 1 + (u.TotalPriority / unitLength) + breakdown.PriorityBreakdown.InitialPriorityImpact = initialPriority + if !u.ContainsNonGroupTasks { + // If all tasks in the unit are in a task group then + // we should give it a little bump, so that task + // groups tasks are sorted together even when they + // would also be scheduled in a version. + breakdown.PriorityBreakdown.TaskGroupImpact = unitLength + initialPriority += unitLength + } + if u.ContainsGenerateTask { + // Give generators a boost so people don't have to wait twice. + prevPriority := initialPriority + initialPriority = initialPriority * u.Settings.GetGenerateTaskFactor() + breakdown.PriorityBreakdown.GeneratorTaskImpact = initialPriority - prevPriority + if !u.ContainsNonGroupTasks { + breakdown.PriorityBreakdown.TaskGroupImpact *= u.Settings.GetGenerateTaskFactor() + breakdown.PriorityBreakdown.GeneratorTaskImpact -= unitLength * u.Settings.GetGenerateTaskFactor() + } + } + if u.ContainsInCommitQueue { + breakdown.PriorityBreakdown.CommitQueueImpact = 200 + initialPriority += 200 + } + return initialPriority } func (unit *Unit) info() unitInfo { @@ -300,20 +322,19 @@ func (unit *Unit) info() unitInfo { return info } -// RankValue returns a point value for the tasks in the unit that can +// sortingValueBreakdown returns a breakdown for the tasks in the unit that can // be used to compare units with each other. // // Generally, higher point values are given to larger units and for // units that have been in the queue for longer, with longer expected // runtimes. The tasks' priority acts as a multiplying factor. -func (unit *Unit) RankValue() int64 { - if unit.cachedValue > 0 { +func (unit *Unit) sortingValueBreakdown() task.SortingValueBreakdown { + if unit.cachedValue.TotalValue > 0 { return unit.cachedValue } info := unit.info() unit.cachedValue = info.value() - return unit.cachedValue } @@ -368,12 +389,14 @@ func (tl TaskList) Less(i, j int) bool { // TaskPlan provides a sortable interface on top of a slice of // schedulable units, with ordering of units provided by the -// implementation of RankValue. +// implementation of SortingValueBreakdown. type TaskPlan []*Unit -func (tpl TaskPlan) Len() int { return len(tpl) } -func (tpl TaskPlan) Less(i, j int) bool { return tpl[i].RankValue() > tpl[j].RankValue() } -func (tpl TaskPlan) Swap(i, j int) { tpl[i], tpl[j] = tpl[j], tpl[i] } +func (tpl TaskPlan) Len() int { return len(tpl) } +func (tpl TaskPlan) Less(i, j int) bool { + return tpl[i].sortingValueBreakdown().TotalValue > tpl[j].sortingValueBreakdown().TotalValue +} +func (tpl TaskPlan) Swap(i, j int) { tpl[i], tpl[j] = tpl[j], tpl[i] } func (tpl TaskPlan) Keys() []string { out := []string{} @@ -416,20 +439,20 @@ func PrepareTasksForPlanning(distro *distro.Distro, tasks []task.Task) TaskPlan } // Export sorts the TaskPlan returning a unique list of tasks. 
-func (tpl TaskPlan) Export() []task.Task { +func (tpl TaskPlan) Export(ctx context.Context) []task.Task { sort.Sort(tpl) output := []task.Task{} seen := StringSet{} for _, unit := range tpl { - rankValue := unit.RankValue() + sortingValueBreakdown := unit.sortingValueBreakdown() tasks := unit.Export() sort.Sort(tasks) for i := range tasks { if seen.Visit(tasks[i].Id) { continue } - tasks[i].PriorityRankValue = rankValue + tasks[i].SetSortingValueBreakdownAttributes(ctx, sortingValueBreakdown) output = append(output, tasks[i]) } } diff --git a/scheduler/planner_test.go b/scheduler/planner_test.go index 5c04d45a26f..69d1f0c298e 100644 --- a/scheduler/planner_test.go +++ b/scheduler/planner_test.go @@ -25,7 +25,7 @@ func TestPlanner(t *testing.T) { assert.Len(t, StringSet{}, 0) assert.NotNil(t, StringSet{}) }) - t.Run("CheckNonExistant", func(t *testing.T) { + t.Run("CheckNonExistent", func(t *testing.T) { set := StringSet{} assert.False(t, set.Check("foo")) }) @@ -130,7 +130,7 @@ func TestPlanner(t *testing.T) { ts.SetDistro(&distro.Distro{}) require.Len(t, ts.tasks, 1) } - for _, ts := range plan.Export() { + for _, ts := range plan.Export(context.Background()) { require.True(t, ts.Id == "one" || ts.Id == "two") } }) @@ -197,98 +197,131 @@ func TestPlanner(t *testing.T) { t.Run("SingleTask", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo"}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 180, unit.RankValue()) + assert.EqualValues(t, 180, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("MultipleTasks", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo"}) unit.SetDistro(&distro.Distro{}) unit.Add(task.Task{Id: "bar"}) - assert.EqualValues(t, 181, unit.RankValue()) + assert.EqualValues(t, 181, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("CommitQueue", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.MergeTestRequester}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 2413, unit.RankValue()) + assert.EqualValues(t, 2413, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("MergeQueue", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.GithubMergeRequester}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 2413, unit.RankValue()) + assert.EqualValues(t, 2413, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("Patches", func(t *testing.T) { t.Run("CLI", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.PatchVersionRequester}) unit.SetDistro(&distro.Distro{}) unit.distro.PlannerSettings.PatchFactor = 10 - assert.EqualValues(t, 22, unit.RankValue()) + assert.EqualValues(t, 22, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("Github", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.GithubPRRequester}) unit.SetDistro(&distro.Distro{}) unit.distro.PlannerSettings.PatchFactor = 10 - assert.EqualValues(t, 22, unit.RankValue()) + assert.EqualValues(t, 22, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) }) t.Run("Priority", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Priority: 10}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 1970, unit.RankValue()) + assert.EqualValues(t, 1970, 
unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("TimeInQueuePatch", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.PatchVersionRequester, ActivatedTime: time.Now().Add(-time.Hour)}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 73, unit.RankValue()) + assert.EqualValues(t, 73, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("TimeInQueueMainline", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.RepotrackerVersionRequester, ActivatedTime: time.Now().Add(-time.Hour)}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 178, unit.RankValue()) + assert.EqualValues(t, 178, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("LifeTimePatch", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.PatchVersionRequester, IngestTime: time.Now().Add(-10 * time.Hour)}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 613, unit.RankValue()) + assert.EqualValues(t, 613, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("LifeTimeMainlineNew", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.RepotrackerVersionRequester, IngestTime: time.Now().Add(-10 * time.Minute)}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 179, unit.RankValue()) + assert.EqualValues(t, 179, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("LifeTimeMainlineOld", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.RepotrackerVersionRequester, IngestTime: time.Now().Add(-7 * 24 * time.Hour)}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 12, unit.RankValue()) + assert.EqualValues(t, 12, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("NumDependents", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", NumDependents: 2}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 182, unit.RankValue()) + assert.EqualValues(t, 182, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("NumDependentsWithFactor", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", NumDependents: 2}) unit.SetDistro(&distro.Distro{}) unit.distro.PlannerSettings.NumDependentsFactor = 10 - assert.EqualValues(t, 200, unit.RankValue()) + assert.EqualValues(t, 200, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("NumDependentsWithFractionFactor", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", NumDependents: 2}) unit.SetDistro(&distro.Distro{}) unit.distro.PlannerSettings.NumDependentsFactor = 0.5 - assert.EqualValues(t, 181, unit.RankValue()) + assert.EqualValues(t, 181, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) + }) + t.Run("GenerateTask", func(t *testing.T) { + unit := NewUnit(task.Task{Id: "foo", GenerateTask: true}) + unit.SetDistro(&distro.Distro{}) + unit.distro.PlannerSettings.GenerateTaskFactor = 10 + assert.EqualValues(t, 1791, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) + }) + t.Run("TaskGroup", func(t *testing.T) { + unit := NewUnit(task.Task{Id: "foo", TaskGroup: "tg1"}) + unit.Add(task.Task{Id: "bar", 
TaskGroup: "tg1"}) + unit.Add(task.Task{Id: "baz", TaskGroup: "tg1"}) + unit.SetDistro(&distro.Distro{}) + assert.EqualValues(t, 719, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) }) t.Run("RankCachesValue", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Priority: 100}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 18080, unit.RankValue()) + assert.EqualValues(t, 18080, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) unit.Add(task.Task{Id: "bar"}) - assert.EqualValues(t, 18080, unit.RankValue()) + assert.EqualValues(t, 18080, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) t.Run("RankForCommitQueue", func(t *testing.T) { unit := NewUnit(task.Task{Id: "foo", Requester: evergreen.MergeTestRequester}) unit.SetDistro(&distro.Distro{}) - assert.EqualValues(t, 2413, unit.RankValue()) + assert.EqualValues(t, 2413, unit.sortingValueBreakdown().TotalValue) + verifyRankBreakdown(t, unit.sortingValueBreakdown()) }) }) t.Run("TaskPlan", func(t *testing.T) { @@ -302,20 +335,20 @@ func TestPlanner(t *testing.T) { t.Run("NoChange", func(t *testing.T) { plan := buildPlan(NewUnit(task.Task{Id: "foo"}), NewUnit(task.Task{Id: "bar"})) sort.Stable(plan) - out := plan.Export() + out := plan.Export(context.Background()) assert.Equal(t, "foo", out[0].Id) assert.Equal(t, "bar", out[1].Id) }) t.Run("ChangeOrder", func(t *testing.T) { plan := buildPlan(NewUnit(task.Task{Id: "foo"}), NewUnit(task.Task{Id: "bar", Priority: 10})) sort.Stable(plan) - out := plan.Export() + out := plan.Export(context.Background()) assert.Equal(t, "bar", out[0].Id) assert.Equal(t, "foo", out[1].Id) }) t.Run("Deduplicates", func(t *testing.T) { plan := buildPlan(NewUnit(task.Task{Id: "foo"}), NewUnit(task.Task{Id: "foo"})) - assert.Len(t, plan.Export(), 1) + assert.Len(t, plan.Export(context.Background()), 1) }) }) t.Run("TaskList", func(t *testing.T) { @@ -378,7 +411,7 @@ func TestPlanner(t *testing.T) { }) assert.Len(t, plan, 2) - assert.Len(t, plan.Export(), 3) + assert.Len(t, plan.Export(context.Background()), 3) }) t.Run("VersionsGrouped", func(t *testing.T) { plan := PrepareTasksForPlanning(&distro.Distro{ @@ -392,7 +425,7 @@ func TestPlanner(t *testing.T) { }) assert.Len(t, plan, 2) - assert.Len(t, plan.Export(), 3) + assert.Len(t, plan.Export(context.Background()), 3) }) t.Run("VersionsAndTaskGroupsGrouped", func(t *testing.T) { plan := PrepareTasksForPlanning(&distro.Distro{ @@ -409,7 +442,7 @@ func TestPlanner(t *testing.T) { }) assert.Len(t, plan, 3) - tasks := plan.Export() + tasks := plan.Export(context.Background()) assert.Len(t, tasks, 6) assert.Equal(t, "one", tasks[0].TaskGroup) assert.Equal(t, "one", tasks[1].TaskGroup) @@ -423,7 +456,7 @@ func TestPlanner(t *testing.T) { }) require.Len(t, plan, 4, "keys:%s", plan.Keys()) - tasks := plan.Export() + tasks := plan.Export(context.Background()) require.Len(t, tasks, 4) assert.Equal(t, "three", tasks[3].Id) @@ -441,7 +474,22 @@ func TestPlanner(t *testing.T) { }) assert.Len(t, plan, 3) - assert.Len(t, plan.Export(), 3) + assert.Len(t, plan.Export(context.Background()), 3) }) }) } + +func verifyRankBreakdown(t *testing.T, breakdown task.SortingValueBreakdown) { + totalRankValue := breakdown.RankValueBreakdown.StepbackImpact + + breakdown.RankValueBreakdown.PatchImpact + + breakdown.RankValueBreakdown.PatchWaitTimeImpact + + breakdown.RankValueBreakdown.MainlineWaitTimeImpact + + 
breakdown.RankValueBreakdown.EstimatedRuntimeImpact + + breakdown.RankValueBreakdown.NumDependentsImpact + + breakdown.RankValueBreakdown.CommitQueueImpact + totalPriorityValue := breakdown.PriorityBreakdown.InitialPriorityImpact + + breakdown.PriorityBreakdown.CommitQueueImpact + + breakdown.PriorityBreakdown.GeneratorTaskImpact + + breakdown.PriorityBreakdown.TaskGroupImpact + assert.Equal(t, totalPriorityValue+breakdown.TaskGroupLength+totalRankValue*totalPriorityValue, breakdown.TotalValue) +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 69c927c9307..ca19abac477 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -24,18 +24,18 @@ type TaskPlannerOptions struct { type TaskPlanner func(*distro.Distro, []task.Task, TaskPlannerOptions) ([]task.Task, error) -func PrioritizeTasks(d *distro.Distro, tasks []task.Task, opts TaskPlannerOptions) ([]task.Task, error) { +func PrioritizeTasks(ctx context.Context, d *distro.Distro, tasks []task.Task, opts TaskPlannerOptions) ([]task.Task, error) { opts.IncludesDependencies = d.DispatcherSettings.Version == evergreen.DispatcherVersionRevisedWithDependencies switch d.PlannerSettings.Version { case evergreen.PlannerVersionTunable: - return runTunablePlanner(d, tasks, opts) + return runTunablePlanner(ctx, d, tasks, opts) default: return runLegacyPlanner(d, tasks, opts) } } -func runTunablePlanner(d *distro.Distro, tasks []task.Task, opts TaskPlannerOptions) ([]task.Task, error) { +func runTunablePlanner(ctx context.Context, d *distro.Distro, tasks []task.Task, opts TaskPlannerOptions) ([]task.Task, error) { var err error tasks, err = PopulateCaches(opts.ID, tasks) @@ -43,11 +43,10 @@ func runTunablePlanner(d *distro.Distro, tasks []task.Task, opts TaskPlannerOpti return nil, errors.WithStack(err) } - plan := PrepareTasksForPlanning(d, tasks).Export() + plan := PrepareTasksForPlanning(d, tasks).Export(ctx) info := GetDistroQueueInfo(d.Id, plan, d.GetTargetTime(), opts) info.SecondaryQueue = opts.IsSecondaryQueue info.PlanCreatedAt = opts.StartedAt - if err = PersistTaskQueue(d.Id, plan, info); err != nil { return nil, errors.WithStack(err) } diff --git a/scheduler/task_queue_persister.go b/scheduler/task_queue_persister.go index 02fa4f02ab0..ddae30d9af8 100644 --- a/scheduler/task_queue_persister.go +++ b/scheduler/task_queue_persister.go @@ -20,23 +20,23 @@ func PersistTaskQueue(distro string, tasks []task.Task, distroQueueInfo model.Di dependencies = append(dependencies, d.TaskId) } taskQueue = append(taskQueue, model.TaskQueueItem{ - Id: t.Id, - DisplayName: t.DisplayName, - BuildVariant: t.BuildVariant, - RevisionOrderNumber: t.RevisionOrderNumber, - Requester: t.Requester, - Revision: t.Revision, - Project: t.Project, - ExpectedDuration: t.ExpectedDuration, - Priority: t.Priority, - PriorityRankValue: t.PriorityRankValue, - Group: t.TaskGroup, - GroupMaxHosts: t.TaskGroupMaxHosts, - GroupIndex: t.TaskGroupOrder, - Version: t.Version, - ActivatedBy: t.ActivatedBy, - Dependencies: dependencies, - DependenciesMet: t.HasDependenciesMet(), + Id: t.Id, + DisplayName: t.DisplayName, + BuildVariant: t.BuildVariant, + RevisionOrderNumber: t.RevisionOrderNumber, + Requester: t.Requester, + Revision: t.Revision, + Project: t.Project, + ExpectedDuration: t.ExpectedDuration, + Priority: t.Priority, + SortingValueBreakdown: t.SortingValueBreakdown, + Group: t.TaskGroup, + GroupMaxHosts: t.TaskGroupMaxHosts, + GroupIndex: t.TaskGroupOrder, + Version: t.Version, + ActivatedBy: t.ActivatedBy, + Dependencies: dependencies, + 
DependenciesMet: t.HasDependenciesMet(), }) } @@ -48,8 +48,7 @@ func PersistTaskQueue(distro string, tasks []task.Task, distroQueueInfo model.Di // track scheduled time for prioritized tasks if err := task.SetTasksScheduledTime(tasks, startAt); err != nil { - return errors.Wrapf(err, "error setting scheduled time for prioritized tasks for distro '%s'", distro) + return errors.Wrapf(err, "setting scheduled time for prioritized tasks for distro '%s'", distro) } - return nil } diff --git a/scheduler/wrapper.go b/scheduler/wrapper.go index 659b9ca5611..76612bc588e 100644 --- a/scheduler/wrapper.go +++ b/scheduler/wrapper.go @@ -102,7 +102,7 @@ func PlanDistro(ctx context.Context, conf Configuration, s *evergreen.Settings) ///////////////// planningPhaseBegins := time.Now() - prioritizedTasks, err := PrioritizeTasks(distro, tasks, TaskPlannerOptions{ + prioritizedTasks, err := PrioritizeTasks(ctx, distro, tasks, TaskPlannerOptions{ StartedAt: taskFindingBegins, ID: schedulerInstanceID, IsSecondaryQueue: false, diff --git a/units/scheduler_alias.go b/units/scheduler_alias.go index 802fef3af43..3b492363611 100644 --- a/units/scheduler_alias.go +++ b/units/scheduler_alias.go @@ -83,7 +83,7 @@ func (j *distroAliasSchedulerJob) Run(ctx context.Context) { if d == nil { return } - plan, err := scheduler.PrioritizeTasks(d, tasks, scheduler.TaskPlannerOptions{ + plan, err := scheduler.PrioritizeTasks(ctx, d, tasks, scheduler.TaskPlannerOptions{ StartedAt: startAt, ID: j.ID(), IsSecondaryQueue: true,
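The planner changes above re-express the queue sorting value as (custom priority) * (custom rank value) + (unit length), with the two factors decomposed into the new PriorityBreakdown and RankValueBreakdown fields. A minimal, self-contained sketch of that arithmetic, using invented impact numbers rather than real planner settings, illustrates the identity that the new verifyRankBreakdown test helper asserts:

package main

import "fmt"

// Illustrative stand-ins for the PriorityBreakdown and RankValueBreakdown
// fields added in model/task; the numbers below are invented.
type priorityBreakdown struct {
	initialPriority, taskGroup, generator, commitQueue int64
}

type rankValueBreakdown struct {
	commitQueue, numDependents, estimatedRuntime     int64
	mainlineWaitTime, stepback, patch, patchWaitTime int64
}

func main() {
	unitLength := int64(2) // two tasks in the unit
	p := priorityBreakdown{initialPriority: 1, taskGroup: 2}
	r := rankValueBreakdown{patch: 10, patchWaitTime: 60}

	// computePriority and computeRankValue each return 1 plus the sum of
	// their impacts (the leading 1 is folded into initialPriority here).
	priority := p.initialPriority + p.taskGroup + p.generator + p.commitQueue
	rankValue := int64(1) + r.commitQueue + r.numDependents + r.estimatedRuntime +
		r.mainlineWaitTime + r.stepback + r.patch + r.patchWaitTime

	// value(): TotalValue = priority*rankValue + unit length.
	totalValue := priority*rankValue + unitLength

	// The same identity verifyRankBreakdown asserts:
	// sum(priority impacts) + unit length + sum(rank impacts)*sum(priority impacts) == TotalValue.
	fmt.Println(totalValue, priority+unitLength+(rankValue-1)*priority == totalValue)
}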
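SetSortingValueBreakdownAttributes reports the breakdown as attributes on a new root span so the data can be charted in Honeycomb. A rough standalone sketch of that pattern is below: the attribute keys mirror the constants added in the diff, but the pct helper, tracer name, and numbers are illustrative assumptions, and the tracer is a no-op unless an OpenTelemetry SDK and exporter are configured.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// pct turns an impact into a percentage of the total sorting value using
// float division, so small impacts do not truncate to zero.
func pct(impact, total int64) float64 {
	if total == 0 {
		return 0
	}
	return float64(impact) / float64(total) * 100
}

// emitBreakdown records a root span whose attributes follow the same key
// scheme as SetSortingValueBreakdownAttributes; only two fields are shown.
func emitBreakdown(ctx context.Context, tracer trace.Tracer, totalValue, patchImpact int64) {
	_, span := tracer.Start(ctx, "queue-factor-breakdown", trace.WithNewRoot())
	defer span.End()
	span.SetAttributes(
		attribute.Int64("evergreen.priority_scaled_rank", totalValue),
		attribute.Int64("evergreen.rank_breakdown.patch", patchImpact),
		attribute.Float64("evergreen.rank_breakdown.patch_pct", pct(patchImpact, totalValue)),
	)
}

func main() {
	// Without an SDK configured this returns a no-op tracer, which is enough
	// to exercise the attribute-building path.
	tracer := otel.Tracer("queue-breakdown-sketch")
	emitBreakdown(context.Background(), tracer, 613, 10)
	fmt.Println("breakdown attributes recorded")
}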