3.7.1 2024 07 23 #58

Closed
wants to merge 11 commits
23 changes: 23 additions & 0 deletions .github/workflows/push.yml
@@ -0,0 +1,23 @@
# The workflow to check pushes to any branch.
# This builds and checks the source at the pushed commit.
name: Branch push checks
on: push
jobs:
build:
strategy:
matrix:
java-version: [ 11, 17 ]
runs-on: [ ubuntu-latest ]
name: Build on ${{ matrix.runs-on }} with jdk ${{ matrix.java-version }}
runs-on: ${{ matrix.runs-on }}
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up JDK ${{ matrix.java-version }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java-version }}
distribution: temurin

- name: Build Docker image
run: make BRANCH=${GITHUB_REF_NAME} docker_image
38 changes: 38 additions & 0 deletions Dockerfile
@@ -0,0 +1,38 @@
##
# Copyright 2023 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# Kafka 3.4.x
FROM confluentinc/cp-kafka:7.4.0

ARG _SCALA_VERSION
ARG _KAFKA_VERSION
ENV _KAFKA_FULL_VERSION="kafka_${_SCALA_VERSION}-${_KAFKA_VERSION}"

USER root
COPY core/build/distributions/${_KAFKA_FULL_VERSION}.tgz /
RUN cd / \
&& tar -xf ${_KAFKA_FULL_VERSION}.tgz \
&& rm -r /usr/share/java/kafka/* \
&& cp /${_KAFKA_FULL_VERSION}/libs/* /usr/share/java/kafka/ \
&& ln -s /usr/share/java/kafka/${_KAFKA_FULL_VERSION}.jar /usr/share/java/kafka/kafka.jar \
&& rm -r /${_KAFKA_FULL_VERSION}.tgz /${_KAFKA_FULL_VERSION}

# Add test jars with local implementations.
COPY clients/build/libs/kafka-clients-${_KAFKA_VERSION}-test.jar /usr/share/java/kafka/
COPY storage/build/libs/kafka-storage-${_KAFKA_VERSION}-test.jar /usr/share/java/kafka/
COPY storage/api/build/libs/kafka-storage-api-${_KAFKA_VERSION}-test.jar /usr/share/java/kafka/

# Restore the user.
USER appuser
48 changes: 48 additions & 0 deletions Makefile
@@ -0,0 +1,48 @@
##
# Copyright 2023 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
SCALA_VERSION := $(shell grep -o -E '^scalaVersion=[0-9]+\.[0-9]+\.[0-9]+' gradle.properties | cut -c14-)
SCALA_MAJOR_VERSION := $(shell grep -o -E '^scalaVersion=[0-9]+\.[0-9]+' gradle.properties | cut -c14-)
KAFKA_VERSION := $(shell grep -o -E '^version=[0-9]+\.[0-9]+\.[0-9]+(-SNAPSHOT)?' gradle.properties | cut -c9-)
IMAGE_NAME=aivenoy/kafka

BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
IMAGE_TAG := $(subst /,_,$(BRANCH))

.PHONY: all build clean

all: clean build

clean:
./gradlew clean

build: core/build/distributions/kafka_$(SCALA_VERSION)-$(KAFKA_VERSION).tgz

core/build/distributions/kafka_$(SCALA_VERSION)-$(KAFKA_VERSION).tgz:
echo "Version: $(KAFKA_VERSION)-$(SCALA_VERSION)"
./gradlew -PscalaVersion=$(SCALA_VERSION) testJar releaseTarGz

.PHONY: docker_image
docker_image: build
docker build . \
--build-arg _SCALA_VERSION=$(SCALA_MAJOR_VERSION) \
--build-arg _KAFKA_VERSION=$(KAFKA_VERSION) \
-t $(IMAGE_NAME):$(IMAGE_TAG)

.PHONY: docker_push
docker_push:
docker push $(IMAGE_NAME):$(IMAGE_TAG)

# TODO publish docker images
clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -18,6 +18,8 @@
package org.apache.kafka.clients.admin;

import java.util.Optional;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
@@ -35,6 +37,7 @@
*/
public class NewTopic {

private final Uuid id;
private final String name;
private final Optional<Integer> numPartitions;
private final Optional<Short> replicationFactor;
@@ -48,12 +51,17 @@ public NewTopic(String name, int numPartitions, short replicationFactor) {
this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
}

public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this(null, name, numPartitions, replicationFactor);
}

/**
* A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to
* the broker configurations for {@code num.partitions} and {@code default.replication.factor}
* respectively.
*/
public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
public NewTopic(Uuid id, String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this.id = id;
this.name = name;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
@@ -68,12 +76,17 @@ public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
* generally a good idea for all partitions to have the same number of replicas.
*/
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
this.id = null;
this.name = name;
this.numPartitions = Optional.empty();
this.replicationFactor = Optional.empty();
this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments);
}

public Uuid id() {
return id;
}

/**
* The name of the topic to be created.
*/
@@ -126,6 +139,9 @@ CreatableTopic convertToCreatableTopic() {
setName(name).
setNumPartitions(numPartitions.orElse(CreateTopicsRequest.NO_NUM_PARTITIONS)).
setReplicationFactor(replicationFactor.orElse(CreateTopicsRequest.NO_REPLICATION_FACTOR));
if (id != null) {
creatableTopic.setId(id);
}
if (replicasAssignments != null) {
for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
creatableTopic.assignments().add(
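A quick illustration of the new id-aware constructor, as it could be driven from the Admin client against a broker running this patch. This is a minimal sketch: the bootstrap address, topic name, and partition/replica counts are illustrative assumptions, not part of the change.

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Uuid;

public class CreateTopicWithIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        // Pin the topic id client-side instead of letting the broker generate one.
        Uuid topicId = Uuid.randomUuid();
        NewTopic topic = new NewTopic(topicId, "example-topic",
                Optional.of(3), Optional.of((short) 2));

        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}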
clients/src/main/resources/common/message/CreateTopicsRequest.json
@@ -54,7 +54,9 @@
"about": "The configuration name." },
{ "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The configuration value." }
]}
]},
{ "name": "Id", "type": "uuid", "tag": 0, "taggedVersions": "5+", "versions": "5+", "ignorable": true,
"about": "Optional topic id."}
]},
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "How long to wait in milliseconds before timing out the request." },
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -686,7 +686,17 @@ boolean isCycle(String topic) {
String source = replicationPolicy.topicSource(topic);
if (source == null) {
return false;
} else if (source.equals(sourceAndTarget.target())) {
}

// Fix for https://issues.apache.org/jira/browse/KAFKA-9914
final boolean condition;
if (replicationPolicy instanceof IdentityReplicationPolicy) {
condition = source.equals(sourceAndTarget.target());
} else {
condition = source.equals(sourceAndTarget.source()) || source.equals(sourceAndTarget.target());
}

if (condition) {
return true;
} else {
String upstreamTopic = replicationPolicy.upstreamTopic(topic);
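For context on the branch above: DefaultReplicationPolicy encodes the source cluster alias into the remote topic name, while IdentityReplicationPolicy leaves names untouched and takes the source alias from configuration, so cycles cannot be inferred from the name alone. A minimal sketch of that contrast; the commented return values reflect my reading of the two policies, not output produced by this patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;

public class ReplicationPolicyContrast {
    public static void main(String[] args) {
        // Prefix-based naming: the source alias is recoverable from the topic name.
        DefaultReplicationPolicy prefixed = new DefaultReplicationPolicy();
        prefixed.formatRemoteTopic("source", "topic1"); // "source.topic1"
        prefixed.topicSource("source.topic1");          // "source"
        prefixed.upstreamTopic("source.topic1");        // "topic1"

        // Identity naming: topic names never change, so the source alias
        // must come from configuration rather than from the name itself.
        IdentityReplicationPolicy identity = new IdentityReplicationPolicy();
        Map<String, Object> props = new HashMap<>();
        props.put("source.cluster.alias", "source");
        identity.configure(props);
        identity.formatRemoteTopic("source", "topic1"); // "topic1"
        identity.topicSource("topic1");                 // "source", for every topic
    }
}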
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -116,13 +116,31 @@ public void testReplicatesHeartbeatsDespiteFilter() {
public void testNoCycles() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
assertFalse(connector.shouldReplicateTopic("source.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles");
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should allow anything else");
assertTrue(connector.shouldReplicateTopic("othertarget.topic1"), "should allow anything else");
assertTrue(connector.shouldReplicateTopic("other.another.topic1"), "should allow anything else");

final IdentityReplicationPolicy identityReplicationPolicy = new IdentityReplicationPolicy();
final HashMap<String, String> props = new HashMap<>();
props.put("source.cluster.alias", "source");
identityReplicationPolicy.configure(props);
connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
identityReplicationPolicy, x -> true, x -> true);
assertTrue(connector.shouldReplicateTopic("source.topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("target.topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("othertarget.topic1"), "should not consider this a cycle");
assertTrue(connector.shouldReplicateTopic("other.another.topic1"), "should not consider this a cycle");
}

@Test
13 changes: 8 additions & 5 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -831,14 +831,13 @@ public void run() {
}
} catch (InterruptedException ex) {
if (!isCancelled()) {
logger.warn("Current thread for topic-partition-id {} is interrupted. Reason: {}", topicIdPartition, ex.getMessage());
logger.warn("Current thread for topic-partition-id {} is interrupted", topicIdPartition, ex);
}
} catch (RetriableException ex) {
logger.debug("Encountered a retryable error while executing current task for topic-partition {}", topicIdPartition, ex);
} catch (Exception ex) {
if (!isCancelled()) {
logger.warn("Current task for topic-partition {} received error but it will be scheduled. " +
"Reason: {}", topicIdPartition, ex.getMessage());
logger.warn("Current task for topic-partition {} received error but it will be scheduled", topicIdPartition, ex);
}
}
}
@@ -879,7 +878,9 @@ private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata)
}
}
if (shouldDeleteSegment) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
}
logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
}
@@ -896,7 +897,9 @@ public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata)
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
// are ascending with in an epoch.
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
if (!logStartOffset.isPresent() || logStartOffset.getAsLong() < metadata.endOffset() + 1) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
}
logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
}
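Both retention checks now move the computed log-start offset only forward. Per the inline comment, segment offset intervals are ascending only within a leader epoch, so a segment visited later can carry a smaller end offset; the guard keeps the result monotonic. A standalone sketch of the pattern, under that assumption:

import java.util.OptionalLong;

public class MonotonicStartOffset {
    // Advance the candidate log-start offset forward only; never backwards.
    static OptionalLong advance(OptionalLong current, long segmentEndOffsetPlusOne) {
        if (!current.isPresent() || current.getAsLong() < segmentEndOffsetPlusOne) {
            return OptionalLong.of(segmentEndOffsetPlusOne);
        }
        return current;
    }

    public static void main(String[] args) {
        OptionalLong offset = OptionalLong.empty();
        offset = advance(offset, 100);  // -> 100
        offset = advance(offset, 80);   // stays 100: an earlier-epoch segment must not regress it
        offset = advance(offset, 150);  // -> 150
        System.out.println(offset);     // OptionalLong[150]
    }
}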
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -71,10 +71,19 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
val logs = logManager.logsByTopic(topic)
val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())

maybeFailIfDisablingRemoteStorage(props, wasRemoteLogEnabledBeforeUpdate)
logManager.updateTopicConfig(topic, props, kafkaConfig.isRemoteLogStorageSystemEnabled)
maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate)
}

private[server] def maybeFailIfDisablingRemoteStorage(props: Properties,
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
val isRemoteLogToBeEnabled = props.getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(wasRemoteLogEnabledBeforeUpdate))
if (wasRemoteLogEnabledBeforeUpdate && !java.lang.Boolean.parseBoolean(isRemoteLogToBeEnabled)) {
throw new IllegalArgumentException("Disabling remote log on the topic is not supported.")
}
}

private[server] def maybeBootstrapRemoteLogComponents(topic: String,
logs: Seq[UnifiedLog],
wasRemoteLogEnabledBeforeUpdate: Boolean): Unit = {
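From the client side, the new validation means an incremental alter-configs request that flips TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG from true to false is now refused by the broker. A hedged sketch of such a call; the topic name and bootstrap address are assumptions, and the exact client-facing error mapping is outside this diff.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class DisableRemoteStorageExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed

        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "tiered-topic");
        AlterConfigOp disable = new AlterConfigOp(
                new ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
                AlterConfigOp.OpType.SET);

        try (Admin admin = Admin.create(props)) {
            // With this patch the broker fails this update for a topic
            // that currently has remote storage enabled.
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singleton(disable)))
                 .all().get();
        }
    }
}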
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -38,8 +38,10 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
fetchParams: FetchParams,
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(fetchParams.maxWaitMs) {
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
// TODO: temporary workaround; waiting on a proper fix for KAFKA-15776, see KIP-1018
maxWaitMs: Long)
extends DelayedOperation(maxWaitMs) {

if (fetchParams.isFromFollower) {
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1613,8 +1613,10 @@
return Some(createLogReadResult(e))
}

// TODO: temporary workaround; waiting on a proper fix for KAFKA-15776, see KIP-1018
val maxWaitMs = config.remoteLogManagerConfig.fetchRemoteMaxWaitMs()
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
fetchPartitionStatus, params, logReadResults, this, responseCallback, maxWaitMs)

delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
None
13 changes: 10 additions & 3 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -29,8 +29,7 @@ import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, ThrottlingQuotaExceededException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
@@ -167,6 +166,14 @@ class ZkAdminManager(val config: KafkaConfig,
try {
if (metadataCache.contains(topic.name))
throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
val maybeUuid = topic.id() match {
case Uuid.ZERO_UUID => None
case id =>
if (metadataCache.topicNamesToIds().containsValue(id)) {
throw new TopicExistsException(s"Topic id '$id' already exists.")
}
Some(id)
}

val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name)
if (nullConfigs.nonEmpty)
@@ -211,7 +218,7 @@ CreatePartitionsMetadata(topic.name, assignments.keySet)
CreatePartitionsMetadata(topic.name, assignments.keySet)
} else {
controllerMutationQuota.record(assignments.size)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId, maybeUuid)
populateIds(includeConfigsAndMetadata, topic.name)
CreatePartitionsMetadata(topic.name, assignments.keySet)
}
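The ZK code path above now honors a client-supplied topic id and rejects duplicates with a TopicExistsException. A hedged sketch of that failure mode; names and address are assumptions, and error propagation follows the Admin client's usual ExecutionException wrapping.

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TopicExistsException;

public class DuplicateTopicIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed

        Uuid id = Uuid.randomUuid();
        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Collections.singleton(
                    new NewTopic(id, "first", Optional.empty(), Optional.empty()))).all().get();
            try {
                // Reusing the same id for a different topic name is rejected.
                admin.createTopics(Collections.singleton(
                        new NewTopic(id, "second", Optional.empty(), Optional.empty()))).all().get();
            } catch (ExecutionException e) {
                // Expected: TopicExistsException("Topic id '...' already exists.")
                assert e.getCause() instanceof TopicExistsException;
            }
        }
    }
}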