Skip to content

Commit

Permalink
Små fikser
Browse files Browse the repository at this point in the history
  • Loading branch information
OleksandrChmyrNAV committed Dec 4, 2024
1 parent e818317 commit 95d1914
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
13 changes: 8 additions & 5 deletions ebms-provider/src/main/kotlin/no/nav/emottak/ebms/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,22 @@ fun Application.ebmsProviderModule() {
}
get("/kafkatest") {
log.debug("Kafka test: start")
try {
val consumer = kafkaClientObject.createConsumer()
val topic = "ebxml-acknowledgments"

consumer.subscribe(listOf(topic))
val consumer = kafkaClientObject.createConsumer()
val topic = "ebxml-acknowledgments"

consumer.subscribe(listOf(topic))

try {
val records = consumer.poll(Duration.ofMillis(100))
log.debug("Kafka test: Messages read - ${records.count()}")
if (records.count() > 0) {
log.debug("Kafka test: Last message - ${records.toList().last().value()}")
}
consumer.close()
} catch (e: Exception) {
log.error("Kafka test: Exception while reading messages from queue", e)
} finally {
consumer.close()
}

call.respondText("Kafka works!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class KafkaClient {
fun createConsumer(): KafkaConsumer<String, String> {
val props = Properties().apply {
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
put(ConsumerConfig.GROUP_ID_CONFIG, "ebms-provider10")
put(ConsumerConfig.GROUP_ID_CONFIG, "ebms-provider")
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,21 @@ class ProcessingService(private val httpClient: PayloadProcessingClient) {

private fun acknowledgment(acknowledgment: Acknowledgment) {
try {
log.debug("Sending acknowledgment to queue")
log.debug("Acknowledgment document: {}", acknowledgment.dokument.toString())
log.debug("Kafka test: Sending acknowledgment to queue")
log.debug("Kafka test: Acknowledgment document: {}", acknowledgment.dokument.toString())
acknowledgment.dokument.toString()
val kafkaProducer = kafkaClientObject.createProducer()
val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "emottak-acknowledgments")
val topic = getEnvVar("KAFKA_TOPIC_ACKNOWLEDGMENTS", "ebxml-acknowledgments")
log.debug("Kafka test: Acknowledgment topic: {}", topic)
kafkaProducer.send(
ProducerRecord(topic, acknowledgment.messageId, acknowledgment.toEbmsDokument().toString())
)
kafkaProducer.flush()
kafkaProducer.close()
} catch (e: Exception) {
log.error("Exception while sending acknowledgment to queue", e)
log.error("Kafka test: Exception while sending acknowledgment to queue", e)
}
log.debug("Acknowledgment sent to queue")
log.debug("Kafka test: Acknowledgment sent to queue")
}

private fun fail(fail: EbmsFail) {
Expand Down

0 comments on commit 95d1914

Please sign in to comment.