Skip to content

Commit

Permalink
fix: Fix order of pyfunc env vars based on priority (#542)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->
User environment variables can't overwrite environment default value due
to incorrect order of env vars that being passed to `InferenceService`

# Modifications
<!-- Summarize the key code changes. -->
Reorder env vars for pyfunc based on priority. Following are the order
of priority:
* Pyfunc Default ENV
* User defined ENV
* Default ENV that can be overwriten

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [x] Added PR label
- [x] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
tiopramayudi committed Feb 20, 2024
1 parent 4a44b0f commit b764729
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 35 deletions.
26 changes: 16 additions & 10 deletions api/cluster/resource/templater.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,21 +323,27 @@ func (t *InferenceServiceTemplater) createPredictorSpec(modelService *models.Ser
if err != nil {
return kservev1beta1.PredictorSpec{}, err
}
envVars := models.MergeEnvVars(modelService.EnvVars, pyfuncDefaultEnv)

// priority env vars
// 1. PyFunc default env
// 2. User environment variable
// 3. Default env variable that can be override by user environment
higherPriorityEnvVars := models.MergeEnvVars(modelService.EnvVars, pyfuncDefaultEnv)
lowerPriorityEnvVars := models.EnvVars{}
if modelService.Protocol == protocol.UpiV1 {
envVars = append(envVars, models.EnvVar{Name: envGRPCOptions, Value: t.deploymentConfig.PyfuncGRPCOptions})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envGRPCOptions, Value: t.deploymentConfig.PyfuncGRPCOptions})
}
if modelService.EnabledModelObservability {
pyfuncPublisherCfg := t.deploymentConfig.PyFuncPublisher
envVars = append(envVars, models.EnvVar{Name: envPublisherEnabled, Value: strconv.FormatBool(modelService.EnabledModelObservability)})
envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaTopic, Value: modelService.GetPredictionLogTopicForVersion()})
envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaBrokers, Value: pyfuncPublisherCfg.Kafka.Brokers})
envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaLinger, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.LingerMS)})
envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaAck, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.Acks)})
envVars = append(envVars, models.EnvVar{Name: envPublisherSamplingRatio, Value: fmt.Sprintf("%f", pyfuncPublisherCfg.SamplingRatioRate)})
envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaConfig, Value: pyfuncPublisherCfg.Kafka.AdditionalConfig})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherEnabled, Value: strconv.FormatBool(modelService.EnabledModelObservability)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaTopic, Value: modelService.GetPredictionLogTopicForVersion()})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaBrokers, Value: pyfuncPublisherCfg.Kafka.Brokers})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaLinger, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.LingerMS)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaAck, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.Acks)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherSamplingRatio, Value: fmt.Sprintf("%f", pyfuncPublisherCfg.SamplingRatioRate)})
lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaConfig, Value: pyfuncPublisherCfg.Kafka.AdditionalConfig})
}

envVars = models.MergeEnvVars(lowerPriorityEnvVars, higherPriorityEnvVars)
predictorSpec = kservev1beta1.PredictorSpec{
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
Expand Down
11 changes: 5 additions & 6 deletions api/cluster/resource/templater_gpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,12 +1421,11 @@ func TestCreateInferenceServiceSpecWithGPU(t *testing.T) {
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
{
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Resources: expDefaultModelResourceRequestsWithGPU,
Ports: grpcRawContainerPorts,
Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1),
models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}).ToKubernetesEnvVars(),
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Resources: expDefaultModelResourceRequestsWithGPU,
Ports: grpcRawContainerPorts,
Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1)).ToKubernetesEnvVars(),
LivenessProbe: probeConfigUPI,
},
},
Expand Down
121 changes: 102 additions & 19 deletions api/cluster/resource/templater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ var (
Acks: 0,
AdditionalConfig: "{}",
},
SamplingRatioRate: 0.01,
SamplingRatioRate: 0.1,
}

userContainerCPUDefaultLimit = "8"
Expand Down Expand Up @@ -904,10 +904,96 @@ func TestCreateInferenceServiceSpec(t *testing.T) {
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
{
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvcWithSchema, protocol.HttpJson),
createPyFuncPublisherEnvVars(modelSvcWithSchema, pyfuncPublisherConfig)).ToKubernetesEnvVars(),
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(createPyFuncPublisherEnvVars(modelSvcWithSchema, pyfuncPublisherConfig), createPyFuncDefaultEnvVarsWithProtocol(modelSvcWithSchema, protocol.HttpJson)).ToKubernetesEnvVars(),
Resources: expDefaultModelResourceRequests,
LivenessProbe: probeConfig,
},
},
},
ComponentExtensionSpec: kservev1beta1.ComponentExtensionSpec{
MinReplicas: &defaultModelResourceRequests.MinReplica,
MaxReplicas: defaultModelResourceRequests.MaxReplica,
},
},
},
},
},
{
name: "pyfunc_v3 spec with model observability enabled, overwrite the sampling ratio",
modelSvc: &models.Service{
Name: modelSvc.Name,
ModelName: modelSvc.ModelName,
ModelVersion: modelSvc.ModelVersion,
Namespace: project.Name,
ArtifactURI: modelSvc.ArtifactURI,
Type: models.ModelTypePyFuncV3,
Options: &models.ModelOption{
PyFuncImageName: "gojek/project-model:1",
},
EnvVars: models.EnvVars{
{
Name: envPublisherSamplingRatio,
Value: "0.5",
},
},
Metadata: modelSvc.Metadata,
Protocol: protocol.HttpJson,
EnabledModelObservability: true,
ModelSchema: &models.ModelSchema{
ID: models.ID(1),
ModelID: models.ID(1),
Spec: &models.SchemaSpec{
PredictionIDColumn: "prediction_id",
TagColumns: []string{"tags"},
FeatureTypes: map[string]models.ValueType{
"featureA": models.Float64,
"featureB": models.Int64,
"featureC": models.String,
"featureD": models.Boolean,
},
ModelPredictionOutput: &models.ModelPredictionOutput{
RankingOutput: &models.RankingOutput{
PredictionGroupIDColumn: "session_id",
RankScoreColumn: "score",
RelevanceScoreColumn: "relevance_score",
},
},
},
},
},
resourcePercentage: queueResourcePercentage,
deploymentScale: defaultDeploymentScale,
exp: &kservev1beta1.InferenceService{
ObjectMeta: metav1.ObjectMeta{
Name: modelSvc.Name,
Namespace: project.Name,
Annotations: map[string]string{
knserving.QueueSidecarResourcePercentageAnnotationKey: queueResourcePercentage,
"prometheus.io/scrape": "true",
"prometheus.io/port": "8080",
kserveconstant.DeploymentMode: string(kserveconstant.Serverless),
knautoscaling.InitialScaleAnnotationKey: fmt.Sprint(testPredictorScale),
},
Labels: map[string]string{
"gojek.com/app": modelSvc.Metadata.App,
"gojek.com/component": models.ComponentModelVersion,
"gojek.com/environment": testEnvironmentName,
"gojek.com/orchestrator": testOrchestratorName,
"gojek.com/stream": modelSvc.Metadata.Stream,
"gojek.com/team": modelSvc.Metadata.Team,
"sample": "true",
},
},
Spec: kservev1beta1.InferenceServiceSpec{
Predictor: kservev1beta1.PredictorSpec{
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
{
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(createPyFuncPublisherEnvVars(modelSvcWithSchema, pyfuncPublisherConfig), models.MergeEnvVars(models.EnvVars{{Name: envPublisherSamplingRatio, Value: "0.5"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvcWithSchema, protocol.HttpJson))).ToKubernetesEnvVars(),
Resources: expDefaultModelResourceRequests,
LivenessProbe: probeConfig,
},
Expand Down Expand Up @@ -965,10 +1051,9 @@ func TestCreateInferenceServiceSpec(t *testing.T) {
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
{
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.HttpJson),
createPyFuncPublisherEnvVars(modelSvc, pyfuncPublisherConfig)).ToKubernetesEnvVars(),
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(createPyFuncPublisherEnvVars(modelSvc, pyfuncPublisherConfig), createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.HttpJson)).ToKubernetesEnvVars(),
Resources: expDefaultModelResourceRequests,
LivenessProbe: probeConfig,
},
Expand Down Expand Up @@ -1727,12 +1812,11 @@ func TestCreateInferenceServiceSpec(t *testing.T) {
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
{
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Resources: expDefaultModelResourceRequests,
Ports: grpcServerlessContainerPorts,
Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1),
models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}).ToKubernetesEnvVars(),
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Resources: expDefaultModelResourceRequests,
Ports: grpcServerlessContainerPorts,
Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1)).ToKubernetesEnvVars(),
LivenessProbe: probeConfigUPI,
},
},
Expand Down Expand Up @@ -2502,10 +2586,9 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) {
PodSpec: kservev1beta1.PodSpec{
Containers: []corev1.Container{
{
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1),
models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}).ToKubernetesEnvVars(),
Name: kserveconstant.InferenceServiceContainerName,
Image: "gojek/project-model:1",
Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1)).ToKubernetesEnvVars(),
Resources: expDefaultModelResourceRequests,
LivenessProbe: probeConfigUPI,
Ports: grpcRawContainerPorts,
Expand Down

0 comments on commit b764729

Please sign in to comment.