From 7f7d42cbd6cc7c7dabb20810c1fc25c97eca9cd4 Mon Sep 17 00:00:00 2001 From: Maksym Bruner Date: Tue, 5 Nov 2024 16:34:52 +0800 Subject: [PATCH] feat(batch-predictor): remove CPU limits from SparkApplication resource (#615) # Description Similar to kserve models, this PR removes CPU limits for batch prediction jobs. If limits are set, executors will experience significant throttling because batch jobs, by their nature, tend to consume all available CPU. # Modifications * SparkApplication resource no longer has `coreLimit` set in driver and executor specs. This field is optional according to [the specs](https://github.com/kubeflow/spark-operator/blob/master/docs/api-docs.md#sparkoperator.k8s.io/v1beta2.SparkPodSpec) and it should be safe to simply remove it. # Tests # Checklist - [x] Added PR label - [x] Added unit test, integration, and/or e2e tests - [x] Tested locally - [ ] Updated documentation - [ ] Update Swagger spec if the PR introduces API changes - [ ] Regenerated Golang and Python client if the PR introduces API changes # Release Notes ```release-note Remove CPU Limits for batch prediction jobs ``` --- api/batch/resource.go | 23 ++++++++--------------- api/batch/resource_test.go | 22 +++------------------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/api/batch/resource.go b/api/batch/resource.go index e7c88ddfc..b8b24dc89 100644 --- a/api/batch/resource.go +++ b/api/batch/resource.go @@ -143,7 +143,7 @@ func (t *BatchJobTemplater) createDriverSpec(job *models.PredictionJob) (v1beta2 } core := getCoreRequest(userCPURequest) - cpuRequest, cpuLimit := getCPURequestAndLimit(userCPURequest) + cpuRequest := getCPURequest(userCPURequest) memoryRequest, err := toMegabyte(job.Config.ResourceRequest.DriverMemoryRequest) if err != nil { @@ -158,9 +158,8 @@ func (t *BatchJobTemplater) createDriverSpec(job *models.PredictionJob) (v1beta2 return v1beta2.DriverSpec{ CoreRequest: cpuRequest, SparkPodSpec: v1beta2.SparkPodSpec{ - Cores: core, - CoreLimit: 
cpuLimit, - Memory: memoryRequest, + Cores: core, + Memory: memoryRequest, ConfigMaps: []v1beta2.NamePath{ { Name: job.Name, @@ -188,7 +187,7 @@ func (t *BatchJobTemplater) createExecutorSpec(job *models.PredictionJob) (v1bet } core := getCoreRequest(userCPURequest) - cpuRequest, cpuLimit := getCPURequestAndLimit(userCPURequest) + cpuRequest := getCPURequest(userCPURequest) memoryRequest, err := toMegabyte(job.Config.ResourceRequest.ExecutorMemoryRequest) if err != nil { @@ -204,9 +203,8 @@ func (t *BatchJobTemplater) createExecutorSpec(job *models.PredictionJob) (v1bet Instances: &job.Config.ResourceRequest.ExecutorReplica, CoreRequest: cpuRequest, SparkPodSpec: v1beta2.SparkPodSpec{ - Cores: core, - CoreLimit: cpuLimit, - Memory: memoryRequest, + Cores: core, + Memory: memoryRequest, ConfigMaps: []v1beta2.NamePath{ { Name: job.Name, @@ -251,14 +249,9 @@ func createLabel(job *models.PredictionJob) map[string]string { return labels } -func getCPURequestAndLimit(cpuRequest resource.Quantity) (*string, *string) { +func getCPURequest(cpuRequest resource.Quantity) *string { cpuRequestStr := cpuRequest.String() - - cpuLimitMilli := cpuRequestToCPULimit * float64(cpuRequest.MilliValue()) - cpuLimit := resource.NewMilliQuantity(int64(cpuLimitMilli), resource.BinarySI) - cpuLimitStr := cpuLimit.String() - - return &cpuRequestStr, &cpuLimitStr + return &cpuRequestStr } func getCoreRequest(cpuRequest resource.Quantity) *int32 { diff --git a/api/batch/resource_test.go b/api/batch/resource_test.go index 329ed714f..2fe04c3fb 100644 --- a/api/batch/resource_test.go +++ b/api/batch/resource_test.go @@ -79,27 +79,21 @@ var ( } driverCore int32 = 1 - driverCPURequest = "1" // coreToCpuRequestRatio * driverCore - driverCoreLimit = "1250m" // cpuRequestToCPULimit * driverCPURequest + driverCPURequest = "1" // coreToCpuRequestRatio * driverCore driverMemory = "1Gi" driverMemoryInMB = "1024m" executorReplica int32 = 5 executorCore int32 = 1 - executorCPURequest = "2" // 
coreToCpuRequestRatio * executorCore - executorCoreLimit = "2500m" // cpuRequestToCPULimit * executorCPURequest + executorCPURequest = "2" // coreToCpuRequestRatio * executorCore executorMemory = "2Gi" executorMemoryInMB = "2048m" fractExecutorCPURequest = "1500m" - fractExecutorCPULimit = "1875m" - - fractDriverCPURequest = "500m" - fractDriverCPULimit = "625m" + fractDriverCPURequest = "500m" largeExecutorCore int32 = 5 largeExecutorCPURequest = "8" - largeExecutorCPULimit = "10" defaultConfigMap = []v1beta2.NamePath{ { @@ -193,7 +187,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &driverCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &driverCore, - CoreLimit: &driverCoreLimit, Memory: &driverMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -209,7 +202,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &executorCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &executorCore, - CoreLimit: &executorCoreLimit, Memory: &executorMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -272,7 +264,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &fractDriverCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &driverCore, - CoreLimit: &fractDriverCPULimit, Memory: &driverMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -288,7 +279,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &executorCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &executorCore, - CoreLimit: &executorCoreLimit, Memory: &executorMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -351,7 +341,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &driverCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &driverCore, - CoreLimit: &driverCoreLimit, Memory: &driverMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -367,7 +356,6 @@ func TestCreateSparkApplicationResource(t 
*testing.T) { CoreRequest: &fractExecutorCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &executorCore, - CoreLimit: &fractExecutorCPULimit, Memory: &executorMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -430,7 +418,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &driverCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &driverCore, - CoreLimit: &driverCoreLimit, Memory: &driverMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -446,7 +433,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &largeExecutorCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &largeExecutorCore, - CoreLimit: &largeExecutorCPULimit, Memory: &executorMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -515,7 +501,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &driverCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &driverCore, - CoreLimit: &driverCoreLimit, Memory: &driverMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret, @@ -534,7 +519,6 @@ func TestCreateSparkApplicationResource(t *testing.T) { CoreRequest: &executorCPURequest, SparkPodSpec: v1beta2.SparkPodSpec{ Cores: &executorCore, - CoreLimit: &executorCoreLimit, Memory: &executorMemoryInMB, ConfigMaps: defaultConfigMap, Secrets: defaultSecret,