Skip to content

Commit

Permalink
Add Ingress probe
Browse files Browse the repository at this point in the history
Signed-off-by: Sivanantham Chinnaiyan <[email protected]>
  • Loading branch information
sivanantha321 committed Mar 7, 2024
1 parent b0fef32 commit 15fab58
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scheduled-go-security-scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
args: '-no-fail -fmt=sarif -out=go-security-scan-results.sarif -exclude-dir=pkg/client -exclude-dir=pkg/clientv1alpha1 ./...'

- name: Upload SARIF file to Github Code Scanning
uses: github/codeql-action/upload-sarif@v2
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: go-security-scan-results.sarif
category: gosec-tool
3 changes: 2 additions & 1 deletion pkg/apis/serving/v1beta1/inference_service_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package v1beta1
import (
"reflect"

"github.com/kserve/kserve/pkg/constants"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
knservingv1 "knative.dev/serving/pkg/apis/serving/v1"

"github.com/kserve/kserve/pkg/constants"
)

// InferenceServiceStatus defines the observed state of InferenceService
Expand Down
37 changes: 20 additions & 17 deletions pkg/controller/v1beta1/inferenceservice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,6 @@ import (
"reflect"

"github.com/go-logr/logr"
v1alpha1api "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
v1beta1api "github.com/kserve/kserve/pkg/apis/serving/v1beta1"
"github.com/kserve/kserve/pkg/constants"
"github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/components"
"github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/cabundleconfigmap"
"github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/ingress"
modelconfig "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/modelconfig"
isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils"
"github.com/kserve/kserve/pkg/utils"
"github.com/pkg/errors"
istioclientv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -46,6 +37,16 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1alpha1api "github.com/kserve/kserve/pkg/apis/serving/v1alpha1"
v1beta1api "github.com/kserve/kserve/pkg/apis/serving/v1beta1"
"github.com/kserve/kserve/pkg/constants"
"github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/components"
"github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/cabundleconfigmap"
"github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/ingress"
modelconfig "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/reconcilers/modelconfig"
isvcutils "github.com/kserve/kserve/pkg/controller/v1beta1/inferenceservice/utils"
"github.com/kserve/kserve/pkg/utils"
)

// +kubebuilder:rbac:groups=serving.kserve.io,resources=inferenceservices;inferenceservices/finalizers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -92,7 +93,7 @@ type InferenceServiceReconciler struct {

func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = context.Background()

result := ctrl.Result{}
// Fetch the InferenceService instance
isvc := &v1beta1api.InferenceService{}
if err := r.Get(ctx, req.NamespacedName, isvc); err != nil {
Expand Down Expand Up @@ -185,18 +186,18 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req
reconcilers = append(reconcilers, components.NewExplainer(r.Client, r.Scheme, isvcConfig, deploymentMode))
}
for _, reconciler := range reconcilers {
result, err := reconciler.Reconcile(isvc)
res, err := reconciler.Reconcile(isvc)
if err != nil {
r.Log.Error(err, "Failed to reconcile", "reconciler", reflect.ValueOf(reconciler), "Name", isvc.Name)
r.Recorder.Eventf(isvc, v1.EventTypeWarning, "InternalError", err.Error())
if err := r.updateStatus(isvc, deploymentMode); err != nil {
r.Log.Error(err, "Error updating status")
return result, err
return res, err
}
return reconcile.Result{}, errors.Wrapf(err, "fails to reconcile component")
}
if result.Requeue || result.RequeueAfter > 0 {
return result, nil
if res.Requeue || res.RequeueAfter > 0 {
return res, nil
}
}
// reconcile RoutesReady and LatestDeploymentReady conditions for serverless deployment
Expand Down Expand Up @@ -229,8 +230,10 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req
} else {
reconciler := ingress.NewIngressReconciler(r.Client, r.Scheme, ingressConfig)
r.Log.Info("Reconciling ingress for inference service", "isvc", isvc.Name)
if err := reconciler.Reconcile(isvc); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "fails to reconcile ingress")
if res, err := reconciler.Reconcile(isvc); err != nil {
return res, errors.Wrapf(err, "fails to reconcile ingress")
} else if res.Requeue || res.RequeueAfter > 0 {
result = res
}
}

Expand All @@ -245,7 +248,7 @@ func (r *InferenceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Req
return reconcile.Result{}, err
}

return ctrl.Result{}, nil
return result, nil
}

func (r *InferenceServiceReconciler) updateStatus(desiredService *v1beta1api.InferenceService, deploymentMode constants.DeploymentModeType) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package ingress

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

"github.com/google/go-cmp/cmp"
"github.com/kserve/kserve/pkg/apis/serving/v1beta1"
"github.com/kserve/kserve/pkg/constants"
utils "github.com/kserve/kserve/pkg/utils"
"github.com/pkg/errors"
"google.golang.org/protobuf/testing/protocmp"
istiov1beta1 "istio.io/api/networking/v1beta1"
Expand All @@ -33,21 +35,27 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
knnethttp "knative.dev/networking/pkg/http"
knheader "knative.dev/networking/pkg/http/header"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmp"
"knative.dev/pkg/network"
knservingv1 "knative.dev/serving/pkg/apis/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"strings"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kserve/kserve/pkg/apis/serving/v1beta1"
"github.com/kserve/kserve/pkg/constants"
utils "github.com/kserve/kserve/pkg/utils"
)

var (
log = logf.Log.WithName("IngressReconciler")
// probeTimeout defines the maximum amount of time a request will wait
probeTimeout = 1 * time.Second
)

type IngressReconciler struct {
Expand Down Expand Up @@ -174,7 +182,7 @@ func getHostBasedServiceUrl(isvc *v1beta1.InferenceService, config *v1beta1.Ingr
}
}

func (r *IngressReconciler) reconcileExternalService(isvc *v1beta1.InferenceService, config *v1beta1.IngressConfig) error {
func (ir *IngressReconciler) reconcileExternalService(isvc *v1beta1.InferenceService, config *v1beta1.IngressConfig) error {
desired := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: isvc.Name,
Expand All @@ -186,17 +194,17 @@ func (r *IngressReconciler) reconcileExternalService(isvc *v1beta1.InferenceServ
SessionAffinity: corev1.ServiceAffinityNone,
},
}
if err := controllerutil.SetControllerReference(isvc, desired, r.scheme); err != nil {
if err := controllerutil.SetControllerReference(isvc, desired, ir.scheme); err != nil {
return err
}

// Create service if does not exist
existing := &corev1.Service{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
err := ir.client.Get(context.TODO(), types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}, existing)
if err != nil {
if apierr.IsNotFound(err) {
log.Info("Creating external name service", "namespace", desired.Namespace, "name", desired.Name)
err = r.client.Create(context.TODO(), desired)
err = ir.client.Create(context.TODO(), desired)
}
return err
}
Expand All @@ -216,7 +224,7 @@ func (r *IngressReconciler) reconcileExternalService(isvc *v1beta1.InferenceServ
existing.Spec = desired.Spec
existing.ObjectMeta.Labels = desired.ObjectMeta.Labels
existing.ObjectMeta.Annotations = desired.ObjectMeta.Annotations
err = r.client.Update(context.TODO(), existing)
err = ir.client.Update(context.TODO(), existing)
if err != nil {
return errors.Wrapf(err, "fails to update external name service")
}
Expand Down Expand Up @@ -454,12 +462,52 @@ func createIngress(isvc *v1beta1.InferenceService, useDefault bool, config *v1be
return desiredIngress
}

func (ir *IngressReconciler) Reconcile(isvc *v1beta1.InferenceService) error {
func probeIngress(url string) (bool, error) {
isReady := false
// Probes Queue-Proxy or Activator
target := url + knnethttp.HealthCheckPath
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = &tls.Config{
// #nosec G402
// We only want to know that the Ingress is configured, not that the configuration is valid.
// Therefore, we can safely ignore any TLS certificate validation.
InsecureSkipVerify: true,
}
transport.DisableKeepAlives = true
ctx, cancel := context.WithTimeout(context.TODO(), probeTimeout)
defer cancel()
req, err := http.NewRequest(http.MethodGet, target, nil)
if err != nil {
return isReady, errors.Wrapf(err, "failed to probe ingress %s", target)
}
// ProbeKey is the name of a header that can be added to requests to probe the ingress.
// Requests with this header will not be passed to the user container or included in request metrics.
req.Header.Add(knheader.ProbeKey, knheader.ProbeValue)
req.Header.Add(knheader.HashKey, knheader.HashValueOverride)
// IngressReadinessUserAgent is the user-agent header value set in probe requests for Ingress status.
req.Header.Add(knheader.UserAgentKey, knheader.IngressReadinessUserAgent)
req = req.WithContext(ctx)
resp, err := transport.RoundTrip(req)
if err != nil {
return isReady, errors.Wrapf(err, "failed to probe ingress %s", target)
}
log.Info("ingress probe result", "statuscode", resp.StatusCode)
if resp.StatusCode == http.StatusOK {
isReady = true
}
return isReady, nil
}

func (ir *IngressReconciler) isIngressReady(isvc *v1beta1.InferenceService) bool {
return isvc.Generation == isvc.Status.ObservedGeneration && isvc.Status.GetCondition(v1beta1.IngressReady).IsTrue()
}

func (ir *IngressReconciler) Reconcile(isvc *v1beta1.InferenceService) (ctrl.Result, error) {
serviceHost := getServiceHost(isvc)
serviceUrl := getServiceUrl(isvc, ir.ingressConfig)
disableIstioVirtualHost := ir.ingressConfig.DisableIstioVirtualHost
if serviceHost == "" || serviceUrl == "" {
return nil
return ctrl.Result{}, nil
}
// When Istio virtual host is disabled, we return the underlying component url.
// When Istio virtual host is enabled. we return the url using inference service virtual host name and redirect to the corresponding transformer, predictor or explainer url.
Expand All @@ -473,16 +521,16 @@ func (ir *IngressReconciler) Reconcile(isvc *v1beta1.InferenceService) error {
}
desiredIngress := createIngress(isvc, useDefault, ir.ingressConfig)
if desiredIngress == nil {
return nil
return ctrl.Result{}, nil
}

//Create external service which points to local gateway
if err := ir.reconcileExternalService(isvc, ir.ingressConfig); err != nil {
return errors.Wrapf(err, "fails to reconcile external name service")
return ctrl.Result{}, errors.Wrapf(err, "fails to reconcile external name service")
}

if err := controllerutil.SetControllerReference(isvc, desiredIngress, ir.scheme); err != nil {
return errors.Wrapf(err, "fails to set owner reference for ingress")
return ctrl.Result{}, errors.Wrapf(err, "fails to set owner reference for ingress")
}

existing := &istioclientv1beta1.VirtualService{}
Expand All @@ -503,7 +551,7 @@ func (ir *IngressReconciler) Reconcile(isvc *v1beta1.InferenceService) error {
}
}
if err != nil {
return errors.Wrapf(err, "fails to create or update ingress")
return ctrl.Result{}, errors.Wrapf(err, "fails to create or update ingress")
}
}

Expand All @@ -522,20 +570,40 @@ func (ir *IngressReconciler) Reconcile(isvc *v1beta1.InferenceService) error {
} else {
hostPrefix = getHostPrefix(isvc, disableIstioVirtualHost, false)
}

host := network.GetServiceHostname(hostPrefix, isvc.Namespace)
scheme := "http"
isvc.Status.Address = &duckv1.Addressable{
URL: &apis.URL{
Host: network.GetServiceHostname(hostPrefix, isvc.Namespace),
Scheme: "http",
Host: host,
Scheme: scheme,
},
}
isvc.Status.SetCondition(v1beta1.IngressReady, &apis.Condition{
Type: v1beta1.IngressReady,
Status: corev1.ConditionTrue,
})
return nil
if ir.isIngressReady(isvc) {
// When the ingress has already been marked Ready for this generation,
// then it must have been successfully probed. This exception necessary for the case
// of global resyncs.
// As this is an optimization, we don't worry about the ObservedGeneration
// skew we might see when the resource is actually in flux, we simply care
// about the steady state.
} else {
if isReady, err := probeIngress(isvc.Status.Address.URL.String()); err != nil {
return ctrl.Result{}, err
} else if isReady {
isvc.Status.SetCondition(v1beta1.IngressReady, &apis.Condition{
Type: v1beta1.IngressReady,
Status: corev1.ConditionTrue,
})
} else {
isvc.Status.SetCondition(v1beta1.IngressReady, &apis.Condition{
Type: v1beta1.IngressReady,
Status: corev1.ConditionFalse,
})
return ctrl.Result{Requeue: true}, nil
}
}
return ctrl.Result{}, nil
} else {
return errors.Wrapf(err, "fails to parse service url")
return ctrl.Result{}, errors.Wrapf(err, "fails to parse service url")
}
}

Expand Down
5 changes: 0 additions & 5 deletions test/e2e/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import json
import logging
import os
import time
from urllib.parse import urlparse

import grpc
Expand Down Expand Up @@ -94,8 +93,6 @@ def predict_str(service_name, input_json, protocol_version="v1",
logging.info("Sending Header = %s", headers)
logging.info("Sending url = %s", url)
logging.info("Sending request data: %s", input_json)
# temporary sleep until this is fixed https://github.com/kserve/kserve/issues/604
time.sleep(10)
response = requests.post(url, input_json, headers=headers)
logging.info("Got response code %s, content %s", response.status_code, response.content)
if response.status_code == 200:
Expand Down Expand Up @@ -158,8 +155,6 @@ def explain_response(service_name, input_json):
data = json.load(json_file)
logging.info("Sending request data: %s", json.dumps(data))
try:
# temporary sleep until this is fixed https://github.com/kserve/kserve/issues/604
time.sleep(10)
response = requests.post(url, json.dumps(data), headers=headers)
logging.info(
"Got response code %s, content %s",
Expand Down

0 comments on commit 15fab58

Please sign in to comment.