diff --git a/internal/kstatus/statusreaders/common.go b/internal/kstatus/statusreaders/common.go new file mode 100644 index 00000000..7b6c7be4 --- /dev/null +++ b/internal/kstatus/statusreaders/common.go @@ -0,0 +1,143 @@ +package statusreaders + +import ( + "context" + "errors" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +// baseStatusReader is the implementation of the StatusReader interface defined +// in the engine package. It contains the basic logic needed for every resource. +// In order to handle resource specific logic, it must include an implementation +// of the resourceTypeStatusReader interface. +// In practice we will create many instances of baseStatusReader, each with a different +// implementation of the resourceTypeStatusReader interface and therefore each +// of the instances will be able to handle different resource types. +type baseStatusReader struct { + // mapper provides a way to look up the resource types that are available + // in the cluster. + mapper meta.RESTMapper + + // resourceStatusReader is an resource-type specific implementation + // of the resourceTypeStatusReader interface. While the baseStatusReader + // contains the logic shared between all resource types, this implementation + // will contain the resource specific info. + resourceStatusReader resourceTypeStatusReader +} + +// resourceTypeStatusReader is an interface that can be implemented differently +// for each resource type. +type resourceTypeStatusReader interface { + Supports(gk schema.GroupKind) bool + ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error) +} + +func (b *baseStatusReader) Supports(gk schema.GroupKind) bool { + return b.resourceStatusReader.Supports(gk) +} + +// ReadStatus reads the object identified by the passed-in identifier and computes it's status. It reads +// the resource here, but computing status is delegated to the ReadStatusForObject function. +func (b *baseStatusReader) ReadStatus(ctx context.Context, reader engine.ClusterReader, identifier object.ObjMetadata) (*event.ResourceStatus, error) { + object, err := b.lookupResource(ctx, reader, identifier) + if err != nil { + return errIdentifierToResourceStatus(err, identifier) + } + return b.resourceStatusReader.ReadStatusForObject(ctx, reader, object) +} + +// ReadStatusForObject computes the status for the passed-in object. Since this is specific for each +// resource type, the actual work is delegated to the implementation of the resourceTypeStatusReader interface. +func (b *baseStatusReader) ReadStatusForObject(ctx context.Context, reader engine.ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error) { + return b.resourceStatusReader.ReadStatusForObject(ctx, reader, object) +} + +// lookupResource looks up a resource with the given identifier. It will use the rest mapper to resolve +// the version of the GroupKind given in the identifier. +// If the resource is found, it is returned. If it is not found or something +// went wrong, the function will return an error. +func (b *baseStatusReader) lookupResource(ctx context.Context, reader engine.ClusterReader, identifier object.ObjMetadata) (*unstructured.Unstructured, error) { + GVK, err := gvk(identifier.GroupKind, b.mapper) + if err != nil { + return nil, err + } + + var u unstructured.Unstructured + u.SetGroupVersionKind(GVK) + key := types.NamespacedName{ + Name: identifier.Name, + Namespace: identifier.Namespace, + } + err = reader.Get(ctx, key, &u) + if err != nil { + return nil, err + } + return &u, nil +} + +// gvk looks up the GVK from a GroupKind using the rest mapper. +func gvk(gk schema.GroupKind, mapper meta.RESTMapper) (schema.GroupVersionKind, error) { + mapping, err := mapper.RESTMapping(gk) + if err != nil { + return schema.GroupVersionKind{}, err + } + return mapping.GroupVersionKind, nil +} + +// errResourceToResourceStatus construct the appropriate ResourceStatus +// object based on an error and the resource itself. +func errResourceToResourceStatus(err error, resource *unstructured.Unstructured, genResources ...*event.ResourceStatus) (*event.ResourceStatus, error) { + // If the error is from the context, we don't attach that to the ResourceStatus, + // but just return it directly so the caller can decide how to handle this + // situation. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, err + } + identifier := object.UnstructuredToObjMetadata(resource) + if apierrors.IsNotFound(err) { + return &event.ResourceStatus{ + Identifier: identifier, + Status: status.NotFoundStatus, + Message: "Resource not found", + }, nil + } + return &event.ResourceStatus{ + Identifier: identifier, + Status: status.UnknownStatus, + Resource: resource, + Error: err, + GeneratedResources: genResources, + }, nil +} + +// errIdentifierToResourceStatus construct the appropriate ResourceStatus +// object based on an error and the identifier for a resource. +func errIdentifierToResourceStatus(err error, identifier object.ObjMetadata) (*event.ResourceStatus, error) { + // If the error is from the context, we don't attach that to the ResourceStatus, + // but just return it directly so the caller can decide how to handle this + // situation. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, err + } + if apierrors.IsNotFound(err) { + return &event.ResourceStatus{ + Identifier: identifier, + Status: status.NotFoundStatus, + Message: "Resource not found", + }, nil + } + return &event.ResourceStatus{ + Identifier: identifier, + Status: status.UnknownStatus, + Error: err, + }, nil +} diff --git a/internal/kstatus/statusreaders/default.go b/internal/kstatus/statusreaders/default.go new file mode 100644 index 00000000..f79c216e --- /dev/null +++ b/internal/kstatus/statusreaders/default.go @@ -0,0 +1,84 @@ +package statusreaders + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +// NewDefaultStatusReader returns a DelegatingStatusReader that wraps a list of +// statusreaders to cover all built-in Kubernetes resources and other CRDs that +// follow known status conventions. +func NewDefaultStatusReader(mapper meta.RESTMapper) engine.StatusReader { + return NewStatusReader(mapper) +} + +// NewStatusReader returns a DelegatingStatusReader that includes the statusreaders +// for the build-in Kubernetes resources and also any provided custom status readers. +func NewStatusReader(mapper meta.RESTMapper, statusReaders ...engine.StatusReader) engine.StatusReader { + defaultStatusReader := statusreaders.NewGenericStatusReader(mapper, status.Compute) + + replicaSetStatusReader := NewReplicaSetStatusReader(mapper) + deploymentStatusReader := NewDeploymentResourceReader(mapper) + statefulSetStatusReader := NewStatefulSetResourceReader(mapper) + + statusReaders = append(statusReaders, + deploymentStatusReader, + statefulSetStatusReader, + replicaSetStatusReader, + defaultStatusReader, + ) + + return &DelegatingStatusReader{ + StatusReaders: statusReaders, + } +} + +type DelegatingStatusReader struct { + StatusReaders []engine.StatusReader +} + +func (dsr *DelegatingStatusReader) Supports(gk schema.GroupKind) bool { + for _, sr := range dsr.StatusReaders { + if sr.Supports(gk) { + return true + } + } + return false +} + +func (dsr *DelegatingStatusReader) ReadStatus( + ctx context.Context, + reader engine.ClusterReader, + id object.ObjMetadata, +) (*event.ResourceStatus, error) { + gk := id.GroupKind + for _, sr := range dsr.StatusReaders { + if sr.Supports(gk) { + return sr.ReadStatus(ctx, reader, id) + } + } + return nil, fmt.Errorf("no status reader supports this resource: %v", gk) +} + +func (dsr *DelegatingStatusReader) ReadStatusForObject( + ctx context.Context, + reader engine.ClusterReader, + obj *unstructured.Unstructured, +) (*event.ResourceStatus, error) { + gk := obj.GroupVersionKind().GroupKind() + for _, sr := range dsr.StatusReaders { + if sr.Supports(gk) { + return sr.ReadStatusForObject(ctx, reader, obj) + } + } + return nil, fmt.Errorf("no status reader supports this resource: %v", gk) +} diff --git a/internal/kstatus/statusreaders/deployment.go b/internal/kstatus/statusreaders/deployment.go new file mode 100644 index 00000000..1d073ef4 --- /dev/null +++ b/internal/kstatus/statusreaders/deployment.go @@ -0,0 +1,54 @@ +package statusreaders + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +func NewDeploymentResourceReader(mapper meta.RESTMapper) engine.StatusReader { + return &baseStatusReader{ + mapper: mapper, + resourceStatusReader: &deploymentResourceReader{ + mapper: mapper, + }, + } +} + +// deploymentResourceReader is a resourceTypeStatusReader that can fetch Deployment +// resources from the cluster, knows how to find any ReplicaSets belonging to the +// Deployment, and compute status for the deployment. +type deploymentResourceReader struct { + mapper meta.RESTMapper +} + +var _ resourceTypeStatusReader = &deploymentResourceReader{} + +func (d *deploymentResourceReader) Supports(gk schema.GroupKind) bool { + return gk == appsv1.SchemeGroupVersion.WithKind("Deployment").GroupKind() +} + +func (d *deploymentResourceReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, + deployment *unstructured.Unstructured) (*event.ResourceStatus, error) { + identifier := object.UnstructuredToObjMetadata(deployment) + + res, err := status.Compute(deployment) + if err != nil { + return errResourceToResourceStatus(err, deployment, []*event.ResourceStatus{}...) + } + + return &event.ResourceStatus{ + Identifier: identifier, + Status: res.Status, + Resource: deployment, + Message: res.Message, + GeneratedResources: []*event.ResourceStatus{}, + }, nil +} diff --git a/internal/kstatus/statusreaders/replicaset.go b/internal/kstatus/statusreaders/replicaset.go new file mode 100644 index 00000000..2592a65f --- /dev/null +++ b/internal/kstatus/statusreaders/replicaset.go @@ -0,0 +1,52 @@ +package statusreaders + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +func NewReplicaSetStatusReader(mapper meta.RESTMapper) engine.StatusReader { + return &baseStatusReader{ + mapper: mapper, + resourceStatusReader: &replicaSetStatusReader{ + mapper: mapper, + }, + } +} + +// replicaSetStatusReader is an engine that can fetch ReplicaSet resources +// from the cluster, knows how to find any Pods belonging to the ReplicaSet, +// and compute status for the ReplicaSet. +type replicaSetStatusReader struct { + mapper meta.RESTMapper +} + +var _ resourceTypeStatusReader = &replicaSetStatusReader{} + +func (r *replicaSetStatusReader) Supports(gk schema.GroupKind) bool { + return gk == appsv1.SchemeGroupVersion.WithKind("ReplicaSet").GroupKind() +} + +func (r *replicaSetStatusReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, rs *unstructured.Unstructured) (*event.ResourceStatus, error) { + identifier := object.UnstructuredToObjMetadata(rs) + res, err := status.Compute(rs) + if err != nil { + return errResourceToResourceStatus(err, rs, []*event.ResourceStatus{}...) + } + + return &event.ResourceStatus{ + Identifier: identifier, + Status: res.Status, + Resource: rs, + Message: res.Message, + GeneratedResources: []*event.ResourceStatus{}, + }, nil +} diff --git a/internal/kstatus/statusreaders/statefulset.go b/internal/kstatus/statusreaders/statefulset.go new file mode 100644 index 00000000..9df162b2 --- /dev/null +++ b/internal/kstatus/statusreaders/statefulset.go @@ -0,0 +1,53 @@ +package statusreaders + +import ( + "context" + + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" +) + +func NewStatefulSetResourceReader(mapper meta.RESTMapper) engine.StatusReader { + return &baseStatusReader{ + mapper: mapper, + resourceStatusReader: &statefulSetResourceReader{ + mapper: mapper, + }, + } +} + +// statefulSetResourceReader is an implementation of the ResourceReader interface +// that can fetch StatefulSet resources from the cluster, knows how to find any +// Pods belonging to the StatefulSet, and compute status for the StatefulSet. +type statefulSetResourceReader struct { + mapper meta.RESTMapper +} + +var _ resourceTypeStatusReader = &statefulSetResourceReader{} + +func (s *statefulSetResourceReader) Supports(gk schema.GroupKind) bool { + return gk == appsv1.SchemeGroupVersion.WithKind("StatefulSet").GroupKind() +} + +func (s *statefulSetResourceReader) ReadStatusForObject(_ context.Context, _ engine.ClusterReader, + statefulSet *unstructured.Unstructured) (*event.ResourceStatus, error) { + identifier := object.UnstructuredToObjMetadata(statefulSet) + res, err := status.Compute(statefulSet) + if err != nil { + return errResourceToResourceStatus(err, statefulSet, []*event.ResourceStatus{}...) + } + + return &event.ResourceStatus{ + Identifier: identifier, + Status: res.Status, + Resource: statefulSet, + Message: res.Message, + GeneratedResources: []*event.ResourceStatus{}, + }, nil +} diff --git a/internal/kstatus/watcher/watcher.go b/internal/kstatus/watcher/watcher.go index e2a7d388..e4b83591 100644 --- a/internal/kstatus/watcher/watcher.go +++ b/internal/kstatus/watcher/watcher.go @@ -3,15 +3,17 @@ package watcher import ( "context" "fmt" + "time" - "k8s.io/client-go/discovery" - "k8s.io/klog/v2" - kwatcher "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" - + "github.com/pluralsh/deployment-operator/internal/kstatus/statusreaders" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + kwatcher "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" "sigs.k8s.io/cli-utils/pkg/object" ) @@ -93,7 +95,18 @@ func NewDynamicStatusWatcher(dynamicClient dynamic.Interface, discoveryClient di informerRefs = make(map[GroupKindNamespace]*watcherReference) } - defaultStatusWatcher := kwatcher.NewDefaultStatusWatcher(dynamicClient, mapper) + defaultStatusWatcher := &kwatcher.DefaultStatusWatcher{ + DynamicClient: dynamicClient, + Mapper: mapper, + ResyncPeriod: 1 * time.Hour, + StatusReader: statusreaders.NewDefaultStatusReader(mapper), + ClusterReader: &clusterreader.DynamicClusterReader{ + DynamicClient: dynamicClient, + Mapper: mapper, + }, + Indexers: kwatcher.DefaultIndexers(), + } + defaultStatusWatcher.Filters = options.Filters return &DynamicStatusWatcher{