Skip to content

Commit

Permalink
Some failure test
Browse files Browse the repository at this point in the history
  • Loading branch information
blling committed Aug 3, 2018
1 parent 9e3ff4d commit be5aa74
Showing 1 changed file with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,82 @@ public void testSendOneMessageRestartConsumerEnsureOneMessageOnly() throws Excep
await().atMost(2, SECONDS).until(() -> messageCounter.get() == 1);
consumer1.stop();
}

@Test
public void stopOneOfTwoConsumerTest() throws Exception {
final String topic = "stop_oneOfTwoConsumer_topic";
createTopic(topic);

AtomicInteger messageCounter = new AtomicInteger();

Consumer<String> action = (message) -> {
messageCounter.incrementAndGet();
};

BlockingQueueConsumer<String, String> consumer1 = new BlockingQueueConsumer<>(topic, props,
42, action);
consumer1.start();

BlockingQueueConsumer<String, String> consumer2 = new BlockingQueueConsumer<>(topic, props,
42, action);
consumer2.start();

SimpleTestProducer testProducer = new SimpleTestProducer("Lorem-Radio", topic, kafkaConnect);
logger.info("Sending {} messages", NUMBER_OF_MESSAGES);

for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
testProducer.send(String.valueOf(i));
if (i == NUMBER_OF_MESSAGES / 5) {
consumer1.stop();
}
}

try {
await().atMost(5, SECONDS).until(() -> messageCounter.get() == NUMBER_OF_MESSAGES);
} finally {
logger.info( "expected {} , actully {}", NUMBER_OF_MESSAGES, messageCounter.get() );
}
testProducer.close();
consumer2.stop();
}

@Test
public void handleMessageWithExceptionTest() throws Exception {
final String topic = "handle_exception_topic";
createTopic(topic);

AtomicInteger messageCounter = new AtomicInteger();

Consumer<String> action1 = (message) -> {
throw new RuntimeException("unexpected");
};

Consumer<String> action2 = (message) -> {
messageCounter.incrementAndGet();
};

BlockingQueueConsumer<String, String> consumer1 = new BlockingQueueConsumer<>(topic, props,
42, action1);
consumer1.start();

BlockingQueueConsumer<String, String> consumer2 = new BlockingQueueConsumer<>(topic, props,
42, action2);
consumer2.start();

SimpleTestProducer testProducer = new SimpleTestProducer("Lorem-Radio", topic, kafkaConnect);
logger.info("Sending {} messages", NUMBER_OF_MESSAGES);

for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
testProducer.send(LOREM_IPSUM.paragraph());
}

try {
await().atMost(60, SECONDS).until(() -> messageCounter.get() == NUMBER_OF_MESSAGES);
} finally {
logger.info( "expected {} , actully {}", NUMBER_OF_MESSAGES, messageCounter.get() );
}

testProducer.close();
consumer2.stop();
}
}

0 comments on commit be5aa74

Please sign in to comment.