From e3dc7af5cc8369ef4fc77dc2e4f73a4f3ce68465 Mon Sep 17 00:00:00 2001 From: Parker Wahle Date: Fri, 22 Nov 2024 20:53:01 -0500 Subject: [PATCH] better flowcaching + docs (v1.5.1) --- .idea/deploymentTargetSelector.xml | 10 ++-- build.gradle.kts | 2 +- .../java/xyz/regulad/regulib/FlowCache.kt | 50 ++++++++++++++++--- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/.idea/deploymentTargetSelector.xml b/.idea/deploymentTargetSelector.xml index aa2188f..1b52b7c 100644 --- a/.idea/deploymentTargetSelector.xml +++ b/.idea/deploymentTargetSelector.xml @@ -16,12 +16,9 @@ - + - - + + \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 178288e..72cd091 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -9,7 +9,7 @@ plugins { } group = "xyz.regulad" -version = "1.5.0" +version = "1.5.1" nexusPublishing { repositories { diff --git a/common/src/main/java/xyz/regulad/regulib/FlowCache.kt b/common/src/main/java/xyz/regulad/regulib/FlowCache.kt index f21766b..dcdbaee 100644 --- a/common/src/main/java/xyz/regulad/regulib/FlowCache.kt +++ b/common/src/main/java/xyz/regulad/regulib/FlowCache.kt @@ -3,6 +3,7 @@ package xyz.regulad.regulib import android.annotation.SuppressLint import android.content.ContentValues import android.content.Context +import android.database.CursorWindow import android.database.sqlite.SQLiteDatabase import android.database.sqlite.SQLiteOpenHelper import android.os.Build @@ -20,6 +21,7 @@ import kotlinx.serialization.json.Json import xyz.regulad.regulib.FlowCache.Companion.asCached import xyz.regulad.regulib.agnostic.versionAgnosticComputeIfAbsent import java.io.File +import java.lang.reflect.Field import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantReadWriteLock @@ -27,13 +29,29 @@ import kotlin.concurrent.read import kotlin.concurrent.write import kotlin.reflect.KClass + /** * A cache for flows that persists between application restarts, effectively turning cold flows into hot flows, maintaining their ability to be completed. * * This class should not be used directly. Instead, use [Flow.asCached] to create a cached proxy of a flow. */ +@SuppressLint("PrivateApi") class FlowCache @PublishedApi internal constructor(context: Context) : SQLiteOpenHelper(context, File(context.cacheDir, DATABASE_NAME).absolutePath, null, DATABASE_VERSION) { + init { + // we need to expand the cursor window past the original 2mb limit since we may have some massive JSON strings + try { + val field: Field = CursorWindow::class.java.getDeclaredField("sCursorWindowSize") + field.isAccessible = true + field.set(null, 25 * 1024 * 1024) // 25 MiB + + Log.d(TAG, "Successfully expanded the cursor window size") + } catch (e: Exception) { + Log.w(TAG, "Failed to expand the cursor window size", e) + Log.w(TAG, "This may cause issues with large JSON strings in the cache, possible recomputes incoming!") + } + } + companion object { // no need to declare a custom initial size of a hashmap: it will grow as needed efficiently const val TAG = "FlowCache" @@ -110,7 +128,7 @@ class FlowCache @PublishedApi internal constructor(context: Context) : ): Flow { val collectionLock = Mutex() // enforces linear collection of items val itemsReceived = mutableListOf() - val newItemFlow = MutableSharedFlow(0) + val proxyFlow = MutableSharedFlow(Int.MAX_VALUE) val streamCompletedFlow = MutableStateFlow(false) // beware: no replay of this state collectionCoroutineScope.launch { @@ -130,7 +148,7 @@ class FlowCache @PublishedApi internal constructor(context: Context) : if (cachedItems != null) { Log.d(TAG, "Replaying cached items for key $cacheKey") - cachedItems.forEach { newItemFlow.emit(it) } + cachedItems.forEach { proxyFlow.emit(it) } itemsReceived.addAll(cachedItems) streamCompletedFlow.value = true } else { @@ -138,7 +156,7 @@ class FlowCache @PublishedApi internal constructor(context: Context) : flow.collect { collectionLock.withLock { itemsReceived.add(it) - newItemFlow.emit(it) + proxyFlow.emit(it) } } streamCompletedFlow.value = true @@ -160,27 +178,43 @@ class FlowCache @PublishedApi internal constructor(context: Context) : // we use a channel flow instead of flow since we launch a coroutine that adds items to the flow val newProxyFlow = channelFlow { + val emissionLock = Mutex() // enforces linear emission of items + // NEW SUBSCRIBER + // although we could choose just to reply on the replay of the newItemFlow here, // since we already have to collect the items in order to save them, // we might as well just send them all and avoid registering as a collector earlier // in addition, otherwise we could miss items if the streamCompletedFlow turns true while collecting but before we emit all items if (streamCompletedFlow.value) { - itemsReceived.forEach { channel.send(it) } + Log.d(TAG, "NEW SUBSCRIBER: Stream already completed for cache key $cacheKey, emitting all items") + itemsReceived.forEach { + emissionLock.withLock { + channel.send(it) + } + } return@channelFlow } else { + // we are subscribing to the flow live + Log.d( + TAG, + "NEW SUBSCRIBER: Stream not yet completed for cache key $cacheKey, emitting items as they come" + ) val alreadyEmittedItems = mutableListOf() // tricky problem: if we cancel this job when the streamCompleted is true, we may have missed some items val itemCollectionJob = collectionCoroutineScope.launch { - newItemFlow.collect { - channel.send(it) - alreadyEmittedItems.add(it) + proxyFlow.collect { + emissionLock.withLock { + // we may be cancelled with some items left to emit and process, so we need to track them in alreadyEmittedItems + alreadyEmittedItems.add(it) + channel.send(it) + } } } streamCompletedFlow.waitForTrue() - itemCollectionJob.cancelAndJoin() // everything in alreadyEmittedItems is guaranteed to be emitted, and itemsReceived is guaranteed to be complete + itemCollectionJob.cancelAndJoin() // emit items we haven't emitted yet but have received, remembering that all items are received in order if (alreadyEmittedItems.size < itemsReceived.size) {