From 3cb2b1bfb221d1fa0ffa8569cb334dcd40f8caf5 Mon Sep 17 00:00:00 2001 From: Dave Peterson Date: Tue, 12 Jan 2016 12:24:38 -0800 Subject: [PATCH] Set client ID in produce requests to "bruce" if none was specified with --client_id option. This is a workaround for a bug in Kafka 0.9.0.0 that causes broker to crash on receipt of produce request with empty client ID. See https://issues.apache.org/jira/browse/KAFKA-3088 for details. --- src/bruce/bruce.test.cc | 2 ++ src/bruce/config.cc | 17 +++++++++++++++++ src/bruce/config.h | 2 ++ 3 files changed, 21 insertions(+) diff --git a/src/bruce/bruce.test.cc b/src/bruce/bruce.test.cc index acd0a4a..5695899 100644 --- a/src/bruce/bruce.test.cc +++ b/src/bruce/bruce.test.cc @@ -221,6 +221,8 @@ namespace { args.push_back(msg_buffer_max_str.c_str()); args.push_back("--receive_socket_name"); args.push_back(UnixSocketName); + args.push_back("--client_id"); + args.push_back("bruce"); args.push_back("--status_loopback_only"); args.push_back("--log_level"); args.push_back("LOG_INFO"); diff --git a/src/bruce/config.cc b/src/bruce/config.cc index ce5c403..3933388 100644 --- a/src/bruce/config.cc +++ b/src/bruce/config.cc @@ -321,6 +321,14 @@ static void ParseArgs(int argc, char *argv[], TConfig &config) { arg_max_failed_delivery_attempts.getValue(); config.Daemon = arg_daemon.getValue(); config.ClientId = arg_client_id.getValue(); + config.ClientIdWasEmpty = config.ClientId.empty(); + + if (config.ClientIdWasEmpty) { + /* Workaround for bug in Kafka 0.9.0.0. See + https://issues.apache.org/jira/browse/KAFKA-3088 for details. */ + config.ClientId = "bruce"; + } + config.RequiredAcks = arg_required_acks.getValue(); config.ReplicationTimeout = arg_replication_timeout.getValue(); config.ShutdownMaxDelay = arg_shutdown_max_delay.getValue(); @@ -367,6 +375,7 @@ TConfig::TConfig(int argc, char *argv[]) AllowLargeUnixDatagrams(false), MaxFailedDeliveryAttempts(5), Daemon(false), + ClientIdWasEmpty(true), RequiredAcks(-1), ReplicationTimeout(10000), ShutdownMaxDelay(30000), @@ -394,6 +403,14 @@ TConfig::TConfig(int argc, char *argv[]) } void Bruce::LogConfig(const TConfig &config) { + if (config.ClientIdWasEmpty) { + syslog(LOG_WARNING, "Using \"bruce\" for client ID since none was " + "specified with --client_id option. This is a workaround for a bug " + "in Kafka 0.9.0.0 that causes broker to crash on receipt of produce " + "request with empty client ID. See " + "https://issues.apache.org/jira/browse/KAFKA-3088 for details."); + } + syslog(LOG_NOTICE, "Version: [%s]", bruce_build_id); syslog(LOG_NOTICE, "Config file: [%s]", config.ConfigPath.c_str()); syslog(LOG_NOTICE, "UNIX domain datagram input socket [%s]", diff --git a/src/bruce/config.h b/src/bruce/config.h index ebdf962..ba05185 100644 --- a/src/bruce/config.h +++ b/src/bruce/config.h @@ -64,6 +64,8 @@ namespace Bruce { std::string ClientId; + bool ClientIdWasEmpty; + int16_t RequiredAcks; size_t ReplicationTimeout;