Skip to content

Commit

Permalink
keep track of Container.lastUsed value properly to prevent early clea…
Browse files Browse the repository at this point in the history
…nup of containers
  • Loading branch information
tysonnorris committed Jun 27, 2019
1 parent 23e9cf0 commit f0faa87
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class AkkaClusterContainerResourceManager(system: ActorSystem,
val localRes = localReservations.values.map(_.size) //active local reservations
val remoteRes = remoteReservations.values.toList.flatten.map(_.size) //remote/stale reservations

//TODO: consider potential to fit each reservation, then required memory for this action.
val allRes = localRes ++ remoteRes
//make sure there is at least one node with unreserved mem > memory
val canLaunch = clusterHasPotentialMemoryCapacity(memory.toMB, allRes) //consider all reservations blocking till they are removed during NodeStatsUpdate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ class ContainerPool(instanceId: InvokerInstanceId,
// Container is free to take more work
case NeedWork(warmData: WarmedData) =>
val oldData = freePool.get(sender()).getOrElse(busyPool(sender()))
val newData = warmData.copy(activeActivationCount = oldData.activeActivationCount - 1)
val newData =
warmData.copy(lastUsed = oldData.lastUsed, activeActivationCount = oldData.activeActivationCount - 1)
if (newData.activeActivationCount < 0) {
logging.error(this, s"invalid activation count after warming < 1 ${newData}")
}
Expand Down Expand Up @@ -456,6 +457,7 @@ class ContainerPool(instanceId: InvokerInstanceId,
}

def updateUnused() = {
//TODO: exclude those not past idle grace
val unused = freePool.filter(_._2.activeActivationCount == 0)
resourceManager.updateUnused(unused)
}
Expand Down Expand Up @@ -567,6 +569,12 @@ object ContainerPool {
// and has not been used since the removal was requested
// and has not been used past the idle grace period
val idleGraceInstant = Instant.now().minusSeconds(idleGrace.toSeconds)

freePool.foreach { c =>
println(s"c ${c._2.getContainer} ${c._2.lastUsed} ${idleGraceInstant} ${c._2.lastUsed
.isBefore(idleGraceInstant)} ${c._2.activeActivationCount}")
}

val toRemove = freePool
//FILTER MATCHING MEMORY USAGE WITH LAST USE BEFORE IDLE GRACE INSTANT
.filter { f =>
Expand All @@ -578,7 +586,10 @@ object ContainerPool {
r.size == f._2.memoryLimit && r.lastUsed == f._2.lastUsed && r.containerAddress == f._2.getContainer.get.addr
}
}
toRemove.foreach(i => logging.info(this, s"removing idle container ${i._2.getContainer.get}"))
toRemove.foreach(i =>
logging.info(
this,
s"removing idle container ${i._2.getContainer.get} with last use ${i._2.lastUsed} before idle grace at ${idleGraceInstant}"))
toRemove.keySet
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ case class WarmingData(override val container: Container,
extends ContainerStarted(container, lastUsed, action.limits.memory.megabytes.MB, activeActivationCount)
with ContainerInUse {
override val initingState = "warming"
override def nextRun(r: Run) = copy(activeActivationCount = activeActivationCount + 1)
override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1)
}

/** type representing a cold (not yet running) container that is being initialized (for a specific action + invocation namespace) */
Expand All @@ -148,7 +148,7 @@ case class WarmingColdData(invocationNamespace: EntityName,
extends ContainerNotStarted(lastUsed, action.limits.memory.megabytes.MB, activeActivationCount)
with ContainerInUse {
override val initingState = "warmingCold"
override def nextRun(r: Run) = copy(activeActivationCount = activeActivationCount + 1)
override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1)
}

/** type representing a warm container that has already been in use (for a specific action + invocation namespace) */
Expand All @@ -160,7 +160,7 @@ case class WarmedData(override val container: Container,
extends ContainerStarted(container, lastUsed, action.limits.memory.megabytes.MB, activeActivationCount)
with ContainerInUse {
override val initingState = "warmed"
override def nextRun(r: Run) = copy(activeActivationCount = activeActivationCount + 1)
override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1)
}

// Events received by the actor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.containerpool.ContainerData

/**
* Behavior tests for the ContainerPool
Expand Down Expand Up @@ -128,9 +129,7 @@ class ContainerPoolTests
(containers, factory)
}

def poolConfig(userMemory: ByteSize,
clusterMangedResources: Boolean = false,
idleGrace: FiniteDuration = 10.seconds) =
def poolConfig(userMemory: ByteSize, clusterMangedResources: Boolean = false, idleGrace: FiniteDuration = 0.seconds) =
ContainerPoolConfig(userMemory, 0.5, false, clusterMangedResources, false, 10, idleGrace)

val instanceId = InvokerInstanceId(0, userMemory = 1024.MB)
Expand Down Expand Up @@ -700,12 +699,13 @@ class ContainerPoolTests
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val resMgr = mock[ContainerResourceManager] //mock to capture invocations
val idleGrace = 1.seconds
val pool = TestActorRef(
ContainerPool
.props(
instanceId,
factory,
poolConfig(MemoryLimit.stdMemory),
poolConfig(MemoryLimit.stdMemory, idleGrace = idleGrace),
feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit)),
Some(resMgr)))
Expand All @@ -722,7 +722,11 @@ class ContainerPoolTests
(() => resMgr.activationStartLogMessage()).expects().returning("").repeat(2)

//expect the container to become unused after second NeedWork
(resMgr.updateUnused(_: Map[ActorRef, ContainerData])).expects(Map(containers(0).ref -> warmed)).atLeastOnce()
(resMgr
.updateUnused(_: Map[ActorRef, ContainerData]))
.expects(where { m: Map[ActorRef, ContainerData] =>
m.values.head.getContainer == warmed.getContainer
})

pool ! runMessageConcurrent
pool ! runMessageConcurrent
Expand Down Expand Up @@ -945,15 +949,8 @@ class ContainerPoolTests
it should "remove unused on ReleaseFree" in {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
val resMgr = new ContainerResourceManager {
override def canLaunch(size: ByteSize,
poolMemory: Long,
poolConfig: ContainerPoolConfig,
prewarm: Boolean): Boolean = {
true
}
}
val idleGrace = 25.seconds
val resMgr = mock[ContainerResourceManager] //mock to capture invocations
val idleGrace = 1.seconds
val pool = TestActorRef(
ContainerPool
.props(
Expand All @@ -963,18 +960,48 @@ class ContainerPoolTests
feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit)),
Some(resMgr)))
val warmed = warmedData(lastUsed = Instant.now().minusSeconds(idleGrace.toSeconds)) //will only be released if lastUsed is after idleGrace
val warmed = warmedData() //will only be released if lastUsed is after idleGrace

(resMgr
.canLaunch(_: ByteSize, _: Long, _: ContainerPoolConfig, _: Boolean))
.expects(memoryLimit, 0, *, false)
.returning(true)
.repeat(3)

(resMgr.addReservation(_: ActorRef, _: ByteSize)).expects(*, memoryLimit)

(resMgr.updateUnused(_: Map[ActorRef, ContainerData])).expects(Map.empty[ActorRef, ContainerData]).atLeastOnce()
(() => resMgr.activationStartLogMessage()).expects().returning("").repeat(2)

//expect the container to become unused after second NeedWork
var warmLastUsed: Instant = null
(resMgr
.updateUnused(_: Map[ActorRef, ContainerData]))
.expects(where { m: Map[ActorRef, ContainerData] =>
if (m.values.head.getContainer == warmed.getContainer) {
warmLastUsed = m.values.head.lastUsed //this bit of hackery allows us to capture the lastUsed value from the arg
true
} else {
false
}

})

println(s"last used ${warmed.lastUsed}")
pool ! runMessageConcurrent
pool ! runMessageConcurrent

containers(0).expectMsg(runMessageConcurrent)
containers(0).expectMsg(runMessageConcurrent)

containers(0).send(pool, NeedWork(warmed))
containers(0).send(pool, NeedWork(warmed)) //container will become unused, and removable now
containers(0)
.send(pool, NeedWork(warmed))

pool ! ReleaseFree(List(RemoteContainerRef(memoryLimit, warmed.lastUsed, warmed.container.addr)))
//container will become unused, and removable only after idle grace period
Thread.sleep(idleGrace.toMillis + 1)

pool ! ReleaseFree(List(RemoteContainerRef(memoryLimit, warmLastUsed, warmed.container.addr)))

containers(0).expectMsg(Remove)

Expand Down

0 comments on commit f0faa87

Please sign in to comment.