From 1451f0832a007366b5baf750e615185f3fa384a2 Mon Sep 17 00:00:00 2001
From: Binayak Pokharel
Date: Thu, 14 Nov 2024 13:21:11 +1100
Subject: [PATCH] Instacollector Kafka 1.0.0

---
 kafka/README.md            |  55 +++++++
 kafka/cluster_collector.sh | 159 ++++++++++++++++++
 kafka/node_collector.sh    | 330 +++++++++++++++++++++++++++++++++
 3 files changed, 544 insertions(+)
 create mode 100644 kafka/README.md
 create mode 100755 kafka/cluster_collector.sh
 create mode 100755 kafka/node_collector.sh

diff --git a/kafka/README.md b/kafka/README.md
new file mode 100644
index 0000000..187f089
--- /dev/null
+++ b/kafka/README.md
@@ -0,0 +1,55 @@

This tool is used to collect information from a Kafka cluster to aid in problem diagnosis or review.

# Design info:
There are two scripts in the instacollector tool. The `node_collector.sh` is supposed to be executed on each Kafka node.
The `cluster_collector.sh` can be executed on a machine connected to the Kafka cluster, e.g. a user laptop or jumpbox
having connectivity with the Kafka cluster.

The node_collector.sh executes Linux and Kafka commands and copies configuration and log files required for a cluster health check.
The cluster_collector.sh executes node_collector.sh on each Kafka node using ssh. The cluster_collector.sh prompts for the following user inputs -

Enter your kafka environment (vm/docker) :
Enter username for login on Kafka cluster nodes (Press Enter for default admin) :
Enter Identity file path:
Enter path of the command config file:
Enter file containing ip addresses/host/container names of Kafka cluster nodes:


# Execution settings:
The cluster_collector.sh connects to the cluster nodes using the supplied ssh identity (key) file.
If the ssh key has a passphrase enabled, please use the ssh-agent and ssh-add commands to cache the passphrase before running cluster_collector.sh (a usage sketch is included at the top of cluster_collector.sh).
If another method is required for `ssh`, please change the script as applicable.
Alternatively, node_collector.sh can also be executed on individual nodes if cluster_collector.sh is not usable in a given case.

The Kafka configuration file locations, data directory location and other settings are used as per the Apache Kafka default setup.
Please change those in node_collector.sh if other values are required. Below are the Kafka & Zookeeper related files
which are copied from the different nodes.

Kafka Broker Files
*******************
server.properties
server.log
kafkaServer.out
kafka-authorizer.log
controller.log
state-change.log
kafka_server_jaas.conf
output of kafka-topics[.sh]
output of kafka-broker-api-versions[.sh]
output of kafka-consumer-groups[.sh]

Zookeeper Files
****************
zookeeper.properties
zoo.cfg
log4j.properties
zoo.log
zookeeper_jaas.conf
zookeeper.out


**Note:** The scripts should be executed in a bash shell.

Please see https://www.instaclustr.com/support/documentation/announcements/instaclustr-open-source-project-status/ for Instaclustr support status of this project
diff --git a/kafka/cluster_collector.sh b/kafka/cluster_collector.sh
new file mode 100755
index 0000000..d862ab0
--- /dev/null
+++ b/kafka/cluster_collector.sh
@@ -0,0 +1,159 @@
#!/bin/bash

##********************************************************************************************************************
##********************************************************************************************************************
## The purpose of this tool is to extract Kafka & Zookeeper related configuration and log files for troubleshooting.
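##
## Usage sketch (assumptions: a passphrase-protected ssh key at ~/.ssh/id_rsa and
## the interactive prompts described in the README; adjust to your environment):
##   eval "$(ssh-agent -s)"    # start an agent for this shell
##   ssh-add ~/.ssh/id_rsa     # cache the key passphrase once
##   ./cluster_collector.sh    # then answer the prompts
##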
## Following is the list of files that are extracted. Please note that not all files exist in every environment.
## All properties with the word "password" in them are replaced with "****".
#=============================================================#
# kafka files and the path variables where they are expected
# BROKER_CONFIG_PATH
#   server.properties
# BROKER_LOG_PATH
#   server.log
#   kafkaServer.out
#   kafka-authorizer.log
#   controller.log
#   state-change.log
# BROKER_JAAS_CONFIG
#   kafka_server_jaas.conf
# ZOOKEEPER_CONFIG
#   zookeeper.properties
#   zoo.cfg
#   log4j.properties
# ZOOKEEPER_LOG_PATH
#   zoo.log
#   zookeeper.out
# ZOOKEEPER_JAAS_CONFIG
#   zookeeper_jaas.conf
# BROKER_BIN_PATH
#   kafka-topics[.sh]
#   kafka-broker-api-versions[.sh]
#   kafka-consumer-groups[.sh]
#=============================================================#
##
## In addition to the files above the script also extracts the following OS related information -
## 1. file system & directory size
## 2. io stats
## 3. file descriptors
## 4. cpu & memory
## 5. contents of the hosts file
## 6. output of kafka-topics.sh topic describe
##
##********************************************************************************************************************
##********************************************************************************************************************
## Last Modification Date : 10/29/2021
## Description : Script functionality enhanced to add information related to iostat, df, file descriptor,
##               cpu & memory info
##********************************************************************************************************************
##********************************************************************************************************************

clear

#GLOBAL VARIABLES
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
INFO_DIR=/tmp/InstaCollection_$(date +%Y%m%d%H%M)

#Collect environment info (VM/docker)
read -p "Enter your kafka environment (vm/docker) :" kenv

if [[ "${kenv}" == "vm" ]]; then
    #Collect user info.
    read -p "Enter username for login on Kafka cluster nodes (Press Enter for default admin) :" user
    [ -z "${user}" ] && user='admin'

    read -p "Enter Identity file path:" id_file
    if [[ ! -f ${id_file} || ! -s ${id_file} ]]; then
        echo "File not found: $id_file"
        exit 1
    fi
elif [[ "${kenv}" == "docker" ]]; then
    read -p "Enter docker home directory :" docker_home

    if [ -z "$docker_home" ]; then
        echo "Docker home directory cannot be empty"
        exit 1
    fi
else
    echo "Invalid value for environment"
    exit 1
fi

#Optional; leave empty if the cluster does not need a client command config
read -p "Enter path of the command config file:" config_file

read -p "Enter file containing ip addresses/host/container names of Kafka cluster nodes:" peers_file
if [[ ! -f ${peers_file} || ! -s ${peers_file} ]]; then
    echo "File not found: $peers_file"
    exit 1
fi
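# Illustrative input files (host addresses and property values below are
# examples only, not defaults):
#   $ cat ./hosts                # one IP/hostname/container name per line
#   10.0.0.1
#   10.0.0.2
#   $ cat ./client.properties    # optional command config for secured clusters
#   security.protocol=SASL_SSL
#   sasl.mechanism=SCRAM-SHA-512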
echo "environment: $kenv"

#Execute the node_collector on each node or container
#Pass the command config only when one was supplied
extra_args=""
if [ ! -z "$config_file" ]; then
    extra_args="--command-config $config_file"
fi

if [ "$kenv" == "vm" ]; then
    while read peer
    do
        if [[ -z "$peer" ]]; then
            continue
        fi
        ssh -i "$id_file" "$user@$peer" "bash -s -- --ip $peer $extra_args" < node_collector.sh &
    done < "$peers_file"
else
    while read peer
    do
        if [[ -z "$peer" ]]; then
            continue
        fi
        echo "Copying file node_collector.sh to container"
        docker cp ./node_collector.sh $peer:$docker_home/
        docker exec $peer /bin/bash -c "bash $docker_home/node_collector.sh --ip $peer $extra_args" &
    done < "$peers_file"
fi

#waiting for all node_collectors to complete
wait

mkdir -p $INFO_DIR

#copy the data from each node/container

if [ "$kenv" == "vm" ]; then
    while read peer
    do
        if [[ -z "$peer" ]]; then
            continue
        fi
        mkdir $INFO_DIR/$peer
        scp -i "$id_file" $user@$peer:/tmp/InstaCollection.tar.gz $INFO_DIR/$peer/InstaCollection_$peer.tar.gz &

    done < "$peers_file"
else
    while read peer
    do
        if [[ -z "$peer" ]]; then
            continue
        fi
        mkdir $INFO_DIR/$peer
        docker cp $peer:/tmp/InstaCollection.tar.gz $INFO_DIR/$peer/InstaCollection_$peer.tar.gz &

    done < "$peers_file"
fi

#waiting for all copies to complete
wait

#compress the info directory
result_file=/tmp/InstaCollection_$(date +%Y%m%d%H%M).tar.gz
tar -zcf $result_file -C $INFO_DIR .
rm -r $INFO_DIR

echo "Process complete. File generated: $result_file"
diff --git a/kafka/node_collector.sh b/kafka/node_collector.sh
new file mode 100755
index 0000000..9e851d8
--- /dev/null
+++ b/kafka/node_collector.sh
@@ -0,0 +1,330 @@
#!/bin/bash
#!TODO: implement https://google.github.io/styleguide/shellguide.html
#==========================#
# -- Collection Options -- #
#==========================#
# -- Paths
KAFKA_HOME=/opt/kafka
BROKER_BIN_PATH=${KAFKA_HOME}/bin
BROKER_CONFIG_PATH=${KAFKA_HOME}/config
BROKER_LOG_PATH=${KAFKA_HOME}/logs
BROKER_DATA_PATHS=${KAFKA_HOME}/kafka-logs
ZOOKEEPER_CONFIG=${KAFKA_HOME}/config
ZOOKEEPER_LOG_PATH=${KAFKA_HOME}/logs

# -- Ports
KAFKA_CLIENT_PORT=9092
ZOOKEEPER_CLIENT_PORT=2181

# -- GC Logging
GC_LOGGING_ENABLED=yes
GC_LOG_PATH=${KAFKA_HOME}/logs

# -- Arguments from CLI
unset ip
unset command_config
unset debug
while [[ $# -gt 0 ]]; do
    key="$1"
    case $key in

    # IP
    -ip | --ip)
        ip="$2"
        echo "$ip : main : Using ip=$ip"
        shift
        shift
        ;;

    # Command Config
    -c | --command-config)
        command_config="$2"
        shift
        shift
        ;;

    # Debug
    -d | --debug)
        debug=true
        shift
        ;;

    # unknown option
    *)
        if [ -z "$ip" ]; then
            ip=$(hostname -I | tail -n -1 | tr -d '[:blank:]')
        fi
        echo "$ip : main : WARN : ignoring unknown argument '$1'"
        shift
        ;;
    esac
done

# -- Calculated Defaults
if [ -z "$ip" ]; then
    ip=$(hostname -I | tail -n -1 | tr -d '[:blank:]')
fi

if [ -z "$debug" ]; then
    debug=false
fi

if [ -z "$ZOOKEEPER_CONFIG" ]; then
    ZOOKEEPER_CONFIG="$BROKER_CONFIG_PATH"
fi

# default the JAAS config locations to the config directories when not set
if [ -z "$BROKER_JAAS_CONFIG" ]; then
    BROKER_JAAS_CONFIG="$BROKER_CONFIG_PATH"
fi

if [ -z "$ZOOKEEPER_JAAS_CONFIG" ]; then
    ZOOKEEPER_JAAS_CONFIG="$ZOOKEEPER_CONFIG"
fi

if [ ! -z "$command_config" ] && [ ! -f "$command_config" ]; then
    echo "$ip : main : FATAL : specified command config not found: $command_config"
    exit 1
elif [ ! -z "$command_config" ]; then
    echo "$ip : main : using command-config=$command_config"
fi
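# Standalone usage sketch (flag values are examples; both flags are optional
# and default to an address from hostname -I and no command config):
#   ./node_collector.sh --ip 10.0.0.1 --command-config /etc/kafka/client.properties
#   ./node_collector.sh -d    # debug logging, ip derived from hostname -I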
# -- Output Paths
data_dir=/tmp/DataCollection_${ip}
data_file=$data_dir/disk.info
io_stats_file=$data_dir/io_stat.info
cpu_info=$data_dir/cpu.info
mem_info=$data_dir/mem.info
missing_files=$data_dir/missing_files.info
file_descriptor=$data_dir/file_descriptor.info
hosts_info=$data_dir/hosts_file.info
output_tar=/tmp/InstaCollection.tar.gz


#==========================#
# --     Functions      -- #
#==========================#
copy_config_files() {
    echo "$ip : ${FUNCNAME[0]} : Copying files"
    local config_files=(
        "$BROKER_CONFIG_PATH/server.properties"
        "$BROKER_LOG_PATH/server.log"
        "$BROKER_LOG_PATH/kafkaServer.out"
        "$BROKER_LOG_PATH/kafka-authorizer.log"
        "$BROKER_LOG_PATH/controller.log"
        "$BROKER_LOG_PATH/state-change.log"
        "$BROKER_JAAS_CONFIG/kafka_server_jaas.conf"
        "$ZOOKEEPER_CONFIG/zookeeper.properties"
        "$ZOOKEEPER_CONFIG/zoo.cfg"
        "$ZOOKEEPER_CONFIG/log4j.properties"
        "$ZOOKEEPER_LOG_PATH/zoo.log"
        "$ZOOKEEPER_JAAS_CONFIG/zookeeper_jaas.conf"
        "$ZOOKEEPER_LOG_PATH/zookeeper.out"
    )
    if [ "$GC_LOGGING_ENABLED" == "yes" ]; then
        config_files+=("$GC_LOG_PATH/kafkaServer-gc.log" "$GC_LOG_PATH/zookeeper-gc.log")
    fi

    for i in "${config_files[@]}"; do
        # - Check if the file exists and copy
        if [[ -f "$i" ]]; then
            cp -nr $i* -t $data_dir

            # - Redact passwords
            #!TODO: move redactions to a list & for loop
            #!TODO: account for server.properties* for redactions
            if [[ "$i" == *"server.properties"* ]]; then
                redact_passwords "$data_dir/server.properties"
            elif [[ "$i" == *"kafka_server_jaas.conf"* ]]; then
                redact_passwords "$data_dir/kafka_server_jaas.conf"
            elif [[ "$i" == *"zookeeper_jaas.conf"* ]]; then
                redact_passwords "$data_dir/zookeeper_jaas.conf"
            fi
        else
            if [ "$debug" = true ]; then
                echo "$ip : ${FUNCNAME[0]} : DEBUG : File $i not found"
            fi
            echo "$ip : ${FUNCNAME[0]} : File $i not found" >>$missing_files
        fi
    done
    echo "$ip : ${FUNCNAME[0]} : Done copying files"
}

redact_passwords() {
    local input_file=$1
    echo "$ip : ${FUNCNAME[0]} : Redacting passwords from $input_file"
    # e.g. "ssl.keystore.password=secret" becomes "ssl.keystore.password ****"
    sed -i.bak -e 's: *password.*$:password ****:g' "$input_file"
    rm "$input_file.bak"
}

get_size_info() {
    # - collects size of data directories
    echo "$ip : ${FUNCNAME[0]} : Executing disk space commands"
    local commands=("df -h" "du -h")
    local paths=($(echo "$BROKER_DATA_PATHS" | tr ',' '\n'))

    for i in "${commands[@]}"; do
        for j in "${paths[@]}"; do

            # check if the path exists
            if [ -d "$j" ]; then
                echo "" >>$data_file
                echo "$i $j" >>$data_file
                eval "$i $j" >>$data_file
            else
                echo "$ip : ${FUNCNAME[0]} : ERROR : PATHNOTFOUND : $j"
            fi
        done
    done
    echo "$ip : ${FUNCNAME[0]} : Done executing disk space commands"
}
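# Example of how BROKER_DATA_PATHS drives get_size_info (illustrative paths):
#   BROKER_DATA_PATHS=/data1/kafka-logs,/data2/kafka-logs
# is split on ',' above, yielding separate "df -h" and "du -h" runs per directory.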
get_io_stats() {
    if ! [ -x "$(command -v iostat)" ]; then
        echo "$ip : ${FUNCNAME[0]} : Executable not found - iostat"
    else
        # - Collects iostat in 30s intervals for 60 secs; please change according to requirements
        echo "$ip : ${FUNCNAME[0]} : Executing iostat command"
        timeout -sHUP 60s iostat -x -m -t -y -z 30 >>$io_stats_file
        echo "$ip : ${FUNCNAME[0]} : Done executing iostat command"
    fi
}

get_file_descriptor() {
    echo "$ip : ${FUNCNAME[0]} : Getting file descriptor limit"
    ulimit -n >>$file_descriptor
    echo "$ip : ${FUNCNAME[0]} : Done getting file descriptor limit"
}

get_hosts() {
    echo "$ip : ${FUNCNAME[0]} : Getting hosts info"
    if [[ -f "/etc/hosts" ]]; then
        echo "$ip : ${FUNCNAME[0]} : Getting contents of hosts file"
        cat /etc/hosts >>$hosts_info
        echo "$ip : ${FUNCNAME[0]} : Done getting contents of hosts file"
    else
        echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND /etc/hosts"
    fi
}

get_cpu_memory() {
    echo "$ip : ${FUNCNAME[0]} : Getting CPU & Memory info"
    if [[ -f "/proc/cpuinfo" ]]; then
        echo "$ip : ${FUNCNAME[0]} : Executing cpuinfo command"
        cat /proc/cpuinfo >>$cpu_info
        echo "$ip : ${FUNCNAME[0]} : Done getting CPU info"
    else
        echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND /proc/cpuinfo"
    fi

    if [[ -f "/proc/meminfo" ]]; then
        echo "$ip : ${FUNCNAME[0]} : Executing meminfo command"
        cat /proc/meminfo >>$mem_info
        echo "$ip : ${FUNCNAME[0]} : Done getting memory info"
    else
        echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND /proc/meminfo"
    fi
}

get_kafka_cli_info() {
    echo "$ip : ${FUNCNAME[0]} : Executing kafka CLI commands"
    # - List of commands & filenames to save output
    #!TODO: make .sh vs bin less messy
    #!TODO: account for version skew with --zookeeper, etc

    local commands=(
        "$BROKER_BIN_PATH/kafka-topics.sh --version"
        "$BROKER_BIN_PATH/kafka-topics.sh --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe"
        "$BROKER_BIN_PATH/kafka-broker-api-versions.sh --bootstrap-server $ip:$KAFKA_CLIENT_PORT"
        "$BROKER_BIN_PATH/kafka-consumer-groups.sh --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe --all-groups --verbose"
    )
    local filenames=(
        "kafka-versions-sh"
        "kafka-topics-describe-sh"
        "kafka-api-versions-sh"
        "consumer-groups-sh"
    )

    local commands_bin=(
        "$BROKER_BIN_PATH/kafka-topics --version"
        "$BROKER_BIN_PATH/kafka-topics --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe"
        "$BROKER_BIN_PATH/kafka-broker-api-versions --bootstrap-server $ip:$KAFKA_CLIENT_PORT"
        "$BROKER_BIN_PATH/kafka-consumer-groups --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe --all-groups --verbose"
    )
    local filenames_bin=(
        "kafka-versions-bin"
        "kafka-topics-describe-bin"
        "kafka-api-versions-bin"
        "consumer-groups-bin"
    )
    # - Prefer the extensionless binaries when present (e.g. package installs)
    if [ -f "$BROKER_BIN_PATH/kafka-topics" ]; then
        commands=("${commands_bin[@]}")
        filenames=("${filenames_bin[@]}")
    fi

    for ((i = 0; i < ${#commands[@]}; i++)); do
        fname=${commands[i]%% *}

        thiscmd=${commands[i]}
        if [[ -f "$command_config" ]]; then
            thiscmd+=" --command-config $command_config"
        fi

        if [[ -f "${fname}" ]]; then
            local cmd_file=$data_dir/${filenames[i]}.info
            if [ "$debug" = true ]; then
                echo "$ip : ${FUNCNAME[0]} : DEBUG : Will execute: [$thiscmd]"
            fi
            echo "" >>$cmd_file
            eval $thiscmd >>$cmd_file
        else
            echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND ${fname}"
        fi
    done
    echo "$ip : ${FUNCNAME[0]} : Done executing kafka CLI commands"
}
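# Example of an effective command assembled by get_kafka_cli_info for
# ip=10.0.0.1 with a command config present (values are illustrative):
#   /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.0.1:9092 \
#     --describe --all-groups --verbose --command-config /etc/kafka/client.properties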

#==========================#
# --   Collect Data     -- #
#==========================#
echo "$ip : main : Creating local directory for data collection: $data_dir"
# - rename an already existing directory out of the way
mv $data_dir ${data_dir}_$(date +%Y%m%d%H%M) 2>/dev/null
mkdir $data_dir

# - start execution
get_io_stats &
copy_config_files &
get_size_info &
get_kafka_cli_info &
get_cpu_memory &
get_file_descriptor &
get_hosts &

echo "$ip : main : Waiting for collection to complete"
wait
echo "$ip : main : Collection complete"

# -- Report missing files
if [ "$debug" = true ]; then
    echo "$ip : main : ********************** Missing files *********************"
    cat $data_dir/missing_files.info
    echo "$ip : main : **********************************************************"
fi

# - Compress the info directory
echo "$ip : main : Compressing results to $output_tar"
tar -zcf $output_tar -C $data_dir .
echo "$ip : main : Process Complete."
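# To spot-check a node's archive after a run (a sketch; the path is the
# default output_tar above):
#   tar -ztf /tmp/InstaCollection.tar.gz | head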