Skip to content

Commit

Permalink
Temporal workaround for KAFKA-15776
Browse files Browse the repository at this point in the history
By adding a server side config, remote fetch operations can be managed by Kafka operators.

See https://issues.apache.org/jira/browse/KAFKA-15776
  • Loading branch information
jeqo committed Jan 24, 2024
1 parent bc1d5f1 commit 6a8771e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 10 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
fetchParams: FetchParams,
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(fetchParams.maxWaitMs) {
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
// TODO: temporal workaround, wait for workaround on KAFKA-15776
maxWaitMs: Long)
extends DelayedOperation(maxWaitMs) {

/**
* The operation can be completed if:
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1357,8 +1357,10 @@ class ReplicaManager(val config: KafkaConfig,
return Some(createLogReadResult(e))
}

// TODO: temporal workaround, wait for workaround on KAFKA-15776
val maxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
fetchPartitionStatus, params, logReadResults, this, responseCallback, maxWaitMs)

delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
private val logStartOffset = 0L
private val currentLeaderEpoch = Optional.of[Integer](10)
private val replicaId = 1
private val remoteFetchMaxWaitMs = 30000L

private val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
Expand All @@ -65,7 +66,7 @@ class DelayedRemoteFetchTest {
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
Seq(topicIdPartition -> logReadInfo), replicaManager, callback, remoteFetchMaxWaitMs)

when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
Expand Down Expand Up @@ -103,7 +104,7 @@ class DelayedRemoteFetchTest {
val logReadInfo = buildReadResult(Errors.NONE)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
Seq(topicIdPartition -> logReadInfo), replicaManager, callback, remoteFetchMaxWaitMs)

// delayed remote fetch should still be able to complete
assertTrue(delayedRemoteFetch.tryComplete())
Expand Down Expand Up @@ -134,7 +135,7 @@ class DelayedRemoteFetchTest {
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
Seq(topicIdPartition -> logReadInfo), replicaManager, callback, remoteFetchMaxWaitMs)

assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ public final class RemoteLogManagerConfig {
"less than or equal to `log.retention.bytes` value.";
public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;

// TODO: temporal workaround, wait for workaround on KAFKA-15776
public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "Fetch timeout for reading remote log";
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 2000;


public static final ConfigDef CONFIG_DEF = new ConfigDef();

static {
Expand Down Expand Up @@ -255,7 +261,13 @@ public final class RemoteLogManagerConfig {
DEFAULT_LOG_LOCAL_RETENTION_BYTES,
atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES),
MEDIUM,
LOG_LOCAL_RETENTION_BYTES_DOC);
LOG_LOCAL_RETENTION_BYTES_DOC)
.define(REMOTE_FETCH_MAX_WAIT_MS_PROP,
LONG,
DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
atLeast(1),
MEDIUM,
REMOTE_FETCH_MAX_WAIT_MS_DOC);
}

private final boolean enableRemoteStorageSystem;
Expand All @@ -277,6 +289,7 @@ public final class RemoteLogManagerConfig {
private final HashMap<String, Object> remoteLogMetadataManagerProps;
private final String remoteLogMetadataManagerListenerName;
private final int remoteLogMetadataCustomMetadataMaxBytes;
private final long remoteFetchMaxWaitMs;

public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
Expand All @@ -301,7 +314,8 @@ public RemoteLogManagerConfig(AbstractConfig config) {
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP),
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP))
: Collections.emptyMap());
: Collections.emptyMap(),
config.getLong(REMOTE_FETCH_MAX_WAIT_MS_PROP));
}

// Visible for testing
Expand All @@ -323,7 +337,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
String remoteStorageManagerPrefix,
Map<String, Object> remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */
String remoteLogMetadataManagerPrefix,
Map<String, Object> remoteLogMetadataManagerProps /* properties having keys stripped out with remoteLogMetadataManagerPrefix */
Map<String, Object> remoteLogMetadataManagerProps, /* properties having keys stripped out with remoteLogMetadataManagerPrefix */
long remoteFetchMaxWaitMs
) {
this.enableRemoteStorageSystem = enableRemoteStorageSystem;
this.remoteStorageManagerClassName = remoteStorageManagerClassName;
Expand All @@ -344,6 +359,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps);
this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName;
this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes;
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
}

public boolean enableRemoteStorageSystem() {
Expand Down Expand Up @@ -422,6 +438,10 @@ public Map<String, Object> remoteLogMetadataManagerProps() {
return Collections.unmodifiableMap(remoteLogMetadataManagerProps);
}

public long remoteFetchMaxWaitMs() {
return remoteFetchMaxWaitMs;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) {
= new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path",
remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path",
"listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100,
rsmPrefix, rsmProps, rlmmPrefix, rlmmProps);
rsmPrefix, rsmProps, rlmmPrefix, rlmmProps, 30000);

Map<String, Object> props = extractProps(expectedRemoteLogManagerConfig);
rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
Expand Down

0 comments on commit 6a8771e

Please sign in to comment.