Skip to content

Commit

Permalink
feat: add clusterauth for k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Aug 5, 2024
1 parent a535c26 commit 19c9832
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 1 deletion.
91 changes: 91 additions & 0 deletions apis/kubernetes/v1alpha1/clusterauth_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package v1alpha1

import (
v1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type ClusterAuthParameters struct {
// Region is the region you'd like your resource to be created in.
// +terrajet:crd:field:TFTag=-
// +kubebuilder:validation:Required
Region string `json:"region"`

// ClusterName is the name of the cluster you'd like to fetch Kubeconfig of.
// Either ClusterName, ClusterNameRef or ClusterNameSelector has to be given.
// +crossplane:generate:reference:type=Cluster
// +crossplane:generate:reference:extractor=ExternalNameIfClusterActive()
ClusterName string `json:"clusterName,omitempty"`

// Reference to a Cluster to populate clusterName.
// Either ClusterName, ClusterNameRef or ClusterNameSelector has to be given.
// +kubebuilder:validation:Optional
ClusterNameRef *v1.Reference `json:"clusterNameRef,omitempty"`

// Selector for a Cluster to populate clusterName.
// Either ClusterName, ClusterNameRef or ClusterNameSelector has to be given.
// +kubebuilder:validation:Optional
ClusterNameSelector *v1.Selector `json:"clusterNameSelector,omitempty"`

// RefreshPeriod is how frequently you'd like the token in the published
// Kubeconfig to be refreshed. The maximum is 10m0s.
// The default is 10m0s.
// +kubebuilder:default:="10m0s"
RefreshPeriod *metav1.Duration `json:"refreshPeriod,omitempty"`
}

type ClusterAuthObservation struct {

// LastRefreshTime is the time when the token was refreshed.
LastRefreshTime *metav1.Time `json:"lastRefreshTime,omitempty"`
}

// ClusterAuthSpec defines the desired state of ClusterAuth
type ClusterAuthSpec struct {
v1.ResourceSpec `json:",inline"`
ForProvider ClusterAuthParameters `json:"forProvider"`
}

// ClusterAuthStatus defines the observed state of ClusterAuth.
type ClusterAuthStatus struct {
v1.ResourceStatus `json:",inline"`
AtProvider ClusterAuthObservation `json:"atProvider,omitempty"`
}

// +kubebuilder:object:root=true

// ClusterAuth is used to retrieve Kubeconfig of given EKS cluster.
// +kubebuilder:printcolumn:name="SYNCED",type="string",JSONPath=".status.conditions[?(@.type=='Synced')].status"
// +kubebuilder:printcolumn:name="READY",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status"
// +kubebuilder:printcolumn:name="EXTERNAL-NAME",type="string",JSONPath=".metadata.annotations.crossplane\\.io/external-name"
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Cluster,categories={crossplane,managed,aws}
type ClusterAuth struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ClusterAuthSpec `json:"spec"`
Status ClusterAuthStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ClusterAuthList contains a list of ClusterAuths
type ClusterAuthList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ClusterAuth `json:"items"`
}

// Repository type metadata.
var (
ClusterAuth_Kind = "ClusterAuth"
ClusterAuth_GroupKind = schema.GroupKind{Group: CRDGroup, Kind: ClusterAuth_Kind}.String()
ClusterAuth_KindAPIVersion = ClusterAuth_Kind + "." + CRDGroupVersion.String()
ClusterAuth_GroupVersionKind = CRDGroupVersion.WithKind(ClusterAuth_Kind)
)

//func init() {
// SchemeBuilder.Register(&ClusterAuth{}, &ClusterAuthList{})
//}
5 changes: 4 additions & 1 deletion config/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func GetProvider() *ujconfig.Provider {
ujconfig.WithFeaturesPackage("internal/features"),
ujconfig.WithDefaultResourceOptions(
ExternalNameConfigurations(),
))
),
)

pc.BasePackages.ControllerMap["internal/controller/kubernetes/clusterauth"] = "kubernetes"

pc.AddResourceConfigurator("digitalocean_spaces_cors_configuration", func(r *ujconfig.Resource) {
r.References["region"] = ujconfig.Reference{
Expand Down
198 changes: 198 additions & 0 deletions internal/controller/kubernetes/clusterauth/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package clusterauth

import (
"context"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/eks"
"github.com/aws/aws-sdk-go-v2/service/sts"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/connection"
"github.com/crossplane/crossplane-runtime/pkg/event"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

tjcontroller "github.com/crossplane/upjet/pkg/controller"
ujresource "github.com/crossplane/upjet/pkg/resource"

"github.com/crossplane-contrib/provider-upjet-digitalocean/apis/kubernetes/v1alpha1"
"github.com/upbound/provider-aws/apis/v1alpha1"
"github.com/upbound/provider-aws/internal/clients"
"github.com/upbound/provider-aws/internal/features"
)

const (
additionalDurationForExpiration = 5 * time.Minute

errNotClusterAuth = "managed resource is not a ClusterAuth custom resource"
errDescribeCluster = "cannot describe cluster"
errGetKubeconfig = "cannot get kubeconfig"
errStatusUpdate = "cannot update status of ClusterAuth"
)

// Setup adds a controller that reconciles ClusterAuth.
func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
name := managed.ControllerName(v1alpha1.ClusterAuth_GroupKind)

cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())}
if o.Features.Enabled(features.EnableAlphaExternalSecretStores) {
cps = append(cps, connection.NewDetailsManager(mgr.GetClient(), v1alpha1.StoreConfigGroupVersionKind))
}

return ctrl.NewControllerManagedBy(mgr).
Named(name).
WithOptions(o.ForControllerRuntime()).
For(&v1alpha1.ClusterAuth{}).
Complete(managed.NewReconciler(mgr,
resource.ManagedKind(v1alpha1.ClusterAuth_GroupVersionKind),
managed.WithExternalConnecter(&connector{
kube: mgr.GetClient(),
newEKSClientFn: eks.NewFromConfig,
newPresignClientFn: newPresignClient,
}),
// We use a constant poll interval here to make sure we get a chance
// to refresh the token before it expires.
managed.WithPollInterval(time.Minute*1),
managed.WithLogger(o.Logger.WithValues("controller", name)),
managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
managed.WithConnectionPublishers(cps...)))
}

type connector struct {
kube client.Client
newEKSClientFn func(cfg aws.Config, optFns ...func(*eks.Options)) *eks.Client
newPresignClientFn func(cfg aws.Config, optFns ...func(*sts.Options)) *sts.PresignClient
}

func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) {
cfg, err := clients.GetAWSConfigWithTracking(ctx, c.kube, mg)
if err != nil {
return nil, err
}
return &external{
eksClient: c.newEKSClientFn(*cfg),
presignClient: c.newPresignClientFn(*cfg),
kube: c.kube},
nil
}

type external struct {
eksClient *eks.Client
presignClient *sts.PresignClient
kube client.Client
}

func (e *external) Observe(_ context.Context, mg resource.Managed) (managed.ExternalObservation, error) { // nolint:gocyclo
cr, ok := mg.(*v1alpha1.ClusterAuth)
if !ok {
return managed.ExternalObservation{}, errors.New(errNotClusterAuth)
}
if meta.WasDeleted(cr) {
return managed.ExternalObservation{
ResourceExists: false,
}, nil
}
if cr.Status.AtProvider.LastRefreshTime == nil {
return managed.ExternalObservation{
ResourceExists: false,
}, nil
}
deadline := cr.Status.AtProvider.LastRefreshTime.Add(cr.Spec.ForProvider.RefreshPeriod.Duration)
if time.Now().After(deadline) {
return managed.ExternalObservation{
ResourceExists: true,
ResourceUpToDate: false,
}, nil
}
cr.Status.SetConditions(xpv1.Available())
ujresource.SetUpToDateCondition(mg, true)
return managed.ExternalObservation{
ResourceExists: true,
ResourceUpToDate: true,
}, nil
}

func (e *external) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) {
cr, ok := mg.(*v1alpha1.ClusterAuth)
if !ok {
return managed.ExternalCreation{}, errors.New(errNotClusterAuth)
}
cl, err := e.eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{Name: aws.String(cr.Spec.ForProvider.ClusterName)})
if err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, errDescribeCluster)
}
if aws.ToString(cl.Cluster.CertificateAuthority.Data) == "" {
return managed.ExternalCreation{}, errors.New("ca data from the retrieved cluster is empty")
}
// NOTE(muvaf): The maximum time allowed for a token to live is 15 minutes
// even though API allows setting longer durations. Additional duration is
// add cushion so that we have the room for reconciliation to kick in at most
// in 5 minutes.
d := cr.Spec.ForProvider.RefreshPeriod.Duration + additionalDurationForExpiration
if d > time.Minute*15 {
d = time.Minute * 15
}
conn, err := GetConnectionDetails(
ctx,
e.presignClient,
cl.Cluster,
d,
)
if err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, errGetKubeconfig)
}
t := metav1.NewTime(time.Now())
cr.Status.AtProvider.LastRefreshTime = &t
cr.Status.SetConditions(xpv1.Available())
// NOTE(muvaf): We need to update status by ourselves because after-math
// of Create doesn't include updating the status, hence the lastRefreshTime
// gets lost.
if err := e.kube.Status().Update(ctx, cr); err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, errStatusUpdate)
}
return managed.ExternalCreation{ConnectionDetails: conn}, nil
}

func (e *external) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) {
cr, ok := mg.(*v1alpha1.ClusterAuth)
if !ok {
return managed.ExternalUpdate{}, errors.New(errNotClusterAuth)
}
cl, err := e.eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{Name: aws.String(cr.Spec.ForProvider.ClusterName)})
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errDescribeCluster)
}
if aws.ToString(cl.Cluster.CertificateAuthority.Data) == "" {
return managed.ExternalUpdate{}, errors.New("ca data from the retrieved cluster is empty")
}
// NOTE(muvaf): The maximum time allowed for a token to live is 15 minutes
// even though API allows setting longer durations. Additional duration is
// add cushion so that we have the room for reconciliation to kick in at most
// in 5 minutes.
d := cr.Spec.ForProvider.RefreshPeriod.Duration + additionalDurationForExpiration
if d > time.Minute*15 {
d = time.Minute * 15
}
conn, err := GetConnectionDetails(
ctx,
e.presignClient,
cl.Cluster,
d,
)
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errGetKubeconfig)
}
t := metav1.NewTime(time.Now())
cr.Status.AtProvider.LastRefreshTime = &t
return managed.ExternalUpdate{ConnectionDetails: conn}, nil
}

func (e *external) Delete(_ context.Context, _ resource.Managed) error {
return nil
}
93 changes: 93 additions & 0 deletions internal/controller/kubernetes/clusterauth/eks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package clusterauth

import (
"context"
"encoding/base64"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sts"
smithyhttp "github.com/aws/smithy-go/transport/http"
ekstypes "github.com/crossplane-contrib/provider-upjet-digitalocean/apis/kubernetes/v1alpha1"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/pkg/errors"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

const (
clusterIDHeader = "x-k8s-aws-id"
expireHeader = "X-Amz-Expires"
v1Prefix = "k8s-aws-v1."

errGetPresignGetCallerIdentity = "cannot get caller identity for presign"
errDecodeCA = "cannot decode certificate authority data"
errProduceKubeconfig = "cannot produce kubeconfig"
)

func newPresignClient(cfg aws.Config, optFns ...func(*sts.Options)) *sts.PresignClient {
cl := sts.NewFromConfig(cfg, optFns...)
return sts.NewPresignClient(cl)
}

// GetConnectionDetails extracts managed.ConnectionDetails out of ekstypes.Cluster.
func GetConnectionDetails(ctx context.Context, stsClient *sts.PresignClient, cluster *ekstypes.Cluster, expiration time.Duration) (managed.ConnectionDetails, error) {
getCallerIdentity, err := stsClient.PresignGetCallerIdentity(ctx, &sts.GetCallerIdentityInput{},
func(po *sts.PresignOptions) {
po.ClientOptions = []func(*sts.Options){
sts.WithAPIOptions(
smithyhttp.AddHeaderValue(clusterIDHeader, *cluster.Name),
// Required to provide.
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
smithyhttp.AddHeaderValue(expireHeader, fmt.Sprintf("%d", int(expiration.Seconds()))),
),
}
},
)
if err != nil {
return nil, errors.Wrap(err, errGetPresignGetCallerIdentity)
}

// More information: https://docs.aws.amazon.com/eks/latest/userguide/create-kubeconfig.html
token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(getCallerIdentity.URL))

// NOTE(hasheddan): We must decode the CA data before constructing our
// Kubeconfig, as the raw Kubeconfig will be base64 encoded again when
// written as a Secret.
caData, err := base64.StdEncoding.DecodeString(aws.ToString(cluster.CertificateAuthority.Data))
if err != nil {
return managed.ConnectionDetails{}, errors.Wrap(err, errDecodeCA)
}
kc := clientcmdapi.Config{
Clusters: map[string]*clientcmdapi.Cluster{
*cluster.Name: {
Server: *cluster.Endpoint,
CertificateAuthorityData: caData,
},
},
Contexts: map[string]*clientcmdapi.Context{
*cluster.Name: {
Cluster: *cluster.Name,
AuthInfo: *cluster.Name,
},
},
AuthInfos: map[string]*clientcmdapi.AuthInfo{
*cluster.Name: {
Token: token,
},
},
CurrentContext: *cluster.Name,
}

rawConfig, err := clientcmd.Write(kc)
if err != nil {
return managed.ConnectionDetails{}, errors.Wrap(err, errProduceKubeconfig)
}
return managed.ConnectionDetails{
xpv1.ResourceCredentialsSecretEndpointKey: []byte(*cluster.Endpoint),
xpv1.ResourceCredentialsSecretKubeconfigKey: rawConfig,
xpv1.ResourceCredentialsSecretCAKey: caData,
}, nil
}

0 comments on commit 19c9832

Please sign in to comment.