Skip to content

Commit

Permalink
Model Version Revision Improvement (#479)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->

This PR is for fixing and improving model version revision:

1. For the redeployment, we are no longer update the current endpoint
with new endpoint data to make sure the UI still showing the current
endpoint deployed which is consistent with the actual endpoint deployed
in the cluster. Instead, once the deployment is successful, we will then
update the endpoint.
2. Fix Deployment History UI when deployment status is running but with
error message displayed as Failed.

**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->

* Fixes deployment status UI is failed even though status is running or
serving.

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
NONE
```

**Checklist**

- [ ] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduce API
changes
  • Loading branch information
ariefrahmansyah authored Oct 26, 2023
1 parent e78908c commit 7fc8e96
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 123 deletions.
4 changes: 2 additions & 2 deletions api/models/version_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type VersionEndpoint struct {
// ServiceName service name
ServiceName string `json:"service_name" gorm:"service_name"`
// InferenceServiceName name of inference service
InferenceServiceName string `json:"-" gorm:"inference_service_name"`
InferenceServiceName string `json:"inference_service_name" gorm:"inference_service_name"`
// Namespace namespace where the version is deployed at
Namespace string `json:"-" gorm:"namespace"`
Namespace string `json:"namespace" gorm:"namespace"`
// MonitoringURL URL pointing to the version endpoint's dashboard
MonitoringURL string `json:"monitoring_url,omitempty" gorm:"-"`
// Environment environment where the version endpoint is deployed
Expand Down
54 changes: 32 additions & 22 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,25 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
return err
}

endpointArg := jobArgs.Endpoint
endpoint, err := depl.Storage.Get(endpointArg.ID)
endpoint := jobArgs.Endpoint
previousStatus := endpoint.Status

version := jobArgs.Version
project := jobArgs.Project
model := jobArgs.Model
model.Project = project

prevEndpoint, err := depl.Storage.Get(endpoint.ID)
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Errorf("could not found version endpoint with id %s and error: %v", endpointArg.ID, err)
log.Errorf("could not found version endpoint with id %s and error: %v", endpoint.ID, err)
return err
}
if err != nil {
log.Errorf("could not fetch version endpoint with id %s and error: %v", endpointArg.ID, err)
log.Errorf("could not fetch version endpoint with id %s and error: %v", endpoint.ID, err)
// If error getting record from db, return err as RetryableError to enable retry
return queue.RetryableError{Message: err.Error()}
}

version := jobArgs.Version
project := jobArgs.Project
model := jobArgs.Model
model.Project = project

isRedeployment := false

// Need to reassign destionationURL cause it is ignored when marshalled and unmarshalled
Expand All @@ -83,13 +85,11 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
}

endpoint.RevisionID++
endpoint.Status = models.EndpointFailed

// for backward compatibility, if inference service name is not empty, it means we are redeploying the "legacy" endpoint that created prior to model version revision introduction
// for future compatibility, if endpoint.RevisionID > 1, it means we are redeploying the endpoint that created after model version revision introduction
if endpoint.InferenceServiceName != "" || endpoint.RevisionID > 1 {
isRedeployment = true
endpoint.Status = endpointArg.Status
}

log.Infof("creating deployment for model %s version %s revision %s with endpoint id: %s", model.Name, endpoint.VersionID, endpoint.RevisionID, endpoint.ID)
Expand All @@ -115,17 +115,25 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
log.Warnf("unable to update deployment history", err)
}

// record the version endpoint result
if err := depl.Storage.Save(endpoint); err != nil {
log.Errorf("unable to update endpoint status for model: %s, version: %s, reason: %v", model.Name, version.ID, err)
// if redeployment failed, we only update the previous endpoint status from pending to previous status
if deployment.Error != "" {
prevEndpoint.Status = previousStatus
// but if it's not redeployment (first deployment), we set the status to failed
if !isRedeployment {
prevEndpoint.Status = models.EndpointFailed
}

// record the version endpoint result if deployment
if err := depl.Storage.Save(prevEndpoint); err != nil {
log.Errorf("unable to update endpoint status for model: %s, version: %s, reason: %v", model.Name, version.ID, err)
}
}
}()

modelOpt, err := depl.generateModelOptions(ctx, model, version)
if err != nil {
deployment.Status = models.EndpointFailed
deployment.Error = err.Error()
endpoint.Message = err.Error()
return err
}

Expand All @@ -134,7 +142,6 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
if !ok {
deployment.Status = models.EndpointFailed
deployment.Error = err.Error()
endpoint.Message = err.Error()
return fmt.Errorf("unable to find cluster controller for environment %s", endpoint.EnvironmentName)
}

Expand All @@ -143,24 +150,27 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
log.Errorf("unable to deploy version endpoint for model: %s, version: %s, reason: %v", model.Name, version.ID, err)
deployment.Status = models.EndpointFailed
deployment.Error = err.Error()
endpoint.Message = err.Error()
return err
}

// By reaching this point, the deployment is successful
endpoint.URL = svc.URL
previousStatus := endpointArg.Status
endpoint.ServiceName = svc.ServiceName
endpoint.InferenceServiceName = svc.CurrentIsvcName
endpoint.Message = "" // reset message

if previousStatus == models.EndpointServing {
endpoint.Status = models.EndpointServing
} else {
endpoint.Status = models.EndpointRunning
}
endpoint.ServiceName = svc.ServiceName
endpoint.InferenceServiceName = svc.CurrentIsvcName
endpoint.Message = "" // reset message

deployment.Status = endpoint.Status

// record the new endpoint result if deployment is successful
if err := depl.Storage.Save(endpoint); err != nil {
log.Errorf("unable to update endpoint status for model: %s, version: %s, reason: %v", model.Name, version.ID, err)
}

return nil
}

Expand Down
Loading

0 comments on commit 7fc8e96

Please sign in to comment.