Skip to content

Commit

Permalink
KAFKA-14588 Log cleaner configuration move to CleanerConfig (apache#1…
Browse files Browse the repository at this point in the history
…5387)

In order to move ConfigCommand to tools we must move all it's dependencies which includes KafkaConfig and other core classes to java. This PR moves log cleaner configuration to CleanerConfig class of storage module.

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
nizhikov authored Mar 5, 2024
1 parent 4779277 commit eea369a
Show file tree
Hide file tree
Showing 19 changed files with 141 additions and 127 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,7 @@ project(':streams') {
testImplementation project(':core')
testImplementation project(':tools')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage')
testImplementation project(':server-common')
testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.log4j
Expand Down Expand Up @@ -2975,6 +2976,7 @@ project(':connect:runtime') {
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common')
testImplementation project(':storage')
testImplementation project(':connect:test-plugins')

testImplementation libs.easymock
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-core.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.fault" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
</subpackage>

<subpackage name="tools">
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
</subpackage>

<subpackage name="test">
Expand Down Expand Up @@ -601,6 +602,7 @@
<allow pkg="com.fasterxml.jackson.core.type" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.eclipse.jetty.client"/>
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
</subpackage>
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -160,7 +161,7 @@ private void doStart() {
putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
// reduce the size of the log cleaner map to reduce test memory usage
putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);

Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
if (listenerConfig == null)
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,13 @@ class LogCleaner(initialConfig: CleanerConfig,

object LogCleaner {
val ReconfigurableConfigs: Set[String] = Set(
KafkaConfig.LogCleanerThreadsProp,
KafkaConfig.LogCleanerDedupeBufferSizeProp,
KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
KafkaConfig.LogCleanerIoBufferSizeProp,
CleanerConfig.LOG_CLEANER_THREADS_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
KafkaConfig.MessageMaxBytesProp,
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
KafkaConfig.LogCleanerBackoffMsProp
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP
)

def cleanerConfig(config: KafkaConfig): CleanerConfig = {
Expand Down
76 changes: 23 additions & 53 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig

Expand Down Expand Up @@ -211,17 +211,6 @@ object KafkaConfig {
val LogRetentionBytesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_BYTES_CONFIG)
val LogCleanupIntervalMsProp = LogConfigPrefix + "retention.check.interval.ms"
val LogCleanupPolicyProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.CLEANUP_POLICY_CONFIG)
val LogCleanerThreadsProp = LogConfigPrefix + "cleaner.threads"
val LogCleanerIoMaxBytesPerSecondProp = LogConfigPrefix + "cleaner.io.max.bytes.per.second"
val LogCleanerDedupeBufferSizeProp = LogConfigPrefix + "cleaner.dedupe.buffer.size"
val LogCleanerIoBufferSizeProp = LogConfigPrefix + "cleaner.io.buffer.size"
val LogCleanerDedupeBufferLoadFactorProp = LogConfigPrefix + "cleaner.io.buffer.load.factor"
val LogCleanerBackoffMsProp = LogConfigPrefix + "cleaner.backoff.ms"
val LogCleanerMinCleanRatioProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG)
val LogCleanerEnableProp = LogConfigPrefix + "cleaner.enable"
val LogCleanerDeleteRetentionMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.DELETE_RETENTION_MS_CONFIG)
val LogCleanerMinCompactionLagMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG)
val LogCleanerMaxCompactionLagMsProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG)
val LogIndexSizeMaxBytesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)
val LogIndexIntervalBytesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG)
val LogFlushIntervalMessagesProp = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG)
Expand Down Expand Up @@ -652,25 +641,6 @@ object KafkaConfig {
val LogRetentionBytesDoc = "The maximum size of the log before deleting it"
val LogCleanupIntervalMsDoc = "The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion"
val LogCleanupPolicyDoc = "The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: \"delete\" and \"compact\""
val LogCleanerThreadsDoc = "The number of background threads to use for log cleaning"
val LogCleanerIoMaxBytesPerSecondDoc = "The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average"
val LogCleanerDedupeBufferSizeDoc = "The total memory used for log deduplication across all cleaner threads"
val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O buffers across all cleaner threads"
val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value " +
"will allow more log to be cleaned at once but will lead to more hash collisions"
val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no logs to clean"
val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning. " +
"If the " + LogCleanerMaxCompactionLagMsProp + " or the " + LogCleanerMinCompactionLagMsProp +
" configurations are also specified, then the log compactor considers the log eligible for compaction " +
"as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
"records for at least the " + LogCleanerMinCompactionLagMsProp + " duration, or (ii) if the log has had " +
"dirty (uncompacted) records for at most the " + LogCleanerMaxCompactionLagMsProp + " period."
val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server. Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
val LogCleanerDeleteRetentionMsDoc = "The amount of time to retain tombstone message markers for log compacted topics. This setting also gives a bound " +
"on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise " +
"tombstones messages may be collected before a consumer completes their scan)."
val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
val LogCleanerMaxCompactionLagMsDoc = "The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted."
val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index."
val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk."
Expand Down Expand Up @@ -1075,17 +1045,17 @@ object KafkaConfig {
.define(LogRetentionBytesProp, LONG, LogConfig.DEFAULT_RETENTION_BYTES, HIGH, LogRetentionBytesDoc)
.define(LogCleanupIntervalMsProp, LONG, Defaults.LOG_CLEANUP_INTERVAL_MS, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
.define(LogCleanupPolicyProp, LIST, LogConfig.DEFAULT_CLEANUP_POLICY, ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, LogCleanupPolicyDoc)
.define(LogCleanerThreadsProp, INT, Defaults.LOG_CLEANER_THREADS, atLeast(0), MEDIUM, LogCleanerThreadsDoc)
.define(LogCleanerIoMaxBytesPerSecondProp, DOUBLE, Defaults.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, LogCleanerIoMaxBytesPerSecondDoc)
.define(LogCleanerDedupeBufferSizeProp, LONG, Defaults.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, LogCleanerDedupeBufferSizeDoc)
.define(LogCleanerIoBufferSizeProp, INT, Defaults.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, LogCleanerIoBufferSizeDoc)
.define(LogCleanerDedupeBufferLoadFactorProp, DOUBLE, Defaults.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, LogCleanerDedupeBufferLoadFactorDoc)
.define(LogCleanerBackoffMsProp, LONG, Defaults.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, LogCleanerBackoffMsDoc)
.define(LogCleanerMinCleanRatioProp, DOUBLE, LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, LogCleanerMinCleanRatioDoc)
.define(LogCleanerEnableProp, BOOLEAN, Defaults.LOG_CLEANER_ENABLE, MEDIUM, LogCleanerEnableDoc)
.define(LogCleanerDeleteRetentionMsProp, LONG, LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, LogCleanerDeleteRetentionMsDoc)
.define(LogCleanerMinCompactionLagMsProp, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, LogCleanerMinCompactionLagMsDoc)
.define(LogCleanerMaxCompactionLagMsProp, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, LogCleanerMaxCompactionLagMsDoc)
.define(CleanerConfig.LOG_CLEANER_THREADS_PROP, INT, CleanerConfig.LOG_CLEANER_THREADS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_THREADS_DOC)
.define(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP, DOUBLE, CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND, MEDIUM, CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_DOC)
.define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, LONG, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE, MEDIUM, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_DOC)
.define(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP, INT, CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_DOC)
.define(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP, DOUBLE, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR, MEDIUM, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_DOC)
.define(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP, LONG, CleanerConfig.LOG_CLEANER_BACKOFF_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_BACKOFF_MS_DOC)
.define(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP, DOUBLE, LogConfig.DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_DOC)
.define(CleanerConfig.LOG_CLEANER_ENABLE_PROP, BOOLEAN, CleanerConfig.LOG_CLEANER_ENABLE, MEDIUM, CleanerConfig.LOG_CLEANER_ENABLE_DOC)
.define(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, LONG, LogConfig.DEFAULT_DELETE_RETENTION_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_DOC)
.define(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
.define(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC)
.define(LogIndexSizeMaxBytesProp, INT, LogConfig.DEFAULT_SEGMENT_INDEX_BYTES, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
.define(LogIndexIntervalBytesProp, INT, LogConfig.DEFAULT_INDEX_INTERVAL_BYTES, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
.define(LogFlushIntervalMessagesProp, LONG, LogConfig.DEFAULT_FLUSH_MESSAGES_INTERVAL, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
Expand Down Expand Up @@ -1652,7 +1622,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val logDirs = CoreUtils.parseCsvList(Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp)))
def logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp)
def logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp)
val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp)
val logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
def numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
Expand All @@ -1662,16 +1632,16 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
val offsetsRetentionCheckIntervalMs = getLong(KafkaConfig.OffsetsRetentionCheckIntervalMsProp)
def logRetentionBytes = getLong(KafkaConfig.LogRetentionBytesProp)
val logCleanerDedupeBufferSize = getLong(KafkaConfig.LogCleanerDedupeBufferSizeProp)
val logCleanerDedupeBufferLoadFactor = getDouble(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp)
val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
def logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
def logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
def logCleanerMaxCompactionLagMs = getLong(KafkaConfig.LogCleanerMaxCompactionLagMsProp)
val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
def logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
val logCleanerDedupeBufferSize = getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
val logCleanerDedupeBufferLoadFactor = getDouble(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP)
val logCleanerIoBufferSize = getInt(CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP)
val logCleanerIoMaxBytesPerSecond = getDouble(CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP)
def logCleanerDeleteRetentionMs = getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP)
def logCleanerMinCompactionLagMs = getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP)
def logCleanerMaxCompactionLagMs = getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP)
val logCleanerBackoffMs = getLong(CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP)
def logCleanerMinCleanRatio = getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP)
val logCleanerEnable = getBoolean(CleanerConfig.LOG_CLEANER_ENABLE_PROP)
def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -192,7 +193,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);

// reduce log cleaner offset map memory usage
props.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), "2097152");
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");

// Add associated broker node property overrides
if (brokerNode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.server.config.Defaults
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -447,7 +447,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
configs.get(brokerResource2).entries.size)
assertEquals(brokers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value)
assertEquals(brokers(2).config.logCleanerThreads.toString,
configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value)
configs.get(brokerResource2).get(CleanerConfig.LOG_CLEANER_THREADS_PROP).value)

checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
Expand Down Expand Up @@ -2532,7 +2532,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
.all().get(15, TimeUnit.SECONDS)

val newLogCleanerDeleteRetention = new Properties
newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, "34")
newLogCleanerDeleteRetention.put(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, "34")
TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
.all().get(15, TimeUnit.SECONDS)

Expand All @@ -2543,14 +2543,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
controllerServer.config.nodeId.toString)
controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
Collections.singletonMap(controllerNodeResource,
Collections.singletonMap(KafkaConfig.LogCleanerDeleteRetentionMsProp,
Collections.singletonMap(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false).get()
ensureConsistentKRaftMetadata()
}

waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP, "").toString.equals("34")),
s"Timed out waiting for change to ${CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP}",
waitTimeMs = 60000L)

waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
Expand Down
Loading

0 comments on commit eea369a

Please sign in to comment.