diff --git a/README.md b/README.md index c429e87..6d2cb47 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Two solutions are provided: ## Requirements - Java 8 -- Kafka 2.2.0 +- Kafka 3.0.0 ## Installation diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java index e00a363..7f44551 100644 --- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java +++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java @@ -22,8 +22,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; @@ -300,6 +300,11 @@ public Map endOffsets(Collection collectio return consumer.endOffsets(collection, duration); } + @Override + public OptionalLong currentLag(TopicPartition topicPartition) { + return consumer.currentLag(topicPartition); + } + @Override public ConsumerGroupMetadata groupMetadata() { return consumer.groupMetadata(); @@ -315,12 +320,6 @@ public void close() { consumer.close(); } - @Override - @Deprecated - public void close(long l, TimeUnit timeUnit) { - consumer.close(l, timeUnit); - } - @Override public void close(Duration duration) { consumer.close(duration); diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java index 3447d68..08b0179 100644 --- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java +++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java @@ -25,7 +25,6 @@ import 
java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -166,9 +165,4 @@ public void close(Duration duration) { producer.close(duration); } - @Override - public void close(long timeout, TimeUnit timeUnit) { - producer.close(timeout, timeUnit); - } - } diff --git a/opentracing-kafka-client/src/test/java/io/opentracing/contrib/kafka/TracingKafkaTest.java b/opentracing-kafka-client/src/test/java/io/opentracing/contrib/kafka/TracingKafkaTest.java index 5bb11b1..013e008 100644 --- a/opentracing-kafka-client/src/test/java/io/opentracing/contrib/kafka/TracingKafkaTest.java +++ b/opentracing-kafka-client/src/test/java/io/opentracing/contrib/kafka/TracingKafkaTest.java @@ -13,7 +13,6 @@ */ package io.opentracing.contrib.kafka; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -26,11 +25,14 @@ import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; + +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -49,22 +51,33 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; -import org.springframework.kafka.test.rule.EmbeddedKafkaRule; -import org.springframework.kafka.test.utils.KafkaTestUtils; public class TracingKafkaTest { - @ClassRule - public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 2, "messages"); + private static EmbeddedKafkaCluster cluster; private static final MockTracer mockTracer = new MockTracer(); @BeforeClass - public static void init() { + public static void init() throws InterruptedException, IOException { GlobalTracer.registerIfAbsent(mockTracer); + + cluster = new EmbeddedKafkaCluster(2); + cluster.start(); + cluster.createTopic("messages", 2, 2); + } + + @AfterClass + public static void shutdown() { + cluster.stop(); } @Before @@ -74,10 +87,8 @@ public void before() { @Test public void with_interceptors() throws Exception { - Map senderProps = KafkaTestUtils - .producerProps(embeddedKafka.getEmbeddedKafka()); - senderProps - .put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + Map senderProps = producerProps(); + senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); KafkaProducer producer = new KafkaProducer<>(senderProps); producer.send(new ProducerRecord<>("messages", 1, "test")); @@ -246,10 +257,6 @@ public void nullKey() throws Exception { ProducerRecord record = new ProducerRecord<>("messages", "test"); producer.send(record); - final Map consumerProps = KafkaTestUtils - .consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka()); - consumerProps.put("auto.offset.reset", "earliest"); - final CountDownLatch latch = new CountDownLatch(1); createConsumer(latch, null, false, null); @@ -271,9 +278,7 @@ public void testSeekInConsumerAndCloseInProducer() throws 
InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); - final Map consumerProps = KafkaTestUtils - .consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka()); - consumerProps.put("auto.offset.reset", "earliest"); + final Map consumerProps = consumerProps("sampleRawConsumer", false); executorService.execute(() -> { KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps); @@ -591,8 +596,7 @@ public void onError(Exception exception, Span span) { } private Producer createProducer() { - Map senderProps = KafkaTestUtils - .producerProps(embeddedKafka.getEmbeddedKafka()); + Map senderProps = producerProps(); return new KafkaProducer<>(senderProps); } @@ -603,9 +607,7 @@ private Producer createNameProvidedProducer( private Consumer createConsumerWithDecorators( Collection spanDecorators) { - Map consumerProps = KafkaTestUtils - .consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka()); - consumerProps.put("auto.offset.reset", "earliest"); + Map consumerProps = consumerProps("sampleRawConsumer", false); KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps); TracingKafkaConsumerBuilder tracingKafkaConsumerBuilder = new TracingKafkaConsumerBuilder(kafkaConsumer, mockTracer); @@ -621,9 +623,7 @@ private Consumer createConsumerWithDecorators( private Consumer createConsumerWithSpanNameProvider( BiFunction spanNameProvider) { - Map consumerProps = KafkaTestUtils - .consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka()); - consumerProps.put("auto.offset.reset", "earliest"); + Map consumerProps = consumerProps("sampleRawConsumer", false); KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProps); TracingKafkaConsumerBuilder tracingKafkaConsumerBuilder = new TracingKafkaConsumerBuilder(kafkaConsumer, mockTracer); @@ -640,8 +640,7 @@ private Consumer createConsumerWithSpanNameProvider( private Producer createProducerWithDecorators( Collection 
spanDecorators) { - Map senderProps = KafkaTestUtils - .producerProps(embeddedKafka.getEmbeddedKafka()); + Map senderProps = producerProps(); KafkaProducer kafkaProducer = new KafkaProducer<>(senderProps); TracingKafkaProducerBuilder tracingKafkaProducerBuilder = new TracingKafkaProducerBuilder<>(kafkaProducer, mockTracer); @@ -654,8 +653,7 @@ private Producer createProducerWithDecorators( private Producer createProducerWithSpanNameProvider( BiFunction spanNameProvider) { - Map senderProps = KafkaTestUtils - .producerProps(embeddedKafka.getEmbeddedKafka()); + Map senderProps = producerProps(); KafkaProducer kafkaProducer = new KafkaProducer<>(senderProps); TracingKafkaProducerBuilder tracingKafkaProducerBuilder = new TracingKafkaProducerBuilder<>(kafkaProducer, mockTracer); @@ -674,9 +672,7 @@ private void createConsumer(final CountDownLatch latch, final Integer key, ExecutorService executorService = Executors.newSingleThreadExecutor(); - final Map consumerProps = KafkaTestUtils - .consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka()); - consumerProps.put("auto.offset.reset", "earliest"); + final Map consumerProps = consumerProps("sampleRawConsumer", false); if (withInterceptor) { consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); @@ -764,4 +760,32 @@ private List getByOperationNameAll(List spans, String operat return found; } + private Map producerProps() { + final Map producerProps = new HashMap<>(); + + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class); + + return producerProps; + } + + private Map consumerProps(String consumerGroup, boolean autoCommit) { + final Map consumerProps = new HashMap<>(); + + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); + consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10"); + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return consumerProps; + } } diff --git a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TestConfiguration.java b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TestConfiguration.java index 33bb56c..624bfee 100644 --- a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TestConfiguration.java +++ b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TestConfiguration.java @@ -13,10 +13,19 @@ */ package io.opentracing.contrib.kafka.spring; -import static io.opentracing.contrib.kafka.spring.TracingSpringKafkaTest.embeddedKafka; +import static io.opentracing.contrib.kafka.spring.TracingSpringKafkaTest.cluster; import io.opentracing.mock.MockTracer; + +import java.util.HashMap; import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import 
org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -28,7 +37,6 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.test.utils.KafkaTestUtils; @Configuration @EnableKafka @@ -52,9 +60,7 @@ public MockTracer tracer() { @Bean public ConsumerFactory consumerFactory() { - final Map consumerProps = KafkaTestUtils - .consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka()); - consumerProps.put("auto.offset.reset", "earliest"); + final Map consumerProps = consumerProps("sampleRawConsumer", false); return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps), tracer()); } @@ -62,8 +68,7 @@ public ConsumerFactory consumerFactory() { @Bean public ProducerFactory producerFactory() { - return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>( - KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka())), tracer()); + return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer()); } @Bean @@ -75,4 +80,33 @@ public KafkaTemplate kafkaTemplate() { public TracingKafkaAspect tracingKafkaAspect() { return new TracingKafkaAspect(tracer()); } + + private Map producerProps() { + final Map producerProps = new HashMap<>(); + + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + 
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProps; + } + + private Map consumerProps(String consumerGroup, boolean autoCommit) { + final Map consumerProps = new HashMap<>(); + + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); + consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10"); + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return consumerProps; + } } diff --git a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java index a59aabc..012ab68 100644 --- a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java +++ b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java @@ -20,12 +20,18 @@ import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; + +import java.io.IOException; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; + +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,15 +45,26 @@ @ContextConfiguration(classes = {TestConfiguration.class}) 
public class TracingSpringKafkaTest { - @ClassRule - public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 2, "spring"); - @Autowired private MockTracer mockTracer; @Autowired private KafkaTemplate kafkaTemplate; + public static EmbeddedKafkaCluster cluster; + + @BeforeClass + public static void init() throws InterruptedException, IOException { + cluster = new EmbeddedKafkaCluster(2); + cluster.start(); + cluster.createTopic("spring", 2, 2); + } + + @AfterClass + public static void shutdown() { + cluster.stop(); + } + @Before public void before() { mockTracer.reset(); diff --git a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java index a2c4eff..59c26cc 100644 --- a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java +++ b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java @@ -25,6 +25,8 @@ import java.util.Collections; import java.util.Map; import java.util.function.BiFunction; + +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -89,6 +91,13 @@ public TracingKafkaClientSupplier( this(GlobalTracer.get(), null, consumerSpanNameProvider, producerSpanNameProvider); } + // This method is required by Kafka Streams >=3.0 + @Override + public Admin getAdmin(final Map config) { + // create a new client upon each call; but expect this call to be only triggered once so this should be fine + return Admin.create(config); + } + // This method is required by Kafka Streams >=1.1, and optional for Kafka Streams <1.1 public AdminClient getAdminClient(final Map config) { // create a new client upon each call; but expect this 
call to be only triggered once so this should be fine diff --git a/opentracing-kafka-streams/src/test/java/io/opentracing/contrib/kafka/streams/TracingKafkaStreamsTest.java b/opentracing-kafka-streams/src/test/java/io/opentracing/contrib/kafka/streams/TracingKafkaStreamsTest.java index 746eb6b..0ffcd83 100644 --- a/opentracing-kafka-streams/src/test/java/io/opentracing/contrib/kafka/streams/TracingKafkaStreamsTest.java +++ b/opentracing-kafka-streams/src/test/java/io/opentracing/contrib/kafka/streams/TracingKafkaStreamsTest.java @@ -24,35 +24,52 @@ import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; + +import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; + import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; +import org.junit.AfterClass; import org.junit.Before; -import org.junit.ClassRule; +import org.junit.BeforeClass; import org.junit.Test; -import org.springframework.kafka.test.rule.EmbeddedKafkaRule; -import org.springframework.kafka.test.utils.KafkaTestUtils; public class TracingKafkaStreamsTest { - @ClassRule - 
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(2, true, 2, "stream-test"); - + private static EmbeddedKafkaCluster cluster; private MockTracer mockTracer = new MockTracer(); + @BeforeClass + public static void init() throws InterruptedException, IOException { + cluster = new EmbeddedKafkaCluster(2); + cluster.start(); + cluster.createTopic("stream-test", 2, 2); + } + + @AfterClass + public static void shutdown() { + cluster.stop(); + } + @Before public void before() { mockTracer.reset(); @@ -60,8 +77,7 @@ public void before() { @Test public void test() { - Map senderProps = KafkaTestUtils - .producerProps(embeddedKafka.getEmbeddedKafka()); + Map senderProps = producerProps(); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app"); @@ -99,8 +115,7 @@ public void test() { } private Producer createProducer() { - Map senderProps = KafkaTestUtils - .producerProps(embeddedKafka.getEmbeddedKafka()); + Map senderProps = producerProps(); KafkaProducer kafkaProducer = new KafkaProducer<>(senderProps); return new TracingKafkaProducer<>(kafkaProducer, mockTracer); } @@ -132,4 +147,19 @@ private void checkSpans(List mockSpans) { private Callable reportedSpansSize() { return () -> mockTracer.finishedSpans().size(); } + + private Map producerProps() { + final Map producerProps = new HashMap<>(); + + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1); + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProps; + } + } diff --git a/pom.xml b/pom.xml index c1289af..9fdaf0d 100644 --- a/pom.xml 
+++ b/pom.xml @@ -67,7 +67,7 @@ UTF-8 0.33.0 - 2.6.0 + 3.0.0 2.6.1 5.2.7.RELEASE 4.3.0 @@ -160,6 +160,14 @@ test + + org.apache.kafka + kafka-streams + ${kafka.version} + test + test + + junit junit