diff --git a/LICENSE-binary b/LICENSE-binary index 15db2a14808de..d6d7dd7219a85 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -259,7 +259,7 @@ scala-library-2.13.12 scala-logging_2.13-3.9.4 scala-reflect-2.13.12 scala-java8-compat_2.13-1.0.2 -snappy-java-1.1.10.3 +snappy-java-1.1.10.4 swagger-annotations-2.2.8 zookeeper-3.8.2 zookeeper-jute-3.8.2 diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index b1e39ebde49fb..6135ec952ca90 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -19,8 +19,10 @@ package kafka.api.test import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness} import kafka.utils.TestUtils -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.ByteArraySerializer @@ -29,7 +31,10 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource +import java.util.concurrent.Future import java.util.{Collections, Properties} +import scala.collection.mutable.ListBuffer +import scala.util.Random class ProducerCompressionTest extends QuorumTestHarness { @@ -64,10 +69,10 @@ class ProducerCompressionTest extends QuorumTestHarness { "kraft,snappy", "kraft,lz4", "kraft,zstd", - "zk,gzip" + "zk,gzip", + "zk,snappy" )) def testCompression(quorum: String, compression: String): Unit = { - val producerProps = new Properties() val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker)) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) @@ -88,13 +93,28 @@ class ProducerCompressionTest extends QuorumTestHarness { } val partition = 0 + def messageValue(length: Int): String = { + val random = new Random(0) + new String(random.alphanumeric.take(length).toArray) + } + // prepare the messages - val messageValues = (0 until numRecords).map(i => "value" + i) + val messageValues = (0 until numRecords).map(i => messageValue(i)) + val headerArr = Array[Header](new RecordHeader("key", "value".getBytes)) + val headers = new RecordHeaders(headerArr) // make sure the returned messages are correct val now = System.currentTimeMillis() - val responses = for (message <- messageValues) - yield producer.send(new ProducerRecord(topic, null, now, null, message.getBytes)) + val responses: ListBuffer[Future[RecordMetadata]] = new ListBuffer[Future[RecordMetadata]]() + + for (message <- messageValues) { + // 1. send message without key and header + responses += producer.send(new ProducerRecord(topic, null, now, null, message.getBytes)) + // 2. send message with key, without header + responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes)) + // 3. send message with key and header + responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes, headers)) + } for ((future, offset) <- responses.zipWithIndex) { assertEquals(offset.toLong, future.get.offset) } @@ -103,12 +123,37 @@ class ProducerCompressionTest extends QuorumTestHarness { // make sure the fetched message count match consumer.assign(Collections.singleton(tp)) consumer.seek(tp, 0) - val records = TestUtils.consumeRecords(consumer, numRecords) + val records = TestUtils.consumeRecords(consumer, numRecords*3) + + for (i <- 0 until numRecords) { + val messageValue = messageValues(i) + // 1. verify message without key and header + var offset = i * 3 + var record = records(offset) + assertNull(record.key()) + assertEquals(messageValue, new String(record.value)) + assertEquals(0, record.headers().toArray.length) + assertEquals(now, record.timestamp) + assertEquals(offset.toLong, record.offset) + + // 2. verify message with key, without header + offset = i * 3 + 1 + record = records(offset) + assertEquals(messageValue.length.toString, new String(record.key())) + assertEquals(messageValue, new String(record.value)) + assertEquals(0, record.headers().toArray.length) + assertEquals(now, record.timestamp) + assertEquals(offset.toLong, record.offset) - for (((messageValue, record), index) <- messageValues.zip(records).zipWithIndex) { + // 3. verify message with key and header + offset = i * 3 + 2 + record = records(offset) + assertEquals(messageValue.length.toString, new String(record.key())) assertEquals(messageValue, new String(record.value)) + assertEquals(1, record.headers().toArray.length) + assertEquals(headerArr.apply(0), record.headers().toArray.apply(0)) assertEquals(now, record.timestamp) - assertEquals(index.toLong, record.offset) + assertEquals(offset.toLong, record.offset) } } finally { producer.close() diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala index 5ab53699cd433..132a77ff97bc9 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala @@ -144,9 +144,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati case _ => // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to // increase because the broker offsets are larger than the ones assigned by the client - // adding `5` to the message set size is good enough for this test: it covers the increased message size while + // adding `6` to the message set size is good enough for this test: it covers the increased message size while // still being less than the overhead introduced by the conversion from message format version 0 to 1 - largeMessageSet.sizeInBytes + 5 + largeMessageSet.sizeInBytes + 6 } cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index c61abd3a26487..013c8cbc5a798 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -156,7 +156,7 @@ versions += [ scalaJava8Compat : "1.0.2", scoverage: "1.9.3", slf4j: "1.7.36", - snappy: "1.1.10.3", + snappy: "1.1.10.4", spotbugs: "4.7.3", // New version of Swagger 2.2.14 requires minimum JDK 11. swaggerAnnotations: "2.2.8",