Skip to content

Commit

Permalink
add custom status readers (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz authored Nov 19, 2024
1 parent 6257887 commit fe48081
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 5 deletions.
143 changes: 143 additions & 0 deletions internal/kstatus/statusreaders/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package statusreaders

import (
"context"
"errors"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// baseStatusReader is the implementation of the StatusReader interface defined
// in the engine package. It contains the basic logic needed for every resource.
// In order to handle resource specific logic, it must include an implementation
// of the resourceTypeStatusReader interface.
// In practice we will create many instances of baseStatusReader, each with a different
// implementation of the resourceTypeStatusReader interface and therefore each
// of the instances will be able to handle different resource types.
type baseStatusReader struct {
// mapper provides a way to look up the resource types that are available
// in the cluster.
mapper meta.RESTMapper

// resourceStatusReader is an resource-type specific implementation
// of the resourceTypeStatusReader interface. While the baseStatusReader
// contains the logic shared between all resource types, this implementation
// will contain the resource specific info.
resourceStatusReader resourceTypeStatusReader
}

// resourceTypeStatusReader is an interface that can be implemented differently
// for each resource type.
type resourceTypeStatusReader interface {
Supports(gk schema.GroupKind) bool
ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error)
}

func (b *baseStatusReader) Supports(gk schema.GroupKind) bool {
return b.resourceStatusReader.Supports(gk)
}

// ReadStatus reads the object identified by the passed-in identifier and computes it's status. It reads
// the resource here, but computing status is delegated to the ReadStatusForObject function.
func (b *baseStatusReader) ReadStatus(ctx context.Context, reader engine.ClusterReader, identifier object.ObjMetadata) (*event.ResourceStatus, error) {
object, err := b.lookupResource(ctx, reader, identifier)
if err != nil {
return errIdentifierToResourceStatus(err, identifier)
}
return b.resourceStatusReader.ReadStatusForObject(ctx, reader, object)
}

// ReadStatusForObject computes the status for the passed-in object. Since this is specific for each
// resource type, the actual work is delegated to the implementation of the resourceTypeStatusReader interface.
func (b *baseStatusReader) ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error) {
return b.resourceStatusReader.ReadStatusForObject(ctx, reader, object)
}

// lookupResource looks up a resource with the given identifier. It will use the rest mapper to resolve
// the version of the GroupKind given in the identifier.
// If the resource is found, it is returned. If it is not found or something
// went wrong, the function will return an error.
func (b *baseStatusReader) lookupResource(ctx context.Context, reader engine.ClusterReader, identifier object.ObjMetadata) (*unstructured.Unstructured, error) {
GVK, err := gvk(identifier.GroupKind, b.mapper)
if err != nil {
return nil, err
}

var u unstructured.Unstructured
u.SetGroupVersionKind(GVK)
key := types.NamespacedName{
Name: identifier.Name,
Namespace: identifier.Namespace,
}
err = reader.Get(ctx, key, &u)
if err != nil {
return nil, err
}
return &u, nil
}

// gvk looks up the GVK from a GroupKind using the rest mapper.
func gvk(gk schema.GroupKind, mapper meta.RESTMapper) (schema.GroupVersionKind, error) {
mapping, err := mapper.RESTMapping(gk)
if err != nil {
return schema.GroupVersionKind{}, err
}
return mapping.GroupVersionKind, nil
}

// errResourceToResourceStatus construct the appropriate ResourceStatus
// object based on an error and the resource itself.
func errResourceToResourceStatus(err error, resource *unstructured.Unstructured, genResources ...*event.ResourceStatus) (*event.ResourceStatus, error) {
// If the error is from the context, we don't attach that to the ResourceStatus,
// but just return it directly so the caller can decide how to handle this
// situation.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
identifier := object.UnstructuredToObjMetadata(resource)
if apierrors.IsNotFound(err) {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.NotFoundStatus,
Message: "Resource not found",
}, nil
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Resource: resource,
Error: err,
GeneratedResources: genResources,
}, nil
}

// errIdentifierToResourceStatus construct the appropriate ResourceStatus
// object based on an error and the identifier for a resource.
func errIdentifierToResourceStatus(err error, identifier object.ObjMetadata) (*event.ResourceStatus, error) {
// If the error is from the context, we don't attach that to the ResourceStatus,
// but just return it directly so the caller can decide how to handle this
// situation.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if apierrors.IsNotFound(err) {
return &event.ResourceStatus{
Identifier: identifier,
Status: status.NotFoundStatus,
Message: "Resource not found",
}, nil
}
return &event.ResourceStatus{
Identifier: identifier,
Status: status.UnknownStatus,
Error: err,
}, nil
}
84 changes: 84 additions & 0 deletions internal/kstatus/statusreaders/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package statusreaders

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// NewDefaultStatusReader returns a DelegatingStatusReader that wraps a list of
// statusreaders to cover all built-in Kubernetes resources and other CRDs that
// follow known status conventions.
func NewDefaultStatusReader(mapper meta.RESTMapper) engine.StatusReader {
return NewStatusReader(mapper)
}

// NewStatusReader returns a DelegatingStatusReader that includes the statusreaders
// for the build-in Kubernetes resources and also any provided custom status readers.
func NewStatusReader(mapper meta.RESTMapper, statusReaders ...engine.StatusReader) engine.StatusReader {
defaultStatusReader := statusreaders.NewGenericStatusReader(mapper, status.Compute)

replicaSetStatusReader := NewReplicaSetStatusReader(mapper)
deploymentStatusReader := NewDeploymentResourceReader(mapper)
statefulSetStatusReader := NewStatefulSetResourceReader(mapper)

statusReaders = append(statusReaders,
deploymentStatusReader,
statefulSetStatusReader,
replicaSetStatusReader,
defaultStatusReader,
)

return &DelegatingStatusReader{
StatusReaders: statusReaders,
}
}

type DelegatingStatusReader struct {
StatusReaders []engine.StatusReader
}

func (dsr *DelegatingStatusReader) Supports(gk schema.GroupKind) bool {
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return true
}
}
return false
}

func (dsr *DelegatingStatusReader) ReadStatus(
ctx context.Context,
reader engine.ClusterReader,
id object.ObjMetadata,
) (*event.ResourceStatus, error) {
gk := id.GroupKind
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return sr.ReadStatus(ctx, reader, id)
}
}
return nil, fmt.Errorf("no status reader supports this resource: %v", gk)
}

func (dsr *DelegatingStatusReader) ReadStatusForObject(
ctx context.Context,
reader engine.ClusterReader,
obj *unstructured.Unstructured,
) (*event.ResourceStatus, error) {
gk := obj.GroupVersionKind().GroupKind()
for _, sr := range dsr.StatusReaders {
if sr.Supports(gk) {
return sr.ReadStatusForObject(ctx, reader, obj)
}
}
return nil, fmt.Errorf("no status reader supports this resource: %v", gk)
}
54 changes: 54 additions & 0 deletions internal/kstatus/statusreaders/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package statusreaders

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

func NewDeploymentResourceReader(mapper meta.RESTMapper) engine.StatusReader {
return &baseStatusReader{
mapper: mapper,
resourceStatusReader: &deploymentResourceReader{
mapper: mapper,
},
}
}

// deploymentResourceReader is a resourceTypeStatusReader that can fetch Deployment
// resources from the cluster, knows how to find any ReplicaSets belonging to the
// Deployment, and compute status for the deployment.
type deploymentResourceReader struct {
mapper meta.RESTMapper
}

var _ resourceTypeStatusReader = &deploymentResourceReader{}

func (d *deploymentResourceReader) Supports(gk schema.GroupKind) bool {
return gk == appsv1.SchemeGroupVersion.WithKind("Deployment").GroupKind()
}

func (d *deploymentResourceReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader,
deployment *unstructured.Unstructured) (*event.ResourceStatus, error) {
identifier := object.UnstructuredToObjMetadata(deployment)

res, err := status.Compute(deployment)
if err != nil {
return errResourceToResourceStatus(err, deployment, []*event.ResourceStatus{}...)
}

return &event.ResourceStatus{
Identifier: identifier,
Status: res.Status,
Resource: deployment,
Message: res.Message,
GeneratedResources: []*event.ResourceStatus{},
}, nil
}
52 changes: 52 additions & 0 deletions internal/kstatus/statusreaders/replicaset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package statusreaders

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

func NewReplicaSetStatusReader(mapper meta.RESTMapper) engine.StatusReader {
return &baseStatusReader{
mapper: mapper,
resourceStatusReader: &replicaSetStatusReader{
mapper: mapper,
},
}
}

// replicaSetStatusReader is an engine that can fetch ReplicaSet resources
// from the cluster, knows how to find any Pods belonging to the ReplicaSet,
// and compute status for the ReplicaSet.
type replicaSetStatusReader struct {
mapper meta.RESTMapper
}

var _ resourceTypeStatusReader = &replicaSetStatusReader{}

func (r *replicaSetStatusReader) Supports(gk schema.GroupKind) bool {
return gk == appsv1.SchemeGroupVersion.WithKind("ReplicaSet").GroupKind()
}

func (r *replicaSetStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, rs *unstructured.Unstructured) (*event.ResourceStatus, error) {
identifier := object.UnstructuredToObjMetadata(rs)
res, err := status.Compute(rs)
if err != nil {
return errResourceToResourceStatus(err, rs, []*event.ResourceStatus{}...)
}

return &event.ResourceStatus{
Identifier: identifier,
Status: res.Status,
Resource: rs,
Message: res.Message,
GeneratedResources: []*event.ResourceStatus{},
}, nil
}
53 changes: 53 additions & 0 deletions internal/kstatus/statusreaders/statefulset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package statusreaders

import (
"context"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

func NewStatefulSetResourceReader(mapper meta.RESTMapper) engine.StatusReader {
return &baseStatusReader{
mapper: mapper,
resourceStatusReader: &statefulSetResourceReader{
mapper: mapper,
},
}
}

// statefulSetResourceReader is an implementation of the ResourceReader interface
// that can fetch StatefulSet resources from the cluster, knows how to find any
// Pods belonging to the StatefulSet, and compute status for the StatefulSet.
type statefulSetResourceReader struct {
mapper meta.RESTMapper
}

var _ resourceTypeStatusReader = &statefulSetResourceReader{}

func (s *statefulSetResourceReader) Supports(gk schema.GroupKind) bool {
return gk == appsv1.SchemeGroupVersion.WithKind("StatefulSet").GroupKind()
}

func (s *statefulSetResourceReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader,
statefulSet *unstructured.Unstructured) (*event.ResourceStatus, error) {
identifier := object.UnstructuredToObjMetadata(statefulSet)
res, err := status.Compute(statefulSet)
if err != nil {
return errResourceToResourceStatus(err, statefulSet, []*event.ResourceStatus{}...)
}

return &event.ResourceStatus{
Identifier: identifier,
Status: res.Status,
Resource: statefulSet,
Message: res.Message,
GeneratedResources: []*event.ResourceStatus{},
}, nil
}
Loading

0 comments on commit fe48081

Please sign in to comment.