Skip to content

Commit

Permalink
Cleanup jobs in Scheduler on build cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Chelombitko committed Dec 12, 2024
1 parent bfdfcb0 commit c0382b7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
3 changes: 3 additions & 0 deletions core/src/main/kotlin/com/malinskiy/marathon/Marathon.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class Marathon(
}

override fun close() {
if (::scheduler.isInitialized) {
scheduler.close()
}
deviceProvider.close()
cacheService.close()
analytics.close()
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/kotlin/com/malinskiy/marathon/execution/Scheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
Expand All @@ -48,14 +49,11 @@ class Scheduler(
private val track: Track,
private val timer: Timer,
context: CoroutineContext
) {

private val job = Job()
private val pools = ConcurrentHashMap<DevicePoolId, SendChannel<FromScheduler>>()
private val poolingStrategy = configuration.poolingStrategy
) : AutoCloseable {

private val logger = MarathonLogging.getLogger(Scheduler::class.java)

private val pools = ConcurrentHashMap<DevicePoolId, SendChannel<FromScheduler>>()
private val job = Job()
private val scope = CoroutineScope(context)

suspend fun initialize() {
Expand Down Expand Up @@ -106,6 +104,11 @@ class Scheduler(
}
}

override fun close() {
scope.cancel()
job.cancel()
}

private fun subscribeToCacheController() {
scope.launch {
for (cacheResult in cacheLoader.results) {
Expand Down Expand Up @@ -160,7 +163,7 @@ class Scheduler(
return
}

val poolId = poolingStrategy.associate(device)
val poolId = configuration.poolingStrategy.associate(device)
logger.debug("[{}] Associated with pool {}", device.serialNumber, poolId)
pools.computeIfAbsent(poolId) { id ->
logger.debug("Creating pool actor {}", id)
Expand Down

0 comments on commit c0382b7

Please sign in to comment.