From ce5b3513940ca949a715b8d6583306624708c798 Mon Sep 17 00:00:00 2001 From: Timo Riski Date: Tue, 19 Nov 2024 10:25:31 +0200 Subject: [PATCH 1/2] feat: add Flink kind --- CHANGELOG.md | 1 + PROJECT | 13 + api/v1alpha1/flink_types.go | 68 +++ api/v1alpha1/flink_webhook.go | 61 +++ api/v1alpha1/setup_webhooks.go | 3 + .../userconfig/service/flink/flink.go | 72 +++ .../service/flink/zz_generated.deepcopy.go | 143 ++++++ api/v1alpha1/zz_generated.deepcopy.go | 81 ++++ .../templates/aiven.io_flinks.yaml | 410 ++++++++++++++++++ .../templates/cluster_role.yaml | 28 ++ .../mutating_webhook_configuration.yaml | 21 + .../validating_webhook_configuration.yaml | 22 + config/crd/bases/aiven.io_flinks.yaml | 410 ++++++++++++++++++ config/crd/kustomization.yaml | 3 + config/crd/patches/cainjection_in_flinks.yaml | 7 + config/crd/patches/webhook_in_flinks.yaml | 16 + config/rbac/flink_editor_role.yaml | 31 ++ config/rbac/flink_viewer_role.yaml | 27 ++ config/rbac/role.yaml | 28 ++ config/samples/_v1alpha1_flink.yaml | 12 + config/samples/kustomization.yaml | 1 + config/webhook/manifests.yaml | 41 ++ controllers/flink_controller.go | 97 +++++ controllers/setup.go | 1 + docs/docs/api-reference/flink.md | 166 +++++++ docs/mkdocs.yml | 1 + main.go | 2 +- tests/flink_test.go | 106 +++++ 28 files changed, 1871 insertions(+), 1 deletion(-) create mode 100644 api/v1alpha1/flink_types.go create mode 100644 api/v1alpha1/flink_webhook.go create mode 100644 api/v1alpha1/userconfig/service/flink/flink.go create mode 100644 api/v1alpha1/userconfig/service/flink/zz_generated.deepcopy.go create mode 100644 charts/aiven-operator-crds/templates/aiven.io_flinks.yaml create mode 100644 config/crd/bases/aiven.io_flinks.yaml create mode 100644 config/crd/patches/cainjection_in_flinks.yaml create mode 100644 config/crd/patches/webhook_in_flinks.yaml create mode 100644 config/rbac/flink_editor_role.yaml create mode 100644 config/rbac/flink_viewer_role.yaml create mode 100644 config/samples/_v1alpha1_flink.yaml create mode 100644 controllers/flink_controller.go create mode 100644 docs/docs/api-reference/flink.md create mode 100644 tests/flink_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 94aa4575..b4e48d77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ minimum ~~`1`~~ → `0` - Change `Cassandra` field `userConfig.cassandra_version`: enum remove `4` - Change `PostgreSQL` field `userConfig.pg_version`: enum remove `12` +- Add kind: `Flink` ## v0.25.0 - 2024-09-19 diff --git a/PROJECT b/PROJECT index 8212945a..caf11ecb 100644 --- a/PROJECT +++ b/PROJECT @@ -291,4 +291,17 @@ resources: defaulting: true validation: true webhookVersion: v1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: aiven.io + kind: Flink + path: github.com/aiven/aiven-operator/api/v1alpha1 + version: v1alpha1 + webhooks: + conversion: true + defaulting: true + validation: true + webhookVersion: v1 version: "3" diff --git a/api/v1alpha1/flink_types.go b/api/v1alpha1/flink_types.go new file mode 100644 index 00000000..c4ca1f8f --- /dev/null +++ b/api/v1alpha1/flink_types.go @@ -0,0 +1,68 @@ +// Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + flinkuserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/flink" +) + +// FlinkSpec defines the desired state of Flink +type FlinkSpec struct { + ServiceCommonSpec `json:",inline"` + + // Cassandra specific user configuration options + UserConfig *flinkuserconfig.FlinkUserConfig `json:"userConfig,omitempty"` +} + +// Flink is the Schema for the flinks API. +// Info "Exposes secret keys": `FLINK_HOST`, `FLINK_PORT`, `FLINK_USER`, `FLINK_PASSWORD`, `FLINK_URI`, `FLINK_HOSTS` +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Project",type="string",JSONPath=".spec.project" +// +kubebuilder:printcolumn:name="Region",type="string",JSONPath=".spec.cloudName" +// +kubebuilder:printcolumn:name="Plan",type="string",JSONPath=".spec.plan" +// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.state" +type Flink struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FlinkSpec `json:"spec,omitempty"` + Status ServiceStatus `json:"status,omitempty"` +} + +var _ AivenManagedObject = &Flink{} + +func (in *Flink) AuthSecretRef() *AuthSecretReference { + return in.Spec.AuthSecretRef +} + +func (in *Flink) Conditions() *[]metav1.Condition { + return &in.Status.Conditions +} + +func (in *Flink) NoSecret() bool { + return in.Spec.ConnInfoSecretTargetDisabled != nil && *in.Spec.ConnInfoSecretTargetDisabled +} + +func (in *Flink) GetRefs() []*ResourceReferenceObject { + return in.Spec.GetRefs(in.GetNamespace()) +} + +func (in *Flink) GetConnInfoSecretTarget() ConnInfoSecretTarget { + return in.Spec.ConnInfoSecretTarget +} + +//+kubebuilder:object:root=true + +// FlinkList contains a list of Flink +type FlinkList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Flink `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Flink{}, &FlinkList{}) +} diff --git a/api/v1alpha1/flink_webhook.go b/api/v1alpha1/flink_webhook.go new file mode 100644 index 00000000..2b43f61f --- /dev/null +++ b/api/v1alpha1/flink_webhook.go @@ -0,0 +1,61 @@ +// Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ + +package v1alpha1 + +import ( + "errors" + + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +// log is for logging in this package. +var flinklog = logf.Log.WithName("flink-resource") + +func (in *Flink) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(in). + Complete() +} + +//+kubebuilder:webhook:path=/mutate-aiven-io-v1alpha1-flink,mutating=true,failurePolicy=fail,sideEffects=None,groups=aiven.io,resources=flinks,verbs=create;update,versions=v1alpha1,name=mflink.kb.io,admissionReviewVersions=v1 + +var _ webhook.Defaulter = &Flink{} + +func (in *Flink) Default() { + flinklog.Info("default", "name", in.Name) +} + +//+kubebuilder:webhook:verbs=create;update;delete,path=/validate-aiven-io-v1alpha1-flink,mutating=false,failurePolicy=fail,groups=aiven.io,resources=flinks,versions=v1alpha1,name=vflink.kb.io,sideEffects=none,admissionReviewVersions=v1 + +var _ webhook.Validator = &Flink{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (in *Flink) ValidateCreate() error { + flinklog.Info("validate create", "name", in.Name) + + return in.Spec.Validate() +} + +// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type +func (in *Flink) ValidateUpdate(old runtime.Object) error { + flinklog.Info("validate update", "name", in.Name) + return in.Spec.Validate() +} + +// ValidateDelete implements webhook.Validator so a webhook will be registered for the type +func (in *Flink) ValidateDelete() error { + flinklog.Info("validate delete", "name", in.Name) + + if in.Spec.TerminationProtection != nil && *in.Spec.TerminationProtection { + return errors.New("cannot delete Flink service, termination protection is on") + } + + if in.Spec.ProjectVPCID != "" && in.Spec.ProjectVPCRef != nil { + return errors.New("cannot use both projectVpcId and projectVPCRef") + } + + return nil +} diff --git a/api/v1alpha1/setup_webhooks.go b/api/v1alpha1/setup_webhooks.go index 040892e1..af91e07b 100644 --- a/api/v1alpha1/setup_webhooks.go +++ b/api/v1alpha1/setup_webhooks.go @@ -16,6 +16,9 @@ func SetupWebhooks(mgr ctrl.Manager) error { if err := (&Database{}).SetupWebhookWithManager(mgr); err != nil { return fmt.Errorf("webhook Database: %w", err) } + if err := (&Flink{}).SetupWebhookWithManager(mgr); err != nil { + return fmt.Errorf("webhook Flink: %w", err) + } if err := (&ConnectionPool{}).SetupWebhookWithManager(mgr); err != nil { return fmt.Errorf("webhook ConnectionPool: %w", err) } diff --git a/api/v1alpha1/userconfig/service/flink/flink.go b/api/v1alpha1/userconfig/service/flink/flink.go new file mode 100644 index 00000000..c8fd9067 --- /dev/null +++ b/api/v1alpha1/userconfig/service/flink/flink.go @@ -0,0 +1,72 @@ +// Code generated by user config generator. DO NOT EDIT. +// +kubebuilder:object:generate=true + +package flinkuserconfig + +// CIDR address block, either as a string, or in a dict with an optional description field +type IpFilter struct { + // +kubebuilder:validation:MaxLength=1024 + // Description for IP filter list entry + Description *string `groups:"create,update" json:"description,omitempty"` + + // +kubebuilder:validation:MaxLength=43 + // CIDR address block + Network string `groups:"create,update" json:"network"` +} + +// Allow access to selected service components through Privatelink +type PrivatelinkAccess struct { + // Enable flink + Flink *bool `groups:"create,update" json:"flink,omitempty"` + + // Enable prometheus + Prometheus *bool `groups:"create,update" json:"prometheus,omitempty"` +} + +// Allow access to selected service ports from the public Internet +type PublicAccess struct { + // Allow clients to connect to flink from the public internet for service nodes that are in a project VPC or another type of private network + Flink *bool `groups:"create,update" json:"flink,omitempty"` +} +type FlinkUserConfig struct { + // +kubebuilder:validation:MaxItems=1 + // +kubebuilder:deprecatedversion:warning="additional_backup_regions is deprecated" + // Deprecated. Additional Cloud Regions for Backup Replication + AdditionalBackupRegions []string `groups:"create,update" json:"additional_backup_regions,omitempty"` + + // +kubebuilder:validation:Enum="1.19" + // +kubebuilder:validation:XValidation:rule="self == oldSelf",message="Value is immutable" + // Flink major version + FlinkVersion *string `groups:"create" json:"flink_version,omitempty"` + + // +kubebuilder:validation:MaxItems=1024 + // Allow incoming connections from CIDR address block, e.g. '10.20.0.0/16' + IpFilter []*IpFilter `groups:"create,update" json:"ip_filter,omitempty"` + + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Maximum=1024 + // Task slots per node. For a 3 node plan, total number of task slots is 3x this value + NumberOfTaskSlots *int `groups:"create,update" json:"number_of_task_slots,omitempty"` + + // +kubebuilder:validation:Minimum=5 + // +kubebuilder:validation:Maximum=60 + // Timeout in seconds used for all futures and blocking Pekko requests + PekkoAskTimeoutS *int `groups:"create,update" json:"pekko_ask_timeout_s,omitempty"` + + // +kubebuilder:validation:Minimum=1048576 + // +kubebuilder:validation:Maximum=52428800 + // Maximum size in bytes for messages exchanged between the JobManager and the TaskManagers + PekkoFramesizeB *int `groups:"create,update" json:"pekko_framesize_b,omitempty"` + + // Allow access to selected service components through Privatelink + PrivatelinkAccess *PrivatelinkAccess `groups:"create,update" json:"privatelink_access,omitempty"` + + // Allow access to selected service ports from the public Internet + PublicAccess *PublicAccess `groups:"create,update" json:"public_access,omitempty"` + + // Store logs for the service so that they are available in the HTTP API and console. + ServiceLog *bool `groups:"create,update" json:"service_log,omitempty"` + + // Use static public IP addresses + StaticIps *bool `groups:"create,update" json:"static_ips,omitempty"` +} diff --git a/api/v1alpha1/userconfig/service/flink/zz_generated.deepcopy.go b/api/v1alpha1/userconfig/service/flink/zz_generated.deepcopy.go new file mode 100644 index 00000000..d4428ab3 --- /dev/null +++ b/api/v1alpha1/userconfig/service/flink/zz_generated.deepcopy.go @@ -0,0 +1,143 @@ +//go:build !ignore_autogenerated + +// Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ + +// Code generated by controller-gen. DO NOT EDIT. + +package flinkuserconfig + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlinkUserConfig) DeepCopyInto(out *FlinkUserConfig) { + *out = *in + if in.AdditionalBackupRegions != nil { + in, out := &in.AdditionalBackupRegions, &out.AdditionalBackupRegions + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.FlinkVersion != nil { + in, out := &in.FlinkVersion, &out.FlinkVersion + *out = new(string) + **out = **in + } + if in.IpFilter != nil { + in, out := &in.IpFilter, &out.IpFilter + *out = make([]*IpFilter, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(IpFilter) + (*in).DeepCopyInto(*out) + } + } + } + if in.NumberOfTaskSlots != nil { + in, out := &in.NumberOfTaskSlots, &out.NumberOfTaskSlots + *out = new(int) + **out = **in + } + if in.PekkoAskTimeoutS != nil { + in, out := &in.PekkoAskTimeoutS, &out.PekkoAskTimeoutS + *out = new(int) + **out = **in + } + if in.PekkoFramesizeB != nil { + in, out := &in.PekkoFramesizeB, &out.PekkoFramesizeB + *out = new(int) + **out = **in + } + if in.PrivatelinkAccess != nil { + in, out := &in.PrivatelinkAccess, &out.PrivatelinkAccess + *out = new(PrivatelinkAccess) + (*in).DeepCopyInto(*out) + } + if in.PublicAccess != nil { + in, out := &in.PublicAccess, &out.PublicAccess + *out = new(PublicAccess) + (*in).DeepCopyInto(*out) + } + if in.ServiceLog != nil { + in, out := &in.ServiceLog, &out.ServiceLog + *out = new(bool) + **out = **in + } + if in.StaticIps != nil { + in, out := &in.StaticIps, &out.StaticIps + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlinkUserConfig. +func (in *FlinkUserConfig) DeepCopy() *FlinkUserConfig { + if in == nil { + return nil + } + out := new(FlinkUserConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IpFilter) DeepCopyInto(out *IpFilter) { + *out = *in + if in.Description != nil { + in, out := &in.Description, &out.Description + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IpFilter. +func (in *IpFilter) DeepCopy() *IpFilter { + if in == nil { + return nil + } + out := new(IpFilter) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PrivatelinkAccess) DeepCopyInto(out *PrivatelinkAccess) { + *out = *in + if in.Flink != nil { + in, out := &in.Flink, &out.Flink + *out = new(bool) + **out = **in + } + if in.Prometheus != nil { + in, out := &in.Prometheus, &out.Prometheus + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrivatelinkAccess. +func (in *PrivatelinkAccess) DeepCopy() *PrivatelinkAccess { + if in == nil { + return nil + } + out := new(PrivatelinkAccess) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PublicAccess) DeepCopyInto(out *PublicAccess) { + *out = *in + if in.Flink != nil { + in, out := &in.Flink, &out.Flink + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PublicAccess. +func (in *PublicAccess) DeepCopy() *PublicAccess { + if in == nil { + return nil + } + out := new(PublicAccess) + in.DeepCopyInto(out) + return out +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index bea6ad57..e4873e38 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -36,6 +36,7 @@ import ( rsyslog "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/integrationendpoints/rsyslog" cassandra "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/cassandra" clickhouse "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/clickhouse" + flink "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/flink" grafana "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/grafana" kafka "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/kafka" kafka_connect "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/kafka_connect" @@ -913,6 +914,86 @@ func (in *DatabaseStatus) DeepCopy() *DatabaseStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Flink) DeepCopyInto(out *Flink) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Flink. +func (in *Flink) DeepCopy() *Flink { + if in == nil { + return nil + } + out := new(Flink) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Flink) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlinkList) DeepCopyInto(out *FlinkList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Flink, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlinkList. +func (in *FlinkList) DeepCopy() *FlinkList { + if in == nil { + return nil + } + out := new(FlinkList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FlinkList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlinkSpec) DeepCopyInto(out *FlinkSpec) { + *out = *in + in.ServiceCommonSpec.DeepCopyInto(&out.ServiceCommonSpec) + if in.UserConfig != nil { + in, out := &in.UserConfig, &out.UserConfig + *out = new(flink.FlinkUserConfig) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlinkSpec. +func (in *FlinkSpec) DeepCopy() *FlinkSpec { + if in == nil { + return nil + } + out := new(FlinkSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Grafana) DeepCopyInto(out *Grafana) { *out = *in diff --git a/charts/aiven-operator-crds/templates/aiven.io_flinks.yaml b/charts/aiven-operator-crds/templates/aiven.io_flinks.yaml new file mode 100644 index 00000000..4d886de4 --- /dev/null +++ b/charts/aiven-operator-crds/templates/aiven.io_flinks.yaml @@ -0,0 +1,410 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.15.0 + name: flinks.aiven.io +spec: + group: aiven.io + names: + kind: Flink + listKind: FlinkList + plural: flinks + singular: flink + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.project + name: Project + type: string + - jsonPath: .spec.cloudName + name: Region + type: string + - jsonPath: .spec.plan + name: Plan + type: string + - jsonPath: .status.state + name: State + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: |- + Flink is the Schema for the flinks API. + Info "Exposes secret keys": `FLINK_HOST`, `FLINK_PORT`, `FLINK_USER`, `FLINK_PASSWORD`, `FLINK_URI`, `FLINK_HOSTS` + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: FlinkSpec defines the desired state of Flink + properties: + authSecretRef: + description: Authentication reference to Aiven token in a secret + properties: + key: + minLength: 1 + type: string + name: + minLength: 1 + type: string + required: + - key + - name + type: object + cloudName: + description: Cloud the service runs in. + maxLength: 256 + type: string + connInfoSecretTarget: + description: Secret configuration. + properties: + annotations: + additionalProperties: + type: string + description: Annotations added to the secret + type: object + x-kubernetes-preserve-unknown-fields: true + labels: + additionalProperties: + type: string + description: Labels added to the secret + type: object + x-kubernetes-preserve-unknown-fields: true + name: + description: + Name of the secret resource to be created. By default, + it is equal to the resource name + type: string + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + prefix: + description: |- + Prefix for the secret's keys. + Added "as is" without any transformations. + By default, is equal to the kind name in uppercase + underscore, e.g. `KAFKA_`, `REDIS_`, etc. + type: string + required: + - name + type: object + connInfoSecretTargetDisabled: + description: + When true, the secret containing connection information + will not be created, defaults to false. This field cannot be changed + after resource creation. + type: boolean + x-kubernetes-validations: + - message: connInfoSecretTargetDisabled is immutable. + rule: self == oldSelf + disk_space: + description: |- + The disk space of the service, possible values depend on the service type, the cloud provider and the project. + Reducing will result in the service re-balancing. + The removal of this field does not change the value. + pattern: (?i)^[1-9][0-9]*(GiB|G)?$ + type: string + maintenanceWindowDow: + description: + Day of week when maintenance operations should be performed. + One monday, tuesday, wednesday, etc. + enum: + - monday + - tuesday + - wednesday + - thursday + - friday + - saturday + - sunday + type: string + maintenanceWindowTime: + description: + Time of day when maintenance operations should be performed. + UTC time in HH:mm:ss format. + maxLength: 8 + type: string + plan: + description: Subscription plan. + maxLength: 128 + type: string + project: + description: Identifies the project this resource belongs to + maxLength: 63 + pattern: ^[a-zA-Z0-9_-]+$ + type: string + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + projectVPCRef: + description: + ProjectVPCRef reference to ProjectVPC resource to use + its ID as ProjectVPCID automatically + properties: + name: + minLength: 1 + type: string + namespace: + minLength: 1 + type: string + required: + - name + type: object + projectVpcId: + description: Identifier of the VPC the service should be in, if any. + maxLength: 36 + type: string + serviceIntegrations: + description: + Service integrations to specify when creating a service. + Not applied after initial service creation + items: + description: + Service integrations to specify when creating a service. + Not applied after initial service creation + properties: + integrationType: + enum: + - read_replica + type: string + sourceServiceName: + maxLength: 64 + minLength: 1 + type: string + required: + - integrationType + - sourceServiceName + type: object + maxItems: 1 + type: array + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + tags: + additionalProperties: + type: string + description: + Tags are key-value pairs that allow you to categorize + services. + type: object + technicalEmails: + description: + Defines the email addresses that will receive alerts + about upcoming maintenance updates or warnings about service instability. + items: + properties: + email: + description: Email address. + pattern: ^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$ + type: string + required: + - email + type: object + maxItems: 10 + type: array + terminationProtection: + description: + Prevent service from being deleted. It is recommended + to have this enabled for all services. + type: boolean + userConfig: + description: Cassandra specific user configuration options + properties: + additional_backup_regions: + description: Deprecated. Additional Cloud Regions for Backup Replication + items: + type: string + maxItems: 1 + type: array + flink_version: + description: Flink major version + enum: + - "1.19" + type: string + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + ip_filter: + description: + Allow incoming connections from CIDR address block, + e.g. '10.20.0.0/16' + items: + description: + CIDR address block, either as a string, or in a + dict with an optional description field + properties: + description: + description: Description for IP filter list entry + maxLength: 1024 + type: string + network: + description: CIDR address block + maxLength: 43 + type: string + required: + - network + type: object + maxItems: 1024 + type: array + number_of_task_slots: + description: + Task slots per node. For a 3 node plan, total number + of task slots is 3x this value + maximum: 1024 + minimum: 1 + type: integer + pekko_ask_timeout_s: + description: + Timeout in seconds used for all futures and blocking + Pekko requests + maximum: 60 + minimum: 5 + type: integer + pekko_framesize_b: + description: + Maximum size in bytes for messages exchanged between + the JobManager and the TaskManagers + maximum: 52428800 + minimum: 1048576 + type: integer + privatelink_access: + description: + Allow access to selected service components through + Privatelink + properties: + flink: + description: Enable flink + type: boolean + prometheus: + description: Enable prometheus + type: boolean + type: object + public_access: + description: + Allow access to selected service ports from the public + Internet + properties: + flink: + description: + Allow clients to connect to flink from the public + internet for service nodes that are in a project VPC or + another type of private network + type: boolean + type: object + service_log: + description: + Store logs for the service so that they are available + in the HTTP API and console. + type: boolean + static_ips: + description: Use static public IP addresses + type: boolean + type: object + required: + - plan + - project + type: object + x-kubernetes-validations: + - message: + connInfoSecretTargetDisabled can only be set during resource + creation. + rule: has(oldSelf.connInfoSecretTargetDisabled) == has(self.connInfoSecretTargetDisabled) + status: + description: ServiceStatus defines the observed state of service + properties: + conditions: + description: + Conditions represent the latest available observations + of a service state + items: + description: + "Condition contains details for one aspect of the current + state of this API Resource.\n---\nThis struct is intended for + direct use as an array at the field path .status.conditions. For + example,\n\n\n\ttype FooStatus struct{\n\t // Represents the + observations of a foo's current state.\n\t // Known .status.conditions.type + are: \"Available\", \"Progressing\", and \"Degraded\"\n\t // + +patchMergeKey=type\n\t // +patchStrategy=merge\n\t // +listType=map\n\t + \ // +listMapKey=type\n\t Conditions []metav1.Condition `json:\"conditions,omitempty\" + patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`\n\n\n\t + \ // other fields\n\t}" + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: |- + type of condition in CamelCase or in foo.example.com/CamelCase. + --- + Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be + useful (see .node.status.conditions), the ability to deconflict is important. + The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + state: + description: Service state + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/charts/aiven-operator/templates/cluster_role.yaml b/charts/aiven-operator/templates/cluster_role.yaml index b8947b9f..5488d1f6 100644 --- a/charts/aiven-operator/templates/cluster_role.yaml +++ b/charts/aiven-operator/templates/cluster_role.yaml @@ -250,6 +250,34 @@ rules: - get - patch - update + - apiGroups: + - aiven.io + resources: + - flinks + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - aiven.io + resources: + - flinks/finalizers + verbs: + - create + - get + - update + - apiGroups: + - aiven.io + resources: + - flinks/status + verbs: + - get + - patch + - update - apiGroups: - aiven.io resources: diff --git a/charts/aiven-operator/templates/mutating_webhook_configuration.yaml b/charts/aiven-operator/templates/mutating_webhook_configuration.yaml index b2b294f0..8f476f85 100644 --- a/charts/aiven-operator/templates/mutating_webhook_configuration.yaml +++ b/charts/aiven-operator/templates/mutating_webhook_configuration.yaml @@ -93,6 +93,27 @@ webhooks: - databases sideEffects: None {{- include "aiven-operator.webhookNamespaceSelector" . | indent 4 }} + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: {{ include "aiven-operator.fullname" . }}-webhook-service + namespace: {{ include "aiven-operator.namespace" . }} + path: /mutate-aiven-io-v1alpha1-flink + failurePolicy: Fail + name: mflink.kb.io + rules: + - apiGroups: + - aiven.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - flinks + sideEffects: None + {{- include "aiven-operator.webhookNamespaceSelector" . | indent 4 }} - admissionReviewVersions: - v1 clientConfig: diff --git a/charts/aiven-operator/templates/validating_webhook_configuration.yaml b/charts/aiven-operator/templates/validating_webhook_configuration.yaml index 966073f7..db5832ef 100644 --- a/charts/aiven-operator/templates/validating_webhook_configuration.yaml +++ b/charts/aiven-operator/templates/validating_webhook_configuration.yaml @@ -97,6 +97,28 @@ webhooks: - databases sideEffects: None {{- include "aiven-operator.webhookNamespaceSelector" . | indent 4 }} + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: {{ include "aiven-operator.fullname" . }}-webhook-service + namespace: {{ include "aiven-operator.namespace" . }} + path: /validate-aiven-io-v1alpha1-flink + failurePolicy: Fail + name: vflink.kb.io + rules: + - apiGroups: + - aiven.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + - DELETE + resources: + - flinks + sideEffects: None + {{- include "aiven-operator.webhookNamespaceSelector" . | indent 4 }} - admissionReviewVersions: - v1 clientConfig: diff --git a/config/crd/bases/aiven.io_flinks.yaml b/config/crd/bases/aiven.io_flinks.yaml new file mode 100644 index 00000000..4d886de4 --- /dev/null +++ b/config/crd/bases/aiven.io_flinks.yaml @@ -0,0 +1,410 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.15.0 + name: flinks.aiven.io +spec: + group: aiven.io + names: + kind: Flink + listKind: FlinkList + plural: flinks + singular: flink + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.project + name: Project + type: string + - jsonPath: .spec.cloudName + name: Region + type: string + - jsonPath: .spec.plan + name: Plan + type: string + - jsonPath: .status.state + name: State + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: |- + Flink is the Schema for the flinks API. + Info "Exposes secret keys": `FLINK_HOST`, `FLINK_PORT`, `FLINK_USER`, `FLINK_PASSWORD`, `FLINK_URI`, `FLINK_HOSTS` + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: FlinkSpec defines the desired state of Flink + properties: + authSecretRef: + description: Authentication reference to Aiven token in a secret + properties: + key: + minLength: 1 + type: string + name: + minLength: 1 + type: string + required: + - key + - name + type: object + cloudName: + description: Cloud the service runs in. + maxLength: 256 + type: string + connInfoSecretTarget: + description: Secret configuration. + properties: + annotations: + additionalProperties: + type: string + description: Annotations added to the secret + type: object + x-kubernetes-preserve-unknown-fields: true + labels: + additionalProperties: + type: string + description: Labels added to the secret + type: object + x-kubernetes-preserve-unknown-fields: true + name: + description: + Name of the secret resource to be created. By default, + it is equal to the resource name + type: string + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + prefix: + description: |- + Prefix for the secret's keys. + Added "as is" without any transformations. + By default, is equal to the kind name in uppercase + underscore, e.g. `KAFKA_`, `REDIS_`, etc. + type: string + required: + - name + type: object + connInfoSecretTargetDisabled: + description: + When true, the secret containing connection information + will not be created, defaults to false. This field cannot be changed + after resource creation. + type: boolean + x-kubernetes-validations: + - message: connInfoSecretTargetDisabled is immutable. + rule: self == oldSelf + disk_space: + description: |- + The disk space of the service, possible values depend on the service type, the cloud provider and the project. + Reducing will result in the service re-balancing. + The removal of this field does not change the value. + pattern: (?i)^[1-9][0-9]*(GiB|G)?$ + type: string + maintenanceWindowDow: + description: + Day of week when maintenance operations should be performed. + One monday, tuesday, wednesday, etc. + enum: + - monday + - tuesday + - wednesday + - thursday + - friday + - saturday + - sunday + type: string + maintenanceWindowTime: + description: + Time of day when maintenance operations should be performed. + UTC time in HH:mm:ss format. + maxLength: 8 + type: string + plan: + description: Subscription plan. + maxLength: 128 + type: string + project: + description: Identifies the project this resource belongs to + maxLength: 63 + pattern: ^[a-zA-Z0-9_-]+$ + type: string + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + projectVPCRef: + description: + ProjectVPCRef reference to ProjectVPC resource to use + its ID as ProjectVPCID automatically + properties: + name: + minLength: 1 + type: string + namespace: + minLength: 1 + type: string + required: + - name + type: object + projectVpcId: + description: Identifier of the VPC the service should be in, if any. + maxLength: 36 + type: string + serviceIntegrations: + description: + Service integrations to specify when creating a service. + Not applied after initial service creation + items: + description: + Service integrations to specify when creating a service. + Not applied after initial service creation + properties: + integrationType: + enum: + - read_replica + type: string + sourceServiceName: + maxLength: 64 + minLength: 1 + type: string + required: + - integrationType + - sourceServiceName + type: object + maxItems: 1 + type: array + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + tags: + additionalProperties: + type: string + description: + Tags are key-value pairs that allow you to categorize + services. + type: object + technicalEmails: + description: + Defines the email addresses that will receive alerts + about upcoming maintenance updates or warnings about service instability. + items: + properties: + email: + description: Email address. + pattern: ^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$ + type: string + required: + - email + type: object + maxItems: 10 + type: array + terminationProtection: + description: + Prevent service from being deleted. It is recommended + to have this enabled for all services. + type: boolean + userConfig: + description: Cassandra specific user configuration options + properties: + additional_backup_regions: + description: Deprecated. Additional Cloud Regions for Backup Replication + items: + type: string + maxItems: 1 + type: array + flink_version: + description: Flink major version + enum: + - "1.19" + type: string + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + ip_filter: + description: + Allow incoming connections from CIDR address block, + e.g. '10.20.0.0/16' + items: + description: + CIDR address block, either as a string, or in a + dict with an optional description field + properties: + description: + description: Description for IP filter list entry + maxLength: 1024 + type: string + network: + description: CIDR address block + maxLength: 43 + type: string + required: + - network + type: object + maxItems: 1024 + type: array + number_of_task_slots: + description: + Task slots per node. For a 3 node plan, total number + of task slots is 3x this value + maximum: 1024 + minimum: 1 + type: integer + pekko_ask_timeout_s: + description: + Timeout in seconds used for all futures and blocking + Pekko requests + maximum: 60 + minimum: 5 + type: integer + pekko_framesize_b: + description: + Maximum size in bytes for messages exchanged between + the JobManager and the TaskManagers + maximum: 52428800 + minimum: 1048576 + type: integer + privatelink_access: + description: + Allow access to selected service components through + Privatelink + properties: + flink: + description: Enable flink + type: boolean + prometheus: + description: Enable prometheus + type: boolean + type: object + public_access: + description: + Allow access to selected service ports from the public + Internet + properties: + flink: + description: + Allow clients to connect to flink from the public + internet for service nodes that are in a project VPC or + another type of private network + type: boolean + type: object + service_log: + description: + Store logs for the service so that they are available + in the HTTP API and console. + type: boolean + static_ips: + description: Use static public IP addresses + type: boolean + type: object + required: + - plan + - project + type: object + x-kubernetes-validations: + - message: + connInfoSecretTargetDisabled can only be set during resource + creation. + rule: has(oldSelf.connInfoSecretTargetDisabled) == has(self.connInfoSecretTargetDisabled) + status: + description: ServiceStatus defines the observed state of service + properties: + conditions: + description: + Conditions represent the latest available observations + of a service state + items: + description: + "Condition contains details for one aspect of the current + state of this API Resource.\n---\nThis struct is intended for + direct use as an array at the field path .status.conditions. For + example,\n\n\n\ttype FooStatus struct{\n\t // Represents the + observations of a foo's current state.\n\t // Known .status.conditions.type + are: \"Available\", \"Progressing\", and \"Degraded\"\n\t // + +patchMergeKey=type\n\t // +patchStrategy=merge\n\t // +listType=map\n\t + \ // +listMapKey=type\n\t Conditions []metav1.Condition `json:\"conditions,omitempty\" + patchStrategy:\"merge\" patchMergeKey:\"type\" protobuf:\"bytes,1,rep,name=conditions\"`\n\n\n\t + \ // other fields\n\t}" + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: |- + type of condition in CamelCase or in foo.example.com/CamelCase. + --- + Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be + useful (see .node.status.conditions), the ability to deconflict is important. + The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + state: + description: Service state + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 4e22cb22..5c66e8c6 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -27,6 +27,7 @@ resources: - bases/aiven.io_clickhouseroles.yaml - bases/aiven.io_clickhousegrants.yaml - bases/aiven.io_serviceintegrationendpoints.yaml + - bases/aiven.io_flinks.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -51,6 +52,7 @@ patchesStrategicMerge: - patches/webhook_in_cassandras.yaml - patches/webhook_in_grafanas.yaml - patches/webhook_in_serviceintegrationendpoints.yaml + - patches/webhook_in_flinks.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. @@ -74,6 +76,7 @@ patchesStrategicMerge: - patches/cainjection_in_cassandras.yaml - patches/cainjection_in_grafanas.yaml - patches/cainjection_in_serviceintegrationendpoints.yaml + - patches/cainjection_in_flinks.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/crd/patches/cainjection_in_flinks.yaml b/config/crd/patches/cainjection_in_flinks.yaml new file mode 100644 index 00000000..e8d7c275 --- /dev/null +++ b/config/crd/patches/cainjection_in_flinks.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: flinks.aiven.io diff --git a/config/crd/patches/webhook_in_flinks.yaml b/config/crd/patches/webhook_in_flinks.yaml new file mode 100644 index 00000000..13d42cdf --- /dev/null +++ b/config/crd/patches/webhook_in_flinks.yaml @@ -0,0 +1,16 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: flinks.aiven.io +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert + conversionReviewVersions: + - v1 diff --git a/config/rbac/flink_editor_role.yaml b/config/rbac/flink_editor_role.yaml new file mode 100644 index 00000000..a960b16b --- /dev/null +++ b/config/rbac/flink_editor_role.yaml @@ -0,0 +1,31 @@ +# permissions for end users to edit flinks. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: flink-editor-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: aiven-operator + app.kubernetes.io/part-of: aiven-operator + app.kubernetes.io/managed-by: kustomize + name: flink-editor-role +rules: + - apiGroups: + - aiven.io + resources: + - flinks + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - aiven.io + resources: + - flinks/status + verbs: + - get diff --git a/config/rbac/flink_viewer_role.yaml b/config/rbac/flink_viewer_role.yaml new file mode 100644 index 00000000..7860db94 --- /dev/null +++ b/config/rbac/flink_viewer_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to view flinks. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: flink-viewer-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: aiven-operator + app.kubernetes.io/part-of: aiven-operator + app.kubernetes.io/managed-by: kustomize + name: flink-viewer-role +rules: + - apiGroups: + - aiven.io + resources: + - flinks + verbs: + - get + - list + - watch + - apiGroups: + - aiven.io + resources: + - flinks/status + verbs: + - get diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index beccaf6a..a187ebf6 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -247,6 +247,34 @@ rules: - get - patch - update + - apiGroups: + - aiven.io + resources: + - flinks + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - aiven.io + resources: + - flinks/finalizers + verbs: + - create + - get + - update + - apiGroups: + - aiven.io + resources: + - flinks/status + verbs: + - get + - patch + - update - apiGroups: - aiven.io resources: diff --git a/config/samples/_v1alpha1_flink.yaml b/config/samples/_v1alpha1_flink.yaml new file mode 100644 index 00000000..e7b44a94 --- /dev/null +++ b/config/samples/_v1alpha1_flink.yaml @@ -0,0 +1,12 @@ +apiVersion: aiven.io/v1alpha1 +kind: Flink +metadata: + labels: + app.kubernetes.io/name: flink + app.kubernetes.io/instance: flink-sample + app.kubernetes.io/part-of: aiven-operator + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: aiven-operator + name: flink-sample +spec: + # TODO(user): Add fields here diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index 8dc58df7..5d4b02b8 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -24,4 +24,5 @@ resources: - _v1alpha1_kafkaschemaregistryacl.yaml - _v1alpha1_clickhouserole.yaml - _v1alpha1_serviceintegrationendpoint.yaml + - _v1alpha1_flink.yaml #+kubebuilder:scaffold:manifestskustomizesamples diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index f5f6dc39..90621fa8 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -84,6 +84,26 @@ webhooks: resources: - databases sideEffects: None + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-aiven-io-v1alpha1-flink + failurePolicy: Fail + name: mflink.kb.io + rules: + - apiGroups: + - aiven.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - flinks + sideEffects: None - admissionReviewVersions: - v1 clientConfig: @@ -474,6 +494,27 @@ webhooks: resources: - databases sideEffects: None + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-aiven-io-v1alpha1-flink + failurePolicy: Fail + name: vflink.kb.io + rules: + - apiGroups: + - aiven.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + - DELETE + resources: + - flinks + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/controllers/flink_controller.go b/controllers/flink_controller.go new file mode 100644 index 00000000..7af49dd0 --- /dev/null +++ b/controllers/flink_controller.go @@ -0,0 +1,97 @@ +// Copyright (c) 2024 Aiven, Helsinki, Finland. https://aiven.io/ + +package controllers + +import ( + "context" + "fmt" + "strings" + + "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aiven/aiven-operator/api/v1alpha1" +) + +// FlinkReconciler reconciles a Flink object +type FlinkReconciler struct { + Controller +} + +func newFlinkReconciler(c Controller) reconcilerType { + return &FlinkReconciler{Controller: c} +} + +//+kubebuilder:rbac:groups=aiven.io,resources=flinks,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=aiven.io,resources=flinks/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=aiven.io,resources=flinks/finalizers,verbs=get;create;update + +func (r *FlinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + return r.reconcileInstance(ctx, req, newGenericServiceHandler(newFlinkAdapter), &v1alpha1.Flink{}) +} + +// SetupWithManager sets up the controller with the Manager. +func (r *FlinkReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.Flink{}). + Owns(&corev1.Secret{}). + Complete(r) +} + +func newFlinkAdapter(_ *aiven.Client, object client.Object) (serviceAdapter, error) { + flink, ok := object.(*v1alpha1.Flink) + if !ok { + return nil, fmt.Errorf("object is not of type v1alpha1.Flink") + } + return &flinkAdapter{flink}, nil +} + +// flinkAdapter handles an Aiven Flink service +type flinkAdapter struct { + *v1alpha1.Flink +} + +func (a *flinkAdapter) getObjectMeta() *metav1.ObjectMeta { + return &a.ObjectMeta +} + +func (a *flinkAdapter) getServiceStatus() *v1alpha1.ServiceStatus { + return &a.Status +} + +func (a *flinkAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { + return &a.Spec.ServiceCommonSpec +} + +func (a *flinkAdapter) getUserConfig() any { + return a.Spec.UserConfig +} + +func (a *flinkAdapter) newSecret(ctx context.Context, s *service.ServiceGetOut) (*corev1.Secret, error) { + stringData := map[string]string{ + "HOST": s.ServiceUriParams["host"], + "USER": s.ServiceUriParams["user"], + "PASSWORD": s.ServiceUriParams["password"], + "URI": s.ServiceUri, + "HOSTS": strings.Join(s.ConnectionInfo.Flink, ","), + } + + return newSecret(a, stringData, true), nil +} + +func (a *flinkAdapter) getServiceType() string { + return "flink" +} + +func (a *flinkAdapter) getDiskSpace() string { + return a.Spec.DiskSpace +} + +func (a *flinkAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/setup.go b/controllers/setup.go index 1650214d..1c39b9c3 100644 --- a/controllers/setup.go +++ b/controllers/setup.go @@ -32,6 +32,7 @@ func SetupControllers(mgr ctrl.Manager, defaultToken, kubeVersion, operatorVersi "ClickhouseGrant": newClickhouseGrantReconciler, "ConnectionPool": newConnectionPoolReconciler, "Database": newDatabaseReconciler, + "Flink": newFlinkReconciler, "Grafana": newGrafanaReconciler, "Kafka": newKafkaReconciler, "KafkaACL": newKafkaACLReconciler, diff --git a/docs/docs/api-reference/flink.md b/docs/docs/api-reference/flink.md new file mode 100644 index 00000000..d34ad2c0 --- /dev/null +++ b/docs/docs/api-reference/flink.md @@ -0,0 +1,166 @@ +--- +title: "Flink" +--- + +## Flink {: #Flink } + +Flink is the Schema for the flinks API. + +!!! Info "Exposes secret keys" + + `FLINK_HOST`, `FLINK_PORT`, `FLINK_USER`, `FLINK_PASSWORD`, `FLINK_URI`, `FLINK_HOSTS`. + +**Required** + +- [`apiVersion`](#apiVersion-property){: name='apiVersion-property'} (string). Value `aiven.io/v1alpha1`. +- [`kind`](#kind-property){: name='kind-property'} (string). Value `Flink`. +- [`metadata`](#metadata-property){: name='metadata-property'} (object). Data that identifies the object, including a `name` string and optional `namespace`. +- [`spec`](#spec-property){: name='spec-property'} (object). FlinkSpec defines the desired state of Flink. See below for [nested schema](#spec). + +## spec {: #spec } + +_Appears on [`Flink`](#Flink)._ + +FlinkSpec defines the desired state of Flink. + +**Required** + +- [`plan`](#spec.plan-property){: name='spec.plan-property'} (string, MaxLength: 128). Subscription plan. +- [`project`](#spec.project-property){: name='spec.project-property'} (string, Immutable, Pattern: `^[a-zA-Z0-9_-]+$`, MaxLength: 63). Identifies the project this resource belongs to. + +**Optional** + +- [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef). +- [`cloudName`](#spec.cloudName-property){: name='spec.cloudName-property'} (string, MaxLength: 256). Cloud the service runs in. +- [`connInfoSecretTarget`](#spec.connInfoSecretTarget-property){: name='spec.connInfoSecretTarget-property'} (object). Secret configuration. See below for [nested schema](#spec.connInfoSecretTarget). +- [`connInfoSecretTargetDisabled`](#spec.connInfoSecretTargetDisabled-property){: name='spec.connInfoSecretTargetDisabled-property'} (boolean, Immutable). When true, the secret containing connection information will not be created, defaults to false. This field cannot be changed after resource creation. +- [`disk_space`](#spec.disk_space-property){: name='spec.disk_space-property'} (string, Pattern: `(?i)^[1-9][0-9]*(GiB|G)?$`). The disk space of the service, possible values depend on the service type, the cloud provider and the project. +Reducing will result in the service re-balancing. +The removal of this field does not change the value. +- [`maintenanceWindowDow`](#spec.maintenanceWindowDow-property){: name='spec.maintenanceWindowDow-property'} (string, Enum: `monday`, `tuesday`, `wednesday`, `thursday`, `friday`, `saturday`, `sunday`). Day of week when maintenance operations should be performed. One monday, tuesday, wednesday, etc. +- [`maintenanceWindowTime`](#spec.maintenanceWindowTime-property){: name='spec.maintenanceWindowTime-property'} (string, MaxLength: 8). Time of day when maintenance operations should be performed. UTC time in HH:mm:ss format. +- [`projectVPCRef`](#spec.projectVPCRef-property){: name='spec.projectVPCRef-property'} (object). ProjectVPCRef reference to ProjectVPC resource to use its ID as ProjectVPCID automatically. See below for [nested schema](#spec.projectVPCRef). +- [`projectVpcId`](#spec.projectVpcId-property){: name='spec.projectVpcId-property'} (string, MaxLength: 36). Identifier of the VPC the service should be in, if any. +- [`serviceIntegrations`](#spec.serviceIntegrations-property){: name='spec.serviceIntegrations-property'} (array of objects, Immutable, MaxItems: 1). Service integrations to specify when creating a service. Not applied after initial service creation. See below for [nested schema](#spec.serviceIntegrations). +- [`tags`](#spec.tags-property){: name='spec.tags-property'} (object, AdditionalProperties: string). Tags are key-value pairs that allow you to categorize services. +- [`technicalEmails`](#spec.technicalEmails-property){: name='spec.technicalEmails-property'} (array of objects, MaxItems: 10). Defines the email addresses that will receive alerts about upcoming maintenance updates or warnings about service instability. See below for [nested schema](#spec.technicalEmails). +- [`terminationProtection`](#spec.terminationProtection-property){: name='spec.terminationProtection-property'} (boolean). Prevent service from being deleted. It is recommended to have this enabled for all services. +- [`userConfig`](#spec.userConfig-property){: name='spec.userConfig-property'} (object). Cassandra specific user configuration options. See below for [nested schema](#spec.userConfig). + +## authSecretRef {: #spec.authSecretRef } + +_Appears on [`spec`](#spec)._ + +Authentication reference to Aiven token in a secret. + +**Required** + +- [`key`](#spec.authSecretRef.key-property){: name='spec.authSecretRef.key-property'} (string, MinLength: 1). +- [`name`](#spec.authSecretRef.name-property){: name='spec.authSecretRef.name-property'} (string, MinLength: 1). + +## connInfoSecretTarget {: #spec.connInfoSecretTarget } + +_Appears on [`spec`](#spec)._ + +Secret configuration. + +**Required** + +- [`name`](#spec.connInfoSecretTarget.name-property){: name='spec.connInfoSecretTarget.name-property'} (string, Immutable). Name of the secret resource to be created. By default, it is equal to the resource name. + +**Optional** + +- [`annotations`](#spec.connInfoSecretTarget.annotations-property){: name='spec.connInfoSecretTarget.annotations-property'} (object, AdditionalProperties: string). Annotations added to the secret. +- [`labels`](#spec.connInfoSecretTarget.labels-property){: name='spec.connInfoSecretTarget.labels-property'} (object, AdditionalProperties: string). Labels added to the secret. +- [`prefix`](#spec.connInfoSecretTarget.prefix-property){: name='spec.connInfoSecretTarget.prefix-property'} (string). Prefix for the secret's keys. +Added "as is" without any transformations. +By default, is equal to the kind name in uppercase + underscore, e.g. `KAFKA_`, `REDIS_`, etc. + +## projectVPCRef {: #spec.projectVPCRef } + +_Appears on [`spec`](#spec)._ + +ProjectVPCRef reference to ProjectVPC resource to use its ID as ProjectVPCID automatically. + +**Required** + +- [`name`](#spec.projectVPCRef.name-property){: name='spec.projectVPCRef.name-property'} (string, MinLength: 1). + +**Optional** + +- [`namespace`](#spec.projectVPCRef.namespace-property){: name='spec.projectVPCRef.namespace-property'} (string, MinLength: 1). + +## serviceIntegrations {: #spec.serviceIntegrations } + +_Appears on [`spec`](#spec)._ + +Service integrations to specify when creating a service. Not applied after initial service creation. + +**Required** + +- [`integrationType`](#spec.serviceIntegrations.integrationType-property){: name='spec.serviceIntegrations.integrationType-property'} (string, Enum: `read_replica`). +- [`sourceServiceName`](#spec.serviceIntegrations.sourceServiceName-property){: name='spec.serviceIntegrations.sourceServiceName-property'} (string, MinLength: 1, MaxLength: 64). + +## technicalEmails {: #spec.technicalEmails } + +_Appears on [`spec`](#spec)._ + +Defines the email addresses that will receive alerts about upcoming maintenance updates or warnings about service instability. + +**Required** + +- [`email`](#spec.technicalEmails.email-property){: name='spec.technicalEmails.email-property'} (string). Email address. + +## userConfig {: #spec.userConfig } + +_Appears on [`spec`](#spec)._ + +Cassandra specific user configuration options. + +**Optional** + +- [`additional_backup_regions`](#spec.userConfig.additional_backup_regions-property){: name='spec.userConfig.additional_backup_regions-property'} (array of strings, MaxItems: 1). Deprecated. Additional Cloud Regions for Backup Replication. +- [`flink_version`](#spec.userConfig.flink_version-property){: name='spec.userConfig.flink_version-property'} (string, Enum: `1.19`, Immutable). Flink major version. +- [`ip_filter`](#spec.userConfig.ip_filter-property){: name='spec.userConfig.ip_filter-property'} (array of objects, MaxItems: 1024). Allow incoming connections from CIDR address block, e.g. `10.20.0.0/16`. See below for [nested schema](#spec.userConfig.ip_filter). +- [`number_of_task_slots`](#spec.userConfig.number_of_task_slots-property){: name='spec.userConfig.number_of_task_slots-property'} (integer, Minimum: 1, Maximum: 1024). Task slots per node. For a 3 node plan, total number of task slots is 3x this value. +- [`pekko_ask_timeout_s`](#spec.userConfig.pekko_ask_timeout_s-property){: name='spec.userConfig.pekko_ask_timeout_s-property'} (integer, Minimum: 5, Maximum: 60). Timeout in seconds used for all futures and blocking Pekko requests. +- [`pekko_framesize_b`](#spec.userConfig.pekko_framesize_b-property){: name='spec.userConfig.pekko_framesize_b-property'} (integer, Minimum: 1048576, Maximum: 52428800). Maximum size in bytes for messages exchanged between the JobManager and the TaskManagers. +- [`privatelink_access`](#spec.userConfig.privatelink_access-property){: name='spec.userConfig.privatelink_access-property'} (object). Allow access to selected service components through Privatelink. See below for [nested schema](#spec.userConfig.privatelink_access). +- [`public_access`](#spec.userConfig.public_access-property){: name='spec.userConfig.public_access-property'} (object). Allow access to selected service ports from the public Internet. See below for [nested schema](#spec.userConfig.public_access). +- [`service_log`](#spec.userConfig.service_log-property){: name='spec.userConfig.service_log-property'} (boolean). Store logs for the service so that they are available in the HTTP API and console. +- [`static_ips`](#spec.userConfig.static_ips-property){: name='spec.userConfig.static_ips-property'} (boolean). Use static public IP addresses. + +### ip_filter {: #spec.userConfig.ip_filter } + +_Appears on [`spec.userConfig`](#spec.userConfig)._ + +CIDR address block, either as a string, or in a dict with an optional description field. + +**Required** + +- [`network`](#spec.userConfig.ip_filter.network-property){: name='spec.userConfig.ip_filter.network-property'} (string, MaxLength: 43). CIDR address block. + +**Optional** + +- [`description`](#spec.userConfig.ip_filter.description-property){: name='spec.userConfig.ip_filter.description-property'} (string, MaxLength: 1024). Description for IP filter list entry. + +### privatelink_access {: #spec.userConfig.privatelink_access } + +_Appears on [`spec.userConfig`](#spec.userConfig)._ + +Allow access to selected service components through Privatelink. + +**Optional** + +- [`flink`](#spec.userConfig.privatelink_access.flink-property){: name='spec.userConfig.privatelink_access.flink-property'} (boolean). Enable flink. +- [`prometheus`](#spec.userConfig.privatelink_access.prometheus-property){: name='spec.userConfig.privatelink_access.prometheus-property'} (boolean). Enable prometheus. + +### public_access {: #spec.userConfig.public_access } + +_Appears on [`spec.userConfig`](#spec.userConfig)._ + +Allow access to selected service ports from the public Internet. + +**Required** + +- [`flink`](#spec.userConfig.public_access.flink-property){: name='spec.userConfig.public_access.flink-property'} (boolean). Allow clients to connect to flink from the public internet for service nodes that are in a project VPC or another type of private network. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index b0f1f1f2..b84a3868 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -96,6 +96,7 @@ nav: - api-reference/clickhousegrant.md - api-reference/connectionpool.md - api-reference/database.md + - api-reference/flink.md - api-reference/grafana.md - api-reference/kafka.md - api-reference/kafkaacl.md diff --git a/main.go b/main.go index 27252e06..da706762 100644 --- a/main.go +++ b/main.go @@ -24,7 +24,7 @@ import ( //+kubebuilder:scaffold:imports ) -//go:generate go run ./generators/userconfigs/... --services mysql,cassandra,grafana,pg,kafka,redis,clickhouse,opensearch,kafka_connect +//go:generate go run ./generators/userconfigs/... --services mysql,cassandra,flink,grafana,pg,kafka,redis,clickhouse,opensearch,kafka_connect //go:generate go run ./generators/userconfigs/... --integrations autoscaler,clickhouse_kafka,clickhouse_postgresql,datadog,kafka_connect,kafka_logs,kafka_mirrormaker,logs,metrics,external_aws_cloudwatch_metrics //go:generate go run ./generators/userconfigs/... --integration-endpoints autoscaler,datadog,external_aws_cloudwatch_logs,external_aws_cloudwatch_metrics,external_elasticsearch_logs,external_google_cloud_bigquery,external_google_cloud_logging,external_kafka,external_opensearch_logs,external_postgresql,external_schema_registry,jolokia,prometheus,rsyslog diff --git a/tests/flink_test.go b/tests/flink_test.go new file mode 100644 index 00000000..8a9638e9 --- /dev/null +++ b/tests/flink_test.go @@ -0,0 +1,106 @@ +package tests + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/aiven/aiven-operator/api/v1alpha1" + flinkuserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/flink" +) + +func getFlinkYaml(project, name, cloudName string) string { + return fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: Flink +metadata: + name: %[2]s +spec: + authSecretRef: + name: aiven-token + key: token + + project: %[1]s + cloudName: %[3]s + plan: business-4 + + tags: + env: test + instance: foo + + userConfig: + number_of_task_slots: 10 + ip_filter: + - network: 0.0.0.0/32 + description: bar + - network: 10.20.0.0/16 + +`, project, name, cloudName) +} + +func TestFlink(t *testing.T) { + t.Parallel() + defer recoverPanic(t) + + // GIVEN + ctx, cancel := testCtx() + defer cancel() + + name := randName("flink") + yml := getFlinkYaml(cfg.Project, name, cfg.PrimaryCloudName) + s := NewSession(ctx, k8sClient, cfg.Project) + + // Cleans test afterward + defer s.Destroy(t) + + // WHEN + // Applies given manifest + require.NoError(t, s.Apply(yml)) + + // Waits kube objects + flink := new(v1alpha1.Flink) + require.NoError(t, s.GetRunning(flink, name)) + + // THEN + flinkAvn, err := avnGen.ServiceGet(ctx, cfg.Project, name) + require.NoError(t, err) + assert.Equal(t, flinkAvn.ServiceName, flink.GetName()) + assert.Equal(t, serviceRunningState, flink.Status.State) + assert.Contains(t, serviceRunningStatesAiven, flinkAvn.State) + assert.Equal(t, flinkAvn.Plan, flink.Spec.Plan) + assert.Equal(t, flinkAvn.CloudName, flink.Spec.CloudName) + assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, flink.Spec.Tags) + flinkResp, err := avnClient.ServiceTags.Get(ctx, cfg.Project, name) + require.NoError(t, err) + assert.Equal(t, flinkResp.Tags, flink.Spec.Tags) + + // UserConfig test + assert.Equal(t, anyPointer(10), flink.Spec.UserConfig.NumberOfTaskSlots) + + // Validates ip filters + require.Len(t, flink.Spec.UserConfig.IpFilter, 2) + + // First entry + assert.Equal(t, "0.0.0.0/32", flink.Spec.UserConfig.IpFilter[0].Network) + assert.Equal(t, "bar", *flink.Spec.UserConfig.IpFilter[0].Description) + + // Second entry + assert.Equal(t, "10.20.0.0/16", flink.Spec.UserConfig.IpFilter[1].Network) + assert.Nil(t, flink.Spec.UserConfig.IpFilter[1].Description) + + // Compares with Aiven ip_filter + var ipFilterAvn []*flinkuserconfig.IpFilter + require.NoError(t, castInterface(flinkAvn.UserConfig["ip_filter"], &ipFilterAvn)) + assert.Equal(t, ipFilterAvn, flink.Spec.UserConfig.IpFilter) + + // Secrets test + secret, err := s.GetSecret(flink.GetName()) + require.NoError(t, err) + assert.NotEmpty(t, secret.Data["FLINK_HOST"]) + assert.NotEmpty(t, secret.Data["FLINK_USER"]) + assert.NotEmpty(t, secret.Data["FLINK_PASSWORD"]) + assert.NotEmpty(t, secret.Data["FLINK_URI"]) + assert.NotEmpty(t, secret.Data["FLINK_HOSTS"]) +} From 8c256e320ba3d93c10a1fc02a4c6c0d79d571393 Mon Sep 17 00:00:00 2001 From: Timo Riski Date: Thu, 21 Nov 2024 10:07:21 +0200 Subject: [PATCH 2/2] feat: add flink example and use it in tests --- docs/docs/api-reference/examples/flink.yaml | 29 ++++++++ docs/docs/api-reference/flink.md | 81 +++++++++++++++++++++ tests/flink_test.go | 46 +++--------- 3 files changed, 119 insertions(+), 37 deletions(-) create mode 100644 docs/docs/api-reference/examples/flink.yaml diff --git a/docs/docs/api-reference/examples/flink.yaml b/docs/docs/api-reference/examples/flink.yaml new file mode 100644 index 00000000..6e0e4550 --- /dev/null +++ b/docs/docs/api-reference/examples/flink.yaml @@ -0,0 +1,29 @@ +apiVersion: aiven.io/v1alpha1 +kind: Flink +metadata: + name: my-flink +spec: + authSecretRef: + name: aiven-token + key: token + + connInfoSecretTarget: + name: flink-secret + annotations: + foo: bar + labels: + baz: egg + + project: my-aiven-project + cloudName: google-europe-west1 + plan: business-4 + + maintenanceWindowDow: sunday + maintenanceWindowTime: 11:00:00 + + userConfig: + number_of_task_slots: 10 + ip_filter: + - network: 0.0.0.0/32 + description: whatever + - network: 10.20.0.0/16 diff --git a/docs/docs/api-reference/flink.md b/docs/docs/api-reference/flink.md index d34ad2c0..6629ed4f 100644 --- a/docs/docs/api-reference/flink.md +++ b/docs/docs/api-reference/flink.md @@ -2,6 +2,87 @@ title: "Flink" --- +## Usage example + +??? example + ```yaml + apiVersion: aiven.io/v1alpha1 + kind: Flink + metadata: + name: my-flink + spec: + authSecretRef: + name: aiven-token + key: token + + connInfoSecretTarget: + name: flink-secret + prefix: MY_SECRET_PREFIX_ + annotations: + foo: bar + labels: + baz: egg + + project: my-aiven-project + cloudName: google-europe-west1 + plan: business-4 + + maintenanceWindowDow: sunday + maintenanceWindowTime: 11:00:00 + + userConfig: + number_of_task_slots: 10 + ip_filter: + - network: 0.0.0.0 + description: whatever + - network: 10.20.0.0/16 + ``` + +!!! info + To create this resource, a `Secret` containing Aiven token must be [created](/aiven-operator/authentication.html) first. + +Apply the resource with: + +```shell +kubectl apply -f example.yaml +``` + +Verify the newly created `Flink`: + +```shell +kubectl get flinks my-flink +``` + +The output is similar to the following: +```shell +Name Project Region Plan State +my-flink my-aiven-project google-europe-west1 business-4 RUNNING +``` + +To view the details of the `Secret`, use the following command: +```shell +kubectl describe secret flink-secret +``` + +You can use the [jq](https://github.com/jqlang/jq) to quickly decode the `Secret`: + +```shell +kubectl get secret flink-secret -o json | jq '.data | map_values(@base64d)' +``` + +The output is similar to the following: + +```{ .json .no-copy } +{ + "FLINK_HOST": "", + "FLINK_PORT": "", + "FLINK_USER": "", + "FLINK_PASSWORD": "", + "FLINK_URI": "", + "FLINK_HOSTS": "", +} +``` + ## Flink {: #Flink } Flink is the Schema for the flinks API. diff --git a/tests/flink_test.go b/tests/flink_test.go index 8a9638e9..826e9992 100644 --- a/tests/flink_test.go +++ b/tests/flink_test.go @@ -1,7 +1,6 @@ package tests import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -11,35 +10,6 @@ import ( flinkuserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/flink" ) -func getFlinkYaml(project, name, cloudName string) string { - return fmt.Sprintf(` -apiVersion: aiven.io/v1alpha1 -kind: Flink -metadata: - name: %[2]s -spec: - authSecretRef: - name: aiven-token - key: token - - project: %[1]s - cloudName: %[3]s - plan: business-4 - - tags: - env: test - instance: foo - - userConfig: - number_of_task_slots: 10 - ip_filter: - - network: 0.0.0.0/32 - description: bar - - network: 10.20.0.0/16 - -`, project, name, cloudName) -} - func TestFlink(t *testing.T) { t.Parallel() defer recoverPanic(t) @@ -49,7 +19,13 @@ func TestFlink(t *testing.T) { defer cancel() name := randName("flink") - yml := getFlinkYaml(cfg.Project, name, cfg.PrimaryCloudName) + yml, err := loadExampleYaml("flink.yaml", map[string]string{ + "google-europe-west1": cfg.PrimaryCloudName, + "my-aiven-project": cfg.Project, + "my-flink": name, + }) + require.NoError(t, err) + s := NewSession(ctx, k8sClient, cfg.Project) // Cleans test afterward @@ -71,10 +47,6 @@ func TestFlink(t *testing.T) { assert.Contains(t, serviceRunningStatesAiven, flinkAvn.State) assert.Equal(t, flinkAvn.Plan, flink.Spec.Plan) assert.Equal(t, flinkAvn.CloudName, flink.Spec.CloudName) - assert.Equal(t, map[string]string{"env": "test", "instance": "foo"}, flink.Spec.Tags) - flinkResp, err := avnClient.ServiceTags.Get(ctx, cfg.Project, name) - require.NoError(t, err) - assert.Equal(t, flinkResp.Tags, flink.Spec.Tags) // UserConfig test assert.Equal(t, anyPointer(10), flink.Spec.UserConfig.NumberOfTaskSlots) @@ -84,7 +56,7 @@ func TestFlink(t *testing.T) { // First entry assert.Equal(t, "0.0.0.0/32", flink.Spec.UserConfig.IpFilter[0].Network) - assert.Equal(t, "bar", *flink.Spec.UserConfig.IpFilter[0].Description) + assert.Equal(t, "whatever", *flink.Spec.UserConfig.IpFilter[0].Description) // Second entry assert.Equal(t, "10.20.0.0/16", flink.Spec.UserConfig.IpFilter[1].Network) @@ -96,7 +68,7 @@ func TestFlink(t *testing.T) { assert.Equal(t, ipFilterAvn, flink.Spec.UserConfig.IpFilter) // Secrets test - secret, err := s.GetSecret(flink.GetName()) + secret, err := s.GetSecret(flink.Spec.ConnInfoSecretTarget.Name) require.NoError(t, err) assert.NotEmpty(t, secret.Data["FLINK_HOST"]) assert.NotEmpty(t, secret.Data["FLINK_USER"])