diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0cb2ac1141bba..dcf13310dd29e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -71,6 +71,7 @@ import java.util.{Optional, OptionalInt, OptionalLong}
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
+import scala.util.Random
 
 /*
  * Result metadata of a log append operation on the log
@@ -1646,7 +1647,9 @@ class ReplicaManager(val config: KafkaConfig,
     var errorReadingData = false
 
     // The 1st topic-partition that has to be read from remote storage
-    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+    // WORKAROUND: randomize partition fetched
+    // var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+    var remoteFetchInfos: Seq[Optional[RemoteStorageFetchInfo]] = Seq.empty
 
     var hasDivergingEpoch = false
     var hasPreferredReadReplica = false
@@ -1657,8 +1660,12 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
-      if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
-        remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
-      }
+      // WORKAROUND: randomize partition fetched
+      //if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+      //  remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
+      //}
+      if (logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+        remoteFetchInfos = remoteFetchInfos ++ Seq(logReadResult.info.delayedRemoteStorageFetch)
+      }
       if (logReadResult.divergingEpoch.nonEmpty)
         hasDivergingEpoch = true
@@ -1668,6 +1675,8 @@ class ReplicaManager(val config: KafkaConfig,
       logReadResultMap.put(topicIdPartition, logReadResult)
     }
 
+    val remoteFetchInfo: Optional[RemoteStorageFetchInfo] = maybeRemoteFetchInfo(remoteFetchInfos)
+
     // Respond immediately if no remote fetches are required and any of the below conditions is true
     //                        1) fetch request does not want to wait
    //                        2) fetch request does not require any data
@@ -1723,6 +1732,18 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // WORKAROUND: randomize partition fetched
+  private val rnd = new Random()
+  def maybeRemoteFetchInfo(remoteFetchInfos: Seq[Optional[RemoteStorageFetchInfo]]): Optional[RemoteStorageFetchInfo] = {
+    val remoteFetchInfo: Optional[RemoteStorageFetchInfo] = if (remoteFetchInfos.isEmpty) {
+      Optional.empty()
+    } else {
+      val i = rnd.nextInt(remoteFetchInfos.size)
+      remoteFetchInfos(i)
+    }
+    remoteFetchInfo
+  }
+
   /**
    * Read from multiple topic partitions at the given offset up to maxSize bytes
    */
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0ca8e8127fad8..4fab5337c49b3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6558,6 +6558,51 @@ class ReplicaManagerTest {
       ))
     }
   }
+
+  @Test
+  def testRandomizedRemoteFetchInfo(): Unit = {
+    // Given
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect, logDirCount = 2)
+    val config = KafkaConfig.fromProps(props)
+    val logDirFiles = config.logDirs.map(new File(_))
+    val logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
+    val logManager = TestUtils.createLogManager(logDirFiles, defaultConfig = new LogConfig(new Properties()), time = time)
+    val mockZkClient = mock(classOf[KafkaZkClient])
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = logDirFailureChannel,
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName),
+      zkClient = Option(mockZkClient),
+    )
+
+    var infos: Seq[Optional[RemoteStorageFetchInfo]] = Seq()
+
+    // When
+    val emptyRemoteFetchInfo = replicaManager.maybeRemoteFetchInfo(infos)
+
+    // Then
+    assertFalse(emptyRemoteFetchInfo.isPresent)
+
+    // Given
+    val info0 = new RemoteStorageFetchInfo(1000, false, new TopicPartition(topic, 0), null, FetchIsolation.HIGH_WATERMARK, false)
+    val info1 = new RemoteStorageFetchInfo(1000, false, new TopicPartition(topic, 1), null, FetchIsolation.HIGH_WATERMARK, false)
+    val info2 = new RemoteStorageFetchInfo(1000, false, new TopicPartition(topic, 2), null, FetchIsolation.HIGH_WATERMARK, false)
+    infos = infos ++ Seq(Optional.of(info0), Optional.of(info1), Optional.of(info2))
+
+    // When
+    val someRemoteFetchInfo = replicaManager.maybeRemoteFetchInfo(infos)
+
+    // Then
+    assertTrue(someRemoteFetchInfo.isPresent)
+    println(someRemoteFetchInfo.get())
+  }
 }
 
 class MockReplicaSelector extends ReplicaSelector {
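
Note (not part of the patch): below is a minimal, standalone sketch of the uniform random selection that the WORKAROUND's maybeRemoteFetchInfo performs, using a hypothetical FetchInfoStub in place of RemoteStorageFetchInfo so it runs as a plain Scala program without Kafka on the classpath. The object name, stub type, and trial count are illustrative assumptions, not names from the patch.

import java.util.Optional
import scala.util.Random

object MaybeRemoteFetchInfoSketch {
  // Hypothetical stand-in for Kafka's RemoteStorageFetchInfo; only the partition index matters here.
  final case class FetchInfoStub(partition: Int)

  private val rnd = new Random()

  // Same shape as the patch's maybeRemoteFetchInfo: empty input yields Optional.empty,
  // otherwise one candidate is chosen uniformly at random.
  def maybeRemoteFetchInfo(infos: Seq[Optional[FetchInfoStub]]): Optional[FetchInfoStub] =
    if (infos.isEmpty) Optional.empty[FetchInfoStub]()
    else infos(rnd.nextInt(infos.size))

  def main(args: Array[String]): Unit = {
    val candidates = (0 until 3).map(p => Optional.of(FetchInfoStub(p)))

    // Tally which partition wins across many draws; each should land near one third.
    val picks = (1 to 30000).map(_ => maybeRemoteFetchInfo(candidates).get().partition)
    picks.groupBy(identity).toSeq.sortBy(_._1).foreach { case (p, hits) =>
      println(s"partition $p picked ${hits.size} times")
    }

    // Empty input behaves like the patched code with no delayed remote fetches.
    assert(!maybeRemoteFetchInfo(Seq.empty).isPresent, "empty input must yield Optional.empty")
  }
}

Compared with the original code, which always kept the first topic-partition requiring remote storage, picking uniformly from all candidates spreads remote fetches across partitions within a fetch request instead of repeatedly serving the first one.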