From f0faa8790f35bf0176a444ec9259e4fb47b3ba68 Mon Sep 17 00:00:00 2001 From: Tyson Norris Date: Mon, 17 Jun 2019 16:59:37 -0700 Subject: [PATCH] keep track of Container.lastUsed value properly to prevent early cleanup of containers --- .../AkkaClusterContainerResourceManager.scala | 1 + .../core/containerpool/ContainerPool.scala | 15 ++++- .../core/containerpool/ContainerProxy.scala | 6 +- .../test/ContainerPoolTests.scala | 61 +++++++++++++------ 4 files changed, 61 insertions(+), 22 deletions(-) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala index 0ea3510cce5..dd9ea1fbbc9 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala @@ -125,6 +125,7 @@ class AkkaClusterContainerResourceManager(system: ActorSystem, val localRes = localReservations.values.map(_.size) //active local reservations val remoteRes = remoteReservations.values.toList.flatten.map(_.size) //remote/stale reservations + //TODO: consider potential to fit each reservation, then required memory for this action. val allRes = localRes ++ remoteRes //make sure there is at least one node with unreserved mem > memory val canLaunch = clusterHasPotentialMemoryCapacity(memory.toMB, allRes) //consider all reservations blocking till they are removed during NodeStatsUpdate diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 8e57d27bf2c..17b0c9464fb 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -259,7 +259,8 @@ class ContainerPool(instanceId: InvokerInstanceId, // Container is free to take more work case NeedWork(warmData: WarmedData) => val oldData = freePool.get(sender()).getOrElse(busyPool(sender())) - val newData = warmData.copy(activeActivationCount = oldData.activeActivationCount - 1) + val newData = + warmData.copy(lastUsed = oldData.lastUsed, activeActivationCount = oldData.activeActivationCount - 1) if (newData.activeActivationCount < 0) { logging.error(this, s"invalid activation count after warming < 1 ${newData}") } @@ -456,6 +457,7 @@ class ContainerPool(instanceId: InvokerInstanceId, } def updateUnused() = { + //TODO: exclude those not past idle grace val unused = freePool.filter(_._2.activeActivationCount == 0) resourceManager.updateUnused(unused) } @@ -567,6 +569,12 @@ object ContainerPool { // and has not been used since the removal was requested // and has not been used past the idle grace period val idleGraceInstant = Instant.now().minusSeconds(idleGrace.toSeconds) + + freePool.foreach { c => + println(s"c ${c._2.getContainer} ${c._2.lastUsed} ${idleGraceInstant} ${c._2.lastUsed + .isBefore(idleGraceInstant)} ${c._2.activeActivationCount}") + } + val toRemove = freePool //FILTER MATCHING MEMORY USAGE WITH LAST USE BEFORE IDLE GRACE INSTANT .filter { f => @@ -578,7 +586,10 @@ object ContainerPool { r.size == f._2.memoryLimit && r.lastUsed == f._2.lastUsed && r.containerAddress == f._2.getContainer.get.addr } } - toRemove.foreach(i => logging.info(this, s"removing idle container ${i._2.getContainer.get}")) + toRemove.foreach(i => + logging.info( + this, + s"removing idle container ${i._2.getContainer.get} with last use ${i._2.lastUsed} before idle grace at ${idleGraceInstant}")) toRemove.keySet } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index 4b5bcd52b87..85fe28b3d75 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -137,7 +137,7 @@ case class WarmingData(override val container: Container, extends ContainerStarted(container, lastUsed, action.limits.memory.megabytes.MB, activeActivationCount) with ContainerInUse { override val initingState = "warming" - override def nextRun(r: Run) = copy(activeActivationCount = activeActivationCount + 1) + override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1) } /** type representing a cold (not yet running) container that is being initialized (for a specific action + invocation namespace) */ @@ -148,7 +148,7 @@ case class WarmingColdData(invocationNamespace: EntityName, extends ContainerNotStarted(lastUsed, action.limits.memory.megabytes.MB, activeActivationCount) with ContainerInUse { override val initingState = "warmingCold" - override def nextRun(r: Run) = copy(activeActivationCount = activeActivationCount + 1) + override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1) } /** type representing a warm container that has already been in use (for a specific action + invocation namespace) */ @@ -160,7 +160,7 @@ case class WarmedData(override val container: Container, extends ContainerStarted(container, lastUsed, action.limits.memory.megabytes.MB, activeActivationCount) with ContainerInUse { override val initingState = "warmed" - override def nextRun(r: Run) = copy(activeActivationCount = activeActivationCount + 1) + override def nextRun(r: Run) = copy(lastUsed = Instant.now, activeActivationCount = activeActivationCount + 1) } // Events received by the actor diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala index 25f7f38c9b6..98a7bd3b6f8 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala @@ -45,6 +45,7 @@ import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest import org.apache.openwhisk.core.entity.ExecManifest.ImageName import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.core.connector.MessageFeed +import org.apache.openwhisk.core.containerpool.ContainerData /** * Behavior tests for the ContainerPool @@ -128,9 +129,7 @@ class ContainerPoolTests (containers, factory) } - def poolConfig(userMemory: ByteSize, - clusterMangedResources: Boolean = false, - idleGrace: FiniteDuration = 10.seconds) = + def poolConfig(userMemory: ByteSize, clusterMangedResources: Boolean = false, idleGrace: FiniteDuration = 0.seconds) = ContainerPoolConfig(userMemory, 0.5, false, clusterMangedResources, false, 10, idleGrace) val instanceId = InvokerInstanceId(0, userMemory = 1024.MB) @@ -700,12 +699,13 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() val resMgr = mock[ContainerResourceManager] //mock to capture invocations + val idleGrace = 1.seconds val pool = TestActorRef( ContainerPool .props( instanceId, factory, - poolConfig(MemoryLimit.stdMemory), + poolConfig(MemoryLimit.stdMemory, idleGrace = idleGrace), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)), Some(resMgr))) @@ -722,7 +722,11 @@ class ContainerPoolTests (() => resMgr.activationStartLogMessage()).expects().returning("").repeat(2) //expect the container to become unused after second NeedWork - (resMgr.updateUnused(_: Map[ActorRef, ContainerData])).expects(Map(containers(0).ref -> warmed)).atLeastOnce() + (resMgr + .updateUnused(_: Map[ActorRef, ContainerData])) + .expects(where { m: Map[ActorRef, ContainerData] => + m.values.head.getContainer == warmed.getContainer + }) pool ! runMessageConcurrent pool ! runMessageConcurrent @@ -945,15 +949,8 @@ class ContainerPoolTests it should "remove unused on ReleaseFree" in { val (containers, factory) = testContainers(2) val feed = TestProbe() - val resMgr = new ContainerResourceManager { - override def canLaunch(size: ByteSize, - poolMemory: Long, - poolConfig: ContainerPoolConfig, - prewarm: Boolean): Boolean = { - true - } - } - val idleGrace = 25.seconds + val resMgr = mock[ContainerResourceManager] //mock to capture invocations + val idleGrace = 1.seconds val pool = TestActorRef( ContainerPool .props( @@ -963,8 +960,34 @@ class ContainerPoolTests feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)), Some(resMgr))) - val warmed = warmedData(lastUsed = Instant.now().minusSeconds(idleGrace.toSeconds)) //will only be released if lastUsed is after idleGrace + val warmed = warmedData() //will only be released if lastUsed is after idleGrace + + (resMgr + .canLaunch(_: ByteSize, _: Long, _: ContainerPoolConfig, _: Boolean)) + .expects(memoryLimit, 0, *, false) + .returning(true) + .repeat(3) + + (resMgr.addReservation(_: ActorRef, _: ByteSize)).expects(*, memoryLimit) + + (resMgr.updateUnused(_: Map[ActorRef, ContainerData])).expects(Map.empty[ActorRef, ContainerData]).atLeastOnce() + (() => resMgr.activationStartLogMessage()).expects().returning("").repeat(2) + + //expect the container to become unused after second NeedWork + var warmLastUsed: Instant = null + (resMgr + .updateUnused(_: Map[ActorRef, ContainerData])) + .expects(where { m: Map[ActorRef, ContainerData] => + if (m.values.head.getContainer == warmed.getContainer) { + warmLastUsed = m.values.head.lastUsed //this bit of hackery allows us to capture the lastUsed value from the arg + true + } else { + false + } + + }) + println(s"last used ${warmed.lastUsed}") pool ! runMessageConcurrent pool ! runMessageConcurrent @@ -972,9 +995,13 @@ class ContainerPoolTests containers(0).expectMsg(runMessageConcurrent) containers(0).send(pool, NeedWork(warmed)) - containers(0).send(pool, NeedWork(warmed)) //container will become unused, and removable now + containers(0) + .send(pool, NeedWork(warmed)) - pool ! ReleaseFree(List(RemoteContainerRef(memoryLimit, warmed.lastUsed, warmed.container.addr))) + //container will become unused, and removable only after idle grace period + Thread.sleep(idleGrace.toMillis + 1) + + pool ! ReleaseFree(List(RemoteContainerRef(memoryLimit, warmLastUsed, warmed.container.addr))) containers(0).expectMsg(Remove)