Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add custom status readers #314

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions internal/kstatus/statusreaders/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package statusreaders

import (
"context"
"errors"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// baseStatusReader is the implementation of the StatusReader interface defined
// in the engine package. It contains the basic logic needed for every resource.
// In order to handle resource specific logic, it must include an implementation
// of the resourceTypeStatusReader interface.
// In practice we will create many instances of baseStatusReader, each with a different
// implementation of the resourceTypeStatusReader interface and therefore each
// of the instances will be able to handle different resource types.
type baseStatusReader struct {
// mapper provides a way to look up the resource types that are available
// in the cluster.
mapper meta.RESTMapper

// resourceStatusReader is an resource-type specific implementation
// of the resourceTypeStatusReader interface. While the baseStatusReader
// contains the logic shared between all resource types, this implementation
// will contain the resource specific info.
resourceStatusReader resourceTypeStatusReader
}

// resourceTypeStatusReader is an interface that can be implemented differently
// for each resource type.
type resourceTypeStatusReader interface {
Supports(gk schema.GroupKind) bool
ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error)
}

func (b *baseStatusReader) Supports(gk schema.GroupKind) bool {
return b.resourceStatusReader.Supports(gk)
}

// ReadStatus reads the object identified by the passed-in identifier and computes it's status. It reads
// the resource here, but computing status is delegated to the ReadStatusForObject function.
func (b *baseStatusReader) ReadStatus(ctx context.Context, reader engine.ClusterReader, identifier object.ObjMetadata) (*event.ResourceStatus, error) {
object, err := b.lookupResource(ctx, reader, identifier)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this lookup needed? If it's needed that's ultimately fine, but seems like the unstructured resource should have been available and could have been avoided.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It resolves the GroupKindVersion for the given identifier and check if the resource exists

if err != nil {
return errIdentifierToResourceStatus(err, identifier)
}
return b.resourceStatusReader.ReadStatusForObject(ctx, reader, object)
}

// ReadStatusForObject computes the status for the passed-in object. Since this is specific for each
// resource type, the actual work is delegated to the implementation of the resourceTypeStatusReader interface.
func (b *baseStatusReader) ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error) {
return b.resourceStatusReader.ReadStatusForObject(ctx, reader, object)
}

// lookupResource looks up a resource with the given identifier. It will use the rest mapper to resolve
// the version of the GroupKind given in the identifier.
// If the resource is found, it is returned. If it is not found or something
// went wrong, the function will return an error.
func (b *baseStatusReader) lookupResource(ctx context.Context, reader engine.ClusterReader, identifier object.ObjMetadata) (*unstructured.Unstructured, error) {
GVK, err := gvk(identifier.GroupKind, b.mapper)
if err != nil {
return nil, err
}

var u unstructured.Unstructured
u.SetGroupVersionKind(GVK)
key := types.NamespacedName{
Name: identifier.Name,
Namespace: identifier.Namespace,
}
err = reader.Get(ctx, key, &u)
if err != nil {
return nil, err
}
return &u, nil
}

// gvk looks up the GVK from a GroupKind using the rest mapper.
func gvk(gk schema.GroupKind, mapper meta.RESTMapper) (schema.GroupVersionKind, error) {
mapping, err := mapper.RESTMapping(gk)
if err != nil {
return schema.GroupVersionKind{}, err
}
return mapping.GroupVersionKind, nil
}

// errResourceToResourceStatus construct the appropriate ResourceStatus
// object based on an error and the resource itself.
func errResourceToResourceStatus(err error, resource *unstructured.Unstructured, genResources ...*event.ResourceStatus) (*event.ResourceStatus, error) {
// If the error is from the context, we don't attach that to the ResourceStatus,
// but just return it directly so the caller can decide how to handle this
// situation.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
identifier := object.UnstructuredToObjMetadata(resource)
if apierrors.IsNotFound(err) {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.NotFoundStatus,
Message: "Resource not found",
}, nil
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Resource: resource,
Error: err,
GeneratedResources: genResources,
}, nil
}

// errIdentifierToResourceStatus construct the appropriate ResourceStatus
// object based on an error and the identifier for a resource.
func errIdentifierToResourceStatus(err error, identifier object.ObjMetadata) (*event.ResourceStatus, error) {
// If the error is from the context, we don't attach that to the ResourceStatus,
// but just return it directly so the caller can decide how to handle this
// situation.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if apierrors.IsNotFound(err) {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.NotFoundStatus,
Message: "Resource not found",
}, nil
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}, nil
}
84 changes: 84 additions & 0 deletions internal/kstatus/statusreaders/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package statusreaders

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// NewDefaultStatusReader returns a DelegatingStatusReader that wraps a list of
// statusreaders to cover all built-in Kubernetes resources and other CRDs that
// follow known status conventions.
func NewDefaultStatusReader(mapper meta.RESTMapper) engine.StatusReader {
return NewStatusReader(mapper)
}

// NewStatusReader returns a DelegatingStatusReader that includes the statusreaders
// for the build-in Kubernetes resources and also any provided custom status readers.
func NewStatusReader(mapper meta.RESTMapper, statusReaders ...engine.StatusReader) engine.StatusReader {
defaultStatusReader := statusreaders.NewGenericStatusReader(mapper, status.Compute)

replicaSetStatusReader := NewReplicaSetStatusReader(mapper)
deploymentStatusReader := NewDeploymentResourceReader(mapper)
statefulSetStatusReader := NewStatefulSetResourceReader(mapper)

statusReaders = append(statusReaders,
deploymentStatusReader,
statefulSetStatusReader,
replicaSetStatusReader,
defaultStatusReader,
)

return &DelegatingStatusReader{
StatusReaders: statusReaders,
}
}

type DelegatingStatusReader struct {
StatusReaders []engine.StatusReader
}

func (dsr *DelegatingStatusReader) Supports(gk schema.GroupKind) bool {
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return true
}
}
return false
}

func (dsr *DelegatingStatusReader) ReadStatus(
ctx context.Context,
reader engine.ClusterReader,
id object.ObjMetadata,
) (*event.ResourceStatus, error) {
gk := id.GroupKind
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return sr.ReadStatus(ctx, reader, id)
}
}
return nil, fmt.Errorf("no status reader supports this resource: %v", gk)
}

func (dsr *DelegatingStatusReader) ReadStatusForObject(
ctx context.Context,
reader engine.ClusterReader,
obj *unstructured.Unstructured,
) (*event.ResourceStatus, error) {
gk := obj.GroupVersionKind().GroupKind()
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return sr.ReadStatusForObject(ctx, reader, obj)
}
}
return nil, fmt.Errorf("no status reader supports this resource: %v", gk)
}
54 changes: 54 additions & 0 deletions internal/kstatus/statusreaders/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package statusreaders

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

func NewDeploymentResourceReader(mapper meta.RESTMapper) engine.StatusReader {
return &baseStatusReader{
mapper: mapper,
resourceStatusReader: &deploymentResourceReader{
mapper: mapper,
},
}
}

// deploymentResourceReader is a resourceTypeStatusReader that can fetch Deployment
// resources from the cluster, knows how to find any ReplicaSets belonging to the
// Deployment, and compute status for the deployment.
type deploymentResourceReader struct {
mapper meta.RESTMapper
}

var _ resourceTypeStatusReader = &deploymentResourceReader{}

func (d *deploymentResourceReader) Supports(gk schema.GroupKind) bool {
return gk == appsv1.SchemeGroupVersion.WithKind("Deployment").GroupKind()
}

func (d *deploymentResourceReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader,
deployment *unstructured.Unstructured) (*event.ResourceStatus, error) {
identifier := object.UnstructuredToObjMetadata(deployment)

res, err := status.Compute(deployment)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't the code that lists the pods w/in kstatus? I would assume this just means we'll execute it by delegating to it.

Copy link
Member Author

@zreigz zreigz Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code looks at the properties that are available on all or most of the Kubernetes resources (like: .status.currentReplicas, .status.replicas etc.). I got rid of the code for checking the Pod's status. It checks only the main resource type

if err != nil {
return errResourceToResourceStatus(err, deployment, []*event.ResourceStatus{}...)
}

return &event.ResourceStatus{
Identifier: identifier,
Status: res.Status,
Resource: deployment,
Message: res.Message,
GeneratedResources: []*event.ResourceStatus{},
}, nil
}
52 changes: 52 additions & 0 deletions internal/kstatus/statusreaders/replicaset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package statusreaders

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

func NewReplicaSetStatusReader(mapper meta.RESTMapper) engine.StatusReader {
return &baseStatusReader{
mapper: mapper,
resourceStatusReader: &replicaSetStatusReader{
mapper: mapper,
},
}
}

// replicaSetStatusReader is an engine that can fetch ReplicaSet resources
// from the cluster, knows how to find any Pods belonging to the ReplicaSet,
// and compute status for the ReplicaSet.
type replicaSetStatusReader struct {
mapper meta.RESTMapper
}

var _ resourceTypeStatusReader = &replicaSetStatusReader{}

func (r *replicaSetStatusReader) Supports(gk schema.GroupKind) bool {
return gk == appsv1.SchemeGroupVersion.WithKind("ReplicaSet").GroupKind()
}

func (r *replicaSetStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, rs *unstructured.Unstructured) (*event.ResourceStatus, error) {
identifier := object.UnstructuredToObjMetadata(rs)
res, err := status.Compute(rs)
if err != nil {
return errResourceToResourceStatus(err, rs, []*event.ResourceStatus{}...)
}

return &event.ResourceStatus{
Identifier: identifier,
Status: res.Status,
Resource: rs,
Message: res.Message,
GeneratedResources: []*event.ResourceStatus{},
}, nil
}
53 changes: 53 additions & 0 deletions internal/kstatus/statusreaders/statefulset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package statusreaders

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

func NewStatefulSetResourceReader(mapper meta.RESTMapper) engine.StatusReader {
return &baseStatusReader{
mapper: mapper,
resourceStatusReader: &statefulSetResourceReader{
mapper: mapper,
},
}
}

// statefulSetResourceReader is an implementation of the ResourceReader interface
// that can fetch StatefulSet resources from the cluster, knows how to find any
// Pods belonging to the StatefulSet, and compute status for the StatefulSet.
type statefulSetResourceReader struct {
mapper meta.RESTMapper
}

var _ resourceTypeStatusReader = &statefulSetResourceReader{}

func (s *statefulSetResourceReader) Supports(gk schema.GroupKind) bool {
return gk == appsv1.SchemeGroupVersion.WithKind("StatefulSet").GroupKind()
}

func (s *statefulSetResourceReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader,
statefulSet *unstructured.Unstructured) (*event.ResourceStatus, error) {
identifier := object.UnstructuredToObjMetadata(statefulSet)
res, err := status.Compute(statefulSet)
if err != nil {
return errResourceToResourceStatus(err, statefulSet, []*event.ResourceStatus{}...)
}

return &event.ResourceStatus{
Identifier: identifier,
Status: res.Status,
Resource: statefulSet,
Message: res.Message,
GeneratedResources: []*event.ResourceStatus{},
}, nil
}
Loading
Loading