diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 02e57b4009f6a..b081ce637fd15 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -70,10 +70,19 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) + maybeFailIfDisablingRemoteStorage(props, wasRemoteLogEnabledBeforeUpdate) logManager.updateTopicConfig(topic, props, kafkaConfig.isRemoteLogStorageSystemEnabled) maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) } + private[server] def maybeFailIfDisablingRemoteStorage(props: Properties, + wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = { + val isRemoteLogToBeEnabled = props.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(wasRemoteLogEnabledBeforeUpdate)) + if (wasRemoteLogEnabledBeforeUpdate && !java.lang.Boolean.parseBoolean(isRemoteLogToBeEnabled)) { + throw new IllegalArgumentException(s"Disabling remote log on the topic is not supported.") + } + } + private[server] def maybeBootstrapRemoteLogComponents(topic: String, logs: Seq[UnifiedLog], wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = { diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 3a4715660fbc2..25d4e17788e53 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, KafkaConfig} import kafka.utils._ import kafka.utils.Implicits._ import org.apache.kafka.admin.{AdminUtils, BrokerMetadata} +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic @@ -477,6 +478,14 @@ class AdminZkClient(zkClient: KafkaZkClient, Topic.validate(topic) if (!zkClient.topicExists(topic)) throw new UnknownTopicOrPartitionException(s"Topic '$topic' does not exist.") + + // fix: workaround to fail when trying to disable tiered storage instead of silently update value while logging warning + val currentRemoteStorageEnable = zkClient.getEntityConfigs(ConfigType.Topic, topic).getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + val newRemoteStorageEnable = configs.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, currentRemoteStorageEnable) + if (java.lang.Boolean.parseBoolean(currentRemoteStorageEnable) && !java.lang.Boolean.parseBoolean(newRemoteStorageEnable)) { + throw new InvalidConfigurationException(s"Disabling remote log on the topic is not supported.") + } + // remove the topic overrides LogConfig.validate(configs, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 2751f0c71affb..5761fcbeda56b 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -599,4 +599,18 @@ class DynamicConfigChangeUnitTest { configHandler.maybeBootstrapRemoteLogComponents(topic, Seq(log0), isRemoteLogEnabledBeforeUpdate) verify(rlm, never()).onLeadershipChange(any(), any(), any()) } + + @Test + def testDisableRemoteLogStorageOnTopicOnAlreadyEnabledTopic(): Unit = { + val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) + val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.remoteLogManager).thenReturn(Some(rlm)) + + val isRemoteLogEnabledBeforeUpdate = true + val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, None) + val newProps = new Properties() + newProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false") + assertThrows(classOf[IllegalArgumentException], () => configHandler.maybeFailIfDisablingRemoteStorage(newProps, isRemoteLogEnabledBeforeUpdate)) + verify(rlm, never()).onLeadershipChange(any(), any(), any()) + } }