diff --git a/it-test/src/test/java/io/datanerds/verteiler/it_test/BlockingQueueConsumerTest.java b/it-test/src/test/java/io/datanerds/verteiler/it_test/BlockingQueueConsumerTest.java index 0597753..d067bb6 100644 --- a/it-test/src/test/java/io/datanerds/verteiler/it_test/BlockingQueueConsumerTest.java +++ b/it-test/src/test/java/io/datanerds/verteiler/it_test/BlockingQueueConsumerTest.java @@ -129,4 +129,82 @@ public void testSendOneMessageRestartConsumerEnsureOneMessageOnly() throws Excep await().atMost(2, SECONDS).until(() -> messageCounter.get() == 1); consumer1.stop(); } + + @Test + public void stopOneOfTwoConsumerTest() throws Exception { + final String topic = "stop_oneOfTwoConsumer_topic"; + createTopic(topic); + + AtomicInteger messageCounter = new AtomicInteger(); + + Consumer action = (message) -> { + messageCounter.incrementAndGet(); + }; + + BlockingQueueConsumer consumer1 = new BlockingQueueConsumer<>(topic, props, + 42, action); + consumer1.start(); + + BlockingQueueConsumer consumer2 = new BlockingQueueConsumer<>(topic, props, + 42, action); + consumer2.start(); + + SimpleTestProducer testProducer = new SimpleTestProducer("Lorem-Radio", topic, kafkaConnect); + logger.info("Sending {} messages", NUMBER_OF_MESSAGES); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + testProducer.send(String.valueOf(i)); + if (i == NUMBER_OF_MESSAGES / 5) { + consumer1.stop(); + } + } + + try { + await().atMost(5, SECONDS).until(() -> messageCounter.get() == NUMBER_OF_MESSAGES); + } finally { + logger.info( "expected {} , actully {}", NUMBER_OF_MESSAGES, messageCounter.get() ); + } + testProducer.close(); + consumer2.stop(); + } + + @Test + public void handleMessageWithExceptionTest() throws Exception { + final String topic = "handle_exception_topic"; + createTopic(topic); + + AtomicInteger messageCounter = new AtomicInteger(); + + Consumer action1 = (message) -> { + throw new RuntimeException("unexpected"); + }; + + Consumer action2 = (message) -> { + messageCounter.incrementAndGet(); + }; + + BlockingQueueConsumer consumer1 = new BlockingQueueConsumer<>(topic, props, + 42, action1); + consumer1.start(); + + BlockingQueueConsumer consumer2 = new BlockingQueueConsumer<>(topic, props, + 42, action2); + consumer2.start(); + + SimpleTestProducer testProducer = new SimpleTestProducer("Lorem-Radio", topic, kafkaConnect); + logger.info("Sending {} messages", NUMBER_OF_MESSAGES); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + testProducer.send(LOREM_IPSUM.paragraph()); + } + + try { + await().atMost(60, SECONDS).until(() -> messageCounter.get() == NUMBER_OF_MESSAGES); + } finally { + logger.info( "expected {} , actully {}", NUMBER_OF_MESSAGES, messageCounter.get() ); + } + + testProducer.close(); + consumer2.stop(); + } }