diff --git a/api/cluster/resource/templater.go b/api/cluster/resource/templater.go index 95ffd54fc..baf33ec21 100644 --- a/api/cluster/resource/templater.go +++ b/api/cluster/resource/templater.go @@ -323,21 +323,27 @@ func (t *InferenceServiceTemplater) createPredictorSpec(modelService *models.Ser if err != nil { return kservev1beta1.PredictorSpec{}, err } - envVars := models.MergeEnvVars(modelService.EnvVars, pyfuncDefaultEnv) - + // priority env vars + // 1. PyFunc default env + // 2. User environment variable + // 3. Default env variable that can be override by user environment + higherPriorityEnvVars := models.MergeEnvVars(modelService.EnvVars, pyfuncDefaultEnv) + lowerPriorityEnvVars := models.EnvVars{} if modelService.Protocol == protocol.UpiV1 { - envVars = append(envVars, models.EnvVar{Name: envGRPCOptions, Value: t.deploymentConfig.PyfuncGRPCOptions}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envGRPCOptions, Value: t.deploymentConfig.PyfuncGRPCOptions}) } if modelService.EnabledModelObservability { pyfuncPublisherCfg := t.deploymentConfig.PyFuncPublisher - envVars = append(envVars, models.EnvVar{Name: envPublisherEnabled, Value: strconv.FormatBool(modelService.EnabledModelObservability)}) - envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaTopic, Value: modelService.GetPredictionLogTopicForVersion()}) - envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaBrokers, Value: pyfuncPublisherCfg.Kafka.Brokers}) - envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaLinger, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.LingerMS)}) - envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaAck, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.Acks)}) - envVars = append(envVars, models.EnvVar{Name: envPublisherSamplingRatio, Value: fmt.Sprintf("%f", pyfuncPublisherCfg.SamplingRatioRate)}) - envVars = append(envVars, models.EnvVar{Name: envPublisherKafkaConfig, Value: pyfuncPublisherCfg.Kafka.AdditionalConfig}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherEnabled, Value: strconv.FormatBool(modelService.EnabledModelObservability)}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaTopic, Value: modelService.GetPredictionLogTopicForVersion()}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaBrokers, Value: pyfuncPublisherCfg.Kafka.Brokers}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaLinger, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.LingerMS)}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaAck, Value: fmt.Sprintf("%d", pyfuncPublisherCfg.Kafka.Acks)}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherSamplingRatio, Value: fmt.Sprintf("%f", pyfuncPublisherCfg.SamplingRatioRate)}) + lowerPriorityEnvVars = append(lowerPriorityEnvVars, models.EnvVar{Name: envPublisherKafkaConfig, Value: pyfuncPublisherCfg.Kafka.AdditionalConfig}) } + + envVars = models.MergeEnvVars(lowerPriorityEnvVars, higherPriorityEnvVars) predictorSpec = kservev1beta1.PredictorSpec{ PodSpec: kservev1beta1.PodSpec{ Containers: []corev1.Container{ diff --git a/api/cluster/resource/templater_gpu_test.go b/api/cluster/resource/templater_gpu_test.go index 2faf48bee..5716aaa6a 100644 --- a/api/cluster/resource/templater_gpu_test.go +++ b/api/cluster/resource/templater_gpu_test.go @@ -1421,12 +1421,11 @@ func TestCreateInferenceServiceSpecWithGPU(t *testing.T) { PodSpec: kservev1beta1.PodSpec{ Containers: []corev1.Container{ { - Name: kserveconstant.InferenceServiceContainerName, - Image: "gojek/project-model:1", - Resources: expDefaultModelResourceRequestsWithGPU, - Ports: grpcRawContainerPorts, - Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1), - models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}).ToKubernetesEnvVars(), + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Resources: expDefaultModelResourceRequestsWithGPU, + Ports: grpcRawContainerPorts, + Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1)).ToKubernetesEnvVars(), LivenessProbe: probeConfigUPI, }, }, diff --git a/api/cluster/resource/templater_test.go b/api/cluster/resource/templater_test.go index dd77ec065..fd6c3cf84 100644 --- a/api/cluster/resource/templater_test.go +++ b/api/cluster/resource/templater_test.go @@ -166,7 +166,7 @@ var ( Acks: 0, AdditionalConfig: "{}", }, - SamplingRatioRate: 0.01, + SamplingRatioRate: 0.1, } userContainerCPUDefaultLimit = "8" @@ -904,10 +904,96 @@ func TestCreateInferenceServiceSpec(t *testing.T) { PodSpec: kservev1beta1.PodSpec{ Containers: []corev1.Container{ { - Name: kserveconstant.InferenceServiceContainerName, - Image: "gojek/project-model:1", - Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvcWithSchema, protocol.HttpJson), - createPyFuncPublisherEnvVars(modelSvcWithSchema, pyfuncPublisherConfig)).ToKubernetesEnvVars(), + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Env: models.MergeEnvVars(createPyFuncPublisherEnvVars(modelSvcWithSchema, pyfuncPublisherConfig), createPyFuncDefaultEnvVarsWithProtocol(modelSvcWithSchema, protocol.HttpJson)).ToKubernetesEnvVars(), + Resources: expDefaultModelResourceRequests, + LivenessProbe: probeConfig, + }, + }, + }, + ComponentExtensionSpec: kservev1beta1.ComponentExtensionSpec{ + MinReplicas: &defaultModelResourceRequests.MinReplica, + MaxReplicas: defaultModelResourceRequests.MaxReplica, + }, + }, + }, + }, + }, + { + name: "pyfunc_v3 spec with model observability enabled, overwrite the sampling ratio", + modelSvc: &models.Service{ + Name: modelSvc.Name, + ModelName: modelSvc.ModelName, + ModelVersion: modelSvc.ModelVersion, + Namespace: project.Name, + ArtifactURI: modelSvc.ArtifactURI, + Type: models.ModelTypePyFuncV3, + Options: &models.ModelOption{ + PyFuncImageName: "gojek/project-model:1", + }, + EnvVars: models.EnvVars{ + { + Name: envPublisherSamplingRatio, + Value: "0.5", + }, + }, + Metadata: modelSvc.Metadata, + Protocol: protocol.HttpJson, + EnabledModelObservability: true, + ModelSchema: &models.ModelSchema{ + ID: models.ID(1), + ModelID: models.ID(1), + Spec: &models.SchemaSpec{ + PredictionIDColumn: "prediction_id", + TagColumns: []string{"tags"}, + FeatureTypes: map[string]models.ValueType{ + "featureA": models.Float64, + "featureB": models.Int64, + "featureC": models.String, + "featureD": models.Boolean, + }, + ModelPredictionOutput: &models.ModelPredictionOutput{ + RankingOutput: &models.RankingOutput{ + PredictionGroupIDColumn: "session_id", + RankScoreColumn: "score", + RelevanceScoreColumn: "relevance_score", + }, + }, + }, + }, + }, + resourcePercentage: queueResourcePercentage, + deploymentScale: defaultDeploymentScale, + exp: &kservev1beta1.InferenceService{ + ObjectMeta: metav1.ObjectMeta{ + Name: modelSvc.Name, + Namespace: project.Name, + Annotations: map[string]string{ + knserving.QueueSidecarResourcePercentageAnnotationKey: queueResourcePercentage, + "prometheus.io/scrape": "true", + "prometheus.io/port": "8080", + kserveconstant.DeploymentMode: string(kserveconstant.Serverless), + knautoscaling.InitialScaleAnnotationKey: fmt.Sprint(testPredictorScale), + }, + Labels: map[string]string{ + "gojek.com/app": modelSvc.Metadata.App, + "gojek.com/component": models.ComponentModelVersion, + "gojek.com/environment": testEnvironmentName, + "gojek.com/orchestrator": testOrchestratorName, + "gojek.com/stream": modelSvc.Metadata.Stream, + "gojek.com/team": modelSvc.Metadata.Team, + "sample": "true", + }, + }, + Spec: kservev1beta1.InferenceServiceSpec{ + Predictor: kservev1beta1.PredictorSpec{ + PodSpec: kservev1beta1.PodSpec{ + Containers: []corev1.Container{ + { + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Env: models.MergeEnvVars(createPyFuncPublisherEnvVars(modelSvcWithSchema, pyfuncPublisherConfig), models.MergeEnvVars(models.EnvVars{{Name: envPublisherSamplingRatio, Value: "0.5"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvcWithSchema, protocol.HttpJson))).ToKubernetesEnvVars(), Resources: expDefaultModelResourceRequests, LivenessProbe: probeConfig, }, @@ -965,10 +1051,9 @@ func TestCreateInferenceServiceSpec(t *testing.T) { PodSpec: kservev1beta1.PodSpec{ Containers: []corev1.Container{ { - Name: kserveconstant.InferenceServiceContainerName, - Image: "gojek/project-model:1", - Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.HttpJson), - createPyFuncPublisherEnvVars(modelSvc, pyfuncPublisherConfig)).ToKubernetesEnvVars(), + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Env: models.MergeEnvVars(createPyFuncPublisherEnvVars(modelSvc, pyfuncPublisherConfig), createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.HttpJson)).ToKubernetesEnvVars(), Resources: expDefaultModelResourceRequests, LivenessProbe: probeConfig, }, @@ -1727,12 +1812,11 @@ func TestCreateInferenceServiceSpec(t *testing.T) { PodSpec: kservev1beta1.PodSpec{ Containers: []corev1.Container{ { - Name: kserveconstant.InferenceServiceContainerName, - Image: "gojek/project-model:1", - Resources: expDefaultModelResourceRequests, - Ports: grpcServerlessContainerPorts, - Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1), - models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}).ToKubernetesEnvVars(), + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Resources: expDefaultModelResourceRequests, + Ports: grpcServerlessContainerPorts, + Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1)).ToKubernetesEnvVars(), LivenessProbe: probeConfigUPI, }, }, @@ -2502,10 +2586,9 @@ func TestCreateInferenceServiceSpecWithTransformer(t *testing.T) { PodSpec: kservev1beta1.PodSpec{ Containers: []corev1.Container{ { - Name: kserveconstant.InferenceServiceContainerName, - Image: "gojek/project-model:1", - Env: models.MergeEnvVars(createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1), - models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}).ToKubernetesEnvVars(), + Name: kserveconstant.InferenceServiceContainerName, + Image: "gojek/project-model:1", + Env: models.MergeEnvVars(models.EnvVars{models.EnvVar{Name: envGRPCOptions, Value: "{}"}}, createPyFuncDefaultEnvVarsWithProtocol(modelSvc, protocol.UpiV1)).ToKubernetesEnvVars(), Resources: expDefaultModelResourceRequests, LivenessProbe: probeConfigUPI, Ports: grpcRawContainerPorts,