Update to Kafka 3.0.0 #91

Open · wants to merge 1 commit into master
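Bumps the Kafka dependency from 2.2.0 to 3.0.0. The consumer wrapper gains the `Consumer#currentLag` override introduced in Kafka 3.0, the `close(long, TimeUnit)` overloads that 3.0 removed are dropped from both wrappers, and the tests swap spring-kafka's `EmbeddedKafkaRule` for Kafka's own `EmbeddedKafkaCluster`, with local `producerProps()`/`consumerProps(...)` helpers replacing `KafkaTestUtils`.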
2 changes: 1 addition & 1 deletion README.md
@@ -10,7 +10,7 @@ Two solutions are provided:
## Requirements

- Java 8
- Kafka 2.2.0
- Kafka 3.0.0

## Installation

TracingKafkaConsumer.java
@@ -22,8 +22,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
@@ -300,6 +300,11 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
return consumer.endOffsets(collection, duration);
}

@Override
public OptionalLong currentLag(TopicPartition topicPartition) {
return consumer.currentLag(topicPartition);
}

@Override
public ConsumerGroupMetadata groupMetadata() {
return consumer.groupMetadata();
@@ -315,12 +320,6 @@ public void close() {
consumer.close();
}

@Override
@Deprecated
public void close(long l, TimeUnit timeUnit) {
consumer.close(l, timeUnit);
}

@Override
public void close(Duration duration) {
consumer.close(duration);
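For context: Kafka 3.0 added `OptionalLong currentLag(TopicPartition)` to the `Consumer` interface and removed the long-deprecated `close(long, TimeUnit)` overload, which is exactly what the wrapper changes above mirror. A minimal sketch of calling the new delegated method; the topic and partition are illustrative:

```java
import java.util.OptionalLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class LagProbe {

  // currentLag reports the consumer's locally cached lag for an assigned
  // partition; it is empty until the consumer knows both its position and
  // the partition's end offset (i.e. after it has started fetching).
  static void printLag(Consumer<?, ?> consumer) {
    TopicPartition tp = new TopicPartition("messages", 0); // illustrative topic/partition
    OptionalLong lag = consumer.currentLag(tp);
    if (lag.isPresent()) {
      System.out.println("lag for " + tp + " = " + lag.getAsLong());
    } else {
      System.out.println("lag for " + tp + " not yet known");
    }
  }
}
```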
TracingKafkaProducer.java
@@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -166,9 +165,4 @@ public void close(Duration duration) {
producer.close(duration);
}

@Override
public void close(long timeout, TimeUnit timeUnit) {
producer.close(timeout, timeUnit);
}

}
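The deleted override above exists because Kafka 3.0 removed `Producer#close(long, TimeUnit)` (deprecated since 2.0), leaving only `close()` and `close(Duration)`. A sketch of the migration for callers, with an arbitrary timeout:

```java
import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;

class ProducerShutdown {

  // Pre-3.0 code called producer.close(5, TimeUnit.SECONDS); the Duration
  // overload below is the only timed close left in Kafka 3.0.
  static void closeWithTimeout(Producer<?, ?> producer) {
    producer.close(Duration.ofSeconds(5)); // timeout chosen arbitrarily for the sketch
  }
}
```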
TracingKafkaTest.java
@@ -13,7 +13,6 @@
*/
package io.opentracing.contrib.kafka;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -26,11 +25,14 @@
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -49,22 +51,33 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class TracingKafkaTest {

@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 2, "messages");
private static EmbeddedKafkaCluster cluster;
private static final MockTracer mockTracer = new MockTracer();

@BeforeClass
public static void init() {
public static void init() throws InterruptedException, IOException {
GlobalTracer.registerIfAbsent(mockTracer);

cluster = new EmbeddedKafkaCluster(2);
cluster.start();
cluster.createTopic("messages", 2, 2);
}

@AfterClass
public static void shutdown() {
cluster.stop();
}

@Before
@@ -74,10 +87,8 @@ public void before() {

@Test
public void with_interceptors() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils
.producerProps(embeddedKafka.getEmbeddedKafka());
senderProps
.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
Map<String, Object> senderProps = producerProps();
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

producer.send(new ProducerRecord<>("messages", 1, "test"));
@@ -246,10 +257,6 @@ public void nullKey() throws Exception {
ProducerRecord<Integer, String> record = new ProducerRecord<>("messages", "test");
producer.send(record);

final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(1);
createConsumer(latch, null, false, null);

@@ -271,9 +278,7 @@ public void testSeekInConsumerAndCloseInProducer() throws InterruptedException {

ExecutorService executorService = Executors.newSingleThreadExecutor();

final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");
final Map<String, Object> consumerProps = consumerProps("sampleRawConsumer", true);

executorService.execute(() -> {
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
@@ -591,8 +596,7 @@ public <K, V> void onError(Exception exception, Span span) {
}

private Producer<Integer, String> createProducer() {
Map<String, Object> senderProps = KafkaTestUtils
.producerProps(embeddedKafka.getEmbeddedKafka());
Map<String, Object> senderProps = producerProps();
return new KafkaProducer<>(senderProps);
}

@@ -603,9 +607,7 @@ private Producer<Integer, String> createNameProvidedProducer(

private Consumer<Integer, String> createConsumerWithDecorators(
Collection<SpanDecorator> spanDecorators) {
Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");
Map<String, Object> consumerProps = consumerProps("sampleRawConsumer", true);
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
TracingKafkaConsumerBuilder tracingKafkaConsumerBuilder =
new TracingKafkaConsumerBuilder(kafkaConsumer, mockTracer);
@@ -621,9 +623,7 @@

private Consumer<Integer, String> createConsumerWithSpanNameProvider(
BiFunction<String, ConsumerRecord, String> spanNameProvider) {
Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");
Map<String, Object> consumerProps = consumerProps("sampleRawConsumer", true);
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps);
TracingKafkaConsumerBuilder tracingKafkaConsumerBuilder =
new TracingKafkaConsumerBuilder(kafkaConsumer, mockTracer);
@@ -640,8 +640,7 @@

private Producer<Integer, String> createProducerWithDecorators(
Collection<SpanDecorator> spanDecorators) {
Map<String, Object> senderProps = KafkaTestUtils
.producerProps(embeddedKafka.getEmbeddedKafka());
Map<String, Object> senderProps = producerProps();
KafkaProducer kafkaProducer = new KafkaProducer<>(senderProps);
TracingKafkaProducerBuilder tracingKafkaProducerBuilder =
new TracingKafkaProducerBuilder<>(kafkaProducer, mockTracer);
@@ -654,8 +653,7 @@

private Producer<Integer, String> createProducerWithSpanNameProvider(
BiFunction<String, ProducerRecord, String> spanNameProvider) {
Map<String, Object> senderProps = KafkaTestUtils
.producerProps(embeddedKafka.getEmbeddedKafka());
Map<String, Object> senderProps = producerProps();
KafkaProducer kafkaProducer = new KafkaProducer<>(senderProps);
TracingKafkaProducerBuilder tracingKafkaProducerBuilder =
new TracingKafkaProducerBuilder<>(kafkaProducer, mockTracer);
@@ -674,9 +672,7 @@ private void createConsumer(final CountDownLatch latch, final Integer key,

ExecutorService executorService = Executors.newSingleThreadExecutor();

final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");
final Map<String, Object> consumerProps = consumerProps("sampleRawConsumer", true);
if (withInterceptor) {
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
TracingConsumerInterceptor.class.getName());
@@ -764,4 +760,32 @@ private List<MockSpan> getByOperationNameAll(List<MockSpan> spans, String operationName) {
return found;
}

private Map<String, Object> producerProps() {
final Map<String, Object> producerProps = new HashMap<>();

producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return producerProps;
}

private Map<String, Object> consumerProps(String consumerGroup, boolean autoCommit) {
final Map<String, Object> consumerProps = new HashMap<>();

consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return consumerProps;
}
}
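The `producerProps()`/`consumerProps(...)` helpers above replace the `KafkaTestUtils.producerProps(...)` and `KafkaTestUtils.consumerProps(...)` calls, pointing at the `EmbeddedKafkaCluster` instead of the removed spring-kafka `EmbeddedKafkaRule`. A sketch of a round trip through the embedded cluster using these helpers; it would live inside `TracingKafkaTest`, `"messages"` is the topic created in `init()`, and an `org.apache.kafka.clients.consumer.ConsumerRecords` import is assumed:

```java
private void roundTrip() {
  try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps());
      KafkaConsumer<Integer, String> consumer =
          new KafkaConsumer<>(consumerProps("sampleRawConsumer", true))) {
    producer.send(new ProducerRecord<>("messages", 1, "test"));
    producer.flush();
    consumer.subscribe(Collections.singletonList("messages"));
    // Poll blocks until the message arrives or the timeout elapses.
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(15));
    assertEquals(1, records.count()); // the single message sent above
  }
}
```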
TestConfiguration.java
@@ -13,10 +13,19 @@
*/
package io.opentracing.contrib.kafka.spring;

import static io.opentracing.contrib.kafka.spring.TracingSpringKafkaTest.embeddedKafka;
import static io.opentracing.contrib.kafka.spring.TracingSpringKafkaTest.cluster;

import io.opentracing.mock.MockTracer;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@@ -28,7 +37,6 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@Configuration
@EnableKafka
@@ -52,18 +60,15 @@ public MockTracer tracer() {

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");
final Map<String, Object> consumerProps = consumerProps("sampleRawConsumer", true);

return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps), tracer());
}


@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka())), tracer());
return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer());
}

@Bean
@@ -75,4 +80,33 @@ public KafkaTemplate<Integer, String> kafkaTemplate() {
public TracingKafkaAspect tracingKafkaAspect() {
return new TracingKafkaAspect(tracer());
}

private Map<String, Object> producerProps() {
final Map<String, Object> producerProps = new HashMap<>();

producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return producerProps;
}

private Map<String, Object> consumerProps(String consumerGroup, boolean autoCommit) {
final Map<String, Object> consumerProps = new HashMap<>();

consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return consumerProps;
}
}
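For reference, the `TracingConsumerFactory`/`TracingProducerFactory` beans above wrap the default spring-kafka factories, so every consumer and producer they create is traced. A minimal sketch of a component sending through the resulting `KafkaTemplate`; `"spring"` is the topic the test below creates, and the class name is hypothetical:

```java
import org.springframework.kafka.core.KafkaTemplate;

class TracedSender {

  private final KafkaTemplate<Integer, String> kafkaTemplate;

  TracedSender(KafkaTemplate<Integer, String> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  // The template was built from TracingProducerFactory, so each send creates
  // a span and injects its context into the record headers.
  void send() {
    kafkaTemplate.send("spring", 1, "traced message");
  }
}
```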
TracingSpringKafkaTest.java
@@ -20,12 +20,18 @@

import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -39,15 +45,26 @@
@ContextConfiguration(classes = {TestConfiguration.class})
public class TracingSpringKafkaTest {

@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 2, "spring");

@Autowired
private MockTracer mockTracer;

@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;

public static EmbeddedKafkaCluster cluster;

@BeforeClass
public static void init() throws InterruptedException, IOException {
cluster = new EmbeddedKafkaCluster(2);
cluster.start();
cluster.createTopic("spring", 2, 2);
}

@AfterClass
public static void shutdown() {
cluster.stop();
}

@Before
public void before() {
mockTracer.reset();