Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor deployments storage on redeploy and undeployment #483

Merged
merged 18 commits into from
Nov 19, 2023
Merged
4 changes: 4 additions & 0 deletions api/models/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ type Deployment struct {
Error string `json:"error"`
CreatedUpdated
}

func (d *Deployment) IsSuccess() bool {
return d.Status == EndpointRunning || d.Status == EndpointServing
}
11 changes: 9 additions & 2 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,15 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {

// record the deployment result
deployment.UpdatedAt = time.Now()
if _, err := depl.DeploymentStorage.Save(deployment); err != nil {
log.Warnf("unable to update deployment history", err)
if deployment.IsSuccess() {
if err := depl.DeploymentStorage.OnDeploymentSuccess(deployment); err != nil {
ariefrahmansyah marked this conversation as resolved.
Show resolved Hide resolved
log.Errorf("unable to update deployment history on successful deployment (ID: %+v): %s", deployment.ID, err)
}
} else {
// If failed, only update the latest deployment
if _, err := depl.DeploymentStorage.Save(deployment); err != nil {
log.Errorf("unable to update deployment history for failed deployment (ID: %+v): %s", deployment.ID, err)
}
}

// if redeployment failed, we only update the previous endpoint status from pending to previous status
Expand Down
8 changes: 6 additions & 2 deletions api/service/version_endpoint_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,12 @@ func (k *endpointService) UndeployEndpoint(ctx context.Context, environment *mod
}

endpoint.Status = models.EndpointTerminated
err = k.storage.Save(endpoint)
if err != nil {

if err := k.storage.Save(endpoint); err != nil {
return nil, err
}

if err := k.deploymentStorage.Undeploy(model.ID.String(), version.ID.String(), endpoint.ID.String()); err != nil {
return nil, err
}

Expand Down
87 changes: 87 additions & 0 deletions api/storage/deployment_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package storage

import (
"fmt"

"gorm.io/gorm"

"github.com/caraml-dev/merlin/models"
Expand All @@ -27,6 +29,10 @@
ListInModelVersion(modelID, versionID, endpointUUID string) ([]*models.Deployment, error)
// Save save the deployment to underlying storage
Save(deployment *models.Deployment) (*models.Deployment, error)
// OnDeploymentSuccess updates the new deployment status to successful on DB and update all previous deployment status for that version endpoint to terminated.
OnDeploymentSuccess(newDeployment *models.Deployment) error
krithika369 marked this conversation as resolved.
Show resolved Hide resolved
// Undeploy updates all successful deployment status to terminated on DB
Undeploy(modelID, versionID, endpointUUID string) error
// GetFirstSuccessModelVersionPerModel Return mapping of model id and the first model version with a successful model version
GetFirstSuccessModelVersionPerModel() (map[models.ID]models.ID, error)
Delete(modelID models.ID, versionID models.ID) error
Expand Down Expand Up @@ -83,3 +89,84 @@
func (d *deploymentStorage) Delete(modelID models.ID, versionID models.ID) error {
return d.db.Where("version_id = ? AND version_model_id = ?", versionID, modelID).Delete(models.Deployment{}).Error
}

// OnDeploymentSuccess updates the new deployment status to successful on DB and update all previous deployment status for that version endpoint to terminated.
func (d *deploymentStorage) OnDeploymentSuccess(newDeployment *models.Deployment) error {
if newDeployment.ID == 0 {
return fmt.Errorf("newDeployment.ID must not be empty")
}

if newDeployment.Status != models.EndpointRunning && newDeployment.Status != models.EndpointServing {
return fmt.Errorf("newDeployment.Status must be running or serving")
}

tx := d.db.Begin()
if tx.Error != nil {
return fmt.Errorf("failed to begin transaction: %s", tx.Error)

Check failure on line 105 in api/storage/deployment_storage.go

View workflow job for this annotation

GitHub Actions / lint-api

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}

var err error
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()

var deployments []*models.Deployment
err = tx.Where("version_model_id = ? AND version_id = ? AND version_endpoint_id = ?",
newDeployment.VersionModelID, newDeployment.VersionID, newDeployment.VersionEndpointID).Find(&deployments).Error
krithika369 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

for i := range deployments {
if deployments[i].IsSuccess() {
deployments[i].Status = models.EndpointTerminated
}
}

err = tx.Save(deployments).Error
if err != nil {
return err
}

return err
}

func (d *deploymentStorage) Undeploy(modelID, versionID, endpointUUID string) error {
tx := d.db.Begin()
if tx.Error != nil {
return fmt.Errorf("failed to begin transaction: %s", tx.Error)

Check failure on line 141 in api/storage/deployment_storage.go

View workflow job for this annotation

GitHub Actions / lint-api

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}

var err error
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()

var deployments []*models.Deployment
err = tx.Where("version_model_id = ? AND version_id = ? AND version_endpoint_id = ?",
modelID, versionID, endpointUUID).Find(&deployments).Error
if err != nil {
return err
}

for i := range deployments {
if deployments[i].IsSuccess() {
deployments[i].Status = models.EndpointTerminated
}
}

err = tx.Save(deployments).Error
if err != nil {
return err
}

return err
}
116 changes: 113 additions & 3 deletions api/storage/deployment_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package storage
import (
"testing"

"github.com/caraml-dev/merlin/pkg/deployment"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"

"github.com/caraml-dev/merlin/database"
"github.com/caraml-dev/merlin/mlp"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/deployment"
)

func TestDeploymentStorage_List(t *testing.T) {
Expand Down Expand Up @@ -100,8 +100,8 @@ func TestDeploymentStorage_List(t *testing.T) {
_, err = deploymentStorage.Save(deploy2)
assert.NoError(t, err)

jobs, err := deploymentStorage.ListInModel(&m)
assert.Len(t, jobs, 2)
deps, err := deploymentStorage.ListInModel(&m)
assert.Len(t, deps, 2)
})
}

Expand Down Expand Up @@ -198,3 +198,113 @@ func TestDeploymentStorage_GetFirstSuccessModelVersionPerModel(t *testing.T) {
assert.Equal(t, v2.ID, resultMap[m.ID])
})
}

func TestDeploymentStorage_OnDeploymentSuccess(t *testing.T) {
database.WithTestDatabase(t, func(t *testing.T, db *gorm.DB) {
deploymentStorage := NewDeploymentStorage(db)
isDefaultTrue := true

p := mlp.Project{
Name: "project",
MLFlowTrackingURL: "http://mlflow:5000",
}

m := models.Model{
ID: 1,
ProjectID: models.ID(p.ID),
ExperimentID: 1,
Name: "model",
Type: models.ModelTypeSkLearn,
}
db.Create(&m)

v := models.Version{
ModelID: m.ID,
RunID: "1",
ArtifactURI: "gcs:/mlp/1/1",
PythonVersion: "3.7.*",
}
db.Create(&v)

env1 := models.Environment{
Name: "env1",
Cluster: "k8s",
IsDefault: &isDefaultTrue,
}
db.Create(&env1)

endpoint := models.VersionEndpoint{
ID: uuid.New(),
VersionID: v.ID,
VersionModelID: m.ID,
Status: "pending",
EnvironmentName: env1.Name,
DeploymentMode: deployment.ServerlessDeploymentMode,
}
db.Create(&endpoint)

deploy1 := &models.Deployment{
ProjectID: models.ID(p.ID),
VersionID: v.ID,
VersionModelID: m.ID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointTerminated,
Error: "",
CreatedUpdated: models.CreatedUpdated{},
}
db.Create(&deploy1)

deploy2 := &models.Deployment{
ProjectID: models.ID(p.ID),
VersionID: v.ID,
VersionModelID: m.ID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointRunning,
Error: "",
CreatedUpdated: models.CreatedUpdated{},
}
db.Create(&deploy2)

// To make sure that other deployment is not affected
otherDeployment := &models.Deployment{
ProjectID: models.ID(p.ID),
VersionID: v.ID + 1,
VersionModelID: m.ID + 1,
VersionEndpointID: uuid.New(),
Status: models.EndpointServing,
Error: "",
CreatedUpdated: models.CreatedUpdated{},
}
db.Create(&otherDeployment)

newDeployment := &models.Deployment{
ProjectID: models.ID(p.ID),
VersionID: v.ID,
VersionModelID: m.ID,
VersionEndpointID: endpoint.ID,
Status: models.EndpointPending,
Error: "",
CreatedUpdated: models.CreatedUpdated{},
}
db.Create(&newDeployment)

newDeployment.Status = models.EndpointRunning

err := deploymentStorage.OnDeploymentSuccess(newDeployment)
assert.Nil(t, err)

deps, err := deploymentStorage.ListInModel(&m)
assert.Len(t, deps, 3)

for i := range deps {
if deps[i].ID == newDeployment.ID {
assert.Equal(t, models.EndpointRunning, deps[i].Status)
} else {
assert.Equal(t, models.EndpointTerminated, deps[i].Status)
}
}

// To make sure that other deployment is not affected
assert.Equal(t, models.EndpointServing, otherDeployment.Status)
})
}
28 changes: 28 additions & 0 deletions api/storage/mocks/deployment_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading