Skip to content

Commit

Permalink
fix the flow collecting BigMonospaceText changes never runs after fas…
Browse files Browse the repository at this point in the history
…t typing and switching to another Payload which causes cancellation of the flow
  • Loading branch information
sunny-chung committed Nov 3, 2024
1 parent a323f28 commit b10ad04
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.sunnychung.application.multiplatform.hellohttp.util

import com.sunnychung.lib.multiplatform.kdatetime.KDuration
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.cancellation.CancellationException

// Modified from https://blog.shreyaspatil.dev/collecting-items-from-the-flow-in-chunks
private class TimeChunkedLatestFlow<T>(
private val upstream: Flow<T>,
private val duration: KDuration
) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) = coroutineScope<Unit> {
val mutex = Mutex()

// Holds the un-emitted items
var latestValue: T? = null
var hasValue = false

// Flag to know the status of upstream flow whether it has been completed or not
var isFlowCompleted = false

launch {
try {
while (true) {
delay(duration.toMilliseconds())
mutex.withLock {
// If the upstream flow has been completed and there are no values
// pending to emit in the collector, just break this loop.
if (isFlowCompleted && !hasValue) {
return@launch
}
if (hasValue) {
collector.emit(latestValue!!)
hasValue = false
}
}
}
} catch (e: CancellationException) {
mutex.withLock {
if (hasValue) {
collector.emit(latestValue!!)
hasValue = false
}
}
throw e
}
}

// Collect the upstream flow and add the items to the above `values` list
upstream.collect {
mutex.withLock {
latestValue = it
hasValue = true
}
}

// If we reach here it means the upstream flow has been completed and won't
// produce any values anymore. So set the flag as flow is completed so that
// child coroutine will break its loop
isFlowCompleted = true
}
}

fun <T> Flow<T>.chunkedLatest(duration: KDuration): Flow<T> = TimeChunkedLatestFlow(this, duration)
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import com.sunnychung.application.multiplatform.hellohttp.extension.contains
import com.sunnychung.application.multiplatform.hellohttp.extension.insert
import com.sunnychung.application.multiplatform.hellohttp.model.SyntaxHighlight
import com.sunnychung.application.multiplatform.hellohttp.util.TreeRangeMaps
import com.sunnychung.application.multiplatform.hellohttp.util.chunkedLatest
import com.sunnychung.application.multiplatform.hellohttp.util.log
import com.sunnychung.application.multiplatform.hellohttp.ux.bigtext.BigMonospaceText
import com.sunnychung.application.multiplatform.hellohttp.ux.bigtext.BigMonospaceTextField
Expand Down Expand Up @@ -618,12 +619,12 @@ fun CodeEditorView(

LaunchedEffect(bigTextFieldState, onTextChange) {
bigTextFieldState.valueChangesFlow
.debounce(200.milliseconds().toMilliseconds())
.chunkedLatest(200.milliseconds())
.collect {
log.d { "bigTextFieldState change ${it.changeId}" }
log.d { "bigTextFieldState change ${it.changeId} ${it.bigText.buildString()}" }
onTextChange?.let { onTextChange ->
val string = it.bigText.buildCharSequence() as AnnotatedString
log.v { "${bigTextFieldState.text} : ${it.bigText} onTextChange(${string.text.abbr()})" }
log.d { "${bigTextFieldState.text} : ${it.bigText} onTextChange(${string.text.abbr()})" }
onTextChange(string.text)
secondCacheKey.value = string.text
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.sunnychung.application.multiplatform.hellohttp.test.util

import com.sunnychung.application.multiplatform.hellohttp.util.chunkedLatest
import com.sunnychung.lib.multiplatform.kdatetime.extension.milliseconds
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import java.util.Collections
import kotlin.test.Test
import kotlin.test.assertEquals

class ChunkedLatestFlowTest {

@Test
fun receiveOnlyLatestValues() {
runBlocking {
val results = Collections.synchronizedList(mutableListOf<Int>())

coroutineScope {
flow<Int> {
(0..10).forEach {
emit(it)
delay(145)
}
}
.chunkedLatest(500.milliseconds())
.onEach { results += it }
.launchIn(this)
}

assertEquals(listOf(3, 6, 10), results)
}
}

@Test
fun receiveValuesEmittedAtCompletion1() {
runBlocking {
val results = Collections.synchronizedList(mutableListOf<Int>())

coroutineScope {
flow<Int> {
(0..10).forEach {
emit(it)
delay(145)
}
emit(11)
emit(12)
}
.chunkedLatest(500.milliseconds())
.onEach { results += it }
.launchIn(this)
}

assertEquals(listOf(3, 6, 10, 12), results)
}
}

@Test
fun receiveValuesEmittedAtCompletion2() {
runBlocking {
val results = Collections.synchronizedList(mutableListOf<Int>())

coroutineScope {
flow<Int> {
(0..12).forEach {
emit(it)
delay(145)
}
}
.chunkedLatest(500.milliseconds())
.onEach { results += it }
.launchIn(this)
}

assertEquals(listOf(3, 6, 10, 12), results)
}
}

@Test
fun emptyFlow() {
runBlocking {
val results = Collections.synchronizedList(mutableListOf<Int>())

coroutineScope {
flow<Int> {
delay(1000)
}
.chunkedLatest(500.milliseconds())
.onEach { results += it }
.launchIn(this)
}

assertEquals(listOf(), results)
}
}

@Test
fun singleValueWithoutDelay() {
runBlocking {
val results = Collections.synchronizedList(mutableListOf<Int>())

coroutineScope {
flow<Int> {
emit(10)
}
.chunkedLatest(500.milliseconds())
.onEach { results += it }
.launchIn(this)
}

assertEquals(listOf(10), results)
}
}
}

0 comments on commit b10ad04

Please sign in to comment.