diff --git a/config/samples/nifi.orange.com_v1alpha1_nificluster_demo.yaml b/config/samples/nifi.orange.com_v1alpha1_nificluster_demo.yaml
new file mode 100644
index 0000000000..fbf7b73177
--- /dev/null
+++ b/config/samples/nifi.orange.com_v1alpha1_nificluster_demo.yaml
@@ -0,0 +1,216 @@
+apiVersion: nifi.orange.com/v1alpha1
+kind: NifiCluster
+metadata:
+  name: mynifi
+spec:
+  # headlessServiceEnabled specifies if the cluster should use a headless service for Nifi or individual services;
+  # using a service per node can come in handy in the case of a service mesh.
+  headlessServiceEnabled: true
+  # zkAddresse specifies the ZooKeeper connection string
+  # in the form hostname:port, where host and port are those of a ZooKeeper server.
+  zkAddresse: "zookeepercluster-client.zookeeper:2181"
+  # zkPath specifies the ZooKeeper chroot path, the part of the
+  # ZooKeeper connection string that puts the cluster's data under its own path in the global ZooKeeper namespace.
+  zkPath: "/myNiFi"
+  # clusterImage can specify the whole NiFi cluster image in one place
+  clusterImage: "apache/nifi:1.11.2"
+  # readOnlyConfig specifies the read-only type Nifi config cluster wide; all of these
+  # will be merged with node-specified readOnly configurations, so they can be overwritten per node.
+  readOnlyConfig:
+    # NifiProperties configuration that will be applied to the node.
+    nifiProperties:
+      # Additional nifi.properties configuration that will override the one produced based
+      # on template and configurations.
+      overrideConfigs: |
+        nifi.ui.banner.text=NiFiKop by Orange
+      # Site to Site properties Secure mode: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#site_to_site_properties
+      siteToSiteSecure: false
+      # Cluster nodes secure mode: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#cluster_common_properties
+      clusterSecure: false
+      # A comma-separated list of allowed HTTP Host header values to consider when NiFi
+      # is running securely and will be receiving requests to a different host[:port] than it is bound to.
+      # https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#web-properties
+#      webProxyHost:
+      # Nifi security client auth
+      needClientAuth: false
+      # Indicates which of the configured authorizers in the authorizers.xml file to use
+      # https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#authorizer-configuration
+#      authorizer:
+    # ZookeeperProperties configuration that will be applied to the node.
+    zookeeperProperties:
+      # Additional zookeeper.properties configuration that will override the one produced based
+      # on template and configurations.
+      overrideConfigs: |
+        initLimit=15
+        autopurge.purgeInterval=24
+        syncLimit=5
+        tickTime=2000
+        dataDir=./state/zookeeper
+        autopurge.snapRetainCount=30
+    # BootstrapProperties configuration that will be applied to the node.
+    bootstrapProperties:
+      # JVM memory settings
+      nifiJvmMemory: "512m"
+      # Additional bootstrap.properties configuration that will override the one produced based
+      # on template and configurations.
+      # https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#bootstrap_properties
+      overrideConfigs: |
+        # java.arg.4=-Djava.net.preferIPv4Stack=true
+  # nodeConfigGroups specifies multiple node configs, each with a unique name
+  nodeConfigGroups:
+    default_group:
+      # provenanceStorage allows specifying the maximum amount of data provenance information to store at a time
+      # https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#write-ahead-provenance-repository-properties
+      provenanceStorage: "10 GB"
+      # runAsUser defines the id of the user to run in the Nifi image
+      # +kubebuilder:validation:Minimum=1
+      runAsUser: 1000
+      # Set this to true if the instance is a node in a cluster.
+      # https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#basic-cluster-setup
+      isNode: true
+      # Docker image used by the operator to create the associated node
+      # https://hub.docker.com/r/apache/nifi/
+#      image: "apache/nifi:1.11.2"
+      # nodeAffinity can be specified; the operator populates this value if a new pvc is added later to the node
+      # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity
+#      nodeAffinity:
+      # storageConfigs specifies the node log related configs
+      storageConfigs:
+        # Name of the storage config, used to name the PV so it can be reused in sidecars, for example.
+        - name: provenance-repository
+          # Path where the volume will be mounted into the main nifi container inside the pod.
+          mountPath: "/opt/nifi/provenance_repository"
+          # Kubernetes PVC spec
+          # https://kubernetes.io/docs/tasks/configure-pod-container/configure-persistent-volume-storage/#create-a-persistentvolumeclaim
+          pvcSpec:
+            accessModes:
+              - ReadWriteOnce
+            storageClassName: "standard"
+            resources:
+              requests:
+                storage: 10Gi
+        - mountPath: "/opt/nifi/nifi-current/logs"
+          name: logs
+          pvcSpec:
+            accessModes:
+              - ReadWriteOnce
+            storageClassName: "standard"
+            resources:
+              requests:
+                storage: 10Gi
+      # serviceAccountName specifies the serviceAccount used for this specific node
+      serviceAccountName: "default"
+      # resourcesRequirements works exactly like Container resources; the user can specify the limits and the requests
+      # through this property
+      # https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
+      resourcesRequirements:
+        limits:
+          cpu: "2"
+          memory: 3Gi
+        requests:
+          cpu: "1"
+          memory: 1Gi
+      # imagePullSecrets specifies the secret to use when using a private registry
+      # https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.11/#localobjectreference-v1-core
+#      imagePullSecrets:
+      # nodeSelector can be specified, which sets the node the pod should fit on
+      # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector
+#      nodeSelector:
+      # tolerations can be specified, which set the pod's tolerations
+      # https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#concepts
+#      tolerations:
+      # Additional annotations to attach to the associated pod
+      # https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
+#      nodeAnnotations:
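A note on the indirection above: a node's effective configuration is the `nodeConfigGroups` entry referenced by its `nodeConfigGroup` key, with any per-node `nodeConfig` taking precedence (node 2 in the section that follows overrides the group). Whether the operator merges field by field or replaces the config wholesale is not visible in this diff; the following is only a minimal sketch of the lookup idea, with simplified, hypothetical types:

```go
package main

import "fmt"

// Simplified stand-ins for the operator's NodeConfig and Node types.
type NodeConfig struct {
	ProvenanceStorage string
	RunAsUser         int64
}

type Node struct {
	Id              int32
	NodeConfigGroup string      // reference into nodeConfigGroups
	NodeConfig      *NodeConfig // optional per-node override
}

// resolveNodeConfig is a hypothetical helper: prefer the per-node config
// when present, otherwise fall back to the referenced group entry.
func resolveNodeConfig(groups map[string]NodeConfig, node Node) NodeConfig {
	if node.NodeConfig != nil {
		return *node.NodeConfig
	}
	return groups[node.NodeConfigGroup]
}

func main() {
	groups := map[string]NodeConfig{
		"default_group": {ProvenanceStorage: "10 GB", RunAsUser: 1000},
	}
	fmt.Printf("%+v\n", resolveNodeConfig(groups, Node{Id: 0, NodeConfigGroup: "default_group"}))
}
```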
+  # every node requires an image, a unique id, and storageConfigs settings
+  nodes:
+    # Unique Node id
+    - id: 0
+      # nodeConfigGroup can be used to ease the node configuration; if set, only the id is required
+      nodeConfigGroup: "default_group"
+      # readOnlyConfig can be used to pass Nifi node config
+      # which has type read-only; these config changes will trigger a rolling upgrade
+      readOnlyConfig:
+        nifiProperties:
+          overrideConfigs: |
+            nifi.ui.banner.text=NiFiKop by Orange - Node 0
+      # node configuration
+#      nodeConfig:
+    - id: 2
+      # readOnlyConfig can be used to pass Nifi node config
+      # which has type read-only; these config changes will trigger a rolling upgrade
+      readOnlyConfig:
+        nifiProperties:
+          overrideConfigs: |
+            nifi.ui.banner.text=NiFiKop by Orange - Node 2
+      # node configuration
+      nodeConfig:
+        resourcesRequirements:
+          limits:
+            cpu: "2"
+            memory: 3Gi
+          requests:
+            cpu: "1"
+            memory: 1Gi
+        storageConfigs:
+          # Name of the storage config, used to name the PV so it can be reused in sidecars, for example.
+          - name: provenance-repository
+            # Path where the volume will be mounted into the main nifi container inside the pod.
+            mountPath: "/opt/nifi/provenance_repository"
+            # Kubernetes PVC spec
+            # https://kubernetes.io/docs/tasks/configure-pod-container/configure-persistent-volume-storage/#create-a-persistentvolumeclaim
+            pvcSpec:
+              accessModes:
+                - ReadWriteOnce
+              storageClassName: "standard"
+              resources:
+                requests:
+                  storage: 8Gi
+  # rollingUpgradeConfig specifies the rolling upgrade config for the cluster
+  rollingUpgradeConfig:
+    # failureThreshold states how many errors the cluster can tolerate during a rolling upgrade
+    failureThreshold: 10
+  # oneNifiNodePerNode: if set to true, every nifi node is started on a new Kubernetes node; if there are not enough
+  # nodes to do that, it will stay in pending state. If set to false, the operator still tries to schedule the nifi node
+  # to a unique node, but if the number of nodes is insufficient the nifi node will be scheduled to a node where a nifi node is already running.
+  oneNifiNodePerNode: false
+  #
+  propagateLabels: true
+  # LdapConfiguration specifies the configuration if you want to use LDAP
+  ldapConfiguration:
+    # If set to true, we will enable ldap usage in the nifi.properties configuration.
+    enabled: false
+    # Space-separated list of URLs of the LDAP servers (i.e. ldap://<hostname>:<port>).
+#    url:
+    # Base DN for searching for users (i.e. CN=Users,DC=example,DC=com).
+#    searchBase:
+    # Filter for searching for users against the 'User Search Base'.
+    # (i.e. sAMAccountName={0}). The user specified name is inserted into '{0}'.
+#    searchFilter:
+  # NifiClusterTaskSpec specifies the configuration of the nifi cluster Tasks
+  nifiClusterTaskSpec:
+    # retryDurationMinutes describes the amount of time the Operator waits for the task
+    retryDurationMinutes: 10
+  # listenersConfig specifies nifi's listener specific configs
+  listenersConfig:
+    # internalListeners specifies settings required to access nifi internally
+    internalListeners:
+      # (Optional field) Type allows specifying that this is a specific nifi listener,
+      # letting the operator define required information such as the Cluster Port,
+      # Http Port, Https Port or S2S port
+      # {"cluster", "http", "https", "s2s"}
+      - type: "http"
+        # An identifier for the port which will be configured.
+        name: "http"
+        # The container port.
+        containerPort: 8080
+      - type: "cluster"
+        name: "cluster"
+        containerPort: 6007
+      - type: "s2s"
+        name: "s2s"
+        containerPort: 10000
+      - name: "prometheus"
+        containerPort: 9020
+      - name: "rsyslog"
+        containerPort: 10001
\ No newline at end of file
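Since the sample exercises the field renamed below (`resourcesRequirements`, see the CRD and `nificluster_types.go` changes), a cheap regression guard is to decode the sample into the Go type in a test; with strict unmarshaling, a stale `resourceRequirements` key would fail immediately. A sketch, assuming `sigs.k8s.io/yaml` as the decoder and a test file placed next to the API types (the relative path is an assumption):

```go
package v1alpha1_test

import (
	"io/ioutil"
	"testing"

	"sigs.k8s.io/yaml"

	"github.com/orangeopensource/nifikop/pkg/apis/nifi/v1alpha1"
)

// TestDemoSampleMatchesSchema decodes the demo manifest into the NifiCluster
// type. UnmarshalStrict rejects unknown keys, so field-name drift between the
// sample and the struct tags (e.g. resourceRequirements vs
// resourcesRequirements) surfaces as a test failure.
func TestDemoSampleMatchesSchema(t *testing.T) {
	data, err := ioutil.ReadFile("../../../../config/samples/nifi.orange.com_v1alpha1_nificluster_demo.yaml")
	if err != nil {
		t.Fatal(err)
	}
	var cluster v1alpha1.NifiCluster
	if err := yaml.UnmarshalStrict(data, &cluster); err != nil {
		t.Fatalf("sample does not match the NifiCluster schema: %v", err)
	}
}
```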
diff --git a/deploy/crds/nifi.orange.com_nificlusters_crd.yaml b/deploy/crds/nifi.orange.com_nificlusters_crd.yaml
index c35828d882..9e8a1fdebb 100644
--- a/deploy/crds/nifi.orange.com_nificlusters_crd.yaml
+++ b/deploy/crds/nifi.orange.com_nificlusters_crd.yaml
@@ -353,7 +353,7 @@ spec:
                     description: provenanceStorage allow to specify the maximum amount
                       of data provenance information to store at a time https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#write-ahead-provenance-repository-properties
                     type: string
-                  resourceRequirements:
+                  resourcesRequirements:
                     description: resourceRequirements works exactly like Container resources,
                       the user can specify the limit and the requests through this
                       property
@@ -833,7 +833,7 @@ spec:
                           description: provenanceStorage allow to specify the maximum
                             amount of data provenance information to store at a time
                             https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#write-ahead-provenance-repository-properties
                           type: string
-                        resourceRequirements:
+                        resourcesRequirements:
                           description: resourceRequirements works exactly like Container
                             resources, the user can specify the limit and the requests
                             through this property
diff --git a/nifi-operator.env b/nifi-operator.env
index 7d61302cf8..d057f175af 100644
--- a/nifi-operator.env
+++ b/nifi-operator.env
@@ -34,24 +34,24 @@ MYNIFI_SERVICE_PORT_HTTP=8080
 MYNIFI_SERVICE_PORT_PROMETHEUS=9020
 MYNIFI_SERVICE_PORT_RSYSLOG=10001
 MYNIFI_SERVICE_PORT_S2S=10000
-NIFIKOP_METRICS_PORT=tcp://10.39.254.58:8383
-NIFIKOP_METRICS_PORT_8383_TCP=tcp://10.39.254.58:8383
-NIFIKOP_METRICS_PORT_8383_TCP_ADDR=10.39.254.58
+NIFIKOP_METRICS_PORT=tcp://10.39.255.70:8383
+NIFIKOP_METRICS_PORT_8383_TCP=tcp://10.39.255.70:8383
+NIFIKOP_METRICS_PORT_8383_TCP_ADDR=10.39.255.70
 NIFIKOP_METRICS_PORT_8383_TCP_PORT=8383
 NIFIKOP_METRICS_PORT_8383_TCP_PROTO=tcp
-NIFIKOP_METRICS_PORT_8686_TCP=tcp://10.39.254.58:8686
-NIFIKOP_METRICS_PORT_8686_TCP_ADDR=10.39.254.58
+NIFIKOP_METRICS_PORT_8686_TCP=tcp://10.39.255.70:8686
+NIFIKOP_METRICS_PORT_8686_TCP_ADDR=10.39.255.70
 NIFIKOP_METRICS_PORT_8686_TCP_PORT=8686
 NIFIKOP_METRICS_PORT_8686_TCP_PROTO=tcp
-NIFIKOP_METRICS_SERVICE_HOST=10.39.254.58
+NIFIKOP_METRICS_SERVICE_HOST=10.39.255.70
 NIFIKOP_METRICS_SERVICE_PORT=8383
 NIFIKOP_METRICS_SERVICE_PORT_CR_METRICS=8686
 NIFIKOP_METRICS_SERVICE_PORT_HTTP_METRICS=8383
 OPERATOR_NAME=nifikop
-POD_NAME=nifikop-8ddfad0ea07847e794e8b9e50ee59196-584d4ff945-qp7zm
+POD_NAME=nifikop-30f47286730947c398dc5f2d11a752cc-6674c7bc6c-qx2sq
 TELEPRESENCE_CONTAINER=nifikop
 TELEPRESENCE_CONTAINER_NAMESPACE=nifi-demo
 TELEPRESENCE_MOUNTS=/var/run/secrets/kubernetes.io/serviceaccount
-TELEPRESENCE_POD=nifikop-8ddfad0ea07847e794e8b9e50ee59196-584d4ff945-qp7zm
+TELEPRESENCE_POD=nifikop-30f47286730947c398dc5f2d11a752cc-6674c7bc6c-qx2sq
 TELEPRESENCE_ROOT=/tmp/known
 WATCH_NAMESPACE=nifi-demo
diff --git a/pkg/apis/nifi/v1alpha1/nificluster_types.go b/pkg/apis/nifi/v1alpha1/nificluster_types.go
index 3e8ffd80cd..c141b00b14 100644
--- a/pkg/apis/nifi/v1alpha1/nificluster_types.go
+++ b/pkg/apis/nifi/v1alpha1/nificluster_types.go
@@ -155,7 +155,7 @@ type NodeConfig struct {
 	ServiceAccountName string `json:"serviceAccountName,omitempty"`
 	// resourceRequirements works exactly like Container resources, the user can specify the limit and the requests
 	// through this property
-	ResourcesRequirements *corev1.ResourceRequirements `json:"resourceRequirements,omitempty"`
+	ResourcesRequirements *corev1.ResourceRequirements `json:"resourcesRequirements,omitempty"`
 	// imagePullSecrets specifies the secret to use when using private registry
 	ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
 	// nodeSelector can be specified, which set the pod to fit on a node
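The Go field `ResourcesRequirements` was already plural; only its JSON tag lagged behind, and the tag alone determines the key that gets serialized and validated against the CRD schema, which is why the CRD and the struct tag have to move together. A standalone illustration of that mechanism:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Same Go field name, different JSON tags: only the tag changes the API surface.
type before struct {
	ResourcesRequirements string `json:"resourceRequirements,omitempty"`
}

type after struct {
	ResourcesRequirements string `json:"resourcesRequirements,omitempty"`
}

func main() {
	b, _ := json.Marshal(before{ResourcesRequirements: "x"})
	a, _ := json.Marshal(after{ResourcesRequirements: "x"})
	fmt.Println(string(b)) // {"resourceRequirements":"x"}
	fmt.Println(string(a)) // {"resourcesRequirements":"x"}
}
```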
diff --git a/pkg/controller/nificlustertask/nificlustertask_controller.go b/pkg/controller/nificlustertask/nificlustertask_controller.go
index ff7e5d9a08..1e82227f03 100644
--- a/pkg/controller/nificlustertask/nificlustertask_controller.go
+++ b/pkg/controller/nificlustertask/nificlustertask_controller.go
@@ -250,8 +250,8 @@ func (r *ReconcileNifiClusterTask) handlePodRunningTask(nifiCluster *v1alpha1.Ni
 
 	// If node is disconnected, performing offload
 	if nifiCluster.Status.NodesState[nodeId].GracefulActionState.ActionStep == v1alpha1.DisconnectStatus {
-		actionStep, taskStartTime, err := scale.OffloadClusterNode(nifiCluster.Spec.HeadlessServiceEnabled, nifiCluster.Spec.Nodes, nifiResources.GetServerPort(&nifiCluster.Spec.ListenersConfig), nodeId,
-			nifiCluster.Namespace, nifiCluster.Name)
+		actionStep, taskStartTime, err := scale.OffloadClusterNode(nifiCluster.Spec.HeadlessServiceEnabled, nifiCluster.Spec.Nodes,
+			nifiResources.GetServerPort(&nifiCluster.Spec.ListenersConfig), nodeId, nifiCluster.Namespace, nifiCluster.Name)
 		if err != nil {
 			log.Info(fmt.Sprintf("nifi cluster communication error during removing node id: %s", nodeId))
 			return errorfactory.New(errorfactory.NifiClusterNotReady{}, err, fmt.Sprintf("node id: %s", nodeId))
diff --git a/pkg/resources/nifi/nifi.go b/pkg/resources/nifi/nifi.go
index c0c2bfe997..784e6159d6 100644
--- a/pkg/resources/nifi/nifi.go
+++ b/pkg/resources/nifi/nifi.go
@@ -288,6 +288,14 @@ OUTERLOOP:
 			}
 		}
 	}
+
+	// TODO: confirm this call is needed here.
+	err = scale.EnsureRemovedNodes(r.NifiCluster.Spec.HeadlessServiceEnabled, r.NifiCluster.Spec.Nodes, r.NifiCluster.Status.NodesState,
+		GetServerPort(&r.NifiCluster.Spec.ListenersConfig), r.NifiCluster.Namespace, r.NifiCluster.Name)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go
index a9777b1dd7..acff63e6bf 100644
--- a/pkg/scale/scale.go
+++ b/pkg/scale/scale.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"github.com/openshift/origin/Godeps/_workspace/src/k8s.io/kubernetes/pkg/util/json"
 	"github.com/orangeopensource/nifikop/pkg/apis/nifi/v1alpha1"
+	"github.com/orangeopensource/nifikop/pkg/util"
 	nifiutil "github.com/orangeopensource/nifikop/pkg/util/nifi"
 	"io/ioutil"
 	"net/http"
@@ -208,34 +209,34 @@ func getNifiNodeIdFromAddress(headlessServiceEnabled bool, nodeId, serverPort in
 	return targetNodeId, nil
 }
 
-func getNifiClusterCoordinatorNode(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, namespace, clusterName string) (v1alpha1.Node, error) {
+func getNifiClusterCoordinatorNode(headlessServiceEnabled bool, availableNodeIds []int32, serverPort int32, namespace, clusterName string) (*int32, error) {
 	var err error
-	for _, n := range availableNodes {
-		clusterStatus, err := GetNifiClusterNodesStatus(headlessServiceEnabled, n.Id, serverPort, namespace, clusterName)
-		searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, n.Id, clusterName, namespace)
+	for _, nodeId := range availableNodeIds {
+		clusterStatus, err := GetNifiClusterNodesStatus(headlessServiceEnabled, nodeId, serverPort, namespace, clusterName)
+		searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, nodeId, clusterName, namespace)
 		if err == nil {
 			for _, node := range clusterStatus["cluster"].(map[string]interface{})["nodes"].([]interface{}){
 				roles := node.(map[string]interface{})["roles"].([]interface{})
 				address := node.(map[string]interface{})["address"].(string)
 				if address == searchedAddress && len(roles) >= 2 && (roles[0].(string) == clusterCoordinator || roles[1].(string) == clusterCoordinator) {
-					return n, nil
+					return &nodeId, nil
 				}
 			}
 		}
 	}
-	return v1alpha1.Node{}, err
+	return nil, err
 }
 
 // DownsizeCluster downscales Nifi cluster
 func DisconnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, nodeId, namespace, clusterName string) (v1alpha1.ActionStep, string, error) {
-	var node v1alpha1.Node
 	var err error
 	var rsp map[string]interface{}
+
 	// Look for available nifi node.
-	node, err = getNifiClusterCoordinatorNode(headlessServiceEnabled, availableNodes, serverPort, namespace, clusterName)
+	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled, generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
 
-	if &node == nil {
+	if coordinatorNodeId == nil {
 		return "", "", err
 	}
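The real bug this hunk fixes: the old `if &node == nil` guard could never fire, because in Go the address of a local variable is never nil, so a failed coordinator lookup fell through as a zero-valued `v1alpha1.Node` with `Id` 0. Returning `*int32` makes the not-found case representable (and `return &nodeId, nil` is safe here, since the function returns immediately, before the loop variable is reused). A self-contained demonstration of the dead check:

```go
package main

import "fmt"

type node struct{ Id int32 }

func main() {
	var n node
	fmt.Println(&n == nil) // always false: the address of a local is never nil

	// With a pointer return value, "not found" is an honest nil.
	var coordinatorNodeId *int32
	fmt.Println(coordinatorNodeId == nil) // true until a coordinator is found
}
```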
@@ -246,12 +247,12 @@ func DisconnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha
 	int32NodeId := int32(intNodeId)
 	searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, int32NodeId, clusterName, namespace)
 
-	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, searchedAddress)
+	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, searchedAddress)
 	if err != nil {
 		return "", "", err
 	}
 
-	rsp, err = GetNifiClusterNodeStatus(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, targetNodeId)
+	rsp, err = GetNifiClusterNodeStatus(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, targetNodeId)
 
 	if err != nil {
 		return "", "", err
@@ -268,7 +269,7 @@ func DisconnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha
 	}
 
 	// Disconnect node
-	dResp, err = putNifiNode(headlessServiceEnabled, node.Id, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName, string(v1alpha1.DisconnectNodeAction), targetNodeId)
+	dResp, err = putNifiNode(headlessServiceEnabled, *coordinatorNodeId, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName, string(v1alpha1.DisconnectNodeAction), targetNodeId)
 	if err != nil && err != errNifiClusterNotReturned200 {
 		log.Error(err, "Disconnect cluster gracefully failed since Nifi node returned non 200")
 		return "", "", err
@@ -285,13 +286,12 @@ func DisconnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha
 }
 
 func OffloadClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, nodeId, namespace, clusterName string) (v1alpha1.ActionStep, string, error) {
-	var node v1alpha1.Node
 	var err error
 
 	// Look for available nifi node.
-	node, err = getNifiClusterCoordinatorNode(headlessServiceEnabled, availableNodes, serverPort, namespace, clusterName)
+	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled, generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
 
-	if &node == nil {
+	if coordinatorNodeId == nil {
 		return "", "", err
 	}
@@ -302,13 +302,13 @@ func OffloadClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.N
 	int32NodeId := int32(intNodeId)
 	searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, int32NodeId, clusterName, namespace)
 
-	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, searchedAddress)
+	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, searchedAddress)
 	if err != nil {
 		return "", "", err
 	}
 
 	// Offload node
-	dResp, err = putNifiNode(headlessServiceEnabled, node.Id, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName, string(v1alpha1.OffloadNodeAction), targetNodeId)
+	dResp, err = putNifiNode(headlessServiceEnabled, *coordinatorNodeId, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName, string(v1alpha1.OffloadNodeAction), targetNodeId)
 	if err != nil && err != errNifiClusterNotReturned200 {
 		log.Error(err, "Offload node gracefully failed since Nifi node returned non 200")
 		return "", "", err
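Disconnect, offload, connect, and remove now all repeat the same preamble: resolve the coordinator, map the Kubernetes node id to NiFi's internal node id, then issue a single REST call against the coordinator. If the list of actions keeps growing, that preamble could be factored into a shared wrapper inside `pkg/scale`; the helper below is hypothetical (the name and shape are not from the codebase, and it assumes the string node id the existing helpers appear to use):

```go
// withCoordinator is a hypothetical helper factoring out the preamble shared
// by Disconnect/Offload/Connect/RemoveClusterNode: it resolves the coordinator
// and the NiFi-internal id of the target node, then runs the given action.
func withCoordinator(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32,
	nodeId, namespace, clusterName string,
	action func(coordinatorNodeId int32, targetNodeId string) error) error {

	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled,
		generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
	if coordinatorNodeId == nil {
		return err
	}

	intNodeId, _ := strconv.ParseInt(nodeId, 10, 32)
	searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, int32(intNodeId), clusterName, namespace)

	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, *coordinatorNodeId,
		serverPort, namespace, clusterName, searchedAddress)
	if err != nil {
		return err
	}
	return action(*coordinatorNodeId, targetNodeId)
}
```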
@@ -325,13 +325,12 @@ func OffloadClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.N
 }
 
 func RemoveClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, nodeId, namespace, clusterName string) (v1alpha1.ActionStep, string, error) {
-	var node v1alpha1.Node
 	var err error
 
 	// Look for available nifi node.
-	node, err = getNifiClusterCoordinatorNode(headlessServiceEnabled, availableNodes, serverPort, namespace, clusterName)
+	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled, generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
 
-	if &node == nil {
+	if coordinatorNodeId == nil {
 		return "", "", err
 	}
@@ -343,13 +342,13 @@ func RemoveClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.No
 	int32NodeId := int32(intNodeId)
 	searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, int32NodeId, clusterName, namespace)
 
-	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, searchedAddress)
+	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, searchedAddress)
 	if err != nil {
 		return "", "", err
 	}
 
 	// Remove node
-	dResp, err = deleteNifiNode(headlessServiceEnabled, node.Id, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName)
+	dResp, err = deleteNifiNode(headlessServiceEnabled, *coordinatorNodeId, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName)
 	if err == errNifiClusterReturned404 {
 		currentTime := time.Now()
 		return v1alpha1.RemoveNodeAction, currentTime.Format("Mon, 2 Jan 2006 15:04:05 GMT"), nil
@@ -371,15 +370,52 @@ func RemoveClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.No
 	return actionStep, startTimeStamp, nil
 }
 
+func EnsureRemovedNodes(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, availableNodesState map[string]v1alpha1.NodeState, serverPort int32, namespace, clusterName string) error {
+	// Look for available nifi node.
+	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled, generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
+
+	if coordinatorNodeId == nil {
+		return err
+	}
+
+	clusterStatus, err := GetNifiClusterNodesStatus(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName)
+
+	var availableAddresses []string
+	for _, nodeId := range generateNodeStateIdSlice(availableNodesState) {
+		availableAddresses = append(availableAddresses, nifiutil.ComputeHostname(headlessServiceEnabled, nodeId, clusterName, namespace))
+	}
+
+	for _, node := range clusterStatus["cluster"].(map[string]interface{})["nodes"].([]interface{}) {
+		address := node.(map[string]interface{})["address"].(string)
+		if !util.StringSliceContains(availableAddresses, address) {
+			targetNodeId := node.(map[string]interface{})["nodeId"].(string)
+
+			_, err := deleteNifiNode(headlessServiceEnabled, *coordinatorNodeId, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName)
+			if err == errNifiClusterReturned404 {
+				return nil
+			}
+
+			if err != nil && err != errNifiClusterNotReturned200 {
+				log.Error(err, "Remove node from cluster failed")
+				return err
+			}
+			if err == errNifiClusterNotReturned200 {
+				log.Error(err, "could not communicate with nifi node")
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 //
 func ConnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, nodeId, namespace, clusterName string) (v1alpha1.ActionStep, string, error) {
-	var node v1alpha1.Node
 	var err error
 
 	// Look for available nifi node.
-	node, err = getNifiClusterCoordinatorNode(headlessServiceEnabled, availableNodes, serverPort, namespace, clusterName)
+	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled, generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
 
-	if &node == nil {
+	if coordinatorNodeId == nil {
 		return "", "", err
 	}
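`EnsureRemovedNodes` diffs the addresses NiFi still reports against the addresses derived from the operator's node-state map and deletes the leftovers. Two review notes: the error from `GetNifiClusterNodesStatus` is never checked before the type assertions on `clusterStatus`, which will panic if the call failed, and the `errNifiClusterReturned404` branch returns nil for the whole sweep instead of continuing with the next stale node. `util.StringSliceContains` itself is not part of this diff; presumably it is a plain linear membership test along these lines (a sketch, assuming the obvious semantics):

```go
package util

// StringSliceContains reports whether s is present in list.
// A plausible implementation of the helper this diff imports but does not show.
func StringSliceContains(list []string, s string) bool {
	for _, item := range list {
		if item == s {
			return true
		}
	}
	return false
}
```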
@@ -390,13 +426,13 @@ func ConnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.N
 	int32NodeId := int32(intNodeId)
 	searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, int32NodeId, clusterName, namespace)
 
-	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, searchedAddress)
+	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, searchedAddress)
 	if err != nil {
 		return "", "", err
 	}
 
 	// Connect node
-	dResp, err = putNifiNode(headlessServiceEnabled, node.Id, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName, string(v1alpha1.ConnectNodeAction), targetNodeId)
+	dResp, err = putNifiNode(headlessServiceEnabled, *coordinatorNodeId, serverPort, fmt.Sprintf(endpointNode, targetNodeId), namespace, clusterName, string(v1alpha1.ConnectNodeAction), targetNodeId)
 	if err != nil && err != errNifiClusterNotReturned200 {
 		log.Error(err, "Connect node gracefully failed since Nifi node returned non 200")
 		return "", "", err
@@ -415,13 +451,12 @@ func ConnectClusterNode(headlessServiceEnabled bool, availableNodes []v1alpha1.N
 
 // CheckIfCCTaskFinished checks whether the given CC Task ID finished or not
 // headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, nodeId, namespace, clusterName string
 func CheckIfNCActionStepFinished(headlessServiceEnabled bool, availableNodes []v1alpha1.Node, serverPort int32, actionStep v1alpha1.ActionStep, nodeId, namespace, clusterName string) (bool, error) {
-	var node v1alpha1.Node
 	var err error
 
 	// Look for available nifi node.
-	node, err = getNifiClusterCoordinatorNode(headlessServiceEnabled, availableNodes, serverPort, namespace, clusterName)
+	coordinatorNodeId, err := getNifiClusterCoordinatorNode(headlessServiceEnabled, generateNodeIdSlice(availableNodes), serverPort, namespace, clusterName)
 
-	if &node == nil {
+	if coordinatorNodeId == nil {
 		return false, err
 	}
@@ -432,12 +467,12 @@ func CheckIfNCActionStepFinished(headlessServiceEnabled bool, availableNodes []v
 	int32NodeId := int32(intNodeId)
 	searchedAddress := nifiutil.ComputeHostname(headlessServiceEnabled, int32NodeId, clusterName, namespace)
 
-	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, searchedAddress)
+	targetNodeId, err := getNifiNodeIdFromAddress(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, searchedAddress)
 	if err != nil {
 		return false, err
 	}
 
-	dResp, err = GetNifiClusterNodeStatus(headlessServiceEnabled, node.Id, serverPort, namespace, clusterName, targetNodeId)
+	dResp, err = GetNifiClusterNodeStatus(headlessServiceEnabled, *coordinatorNodeId, serverPort, namespace, clusterName, targetNodeId)
 
 	if err == errNifiClusterReturned404 && actionStep == v1alpha1.RemoveNodeAction {
 		return true, nil
@@ -466,3 +501,24 @@ func CheckIfNCActionStepFinished(headlessServiceEnabled bool, availableNodes []v
 
 	return false, nil
 }
+
+func generateNodeIdSlice(nodes []v1alpha1.Node) []int32 {
+	var nodeIdsSlice []int32
+
+	for _, node := range nodes {
+		nodeIdsSlice = append(nodeIdsSlice, node.Id)
+	}
+	return nodeIdsSlice
+}
+
+func generateNodeStateIdSlice(nodesState map[string]v1alpha1.NodeState) []int32 {
+	var nodeIdsSlice []int32
+
+	for nodeId := range nodesState {
+		intNodeId, _ := strconv.ParseInt(nodeId, 10, 32)
+		int32NodeId := int32(intNodeId)
+
+		nodeIdsSlice = append(nodeIdsSlice, int32NodeId)
+	}
+	return nodeIdsSlice
+}
\ No newline at end of file
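One caveat on `generateNodeStateIdSlice`: the `strconv.ParseInt` error is discarded, so any non-numeric key in the node-state map silently becomes id 0. A guard, or at least a test pinning the happy path, would make that explicit; a minimal test sketch for `pkg/scale` (file placement assumed):

```go
package scale

import (
	"reflect"
	"sort"
	"testing"

	"github.com/orangeopensource/nifikop/pkg/apis/nifi/v1alpha1"
)

func TestGenerateNodeStateIdSlice(t *testing.T) {
	states := map[string]v1alpha1.NodeState{"1": {}, "3": {}}

	got := generateNodeStateIdSlice(states)
	sort.Slice(got, func(i, j int) bool { return got[i] < got[j] }) // map iteration order is random

	if want := []int32{1, 3}; !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}
}
```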