diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index 635a4bea261..5b0952245f3 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "armada_client" -version = "0.3.5" +version = "0.4.6" description = "Armada gRPC API python client" readme = "README.md" requires-python = ">=3.7" diff --git a/cmd/simulator/cmd/root.go b/cmd/simulator/cmd/root.go index 0e5951d5e4a..a1c815633c5 100644 --- a/cmd/simulator/cmd/root.go +++ b/cmd/simulator/cmd/root.go @@ -123,8 +123,8 @@ func runSimulations(cmd *cobra.Command, args []string) error { defer outputSink.Close(ctx) ctx.Info("Armada simulator") - ctx.Infof("ClusterSpec: %v", clusterSpec.Name) - ctx.Infof("WorkloadSpecs: %v", workloadSpec.Name) + ctx.Infof("ClusterSpec: %v", clusterFile) + ctx.Infof("WorkloadSpecs: %v", workloadFile) ctx.Infof("SchedulingConfig: %v", configFile) ctx.Infof("OutputDir: %v", outputDirPath) diff --git a/internal/scheduler/database/query.sql.go b/internal/scheduler/database/query.sql.go index 347a14e0887..cfb1fb84a40 100644 --- a/internal/scheduler/database/query.sql.go +++ b/internal/scheduler/database/query.sql.go @@ -100,7 +100,7 @@ func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []string) er } const markJobRunsPreemptRequestedByJobId = `-- name: MarkJobRunsPreemptRequestedByJobId :exec -UPDATE runs SET preempt_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) +UPDATE runs SET preempt_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobRunsPreemptRequestedByJobIdParams struct { @@ -142,7 +142,7 @@ func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []string) } const markJobsCancelRequestedById = `-- name: MarkJobsCancelRequestedById :exec -UPDATE jobs SET cancel_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) +UPDATE jobs SET cancel_requested = true WHERE queue = $1 and job_set = $2 and job_id = ANY($3::text[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobsCancelRequestedByIdParams struct { @@ -157,7 +157,7 @@ func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, arg MarkJobsC } const markJobsCancelRequestedBySetAndQueuedState = `-- name: MarkJobsCancelRequestedBySetAndQueuedState :exec -UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = $1 and queue = $2 and queued = ANY($3::bool[]) +UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = $1 and queue = $2 and queued = ANY($3::bool[]) and cancelled = false and succeeded = false and failed = false ` type MarkJobsCancelRequestedBySetAndQueuedStateParams struct { @@ -1169,7 +1169,7 @@ func (q *Queries) SetTerminatedTime(ctx context.Context, arg SetTerminatedTimePa } const updateJobPriorityById = `-- name: UpdateJobPriorityById :exec -UPDATE jobs SET priority = $1 WHERE queue = $2 and job_set = $3 and job_id = ANY($4::text[]) +UPDATE jobs SET priority = $1 WHERE queue = $2 and job_set = $3 and job_id = ANY($4::text[]) and cancelled = false and succeeded = false and failed = false ` type UpdateJobPriorityByIdParams struct { @@ -1190,7 +1190,7 @@ func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriori } const updateJobPriorityByJobSet = `-- name: UpdateJobPriorityByJobSet :exec -UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 +UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = 
$3 and cancelled = false and succeeded = false and failed = false ` type UpdateJobPriorityByJobSetParams struct { diff --git a/internal/scheduler/database/query/query.sql b/internal/scheduler/database/query/query.sql index 0c93b068f47..ad4da1a55f2 100644 --- a/internal/scheduler/database/query/query.sql +++ b/internal/scheduler/database/query/query.sql @@ -11,16 +11,16 @@ SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, vali SELECT job_id, job_set, queue, priority, submitted, queued, queued_version, validated, cancel_requested, cancel_by_jobset_requested, cancelled, succeeded, failed, scheduling_info, scheduling_info_version, pools, serial FROM jobs WHERE serial > $1 ORDER BY serial LIMIT $2; -- name: UpdateJobPriorityByJobSet :exec -UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3; +UPDATE jobs SET priority = $1 WHERE job_set = $2 and queue = $3 and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsCancelRequestedBySetAndQueuedState :exec -UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]); +UPDATE jobs SET cancel_by_jobset_requested = true WHERE job_set = sqlc.arg(job_set) and queue = sqlc.arg(queue) and queued = ANY(sqlc.arg(queued_states)::bool[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsSucceededById :exec UPDATE jobs SET succeeded = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); -- name: MarkJobsCancelRequestedById :exec -UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE jobs SET cancel_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobsCancelledById :exec UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); @@ -29,7 +29,7 @@ UPDATE jobs SET cancelled = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); UPDATE jobs SET failed = true WHERE job_id = ANY(sqlc.arg(job_ids)::text[]); -- name: UpdateJobPriorityById :exec -UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE jobs SET priority = $1 WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: SelectInitialRuns :many SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial LIMIT $2; @@ -44,7 +44,7 @@ SELECT run_id FROM runs; SELECT * FROM runs WHERE serial > $1 AND job_id = ANY(sqlc.arg(job_ids)::text[]) ORDER BY serial; -- name: MarkJobRunsPreemptRequestedByJobId :exec -UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]); +UPDATE runs SET preempt_requested = true WHERE queue = sqlc.arg(queue) and job_set = sqlc.arg(job_set) and job_id = ANY(sqlc.arg(job_ids)::text[]) and cancelled = false and succeeded = false and failed = false; -- name: MarkJobRunsSucceededById :exec UPDATE runs SET succeeded = true WHERE run_id = ANY(sqlc.arg(run_ids)::text[]); diff --git a/internal/scheduler/floatingresources/floating_resource_types.go 
b/internal/scheduler/floatingresources/floating_resource_types.go index 3b023e0c891..e56dea92553 100644 --- a/internal/scheduler/floatingresources/floating_resource_types.go +++ b/internal/scheduler/floatingresources/floating_resource_types.go @@ -10,55 +10,55 @@ import ( "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) type FloatingResourceTypes struct { - zeroFloatingResources schedulerobjects.ResourceList - pools map[string]*floatingResourcePool - rlFactory *internaltypes.ResourceListFactory + floatingResourceLimitsByPool map[string]internaltypes.ResourceList } -type floatingResourcePool struct { - totalResources schedulerobjects.ResourceList +func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { + err := validate(config) + if err != nil { + return nil, err + } + + floatingResourceLimitsByPool := map[string]internaltypes.ResourceList{} + for _, fr := range config { + for _, poolConfig := range fr.Pools { + floatingResourceLimitsByPool[poolConfig.Name] = floatingResourceLimitsByPool[poolConfig.Name].Add( + rlFactory.FromNodeProto(map[string]resource.Quantity{fr.Name: poolConfig.Quantity}), + ) + } + } + + return &FloatingResourceTypes{ + floatingResourceLimitsByPool: floatingResourceLimitsByPool, + }, nil } -func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig, rlFactory *internaltypes.ResourceListFactory) (*FloatingResourceTypes, error) { - zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))} +func validate(config []configuration.FloatingResourceConfig) error { + floatingResourceNamesSeen := map[string]bool{} for _, c := range config { - if _, exists := zeroFloatingResources.Resources[c.Name]; exists { - return nil, fmt.Errorf("duplicate floating resource %s", c.Name) + if _, exists := floatingResourceNamesSeen[c.Name]; exists { + return fmt.Errorf("duplicate floating resource %s", c.Name) } - zeroFloatingResources.Resources[c.Name] = resource.Quantity{} + floatingResourceNamesSeen[c.Name] = true } - pools := map[string]*floatingResourcePool{} for _, fr := range config { + poolNamesSeen := map[string]bool{} for _, poolConfig := range fr.Pools { - pool, exists := pools[poolConfig.Name] - if !exists { - pool = &floatingResourcePool{ - totalResources: zeroFloatingResources.DeepCopy(), - } - pools[poolConfig.Name] = pool - } - existing := pool.totalResources.Resources[fr.Name] - if existing.Cmp(resource.Quantity{}) != 0 { - return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name) + if _, exists := poolNamesSeen[poolConfig.Name]; exists { + return fmt.Errorf("floating resource %s has duplicate pool %s", fr.Name, poolConfig.Name) } - pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy() + poolNamesSeen[poolConfig.Name] = true } } - - return &FloatingResourceTypes{ - zeroFloatingResources: zeroFloatingResources, - pools: pools, - rlFactory: rlFactory, - }, nil + return nil } func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated internaltypes.ResourceList) (bool, string) { - available := frt.GetTotalAvailableForPoolInternalTypes(poolName) + available := frt.GetTotalAvailableForPool(poolName) if available.AllZero() { return 
false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName) } @@ -72,26 +72,38 @@ func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated intern } func (frt *FloatingResourceTypes) AllPools() []string { - result := maps.Keys(frt.pools) + result := maps.Keys(frt.floatingResourceLimitsByPool) slices.Sort(result) return result } -func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList { - pool, exists := frt.pools[poolName] - if !exists { - return frt.zeroFloatingResources.DeepCopy() +func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) internaltypes.ResourceList { + limits, ok := frt.floatingResourceLimitsByPool[poolName] + if !ok { + return internaltypes.ResourceList{} } - return pool.totalResources.DeepCopy() + return limits } -func (frt *FloatingResourceTypes) GetTotalAvailableForPoolInternalTypes(poolName string) internaltypes.ResourceList { - return frt.rlFactory.FromNodeProto(frt.GetTotalAvailableForPool(poolName).Resources) +func (frt *FloatingResourceTypes) GetTotalAvailableForPoolAsMap(poolName string) map[string]resource.Quantity { + limits := frt.GetTotalAvailableForPool(poolName) + result := map[string]resource.Quantity{} + for _, res := range limits.GetResources() { + if res.Type != internaltypes.Floating { + continue + } + result[res.Name] = res.Value + } + return result } func (frt *FloatingResourceTypes) SummaryString() string { - if len(frt.zeroFloatingResources.Resources) == 0 { + if len(frt.floatingResourceLimitsByPool) == 0 { return "none" } - return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ") + poolSummaries := []string{} + for _, poolName := range frt.AllPools() { + poolSummaries = append(poolSummaries, fmt.Sprintf("%s: (%s)", poolName, frt.floatingResourceLimitsByPool[poolName])) + } + return strings.Join(poolSummaries, " ") } diff --git a/internal/scheduler/floatingresources/floating_resource_types_test.go b/internal/scheduler/floatingresources/floating_resource_types_test.go index 76ce183b624..4bb5e82fb63 100644 --- a/internal/scheduler/floatingresources/floating_resource_types_test.go +++ b/internal/scheduler/floatingresources/floating_resource_types_test.go @@ -15,28 +15,86 @@ func TestAllPools(t *testing.T) { assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools()) } -func TestGetTotalAvailableForPool(t *testing.T) { - sut := makeSut(t, makeRlFactory()) - zero := resource.Quantity{} - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources) - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources) - assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources) +func TestNewFloatingResourceTypes_ErrorsOnDuplicateFloatingResource(t *testing.T) { + cfg := []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + }, + }, + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "gpu", + Quantity: resource.MustParse("300"), + }, + }, + }, + } + + frt, err := NewFloatingResourceTypes(cfg, makeRlFactory()) + assert.Nil(t, 
frt) + assert.NotNil(t, err) } -func TestGetTotalAvailableForPoolInternalTypes(t *testing.T) { +func TestNewFloatingResourceTypes_ErrorsOnDuplicatePool(t *testing.T) { + cfg := []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + }, + }, + } + + frt, err := NewFloatingResourceTypes(cfg, makeRlFactory()) + assert.Nil(t, frt) + assert.NotNil(t, err) +} + +func TestGetTotalAvailableForPool(t *testing.T) { sut := makeSut(t, makeRlFactory()) - cpuPool := sut.GetTotalAvailableForPoolInternalTypes("cpu") + cpuPool := sut.GetTotalAvailableForPool("cpu") assert.Equal(t, int64(200000), cpuPool.GetByNameZeroIfMissing("floating-resource-1")) assert.Equal(t, int64(300000), cpuPool.GetByNameZeroIfMissing("floating-resource-2")) - gpuPool := sut.GetTotalAvailableForPoolInternalTypes("gpu") + gpuPool := sut.GetTotalAvailableForPool("gpu") assert.Equal(t, int64(100000), gpuPool.GetByNameZeroIfMissing("floating-resource-1")) assert.Equal(t, int64(0), gpuPool.GetByNameZeroIfMissing("floating-resource-2")) - notFound := sut.GetTotalAvailableForPoolInternalTypes("some-invalid-value") - assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-1")) - assert.Equal(t, int64(0), notFound.GetByNameZeroIfMissing("floating-resource-2")) + notFound := sut.GetTotalAvailableForPool("some-invalid-value") + assert.True(t, notFound.IsEmpty()) +} + +func TestGetTotalAvailableForPoolAsMap(t *testing.T) { + sut := makeSut(t, makeRlFactory()) + + cpuPool := sut.GetTotalAvailableForPoolAsMap("cpu") + assert.Equal(t, map[string]resource.Quantity{ + "floating-resource-1": *resource.NewMilliQuantity(200000, resource.DecimalSI), + "floating-resource-2": *resource.NewMilliQuantity(300000, resource.DecimalSI), + }, cpuPool) + + gpuPool := sut.GetTotalAvailableForPoolAsMap("gpu") + assert.Equal(t, map[string]resource.Quantity{ + "floating-resource-1": *resource.NewMilliQuantity(100000, resource.DecimalSI), + "floating-resource-2": *resource.NewMilliQuantity(0, resource.DecimalSI), + }, gpuPool) + + notFound := sut.GetTotalAvailableForPoolAsMap("some-invalid-value") + assert.Equal(t, map[string]resource.Quantity{}, notFound) } func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) { diff --git a/internal/scheduler/internaltypes/node_factory.go b/internal/scheduler/internaltypes/node_factory.go index 8d4526a7b2b..148ede62909 100644 --- a/internal/scheduler/internaltypes/node_factory.go +++ b/internal/scheduler/internaltypes/node_factory.go @@ -1,6 +1,8 @@ package internaltypes import ( + "fmt" + "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -51,3 +53,22 @@ func (f *NodeFactory) FromSchedulerObjectsNode(node *schedulerobjects.Node) (*No f.resourceListFactory, ) } + +func (f *NodeFactory) FromSchedulerObjectsExecutors(executors []*schedulerobjects.Executor, errorLogger func(string)) []*Node { + result := []*Node{} + for _, executor := range executors { + for _, node := range executor.GetNodes() { + if executor.Id != node.Executor { + errorLogger(fmt.Sprintf("Executor name mismatch: %q != %q", node.Executor, executor.Id)) + continue + } + itNode, err := f.FromSchedulerObjectsNode(node) + if err != nil { + errorLogger(fmt.Sprintf("Invalid node %s: %v", node.Name, err)) + continue + } + result = append(result, itNode) + } + } + 
return result +} diff --git a/internal/scheduler/internaltypes/resource_list.go b/internal/scheduler/internaltypes/resource_list.go index 63acf649eff..e4d05b5d642 100644 --- a/internal/scheduler/internaltypes/resource_list.go +++ b/internal/scheduler/internaltypes/resource_list.go @@ -24,10 +24,11 @@ type ResourceList struct { } type Resource struct { - Name string - Value int64 - Scale k8sResource.Scale - Type ResourceType + Name string + RawValue int64 + Value k8sResource.Quantity + Scale k8sResource.Scale + Type ResourceType } func (rl ResourceList) Equal(other ResourceList) bool { @@ -87,7 +88,7 @@ func (rl ResourceList) GetResourceByNameZeroIfMissing(name string) k8sResource.Q return k8sResource.Quantity{} } - return *k8sResource.NewScaledQuantity(rl.resources[index], rl.factory.scales[index]) + return *rl.asQuantity(index) } func (rl ResourceList) GetResources() []Resource { @@ -98,10 +99,11 @@ func (rl ResourceList) GetResources() []Resource { result := make([]Resource, len(rl.resources)) for i, q := range rl.resources { result[i] = Resource{ - Name: rl.factory.indexToName[i], - Value: q, - Scale: rl.factory.scales[i], - Type: rl.factory.types[i], + Name: rl.factory.indexToName[i], + RawValue: q, + Value: *rl.asQuantity(i), + Scale: rl.factory.scales[i], + Type: rl.factory.types[i], } } return result diff --git a/internal/scheduler/internaltypes/resource_list_test.go b/internal/scheduler/internaltypes/resource_list_test.go index 4f3657c4e04..0775589bb80 100644 --- a/internal/scheduler/internaltypes/resource_list_test.go +++ b/internal/scheduler/internaltypes/resource_list_test.go @@ -83,13 +83,18 @@ func TestGetResources(t *testing.T) { a := testResourceList(factory, "1", "1Gi") expected := []Resource{ - {Name: "memory", Value: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes}, - {Name: "ephemeral-storage", Value: 0, Scale: k8sResource.Scale(0), Type: Kubernetes}, - {Name: "cpu", Value: 1000, Scale: k8sResource.Milli, Type: Kubernetes}, - {Name: "nvidia.com/gpu", Value: 0, Scale: k8sResource.Milli, Type: Kubernetes}, - {Name: "external-storage-connections", Value: 0, Scale: 0, Type: Floating}, - {Name: "external-storage-bytes", Value: 0, Scale: 0, Type: Floating}, + {Name: "memory", RawValue: 1024 * 1024 * 1024, Scale: k8sResource.Scale(0), Type: Kubernetes}, + {Name: "ephemeral-storage", RawValue: 0, Scale: k8sResource.Scale(0), Type: Kubernetes}, + {Name: "cpu", RawValue: 1000, Scale: k8sResource.Milli, Type: Kubernetes}, + {Name: "nvidia.com/gpu", RawValue: 0, Scale: k8sResource.Milli, Type: Kubernetes}, + {Name: "external-storage-connections", RawValue: 0, Scale: 0, Type: Floating}, + {Name: "external-storage-bytes", RawValue: 0, Scale: 0, Type: Floating}, } + + for i, r := range expected { + expected[i].Value = *k8sResource.NewScaledQuantity(r.RawValue, r.Scale) + } + assert.Equal(t, expected, a.GetResources()) } diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index f7b9fc4226f..f9f295e8d05 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -503,7 +503,7 @@ func (job *Job) Tolerations() []v1.Toleration { // ResourceRequirements returns the resource requirements of the Job // KubernetesResourceRequirements below is preferred -func (job *Job) ResourceRequirements() v1.ResourceRequirements { +func (job *Job) resourceRequirements() v1.ResourceRequirements { if req := job.PodRequirements(); req != nil { return req.ResourceRequirements } @@ -831,7 +831,7 @@ func SchedulingKeyFromJob(skg 
*schedulerobjects.SchedulingKeyGenerator, job *Job job.NodeSelector(), job.Affinity(), job.Tolerations(), - job.ResourceRequirements().Requests, + job.resourceRequirements().Requests, job.PriorityClassName(), ) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 43d9bbe21b2..af9d932fe64 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -288,12 +288,32 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } txn := c.jobDb.ReadTxn() + + for _, executorSetting := range executorSettings { + if executorSetting.Cordoned { + // We may have settings for executors that don't exist in the repository. + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 1.0, + reason: executorSetting.CordonReason, + setByUser: executorSetting.SetByUser, + } + } else { + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 0.0, + reason: executorSetting.CordonReason, + setByUser: executorSetting.SetByUser, + } + } + } + for _, executor := range executors { - // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. - cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ - status: 0.0, - reason: "", - setByUser: "", + if _, statusExists := cordonedStatusByCluster[executor.Id]; !statusExists { + // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. + cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ + status: 0.0, + reason: "", + setByUser: "", + } } for _, node := range executor.Nodes { nodePool := node.GetPool() @@ -305,6 +325,10 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p nodeType: node.ReportingNodeType, } + if _, ok := schedulableNodeCountByCluster[clusterKey]; !ok { + schedulableNodeCountByCluster[clusterKey] = 0 + } + awayClusterKeys := make([]clusterMetricKey, 0, len(awayPools)) for _, ap := range awayPools { awayClusterKeys = append(awayClusterKeys, clusterMetricKey{ @@ -314,14 +338,20 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p }) } - if !node.Unschedulable { - addToResourceListMap(availableResourceByCluster, clusterKey, node.AvailableArmadaResource()) + nodeResources := node.AvailableArmadaResource() + + if !node.Unschedulable && cordonedStatusByCluster[executor.Id].status != 1.0 { schedulableNodeCountByCluster[clusterKey]++ + } else { + // We still want to publish these metrics, just with zeroed values + nodeResources.Zero() + } - // Add available resources to the away cluster pool - for _, awayClusterKey := range awayClusterKeys { - addToResourceListMap(availableResourceByCluster, awayClusterKey, node.AvailableArmadaResource()) - } + addToResourceListMap(availableResourceByCluster, clusterKey, nodeResources) + + // Add available resources to the away cluster pool + for _, awayClusterKey := range awayClusterKeys { + addToResourceListMap(availableResourceByCluster, awayClusterKey, nodeResources) } totalNodeCountByCluster[clusterKey]++ @@ -384,25 +414,8 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } } - for _, executorSetting := range executorSettings { - if executorSetting.Cordoned { - if cordonedValue, ok := cordonedStatusByCluster[executorSetting.ExecutorId]; ok { - cordonedValue.status = 1.0 - cordonedValue.reason = executorSetting.CordonReason - cordonedValue.setByUser = 
executorSetting.SetByUser - } else { - // We may have settings for executors that don't exist in the repository. - cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ - status: 1.0, - reason: executorSetting.CordonReason, - setByUser: executorSetting.SetByUser, - } - } - } - } - for _, pool := range c.floatingResourceTypes.AllPools() { - totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool) + totalFloatingResources := schedulerobjects.ResourceList{Resources: c.floatingResourceTypes.GetTotalAvailableForPoolAsMap(pool)} clusterKey := clusterMetricKey{ cluster: "floating", pool: pool, diff --git a/internal/scheduler/metrics/cycle_metrics.go b/internal/scheduler/metrics/cycle_metrics.go index b6fdfa22a91..c2f32ad60e7 100644 --- a/internal/scheduler/metrics/cycle_metrics.go +++ b/internal/scheduler/metrics/cycle_metrics.go @@ -302,7 +302,7 @@ func (m *cycleMetrics) ReportSchedulerResult(result scheduling.SchedulerResult) m.evictedJobs.WithLabelValues(pool, queue).Set(float64(s.EvictedJobCount)) for _, r := range s.EvictedResources.GetResources() { - m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.Value)) + m.evictedResources.WithLabelValues(pool, queue, r.Name).Set(float64(r.RawValue)) } } } diff --git a/internal/scheduler/metrics/state_metrics.go b/internal/scheduler/metrics/state_metrics.go index 3e25deb892b..faa10d50901 100644 --- a/internal/scheduler/metrics/state_metrics.go +++ b/internal/scheduler/metrics/state_metrics.go @@ -209,7 +209,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio } queue := job.Queue() - requests := job.ResourceRequirements().Requests + requests := job.AllResourceRequirements() latestRun := job.LatestRun() pool := "" node := "" @@ -236,7 +236,7 @@ func (m *jobStateMetrics) updateStateDuration(job *jobdb.Job, state string, prio // Resource Seconds for _, res := range m.trackedResourceNames { - resQty := requests[res] + resQty := requests.GetResourceByNameZeroIfMissing(string(res)) resSeconds := duration * float64(resQty.MilliValue()) / 1000 m.jobStateResourceSecondsByQueue. 
WithLabelValues(queue, pool, state, priorState, res.String()).Add(resSeconds) diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index d576713618f..772101890cc 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -308,9 +308,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { jobDbJobs: []*jobdb.Job{}, executors: []*schedulerobjects.Executor{executor}, expected: []prometheus.Metric{ - commonmetrics.NewClusterAvailableCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), - commonmetrics.NewClusterAvailableCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), - commonmetrics.NewClusterAvailableCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "memory", "type-1"), + commonmetrics.NewClusterAvailableCapacity(0.0, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), @@ -387,17 +387,19 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing node.StateByJobRunId[job.LatestRun().Id()] = schedulerobjects.JobRunState_RUNNING tests := map[string]struct { - poolConfig []configuration.PoolConfig - runningJobs []*jobdb.Job - nodes []*schedulerobjects.Node - expected []prometheus.Metric + poolConfig []configuration.PoolConfig + runningJobs []*jobdb.Job + nodes []*schedulerobjects.Node + executorSettings []*schedulerobjects.ExecutorSettings + expected []prometheus.Metric }{ "No away pools": { poolConfig: []configuration.PoolConfig{ {Name: testfixtures.TestPool}, }, - runningJobs: []*jobdb.Job{job}, - nodes: []*schedulerobjects.Node{node}, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{}, expected: []prometheus.Metric{ commonmetrics.NewClusterAvailableCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), @@ -413,8 +415,9 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing AwayPools: []string{testfixtures.TestPool}, }, }, - runningJobs: []*jobdb.Job{job}, - nodes: []*schedulerobjects.Node{node}, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{}, expected: []prometheus.Metric{ commonmetrics.NewClusterAvailableCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), @@ -422,6 +425,24 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing commonmetrics.NewClusterTotalCapacity(31, "cluster-1", testfixtures.TestPool2, "cpu", "type-1"), }, }, + "Cordoned cluster": { + poolConfig: []configuration.PoolConfig{ + {Name: testfixtures.TestPool}, + }, + runningJobs: []*jobdb.Job{job}, + nodes: []*schedulerobjects.Node{node}, + executorSettings: []*schedulerobjects.ExecutorSettings{ + { + ExecutorId: "cluster-1", 
+ Cordoned: true, + CordonReason: "bad executor", + }, + }, + expected: []prometheus.Metric{ + commonmetrics.NewClusterAvailableCapacity(0, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -446,7 +467,7 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return(executors, nil) - executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil) + executorRepository.EXPECT().GetExecutorSettings(ctx).Return(tc.executorSettings, nil) collector := NewMetricsCollector( jobDb, diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 336ca0de314..ba74a66fc8d 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -340,7 +340,6 @@ func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, gctx *context.GangSche // order to find the best fit for this gang); clear out any remnants of // previous attempts. jctx.UnschedulableReason = "" - jctx.PreemptingJobId = "" node, err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) if err != nil { diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 1f23597541b..3ace44245c4 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -12,7 +12,6 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/scheduler/adapters" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -132,8 +131,6 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { jobFilter := func(job *jobdb.Job) bool { return true } job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) request := job.KubernetesResourceRequirements() - requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(adapters.K8sResourceListToMap(job.ResourceRequirements().Requests)) - assert.Nil(t, err) jobId := job.Id() @@ -177,14 +174,14 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { assert.True( t, armadamaps.DeepEqual( - map[string]internaltypes.ResourceList{jobId: requestInternalRl}, + map[string]internaltypes.ResourceList{jobId: request}, boundNode.AllocatedByJobId, ), ) assert.True( t, armadamaps.DeepEqual( - map[string]internaltypes.ResourceList{jobId: requestInternalRl}, + map[string]internaltypes.ResourceList{jobId: request}, evictedNode.AllocatedByJobId, ), ) diff --git a/internal/scheduler/scheduling/context/job.go b/internal/scheduler/scheduling/context/job.go index 2b96b335f1b..e92167c3275 100644 --- a/internal/scheduler/scheduling/context/job.go +++ b/internal/scheduler/scheduling/context/job.go @@ -218,10 +218,10 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche ) resourcesByQueue := armadamaps.MapValues( jobsByQueue, - func(jobs []*jobdb.Job) schedulerobjects.ResourceList { - rv := schedulerobjects.NewResourceListWithDefaultSize() + func(jobs []*jobdb.Job) internaltypes.ResourceList { + rv := internaltypes.ResourceList{} for _, job 
:= range jobs { - rv.AddV1ResourceList(job.ResourceRequirements().Requests) + rv = rv.Add(job.AllResourceRequirements()) } return rv }, @@ -247,8 +247,8 @@ func PrintJobSummary(ctx *armadacontext.Context, prefix string, jctxs []*JobSche maps.Keys(jobsByQueue), armadamaps.MapValues( resourcesByQueue, - func(rl schedulerobjects.ResourceList) string { - return rl.CompactString() + func(rl internaltypes.ResourceList) string { + return rl.String() }, ), jobCountPerQueue, diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index eb0a3bb05ad..f8fdbac4ce8 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -76,7 +76,7 @@ func TestEvictOversubscribed(t *testing.T) { for nodeId, node := range result.AffectedNodesById { for _, p := range priorities { for _, r := range node.AllocatableByPriority[p].GetResources() { - assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, nodeId) + assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, nodeId) } } } @@ -1920,7 +1920,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { // Accounting across scheduling rounds. roundByJobId := make(map[string]int) indexByJobId := make(map[string]int) - allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string]) + allocatedByQueueAndPriorityClass := make(map[string]map[string]internaltypes.ResourceList) nodeIdByJobId := make(map[string]string) var jobIdsByGangId map[string]map[string]bool var gangIdByJobId map[string]string @@ -1941,7 +1941,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { ) } - demandByQueue := map[string]schedulerobjects.ResourceList{} + demandByQueue := map[string]internaltypes.ResourceList{} // Run the scheduler. 
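As context for the test changes in this file, the switch just above from map[string]schedulerobjects.ResourceList to map[string]internaltypes.ResourceList depends on the pattern used a few hunks further down: the zero value of internaltypes.ResourceList is treated as empty, and Add/Subtract return new values, so per-queue demand can be accumulated without explicit initialization. A minimal sketch of that bookkeeping, assuming only the Add, Subtract, Queue and AllResourceRequirements signatures that appear elsewhere in this diff (the helper name trackDemand is illustrative and not part of the change):

package scheduling // hypothetical placement, for illustration only

import (
	"github.com/armadaproject/armada/internal/scheduler/internaltypes"
	"github.com/armadaproject/armada/internal/scheduler/jobdb"
)

// trackDemand adds or removes a job's resource requirements from the per-queue demand map.
// Missing queues need no initialization: the zero-value ResourceList acts as empty,
// mirroring how demandByQueue is updated in the hunks below.
func trackDemand(demandByQueue map[string]internaltypes.ResourceList, job *jobdb.Job, submitted bool) {
	if submitted {
		demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Add(job.AllResourceRequirements())
		return
	}
	demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements())
}
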
cordonedNodes := map[int]bool{} @@ -1978,12 +1978,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { queuedJobs = append(queuedJobs, job.WithQueued(true)) roundByJobId[job.Id()] = i indexByJobId[job.Id()] = j - r, ok := demandByQueue[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - demandByQueue[job.Queue()] = r - } - r.AddV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) + demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Add(job.AllResourceRequirements()) } } err = jobDbTxn.Upsert(queuedJobs) @@ -2005,12 +2000,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { delete(gangIdByJobId, job.Id()) delete(jobIdsByGangId[gangId], job.Id()) } - r, ok := demandByQueue[job.Queue()] - if !ok { - r = schedulerobjects.NewResourceList(len(job.PodRequirements().ResourceRequirements.Requests)) - demandByQueue[job.Queue()] = r - } - r.SubV1ResourceList(job.PodRequirements().ResourceRequirements.Requests) + demandByQueue[job.Queue()] = demandByQueue[job.Queue()].Subtract(job.AllResourceRequirements()) } } } @@ -2049,11 +2039,11 @@ func TestPreemptingQueueScheduler(t *testing.T) { for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - queueDemand := testfixtures.TestResourceListFactory.FromJobResourceListIgnoreUnknown(demandByQueue[queue].Resources) + queueDemand := demandByQueue[queue] err := sctx.AddQueueSchedulingContext( queue, weight, - internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory), + allocatedByQueueAndPriorityClass[queue], queueDemand, queueDemand, limiterByQueue[queue], @@ -2092,28 +2082,22 @@ func TestPreemptingQueueScheduler(t *testing.T) { job := jctx.Job m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { - m = make(schedulerobjects.QuantityByTAndResourceType[string]) + m = make(map[string]internaltypes.ResourceList) allocatedByQueueAndPriorityClass[job.Queue()] = m } - m.SubV1ResourceList( - job.PriorityClassName(), - job.ResourceRequirements().Requests, - ) + m[job.PriorityClassName()] = m[job.PriorityClassName()].Subtract(job.AllResourceRequirements()) } for _, jctx := range result.ScheduledJobs { job := jctx.Job m := allocatedByQueueAndPriorityClass[job.Queue()] if m == nil { - m = make(schedulerobjects.QuantityByTAndResourceType[string]) + m = make(map[string]internaltypes.ResourceList) allocatedByQueueAndPriorityClass[job.Queue()] = m } - m.AddV1ResourceList( - job.PriorityClassName(), - job.ResourceRequirements().Requests, - ) + m[job.PriorityClassName()] = m[job.PriorityClassName()].Add(job.AllResourceRequirements()) } for queue, qctx := range sctx.QueueSchedulingContexts { - m := internaltypes.RlMapFromJobSchedulerObjects(allocatedByQueueAndPriorityClass[queue], testfixtures.TestResourceListFactory) + m := allocatedByQueueAndPriorityClass[queue] assert.Equal(t, internaltypes.RlMapRemoveZeros(m), internaltypes.RlMapRemoveZeros(qctx.AllocatedByPriorityClass)) } @@ -2202,7 +2186,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { for node := it.NextNode(); node != nil; node = it.NextNode() { for _, p := range priorities { for _, r := range node.AllocatableByPriority[p].GetResources() { - assert.True(t, r.Value >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.Value, node.GetId()) + assert.True(t, r.RawValue >= 0, "resource %s oversubscribed by %d on node %s", r.Name, r.RawValue, node.GetId()) } } } diff --git 
a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index 01ad8e37773..0dad9c8061f 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -129,7 +129,7 @@ func (l *FairSchedulingAlgo) Schedule( ctx.Infof("Scheduling on pool %s with capacity %s %s", pool, fsctx.nodeDb.TotalKubernetesResources().String(), - l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name).String(), + l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name).String(), ) start := time.Now() @@ -237,21 +237,11 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } healthyExecutors = l.filterCordonedExecutors(ctx, healthyExecutors, executorSettings) } - nodes := []*internaltypes.Node{} - for _, executor := range healthyExecutors { - for _, node := range executor.Nodes { - if executor.Id != node.Executor { - ctx.Errorf("Executor name mismatch: %q != %q", node.Executor, executor.Id) - continue - } - itNode, err := nodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - ctx.Errorf("Invalid node %s: %v", node.Name, err) - continue - } - nodes = append(nodes, itNode) - } - } + + nodes := nodeFactory.FromSchedulerObjectsExecutors(healthyExecutors, func(errMes string) { + ctx.Error(errMes) + }) + homeJobs := jobSchedulingInfo.jobsByPool[pool.Name] awayJobs := []*jobdb.Job{} @@ -277,7 +267,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } totalResources := nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool.Name)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name)) schedulingContext, err := l.constructSchedulingContext( pool.Name, @@ -528,7 +518,7 @@ func (l *FairSchedulingAlgo) SchedulePool( pool string, ) (*SchedulerResult, *schedulercontext.SchedulingContext, error) { totalResources := fsctx.nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPoolInternalTypes(pool)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool)) constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, l.schedulingConfig, maps.Values(fsctx.queues)) diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index dec3c5d406d..c6b74f21458 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -235,7 +235,7 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error { } if time.Now().Unix()-lastLogTime.Unix() >= 5 { ctx.Infof("Simulator time %s", s.time) - lastLogTime = s.time + lastLogTime = time.Now() } if s.time.After(simTerminationTime) { ctx.Infof("Current simulated time (%s) exceeds runtime deadline (%s). 
Terminating", s.time, simTerminationTime) @@ -465,6 +465,8 @@ func submitJobFromJobTemplate(jobId string, jobTemplate *JobTemplate, gangId str } else { annotations[serverconfig.GangNodeUniformityLabelAnnotation] = "armadaproject.io/clusterName" } + // Make it so gang jobs end at the same time, this means they don't have a distribution currently + jobTemplate.RuntimeDistribution.TailMean = 0 } return &armadaevents.SubmitJob{ diff --git a/internal/scheduler/simulator/sink/job_writer.go b/internal/scheduler/simulator/sink/job_writer.go index ccac68dcace..7f4d029d365 100644 --- a/internal/scheduler/simulator/sink/job_writer.go +++ b/internal/scheduler/simulator/sink/job_writer.go @@ -4,7 +4,6 @@ import ( "os" parquetWriter "github.com/xitongsys/parquet-go/writer" - v1 "k8s.io/api/core/v1" "github.com/armadaproject/armada/internal/common/armadacontext" protoutil "github.com/armadaproject/armada/internal/common/proto" @@ -77,10 +76,10 @@ func (j *JobWriter) createJobRunRow(st *model.StateTransition) ([]*JobRunRow, er associatedJob := jobsList[i] if event.GetCancelledJob() != nil || event.GetJobSucceeded() != nil || event.GetJobRunPreempted() != nil { // Resource requirements - cpuLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceCPU] - memoryLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceMemory] - ephemeralStorageLimit := associatedJob.ResourceRequirements().Requests[v1.ResourceEphemeralStorage] - gpuLimit := associatedJob.ResourceRequirements().Requests["nvidia.com/gpu"] + cpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("cpu") + memoryLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("memory") + ephemeralStorageLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("ephemeral-storage") + gpuLimit := associatedJob.AllResourceRequirements().GetResourceByNameZeroIfMissing("nvidia.com/gpu") eventTime := protoutil.ToStdTime(event.Created) rows = append(rows, &JobRunRow{ diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index c94937e02bb..393409632df 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -92,13 +92,22 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { panic(err) } + nodeFactory := internaltypes.NewNodeFactory( + srv.schedulingConfig.IndexedTaints, + srv.schedulingConfig.IndexedNodeLabels, + srv.resourceListFactory) + executorsByPoolAndId := map[string]map[string]*executor{} for _, ex := range executors { - nodes := ex.GetNodes() - nodesByPool := armadaslices.GroupByFunc(nodes, func(n *schedulerobjects.Node) string { + nodes := nodeFactory.FromSchedulerObjectsExecutors( + []*schedulerobjects.Executor{ex}, + func(s string) { ctx.Error(s) }) + + nodesByPool := armadaslices.GroupByFunc(nodes, func(n *internaltypes.Node) string { return n.GetPool() }) for pool, nodes := range nodesByPool { + nodeDb, err := srv.constructNodeDb(nodes) if _, present := executorsByPoolAndId[pool]; !present { @@ -115,7 +124,6 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) { WithStacktrace(ctx, err). 
Warnf("Error constructing nodedb for executor: %s", ex.Id) } - } } srv.state.Store(&schedulerState{ @@ -264,11 +272,7 @@ poolStart: return schedulingResult{isSchedulable: false, reason: sb.String()} } -func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) { - nodeFactory := internaltypes.NewNodeFactory(srv.schedulingConfig.IndexedTaints, - srv.schedulingConfig.IndexedNodeLabels, - srv.resourceListFactory) - +func (srv *SubmitChecker) constructNodeDb(nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { nodeDb, err := nodedb.NewNodeDb( srv.schedulingConfig.PriorityClasses, srv.schedulingConfig.IndexedResources, @@ -284,11 +288,7 @@ func (srv *SubmitChecker) constructNodeDb(nodes []*schedulerobjects.Node) (*node txn := nodeDb.Txn(true) defer txn.Abort() for _, node := range nodes { - dbNode, err := nodeFactory.FromSchedulerObjectsNode(node) - if err != nil { - return nil, err - } - if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, dbNode); err != nil { + if err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node); err != nil { return nil, err } } diff --git a/internal/scheduler/submitcheck_test.go b/internal/scheduler/submitcheck_test.go index 4a302b41a93..526ce6f2266 100644 --- a/internal/scheduler/submitcheck_test.go +++ b/internal/scheduler/submitcheck_test.go @@ -257,8 +257,12 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) { } func Executor(nodes ...*schedulerobjects.Node) *schedulerobjects.Executor { + executorId := uuid.NewString() + for _, node := range nodes { + node.Executor = executorId + } return &schedulerobjects.Executor{ - Id: uuid.NewString(), + Id: executorId, Pool: "cpu", Nodes: nodes, } diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 5073307bab3..a4ae0607679 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -4,19 +4,18 @@ build-backend = "setuptools.build_meta" [project] name = "armada_airflow" -version = "1.0.9" +version = "1.0.10" description = "Armada Airflow Operator" readme='README.md' authors = [{name = "Armada-GROSS", email = "armada@armadaproject.io"}] license = { text = "Apache Software License" } dependencies=[ - 'armada-client==0.3.4', + 'armada-client>=0.4.6', 'apache-airflow>=2.6.3', - 'grpcio==1.58.0', - 'grpcio-tools==1.58.0', 'types-protobuf==4.24.0.1', 'kubernetes>=23.6.0', 'kubernetes_asyncio>=24.2.3', + 'opentelemetry-exporter-otlp>=1.28.1' # We want to force dependency upgrade for transitive Airflow dependency ] requires-python=">=3.8" classifiers=[