diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt index c78b0735..28284fae 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt @@ -72,19 +72,22 @@ fun Application.ebmsProviderModule() { } get("/kafkatest") { log.debug("Kafka test: start") - try { - val consumer = kafkaClientObject.createConsumer() - val topic = "ebxml-acknowledgments" - consumer.subscribe(listOf(topic)) + val consumer = kafkaClientObject.createConsumer() + val topic = "ebxml-acknowledgments" + + consumer.subscribe(listOf(topic)) + + try { val records = consumer.poll(Duration.ofMillis(100)) log.debug("Kafka test: Messages read - ${records.count()}") if (records.count() > 0) { log.debug("Kafka test: Last message - ${records.toList().last().value()}") } - consumer.close() } catch (e: Exception) { log.error("Kafka test: Exception while reading messages from queue", e) + } finally { + consumer.close() } call.respondText("Kafka works!") diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt index e5be8561..aba63b6f 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/kafka/KafkaClient.kt @@ -33,7 +33,7 @@ class KafkaClient { fun createConsumer(): KafkaConsumer { val props = Properties().apply { put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers) - put(ConsumerConfig.GROUP_ID_CONFIG, "ebms-provider10") + put(ConsumerConfig.GROUP_ID_CONFIG, "ebms-provider") put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name) put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt index 6571e76d..40e36994 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/processing/ProcessingService.kt @@ -86,20 +86,21 @@ class ProcessingService(private val httpClient: PayloadProcessingClient) { private fun acknowledgment(acknowledgment: Acknowledgment) { try { - log.debug("Sending acknowledgment to queue") - log.debug("Acknowledgment document: {}", acknowledgment.dokument.toString()) + log.debug("Kafka test: Sending acknowledgment to queue") + log.debug("Kafka test: Acknowledgment document: {}", acknowledgment.dokument.toString()) acknowledgment.dokument.toString() val kafkaProducer = kafkaClientObject.createProducer() - val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "emottak-acknowledgments") + val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "ebxml-acknowledgments") + log.debug("Kafka test: Acknowledgment topic: {}", topic) kafkaProducer.send( ProducerRecord(topic, acknowledgment.messageId, acknowledgment.toEbmsDokument().toString()) ) kafkaProducer.flush() kafkaProducer.close() } catch (e: Exception) { - log.error("Exception while sending acknowledgment to queue", e) + log.error("Kafka test: Exception while sending acknowledgment to queue", e) } - log.debug("Acknowledgment sent to queue") + log.debug("Kafka test: Acknowledgment sent to queue") } private fun fail(fail: EbmsFail) {