Skip to content
This repository has been archived by the owner on Apr 12, 2023. It is now read-only.

Feat/filter on searchitem #194

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ConsumerViewModel @Inject constructor(
clearRecords()
val consumerFrom = ConsumeFrom.values().first { it.text == consumeFromProperty.value }
val deserializationFormat = DeserializationFormat.valueOf(deserializeValueProperty.value)
consumer.start(topic.name, consumerFrom, deserializationFormat) {
consumer.start(topic.name, searchItem.value, consumerFrom, deserializationFormat) {
records.runOnFXThread { addAll(it.map { record -> RecordViewModel(record) }) }
}
} else stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class ConsumerViewModelTest : StringSpec({
}
})

private class ConsumerViewModelTestContext {
private class ConsumerViewModelTestContext(val searchItem: String = "") {
val mockConsumer = mockk<Consumer> {
coEvery { start(any(), any(), any(), any()) } answers {
coEvery { start(any(), searchItem, any(), any(), any()) } answers {
arg<(List<insulator.kafka.model.Record>) -> Unit>(3)(listOf(insulator.kafka.model.Record("1", "2", 3L, mockk(), 1, 3L)))
arg<(List<insulator.kafka.model.Record>) -> Unit>(3)(listOf(insulator.kafka.model.Record("1", "2", 3L, mockk(), 1, 3L)))
arg<(List<insulator.kafka.model.Record>) -> Unit>(3)(listOf(insulator.kafka.model.Record("1", "2", 3L, mockk(), 1, 3L)))
Expand Down
12 changes: 8 additions & 4 deletions lib/kafka/src/main/kotlin/insulator/kafka/consumer/Consumer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ class Consumer(
private var threadLoop: Thread? = null
private var running = false

suspend fun start(topic: String, from: ConsumeFrom, valueFormat: DeserializationFormat, callback: ConsumerCallback) =
suspend fun start(topic: String, searchItem: String, from: ConsumeFrom, valueFormat: DeserializationFormat, callback: ConsumerCallback) =
suspendCoroutine<Unit> { continuation ->
GlobalScope.launch {
if (isRunning()) throw IllegalStateException("Consumer already running")
val consumer = consumerFactory.build(valueFormat)
initializeConsumer(consumer, topic, from)
running = true
loop(consumer, callback)
loop(consumer, searchItem, callback)
}.invokeOnCompletion { exception ->
if (exception == null) continuation.resume(Unit) else continuation.resumeWithException(exception)
}
Expand All @@ -57,12 +57,16 @@ class Consumer(
}.invokeOnCompletion { continuation.resume(Unit) }
}

private fun loop(consumer: Consumer<Any, Any>, callback: ConsumerCallback) {
private fun loop(consumer: Consumer<Any, Any>, searchItem: String, callback: ConsumerCallback) {
threadLoop = thread {
while (running) {
val records = consumer.poll(Duration.ofSeconds(1))
if (records.isEmpty) continue
callback(records.toList().map { parse(it) })
if (searchItem.isEmpty()) {
callback(records.toList().map { parse(it) })
} else {
callback(records.filter { item -> item.key() == searchItem }.toList().map { parse(it) })
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ConsumerTest : StringSpec({
// arrange
val messages = mutableListOf<String>()
// act
it.sut.start("testTopic", consumerFrom, deserializationFormat) { lst -> messages.addAll(lst.map { r -> r.value }) }
it.sut.start("testTopic", "", consumerFrom, deserializationFormat) { lst -> messages.addAll(lst.map { r -> r.value }) }
// assert
delay(300L)
eventually(3.seconds) {
Expand All @@ -53,7 +53,7 @@ class ConsumerTest : StringSpec({
val messages = mutableListOf<String>()
val sut = Consumer(it.consumerFactory) { Throwable("").left() }
// act
sut.start("testTopic", ConsumeFrom.Beginning, DeserializationFormat.Avro) { lst -> messages.addAll(lst.map { r -> r.value }) }
sut.start("testTopic", "", ConsumeFrom.Beginning, DeserializationFormat.Avro) { lst -> messages.addAll(lst.map { r -> r.value }) }
// assert
eventually(20.seconds) {
messages.size shouldBe 1
Expand All @@ -66,7 +66,7 @@ class ConsumerTest : StringSpec({
// arrange
val messages = mutableListOf<String>()
// act
it.sut.start("testTopic", ConsumeFrom.Now, DeserializationFormat.String) { lst -> messages.addAll(lst.map { r -> r.value }) }
it.sut.start("testTopic", "", ConsumeFrom.Now, DeserializationFormat.String) { lst -> messages.addAll(lst.map { r -> r.value }) }
// assert
it.sut.isRunning() shouldBe true
it.sut.stop()
Expand All @@ -85,9 +85,9 @@ class ConsumerTest : StringSpec({
"start twice throw an error" {
TestConsumerScenario().use {
// arrange
it.sut.start("testTopic", ConsumeFrom.Now, DeserializationFormat.String) { }
it.sut.start("testTopic", "", ConsumeFrom.Now, DeserializationFormat.String) { }
// act
val action = suspend { it.sut.start("testTopic", ConsumeFrom.Now, DeserializationFormat.String) { } }
val action = suspend { it.sut.start("testTopic", "", ConsumeFrom.Now, DeserializationFormat.String) { } }
// assert
kotlin.runCatching { action.invoke() }.isFailure shouldBe true
}
Expand Down