Skip to content

Commit

Permalink
feat: add webhook to model version deployment & undeployment (#601)
Browse files Browse the repository at this point in the history
# Description
When deploying or undeploying model endpoint, other entity might want to
get a trigger to automate their process or for Merlin other process.
This PR add webhooks call (based on the [webhook from
MLP](https://github.com/caraml-dev/mlp/blob/main/api/pkg/webhooks/README.md)),
so on model version deployment/undeployment it will call the configured
webhooks.


# Modifications
**Main Changes:** 

Added webhooks call on
- Model version pre-deployment: will ignore the success response (for
this version) and will stop/fail the deployment version if any `async`
webhook fail
- Model version post-deployment: will ignore error, only log the error
if any occur during the call
- Model version post-undeployment: will ignore error, only log the error
if any occur during the call

Request payload to webhook: 
- Request:
  - `event_type`: name of event which triggers the webhook
  - `versionEndpoint`: object version endpoint

**Side effect changes**:
- With MLP update in go.mod, the assert function is also updated.
Previously, the `assert.InEpsilon` can pass when item in actual slice is
in expected slice, even though the expected slice might have more items;
now the `testify/assert` will check the two slices length first
([ref](https://github.com/stretchr/testify/pull/1483/files)) -> Added
some changes to fix the unit test in `TestToFloat64List`

# Tests

# Checklist
- [x] Added PR label
- [x] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduce API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
add configurable webhook call in endpoint deployment and undeployment
```
  • Loading branch information
bthari authored Aug 19, 2024
1 parent 19e1acd commit da2803a
Show file tree
Hide file tree
Showing 13 changed files with 465 additions and 21 deletions.
12 changes: 9 additions & 3 deletions api/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"time"

mlflowDelete "github.com/caraml-dev/mlp/api/pkg/client/mlflow"

"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
webhookManager "github.com/caraml-dev/mlp/api/pkg/webhooks"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/gorilla/mux"
"github.com/heptiolabs/healthcheck"
Expand All @@ -49,6 +49,7 @@ import (
"github.com/caraml-dev/merlin/service"
"github.com/caraml-dev/merlin/storage"
"github.com/caraml-dev/merlin/warden"
"github.com/caraml-dev/merlin/webhooks"
"github.com/caraml-dev/mlp/api/pkg/authz/enforcer"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
)
Expand Down Expand Up @@ -267,15 +268,20 @@ func buildDependencies(ctx context.Context, cfg *config.Config, db *gorm.DB, dis
log.Panicf("invalid deployment label prefix (%s): %s", cfg.DeploymentLabelPrefix, err)
}

webhookClient, err := webhookManager.InitializeWebhooks(&cfg.WebhooksConfig, webhooks.WebhookEvents)
if err != nil {
log.Panicf("failed to initialize webhooks: %s", err)
}

webServiceBuilder, predJobBuilder, imageBuilderJanitor := initImageBuilder(cfg)

observabilityPublisherStorage := storage.NewObservabilityPublisherStorage(db)
observabilityPublisherDeployment := initObservabilityPublisherDeployment(cfg, observabilityPublisherStorage)
versionStorage := storage.NewVersionStorage(db)
observabilityEvent := event.NewEventProducer(dispatcher, observabilityPublisherStorage, versionStorage)
clusterControllers := initClusterControllers(cfg)
modelServiceDeployment := initModelServiceDeployment(cfg, webServiceBuilder, clusterControllers, db, observabilityEvent)
versionEndpointService := initVersionEndpointService(cfg, webServiceBuilder, clusterControllers, db, coreClient, dispatcher)
modelServiceDeployment := initModelServiceDeployment(cfg, webServiceBuilder, clusterControllers, db, observabilityEvent, webhookClient)
versionEndpointService := initVersionEndpointService(cfg, webServiceBuilder, clusterControllers, db, coreClient, dispatcher, webhookClient)
modelEndpointService := initModelEndpointService(cfg, db, observabilityEvent)

batchControllers := initBatchControllers(cfg, db, mlpAPIClient)
Expand Down
7 changes: 5 additions & 2 deletions api/cmd/api/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned"
"github.com/caraml-dev/mlp/api/pkg/artifact"
"github.com/caraml-dev/mlp/api/pkg/auth"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
feast "github.com/feast-dev/feast/sdk/go"
"github.com/feast-dev/feast/sdk/go/protos/feast/core"
"google.golang.org/grpc"
Expand Down Expand Up @@ -421,7 +422,7 @@ func initPredictionJobService(cfg *config.Config, controllers map[string]batch.C
return service.NewPredictionJobService(controllers, builder, predictionJobStorage, clock.RealClock{}, cfg.Environment, producer)
}

func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB, observabilityEvent event.EventProducer) *work.ModelServiceDeployment {
func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB, observabilityEvent event.EventProducer, webhookManager webhooks.WebhookManager) *work.ModelServiceDeployment {
return &work.ModelServiceDeployment{
ClusterControllers: controllers,
ImageBuilder: builder,
Expand All @@ -430,6 +431,7 @@ func initModelServiceDeployment(cfg *config.Config, builder imagebuilder.ImageBu
LoggerDestinationURL: cfg.LoggerDestinationURL,
MLObsLoggerDestinationURL: cfg.MLObsLoggerDestinationURL,
ObservabilityEventProducer: observabilityEvent,
WebhookManager: webhookManager,
}
}

Expand Down Expand Up @@ -502,7 +504,7 @@ func initClusterControllers(cfg *config.Config) map[string]cluster.Controller {
return controllers
}

func initVersionEndpointService(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB, feastCoreClient core.CoreServiceClient, producer queue.Producer) service.EndpointsService {
func initVersionEndpointService(cfg *config.Config, builder imagebuilder.ImageBuilder, controllers map[string]cluster.Controller, db *gorm.DB, feastCoreClient core.CoreServiceClient, producer queue.Producer, webhookManager webhooks.WebhookManager) service.EndpointsService {
return service.NewEndpointService(service.EndpointServiceParams{
ClusterControllers: controllers,
ImageBuilder: builder,
Expand All @@ -514,6 +516,7 @@ func initVersionEndpointService(cfg *config.Config, builder imagebuilder.ImageBu
JobProducer: producer,
FeastCoreClient: feastCoreClient,
StandardTransformerConfig: cfg.StandardTransformerConfig,
WebhookManager: webhookManager,
})
}

Expand Down
2 changes: 2 additions & 0 deletions api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
"github.com/ory/viper"
Expand Down Expand Up @@ -70,6 +71,7 @@ type Config struct {
PyFuncPublisherConfig PyFuncPublisherConfig
InferenceServiceDefaults InferenceServiceDefaults
ObservabilityPublisher ObservabilityPublisher
WebhooksConfig webhooks.Config
}

// UIConfig stores the configuration for the UI.
Expand Down
20 changes: 20 additions & 0 deletions api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/newrelic"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/sentry"
"github.com/caraml-dev/mlp/api/pkg/webhooks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -598,6 +599,25 @@ func TestLoad(t *testing.T) {
},
DeploymentTimeout: 30 * time.Minute,
},
WebhooksConfig: webhooks.Config{
Enabled: true,
Config: map[webhooks.EventType][]webhooks.WebhookConfig{
"on-model-deployed": {
{
Name: "sync-webhooks",
URL: "http://127.0.0.1:8000/sync-webhook",
Method: "POST",
FinalResponse: true,
},
{
Name: "async-webhooks",
URL: "http://127.0.0.1:8000/async-webhook",
Method: "POST",
Async: true,
},
},
},
},
},
},
"missing file": {
Expand Down
12 changes: 12 additions & 0 deletions api/config/testdata/base-configs-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,15 @@ InferenceServiceDefaults:
DefaultEnvVarsWithoutCPULimits:
- Name: foo
Value: bar
WebhooksConfig:
Enabled: true
Config:
On-Model-Deployed:
- URL: http://127.0.0.1:8000/sync-webhook
Method: POST
FinalResponse: true
Name: sync-webhooks
- URL: http://127.0.0.1:8000/async-webhook
Method: POST
Name: async-webhooks
Async: true
7 changes: 4 additions & 3 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/bboughton/gcp-helpers v0.1.0
github.com/buger/jsonparser v1.1.1
github.com/caraml-dev/merlin-pyspark-app v0.0.3
github.com/caraml-dev/mlp v1.12.2-0.20240517121307-b89dab536aab
github.com/caraml-dev/mlp v1.13.2-rc2
github.com/caraml-dev/protopath v0.1.0
github.com/caraml-dev/universal-prediction-interface v1.0.0
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down Expand Up @@ -64,7 +64,7 @@ require (
github.com/rs/cors v1.8.2
github.com/soheilhy/cmux v0.1.5
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/xanzy/go-gitlab v0.32.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
Expand Down Expand Up @@ -213,7 +213,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.8.1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 // indirect
github.com/valyala/fastjson v1.6.3 // indirect
Expand Down Expand Up @@ -246,6 +246,7 @@ require (
)

require (
github.com/avast/retry-go/v4 v4.6.0 // indirect
golang.org/x/time v0.5.0 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e // indirect
Expand Down
13 changes: 8 additions & 5 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.50.0 h1:HBtrLeO+QyDKnc3t1+5DR1RxodOHCGr8ZcrHudpv7jI=
github.com/aws/aws-sdk-go v1.50.0/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
Expand All @@ -159,8 +161,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dR
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/caraml-dev/mlp v1.12.2-0.20240517121307-b89dab536aab h1:+XKM4kEBZz1gEbOHrphso6HxmMGSfss9TyMBIE0hm2M=
github.com/caraml-dev/mlp v1.12.2-0.20240517121307-b89dab536aab/go.mod h1:Zdz4bALO9WOHXhOgsoLmCjMCJnDVEZEnQFg8rk+u2cE=
github.com/caraml-dev/mlp v1.13.2-rc2 h1:Zmyoy3OTPv2fU+42rxMwUt9erS9J6QA0nlZQy/xCPtk=
github.com/caraml-dev/mlp v1.13.2-rc2/go.mod h1:jKfnUEpCcARv/aJF6qH7vT7VMKICDVOq/pDFvj6V3vQ=
github.com/caraml-dev/protopath v0.1.0 h1:hjJ/U9RGD6QZ0Ee9SIYbVmwPugps4S5EpL6R+5ZrBe0=
github.com/caraml-dev/protopath v0.1.0/go.mod h1:hVA2HkTrMYv+Q57gtrzu9/P7EXlNtBUcTz43z6EE010=
github.com/caraml-dev/universal-prediction-interface v1.0.0 h1:3Z6adv1XZnBVRzFIeCu3mPcPnJrdB5IByYfdD9K/atI=
Expand Down Expand Up @@ -998,8 +1000,9 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand All @@ -1010,8 +1013,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
Expand Down
2 changes: 1 addition & 1 deletion api/pkg/transformer/types/converter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,7 +1882,7 @@ func TestToFloat64List(t *testing.T) {
{
name: "from []float32",
args: args{
val: []float32{float32(3.14), float32(math.NaN())},
val: []float32{float32(3.14), float32(4.56), float32(math.NaN())},
},
want: []float64{3.14, 4.56},
wantErr: false,
Expand Down
16 changes: 16 additions & 0 deletions api/queue/work/model_service_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/caraml-dev/merlin/pkg/observability/event"
"github.com/caraml-dev/merlin/queue"
"github.com/caraml-dev/merlin/storage"
"github.com/caraml-dev/merlin/webhooks"
webhookManager "github.com/caraml-dev/mlp/api/pkg/webhooks"
"github.com/prometheus/client_golang/prometheus"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -42,6 +44,7 @@ type ModelServiceDeployment struct {
LoggerDestinationURL string
MLObsLoggerDestinationURL string
ObservabilityEventProducer event.EventProducer
WebhookManager webhookManager.WebhookManager
}

type EndpointJob struct {
Expand Down Expand Up @@ -208,6 +211,19 @@ func (depl *ModelServiceDeployment) Deploy(job *queue.Job) error {
}
}

// calling webhooks if there's any webhooks configured
if depl.WebhookManager != nil && depl.WebhookManager.IsEventConfigured(webhooks.OnModelVersionDeployed) {
body := &webhooks.VersionEndpointRequest{
EventType: webhooks.OnModelVersionDeployed,
VersionEndpoint: endpoint,
}

err = depl.WebhookManager.InvokeWebhooks(ctx, webhooks.OnModelVersionDeployed, body, webhookManager.NoOpCallback, webhookManager.NoOpErrorHandler)
if err != nil {
log.Warnf("unable to invoke webhooks for event type: %s, model: %s, version: %s, error: %v", webhooks.OnModelVersionDeployed, model.Name, version.ID, err)
}
}

return nil
}

Expand Down
Loading

0 comments on commit da2803a

Please sign in to comment.