Skip to content

Commit

Permalink
feat(batch-predictor): remove CPU limits from SparkApplication resource (#615)
Browse files Browse the repository at this point in the history

<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description

Similar to kserve models, this PR removes CPU limits for batch
prediction jobs. If limits are set, executors will experience
significant throttling because batch jobs tend to consume all available
CPU by their nature.

# Modifications
* SparkApplication resource no longer has `coreLimit` set in driver and
executor specs. This field is optional according to [the
specs](https://github.com/kubeflow/spark-operator/blob/master/docs/api-docs.md#sparkoperator.k8s.io/v1beta2.SparkPodSpec)
and it should be safe to simply remove it.

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [x] Added PR label
- [x] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
Remove CPU Limits for batch prediction jobs
```
  • Loading branch information
mbruner authored Nov 5, 2024
1 parent 0877a4f commit 7f7d42c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 34 deletions.
23 changes: 8 additions & 15 deletions api/batch/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (t *BatchJobTemplater) createDriverSpec(job *models.PredictionJob) (v1beta2
}

core := getCoreRequest(userCPURequest)
cpuRequest, cpuLimit := getCPURequestAndLimit(userCPURequest)
cpuRequest := getCPURequest(userCPURequest)

memoryRequest, err := toMegabyte(job.Config.ResourceRequest.DriverMemoryRequest)
if err != nil {
Expand All @@ -158,9 +158,8 @@ func (t *BatchJobTemplater) createDriverSpec(job *models.PredictionJob) (v1beta2
return v1beta2.DriverSpec{
CoreRequest: cpuRequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: core,
CoreLimit: cpuLimit,
Memory: memoryRequest,
Cores: core,
Memory: memoryRequest,
ConfigMaps: []v1beta2.NamePath{
{
Name: job.Name,
Expand Down Expand Up @@ -188,7 +187,7 @@ func (t *BatchJobTemplater) createExecutorSpec(job *models.PredictionJob) (v1bet
}

core := getCoreRequest(userCPURequest)
cpuRequest, cpuLimit := getCPURequestAndLimit(userCPURequest)
cpuRequest := getCPURequest(userCPURequest)

memoryRequest, err := toMegabyte(job.Config.ResourceRequest.ExecutorMemoryRequest)
if err != nil {
Expand All @@ -204,9 +203,8 @@ func (t *BatchJobTemplater) createExecutorSpec(job *models.PredictionJob) (v1bet
Instances: &job.Config.ResourceRequest.ExecutorReplica,
CoreRequest: cpuRequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: core,
CoreLimit: cpuLimit,
Memory: memoryRequest,
Cores: core,
Memory: memoryRequest,
ConfigMaps: []v1beta2.NamePath{
{
Name: job.Name,
Expand Down Expand Up @@ -251,14 +249,9 @@ func createLabel(job *models.PredictionJob) map[string]string {
return labels
}

func getCPURequestAndLimit(cpuRequest resource.Quantity) (*string, *string) {
func getCPURequest(cpuRequest resource.Quantity) *string {
cpuRequestStr := cpuRequest.String()

cpuLimitMilli := cpuRequestToCPULimit * float64(cpuRequest.MilliValue())
cpuLimit := resource.NewMilliQuantity(int64(cpuLimitMilli), resource.BinarySI)
cpuLimitStr := cpuLimit.String()

return &cpuRequestStr, &cpuLimitStr
return &cpuRequestStr
}

func getCoreRequest(cpuRequest resource.Quantity) *int32 {
Expand Down
22 changes: 3 additions & 19 deletions api/batch/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,21 @@ var (
}

driverCore int32 = 1
driverCPURequest = "1" // coreToCpuRequestRatio * driverCore
driverCoreLimit = "1250m" // cpuRequestToCPULimit * driverCPURequest
driverCPURequest = "1" // coreToCpuRequestRatio * driverCore
driverMemory = "1Gi"
driverMemoryInMB = "1024m"

executorReplica int32 = 5
executorCore int32 = 1
executorCPURequest = "2" // coreToCpuRequestRatio * executorCore
executorCoreLimit = "2500m" // cpuRequestToCPULimit * executorCPURequest
executorCPURequest = "2" // coreToCpuRequestRatio * executorCore
executorMemory = "2Gi"
executorMemoryInMB = "2048m"

fractExecutorCPURequest = "1500m"
fractExecutorCPULimit = "1875m"

fractDriverCPURequest = "500m"
fractDriverCPULimit = "625m"
fractDriverCPURequest = "500m"

largeExecutorCore int32 = 5
largeExecutorCPURequest = "8"
largeExecutorCPULimit = "10"

defaultConfigMap = []v1beta2.NamePath{
{
Expand Down Expand Up @@ -193,7 +187,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &driverCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &driverCore,
CoreLimit: &driverCoreLimit,
Memory: &driverMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand All @@ -209,7 +202,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &executorCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &executorCore,
CoreLimit: &executorCoreLimit,
Memory: &executorMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand Down Expand Up @@ -272,7 +264,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &fractDriverCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &driverCore,
CoreLimit: &fractDriverCPULimit,
Memory: &driverMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand All @@ -288,7 +279,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &executorCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &executorCore,
CoreLimit: &executorCoreLimit,
Memory: &executorMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand Down Expand Up @@ -351,7 +341,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &driverCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &driverCore,
CoreLimit: &driverCoreLimit,
Memory: &driverMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand All @@ -367,7 +356,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &fractExecutorCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &executorCore,
CoreLimit: &fractExecutorCPULimit,
Memory: &executorMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand Down Expand Up @@ -430,7 +418,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &driverCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &driverCore,
CoreLimit: &driverCoreLimit,
Memory: &driverMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand All @@ -446,7 +433,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &largeExecutorCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &largeExecutorCore,
CoreLimit: &largeExecutorCPULimit,
Memory: &executorMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand Down Expand Up @@ -515,7 +501,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &driverCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &driverCore,
CoreLimit: &driverCoreLimit,
Memory: &driverMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand All @@ -534,7 +519,6 @@ func TestCreateSparkApplicationResource(t *testing.T) {
CoreRequest: &executorCPURequest,
SparkPodSpec: v1beta2.SparkPodSpec{
Cores: &executorCore,
CoreLimit: &executorCoreLimit,
Memory: &executorMemoryInMB,
ConfigMaps: defaultConfigMap,
Secrets: defaultSecret,
Expand Down

0 comments on commit 7f7d42c

Please sign in to comment.