Skip to content

Commit

Permalink
Merge pull request #2 from kkostov/feature/offline-cache
Browse files Browse the repository at this point in the history
Add a basic persistent local cache for signals
  • Loading branch information
winsmith authored Feb 6, 2022
2 parents 46875ee + f403a8a commit a9bf6fe
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 9 deletions.
64 changes: 64 additions & 0 deletions lib/src/main/java/com/telemetrydeck/sdk/PersistentSignalCache.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.telemetrydeck.sdk

import kotlinx.serialization.json.Json
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import java.io.File
import java.lang.Exception

class PersistentSignalCache(private var signalQueue: MutableList<Signal> = mutableListOf()): SignalCache {
val cacheFileName: String = "telemetrydeck.json"
var file: File? = null

constructor(cacheDir: File, logger: DebugLogger?) : this() {
if (!cacheDir.isDirectory) {
logger?.error("Expected a folder but received a file instead.")
return
}

val writeFile = File(cacheDir, cacheFileName)
logger?.debug("Using signal cache at ${writeFile.absolutePath}.")
if (writeFile.exists()) {
logger?.debug("Detected existing signal cache, attempting to parse...")
val content = writeFile.readText()
try {
val oldSignals: List<Signal> = Json.decodeFromString(content)
logger?.error("Restoring ${oldSignals.count()} signals from cache.")
signalQueue.addAll(oldSignals)
} catch (e: Exception) {
logger?.error("Failed to parse signal cache.")
}
}
file = writeFile
saveSignals()
}

override fun add(signal: Signal) {
synchronized(this) {
signalQueue.add(signal)
saveSignals()
}

}

override fun empty(): List<Signal> {
synchronized(this) {
val items = signalQueue.toList()
signalQueue = mutableListOf()
saveSignals()
return items
}
}

override fun count(): Int {
synchronized(this) {
return signalQueue.count()
}
}

fun saveSignals() {
file?.createNewFile()
val json = Json.encodeToString(signalQueue)
file?.writeText(json)
}
}
30 changes: 21 additions & 9 deletions lib/src/main/java/com/telemetrydeck/sdk/TelemetryManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ class TelemetryManager(
suspend fun send(
signals: List<Signal>
) {
val client = TelemetryClient(configuration.telemetryAppID, configuration.apiBaseURL, configuration.showDebugLogs, logger)
val client = TelemetryClient(
configuration.telemetryAppID,
configuration.apiBaseURL,
configuration.showDebugLogs,
logger
)
client.send(signals)
}

Expand All @@ -94,18 +99,18 @@ class TelemetryManager(
val hashedUser = hashString(userValue, "SHA-256")
val payload = SignalPayload(additionalPayload = enrichedPayload)
val signal = Signal(
appID=configuration.telemetryAppID,
type=signalType,
clientUser=hashedUser,
payload=payload.asMultiValueDimension,
appID = configuration.telemetryAppID,
type = signalType,
clientUser = hashedUser,
payload = payload.asMultiValueDimension,
isTestMode = configuration.testMode
)
signal.sessionID = this.configuration.sessionID.toString()
logger?.debug("Created a signal ${signal.type}, session ${signal.sessionID}, test ${signal.isTestMode}")
return signal
}

private fun hashString(input: String, algorithm: String): String {
private fun hashString(input: String, algorithm: String): String {
return MessageDigest.getInstance(algorithm)
.digest(input.toByteArray())
.fold("", { str, it -> str + "%02x".format(it) })
Expand Down Expand Up @@ -332,7 +337,8 @@ class TelemetryManager(
} else {
// do not change testMode if it was provided through a configuration object
if (initConfiguration) {
config.testMode = 0 != (context?.applicationInfo?.flags ?: 0) and ApplicationInfo.FLAG_DEBUGGABLE
config.testMode = 0 != (context?.applicationInfo?.flags
?: 0) and ApplicationInfo.FLAG_DEBUGGABLE
}
}

Expand All @@ -358,11 +364,17 @@ class TelemetryManager(
manager.logger = logger
manager.installProviders(context)

val broadcaster = TelemetryBroadcastTimer(WeakReference(manager), WeakReference(manager.logger))
val broadcaster =
TelemetryBroadcastTimer(WeakReference(manager), WeakReference(manager.logger))
broadcaster.start()
manager.broadcastTimer = broadcaster

manager.cache = MemorySignalCache()
if (context != null) {
manager.cache = PersistentSignalCache(context.cacheDir, logger)
} else {
manager.cache = MemorySignalCache()
}

return manager
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.telemetrydeck.sdk

import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import kotlinx.coroutines.*
import org.junit.Assert
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.io.File
import java.util.*

class PersistentSignalCacheTest {
@get:Rule
val instantTaskExecutorRule = InstantTaskExecutorRule()

@get:Rule
var folder = TemporaryFolder()

@Test
fun persistentSignalCache_starts_with_empty_queue() {
val cacheDir = folder.newFolder()
val sut = PersistentSignalCache(cacheDir, null)

Assert.assertEquals(0, sut.count())
}

@Test
fun persistentSignalCache_creates_cache_file() {
val cacheDir = folder.newFolder()
val sut = PersistentSignalCache(cacheDir, null)

val cacheFile = File(cacheDir, sut.cacheFileName)

Assert.assertTrue(cacheFile.exists())

val json = cacheFile.readText()
val signals = Json.decodeFromString<List<Signal>>(json)
Assert.assertEquals(0, signals.count())
}

@Test
fun persistentSignalCache_new_signals_added_to_cache() {
val cacheDir = folder.newFolder()
val sut = PersistentSignalCache(cacheDir, null)
sut.add(Signal(UUID.randomUUID(), "type", "user", SignalPayload()))
val cacheFile = File(cacheDir, sut.cacheFileName)

val json = cacheFile.readText()
val signals = Json.decodeFromString<List<Signal>>(json)
Assert.assertEquals(1, signals.count())
Assert.assertEquals("type", signals[0].type)
Assert.assertEquals("user", signals[0].clientUser)
}

@Test
fun persistentSignalCache_empty_clears_cache() {
val cacheDir = folder.newFolder()
val sut = PersistentSignalCache(cacheDir, null)
sut.add(Signal(UUID.randomUUID(), "type", "user", SignalPayload()))
sut.empty()
val cacheFile = File(cacheDir, sut.cacheFileName)

val json = cacheFile.readText()
val signals = Json.decodeFromString<List<Signal>>(json)
Assert.assertEquals(0, signals.count())
}

@DelicateCoroutinesApi
@Test
fun persistentSignalCache_allows_adding_signals_concurrently() = runBlocking {
val cacheDir = folder.newFolder()
val sut = PersistentSignalCache(cacheDir, null)

// the cache should accept signals from concurrent coroutines
val scope = CoroutineScope(newFixedThreadPoolContext(4, "pool"))
scope.launch {
val coroutines = 1.rangeTo(100).map {
// create 100 coroutines (light-weight threads).
launch {
for (i in 1..20) { // and in each of them, add 20 signals
sut.add(Signal(UUID.randomUUID(), "type", "user", SignalPayload()))
}
}
}

coroutines.forEach { coroutine ->
coroutine.join() // wait for all coroutines to finish their jobs.
}
}.join()

val cacheFile = File(cacheDir, sut.cacheFileName)

val json = cacheFile.readText()
val signals = Json.decodeFromString<List<Signal>>(json)
Assert.assertEquals(2000, signals.count())
}
}

0 comments on commit a9bf6fe

Please sign in to comment.