Skip to content

Commit

Permalink
KAFKA-17928: Make remote log manager thread-pool configs dynamic (apa…
Browse files Browse the repository at this point in the history
…che#17859)

- Disallow configuring -1 for copier and expiration thread pools dynamically

Co-authored-by: Peter Lee <[email protected]>

Reviewers: Peter Lee <[email protected]>, Satish Duggana <[email protected]>
  • Loading branch information
kamalcph authored Dec 14, 2024
1 parent 9157589 commit 139e5b1
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Thread.UncaughtExceptionHandler;

/**
* Utilities for working with threads.
*/
public class ThreadUtils {

private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class);

/**
* Create a new ThreadFactory.
*
Expand All @@ -42,6 +45,22 @@ public class ThreadUtils {
*/
public static ThreadFactory createThreadFactory(final String pattern,
final boolean daemon) {
return createThreadFactory(pattern, daemon, null);
}

/**
* Create a new ThreadFactory.
*
* @param pattern The pattern to use. If this contains %d, it will be
* replaced with a thread number. It should not contain more
* than one %d.
* @param daemon True if we want daemon threads.
* @param ueh thread's uncaught exception handler.
* @return The new ThreadFactory.
*/
public static ThreadFactory createThreadFactory(final String pattern,
final boolean daemon,
final UncaughtExceptionHandler ueh) {
return new ThreadFactory() {
private final AtomicLong threadEpoch = new AtomicLong(0);

Expand All @@ -55,6 +74,9 @@ public Thread newThread(Runnable r) {
}
Thread thread = new Thread(r, threadName);
thread.setDaemon(daemon);
if (ueh != null) {
thread.setUncaughtExceptionHandler(ueh);
}
return thread;
}
};
Expand All @@ -64,12 +86,15 @@ public Thread newThread(Runnable r) {
* Shuts down an executor service in two phases, first by calling shutdown to reject incoming tasks,
* and then calling shutdownNow, if necessary, to cancel any lingering tasks.
* After the timeout/on interrupt, the service is forcefully closed.
* This pattern of shutting down thread pool is adopted from here:
* <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html">ExecutorService</a>
* @param executorService The service to shut down.
* @param timeout The timeout of the shutdown.
* @param timeUnit The time unit of the shutdown timeout.
* @param timeout The timeout of the shutdown.
* @param timeUnit The time unit of the shutdown timeout.
*/
public static void shutdownExecutorServiceQuietly(ExecutorService executorService,
long timeout, TimeUnit timeUnit) {
long timeout,
TimeUnit timeUnit) {
if (executorService == null) {
return;
}
Expand Down
88 changes: 44 additions & 44 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ChildFirstClassLoader;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
Expand Down Expand Up @@ -130,7 +130,6 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -162,7 +161,7 @@
public class RemoteLogManager implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
Expand Down Expand Up @@ -255,18 +254,18 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
"RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d");
rlmExpirationThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
"RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-%d");
followerThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-");
"RLMFollowerScheduledThreadPool", "kafka-rlm-follower-thread-pool-%d");

metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, rlmCopyThreadPool::getIdlePercent);
remoteReadTimer = metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);

remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
REMOTE_LOG_READER_THREAD_NAME_PATTERN,
rlmConfig.remoteLogReaderThreads(),
rlmConfig.remoteLogReaderMaxPendingTasks()
);
Expand All @@ -290,6 +289,24 @@ public void updateFetchQuota(long quota) {
rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
}

public void resizeCopierThreadPool(int newSize) {
int currentSize = rlmCopyThreadPool.getCorePoolSize();
LOGGER.info("Updating remote copy thread pool size from {} to {}", currentSize, newSize);
rlmCopyThreadPool.setCorePoolSize(newSize);
}

public void resizeExpirationThreadPool(int newSize) {
int currentSize = rlmExpirationThreadPool.getCorePoolSize();
LOGGER.info("Updating remote expiration thread pool size from {} to {}", currentSize, newSize);
rlmExpirationThreadPool.setCorePoolSize(newSize);
}

public void resizeReaderThreadPool(int newSize) {
int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
LOGGER.info("Updating remote reader thread pool size from {} to {}", currentSize, newSize);
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
}

private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
Expand Down Expand Up @@ -2077,28 +2094,10 @@ public void close() {
}
}

private static void shutdownAndAwaitTermination(ExecutorService pool, String poolName, long timeout, TimeUnit timeUnit) {
// This pattern of shutting down thread pool is adopted from here: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
LOGGER.info("Shutting down of thread pool {} is started", poolName);
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(timeout, timeUnit)) {
LOGGER.info("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow.", poolName);
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(timeout, timeUnit))
LOGGER.warn("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow.", poolName);
}
} catch (InterruptedException ex) {
// (Re-)Cancel if current thread also interrupted
LOGGER.warn("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow.", poolName);
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}

LOGGER.info("Shutting down of thread pool {} is completed", poolName);
private static void shutdownAndAwaitTermination(ExecutorService executor, String poolName, long timeout, TimeUnit timeUnit) {
LOGGER.info("Shutting down {} executor", poolName);
ThreadUtils.shutdownExecutorServiceQuietly(executor, timeout, timeUnit);
LOGGER.info("{} executor shutdown completed", poolName);
}

//Visible for testing
Expand Down Expand Up @@ -2152,31 +2151,32 @@ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
static class RLMScheduledThreadPool {

private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
private final int poolSize;
private final String threadPoolName;
private final String threadNamePrefix;
private final String threadNamePattern;
private final ScheduledThreadPoolExecutor scheduledThreadPool;

public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePrefix) {
this.poolSize = poolSize;
public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePattern) {
this.threadPoolName = threadPoolName;
this.threadNamePrefix = threadNamePrefix;
scheduledThreadPool = createPool();
this.threadNamePattern = threadNamePattern;
scheduledThreadPool = createPool(poolSize);
}

public void setCorePoolSize(int newSize) {
scheduledThreadPool.setCorePoolSize(newSize);
}

public int getCorePoolSize() {
return scheduledThreadPool.getCorePoolSize();
}

private ScheduledThreadPoolExecutor createPool() {
private ScheduledThreadPoolExecutor createPool(int poolSize) {
ThreadFactory threadFactory = ThreadUtils.createThreadFactory(threadNamePattern, true,
(t, e) -> LOGGER.error("Uncaught exception in thread '{}':", t.getName(), e));
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(poolSize);
threadPool.setRemoveOnCancelPolicy(true);
threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
threadPool.setThreadFactory(new ThreadFactory() {
private final AtomicInteger sequence = new AtomicInteger();

public Thread newThread(Runnable r) {
return KafkaThread.daemon(threadNamePrefix + sequence.incrementAndGet(), r);
}
});

threadPool.setThreadFactory(threadFactory);
return threadPool;
}

Expand Down
42 changes: 36 additions & 6 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,22 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
throw new ConfigException(s"$errorMsg, value should be at least 1")
}
}

if (RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k) ||
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k) ||
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k)) {
val newValue = v.asInstanceOf[Int]
val oldValue = server.config.getInt(k)
if (newValue != oldValue) {
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
if (newValue <= 0)
throw new ConfigException(s"$errorMsg, value should be at least 1")
if (newValue < oldValue / 2)
throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
if (newValue > oldValue * 2)
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
}
}
}
}

Expand All @@ -1176,29 +1192,40 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w

def isChangedLongValue(k : String): Boolean = oldLongValue(k) != newLongValue(k)

val remoteLogManager = server.remoteLogManagerOpt
if (remoteLogManager.nonEmpty) {
if (server.remoteLogManagerOpt.nonEmpty) {
val remoteLogManager = server.remoteLogManagerOpt.get
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
remoteLogManager.get.resizeCacheSize(newValue)
remoteLogManager.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
}
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
remoteLogManager.get.updateCopyQuota(newValue)
remoteLogManager.updateCopyQuota(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
}
if (isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)) {
val oldValue = oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
val newValue = newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
remoteLogManager.get.updateFetchQuota(newValue)
remoteLogManager.updateFetchQuota(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
}

val newRLMConfig = newConfig.remoteLogManagerConfig
val oldRLMConfig = oldConfig.remoteLogManagerConfig
if (newRLMConfig.remoteLogManagerCopierThreadPoolSize() != oldRLMConfig.remoteLogManagerCopierThreadPoolSize())
remoteLogManager.resizeCopierThreadPool(newRLMConfig.remoteLogManagerCopierThreadPoolSize())

if (newRLMConfig.remoteLogManagerExpirationThreadPoolSize() != oldRLMConfig.remoteLogManagerExpirationThreadPoolSize())
remoteLogManager.resizeExpirationThreadPool(newRLMConfig.remoteLogManagerExpirationThreadPoolSize())

if (newRLMConfig.remoteLogReaderThreads() != oldRLMConfig.remoteLogReaderThreads())
remoteLogManager.resizeReaderThreadPool(newRLMConfig.remoteLogReaderThreads())
}
}

Expand All @@ -1219,6 +1246,9 @@ object DynamicRemoteLogConfig {
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
)
}
Loading

0 comments on commit 139e5b1

Please sign in to comment.