From 4bbb24022905a6b5b46c95c93f2fd2e1272d8aac Mon Sep 17 00:00:00 2001 From: Alexandre Guitton Date: Mon, 29 Mar 2021 16:00:24 +0200 Subject: [PATCH] [Feature/Operator] Enable prometheus scrapping endpoint (#88) * manage configmap and secret for override * add reference and update samples * fix changelog * enable prometheus reporting task * append changelog * update doc and generated code * clean --- CHANGELOG.md | 7 + api/v1alpha1/common_types.go | 8 +- api/v1alpha1/nificluster_types.go | 34 +++- api/v1alpha1/zz_generated.deepcopy.go | 16 ++ .../bases/nifi.orange.com_nificlusters.yaml | 126 ++++++++++++ config/samples/nifi_v1alpha1_nificluster.yaml | 3 + config/samples/simplenificluster.yaml | 3 + .../reportingtask/reportingtask.go | 179 ++++++++++++++++++ pkg/errorfactory/errorfactory.go | 6 + pkg/nificlient/client.go | 7 + pkg/nificlient/reportingtask.go | 108 +++++++++++ pkg/resources/nifi/nifi.go | 43 +++++ pkg/resources/nifi/pod.go | 15 +- .../1_nifi_cluster/6_listeners_config.md | 5 +- 14 files changed, 544 insertions(+), 16 deletions(-) create mode 100644 pkg/clientwrappers/reportingtask/reportingtask.go create mode 100644 pkg/nificlient/reportingtask.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 683efbe75..0ecbc5f9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,15 +3,22 @@ ### Added - [PR #86](https://github.com/Orange-OpenSource/nifikop/pull/86) - **[Operator/Debugging]** Add events and improve HTTP calls error message +- [PR #87](https://github.com/Orange-OpenSource/nifikop/pull/87) - **[Operator/Configuration]** Allow to override the `.properties` files using a config map and/or a secret. +- [PR #87](https://github.com/Orange-OpenSource/nifikop/pull/87) - **[Operator/Configuration]** Allow to replace the `logback.xml` and `bootstrap_notification_service.xml` files using a config map or a secret. +- [PR #88](https://github.com/Orange-OpenSource/nifikop/pull/88) - **[Operator/Monitoring]** By choosing `prometheus` as type for an internal service in a NiFiCluster resource, the operator automatically creates the associated `reporting task`. ### Changed - [PR #85](https://github.com/Orange-OpenSource/nifikop/pull/85) - **[Operator/Dependencies]** Upgrade cert-manager & operator sdk dependencies +- [PR #87](https://github.com/Orange-OpenSource/nifikop/pull/87) - **[Operator/Configuration]** The node configuration files are no more stored in a configmap, but in a secret. + ### Deprecated - [PR #85](https://github.com/Orange-OpenSource/nifikop/pull/85) - **[Operator/Finalizers]** The finalizer name format suggested by [Kubernetes docs](https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#finalizers) is /, while the format previously documented by Operator SDK docs was .. If your operator uses any finalizers with names matching the incorrect format, change them to match the official format. For example, finalizer.nifiusergroups.nifi.orange.com should be changed to nifiusergroups.nifi.orange.com/finalizer. +- [PR #87](https://github.com/Orange-OpenSource/nifikop/pull/87) - **[Operator/Configuration]** Since the v0.5.0 the operator doesn't catch certain resource changes, due to bad copy of resource, this issue avoid rolling upgrage trigger when some configuration changes. + ### Removed diff --git a/api/v1alpha1/common_types.go b/api/v1alpha1/common_types.go index cac9ec947..219c75ff2 100644 --- a/api/v1alpha1/common_types.go +++ b/api/v1alpha1/common_types.go @@ -91,21 +91,21 @@ type UserState string // ConfigmapReference states a reference to a data into a configmap type ConfigmapReference struct { // Name of the configmap that we want to refer. - Name string `json:"name"` + Name string `json:"name"` // Namespace where is located the secret that we want to refer. Namespace string `json:"namespace,omitempty"` // The key of the value,in data content, that we want use. - Data string `json:"data"` + Data string `json:"data"` } // SecretConfigReference states a reference to a data into a secret type SecretConfigReference struct { // Name of the configmap that we want to refer. - Name string `json:"name"` + Name string `json:"name"` // Namespace where is located the secret that we want to refer. Namespace string `json:"namespace,omitempty"` // The key of the value,in data content, that we want use. - Data string `json:"data"` + Data string `json:"data"` } // ClusterReference states a reference to a cluster for dataflow/registryclient/user diff --git a/api/v1alpha1/nificluster_types.go b/api/v1alpha1/nificluster_types.go index 7c5188326..724e1b9bb 100644 --- a/api/v1alpha1/nificluster_types.go +++ b/api/v1alpha1/nificluster_types.go @@ -25,11 +25,11 @@ import ( ) const ( - ClusterListenerType = "cluster" - HttpListenerType = "http" - HttpsListenerType = "https" - S2sListenerType = "s2s" - MetricsPort = 9020 + ClusterListenerType = "cluster" + HttpListenerType = "http" + HttpsListenerType = "https" + S2sListenerType = "s2s" + prometheusListenerType = "prometheus" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -320,7 +320,7 @@ type SSLSecrets struct { // InternalListenerConfig defines the internal listener config for Nifi type InternalListenerConfig struct { - // +kubebuilder:validation:Enum={"cluster", "http", "https", "s2s"} + // +kubebuilder:validation:Enum={"cluster", "http", "https", "s2s", "prometheus"} // (Optional field) Type allow to specify if we are in a specific nifi listener // it's allowing to define some required information such as Cluster Port, // Http Port, Https Port or S2S port @@ -443,6 +443,15 @@ type NifiClusterStatus struct { RollingUpgrade RollingUpgradeStatus `json:"rollingUpgradeStatus,omitempty"` // RootProcessGroupId contains the uuid of the root process group for this cluster RootProcessGroupId string `json:"rootProcessGroupId,omitempty"` + // PrometheusReportingTask contains the status of the prometheus reporting task managed by the operator + PrometheusReportingTask PrometheusReportingTaskStatus `json:"prometheusReportingTask,omitempty"` +} + +type PrometheusReportingTaskStatus struct { + // The nifi reporting task's id + Id string `json:"id"` + // The last nifi reporting task revision version catched + Version int64 `json:"version"` } // +kubebuilder:object:root=true @@ -620,3 +629,16 @@ func (nProperties NifiProperties) GetAuthorizer() string { } return "managed-authorizer" } + +// +func (nSpec *NifiClusterSpec) GetMetricPort() *int { + + for _, iListener := range nSpec.ListenersConfig.InternalListeners { + if iListener.Type == prometheusListenerType { + val := int(iListener.ContainerPort) + return &val + } + } + + return nil +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 085c1c7f4..86e76a816 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -451,6 +451,7 @@ func (in *NifiClusterStatus) DeepCopyInto(out *NifiClusterStatus) { } } out.RollingUpgrade = in.RollingUpgrade + out.PrometheusReportingTask = in.PrometheusReportingTask } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NifiClusterStatus. @@ -1224,6 +1225,21 @@ func (in *PortConfig) DeepCopy() *PortConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PrometheusReportingTaskStatus) DeepCopyInto(out *PrometheusReportingTaskStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrometheusReportingTaskStatus. +func (in *PrometheusReportingTaskStatus) DeepCopy() *PrometheusReportingTaskStatus { + if in == nil { + return nil + } + out := new(PrometheusReportingTaskStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReadOnlyConfig) DeepCopyInto(out *ReadOnlyConfig) { *out = *in diff --git a/config/crd/bases/nifi.orange.com_nificlusters.yaml b/config/crd/bases/nifi.orange.com_nificlusters.yaml index 8b84cd489..7f24d22fa 100644 --- a/config/crd/bases/nifi.orange.com_nificlusters.yaml +++ b/config/crd/bases/nifi.orange.com_nificlusters.yaml @@ -1306,6 +1306,7 @@ spec: - http - https - s2s + - prometheus type: string required: - containerPort @@ -2445,10 +2446,16 @@ spec: that will replace the one produced based on template properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2460,10 +2467,16 @@ spec: and overrideConfigMap properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2483,10 +2496,16 @@ spec: and configuration properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2503,10 +2522,16 @@ spec: configurations, overrideConfigMap and overrideConfigs. properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2522,10 +2547,16 @@ spec: the one produced based on template properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2536,10 +2567,16 @@ spec: the one produced based on template and overrideConfigMap properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2563,10 +2600,16 @@ spec: and configuration properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2583,10 +2626,16 @@ spec: configurations, overrideConfigMap and overrideConfigs. properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2611,10 +2660,16 @@ spec: and configuration properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2631,10 +2686,16 @@ spec: configurations, overrideConfigMap and overrideConfigs. properties: data: + description: The key of the value,in data content, + that we want use. type: string name: + description: Name of the configmap that we want + to refer. type: string namespace: + description: Namespace where is located the secret + that we want to refer. type: string required: - data @@ -2681,10 +2742,15 @@ spec: that will replace the one produced based on template properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2696,10 +2762,15 @@ spec: overrideConfigMap properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2719,10 +2790,15 @@ spec: configuration properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2739,10 +2815,15 @@ spec: overrideConfigMap and overrideConfigs. properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2758,10 +2839,15 @@ spec: one produced based on template properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2772,10 +2858,15 @@ spec: one produced based on template and overrideConfigMap properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2798,10 +2889,15 @@ spec: will override the one produced based on template and configuration properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2818,10 +2914,15 @@ spec: overrideConfigMap and overrideConfigs. properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2846,10 +2947,15 @@ spec: configuration properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -2866,10 +2972,15 @@ spec: overrideConfigMap and overrideConfigs. properties: data: + description: The key of the value,in data content, that + we want use. type: string name: + description: Name of the configmap that we want to refer. type: string namespace: + description: Namespace where is located the secret that + we want to refer. type: string required: - data @@ -4041,6 +4152,21 @@ spec: type: object description: Store the state of each nifi node type: object + prometheusReportingTask: + description: PrometheusReportingTask contains the status of the prometheus + reporting task managed by the operator + properties: + id: + description: The nifi reporting task's id + type: string + version: + description: The last nifi reporting task revision version catched + format: int64 + type: integer + required: + - id + - version + type: object rollingUpgradeStatus: description: RollingUpgradeStatus defines status of rolling upgrade properties: diff --git a/config/samples/nifi_v1alpha1_nificluster.yaml b/config/samples/nifi_v1alpha1_nificluster.yaml index 259d40797..1f9d22fac 100644 --- a/config/samples/nifi_v1alpha1_nificluster.yaml +++ b/config/samples/nifi_v1alpha1_nificluster.yaml @@ -311,6 +311,9 @@ spec: containerPort: 9020 - name: "rsyslog" containerPort: 10001 + - type: "prometheus" + name: "prometheus" + containerPort: 9090 # sslSecrets contains information about ssl related kubernetes secrets if one of the # listener setting type set to ssl these fields must be populated to # sslSecrets: diff --git a/config/samples/simplenificluster.yaml b/config/samples/simplenificluster.yaml index a5fd5d411..82e0b22b1 100644 --- a/config/samples/simplenificluster.yaml +++ b/config/samples/simplenificluster.yaml @@ -49,6 +49,9 @@ spec: - type: "s2s" name: "s2s" containerPort: 10000 + - type: "prometheus" + name: "prometheus" + containerPort: 9090 externalServices: - name: "clusterip" spec: diff --git a/pkg/clientwrappers/reportingtask/reportingtask.go b/pkg/clientwrappers/reportingtask/reportingtask.go new file mode 100644 index 000000000..73a01f5d9 --- /dev/null +++ b/pkg/clientwrappers/reportingtask/reportingtask.go @@ -0,0 +1,179 @@ +package reportingtask + +import ( + "github.com/Orange-OpenSource/nifikop/api/v1alpha1" + "github.com/Orange-OpenSource/nifikop/pkg/clientwrappers" + "github.com/Orange-OpenSource/nifikop/pkg/common" + "github.com/Orange-OpenSource/nifikop/pkg/errorfactory" + "github.com/Orange-OpenSource/nifikop/pkg/nificlient" + nigoapi "github.com/erdrix/nigoapi/pkg/nifi" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" +) + +var log = ctrl.Log.WithName("reportingtaks-method") + +const ( + reportingTaskName = "managed-prometheus" + reportingTaskType_ = "org.apache.nifi.reporting.prometheus.PrometheusReportingTask" + reportingTaskEnpointPortProperty = "prometheus-reporting-task-metrics-endpoint-port" + reportingTaskStrategyProperty = "prometheus-reporting-task-metrics-strategy" + reportingTaskStrategy = "All Components" + reportingTaskSendJVMProperty = "prometheus-reporting-task-metrics-send-jvm" + reportingTaskSendJVM = "true" +) + +func ExistReportingTaks(client client.Client, cluster *v1alpha1.NifiCluster) (bool, error) { + + if cluster.Status.PrometheusReportingTask.Id == "" { + return false, nil + } + + nClient, err := common.NewNodeConnection(log, client, cluster) + if err != nil { + return false, err + } + + entity, err := nClient.GetReportingTask(cluster.Status.PrometheusReportingTask.Id) + if err := clientwrappers.ErrorGetOperation(log, err, "Get reporting-task"); err != nil { + if err == nificlient.ErrNifiClusterReturned404 { + return false, nil + } + return false, err + } + + return entity != nil, nil +} + +func CreateReportingTask(client client.Client, cluster *v1alpha1.NifiCluster) (*v1alpha1.PrometheusReportingTaskStatus, error) { + nClient, err := common.NewNodeConnection(log, client, cluster) + if err != nil { + return nil, err + } + + scratchEntity := nigoapi.ReportingTaskEntity{} + updateReportingTaskEntity(cluster, &scratchEntity) + + entity, err := nClient.CreateReportingTask(scratchEntity) + if err := clientwrappers.ErrorCreateOperation(log, err, "Create reporting-task"); err != nil { + return nil, err + } + + return &v1alpha1.PrometheusReportingTaskStatus{ + Id: entity.Id, + Version: *entity.Revision.Version, + }, nil +} + +func SyncReportingTask(client client.Client, cluster *v1alpha1.NifiCluster) (*v1alpha1.PrometheusReportingTaskStatus, error) { + + nClient, err := common.NewNodeConnection(log, client, cluster) + if err != nil { + return nil, err + } + + entity, err := nClient.GetReportingTask(cluster.Status.PrometheusReportingTask.Id) + if err := clientwrappers.ErrorGetOperation(log, err, "Get registry-client"); err != nil { + return nil, err + } + + if !reportingTaksIsSync(cluster, entity) { + status := entity.Status + + if status.ValidationStatus == "VALIDATING" { + return nil, errorfactory.NifiReportingTasksValidating{} + } + + if status.RunStatus == "RUNNING" { + entity, err = nClient.UpdateRunStatusReportingTask(entity.Id, nigoapi.ReportingTaskRunStatusEntity{ + Revision: entity.Revision, + State: "STOPPED", + }) + if err := clientwrappers.ErrorUpdateOperation(log, err, "Update reporting-task status"); err != nil { + return nil, err + } + } + + updateReportingTaskEntity(cluster, entity) + entity, err = nClient.UpdateReportingTask(*entity) + if err := clientwrappers.ErrorUpdateOperation(log, err, "Update reporting-task"); err != nil { + return nil, err + } + + } + + if entity.Status.ValidationStatus == "INVALID" { + return nil, errorfactory.NifiReportingTasksInvalid{} + } + + if entity.Status.RunStatus == "STOPPED" || entity.Status.RunStatus == "DISABLED" { + entity, err = nClient.UpdateRunStatusReportingTask(entity.Id, nigoapi.ReportingTaskRunStatusEntity{ + Revision: entity.Revision, + State: "RUNNING", + }) + if err := clientwrappers.ErrorUpdateOperation(log, err, "Update reporting-task status"); err != nil { + return nil, err + } + } + + status := cluster.Status.PrometheusReportingTask + status.Version = *entity.Revision.Version + status.Id = entity.Id + + return &status, nil +} + +func RemoveReportingTaks(client client.Client, cluster *v1alpha1.NifiCluster) error { + nClient, err := common.NewNodeConnection(log, client, cluster) + if err != nil { + return err + } + + entity, err := nClient.GetReportingTask(cluster.Status.PrometheusReportingTask.Id) + if err := clientwrappers.ErrorGetOperation(log, err, "Get reporting-task"); err != nil { + if err == nificlient.ErrNifiClusterReturned404 { + return nil + } + return err + } + + updateReportingTaskEntity(cluster, entity) + err = nClient.RemoveReportingTask(*entity) + + return clientwrappers.ErrorRemoveOperation(log, err, "Remove registry-client") +} + +func reportingTaksIsSync(cluster *v1alpha1.NifiCluster, entity *nigoapi.ReportingTaskEntity) bool { + return reportingTaskName == entity.Component.Name && + strconv.Itoa(*cluster.Spec.GetMetricPort()) == entity.Component.Properties[reportingTaskEnpointPortProperty] && + reportingTaskStrategy == entity.Component.Properties[reportingTaskStrategyProperty] && + reportingTaskSendJVM == entity.Component.Properties[reportingTaskSendJVMProperty] +} + +func updateReportingTaskEntity(cluster *v1alpha1.NifiCluster, entity *nigoapi.ReportingTaskEntity) { + + var defaultVersion int64 = 0 + + if entity == nil { + entity = &nigoapi.ReportingTaskEntity{} + } + + if entity.Component == nil { + entity.Revision = &nigoapi.RevisionDto{ + Version: &defaultVersion, + } + } + + if entity.Component == nil { + entity.Component = &nigoapi.ReportingTaskDto{} + } + + entity.Component.Name = "managed-prometheus" + entity.Component.Type_ = "org.apache.nifi.reporting.prometheus.PrometheusReportingTask" + entity.Component.Properties = map[string]string{ + reportingTaskEnpointPortProperty: strconv.Itoa(*cluster.Spec.GetMetricPort()), + reportingTaskStrategyProperty: reportingTaskStrategy, + reportingTaskSendJVMProperty: reportingTaskSendJVM, + } +} diff --git a/pkg/errorfactory/errorfactory.go b/pkg/errorfactory/errorfactory.go index 34ff32c1f..ccb47c036 100644 --- a/pkg/errorfactory/errorfactory.go +++ b/pkg/errorfactory/errorfactory.go @@ -85,6 +85,12 @@ type NifiFlowSyncing struct{ error } // NifiFlowScheduling states that the flow is still scheduling type NifiFlowScheduling struct{ error } +// NifiReportingTasksValidating states that the reporting task is still validating +type NifiReportingTasksValidating struct{ error } + +// NifiReportingTasksInvalid states that the reporting task is invalid +type NifiReportingTasksInvalid struct{ error } + // New creates a new error factory error func New(t interface{}, err error, msg string, wrapArgs ...interface{}) error { wrapped := errors.WrapIfWithDetails(err, msg, wrapArgs...) diff --git a/pkg/nificlient/client.go b/pkg/nificlient/client.go index 49f90f30a..69c4a4797 100644 --- a/pkg/nificlient/client.go +++ b/pkg/nificlient/client.go @@ -109,6 +109,13 @@ type NifiClient interface { UpdateAccessPolicy(entity nigoapi.AccessPolicyEntity) (*nigoapi.AccessPolicyEntity, error) RemoveAccessPolicy(entity nigoapi.AccessPolicyEntity) error + // Reportingtask func + GetReportingTask(id string) (*nigoapi.ReportingTaskEntity, error) + CreateReportingTask(entity nigoapi.ReportingTaskEntity) (*nigoapi.ReportingTaskEntity, error) + UpdateReportingTask(entity nigoapi.ReportingTaskEntity) (*nigoapi.ReportingTaskEntity, error) + UpdateRunStatusReportingTask(id string, entity nigoapi.ReportingTaskRunStatusEntity) (*nigoapi.ReportingTaskEntity, error) + RemoveReportingTask(entity nigoapi.ReportingTaskEntity) error + Build() error } diff --git a/pkg/nificlient/reportingtask.go b/pkg/nificlient/reportingtask.go new file mode 100644 index 000000000..058fd3580 --- /dev/null +++ b/pkg/nificlient/reportingtask.go @@ -0,0 +1,108 @@ +// Copyright 2020 Orange SA +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package apis + +package nificlient + +import ( + "strconv" + + "github.com/antihax/optional" + nigoapi "github.com/erdrix/nigoapi/pkg/nifi" +) + +func (n *nifiClient) GetReportingTask(id string) (*nigoapi.ReportingTaskEntity, error) { + // Get nigoapi client, favoring the one associated to the coordinator node. + client := n.privilegeCoordinatorClient() + if client == nil { + log.Error(ErrNoNodeClientsAvailable, "Error during creating node client") + return nil, ErrNoNodeClientsAvailable + } + + // Request on Nifi Rest API to get the reporting task informations + out, rsp, body, err := client.ReportingTasksApi.GetReportingTask(nil, id) + + if err := errorGetOperation(rsp, body, err); err != nil { + return nil, err + } + + return &out, nil +} + +func (n *nifiClient) CreateReportingTask(entity nigoapi.ReportingTaskEntity) (*nigoapi.ReportingTaskEntity, error) { + // Get nigoapi client, favoring the one associated to the coordinator node. + client := n.privilegeCoordinatorClient() + if client == nil { + log.Error(ErrNoNodeClientsAvailable, "Error during creating node client") + return nil, ErrNoNodeClientsAvailable + } + + // Request on Nifi Rest API to create the reporting task + out, rsp, body, err := client.ControllerApi.CreateReportingTask(nil, entity) + if err := errorCreateOperation(rsp, body, err); err != nil { + return nil, err + } + + return &out, nil +} + +func (n *nifiClient) UpdateReportingTask(entity nigoapi.ReportingTaskEntity) (*nigoapi.ReportingTaskEntity, error) { + // Get nigoapi client, favoring the one associated to the coordinator node. + client := n.privilegeCoordinatorClient() + if client == nil { + log.Error(ErrNoNodeClientsAvailable, "Error during creating node client") + return nil, ErrNoNodeClientsAvailable + } + + // Request on Nifi Rest API to update the reporting task + out, rsp, body, err := client.ReportingTasksApi.UpdateReportingTask(nil, entity.Id, entity) + if err := errorUpdateOperation(rsp, body, err); err != nil { + return nil, err + } + + return &out, nil +} + +func (n *nifiClient) UpdateRunStatusReportingTask(id string, entity nigoapi.ReportingTaskRunStatusEntity) (*nigoapi.ReportingTaskEntity, error) { + // Get nigoapi client, favoring the one associated to the coordinator node. + client := n.privilegeCoordinatorClient() + if client == nil { + log.Error(ErrNoNodeClientsAvailable, "Error during creating node client") + return nil, ErrNoNodeClientsAvailable + } + + // Request on Nifi Rest API to update the reporting task + out, rsp, body, err := client.ReportingTasksApi.UpdateRunStatus(nil, id, entity) + if err := errorUpdateOperation(rsp, body, err); err != nil { + return nil, err + } + + return &out, nil +} + +func (n *nifiClient) RemoveReportingTask(entity nigoapi.ReportingTaskEntity) error { + // Get nigoapi client, favoring the one associated to the coordinator node. + client := n.privilegeCoordinatorClient() + if client == nil { + log.Error(ErrNoNodeClientsAvailable, "Error during creating node client") + return ErrNoNodeClientsAvailable + } + + // Request on Nifi Rest API to remove the reporting task + _, rsp, body, err := client.ReportingTasksApi.RemoveReportingTask(nil, entity.Id, + &nigoapi.ReportingTasksApiRemoveReportingTaskOpts{ + Version: optional.NewString(strconv.FormatInt(*entity.Revision.Version, 10)), + }) + + return errorDeleteOperation(rsp, body, err) +} diff --git a/pkg/resources/nifi/nifi.go b/pkg/resources/nifi/nifi.go index 9f210e51f..dcfa9bba3 100644 --- a/pkg/resources/nifi/nifi.go +++ b/pkg/resources/nifi/nifi.go @@ -17,6 +17,7 @@ package nifi import ( "context" "fmt" + "github.com/Orange-OpenSource/nifikop/pkg/clientwrappers/reportingtask" "reflect" "strings" @@ -235,6 +236,12 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { } } + if r.NifiCluster.Spec.GetMetricPort() != nil { + if err := r.reconcilePrometheusReportingTask(log); err != nil { + return errors.WrapIf(err, "failed to reconcile ressource") + } + } + log.V(1).Info("Reconciled") return nil @@ -804,3 +811,39 @@ func (r *Reconciler) reconcileNifiUsersAndGroups(log logr.Logger) error { return nil } + +func (r *Reconciler) reconcilePrometheusReportingTask(log logr.Logger) error { + + var err error + + // Check if the NiFi reporting task already exist + exist, err := reportingtask.ExistReportingTaks(r.Client, r.NifiCluster) + if err != nil { + return errors.WrapIfWithDetails(err, "failure checking for existing prometheus reporting task") + } + + if !exist { + // Create reporting task + status, err := reportingtask.CreateReportingTask(r.Client, r.NifiCluster) + if err != nil { + return errors.WrapIfWithDetails(err, "failure creating prometheus reporting task") + } + + r.NifiCluster.Status.PrometheusReportingTask = *status + if err := r.Client.Status().Update(context.TODO(), r.NifiCluster); err != nil { + return errors.WrapIfWithDetails(err, "failed to update NifiRegistryClient status") + } + } + + // Sync prometheus reporting task resource with NiFi side component + status, err := reportingtask.SyncReportingTask(r.Client, r.NifiCluster) + if err != nil { + return errors.WrapIfWithDetails(err, "failed to sync NifiRegistryClient") + } + + r.NifiCluster.Status.PrometheusReportingTask = *status + if err := r.Client.Status().Update(context.TODO(), r.NifiCluster); err != nil { + return errors.WrapIfWithDetails(err, "failed to update NifiRegistryClient status") + } + return nil +} diff --git a/pkg/resources/nifi/pod.go b/pkg/resources/nifi/pod.go index c03ee3526..321cd9018 100644 --- a/pkg/resources/nifi/pod.go +++ b/pkg/resources/nifi/pod.go @@ -106,6 +106,15 @@ func (r *Reconciler) pod(id int32, nodeConfig *v1alpha1.NodeConfig, pvcs []corev return initContainers[i].Name < initContainers[j].Name }) + anntotationsToMerge := []map[string]string{ + nodeConfig.GetNodeAnnotations(), + r.NifiCluster.Spec.Pod.Annotations, + } + + if r.NifiCluster.Spec.GetMetricPort() != nil { + anntotationsToMerge = append(anntotationsToMerge, util.MonitoringAnnotations(*r.NifiCluster.Spec.GetMetricPort())) + } + // curl -kv --cert /var/run/secrets/java.io/keystores/client/tls.crt --key /var/run/secrets/java.io/keystores/client/tls.key https://nifi.trycatchlearn.fr:8433/nifi // curl -kv --cert /var/run/secrets/java.io/keystores/client/tls.crt --key /var/run/secrets/java.io/keystores/client/tls.key https://securenc-headless.external-dns-test.gcp.trycatchlearn.fr:8443/nifi-api/controller/cluster // keytool -import -noprompt -keystore /home/nifi/truststore.jks -file /var/run/secrets/java.io/keystores/server/ca.crt -storepass $(cat /var/run/secrets/java.io/keystores/server/password) -alias test1 @@ -117,11 +126,7 @@ func (r *Reconciler) pod(id int32, nodeConfig *v1alpha1.NodeConfig, pvcs []corev LabelsForNifi(r.NifiCluster.Name), map[string]string{"nodeId": fmt.Sprintf("%d", id)}, ), - util.MergeAnnotations( - nodeConfig.GetNodeAnnotations(), - util.MonitoringAnnotations(v1alpha1.MetricsPort), - r.NifiCluster.Spec.Pod.Annotations, - ), r.NifiCluster, + util.MergeAnnotations(anntotationsToMerge...), r.NifiCluster, ), Spec: corev1.PodSpec{ SecurityContext: &corev1.PodSecurityContext{ diff --git a/site/docs/5_references/1_nifi_cluster/6_listeners_config.md b/site/docs/5_references/1_nifi_cluster/6_listeners_config.md index 21043c240..571d350b7 100644 --- a/site/docs/5_references/1_nifi_cluster/6_listeners_config.md +++ b/site/docs/5_references/1_nifi_cluster/6_listeners_config.md @@ -18,6 +18,9 @@ ListenersConfig defines the Nifi listener types : - type: "s2s" name: "s2s" containerPort: 10000 + - type: "prometheus" + name: "prometheus" + containerPort: 9090 sslSecrets: tlsSecretName: "test-nifikop" create: true @@ -36,7 +39,7 @@ ListenersConfig defines the Nifi listener types : Field|Type|Description|Required|Default| |-----|----|-----------|--------|--------| -|type|enum{ "cluster", "http", "https", "s2s"}| allow to specify if we are in a specific nifi listener it's allowing to define some required information such as Cluster Port, Http Port, Https Port or S2S port| Yes | - | +|type|enum{ "cluster", "http", "https", "s2s", "prometheus"}| allow to specify if we are in a specific nifi listener it's allowing to define some required information such as Cluster Port, Http Port, Https Port, S2S or Prometheus port| Yes | - | |name|string| an identifier for the port which will be configured. | Yes | - | |containerPort|int32| the containerPort. | Yes | - |