Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RealtimeAPI CRD #2366

Open
wants to merge 45 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
4ab5c7c
Initial kubebuilder scaffolding
Jul 12, 2021
20b7a6f
Populate RealtimeAPI CRD types and add kubebuilder validation annotat…
Jul 12, 2021
0613ff3
Initial implementation of the realtime api controller
Jul 15, 2021
c5ecd08
Add desired resources methods for realtime api controller
Jul 20, 2021
3334a92
Fix CRD types
Jul 20, 2021
a56ece5
Add istio to scheme
Jul 20, 2021
32b6be2
Fix RealtimeAPI CRD defaulting behaviour
Jul 20, 2021
ad5cfdb
Fix createOrUpdateService method
Jul 21, 2021
164d492
Rename CRD api group to serverless
Jul 21, 2021
5f22fd8
Update logger name for serverless CRD controllers
Jul 21, 2021
6a97d37
Add additional print columns to realtime crd
Jul 21, 2021
da929a6
Fix annotations in realtime crd
Jul 21, 2021
f164ecb
Fix endpoint string on realtime crd status
Jul 21, 2021
27c8307
Update createOrUpdate* methods
Jul 21, 2021
2f0f481
Create apiID and deploymentID annotations on resource creation
Jul 21, 2021
a51872e
Set controller reference on child resources
Jul 21, 2021
5d830c7
Add replica counts to RealtimeAPI CRD status
Jul 22, 2021
766f154
Handle api ids annotations
Jul 22, 2021
07af2a8
WIP: refactor UpdateAPI function for RealtimeAPI to work with the CRD
Jul 24, 2021
9feb4e7
Implementation of get, refresh and delete operations for realtime api…
Jul 25, 2021
532c8f2
Fix linting errors
Jul 25, 2021
c8e8d49
Merge branch 'master' into feature/realtime-crd
Jul 25, 2021
7c7afa0
Fix typo in UpdateStrategySpec struct
Jul 26, 2021
424cba7
Declare slice size when known beforehand
Jul 26, 2021
b8d8b6a
Rename helper function
Jul 27, 2021
39ddc6b
Fix `cortex get` behaviour by uploading specs to S3
Jul 27, 2021
72b8fa4
Merge branch 'master' into feature/realtime-crd
Jul 27, 2021
17427b3
Update API status following the Realtime CRD addition (#2375)
RobertLucian Jul 29, 2021
69f0fe8
Remove unnecessary annotations and add descriptions to the api status…
Jul 29, 2021
128cf5b
Fix deep equal comparison
Jul 29, 2021
c1df9b7
Fix rolling update on autoscaling spec update
Jul 29, 2021
b7cac93
WIP: update realtime scaler to work with CRD
Jul 30, 2021
e27f414
Fix autoscaler for RealtimeAPI CRD
RobertLucian Jul 30, 2021
fb5c085
RealtimeAPI controller fixes
RobertLucian Jul 30, 2021
835ff98
Add InitReplicas to RealtimeAPI spec
RobertLucian Jul 30, 2021
7ae8949
Simplify GetAutoscalingSpec function
RobertLucian Jul 30, 2021
5050519
Add serverless to autoscaler's scheme
RobertLucian Jul 31, 2021
934f1ff
apiID is required for determining if a pod is up-to-date or not
RobertLucian Jul 31, 2021
d0a1b61
Allow autoscaler to get/update realtimeapis resources
RobertLucian Jul 31, 2021
cf85344
Merge branch 'master' into feature/realtime-crd
RobertLucian Jul 31, 2021
cac1a11
Fix cortex logs/refresh cmds for realtime API
RobertLucian Jul 31, 2021
c31bf2c
Fix CORTEX_PORT not present on realtime api pods
RobertLucian Jul 31, 2021
2c8c233
Use deployment-id instead of api-id for
RobertLucian Jul 31, 2021
763acf0
Ensure that the `min_replicas`/`max_replicas` range is ensured by the…
RobertLucian Jul 31, 2021
b4b8745
Revert explicit return expression
RobertLucian Jul 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cli/cmd/lib_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
func replicaCountTable(counts *status.ReplicaCounts) table.Table {
var rows [][]interface{}
for _, replicaCountType := range status.ReplicaCountTypes {
// skip up-to-date count
if replicaCountType == status.ReplicaCountUpToDate {
continue
}

count := counts.GetCountBy(replicaCountType)
canBeHiddenIfZero := false
switch replicaCountType {
Expand Down
24 changes: 18 additions & 6 deletions cli/cmd/lib_async_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func asyncDescribeAPITable(asyncAPI schema.APIResponse, env cliconfig.Environmen
return "", errors.ErrorUnexpected("missing metadata from operator response")
}

if asyncAPI.Status == nil {
return "", errors.ErrorUnexpected(fmt.Sprintf("missing status for %s api", asyncAPI.Metadata.Name))
if asyncAPI.ReplicaCounts == nil {
return "", errors.ErrorUnexpected(fmt.Sprintf("missing replica counts for %s api", asyncAPI.Metadata.Name))
}

t := asyncAPIsTable([]schema.APIResponse{asyncAPI}, []string{env.Name})
Expand All @@ -75,7 +75,7 @@ func asyncDescribeAPITable(asyncAPI schema.APIResponse, env cliconfig.Environmen
out += "\n" + console.Bold("endpoint: ") + *asyncAPI.Endpoint + "\n"
}

t = replicaCountTable(asyncAPI.Status.ReplicaCounts)
t = replicaCountTable(asyncAPI.ReplicaCounts)
out += "\n" + t.MustFormat()

return out, nil
Expand All @@ -85,15 +85,27 @@ func asyncAPIsTable(asyncAPIs []schema.APIResponse, envNames []string) table.Tab
rows := make([][]interface{}, 0, len(asyncAPIs))

for i, asyncAPI := range asyncAPIs {
if asyncAPI.Metadata == nil || asyncAPI.Status == nil {
if asyncAPI.Metadata == nil || (asyncAPI.Status == nil && asyncAPI.ReplicaCounts == nil) {
continue
}

var ready, requested, upToDate int32
if asyncAPI.Status != nil {
ready = asyncAPI.Status.Ready
requested = asyncAPI.Status.Requested
upToDate = asyncAPI.Status.UpToDate
} else {
ready = asyncAPI.ReplicaCounts.Ready
requested = asyncAPI.ReplicaCounts.Requested
upToDate = asyncAPI.ReplicaCounts.UpToDate
}

lastUpdated := time.Unix(asyncAPI.Metadata.LastUpdated, 0)
rows = append(rows, []interface{}{
envNames[i],
asyncAPI.Metadata.Name,
fmt.Sprintf("%d/%d", asyncAPI.Status.Ready, asyncAPI.Status.Requested),
asyncAPI.Status.UpToDate,
fmt.Sprintf("%d/%d", ready, requested),
upToDate,
libtime.SinceStr(&lastUpdated),
})
}
Expand Down
24 changes: 18 additions & 6 deletions cli/cmd/lib_realtime_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func realtimeDescribeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Envi
return "", errors.ErrorUnexpected("missing metadata from operator response")
}

if realtimeAPI.Status == nil {
return "", errors.ErrorUnexpected(fmt.Sprintf("missing status for %s api", realtimeAPI.Metadata.Name))
if realtimeAPI.ReplicaCounts == nil {
return "", errors.ErrorUnexpected(fmt.Sprintf("missing replica counts for %s api", realtimeAPI.Metadata.Name))
}

t := realtimeAPIsTable([]schema.APIResponse{realtimeAPI}, []string{env.Name})
Expand All @@ -74,7 +74,7 @@ func realtimeDescribeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Envi
out += "\n" + console.Bold("endpoint: ") + *realtimeAPI.Endpoint + "\n"
}

t = replicaCountTable(realtimeAPI.Status.ReplicaCounts)
t = replicaCountTable(realtimeAPI.ReplicaCounts)
out += "\n" + t.MustFormat()

return out, nil
Expand All @@ -84,15 +84,27 @@ func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) tab
rows := make([][]interface{}, 0, len(realtimeAPIs))

for i, realtimeAPI := range realtimeAPIs {
if realtimeAPI.Metadata == nil || realtimeAPI.Status == nil {
if realtimeAPI.Metadata == nil || (realtimeAPI.Status == nil && realtimeAPI.ReplicaCounts == nil) {
continue
}

var ready, requested, upToDate int32
if realtimeAPI.Status != nil {
ready = realtimeAPI.Status.Ready
requested = realtimeAPI.Status.Requested
upToDate = realtimeAPI.Status.UpToDate
} else {
ready = realtimeAPI.ReplicaCounts.Ready
requested = realtimeAPI.ReplicaCounts.Requested
upToDate = realtimeAPI.ReplicaCounts.UpToDate
}

lastUpdated := time.Unix(realtimeAPI.Metadata.LastUpdated, 0)
rows = append(rows, []interface{}{
envNames[i],
realtimeAPI.Metadata.Name,
fmt.Sprintf("%d/%d", realtimeAPI.Status.Ready, realtimeAPI.Status.Requested),
realtimeAPI.Status.UpToDate,
fmt.Sprintf("%d/%d", ready, requested),
upToDate,
libtime.SinceStr(&lastUpdated),
})
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/cortexlabs/cortex/pkg/autoscaler"
serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
Expand Down Expand Up @@ -107,6 +108,9 @@ func main() {
defer telemetry.Close()

scheme := runtime.NewScheme()
if err := serverless.AddToScheme(scheme); err != nil {
exit(log, err, "failed to add k8s serverless to scheme")
}
if err := clientgoscheme.AddToScheme(scheme); err != nil {
exit(log, err, "failed to add k8s client-go-scheme to scheme")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/gobwas/glob v0.2.3
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/go-cmp v0.5.6
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.2.0
github.com/googleapis/gnostic v0.5.5 // indirect
Expand Down
7 changes: 7 additions & 0 deletions manager/manifests/autoscaler.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ rules:
- get
- update
- watch
- apiGroups:
- "serverless.cortex.dev"
resources:
- realtimeapis
verbs:
- get
- update

---

Expand Down
12 changes: 6 additions & 6 deletions pkg/activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (a *activator) getOrCreateReadinessTracker(apiName string) *readinessTracke
}

func (a *activator) addAPI(obj interface{}) {
apiMetadata, err := getAPIMeta(obj)
apiMetadata, err := getAPIMeta(obj, true)
if err != nil {
a.logger.Errorw("error during virtual service informer add callback", zap.Error(err))
telemetry.Error(err)
Expand All @@ -182,7 +182,7 @@ func (a *activator) addAPI(obj interface{}) {
}

func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) {
apiMetadata, err := getAPIMeta(newObj)
apiMetadata, err := getAPIMeta(newObj, true)
if err != nil {
a.logger.Errorw("error during virtual service informer update callback", zap.Error(err))
telemetry.Error(err)
Expand All @@ -195,7 +195,7 @@ func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) {

apiName := apiMetadata.apiName

oldAPIMetatada, err := getAPIMeta(oldObj)
oldAPIMetatada, err := getAPIMeta(oldObj, true)
if err != nil {
a.logger.Errorw("error during virtual service informer update callback", zap.Error(err))
telemetry.Error(err)
Expand All @@ -212,7 +212,7 @@ func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) {
}

func (a *activator) removeAPI(obj interface{}) {
apiMetadata, err := getAPIMeta(obj)
apiMetadata, err := getAPIMeta(obj, false)
if err != nil {
a.logger.Errorw("error during virtual service informer delete callback", zap.Error(err))
telemetry.Error(err)
Expand Down Expand Up @@ -250,7 +250,7 @@ func (a *activator) updateReadinessTracker(obj interface{}) {
return
}

api, err := getAPIMeta(obj)
api, err := getAPIMeta(obj, false)
if err != nil {
a.logger.Errorw("error during deployment informer callback", zap.Error(err))
telemetry.Error(err)
Expand All @@ -271,7 +271,7 @@ func (a *activator) updateReadinessTracker(obj interface{}) {
}

func (a *activator) removeReadinessTracker(obj interface{}) {
api, err := getAPIMeta(obj)
api, err := getAPIMeta(obj, false)
if err != nil {
a.logger.Errorw("error during deployment informer callback", zap.Error(err))
telemetry.Error(err)
Expand Down
16 changes: 11 additions & 5 deletions pkg/activator/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type apiMeta struct {
maxQueueLength int
}

func getAPIMeta(obj interface{}) (apiMeta, error) {
func getAPIMeta(obj interface{}, includeAnnotations bool) (apiMeta, error) {
resource, err := meta.Accessor(obj)
if err != nil {
return apiMeta{}, err
Expand All @@ -48,16 +48,22 @@ func getAPIMeta(obj interface{}) (apiMeta, error) {
return apiMeta{}, errors.ErrorUnexpected("got a virtual service without apiName label")
}

maxQueueLength, maxConcurrency, err := userconfig.ConcurrencyFromAnnotations(resource)
if err != nil {
return apiMeta{}, err
var maxQueueLength, maxConcurrency int
var annotations map[string]string

if includeAnnotations {
maxQueueLength, maxConcurrency, err = userconfig.ConcurrencyFromAnnotations(resource)
if err != nil {
return apiMeta{}, err
}
annotations = resource.GetAnnotations()
}

return apiMeta{
apiName: apiName,
apiKind: userconfig.KindFromString(apiKind),
labels: labels,
annotations: resource.GetAnnotations(),
annotations: annotations,
maxConcurrency: maxConcurrency,
maxQueueLength: maxQueueLength,
}, nil
Expand Down
27 changes: 9 additions & 18 deletions pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,9 @@ func TestAutoscaler_Awake(t *testing.T) {

ticker := time.NewTicker(250 * time.Millisecond)
go func() {
for {
select {
case <-ticker.C:
err := autoscaleFn()
require.NoError(t, err)
}
for range ticker.C {
err := autoscaleFn()
require.NoError(t, err)
}
}()

Expand Down Expand Up @@ -372,12 +369,9 @@ func TestAutoscaler_MinReplicas(t *testing.T) {

ticker := time.NewTicker(250 * time.Millisecond)
go func() {
for {
select {
case <-ticker.C:
err := autoscaleFn()
require.NoError(t, err)
}
for range ticker.C {
err := autoscaleFn()
require.NoError(t, err)
}
}()

Expand Down Expand Up @@ -444,12 +438,9 @@ func TestAutoscaler_MaxReplicas(t *testing.T) {

ticker := time.NewTicker(250 * time.Millisecond)
go func() {
for {
select {
case <-ticker.C:
err := autoscaleFn()
require.NoError(t, err)
}
for range ticker.C {
err := autoscaleFn()
require.NoError(t, err)
}
}()

Expand Down
65 changes: 65 additions & 0 deletions pkg/autoscaler/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package autoscaler

import (
serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1"
"github.com/cortexlabs/cortex/pkg/lib/errors"
libstrings "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
)

func generateAutoscalingFromServerlessRealtimeAPI(realtimeAPI serverless.RealtimeAPI) (*userconfig.Autoscaling, error) {
targetInFlight, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.TargetInFlight)
if !ok {
return nil, errors.ErrorUnexpected("failed to parse target-in-flight requests from autoscaling spec")
}

maxDownscaleFactor, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.MaxDownscaleFactor)
if !ok {
return nil, errors.ErrorUnexpected("failed to parse max downscale factor from autoscaling spec")
}

maxUpscaleFactor, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.MaxUpscaleFactor)
if !ok {
return nil, errors.ErrorUnexpected("failed to parse max upscale factor from autoscaling spec")
}

downscaleTolerance, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.DownscaleTolerance)
if !ok {
return nil, errors.ErrorUnexpected("failed to parse downscale tolerance from autoscaling spec")
}

upscaleTolerance, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.UpscaleTolerance)
if !ok {
return nil, errors.ErrorUnexpected("failed to parse upscale tolerance from autoscaling spec")
}

return &userconfig.Autoscaling{
MinReplicas: realtimeAPI.Spec.Autoscaling.MinReplicas,
MaxReplicas: realtimeAPI.Spec.Autoscaling.MaxReplicas,
InitReplicas: realtimeAPI.Spec.Autoscaling.InitReplicas,
TargetInFlight: &targetInFlight,
Window: realtimeAPI.Spec.Autoscaling.Window.Duration,
DownscaleStabilizationPeriod: realtimeAPI.Spec.Autoscaling.DownscaleStabilizationPeriod.Duration,
UpscaleStabilizationPeriod: realtimeAPI.Spec.Autoscaling.UpscaleStabilizationPeriod.Duration,
MaxDownscaleFactor: maxDownscaleFactor,
MaxUpscaleFactor: maxUpscaleFactor,
DownscaleTolerance: downscaleTolerance,
UpscaleTolerance: upscaleTolerance,
}, nil
}
Loading