Skip to content

Commit

Permalink
better flowcaching + docs (v1.5.1)
Browse files Browse the repository at this point in the history
  • Loading branch information
regulad committed Nov 23, 2024
1 parent 8f0c160 commit e3dc7af
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 14 deletions.
10 changes: 5 additions & 5 deletions .idea/deploymentTargetSelector.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {
}

group = "xyz.regulad"
version = "1.5.0"
version = "1.5.1"

nexusPublishing {
repositories {
Expand Down
50 changes: 42 additions & 8 deletions common/src/main/java/xyz/regulad/regulib/FlowCache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package xyz.regulad.regulib
import android.annotation.SuppressLint
import android.content.ContentValues
import android.content.Context
import android.database.CursorWindow
import android.database.sqlite.SQLiteDatabase
import android.database.sqlite.SQLiteOpenHelper
import android.os.Build
Expand All @@ -20,20 +21,37 @@ import kotlinx.serialization.json.Json
import xyz.regulad.regulib.FlowCache.Companion.asCached
import xyz.regulad.regulib.agnostic.versionAgnosticComputeIfAbsent
import java.io.File
import java.lang.reflect.Field
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write
import kotlin.reflect.KClass


/**
* A cache for flows that persists between application restarts, effectively turning cold flows into hot flows, maintaining their ability to be completed.
*
* This class should not be used directly. Instead, use [Flow.asCached] to create a cached proxy of a flow.
*/
@SuppressLint("PrivateApi")
class FlowCache @PublishedApi internal constructor(context: Context) :
SQLiteOpenHelper(context, File(context.cacheDir, DATABASE_NAME).absolutePath, null, DATABASE_VERSION) {
init {
// we need to expand the cursor window past the original 2mb limit since we may have some massive JSON strings
try {
val field: Field = CursorWindow::class.java.getDeclaredField("sCursorWindowSize")
field.isAccessible = true
field.set(null, 25 * 1024 * 1024) // 25 MiB

Log.d(TAG, "Successfully expanded the cursor window size")
} catch (e: Exception) {
Log.w(TAG, "Failed to expand the cursor window size", e)
Log.w(TAG, "This may cause issues with large JSON strings in the cache, possible recomputes incoming!")
}
}

companion object {
// no need to declare a custom initial size of a hashmap: it will grow as needed efficiently
const val TAG = "FlowCache"
Expand Down Expand Up @@ -110,7 +128,7 @@ class FlowCache @PublishedApi internal constructor(context: Context) :
): Flow<T> {
val collectionLock = Mutex() // enforces linear collection of items
val itemsReceived = mutableListOf<T>()
val newItemFlow = MutableSharedFlow<T>(0)
val proxyFlow = MutableSharedFlow<T>(Int.MAX_VALUE)
val streamCompletedFlow = MutableStateFlow(false) // beware: no replay of this state

collectionCoroutineScope.launch {
Expand All @@ -130,15 +148,15 @@ class FlowCache @PublishedApi internal constructor(context: Context) :

if (cachedItems != null) {
Log.d(TAG, "Replaying cached items for key $cacheKey")
cachedItems.forEach { newItemFlow.emit(it) }
cachedItems.forEach { proxyFlow.emit(it) }
itemsReceived.addAll(cachedItems)
streamCompletedFlow.value = true
} else {
Log.d(TAG, "No cached items found for key $cacheKey, collecting from flow")
flow.collect {
collectionLock.withLock {
itemsReceived.add(it)
newItemFlow.emit(it)
proxyFlow.emit(it)
}
}
streamCompletedFlow.value = true
Expand All @@ -160,27 +178,43 @@ class FlowCache @PublishedApi internal constructor(context: Context) :

// we use a channel flow instead of flow since we launch a coroutine that adds items to the flow
val newProxyFlow = channelFlow {
val emissionLock = Mutex() // enforces linear emission of items
// NEW SUBSCRIBER

// although we could choose just to reply on the replay of the newItemFlow here,
// since we already have to collect the items in order to save them,
// we might as well just send them all and avoid registering as a collector earlier

// in addition, otherwise we could miss items if the streamCompletedFlow turns true while collecting but before we emit all items
if (streamCompletedFlow.value) {
itemsReceived.forEach { channel.send(it) }
Log.d(TAG, "NEW SUBSCRIBER: Stream already completed for cache key $cacheKey, emitting all items")
itemsReceived.forEach {
emissionLock.withLock {
channel.send(it)
}
}
return@channelFlow
} else {
// we are subscribing to the flow live
Log.d(
TAG,
"NEW SUBSCRIBER: Stream not yet completed for cache key $cacheKey, emitting items as they come"
)
val alreadyEmittedItems = mutableListOf<T>()

// tricky problem: if we cancel this job when the streamCompleted is true, we may have missed some items
val itemCollectionJob = collectionCoroutineScope.launch {
newItemFlow.collect {
channel.send(it)
alreadyEmittedItems.add(it)
proxyFlow.collect {
emissionLock.withLock {
// we may be cancelled with some items left to emit and process, so we need to track them in alreadyEmittedItems
alreadyEmittedItems.add(it)
channel.send(it)
}
}
}

streamCompletedFlow.waitForTrue()
itemCollectionJob.cancelAndJoin() // everything in alreadyEmittedItems is guaranteed to be emitted, and itemsReceived is guaranteed to be complete
itemCollectionJob.cancelAndJoin()

// emit items we haven't emitted yet but have received, remembering that all items are received in order
if (alreadyEmittedItems.size < itemsReceived.size) {
Expand Down

0 comments on commit e3dc7af

Please sign in to comment.