Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor deployments storage on redeploy and undeployment #483

Merged
merged 18 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/models/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ type Deployment struct {
Error string `json:"error"`
CreatedUpdated
}

func (d *Deployment) IsSuccess() bool {
return d.Status == EndpointRunning || d.Status == EndpointServing
}
11 changes: 9 additions & 2 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,15 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {

// record the deployment result
deployment.UpdatedAt = time.Now()
if _, err := depl.DeploymentStorage.Save(deployment); err != nil {
log.Warnf("unable to update deployment history", err)
if deployment.IsSuccess() {
if err := depl.DeploymentStorage.OnDeploymentSuccess(deployment); err != nil {
ariefrahmansyah marked this conversation as resolved.
Show resolved Hide resolved
log.Errorf("unable to update deployment history on successful deployment (ID: %+v): %s", deployment.ID, err)
}
} else {
// If failed, only update the latest deployment
if _, err := depl.DeploymentStorage.Save(deployment); err != nil {
log.Errorf("unable to update deployment history for failed deployment (ID: %+v): %s", deployment.ID, err)
}
}

// if redeployment failed, we only update the previous endpoint status from pending to previous status
Expand Down
28 changes: 15 additions & 13 deletions api/queue/work/model_service_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -328,7 +328,6 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -368,7 +367,6 @@ func TestExecuteDeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -430,7 +428,6 @@ func TestExecuteDeployment(t *testing.T) {
}

mockStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)

savedEndpoint := mockStorage.Calls[1].Arguments[0].(*models.VersionEndpoint)
assert.Equal(t, tt.model.ID, savedEndpoint.VersionModelID)
Expand All @@ -445,8 +442,11 @@ func TestExecuteDeployment(t *testing.T) {
}

if tt.deployErr != nil {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)
assert.Equal(t, models.EndpointFailed, savedEndpoint.Status)
} else {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "OnDeploymentSuccess", 1)
assert.Equal(t, models.EndpointRunning, savedEndpoint.Status)
assert.Equal(t, url, savedEndpoint.URL)
assert.Equal(t, "", savedEndpoint.InferenceServiceName)
Expand Down Expand Up @@ -527,7 +527,7 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
mockStorage.On("OnDeploymentSuccess", mock.Anything).Return(nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -723,7 +723,6 @@ func TestExecuteRedeployment(t *testing.T) {
deploymentStorage: func() *mocks.DeploymentStorage {
mockStorage := &mocks.DeploymentStorage{}
mockStorage.On("Save", mock.Anything).Return(&models.Deployment{}, nil)
mockStorage.On("Save", mock.Anything).Return(nil, nil)
return mockStorage
},
storage: func() *mocks.VersionEndpointStorage {
Expand Down Expand Up @@ -798,7 +797,6 @@ func TestExecuteRedeployment(t *testing.T) {
}

mockStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)
krithika369 marked this conversation as resolved.
Show resolved Hide resolved

savedEndpoint := mockStorage.Calls[1].Arguments[0].(*models.VersionEndpoint)
assert.Equal(t, tt.model.ID, savedEndpoint.VersionModelID)
Expand All @@ -814,8 +812,12 @@ func TestExecuteRedeployment(t *testing.T) {

assert.Equal(t, tt.expectedEndpointStatus, savedEndpoint.Status)
if tt.deployErr == nil {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 1)
mockDeploymentStorage.AssertNumberOfCalls(t, "OnDeploymentSuccess", 1)
assert.Equal(t, url, savedEndpoint.URL)
assert.Equal(t, modelSvcName, savedEndpoint.InferenceServiceName)
} else {
mockDeploymentStorage.AssertNumberOfCalls(t, "Save", 2)
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions api/service/version_endpoint_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,12 @@ func (k *endpointService) UndeployEndpoint(ctx context.Context, environment *mod
}

endpoint.Status = models.EndpointTerminated
err = k.storage.Save(endpoint)
if err != nil {

if err := k.storage.Save(endpoint); err != nil {
return nil, err
}

if err := k.deploymentStorage.Undeploy(model.ID.String(), version.ID.String(), endpoint.ID.String()); err != nil {
return nil, err
}

Expand Down
86 changes: 86 additions & 0 deletions api/storage/deployment_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package storage

import (
"fmt"

"gorm.io/gorm"

"github.com/caraml-dev/merlin/models"
Expand All @@ -27,6 +29,10 @@ type DeploymentStorage interface {
ListInModelVersion(modelID, versionID, endpointUUID string) ([]*models.Deployment, error)
// Save save the deployment to underlying storage
Save(deployment *models.Deployment) (*models.Deployment, error)
// OnDeploymentSuccess updates the new deployment status to successful on DB and update all previous deployment status for that version endpoint to terminated.
OnDeploymentSuccess(newDeployment *models.Deployment) error
krithika369 marked this conversation as resolved.
Show resolved Hide resolved
// Undeploy updates all successful deployment status to terminated on DB
Undeploy(modelID, versionID, endpointUUID string) error
// GetFirstSuccessModelVersionPerModel Return mapping of model id and the first model version with a successful model version
GetFirstSuccessModelVersionPerModel() (map[models.ID]models.ID, error)
Delete(modelID models.ID, versionID models.ID) error
Expand Down Expand Up @@ -83,3 +89,83 @@ func (d *deploymentStorage) GetFirstSuccessModelVersionPerModel() (map[models.ID
func (d *deploymentStorage) Delete(modelID models.ID, versionID models.ID) error {
return d.db.Where("version_id = ? AND version_model_id = ?", versionID, modelID).Delete(models.Deployment{}).Error
}

// OnDeploymentSuccess updates the new deployment status to successful on DB and update all previous successful deployment status for that version endpoint to terminated.
func (d *deploymentStorage) OnDeploymentSuccess(newDeployment *models.Deployment) error {
if newDeployment.ID == 0 {
return fmt.Errorf("newDeployment.ID must not be empty")
}

if !newDeployment.IsSuccess() {
krithika369 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("newDeployment.Status must be running or serving")
}

tx := d.db.Begin()
if tx.Error != nil {
return fmt.Errorf("failed to begin transaction: %w", tx.Error)
}

var err error
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()

var deployments []*models.Deployment
err = tx.Where("version_model_id = ? AND version_id = ? AND version_endpoint_id = ? AND status IN ('running', 'serving') AND id != ?",
newDeployment.VersionModelID, newDeployment.VersionID, newDeployment.VersionEndpointID, newDeployment.ID).Find(&deployments).Error
if err != nil {
return err
}

for i := range deployments {
// Set older successful deployment to terminated
deployments[i].Status = models.EndpointTerminated
}

deployments = append(deployments, newDeployment)

err = tx.Save(deployments).Error
if err != nil {
return err
}

return err
}

func (d *deploymentStorage) Undeploy(modelID, versionID, endpointUUID string) error {
tx := d.db.Begin()
if tx.Error != nil {
return fmt.Errorf("failed to begin transaction: %w", tx.Error)
}

var err error
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()

var deployments []*models.Deployment
err = tx.Where("version_model_id = ? AND version_id = ? AND version_endpoint_id = ? AND status IN ('running', 'serving')",
modelID, versionID, endpointUUID).Find(&deployments).Error
if err != nil {
return err
}

for i := range deployments {
deployments[i].Status = models.EndpointTerminated
}

err = tx.Save(deployments).Error
if err != nil {
return err
}

return err
}
Loading
Loading