diff --git a/build.gradle b/build.gradle
index f2ba22e86b39d..7dc8f50342fdd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2177,6 +2177,7 @@ project(':streams') {
testImplementation project(':core')
testImplementation project(':tools')
testImplementation project(':core').sourceSets.test.output
+ testImplementation project(':storage')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.log4j
@@ -2975,6 +2976,7 @@ project(':connect:runtime') {
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
+ testImplementation project(':storage')
testImplementation project(':connect:test-plugins')
testImplementation libs.easymock
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 3f9a21fffc6c8..782d2fe4617e9 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -56,6 +56,7 @@
+    <allow pkg="org.apache.kafka.storage.internals.log" />
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index caf1fe5ebe146..a52b3d94e3240 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -410,6 +410,7 @@
+    <allow pkg="org.apache.kafka.storage.internals.log" />
@@ -601,6 +602,7 @@
+    <allow pkg="org.apache.kafka.storage.internals.log" />
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index e3694cae2911a..c15aa27ae5964 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -55,6 +55,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +161,7 @@ private void doStart() {
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
// reduce the size of the log cleaner map to reduce test memory usage
- putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+ putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
if (listenerConfig == null)
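
[Reviewer note] The 2 MiB value above replaces the 128 MiB default dedupe buffer so embedded multi-broker clusters stay within test heap limits. A minimal sketch of the same tuning pattern for any test that boots a broker, using only the relocated constant (the helper name is illustrative, not part of this patch):

```java
import java.util.Properties;

import org.apache.kafka.storage.internals.log.CleanerConfig;

public class LowMemoryBrokerProps {
    // Illustrative helper: shrink the cleaner's offset map for memory-constrained tests.
    static Properties lowMemoryBrokerConfig() {
        Properties props = new Properties();
        // Default is 128 MiB (CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE); tests here use 2 MiB.
        props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, String.valueOf(2 * 1024 * 1024L));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(lowMemoryBrokerConfig());
    }
}
```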
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8098ea237e06d..b653f40b287a8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,13 +499,13 @@ class LogCleaner(initialConfig: CleanerConfig,
object LogCleaner {
val ReconfigurableConfigs: Set[String] = Set(
- KafkaConfig.LogCleanerThreadsProp,
- KafkaConfig.LogCleanerDedupeBufferSizeProp,
- KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
- KafkaConfig.LogCleanerIoBufferSizeProp,
+ CleanerConfig.LOG_CLEANER_THREADS_PROP,
+ CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
+ CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
+ CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
KafkaConfig.MessageMaxBytesProp,
- KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
- KafkaConfig.LogCleanerBackoffMsProp
+ CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
+ CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
)
def cleanerConfig(config: KafkaConfig): CleanerConfig = {
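
[Reviewer note] `ReconfigurableConfigs` is the set the broker consults when cleaner settings are altered at runtime, so the relocated constants remain dynamically updatable. A hedged sketch of driving such an update through the Admin API; the bootstrap address and thread count are illustrative assumptions, not part of this patch:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.storage.internals.log.CleanerConfig;

public class CleanerReconfigureSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        try (Admin admin = Admin.create(props)) {
            // Empty resource name targets the cluster-wide broker default.
            ConfigResource brokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp setThreads = new AlterConfigOp(
                new ConfigEntry(CleanerConfig.LOG_CLEANER_THREADS_PROP, "2"),
                AlterConfigOp.OpType.SET);
            // Applied without a restart because the property is listed in
            // LogCleaner.ReconfigurableConfigs above.
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Map.of(brokers, List.of(setThreads));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
```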
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c6f51a000e2a5..eaddb047da8d5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
-import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig
@@ -211,17 +211,6 @@ object KafkaConfig {
val LogRetentionBytesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_BYTES_CONFIG)
val LogCleanupIntervalMsProp = LogConfigPrefix + "retention.check.interval.ms"
val LogCleanupPolicyProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.CLEANUP_POLICY_CONFIG)
- val LogCleanerThreadsProp = LogConfigPrefix + "cleaner.threads"
- val LogCleanerIoMaxBytesPerSecondProp = LogConfigPrefix + "cleaner.io.max.bytes.per.second"
- val LogCleanerDedupeBufferSizeProp = LogConfigPrefix + "cleaner.dedupe.buffer.size"
- val LogCleanerIoBufferSizeProp = LogConfigPrefix + "cleaner.io.buffer.size"
- val LogCleanerDedupeBufferLoadFactorProp = LogConfigPrefix + "cleaner.io.buffer.load.factor"
- val LogCleanerBackoffMsProp = LogConfigPrefix + "cleaner.backoff.ms"
- val LogCleanerMinCleanRatioProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG)
- val LogCleanerEnableProp = LogConfigPrefix + "cleaner.enable"
- val LogCleanerDeleteRetentionMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG)
- val LogCleanerMinCompactionLagMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)
- val LogCleanerMaxCompactionLagMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)
val LogIndexSizeMaxBytesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)
val LogIndexIntervalBytesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)
val LogFlushIntervalMessagesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG)
@@ -652,25 +641,6 @@ object KafkaConfig {
val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion"
val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: \"delete\" and \"compact\""
- val LogCleanerThreadsDoc = "The number of background threads to use for log cleaning"
- val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average"
- val LogCleanerDedupeBufferSizeDoc = "The total memory used for log deduplication across all cleaner threads"
- val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O buffers across all cleaner threads"
- val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value " +
- "will allow more log to be cleaned at once but will lead to more hash collisions"
- val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no logs to clean"
- val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning. " +
- "If the " + LogCleanerMaxCompactionLagMsProp + " or the " + LogCleanerMinCompactionLagMsProp +
- " configurations are also specified, then the log compactor considers the log eligible for compaction " +
- "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
- "records for at least the " + LogCleanerMinCompactionLagMsProp + " duration, or (ii) if the log has had " +
- "dirty (uncompacted) records for at most the " + LogCleanerMaxCompactionLagMsProp + " period."
- val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
- val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " +
- "on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " +
- "tombstones messages may be collected before a consumer completes their scan)."
- val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
- val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index."
val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk."
@@ -1075,17 +1045,17 @@ object KafkaConfig {
.define(LogRetentionBytesProp, LONG, LogConfig.DEFAULT_RETENTION_BYTES, HIGH, LogRetentionBytesDoc)
.define(LogCleanupIntervalMsProp, LONG, Defaults.LOG_CLEANUP_INTERVAL_MS, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
.define(LogCleanupPolicyProp, LIST, LogConfig.DEFAULT_CLEANUP_POLICY, ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, LogCleanupPolicyDoc)
- .define(LogCleanerThreadsProp, INT, Defaults.LOG_CLEANER_THREADS, atLeast(0), MEDIUM, LogCleanerThreadsDoc)
- .define(LogCleanerIoMaxBytesPerSecondProp, DOUBLE, Defaults.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, LogCleanerIoMaxBytesPerSecondDoc)
- .define(LogCleanerDedupeBufferSizeProp, LONG, Defaults.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, LogCleanerDedupeBufferSizeDoc)
- .define(LogCleanerIoBufferSizeProp, INT, Defaults.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, LogCleanerIoBufferSizeDoc)
- .define(LogCleanerDedupeBufferLoadFactorProp, DOUBLE, Defaults.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, LogCleanerDedupeBufferLoadFactorDoc)
- .define(LogCleanerBackoffMsProp, LONG, Defaults.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, LogCleanerBackoffMsDoc)
- .define(LogCleanerMinCleanRatioProp, DOUBLE, LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, LogCleanerMinCleanRatioDoc)
- .define(LogCleanerEnableProp, BOOLEAN, Defaults.LOG_CLEANER_ENABLE, MEDIUM, LogCleanerEnableDoc)
- .define(LogCleanerDeleteRetentionMsProp, LONG, LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, LogCleanerDeleteRetentionMsDoc)
- .define(LogCleanerMinCompactionLagMsProp, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, LogCleanerMinCompactionLagMsDoc)
- .define(LogCleanerMaxCompactionLagMsProp, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, LogCleanerMaxCompactionLagMsDoc)
+ .define(CleanerConfig.LOG_CLEANER_THREADS_PROP, INT, CleanerConfig.LOG_CLEANER_THREADS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_THREADS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, DOUBLE, CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC)
+ .define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, LONG, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC)
+ .define(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, INT, CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_DOC)
+ .define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, DOUBLE, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC)
+ .define(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, LONG, CleanerConfig.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_BACKOFF_MS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, DOUBLE, LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_DOC)
+ .define(CleanerConfig.LOG_CLEANER_ENABLE_PROP, BOOLEAN, CleanerConfig.LOG_CLEANER_ENABLE, MEDIUM, CleanerConfig.LOG_CLEANER_ENABLE_DOC)
+ .define(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, LONG, LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
+ .define(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC)
.define(LogIndexSizeMaxBytesProp, INT, LogConfig.DEFAULT_SEGMENT_INDEX_BYTES, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
.define(LogIndexIntervalBytesProp, INT, LogConfig.DEFAULT_INDEX_INTERVAL_BYTES, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
.define(LogFlushIntervalMessagesProp, LONG, LogConfig.DEFAULT_FLUSH_MESSAGES_INTERVAL, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
@@ -1652,7 +1622,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val logDirs = CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
def logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp)
- val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
+ val logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
def numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
@@ -1662,16 +1632,16 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
- val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
- val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
- val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
- val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
- def logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
- def logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
- def logCleanerMaxCompactionLagMs = getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
- val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
- def logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
- val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
+ val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
+ val logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
+ val logCleanerIoBufferSize = getInt(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP)
+ val logCleanerIoMaxBytesPerSecond = getDouble(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP)
+ def logCleanerDeleteRetentionMs = getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP)
+ def logCleanerMinCompactionLagMs = getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP)
+ def logCleanerMaxCompactionLagMs = getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP)
+ val logCleanerBackoffMs = getLong(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP)
+ def logCleanerMinCleanRatio = getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP)
+ val logCleanerEnable = getBoolean(CleanerConfig.LOG_CLEANER_ENABLE_PROP)
def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
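
[Reviewer note] This hunk relocates constants; it does not rename user-facing keys. Each `CleanerConfig.*_PROP` resolves to the same broker property string the removed `KafkaConfig.*Prop` vals produced. A small sketch asserting that, with expected strings following from the prefix arithmetic above:

```java
import org.apache.kafka.storage.internals.log.CleanerConfig;

public class CleanerKeyStability {
    public static void main(String[] args) {
        // "log.cleaner." prefix + suffix, exactly as KafkaConfig used to build them.
        check(CleanerConfig.LOG_CLEANER_THREADS_PROP, "log.cleaner.threads");
        check(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "log.cleaner.dedupe.buffer.size");
        check(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "log.cleaner.backoff.ms");
        System.out.println("property names preserved");
    }

    private static void check(String actual, String expected) {
        if (!actual.equals(expected))
            throw new AssertionError(actual + " != " + expected);
    }
}
```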
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index fa11fa9b186f1..4c6d1f9ef4adc 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -41,6 +41,7 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,7 +193,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
// reduce log cleaner offset map memory usage
- props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152");
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
// Add associated broker node property overrides
if (brokerNode != null) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 54be3337625d4..e084454f5ff7b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.server.config.Defaults
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -447,7 +447,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
configs.get(brokerResource2).entries.size)
assertEquals(brokers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
assertEquals(brokers(2).config.logCleanerThreads.toString,
- configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
+ configs.get(brokerResource2).get(CleanerConfig.LOG_CLEANER_THREADS_PROP).value)
checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
@@ -2532,7 +2532,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
.all().get(15, TimeUnit.SECONDS)
val newLogCleanerDeleteRetention = new Properties
- newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, "34")
+ newLogCleanerDeleteRetention.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, "34")
TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
.all().get(15, TimeUnit.SECONDS)
@@ -2543,14 +2543,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
controllerServer.config.nodeId.toString)
controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
Collections.singletonMap(controllerNodeResource,
- Collections.singletonMap(KafkaConfig.LogCleanerDeleteRetentionMsProp,
+ Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false).get()
ensureConsistentKRaftMetadata()
}
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
- KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
- s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
+ CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, "").toString.equals("34")),
+ s"Timed out waiting for change to ${CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP}",
waitTimeMs = 60000L)
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 4be668aa32113..c984eae0272c2 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -254,7 +254,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000")
expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168")
expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168")
- expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1")
+ expectedProps.setProperty(CleanerConfig.LOG_CLEANER_THREADS_PROP, "1")
val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp)
verifyConfig(KafkaConfig.LogRetentionTimeMillisProp, logRetentionMs,
isSensitive = false, isReadOnly = false, expectedProps)
@@ -264,8 +264,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val logRollHours = configEntry(configDesc, KafkaConfig.LogRollTimeHoursProp)
verifyConfig(KafkaConfig.LogRollTimeHoursProp, logRollHours,
isSensitive = false, isReadOnly = true, expectedProps)
- val logCleanerThreads = configEntry(configDesc, KafkaConfig.LogCleanerThreadsProp)
- verifyConfig(KafkaConfig.LogCleanerThreadsProp, logCleanerThreads,
+ val logCleanerThreads = configEntry(configDesc, CleanerConfig.LOG_CLEANER_THREADS_PROP)
+ verifyConfig(CleanerConfig.LOG_CLEANER_THREADS_PROP, logCleanerThreads,
isSensitive = false, isReadOnly = false, expectedProps)
def synonymsList(configEntry: ConfigEntry): List[(String, ConfigSource)] =
@@ -278,7 +278,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
(KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)),
synonymsList(logRetentionHours))
assertEquals(List((KafkaConfig.LogRollTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logRollHours))
- assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
+ assertEquals(List((CleanerConfig.LOG_CLEANER_THREADS_PROP, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -536,19 +536,19 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
val props = new Properties
- props.put(KafkaConfig.LogCleanerThreadsProp, "2")
- props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
- props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
- props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
+ props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, "2")
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "20000000")
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, "0.8")
+ props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, "300000")
props.put(KafkaConfig.MessageMaxBytesProp, "40000")
- props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
- props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
+ props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, "50000000")
+ props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "6000")
// Verify cleaner config was updated. Wait for one of the configs to be updated and verify
// that all the others were updated at the same time since they are reconfigured together
var newCleanerConfig: CleanerConfig = null
TestUtils.waitUntilTrue(() => {
- reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))
+ reconfigureServers(props, perBrokerConfig = false, (CleanerConfig.LOG_CLEANER_THREADS_PROP, "2"))
newCleanerConfig = servers.head.logManager.cleaner.currentConfig
newCleanerConfig.numThreads == 2
}, "Log cleaner not reconfigured", 60000)
@@ -566,8 +566,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
def cleanerThreads = Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
cleanerThreads.take(2).foreach(_.interrupt())
TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, "Threads did not exit")
- props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
- reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerBackoffMsProp, "8000"))
+ props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "8000")
+ reconfigureServers(props, perBrokerConfig = false, (CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, "8000"))
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
// Verify that produce/consume worked throughout this test without any retries in producer
@@ -635,10 +635,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(KafkaConfig.LogRetentionTimeMillisProp, TimeUnit.DAYS.toMillis(1).toString)
props.put(KafkaConfig.MessageMaxBytesProp, "100000")
props.put(KafkaConfig.LogIndexIntervalBytesProp, "10000")
- props.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, TimeUnit.DAYS.toMillis(1).toString)
- props.put(KafkaConfig.LogCleanerMinCompactionLagMsProp, "60000")
+ props.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, TimeUnit.DAYS.toMillis(1).toString)
+ props.put(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, "60000")
props.put(KafkaConfig.LogDeleteDelayMsProp, "60000")
- props.put(KafkaConfig.LogCleanerMinCleanRatioProp, "0.3")
+ props.put(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, "0.3")
props.put(KafkaConfig.LogCleanupPolicyProp, "delete")
props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
props.put(KafkaConfig.MinInSyncReplicasProp, "2")
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 8729045db7e79..49e518ac2af89 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -251,13 +251,13 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, "localhost:2181")
- props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
- props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
- props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
- props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
+ props.put(CleanerConfig.LOG_CLEANER_THREADS_PROP, cleanerConfig.numThreads.toString)
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, cleanerConfig.dedupeBufferSize.toString)
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, cleanerConfig.dedupeBufferLoadFactor.toString)
+ props.put(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, cleanerConfig.ioBufferSize.toString)
props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
- props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backoffMs.toString)
- props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
+ props.put(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, cleanerConfig.backoffMs.toString)
+ props.put(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, cleanerConfig.maxIoBytesPerSecond.toString)
KafkaConfig.fromProps(props)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 161493457e1f3..b3a7259219620 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1959,7 +1959,7 @@ class LogCleanerTest extends Logging {
@Test
def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
- oldKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "10000000")
+ oldKafkaProps.setProperty(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, "10000000")
val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
logDirs = Array(TestUtils.tempDir()),
@@ -1973,14 +1973,14 @@ class LogCleanerTest extends Logging {
}
try {
- assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+ assertEquals(10000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP}` config.")
val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
- newKafkaProps.setProperty(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "20000000")
+ newKafkaProps.setProperty(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, "20000000")
logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
- assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+ assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP}` config.")
} finally {
logCleaner.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 00777e9677bb2..135de6e51441f 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -55,6 +55,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.util.FutureUtils
+import org.apache.kafka.storage.internals.log.CleanerConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -318,19 +319,19 @@ class ControllerApisTest {
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new OldAlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000")).iterator())),
new OldAlterConfigsResource().
setResourceName("2").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new OldAlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000")).iterator())),
new OldAlterConfigsResource().
setResourceName("2").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new OldAlterableConfigCollection(util.Arrays.asList(new OldAlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000")).iterator())),
new OldAlterConfigsResource().
setResourceName("baz").
@@ -472,7 +473,7 @@ class ControllerApisTest {
setResourceName("1").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
@@ -536,14 +537,14 @@ class ControllerApisTest {
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
- setName(KafkaConfig.LogCleanerBackoffMsProp).
+ setName(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP).
setValue("100000").
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
new AlterConfigsResource().
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index be065f5b8caef..4e1c8eed2769d 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.server.config.Defaults
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.KafkaScheduler
-import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.test.MockMetricsReporter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -215,7 +215,7 @@ class DynamicBrokerConfigTest {
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
// Test update of configs with invalid type
- val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
+ val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "invalid")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
val excludedTopicConfig = Map(KafkaConfig.LogMessageFormatVersionProp -> "0.10.2")
@@ -225,21 +225,21 @@ class DynamicBrokerConfigTest {
@Test
def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
+ origProps.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "100000000")
val config = KafkaConfig(origProps)
config.dynamicConfig.initialize(None, None)
val validProps = Map.empty[String, String]
- val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
+ val invalidProps = Map(CleanerConfig.LOG_CLEANER_THREADS_PROP -> "20")
def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = {
- val cleanerThreads = configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt
+ val cleanerThreads = configs.get(CleanerConfig.LOG_CLEANER_THREADS_PROP).toString.toInt
if (cleanerThreads <=0 || cleanerThreads >= 5)
throw new ConfigException(s"Invalid cleaner threads $cleanerThreads")
}
val reconfigurable = new Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
- override def reconfigurableConfigs(): util.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp).asJava
+ override def reconfigurableConfigs(): util.Set[String] = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP).asJava
override def validateReconfiguration(configs: util.Map[String, _]): Unit = validateLogCleanerConfig(configs)
override def reconfigure(configs: util.Map[String, _]): Unit = {}
}
@@ -248,7 +248,7 @@ class DynamicBrokerConfigTest {
config.dynamicConfig.removeReconfigurable(reconfigurable)
val brokerReconfigurable = new BrokerReconfigurable {
- override def reconfigurableConfigs: collection.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp)
+ override def reconfigurableConfigs: collection.Set[String] = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP)
override def validateReconfiguration(newConfig: KafkaConfig): Unit = validateLogCleanerConfig(newConfig.originals)
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {}
}
@@ -260,8 +260,8 @@ class DynamicBrokerConfigTest {
def testReconfigurableValidation(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val config = KafkaConfig(origProps)
- val invalidReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, KafkaConfig.BrokerIdProp, "some.prop")
- val validReconfigurableProps = Set(KafkaConfig.LogCleanerThreadsProp, KafkaConfig.LogCleanerDedupeBufferSizeProp, "some.prop")
+ val invalidReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, KafkaConfig.BrokerIdProp, "some.prop")
+ val validReconfigurableProps = Set(CleanerConfig.LOG_CLEANER_THREADS_PROP, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "some.prop")
def createReconfigurable(configs: Set[String]) = new Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e9426245b143f..a5d4d961fe123 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_8_2, IBP_3_0_IV1}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
-import org.apache.kafka.storage.internals.log.LogConfig
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.function.Executable
import scala.annotation.nowarn
@@ -845,14 +845,14 @@ class KafkaConfigTest {
case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(baseProperties, name, "unknown_policy", "0")
- case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
- case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
- case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerMaxCompactionLagMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
- case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", "1024")
+ case CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_ENABLE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
+ case CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "3")
case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7b1ce80cbed3b..17ee3139a51ff 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -373,7 +373,7 @@ object TestUtils extends Logging {
props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
props.put(KafkaConfig.LogDeleteDelayMsProp, "1000")
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
- props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
+ props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152")
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
if (!props.containsKey(KafkaConfig.OffsetsTopicPartitionsProp))
props.put(KafkaConfig.OffsetsTopicPartitionsProp, "5")
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
index 320de2db6b9c1..02cdc1cc875f8 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
@@ -30,7 +30,7 @@
public final class ServerTopicConfigSynonyms {
private static final String LOG_PREFIX = "log.";
- private static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
+ public static final String LOG_CLEANER_PREFIX = LOG_PREFIX + "cleaner.";
/**
* Maps topic configurations to their equivalent broker configurations.
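
[Reviewer note] Making `LOG_CLEANER_PREFIX` public is what lets `CleanerConfig` (below) assemble its `*_PROP` names. Two routes are in play, sketched here: plain prefix concatenation for cleaner-only settings, and `serverSynonym` for settings that mirror a topic config. The expected resolved strings in the comments are a reading of the code, not part of the patch:

```java
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;

public class PrefixRoutes {
    public static void main(String[] args) {
        // Route 1: prefix concatenation -> "log.cleaner.threads"
        System.out.println(ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "threads");
        // Route 2: topic-config synonym -> "log.cleaner.min.cleanable.dirty.ratio"
        System.out.println(ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG));
    }
}
```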
diff --git a/server/src/main/java/org/apache/kafka/server/config/Defaults.java b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
index e55f641790f6b..0c7ca123b87c1 100644
--- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
+++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
@@ -106,13 +106,6 @@ public class Defaults {
public static final int NUM_PARTITIONS = 1;
public static final String LOG_DIR = "/tmp/kafka-logs";
public static final long LOG_CLEANUP_INTERVAL_MS = 5 * 60 * 1000L;
- public static final int LOG_CLEANER_THREADS = 1;
- public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = Double.MAX_VALUE;
- public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 * 1024L;
- public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024;
- public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d;
- public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000;
- public static final boolean LOG_CLEANER_ENABLE = true;
public static final int LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000;
public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1;
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
index 9e38d0c02e96b..8168197fe08ab 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/CleanerConfig.java
@@ -16,11 +16,53 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
+
/**
* Configuration parameters for the log cleaner.
*/
public class CleanerConfig {
public static final String HASH_ALGORITHM = "MD5";
+ public static final int LOG_CLEANER_THREADS = 1;
+ public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = Double.MAX_VALUE;
+ public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 * 1024L;
+ public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024;
+ public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d;
+ public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000;
+ public static final boolean LOG_CLEANER_ENABLE = true;
+
+ public static final String LOG_CLEANER_THREADS_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "threads";
+ public static final String LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.max.bytes.per.second";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "dedupe.buffer.size";
+ public static final String LOG_CLEANER_IO_BUFFER_SIZE_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.buffer.size";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "io.buffer.load.factor";
+ public static final String LOG_CLEANER_BACKOFF_MS_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "backoff.ms";
+ public static final String LOG_CLEANER_MIN_CLEAN_RATIO_PROP = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG);
+ public static final String LOG_CLEANER_ENABLE_PROP = ServerTopicConfigSynonyms.LOG_CLEANER_PREFIX + "enable";
+ public static final String LOG_CLEANER_DELETE_RETENTION_MS_PROP = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG);
+ public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
+ public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
+
+ public static final String LOG_CLEANER_MIN_CLEAN_RATIO_DOC = "The minimum ratio of dirty log to total log for a log to be eligible for cleaning. " +
+ "If the " + LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP + " or the " + LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP +
+ " configurations are also specified, then the log compactor considers the log eligible for compaction " +
+ "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
+ "records for at least the " + LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP + " duration, or (ii) if the log has had " +
+ "dirty (uncompacted) records for at most the " + LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP + " period.";
+ public static final String LOG_CLEANER_THREADS_DOC = "The number of background threads to use for log cleaning";
+ public static final String LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC = "The log cleaner will be throttled so that the sum of its read and write I/O will be less than this value on average";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC = "The total memory used for log deduplication across all cleaner threads";
+ public static final String LOG_CLEANER_IO_BUFFER_SIZE_DOC = "The total memory used for log cleaner I/O buffers across all cleaner threads";
+ public static final String LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC = "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value " +
+ "will allow more log to be cleaned at once but will lead to more hash collisions";
+ public static final String LOG_CLEANER_BACKOFF_MS_DOC = "The amount of time to sleep when there are no logs to clean";
+ public static final String LOG_CLEANER_ENABLE_DOC = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and will continually grow in size.";
+ public static final String LOG_CLEANER_DELETE_RETENTION_MS_DOC = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " +
+ "on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " +
+ "tombstones messages may be collected before a consumer completes their scan).";
+ public static final String LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.";
+ public static final String LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.";
public final int numThreads;
public final long dedupeBufferSize;
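
[Reviewer note] With the keys and the defaults now co-located, callers read both from one class instead of splitting across `KafkaConfig` and `Defaults`. A brief sketch, where the printed values follow from the constants above:

```java
import org.apache.kafka.storage.internals.log.CleanerConfig;

public class CleanerDefaultsSketch {
    public static void main(String[] args) {
        // Key and default now travel together, e.g. "log.cleaner.threads" -> 1.
        System.out.printf("%s = %d%n",
            CleanerConfig.LOG_CLEANER_THREADS_PROP, CleanerConfig.LOG_CLEANER_THREADS);
        // 128 MiB dedupe buffer by default; tests in this patch override it to 2 MiB.
        System.out.printf("%s = %d%n",
            CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE);
    }
}
```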
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 3f3e9a243bd6c..4232e1d74c978 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -24,6 +24,7 @@
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
@@ -111,7 +112,7 @@ public void start() throws IOException {
brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
- putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+ putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, KafkaConfig.GroupMinSessionTimeoutMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) 1);