Skip to content

Commit

Permalink
move kafka.go to its own package
Browse files Browse the repository at this point in the history
  • Loading branch information
nasark committed Oct 23, 2023
1 parent b3e62ac commit 3a51235
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package miqtools
package miqkafka

import (
miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1"
Expand Down
9 changes: 5 additions & 4 deletions manageiq-operator/internal/controller/manageiq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1"
cr_migration "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/cr_migration"
miqkafka "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka"
miqtool "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -524,21 +525,21 @@ func (r *ManageIQReconciler) generatePostgresqlResources(cr *miqv1alpha1.ManageI
}

func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) error {
kafkaSubscription, mutateFunc := miqtool.KafkaInstall(cr, r.Scheme)
kafkaSubscription, mutateFunc := miqkafka.KafkaInstall(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaSubscription, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Kafka Subscription has been reconciled", "result", result)
}

kafkaClusterCR, mutateFunc := miqtool.KafkaCluster(cr, r.Scheme)
kafkaClusterCR, mutateFunc := miqkafka.KafkaCluster(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaClusterCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Kafka Cluster has been reconciled", "result", result)
}

kafkaUserCR, mutateFunc := miqtool.KafkaUser(cr, r.Scheme)
kafkaUserCR, mutateFunc := miqkafka.KafkaUser(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaUserCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
Expand All @@ -547,7 +548,7 @@ func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) er

topics := []string{"messaging-health-check", "manageiq.ems", "manageiq.ems-events", "manageiq.ems-inventory", "manageiq.metrics"}
for i := 0; i < len(topics); i++ {
kafkaTopicCR, mutateFunc := miqtool.KafkaTopic(cr, r.Scheme, topics[i])
kafkaTopicCR, mutateFunc := miqkafka.KafkaTopic(cr, r.Scheme, topics[i])
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaTopicCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
Expand Down

0 comments on commit 3a51235

Please sign in to comment.