Skip to content

Commit

Permalink
Model deployment support for model observability (#492)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->
In the previous [PR](#489) we
introduced `PyFuncV3Model` to support the model observability use case. What
was missing from that PR is the deployment process for `PyFuncV3Model`, hence
this PR.

Modification:
* `api/config/config.go` - Adding `PyFuncPublisherConfig`, a new
configuration for the pyfunc publisher
* `api/config/environment.go` - Update parsing `DeploymentConfig` to add
relevant config to the deployment config
* `db-migrations/34_version_endpoints_enable_observability.*.sql` -
Adding model observability enable flag to version endpoint
* `api/models/model.go` - Adding `pyfunc_v3` model type
* `api/cluster/resource/templater.go` 
* All deployment config will be part of `deploymentConfig` including
standard transformer configuration
  * Add required env vars to be passed for `PyFuncV3Model`
* `api/api/version_endpoints_api.go` - Adding `PyFuncV3Model` as
supported model type for UPI
* `python/sdk/test/pyfunc_integration_test.py` - Adding integration test
for `PyFuncV3Model`


**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->

Fixes #

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```

**Checklist**

- [ ] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes
  • Loading branch information
tiopramayudi authored Dec 5, 2023
1 parent 670ac14 commit c5b61e0
Show file tree
Hide file tree
Showing 34 changed files with 1,562 additions and 279 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/merlin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ jobs:
LOCAL_REGISTRY_PORT: 12345
LOCAL_REGISTRY: "dev.localhost"
INGRESS_HOST: "127.0.0.1.nip.io"
MERLIN_CHART_VERSION: 0.11.4
MERLIN_CHART_VERSION: 0.13.4
E2E_PYTHON_VERSION: "3.10.6"
K3S_VERSION: v1.26.7-k3s1
steps:
Expand Down
156 changes: 156 additions & 0 deletions api/api/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package api

import (
"context"
"fmt"

"github.com/caraml-dev/merlin/config"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/protocol"
"github.com/caraml-dev/merlin/service"
"github.com/feast-dev/feast/sdk/go/protos/feast/core"
)

// requestValidator is a single validation step that can be run through
// validateRequest.
type requestValidator interface {
// validate runs the check and returns a non-nil error when it fails.
validate() error
}

// funcValidate adapts a plain func() error to the requestValidator interface.
type funcValidate struct {
	// f is the function executed by validate.
	f func() error
}

// newFuncValidate returns a funcValidate whose validate method calls f.
func newFuncValidate(f func() error) *funcValidate {
	wrapped := new(funcValidate)
	wrapped.f = f
	return wrapped
}

// validate invokes the wrapped function and returns its result unchanged.
func (v *funcValidate) validate() error {
	check := v.f
	return check()
}

// supportedUPIModelTypes is the set of model types consulted by
// isModelSupportUPI; it is keyed by the model type string.
var supportedUPIModelTypes = map[string]bool{
models.ModelTypePyFunc: true,
models.ModelTypeCustom: true,
models.ModelTypePyFuncV3: true,
}

// isModelSupportUPI reports whether the model's type is marked as supported in
// supportedUPIModelTypes.
//
// The stored bool is returned rather than mere key presence, so an entry that
// is explicitly set to false is treated as unsupported. A missing key yields
// the zero value, false.
func isModelSupportUPI(model *models.Model) bool {
	return supportedUPIModelTypes[model.Type]
}

// validateRequest runs the given validators in order and stops at the first
// failure, returning that validator's error. It returns nil when every
// validator passes or when no validators are supplied.
func validateRequest(validators ...requestValidator) error {
	for i := range validators {
		err := validators[i].validate()
		if err != nil {
			return err
		}
	}
	return nil
}

// customModelValidation returns a validator that runs validateCustomPredictor
// on version, but only when model is of type models.ModelTypeCustom. Every
// other model type passes without further checks.
func customModelValidation(model *models.Model, version *models.Version) requestValidator {
	check := func() error {
		if model.Type != models.ModelTypeCustom {
			return nil
		}
		if err := validateCustomPredictor(version); err != nil {
			return err
		}
		return nil
	}
	return newFuncValidate(check)
}

// upiModelValidation returns a validator that rejects a request when the
// endpoint protocol is protocol.UpiV1 and the model type is not supported by
// UPI according to isModelSupportUPI.
func upiModelValidation(model *models.Model, endpointProtocol protocol.Protocol) requestValidator {
	check := func() error {
		// Same operand order as the original condition: the model is inspected first.
		if isModelSupportUPI(model) || endpointProtocol != protocol.UpiV1 {
			return nil
		}
		return fmt.Errorf("%s model is not supported by UPI", model.Type)
	}
	return newFuncValidate(check)
}

// newVersionEndpointValidation returns a validator that fails when the version
// already has an endpoint in the environment named envName and that endpoint
// reports IsRunning or IsServing. The error message includes the endpoint's
// current status.
func newVersionEndpointValidation(version *models.Version, envName string) requestValidator {
	check := func() error {
		existing, found := version.GetEndpointByEnvironmentName(envName)
		if !found {
			return nil
		}
		if !existing.IsRunning() && !existing.IsServing() {
			return nil
		}
		return fmt.Errorf("there is `%s` deployment for the model version", existing.Status)
	}
	return newFuncValidate(check)
}

// deploymentQuotaValidation returns a validator that asks endpointSvc for the
// number of endpoints of model in env and fails when that count has reached
// config.MaxDeployedVersion. A failure to obtain the count is wrapped and
// returned as well.
func deploymentQuotaValidation(ctx context.Context, model *models.Model, env *models.Environment, endpointSvc service.EndpointsService) requestValidator {
	check := func() error {
		current, err := endpointSvc.CountEndpoints(ctx, env, model)
		switch {
		case err != nil:
			return fmt.Errorf("unable to count number of endpoints in env %s: %w", env.Name, err)
		case current >= config.MaxDeployedVersion:
			return fmt.Errorf("max deployed endpoint reached. Max: %d Current: %d, undeploy existing endpoint before continuing", config.MaxDeployedVersion, current)
		default:
			return nil
		}
	}
	return newFuncValidate(check)
}

// transformerValidation returns a validator that runs validateTransformer on
// the endpoint when it has a transformer configured and that transformer is
// enabled. Endpoints without an enabled transformer pass unchecked. Any
// validation failure is wrapped before being returned.
func transformerValidation(
	ctx context.Context,
	endpoint *models.VersionEndpoint,
	stdTransformerCfg config.StandardTransformerConfig,
	feastCore core.CoreServiceClient) requestValidator {
	check := func() error {
		// Nothing to validate unless a transformer is present and switched on.
		if endpoint.Transformer == nil || !endpoint.Transformer.Enabled {
			return nil
		}
		if err := validateTransformer(ctx, endpoint, stdTransformerCfg, feastCore); err != nil {
			return fmt.Errorf("Error validating transformer: %w", err)
		}
		return nil
	}
	return newFuncValidate(check)
}

// updateRequestValidation returns a validator for an endpoint update request,
// comparing the stored endpoint (prev) with the requested one (new).
//
// The checks run in this order:
//   - the environment name must not change;
//   - no update is accepted while prev is in the pending state;
//   - if the status is unchanged, the request passes;
//   - a status change is rejected when prev is serving;
//   - otherwise the new status must be running or terminated.
func updateRequestValidation(prev *models.VersionEndpoint, new *models.VersionEndpoint) requestValidator {
	check := func() error {
		if prev.EnvironmentName != new.EnvironmentName {
			return fmt.Errorf("updating environment is not allowed, previous: %s, new: %s", prev.EnvironmentName, new.EnvironmentName)
		}

		switch {
		case prev.Status == models.EndpointPending:
			return fmt.Errorf("updating endpoint status to %s is not allowed when the endpoint is currently in the pending state", new.Status)
		case new.Status == prev.Status:
			// No status transition requested, so there is nothing further to check.
			return nil
		case prev.Status == models.EndpointServing:
			return fmt.Errorf("updating endpoint status to %s is not allowed when the endpoint is currently in the serving state", new.Status)
		case new.Status != models.EndpointRunning && new.Status != models.EndpointTerminated:
			return fmt.Errorf("updating endpoint status to %s is not allowed", new.Status)
		}
		return nil
	}
	return newFuncValidate(check)
}

// deploymentModeValidation returns a validator that rejects a change of
// deployment mode while the existing endpoint (prev) reports IsRunning or
// IsServing. An empty DeploymentMode on the requested endpoint (new) is not
// treated as a change.
//
// Rationale carried over from the original comment:
//   - for "serving" models the change is risky because a graceful
//     re-deployment cannot be guaranteed;
//   - Kserve uses slightly different deployment resource naming under the hood
//     and doesn't clean up the older deployment.
//
// NOTE(review): the original comment also mentioned "pending" endpoints, but
// only IsRunning and IsServing are checked here; presumably IsRunning covers
// the pending state — confirm.
func deploymentModeValidation(prev *models.VersionEndpoint, new *models.VersionEndpoint) requestValidator {
	check := func() error {
		if !prev.IsRunning() && !prev.IsServing() {
			return nil
		}
		if new.DeploymentMode == "" || new.DeploymentMode == prev.DeploymentMode {
			return nil
		}
		return fmt.Errorf("changing deployment type of a %s model is not allowed, please terminate it first", prev.Status)
	}
	return newFuncValidate(check)
}

// modelObservabilityValidation returns a validator that fails when model
// observability is enabled on the endpoint but the model is not of type
// models.ModelTypePyFuncV3.
//
// The error message previously misspelled "observability" as "observablity";
// the spelling is corrected here. Callers or tests that match on the exact
// message text need to be updated accordingly.
func modelObservabilityValidation(endpoint *models.VersionEndpoint, model *models.Model) requestValidator {
	return newFuncValidate(func() error {
		if !endpoint.EnableModelObservability || model.Type == models.ModelTypePyFuncV3 {
			return nil
		}
		return fmt.Errorf("model type should be pyfunc_v3 if want to enable model observability")
	})
}
Loading

0 comments on commit c5b61e0

Please sign in to comment.