From e1f49bef496fa688d769848198bd4a69e189b5b4 Mon Sep 17 00:00:00 2001 From: "Jaehrling, Stefan" Date: Thu, 14 Mar 2024 18:19:27 +0100 Subject: [PATCH] adding feature to use configmaps to configure kafka components --- .../kafka-kraft-on-k8s/templates/_helpers.tpl | 25 +++ .../templates/configmap_connect.yaml | 35 ++++ .../templates/configmap_controller.yaml | 24 +++ .../templates/configmap_kafka.yaml | 43 ++++ .../{stable_connect.yaml => sts_connect.yaml} | 14 ++ ...le_controller.yaml => sts_controller.yaml} | 9 + .../{stable_kafka.yaml => sts_kafka.yaml} | 17 +- charts/kafka-kraft-on-k8s/values.yaml | 194 +++++++++++++----- manifests/connect/entrypoint.sh | 47 +++-- manifests/controller/entrypoint.sh | 51 +++-- manifests/kafka/entrypoint.sh | 128 +++++++----- 11 files changed, 458 insertions(+), 129 deletions(-) create mode 100644 charts/kafka-kraft-on-k8s/templates/_helpers.tpl create mode 100644 charts/kafka-kraft-on-k8s/templates/configmap_connect.yaml create mode 100644 charts/kafka-kraft-on-k8s/templates/configmap_controller.yaml create mode 100644 charts/kafka-kraft-on-k8s/templates/configmap_kafka.yaml rename charts/kafka-kraft-on-k8s/templates/{stable_connect.yaml => sts_connect.yaml} (85%) rename charts/kafka-kraft-on-k8s/templates/{stable_controller.yaml => sts_controller.yaml} (92%) rename charts/kafka-kraft-on-k8s/templates/{stable_kafka.yaml => sts_kafka.yaml} (88%) diff --git a/charts/kafka-kraft-on-k8s/templates/_helpers.tpl b/charts/kafka-kraft-on-k8s/templates/_helpers.tpl new file mode 100644 index 0000000..f0b99bc --- /dev/null +++ b/charts/kafka-kraft-on-k8s/templates/_helpers.tpl @@ -0,0 +1,25 @@ +{{/* Define template to generate controller.quorum.voters for broker mode */}} +{{- define "kafka.controllerQuorumVoters" -}} +{{- $totalReplicas := .Values.kafka.configmap.properties.controllerReplicas -}} +{{- $service := .Values.kafka.configmap.properties.controllerService -}} +{{- $namespace := .Values.kafka.configmap.properties.namespace -}} +{{- $voters := list -}} +{{- range $i := until (int $totalReplicas) }} + {{- $voter := printf "%d@controller-%d.%s.%s.svc:9093" $i $i $service $namespace -}} + {{- $voters = append $voters $voter -}} +{{- end }} +{{- join "," $voters -}} +{{- end -}} + +{{/* Define template to generate controller.quorum.voters for combined broker and controller mode */}} +{{- define "kafka.controllerQuorumVotersCombined" -}} +{{- $totalReplicas := .Values.kafka.configmap.properties.brokerReplicas -}} +{{- $service := .Values.kafka.configmap.properties.service -}} +{{- $namespace := .Values.kafka.configmap.properties.namespace -}} +{{- $voters := list -}} +{{- range $i := until (int $totalReplicas) }} + {{- $voter := printf "%d@kafka-%d.%s.%s.svc:9093" $i $i $service $namespace -}} + {{- $voters = append $voters $voter -}} +{{- end }} +{{- join "," $voters -}} +{{- end -}} \ No newline at end of file diff --git a/charts/kafka-kraft-on-k8s/templates/configmap_connect.yaml b/charts/kafka-kraft-on-k8s/templates/configmap_connect.yaml new file mode 100644 index 0000000..72615fb --- /dev/null +++ b/charts/kafka-kraft-on-k8s/templates/configmap_connect.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.connect.enabled .Values.connect.configmap.enabled (not .Values.connect.env.ENVS_ENABLED) }} +{{- $portsList := .Values.kafka.ports }} +{{- $portMap := index $portsList 0 }} +{{- $KafkaPort := $portMap.port }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.connect.configmap.name }} +data: + connect-distributed.properties: |- + bootstrap.servers={{ .Values.connect.configmap.properties.service }}.{{ .Values.connect.configmap.properties.namespace }}.svc:{{ $KafkaPort }} + group.id={{ .Values.connect.configmap.properties.groupId }} + key.converter={{ .Values.connect.configmap.properties.keyConverter }} + key.converter.schemas.enable={{ .Values.connect.configmap.properties.keyConverterSchemasEnable }} + value.converter={{ .Values.connect.configmap.properties.valueConverter }} + value.converter.schemas.enable={{ .Values.connect.configmap.properties.valueConverterSchemasEnable }} + offset.storage.topic={{ .Values.connect.configmap.properties.offsetStorageTopic }} + offset.storage.replication.factor={{ .Values.connect.configmap.properties.offsetStorageReplicationFactor }} + offset.storage.partitions={{ .Values.connect.configmap.properties.offsetStoragePartitions }} + offset.storage.cleanup.policy={{ .Values.connect.configmap.properties.offsetStorageCleanupPolicy }} + config.storage.topic={{ .Values.connect.configmap.properties.configStorageTopic }} + config.storage.replication.factor={{ .Values.connect.configmap.properties.configStorageReplicationFactor }} + status.storage.topic={{ .Values.connect.configmap.properties.statusStorageTopic }} + status.storage.replication.factor={{ .Values.connect.configmap.properties.statusStorageReplicationFactor }} + status.storage.partitions={{ .Values.connect.configmap.properties.statusStoragePartitions }} + listeners={{ .Values.connect.configmap.properties.listeners }} + plugin.path={{ .Values.connect.configmap.properties.pluginPath }} +{{- else }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.connect.configmap.name }} +data: + message: "INFO: envs are enabled. properties are not set via configmap." +{{- end }} diff --git a/charts/kafka-kraft-on-k8s/templates/configmap_controller.yaml b/charts/kafka-kraft-on-k8s/templates/configmap_controller.yaml new file mode 100644 index 0000000..714f12e --- /dev/null +++ b/charts/kafka-kraft-on-k8s/templates/configmap_controller.yaml @@ -0,0 +1,24 @@ +{{- if and .Values.controller.enabled .Values.controller.configmap.enabled (not .Values.controller.env.ENABLED) }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.controller.configmap.name }} +data: + {{- $processRoles := .Values.controller.configmap.properties.processRoles | trim }} + {{- if eq $processRoles "controller" }} + controller.properties: |- + process.roles={{ .Values.controller.configmap.properties.processRoles }} + log.dirs={{ .Values.controller.configmap.properties.logdirs }}/{{ add .Values.controller.configmap.properties.nodeIdOffset (int (index (splitList "-" .Release.Name) 0)) }} + listeners={{ .Values.controller.configmap.properties.listeners }} + controller.listener.names={{ .Values.controller.configmap.properties.controllerListenerNames }} + controller.listener.security.protocol.map={{ .Values.controller.configmap.properties.controllerListenerSecurityProtocolMap }} + controller.quorum.voters={{ include "kafka.controllerQuorumVoters" . }} +{{- else }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.controller.configmap.name }} +data: + message: "INFO: envs are enabled. properties are not set via configmap." +{{- end }} +{{- end }} diff --git a/charts/kafka-kraft-on-k8s/templates/configmap_kafka.yaml b/charts/kafka-kraft-on-k8s/templates/configmap_kafka.yaml new file mode 100644 index 0000000..83e46f1 --- /dev/null +++ b/charts/kafka-kraft-on-k8s/templates/configmap_kafka.yaml @@ -0,0 +1,43 @@ +{{- if and .Values.kafka.enabled .Values.kafka.configmap.enabled (not .Values.kafka.env.ENABLED) }} +{{- $portsList := .Values.controller.ports }} +{{- $portMap := index $portsList 0 }} +{{- $ControllerPort := $portMap.port }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.kafka.configmap.name }} +data: + {{- $processRoles := .Values.kafka.configmap.properties.processRoles | trim }} + {{- if eq $processRoles "broker" }} + broker.properties: |- + process.roles={{ .Values.kafka.configmap.properties.processRoles }} + listeners={{ .Values.kafka.configmap.properties.listeners }} + advertised.listeners={{ .Values.kafka.configmap.properties.advertisedListeners }} + log.dirs={{ .Values.kafka.configmap.properties.logdirs }}/{{ add .Values.kafka.configmap.properties.nodeIdOffset (int (index (splitList "-" .Release.Name) 0)) }} + num.partitions={{ .Values.kafka.configmap.properties.kafkaNumPartitions }} + controller.listener.names={{ .Values.kafka.configmap.properties.controllerListenerNames }} + inter.broker.listener.name={{ .Values.kafka.configmap.properties.interBrokerListenerName }} + listener.security.protocol.map={{ .Values.kafka.configmap.properties.controllerListenerSecurityProtocolMap }} + controller.quorum.voters={{ include "kafka.controllerQuorumVoters" . }} + {{- else if and (regexMatch "broker" .Values.kafka.configmap.properties.processRoles) (regexMatch "controller" .Values.kafka.configmap.properties.processRoles) }} + server.properties: |- + process.roles={{ .Values.kafka.configmap.properties.processRoles }} + listeners={{ .Values.kafka.configmap.properties.listeners }} + advertised.listeners={{ .Values.kafka.configmap.properties.advertisedListeners }} + log.dirs={{ .Values.kafka.configmap.properties.logdirs }}/{{ add .Values.kafka.configmap.properties.nodeIdOffset (int (index (splitList "-" .Release.Name) 0)) }} + num.partitions={{ .Values.kafka.configmap.properties.kafkaNumPartitions }} + default.replication.factor={{ .Values.kafka.configmap.properties.defaultReplicationFactor }} + auto.create.topics.enable={{ .Values.kafka.configmap.properties.autoCreateTopicsEnable }} + controller.listener.names={{ .Values.kafka.configmap.properties.controllerListenerNames }} + inter.broker.listener.name={{ .Values.kafka.configmap.properties.interBrokerListenerName }} + listener.security.protocol.map={{ .Values.kafka.configmap.properties.controllerListenerSecurityProtocolMap }} + controller.quorum.voters={{ include "kafka.controllerQuorumVotersCombined" . }} + {{- end }} +{{- else }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.kafka.configmap.name }} +data: + message: "INFO: envs are enabled. properties are not set via configmap." +{{- end }} \ No newline at end of file diff --git a/charts/kafka-kraft-on-k8s/templates/stable_connect.yaml b/charts/kafka-kraft-on-k8s/templates/sts_connect.yaml similarity index 85% rename from charts/kafka-kraft-on-k8s/templates/stable_connect.yaml rename to charts/kafka-kraft-on-k8s/templates/sts_connect.yaml index 0dba41c..8c93166 100644 --- a/charts/kafka-kraft-on-k8s/templates/stable_connect.yaml +++ b/charts/kafka-kraft-on-k8s/templates/sts_connect.yaml @@ -3,6 +3,9 @@ apiVersion: apps/v1 kind: StatefulSet metadata: name: {{ .Values.connect.name }} + annotations: + "helm.sh/hook": pre-install + "helm.sh/hook-weight": "3" labels: {{- range $key, $value := .Values.connect.labels }} {{ $key }}: {{ $value }} @@ -80,4 +83,15 @@ spec: cpu: {{ .Values.connect.resources.requests.cpu }} memory: {{ .Values.connect.resources.requests.memory }} {{- end }} + volumeMounts: + {{- range .Values.connect.volumeMounts }} + - name: {{ .name | quote }} + mountPath: {{ .mountPath }} + {{- end }} + volumes: + {{- range .Values.connect.volumes }} + - name: {{ .name | quote }} + configMap: + name: {{ $.Values.connect.configmap.name | quote }} + {{- end }} {{- end }} \ No newline at end of file diff --git a/charts/kafka-kraft-on-k8s/templates/stable_controller.yaml b/charts/kafka-kraft-on-k8s/templates/sts_controller.yaml similarity index 92% rename from charts/kafka-kraft-on-k8s/templates/stable_controller.yaml rename to charts/kafka-kraft-on-k8s/templates/sts_controller.yaml index 0a43724..b415832 100644 --- a/charts/kafka-kraft-on-k8s/templates/stable_controller.yaml +++ b/charts/kafka-kraft-on-k8s/templates/sts_controller.yaml @@ -3,6 +3,9 @@ apiVersion: apps/v1 kind: StatefulSet metadata: name: {{ .Values.controller.name }} + annotations: + "helm.sh/hook": pre-install + "helm.sh/hook-weight": "1" labels: {{- range $key, $value := .Values.controller.labels }} {{ $key }}: {{ $value }} @@ -85,6 +88,12 @@ spec: - name: {{ .name | quote }} mountPath: {{ .mountPath }} {{- end }} + volumes: + {{- range .Values.controller.volumes }} + - name: {{ .name | quote }} + configMap: + name: {{ $.Values.controller.configmap.name | quote }} + {{- end }} volumeClaimTemplates: {{- range .Values.controller.volumeClaimTemplates }} diff --git a/charts/kafka-kraft-on-k8s/templates/stable_kafka.yaml b/charts/kafka-kraft-on-k8s/templates/sts_kafka.yaml similarity index 88% rename from charts/kafka-kraft-on-k8s/templates/stable_kafka.yaml rename to charts/kafka-kraft-on-k8s/templates/sts_kafka.yaml index be2a1b4..b90c54e 100644 --- a/charts/kafka-kraft-on-k8s/templates/stable_kafka.yaml +++ b/charts/kafka-kraft-on-k8s/templates/sts_kafka.yaml @@ -3,6 +3,9 @@ apiVersion: apps/v1 kind: StatefulSet metadata: name: {{ .Values.kafka.name }} + annotations: + "helm.sh/hook": pre-install + "helm.sh/hook-weight": "2" labels: {{- range $key, $value := .Values.kafka.labels }} {{ $key }}: {{ $value }} @@ -60,13 +63,13 @@ spec: livenessProbe: tcpSocket: port: {{ $port.port }} - initialDelaySeconds: 5 - periodSeconds: 3 + initialDelaySeconds: 15 + periodSeconds: 10 readinessProbe: tcpSocket: port: {{ $port.port }} - initialDelaySeconds: 5 - periodSeconds: 3 + initialDelaySeconds: 15 + periodSeconds: 10 {{- end }} {{- end }} resources: @@ -85,6 +88,12 @@ spec: - name: {{ .name | quote }} mountPath: {{ .mountPath }} {{- end }} + volumes: + {{- range .Values.kafka.volumes }} + - name: {{ .name | quote }} + configMap: + name: {{ $.Values.kafka.configmap.name | quote }} + {{- end }} volumeClaimTemplates: {{- range .Values.kafka.volumeClaimTemplates }} diff --git a/charts/kafka-kraft-on-k8s/values.yaml b/charts/kafka-kraft-on-k8s/values.yaml index 213fab8..73969ad 100644 --- a/charts/kafka-kraft-on-k8s/values.yaml +++ b/charts/kafka-kraft-on-k8s/values.yaml @@ -4,40 +4,64 @@ global: notes: enabled: true - -# -# Warning: ! ENVs are mandatory and hard defined in the entrypoint.sh on image build level. -# - - # --------------------------------------------------------------------------------------------------------------------------------------- # # Kafka KRaft # -# See: https://kafka.apache.org/documentation/#kafkakraft +# See: https://kafka.apache.org/documentation/#kraft # --------------------------------------------------------------------------------------------------------------------------------------- # kafka: enabled: true + configmap: + enabled: true # switch to enable kafka properties via configmap + name: &kafka-properties-cm kafka-properties-cm + + properties: + service: &kafka-service kafka-svc + controllerService: &controller-service controller-svc + namespace: &namespace kafka-test + brokerReplicas: &kafka_replicas 3 + # + processRoles: &process-roles broker # "broker" or "broker,controller" + clusterId: &clusterid QwjfU6MPQ_CMdFsbCx7EGg + autoCreateTopicsEnable: &auto-create-topics-enable true + logdirs: &logdirs /mnt/kafka # "/mnt/kafka" -> no trailing slash + # + interBrokerListenerName: &inter-broker-listener-name PLAINTEXT + advertisedListeners: &advertised-listeners PLAINTEXT://:9092 + listeners: &listeners PLAINTEXT://:9092 # ,CONTROLLER://:9093 + listenersecurityprotocolmap: &listener-security-protocol-map PLAINTEXT:PLAINTEXT,JMX:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + # + controllerReplicas: *kafka_replicas + controllerListenerNames: &controller-listener-names CONTROLLER + controllerListenerSecurityProtocolMap: &controller-listener-security-protocol-map CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,JMX:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + # + defaultReplicationFactor: &default-replication-factor 3 + kafkaNumPartitions: &kafka-num-partitions 8 + env: - PROCESS_ROLES: 'broker' # Must be equal to either 'broker' OR 'broker, controller'. # broker node.ids = {id + 3}, empty == ZooKeeper mode NOT integrated - REPLICAS: &kafka_replicas 3 - SERVICE: &service kafka-svc - CONTROLLER_SERVICE: &controller-service controller-svc + DEBUG: false # activates debug mode in the pod which executes a sleeping process for a long time. + ENVS_ENABLED: false # necessary switch to enable kafka properties via envs + PROCESS_ROLES: *process-roles + NODE_ID_OFFSET: 1000 # necessary offset for the node id of the brokers, when broker and controller are splitted into seperate clusters by processRoles. see https://kafka.apache.org/documentation/#kraft + REPLICAS: *kafka_replicas + SERVICE: *kafka-service + ADVERTISED_LISTENERS: *advertised-listeners + LISTENERS: *listeners + CONTROLLER_SERVICE: *controller-service CONTROLLER_REPLICAS: *kafka_replicas - CONTROLLER_LISTENER_NAMES: &controller-listener-names CONTROLLER - CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP: &controller-listener-security-protocol-map CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,JMX:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL - INTER_BROKER_LISTENER_NAME: &inter-broker-listener-name PLAINTEXT - NAMESPACE: &namespace kafka-test - SHARE_DIR: /mnt/kafka + CONTROLLER_LISTENER_NAMES: *controller-listener-names + CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP: *controller-listener-security-protocol-map + INTER_BROKER_LISTENER_NAME: *inter-broker-listener-name + NAMESPACE: *namespace + SHARE_DIR: *logdirs # $ kafka-storage.sh random-uuid # 16 bytes # see docs: https://kafka.apache.org/33/documentation.html#quickstart_startserver - CLUSTER_ID: &clusterid QwjfU6MPQ_CMdFsbCx7EGg - DEFAULT_REPLICATION_FACTOR: '3' - DEFAULT_MIN_INSYNC_REPLICAS: '2' - KAFKA_NUM_PARTITIONS: '8' - AUTO_CREATE_TOPICS_ENABLE: 'true' - DEBUG: 'false' + CLUSTER_ID: *clusterid + DEFAULT_REPLICATION_FACTOR: *default-replication-factor + KAFKA_NUM_PARTITIONS: *kafka-num-partitions + AUTO_CREATE_TOPICS_ENABLE: *auto-create-topics-enable name: kafka namespace: *namespace @@ -46,8 +70,7 @@ kafka: image: repository: kafkakraft/kafkakraft pullPolicy: IfNotPresent - # Overrides the image tag whose default is the chart appVersion. - tag: &tag "3.6.1" + tag: &tag "3.7.0" imagePullSecrets: [] container: name: kafka-container @@ -140,9 +163,9 @@ kafka: # # The following is an example of a service definition. service: - name: *service + name: *kafka-service labels: - app: *service + app: *kafka-service # Annotations to add to the service # e.g. to enable JMX metrics via Prometheus annotations: @@ -161,7 +184,7 @@ kafka: # Annotations to add to the service account annotations: {} labels: - app: *service + app: *kafka-service rbac: create: false @@ -195,6 +218,12 @@ kafka: volumeMounts: - name: &volumename kafka-storage-data mountPath: /mnt/kafka + - name: &kafka-properties-cm-vol kafka-properties-cm-vol + mountPath: /opt/kafka/config/kraft/properties/ + + volumes: + - name: *kafka-properties-cm-vol + emptyDir: {} # volumeClaimTemplates for the volumeMounts volumeClaimTemplates: @@ -209,23 +238,44 @@ kafka: # --------------------------------------------------------------------------------------------------------------------------------------- # # Kafka Controller # -# see https://kafka.apache.org/documentation/#kraft +# see https://kafka.apache.org/documentation/#brokerconfigs_controller.listener.names # --------------------------------------------------------------------------------------------------------------------------------------- # controller: enabled: true + configmap: + enabled: true # switch to enable kafka properties via configmap + name: &controller-properties-cm controller-properties-cm + + properties: + service: *kafka-service + namespace: *namespace + # + processRoles: &controller-process-roles 'controller' # "broker" or "broker,controller" + clusterId: *clusterid + # + listeners: &controller-listeners CONTROLLER://:9093 + logdirs: *logdirs # "/mnt/kafka" -> no trailing slash + # + controllerReplicas: *kafka_replicas + controllerService: *controller-service + controllerListenerNames: *controller-listener-names + controllerListenerSecurityProtocolMap: *controller-listener-security-protocol-map + env: - PROCESS_ROLES: controller #! Should be always 'controller'. DonĀ“t change that until you know what you do. + DEBUG: false # activates debug mode in the pod which executes a sleeping process for a long time. + ENVS_ENABLED: false # necessary switch to enable controller properties via envs + PROCESS_ROLES: *controller-process-roles #! Should be always 'controller'. Don't change that until you know what you do. CONTROLLER_REPLICAS: *kafka_replicas # matches the replicaCount of Kafka - SERVICE: *service + SERVICE: *kafka-service CONTROLLER_SERVICE: *controller-service + CONTROLLER_LISTENERS: *controller-listeners CONTROLLER_LISTENER_NAMES: *controller-listener-names CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP: *controller-listener-security-protocol-map NAMESPACE: *namespace - SHARE_DIR: /mnt/kafka + SHARE_DIR: *logdirs CLUSTER_ID: *clusterid - DEBUG: false name: controller namespace: *namespace @@ -234,7 +284,6 @@ controller: image: repository: kafkakraft/kafka-controller pullPolicy: IfNotPresent - # Overrides the image tag whose default is the chart appVersion. tag: *tag imagePullSecrets: [] container: @@ -337,7 +386,13 @@ controller: volumeMounts: - name: &volumename kafka-storage-data - mountPath: /mnt/kafka + mountPath: *logdirs + - name: &controller-properties-cm-vol controller-properties-cm-vol + mountPath: /opt/kafka/config/kraft/properties/ + + volumes: + - name: *controller-properties-cm-vol + emptyDir: {} # volumeClaimTemplates for the volumeMounts volumeClaimTemplates: @@ -351,28 +406,62 @@ controller: # --------------------------------------------------------------------------------------------------------------------------------------- # # Kafka Connect +# +# see https://kafka.apache.org/documentation/#connect # --------------------------------------------------------------------------------------------------------------------------------------- # connect: enabled: true + configmap: + enabled: true # switch to enable kafka properties via configmap + name: &connect-properties-cm connect-properties-cm + + properties: + service: *kafka-service + service-connect: &service-connect connect-svc + namespace: *namespace + # + keyConverter: &keyconverter org.apache.kafka.connect.json.JsonConverter + keyConverterSchemasEnable: &keyconverterschemasenable true + valueConverter: &valueconverter org.apache.kafka.connect.json.JsonConverter + valueConverterSchemasEnable: &valueconverterschemasenable true + # + offsetStorageTopic: connect-offsets + offsetFlushIntervalMs: 10000 + offsetStorageReplicationFactor: &offsetstoragereplicationfactor 3 + offsetStoragePartitions: &offsetstoragepartitions 25 + offsetStorageCleanupPolicy: &offsetstoragecleanuppolicy compact + # + configStorageTopic: connect-configs + configStorageReplicationFactor: 1 + # + statusStorageTopic: connect-status + statusStorageReplicationFactor: &statusstoragereplicationfactor 3 + statusStoragePartitions: &statusstoragepartitions 5 + # + groupId: kafka-connect-cluster + listeners: &connect-listeners HTTP://:8083 + pluginPath: &pluginpath /opt/kafka/libs + env: + DEBUG: false # activates debug mode in the pod which executes a sleeping process for a long time. + ENVS_ENABLED: false # necessary switch to enable connect properties via envs REPLICAS: &connect_replicas 3 - SERVICE: *service - SERVICE_CONNECT: &service-connect connect-svc + SERVICE: *kafka-service + SERVICE_CONNECT: *service-connect NAMESPACE: *namespace - DEBUG: 'false' - KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter - VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter - KEY_CONVERTER_SCHEMAS_ENABLE: true - VALUE_CONVERTER_SCHEMAS_ENABLE: true - OFFSET_STORAGE_REPLICATION_FACTOR: 3 - OFFSET_STORAGE_PARTITIONS: 25 - STATUS_STORAGE_REPLICATION_FACTOR: 3 - STATUS_STORAGE_PARTITIONS: 5 - OFFSET_STORAGE_CLEANUP_POLICY: compact - LISTENERS: "HTTP://:8083" - PLUGIN_PATH: "/opt/kafka/libs" + KEY_CONVERTER: *keyconverter + VALUE_CONVERTER: *valueconverter + KEY_CONVERTER_SCHEMAS_ENABLE: *keyconverterschemasenable + VALUE_CONVERTER_SCHEMAS_ENABLE: *valueconverterschemasenable + OFFSET_STORAGE_REPLICATION_FACTOR: *offsetstoragereplicationfactor + OFFSET_STORAGE_PARTITIONS: *offsetstoragepartitions + STATUS_STORAGE_REPLICATION_FACTOR: *statusstoragereplicationfactor + STATUS_STORAGE_PARTITIONS: *statusstoragepartitions + OFFSET_STORAGE_CLEANUP_POLICY: *offsetstoragecleanuppolicy + LISTENERS: *connect-listeners + PLUGIN_PATH: *pluginpath name: connect namespace: *namespace @@ -381,7 +470,6 @@ connect: image: repository: kafkakraft/kafka-connect pullPolicy: IfNotPresent - # Overrides the image tag whose default is the chart appVersion. tag: *tag imagePullSecrets: [] container: @@ -487,3 +575,11 @@ connect: - get - list - watch + + volumeMounts: + - name: &connect-properties-cm-vol connect-properties-cm-vol + mountPath: /opt/kafka/config/connect/ + + volumes: + - name: *connect-properties-cm-vol + emptyDir: {} diff --git a/manifests/connect/entrypoint.sh b/manifests/connect/entrypoint.sh index 8ca4646..d2e4c3e 100644 --- a/manifests/connect/entrypoint.sh +++ b/manifests/connect/entrypoint.sh @@ -1,18 +1,35 @@ #!/bin/bash -sed -e "s+^bootstrap.servers=.*+bootstrap.servers=$SERVICE.$NAMESPACE.svc:9092+" \ - -e "s+^key.converter=.*+key.converter=$KEY_CONVERTER+" \ - -e "s+^value.converter=.*+value.converter=$VALUE_CONVERTER+" \ - -e "s+^key.converter.schemas.enable=.*+key.converter.schemas.enable=$KEY_CONVERTER_SCHEMAS_ENABLE+" \ - -e "s+^value.converter.schemas.enable=.*+value.converter.schemas.enable=$VALUE_CONVERTER_SCHEMAS_ENABLE+" \ - -e "s+^offset.storage.replication.factor=.*+offset.storage.replication.factor=$OFFSET_STORAGE_REPLICATION_FACTOR+" \ - -e "s+^offset.storage.partitions=.*+offset.storage.partitions=$OFFSET_STORAGE_PARTITIONS+" \ - -e "s+^offset.storage.cleanup.policy=.*+offset.storage.cleanup.policy=$OFFSET_STORAGE_CLEANUP_POLICY+" \ - -e "s+^status.storage.replication.factor=.*+status.storage.replication.factor=$STATUS_STORAGE_REPLICATION_FACTOR+" \ - -e "s+^status.storage.partitions=.*+status.storage.partitions=$STATUS_STORAGE_PARTITIONS+" \ - -e "s+^listeners=.*+listeners=$LISTENERS+" \ - -e "s+^plugin.path=.*+plugin.path=$PLUGIN_PATH+" \ - /opt/kafka/config/connect-distributed.properties > /opt/kafka/config/connect-distributed.properties.updated \ - && mv /opt/kafka/config/connect-distributed.properties.updated /opt/kafka/config/connect-distributed.properties +if [[ $DEBUG == true ]]; then + echo "DEBUG mode activated, sleep for a long time.." + sleep 133333333333333337 + exit 1 +fi -exec connect-distributed.sh /opt/kafka/config/connect-distributed.properties +if [[ $ENVS_ENABLED == true ]]; then + + sed -e "s+^bootstrap.servers=.*+bootstrap.servers=$SERVICE.$NAMESPACE.svc:9092+" \ + -e "s+^key.converter=.*+key.converter=$KEY_CONVERTER+" \ + -e "s+^value.converter=.*+value.converter=$VALUE_CONVERTER+" \ + -e "s+^key.converter.schemas.enable=.*+key.converter.schemas.enable=$KEY_CONVERTER_SCHEMAS_ENABLE+" \ + -e "s+^value.converter.schemas.enable=.*+value.converter.schemas.enable=$VALUE_CONVERTER_SCHEMAS_ENABLE+" \ + -e "s+^offset.storage.replication.factor=.*+offset.storage.replication.factor=$OFFSET_STORAGE_REPLICATION_FACTOR+" \ + -e "s+^offset.storage.partitions=.*+offset.storage.partitions=$OFFSET_STORAGE_PARTITIONS+" \ + -e "s+^offset.storage.cleanup.policy=.*+offset.storage.cleanup.policy=$OFFSET_STORAGE_CLEANUP_POLICY+" \ + -e "s+^status.storage.replication.factor=.*+status.storage.replication.factor=$STATUS_STORAGE_REPLICATION_FACTOR+" \ + -e "s+^status.storage.partitions=.*+status.storage.partitions=$STATUS_STORAGE_PARTITIONS+" \ + -e "s+^listeners=.*+listeners=$LISTENERS+" \ + -e "s+^plugin.path=.*+plugin.path=$PLUGIN_PATH+" \ + /opt/kafka/config/connect-distributed.properties > /opt/kafka/config/connect-distributed.properties.updated \ + && mv /opt/kafka/config/connect-distributed.properties.updated /opt/kafka/config/connect-distributed.properties + + echo "[$(date +'%F %T.%3N')] INFO Kafka Connect properties loaded via ENVs" + exec connect-distributed.sh /opt/kafka/config/connect-distributed.properties + +elif [[ $ENVS_ENABLED == false ]]; then + echo "[$(date +'%F %T.%3N')] INFO Kafka Connect properties loaded via ConfigMap" + exec connect-distributed.sh /opt/kafka/config/connect/connect-distributed.properties +else + echo "Invalid helm value: $ENVS_ENABLED; values.yaml-line:43" + sleep 1337 +fi diff --git a/manifests/controller/entrypoint.sh b/manifests/controller/entrypoint.sh index b84aa50..1de54df 100644 --- a/manifests/controller/entrypoint.sh +++ b/manifests/controller/entrypoint.sh @@ -1,5 +1,11 @@ #!/bin/bash +if [[ $DEBUG == true ]]; then + echo "DEBUG mode activated, sleep for a long time.." + sleep 133333333333333337 + exit 1 +fi + NODE_ID=$(echo $HOSTNAME | sed 's/[^0-9]*//g') LISTENERS="CONTROLLER://:9093" @@ -11,16 +17,35 @@ CONTROLLER_QUORUM_VOTERS=$(echo $CONTROLLER_QUORUM_VOTERS | sed 's/,$//') mkdir -p $SHARE_DIR/$NODE_ID - ADVERTISED_LISTENERS="" - sed -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \ - -e "s+^listeners=.*+listeners=$LISTENERS+" \ - -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \ - -e "s+^node.id=.*+node.id=$NODE_ID+" \ - -e "s+^process.roles=.*+process.roles=$PROCESS_ROLES+" \ - -e "s+^controller.listeners.names=.*+controller.listeners.names=$CONTROLLER_LISTENER_NAMES+" \ - -e "s+^listener.security.protocol.map=.*+listener.security.protocol.map=$CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP+" \ - /opt/kafka/config/kraft/controller.properties > /opt/kafka/controller.properties.updated \ - && mv /opt/kafka/controller.properties.updated /opt/kafka/config/kraft/controller.properties - -kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/controller.properties -exec kafka-server-start.sh /opt/kafka/config/kraft/controller.properties +if [[ $ENVS_ENABLED == true ]]; then + if [[ $PROCESS_ROLES == 'controller' ]]; then + + sed -e "s+^process.roles=.*+process.roles=$PROCESS_ROLES+" \ + -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \ + -e "s+^node.id=.*+node.id=$NODE_ID+" \ + -e "s+^listeners=.*+listeners=$CONTROLLER_LISTENERS+" \ + -e "s+^controller.listeners.names=.*+controller.listeners.names=$CONTROLLER_LISTENER_NAMES+" \ + -e "s+^listener.security.protocol.map=.*+listener.security.protocol.map=$CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP+" \ + -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \ + /opt/kafka/config/kraft/controller.properties > /opt/kafka/controller.properties.updated \ + && mv /opt/kafka/controller.properties.updated /opt/kafka/config/kraft/controller.properties + + kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/controller.properties + echo "[$(date +'%F %T.%3N')] INFO Kafka Controller properties loaded via ENVs" + exec kafka-server-start.sh /opt/kafka/config/kraft/controller.properties + fi + +elif [[ $ENVS_ENABLED == false ]]; then + if [[ $PROCESS_ROLES == 'controller' ]]; then + + cp /opt/kafka/config/kraft/properties/controller.properties /opt/kafka/config/kraft/controller.properties + echo -e "\nnode.id=$NODE_ID" >> /opt/kafka/config/kraft/controller.properties + kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/controller.properties + echo "[$(date +'%F %T.%3N')] INFO Kafka Controller properties loaded via ConfigMap" + exec kafka-server-start.sh /opt/kafka/config/kraft/controller.properties + fi + +else + echo "Invalid helm value: $ENVS_ENABLED; values.yaml-line:43" + sleep 1337 +fi diff --git a/manifests/kafka/entrypoint.sh b/manifests/kafka/entrypoint.sh index 8176d33..2497d89 100644 --- a/manifests/kafka/entrypoint.sh +++ b/manifests/kafka/entrypoint.sh @@ -1,60 +1,92 @@ #!/bin/bash -NODE_ID=$(echo $HOSTNAME|sed 's/[^0-9]*//g') +export NODE_ID=$(echo $HOSTNAME|sed 's/[^0-9]*//g') mkdir -p $SHARE_DIR/$NODE_ID -if [[ $PROCESS_ROLES == "broker" ]]; then - - LISTENERS="PLAINTEXT://:9092" - NODE_ID=$((NODE_ID + 3)) - - for i in $(seq 0 $(($CONTROLLER_REPLICAS - 1))); do - CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@controller-$i.$CONTROLLER_SERVICE.$NAMESPACE.svc:9093," - done - CONTROLLER_QUORUM_VOTERS=$(echo $CONTROLLER_QUORUM_VOTERS | sed 's/,$//') - - sed -e "s+^process.roles=.*+process.roles=$PROCESS_ROLES+" \ - -e "s+^node.id=.*+node.id=$NODE_ID+" \ - -e "s+^broker.id=.*+broker.id=$NODE_ID+" \ - -e "s+^listeners=.*+listeners=$LISTENERS+" \ - -e "s+^advertised.listeners=.*+advertised.listeners=$LISTENERS+" \ - -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \ - -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \ - -e "s+^num.partitions=.*+num.partitions=$KAFKA_NUM_PARTITIONS+" \ - -e "s+^controller.listeners.names=.*+controller.listeners.names=$CONTROLLER_LISTENER_NAMES+" \ - -e "s+^inter.broker.listener.name=.*+inter.broker.listener.name=$INTER_BROKER_LISTENER_NAME+" \ - -e "s+^listener.security.protocol.map=.*+listener.security.protocol.map=$CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP+" \ - /opt/kafka/config/kraft/broker.properties > /opt/kafka/broker.properties.updated \ - && mv /opt/kafka/broker.properties.updated /opt/kafka/config/kraft/broker.properties - - kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/broker.properties - exec kafka-server-start.sh /opt/kafka/config/kraft/broker.properties +if [[ $DEBUG == true ]]; then + echo "DEBUG mode activated, sleep for a long time.." + sleep 133333333333333337 + exit 1 fi -if [[ $PROCESS_ROLES =~ broker && $PROCESS_ROLES =~ controller ]]; then +if [[ $ENVS_ENABLED == true ]]; then + if [[ $PROCESS_ROLES == "broker" ]]; then - ADVERTISED_LISTENERS="PLAINTEXT://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc:9092" - LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" - CONTROLLER_QUORUM_VOTERS="" + # LISTENERS="PLAINTEXT://:9092" + NODE_ID=$(($NODE_ID + $NODE_ID_OFFSET)) - for i in $(seq 0 $(($REPLICAS - 1))); do - CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc:9093," - done - CONTROLLER_QUORUM_VOTERS=$(echo $CONTROLLER_QUORUM_VOTERS | sed 's/,$//') + for i in $(seq 0 $(($CONTROLLER_REPLICAS - 1))); do + CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@controller-$i.$CONTROLLER_SERVICE.$NAMESPACE.svc:9093," + done + CONTROLLER_QUORUM_VOTERS=$(echo $CONTROLLER_QUORUM_VOTERS | sed 's/,$//') - sed -e "s+^node.id=.*+node.id=$NODE_ID+" \ - -e "s+^num.partitions=.*+num.partitions=$KAFKA_NUM_PARTITIONS+" \ - -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \ - -e "s+^listeners=.*+listeners=$LISTENERS+" \ - -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \ - -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \ - /opt/kafka/config/kraft/server.properties > /opt/kafka/server.properties.updated \ - && mv /opt/kafka/server.properties.updated /opt/kafka/config/kraft/server.properties + sed -e "s+^process.roles=.*+process.roles=$PROCESS_ROLES+" \ + -e "s+^node.id=.*+node.id=$NODE_ID+" \ + -e "s+^broker.id=.*+broker.id=$NODE_ID+" \ + -e "s+^listeners=.*+listeners=$LISTENERS+" \ + -e "s+^advertised.listeners=.*+advertised.listeners=$LISTENERS+" \ + -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \ + -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \ + -e "s+^num.partitions=.*+num.partitions=$KAFKA_NUM_PARTITIONS+" \ + -e "s+^controller.listeners.names=.*+controller.listeners.names=$CONTROLLER_LISTENER_NAMES+" \ + -e "s+^inter.broker.listener.name=.*+inter.broker.listener.name=$INTER_BROKER_LISTENER_NAME+" \ + -e "s+^listener.security.protocol.map=.*+listener.security.protocol.map=$CONTROLLER_LISTENER_SECURITY_PROTOCOL_MAP+" \ + /opt/kafka/config/kraft/broker.properties > /opt/kafka/broker.properties.updated \ + && mv /opt/kafka/broker.properties.updated /opt/kafka/config/kraft/broker.properties - echo "default.replication.factor=$DEFAULT_REPLICATION_FACTOR" >> /opt/kafka/config/kraft/server.properties - echo "auto.create.topics.enable=$AUTO_CREATE_TOPICS_ENABLE" >> /opt/kafka/config/kraft/server.properties + kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/broker.properties + echo "[$(date +'%F %T.%3N')] INFO Kafka KRaft properties loaded via ENVs" + exec kafka-server-start.sh /opt/kafka/config/kraft/broker.properties + fi - kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties - exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties -fi + if [[ $PROCESS_ROLES =~ broker && $PROCESS_ROLES =~ controller ]]; then + + # ADVERTISED_LISTENERS="PLAINTEXT://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc:9092" + # LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" + CONTROLLER_QUORUM_VOTERS="" + + for i in $(seq 0 $(($REPLICAS - 1))); do + CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc:9093," + done + CONTROLLER_QUORUM_VOTERS=$(echo $CONTROLLER_QUORUM_VOTERS | sed 's/,$//') + + sed -e "s+^node.id=.*+node.id=$NODE_ID+" \ + -e "s+^num.partitions=.*+num.partitions=$KAFKA_NUM_PARTITIONS+" \ + -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \ + -e "s+^listeners=.*+listeners=$LISTENERS+" \ + -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \ + -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \ + /opt/kafka/config/kraft/server.properties > /opt/kafka/server.properties.updated \ + && mv /opt/kafka/server.properties.updated /opt/kafka/config/kraft/server.properties + + echo "default.replication.factor=$DEFAULT_REPLICATION_FACTOR" >> /opt/kafka/config/kraft/server.properties + echo "auto.create.topics.enable=$AUTO_CREATE_TOPICS_ENABLE" >> /opt/kafka/config/kraft/server.properties + + kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties + echo "[$(date +'%F %T.%3N')] INFO Kafka KRaft properties loaded via ENVs" + exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties + fi +elif [[ $ENVS_ENABLED == false ]]; then + if [[ $PROCESS_ROLES == 'broker' ]]; then + + cp /opt/kafka/config/kraft/properties/broker.properties /opt/kafka/config/kraft/broker.properties + echo -e "\nnode.id=$(($NODE_ID + $NODE_ID_OFFSET))" >> /opt/kafka/config/kraft/broker.properties + kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/broker.properties + echo "[$(date +'%F %T.%3N')] INFO Kafka KRaft properties loaded via ConfigMap" + exec kafka-server-start.sh /opt/kafka/config/kraft/broker.properties + fi + + if [[ $PROCESS_ROLES =~ 'broker' && $PROCESS_ROLES =~ 'controller' ]]; then + + cp /opt/kafka/config/kraft/properties/server.properties /opt/kafka/config/kraft/server.properties + echo -e "\nnode.id=$NODE_ID" >> /opt/kafka/config/kraft/server.properties + kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties + echo "[$(date +'%F %T.%3N')] INFO Kafka KRaft properties loaded via ConfigMap" + exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties + fi + +else + echo "Invalid helm value: $ENVS_ENABLED; values.yaml-line:43" + sleep 1337 +fi