diff --git a/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/util/TimeChunkedLatestFlow.kt b/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/util/TimeChunkedLatestFlow.kt new file mode 100644 index 00000000..b3f9ea1b --- /dev/null +++ b/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/util/TimeChunkedLatestFlow.kt @@ -0,0 +1,70 @@ +package com.sunnychung.application.multiplatform.hellohttp.util + +import com.sunnychung.lib.multiplatform.kdatetime.KDuration +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.coroutines.cancellation.CancellationException + +// Modified from https://blog.shreyaspatil.dev/collecting-items-from-the-flow-in-chunks +private class TimeChunkedLatestFlow( + private val upstream: Flow, + private val duration: KDuration +) : Flow { + override suspend fun collect(collector: FlowCollector) = coroutineScope { + val mutex = Mutex() + + // Holds the un-emitted items + var latestValue: T? = null + var hasValue = false + + // Flag to know the status of upstream flow whether it has been completed or not + var isFlowCompleted = false + + launch { + try { + while (true) { + delay(duration.toMilliseconds()) + mutex.withLock { + // If the upstream flow has been completed and there are no values + // pending to emit in the collector, just break this loop. + if (isFlowCompleted && !hasValue) { + return@launch + } + if (hasValue) { + collector.emit(latestValue!!) + hasValue = false + } + } + } + } catch (e: CancellationException) { + mutex.withLock { + if (hasValue) { + collector.emit(latestValue!!) + hasValue = false + } + } + throw e + } + } + + // Collect the upstream flow and add the items to the above `values` list + upstream.collect { + mutex.withLock { + latestValue = it + hasValue = true + } + } + + // If we reach here it means the upstream flow has been completed and won't + // produce any values anymore. So set the flag as flow is completed so that + // child coroutine will break its loop + isFlowCompleted = true + } +} + +fun Flow.chunkedLatest(duration: KDuration): Flow = TimeChunkedLatestFlow(this, duration) diff --git a/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/ux/CodeEditorView.kt b/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/ux/CodeEditorView.kt index 22d5e386..eac5c56a 100644 --- a/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/ux/CodeEditorView.kt +++ b/src/jvmMain/kotlin/com/sunnychung/application/multiplatform/hellohttp/ux/CodeEditorView.kt @@ -63,6 +63,7 @@ import com.sunnychung.application.multiplatform.hellohttp.extension.contains import com.sunnychung.application.multiplatform.hellohttp.extension.insert import com.sunnychung.application.multiplatform.hellohttp.model.SyntaxHighlight import com.sunnychung.application.multiplatform.hellohttp.util.TreeRangeMaps +import com.sunnychung.application.multiplatform.hellohttp.util.chunkedLatest import com.sunnychung.application.multiplatform.hellohttp.util.log import com.sunnychung.application.multiplatform.hellohttp.ux.bigtext.BigMonospaceText import com.sunnychung.application.multiplatform.hellohttp.ux.bigtext.BigMonospaceTextField @@ -618,12 +619,12 @@ fun CodeEditorView( LaunchedEffect(bigTextFieldState, onTextChange) { bigTextFieldState.valueChangesFlow - .debounce(200.milliseconds().toMilliseconds()) + .chunkedLatest(200.milliseconds()) .collect { - log.d { "bigTextFieldState change ${it.changeId}" } + log.d { "bigTextFieldState change ${it.changeId} ${it.bigText.buildString()}" } onTextChange?.let { onTextChange -> val string = it.bigText.buildCharSequence() as AnnotatedString - log.v { "${bigTextFieldState.text} : ${it.bigText} onTextChange(${string.text.abbr()})" } + log.d { "${bigTextFieldState.text} : ${it.bigText} onTextChange(${string.text.abbr()})" } onTextChange(string.text) secondCacheKey.value = string.text } diff --git a/src/jvmTest/kotlin/com/sunnychung/application/multiplatform/hellohttp/test/util/ChunkedLatestFlowTest.kt b/src/jvmTest/kotlin/com/sunnychung/application/multiplatform/hellohttp/test/util/ChunkedLatestFlowTest.kt new file mode 100644 index 00000000..d45818d0 --- /dev/null +++ b/src/jvmTest/kotlin/com/sunnychung/application/multiplatform/hellohttp/test/util/ChunkedLatestFlowTest.kt @@ -0,0 +1,119 @@ +package com.sunnychung.application.multiplatform.hellohttp.test.util + +import com.sunnychung.application.multiplatform.hellohttp.util.chunkedLatest +import com.sunnychung.lib.multiplatform.kdatetime.extension.milliseconds +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.runBlocking +import java.util.Collections +import kotlin.test.Test +import kotlin.test.assertEquals + +class ChunkedLatestFlowTest { + + @Test + fun receiveOnlyLatestValues() { + runBlocking { + val results = Collections.synchronizedList(mutableListOf()) + + coroutineScope { + flow { + (0..10).forEach { + emit(it) + delay(145) + } + } + .chunkedLatest(500.milliseconds()) + .onEach { results += it } + .launchIn(this) + } + + assertEquals(listOf(3, 6, 10), results) + } + } + + @Test + fun receiveValuesEmittedAtCompletion1() { + runBlocking { + val results = Collections.synchronizedList(mutableListOf()) + + coroutineScope { + flow { + (0..10).forEach { + emit(it) + delay(145) + } + emit(11) + emit(12) + } + .chunkedLatest(500.milliseconds()) + .onEach { results += it } + .launchIn(this) + } + + assertEquals(listOf(3, 6, 10, 12), results) + } + } + + @Test + fun receiveValuesEmittedAtCompletion2() { + runBlocking { + val results = Collections.synchronizedList(mutableListOf()) + + coroutineScope { + flow { + (0..12).forEach { + emit(it) + delay(145) + } + } + .chunkedLatest(500.milliseconds()) + .onEach { results += it } + .launchIn(this) + } + + assertEquals(listOf(3, 6, 10, 12), results) + } + } + + @Test + fun emptyFlow() { + runBlocking { + val results = Collections.synchronizedList(mutableListOf()) + + coroutineScope { + flow { + delay(1000) + } + .chunkedLatest(500.milliseconds()) + .onEach { results += it } + .launchIn(this) + } + + assertEquals(listOf(), results) + } + } + + @Test + fun singleValueWithoutDelay() { + runBlocking { + val results = Collections.synchronizedList(mutableListOf()) + + coroutineScope { + flow { + emit(10) + } + .chunkedLatest(500.milliseconds()) + .onEach { results += it } + .launchIn(this) + } + + assertEquals(listOf(10), results) + } + } +}