From d4cbb88c440e6d8875b51ddbafc312badd076ea6 Mon Sep 17 00:00:00 2001
From: Masterchen09 <13187726+Masterchen09@users.noreply.github.com.>
Date: Tue, 27 Feb 2024 12:22:20 +0100
Subject: [PATCH] feat: add configuration options for Kafka consumer groups

---
 charts/datahub/Chart.yaml                     |  4 ++--
 charts/datahub/README.md                      | 12 +++++++++++
 .../acryl-datahub-actions/Chart.yaml          |  2 +-
 .../templates/deployment.yaml                 |  8 ++++++++
 .../acryl-datahub-actions/values.yaml         |  2 +-
 .../datahub/subcharts/datahub-gms/Chart.yaml  |  2 +-
 .../datahub-gms/templates/deployment.yaml     | 20 +++++++++++++++++--
 .../templates/deployment.yaml                 | 14 +++++++++++--
 .../templates/deployment.yaml                 | 12 +++++++++--
 charts/datahub/values.yaml                    | 15 +++++++++++++-
 10 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/charts/datahub/Chart.yaml b/charts/datahub/Chart.yaml
index 14c32845a..9051fc2e7 100644
--- a/charts/datahub/Chart.yaml
+++ b/charts/datahub/Chart.yaml
@@ -4,13 +4,13 @@ description: A Helm chart for LinkedIn DataHub
 type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 0.4.14
+version: 0.4.15
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
 appVersion: 0.13.2
 dependencies:
   - name: datahub-gms
-    version: 0.2.169
+    version: 0.2.170
     repository: file://./subcharts/datahub-gms
     condition: datahub-gms.enabled
   - name: datahub-frontend
diff --git a/charts/datahub/README.md b/charts/datahub/README.md
index e23b2ccfd..3f4d2d5a9 100644
--- a/charts/datahub/README.md
+++ b/charts/datahub/README.md
@@ -116,6 +116,17 @@ helm install datahub datahub/datahub --values <<path-to-values-file>>
 | global.kafka.bootstrap.server                                  | string  | `"prerequisites-broker:9092"`                                                                    | Kafka bootstrap servers (with port)                                                                                                                     |
 | global.kafka.zookeeper.server                                  | string  | `"prerequisites-zookeeper:2181"`                                                                 | Kafka zookeeper servers (with port)                                                                                                                     |
 | global.kafka.consumer.stopContainerOnDeserializationError      | boolean | `true`                                                                                           | Determines whether or not to halt progress when encountering a deserialization error, halting prevents data loss but prevents progress until fixed      |
+| global.kafka.consumer_groups.datahub_upgrade_history_kafka_consumer_group_id.gms | string  | `"<<release-name>>-duhe-consumer-job-client-gms"`                              | Consumer group id for consuming DataHub Upgrade history events by the GMS                                                                                |
+| global.kafka.consumer_groups.datahub_upgrade_history_kafka_consumer_group_id.mae-consumer | string  | `"<<release-name>>-duhe-consumer-job-client-mcl"`                     | Consumer group id for consuming DataHub Upgrade history events by the MAE consumer          
+| global.kafka.consumer_groups.datahub_upgrade_history_kafka_consumer_group_id.mce-consumer | string  | `"<<release-name>>-duhe-consumer-job-client-mcp"`                     | Consumer group id for consuming DataHub Upgrade history events by the MCE consumer                                                                       |
+| global.kafka.consumer_groups.datahub_actions_ingestion_executor_consumer_group_id | string  | `"ingestion_executor"`                                                        | Consumer group id for consuming events by DataHub Actions ingestion executor pipeline                                                                    |
+| global.kafka.consumer_groups.datahub_actions_slack_consumer_group_id | string  | `"datahub_slack_action"`                                                                   | Consumer group id for consuming events by DataHub Actions slack action pipeline                                                                          |
+| global.kafka.consumer_groups.datahub_actions_teams_consumer_group_id | string  | `"datahub_teams_action"`                                                                   | Consumer group id for consuming events by DataHub Actions teams action pipeline                                                                          |
+| global.kafka.consumer_groups.datahub_usage_event_kafka_consumer_group_id | string  | `"datahub-usage-event-consumer-job-client"`                                            | Consumer group id for consuming DataHub Usage events by the GMS or MAE consumer                                                                          |
+| global.kafka.consumer_groups.metadata_change_log_kafka_consumer_group_id | string  | `"generic-mae-consumer-job-client"`                                                    | Consumer group id for consuming Metadata Change Log events by the GMS or MAE consumer                                                                    |
+| global.kafka.consumer_groups.platform_event_kafka_consumer_group_id | string  | `"generic-platform-event-job-client"`                                                       | Consumer group id for consuming Platform events by the GMS or MAE consumer                                                                               |
+| global.kafka.consumer_groups.metadata_change_event_kafka_consumer_group_id | string  | `"mce-consumer-job-client"`                                                          | Consumer group id for consuming Metadata Change events by the GMS or MCE consumer                                                                        |
+| global.kafka.consumer_groups.metadata_change_proposal_kafka_consumer_group_id | string  | `"generic-mce-consumer-job-client"`                                               | Consumer group id for consuming Metadata Change Proposal events by the GMS or MCE consumer                                                               |
 | global.kafka.topics.metadata_change_event_name                 | string  | `"MetadataChangeEvent_v4"`                                                                       | Kafka topic name for Metadata Change Events (deprecated)                                                                                                |
 | global.kafka.topics.failed_metadata_change_event_name          | string  | `"FailedMetadataChangeEvent_v4"`                                                                 | Kafka topic name for Failed Metadata Change events (deprecated)                                                                                         |
 | global.kafka.topics.metadata_audit_event_name                  | string  | `"MetadataAuditEvent_v4"`                                                                        | Kafka topic name for Metadata Audit events (deprecated)                                                                                                 |
@@ -125,6 +136,7 @@ helm install datahub datahub/datahub --values <<path-to-values-file>>
 | global.kafka.topics.metadata_change_log_versioned_topic_name   | string  | `"MetadataChangeLog_Versioned_v1"`                                                               | Kafka topic name for Versioned Metadata Change Log events                                                                                               |
 | global.kafka.topics.metadata_change_log_timeseries_topic_name  | string  | `"MetadataChangeLog_Timeseries_v1"`                                                              | Kafka topic name for Timeseries Metadata Change Log events                                                                                              |
 | global.kafka.topics.platform_event_topic_name                  | string  | `"PlatformEvent_v1"`                                                                             | Kafka topic name for Platform events                                                                                                                    |
+| global.kafka.topics.datahub_upgrade_history_topic_name         | string  | `"DataHubUpgradeHistory_v1"`                                                                     | Kafka topic name for DataHub Upgrade History events                                                                                                     |
 | global.kafka.schemaregistry.url                                | string  | ``                                                                                               | URL to kafka schema registry if using `KAFKA` type                                                                                                      |
 | global.neo4j.host                                              | string  | `"prerequisites-neo4j:7474"`                                                                     | Neo4j host address (with port)                                                                                                                          |
 | global.neo4j.uri                                               | string  | `"bolt://prerequisites-neo4j"`                                                                   | Neo4j URI                                                                                                                                               |
diff --git a/charts/datahub/subcharts/acryl-datahub-actions/Chart.yaml b/charts/datahub/subcharts/acryl-datahub-actions/Chart.yaml
index 4a94610ed..36787b77b 100644
--- a/charts/datahub/subcharts/acryl-datahub-actions/Chart.yaml
+++ b/charts/datahub/subcharts/acryl-datahub-actions/Chart.yaml
@@ -15,4 +15,4 @@ type: application
 version: 0.2.145
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 0.0.11
+appVersion: 0.0.16
diff --git a/charts/datahub/subcharts/acryl-datahub-actions/templates/deployment.yaml b/charts/datahub/subcharts/acryl-datahub-actions/templates/deployment.yaml
index 8c25222bd..344d6c961 100644
--- a/charts/datahub/subcharts/acryl-datahub-actions/templates/deployment.yaml
+++ b/charts/datahub/subcharts/acryl-datahub-actions/templates/deployment.yaml
@@ -142,6 +142,14 @@ spec:
             - name: PLATFORM_EVENT_TOPIC_NAME
               value: {{ .platform_event_topic_name }}
             {{- end }}
+            {{- with .Values.global.kafka.consumer_groups }}
+            - name: DATAHUB_ACTIONS_INGESTION_EXECUTOR_CONSUMER_GROUP_ID
+              value: {{ .datahub_actions_ingestion_executor_consumer_group_id }}
+            - name: DATAHUB_ACTIONS_SLACK_CONSUMER_GROUP_ID
+              value: {{ .datahub_actions_slack_consumer_group_id }}
+            - name: DATAHUB_ACTIONS_TEAMS_CONSUMER_GROUP_ID
+              value: {{ .datahub_actions_teams_consumer_group_id }}
+            {{- end }}
             {{- if .Values.global.datahub.metadata_service_authentication.enabled }}
             - name: DATAHUB_SYSTEM_CLIENT_ID
               value: {{ .Values.global.datahub.metadata_service_authentication.systemClientId }}
diff --git a/charts/datahub/subcharts/acryl-datahub-actions/values.yaml b/charts/datahub/subcharts/acryl-datahub-actions/values.yaml
index e2aca2f15..a9cd5c055 100644
--- a/charts/datahub/subcharts/acryl-datahub-actions/values.yaml
+++ b/charts/datahub/subcharts/acryl-datahub-actions/values.yaml
@@ -5,7 +5,7 @@ replicaCount: 1
 
 image:
   repository: acryldata/datahub-actions
-  tag: "v0.0.1"
+  tag: "v0.0.16"
   pullPolicy: IfNotPresent
   # Override the image's command & args with a new one.
   # This may be necessary for custom startup or shutdown behaviors
diff --git a/charts/datahub/subcharts/datahub-gms/Chart.yaml b/charts/datahub/subcharts/datahub-gms/Chart.yaml
index 65a6919bf..e2432cd2f 100644
--- a/charts/datahub/subcharts/datahub-gms/Chart.yaml
+++ b/charts/datahub/subcharts/datahub-gms/Chart.yaml
@@ -12,7 +12,7 @@ description: A Helm chart for LinkedIn DataHub's datahub-gms component
 type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 0.2.169
+version: 0.2.170
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
 appVersion: v0.13.1
diff --git a/charts/datahub/subcharts/datahub-gms/templates/deployment.yaml b/charts/datahub/subcharts/datahub-gms/templates/deployment.yaml
index 2b3b7337e..1dd0f6010 100644
--- a/charts/datahub/subcharts/datahub-gms/templates/deployment.yaml
+++ b/charts/datahub/subcharts/datahub-gms/templates/deployment.yaml
@@ -126,8 +126,6 @@ spec:
               value: {{ printf "%s-%s-%s" .Release.Name (regexReplaceAll "[^-a-z0-9]+" .Values.global.datahub.version "-") "hazelcast-svc" | trunc 63 | trimSuffix "-" }}
             {{- end}}
             {{- if .Values.global.datahub.systemUpdate.enabled }}
-            - name: DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID
-              value: {{ printf "%s-%s" .Release.Name "duhe-consumer-job-client-gms" }}
             - name: DATAHUB_REVISION
               value: {{ .Release.Revision | quote }}
             {{- end }}
@@ -291,6 +289,24 @@ spec:
             - name: DATAHUB_UPGRADE_HISTORY_TOPIC_NAME
               value: {{ .datahub_upgrade_history_topic_name }}
             {{- end }}
+            {{- if .Values.global.datahub.systemUpdate.enabled }}
+            - name: DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .Values.global.kafka.consumer_groups.datahub_upgrade_history_kafka_consumer_group_id.gms | default (printf "%s-%s" .Release.Name "duhe-consumer-job-client-gms") }}
+            {{- end }}
+            {{- if not .Values.global.datahub_standalone_consumers_enabled }}
+            {{- with .Values.global.kafka.consumer_groups }}
+            - name: DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .datahub_usage_event_kafka_consumer_group_id }}
+            - name: METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .metadata_change_log_kafka_consumer_group_id }}
+            - name: PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .platform_event_kafka_consumer_group_id }}
+            - name: METADATA_CHANGE_EVENT_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .metadata_change_event_kafka_consumer_group_id }}
+            - name: METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .metadata_change_proposal_kafka_consumer_group_id }}
+            {{- end }}
+            {{- end }}
             {{- if .Values.global.datahub.metadata_service_authentication.enabled }}
             - name: METADATA_SERVICE_AUTH_ENABLED
               value: "true"
diff --git a/charts/datahub/subcharts/datahub-mae-consumer/templates/deployment.yaml b/charts/datahub/subcharts/datahub-mae-consumer/templates/deployment.yaml
index 174328085..2a1a3e6a4 100644
--- a/charts/datahub/subcharts/datahub-mae-consumer/templates/deployment.yaml
+++ b/charts/datahub/subcharts/datahub-mae-consumer/templates/deployment.yaml
@@ -100,8 +100,6 @@ spec:
             - name: BACKFILL_BROWSE_PATHS_V2
               value: {{ .Values.global.datahub.search_and_browse.backfill_browse_v2 | quote }}
             {{- if .Values.global.datahub.systemUpdate.enabled }}
-            - name: DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID
-              value: {{ printf "%s-%s" .Release.Name "duhe-consumer-job-client-mcl" }}
             - name: DATAHUB_REVISION
               value: {{ .Release.Revision | quote }}
             {{- end }}
@@ -253,6 +251,18 @@ spec:
             - name: DATAHUB_UPGRADE_HISTORY_TOPIC_NAME
               value: {{ .datahub_upgrade_history_topic_name }}
             {{- end }}
+            {{- if .Values.global.datahub.systemUpdate.enabled }}
+            - name: DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID
+              value: {{ index .Values.global.kafka.consumer_groups.datahub_upgrade_history_kafka_consumer_group_id "mae-consumer" | default (printf "%s-%s" .Release.Name "duhe-consumer-job-client-mcl")  }}
+            {{- end }}
+            {{- with .Values.global.kafka.consumer_groups }}
+            - name: DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .datahub_usage_event_kafka_consumer_group_id }}
+            - name: METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .metadata_change_log_kafka_consumer_group_id }}
+            - name: PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .platform_event_kafka_consumer_group_id }}
+            {{- end }}
             - name: ALWAYS_EMIT_CHANGE_LOG
               value: {{ .Values.global.datahub.alwaysEmitChangeLog | quote }}
             - name: GRAPH_SERVICE_DIFF_MODE_ENABLED
diff --git a/charts/datahub/subcharts/datahub-mce-consumer/templates/deployment.yaml b/charts/datahub/subcharts/datahub-mce-consumer/templates/deployment.yaml
index 355f1eb38..2d61ad95d 100644
--- a/charts/datahub/subcharts/datahub-mce-consumer/templates/deployment.yaml
+++ b/charts/datahub/subcharts/datahub-mce-consumer/templates/deployment.yaml
@@ -104,8 +104,6 @@ spec:
             - name: BACKFILL_BROWSE_PATHS_V2
               value: {{ .Values.global.datahub.search_and_browse.backfill_browse_v2 | quote }}
             {{- if .Values.global.datahub.systemUpdate.enabled }}
-            - name: DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID
-              value: {{ printf "%s-%s" .Release.Name "duhe-consumer-job-client-mcp" }}
             - name: DATAHUB_REVISION
               value: {{ .Release.Revision | quote }}
             {{- end }}
@@ -266,6 +264,16 @@ spec:
             - name: DATAHUB_UPGRADE_HISTORY_TOPIC_NAME
               value: {{ .datahub_upgrade_history_topic_name }}
             {{- end }}
+            {{- if .Values.global.datahub.systemUpdate.enabled }}
+            - name: DATAHUB_UPGRADE_HISTORY_KAFKA_CONSUMER_GROUP_ID
+              value: {{ index .Values.global.kafka.consumer_groups.datahub_upgrade_history_kafka_consumer_group_id "mce-consumer" | default (printf "%s-%s" .Release.Name "duhe-consumer-job-client-mcp")  }}
+            {{- end }}
+            {{- with .Values.global.kafka.consumer_groups }}
+            - name: METADATA_CHANGE_EVENT_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .metadata_change_event_kafka_consumer_group_id }}
+            - name: METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID
+              value: {{ .metadata_change_proposal_kafka_consumer_group_id }}
+            {{- end }}
             - name: ALWAYS_EMIT_CHANGE_LOG
               value: {{ .Values.global.datahub.alwaysEmitChangeLog | quote }}
             - name: GRAPH_SERVICE_DIFF_MODE_ENABLED
diff --git a/charts/datahub/values.yaml b/charts/datahub/values.yaml
index 760033491..a66475828 100644
--- a/charts/datahub/values.yaml
+++ b/charts/datahub/values.yaml
@@ -68,7 +68,7 @@ acryl-datahub-actions:
   enabled: true
   image:
     repository: acryldata/datahub-actions
-    tag: "v0.0.15"
+    tag: "v0.0.16"
     # Add custom command / arguments to this job.  Useful if you need a custom startup or shutdown script
     # to run
     # command: customCommand
@@ -587,6 +587,19 @@ global:
       metadata_change_log_timeseries_topic_name: "MetadataChangeLog_Timeseries_v1"
       platform_event_topic_name: "PlatformEvent_v1"
       datahub_upgrade_history_topic_name: "DataHubUpgradeHistory_v1"
+    consumer_groups:
+      datahub_upgrade_history_kafka_consumer_group_id: {}
+      #   gms: "<<release-name>>-duhe-consumer-job-client-gms"
+      #   mae-consumer: "<<release-name>>-duhe-consumer-job-client-mcl"
+      #   mce-consumer: "<<release-name>>-duhe-consumer-job-client-mcp"
+      datahub_actions_ingestion_executor_consumer_group_id: "ingestion_executor"
+      datahub_actions_slack_consumer_group_id: "datahub_slack_action"
+      datahub_actions_teams_consumer_group_id: "datahub_teams_action"
+      datahub_usage_event_kafka_consumer_group_id: "datahub-usage-event-consumer-job-client"
+      metadata_change_log_kafka_consumer_group_id: "generic-mae-consumer-job-client"
+      platform_event_kafka_consumer_group_id: "generic-platform-event-job-client"
+      metadata_change_event_kafka_consumer_group_id: "mce-consumer-job-client"
+      metadata_change_proposal_kafka_consumer_group_id: "generic-mce-consumer-job-client"
     maxMessageBytes: "5242880"  # 5MB
     producer:
       compressionType: none