From 57f5cd71ecdd511f3657172edf64cc2e6ae6a6c3 Mon Sep 17 00:00:00 2001 From: chrisford Date: Fri, 28 May 2021 12:08:39 +0100 Subject: [PATCH 1/3] update search so when searchItem provided consumer wont parse all other messages --- .../viewmodel/main/topic/ConsumerViewModel.kt | 2 +- .../kotlin/insulator/kafka/consumer/Consumer.kt | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/app/src/main/kotlin/insulator/viewmodel/main/topic/ConsumerViewModel.kt b/app/src/main/kotlin/insulator/viewmodel/main/topic/ConsumerViewModel.kt index e4f79958..643c5027 100644 --- a/app/src/main/kotlin/insulator/viewmodel/main/topic/ConsumerViewModel.kt +++ b/app/src/main/kotlin/insulator/viewmodel/main/topic/ConsumerViewModel.kt @@ -49,7 +49,7 @@ class ConsumerViewModel @Inject constructor( clearRecords() val consumerFrom = ConsumeFrom.values().first { it.text == consumeFromProperty.value } val deserializationFormat = DeserializationFormat.valueOf(deserializeValueProperty.value) - consumer.start(topic.name, consumerFrom, deserializationFormat) { + consumer.start(topic.name, searchItem.value, consumerFrom, deserializationFormat) { records.runOnFXThread { addAll(it.map { record -> RecordViewModel(record) }) } } } else stop() diff --git a/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt b/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt index d13bb239..7579d2a8 100644 --- a/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt +++ b/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt @@ -31,14 +31,14 @@ class Consumer( private var threadLoop: Thread? = null private var running = false - suspend fun start(topic: String, from: ConsumeFrom, valueFormat: DeserializationFormat, callback: ConsumerCallback) = + suspend fun start(topic: String, searchItem: String, from: ConsumeFrom, valueFormat: DeserializationFormat, callback: ConsumerCallback) = suspendCoroutine { continuation -> GlobalScope.launch { if (isRunning()) throw IllegalStateException("Consumer already running") val consumer = consumerFactory.build(valueFormat) initializeConsumer(consumer, topic, from) running = true - loop(consumer, callback) + loop(consumer, searchItem, callback) }.invokeOnCompletion { exception -> if (exception == null) continuation.resume(Unit) else continuation.resumeWithException(exception) } @@ -57,12 +57,18 @@ class Consumer( }.invokeOnCompletion { continuation.resume(Unit) } } - private fun loop(consumer: Consumer, callback: ConsumerCallback) { + private fun loop(consumer: Consumer, searchItem: String, callback: ConsumerCallback) { threadLoop = thread { while (running) { val records = consumer.poll(Duration.ofSeconds(1)) if (records.isEmpty) continue - callback(records.toList().map { parse(it) }) + if(searchItem.isEmpty()) + { + callback(records.toList().map { parse(it) }) + } + else{ + callback(records.filter { item -> item.key() == searchItem }.toList().map { parse(it) }) + } } } } From 4180980322b54c18c464eee46e33b0062dabc980 Mon Sep 17 00:00:00 2001 From: chrisford Date: Fri, 28 May 2021 12:16:54 +0100 Subject: [PATCH 2/3] update unit tests --- .../viewmodel/main/topic/ConsumerViewModelTest.kt | 4 ++-- .../kotlin/insulator/kafka/consumer/ConsumerTest.kt | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/app/src/test/kotlin/insulator/viewmodel/main/topic/ConsumerViewModelTest.kt b/app/src/test/kotlin/insulator/viewmodel/main/topic/ConsumerViewModelTest.kt index c00c1e24..08585948 100644 --- a/app/src/test/kotlin/insulator/viewmodel/main/topic/ConsumerViewModelTest.kt +++ b/app/src/test/kotlin/insulator/viewmodel/main/topic/ConsumerViewModelTest.kt @@ -45,9 +45,9 @@ class ConsumerViewModelTest : StringSpec({ } }) -private class ConsumerViewModelTestContext { +private class ConsumerViewModelTestContext(val searchItem: String = "") { val mockConsumer = mockk { - coEvery { start(any(), any(), any(), any()) } answers { + coEvery { start(any(), searchItem, any(), any(), any()) } answers { arg<(List) -> Unit>(3)(listOf(insulator.kafka.model.Record("1", "2", 3L, mockk(), 1, 3L))) arg<(List) -> Unit>(3)(listOf(insulator.kafka.model.Record("1", "2", 3L, mockk(), 1, 3L))) arg<(List) -> Unit>(3)(listOf(insulator.kafka.model.Record("1", "2", 3L, mockk(), 1, 3L))) diff --git a/lib/kafka/src/test/kotlin/insulator/kafka/consumer/ConsumerTest.kt b/lib/kafka/src/test/kotlin/insulator/kafka/consumer/ConsumerTest.kt index cbe4a95c..4c061414 100644 --- a/lib/kafka/src/test/kotlin/insulator/kafka/consumer/ConsumerTest.kt +++ b/lib/kafka/src/test/kotlin/insulator/kafka/consumer/ConsumerTest.kt @@ -30,7 +30,7 @@ class ConsumerTest : StringSpec({ // arrange val messages = mutableListOf() // act - it.sut.start("testTopic", consumerFrom, deserializationFormat) { lst -> messages.addAll(lst.map { r -> r.value }) } + it.sut.start("testTopic", "", consumerFrom, deserializationFormat) { lst -> messages.addAll(lst.map { r -> r.value }) } // assert delay(300L) eventually(3.seconds) { @@ -53,7 +53,7 @@ class ConsumerTest : StringSpec({ val messages = mutableListOf() val sut = Consumer(it.consumerFactory) { Throwable("").left() } // act - sut.start("testTopic", ConsumeFrom.Beginning, DeserializationFormat.Avro) { lst -> messages.addAll(lst.map { r -> r.value }) } + sut.start("testTopic", "", ConsumeFrom.Beginning, DeserializationFormat.Avro) { lst -> messages.addAll(lst.map { r -> r.value }) } // assert eventually(20.seconds) { messages.size shouldBe 1 @@ -66,7 +66,7 @@ class ConsumerTest : StringSpec({ // arrange val messages = mutableListOf() // act - it.sut.start("testTopic", ConsumeFrom.Now, DeserializationFormat.String) { lst -> messages.addAll(lst.map { r -> r.value }) } + it.sut.start("testTopic", "", ConsumeFrom.Now, DeserializationFormat.String) { lst -> messages.addAll(lst.map { r -> r.value }) } // assert it.sut.isRunning() shouldBe true it.sut.stop() @@ -85,9 +85,9 @@ class ConsumerTest : StringSpec({ "start twice throw an error" { TestConsumerScenario().use { // arrange - it.sut.start("testTopic", ConsumeFrom.Now, DeserializationFormat.String) { } + it.sut.start("testTopic", "", ConsumeFrom.Now, DeserializationFormat.String) { } // act - val action = suspend { it.sut.start("testTopic", ConsumeFrom.Now, DeserializationFormat.String) { } } + val action = suspend { it.sut.start("testTopic", "", ConsumeFrom.Now, DeserializationFormat.String) { } } // assert kotlin.runCatching { action.invoke() }.isFailure shouldBe true } From d5429994793aa089b2f51186f34e04f865ad8905 Mon Sep 17 00:00:00 2001 From: Auto Lint Date: Fri, 28 May 2021 13:11:13 +0000 Subject: [PATCH 3/3] Fix PR lint --- .../src/main/kotlin/insulator/kafka/consumer/Consumer.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt b/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt index 7579d2a8..173bd6bc 100644 --- a/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt +++ b/lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt @@ -62,11 +62,9 @@ class Consumer( while (running) { val records = consumer.poll(Duration.ofSeconds(1)) if (records.isEmpty) continue - if(searchItem.isEmpty()) - { + if (searchItem.isEmpty()) { callback(records.toList().map { parse(it) }) - } - else{ + } else { callback(records.filter { item -> item.key() == searchItem }.toList().map { parse(it) }) } }