From 43f902852ed65a39ea43a8422c046d5fadb4ede2 Mon Sep 17 00:00:00 2001
From: Tyson Norris
Date: Tue, 5 Mar 2019 15:46:20 -0800
Subject: [PATCH] Invoker clustered resources

---
 .../core/containerpool/ContainerFactory.scala |   5 +-
 .../org/apache/openwhisk/utils/Events.scala   |  22 ++
 .../src/main/resources/application.conf       |   5 +-
 .../core/containerpool/ContainerPool.scala    | 196 ++++++++++++++++--
 .../core/containerpool/ContainerProxy.scala   |  24 +--
 .../core/invoker/InvokerReactive.scala        |   8 +-
 .../test/MesosContainerFactoryTest.scala      |   2 +-
 .../test/ContainerPoolTests.scala             |   3 +-
 .../test/ContainerProxyTests.scala            |  11 +-
 9 files changed, 238 insertions(+), 38 deletions(-)
 create mode 100644 common/scala/src/main/scala/org/apache/openwhisk/utils/Events.scala

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index eeb37f301e9..3d069c36726 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -31,7 +31,10 @@ case class ContainerArgsConfig(network: String,
                                dnsOptions: Seq[String] = Seq.empty,
                                extraArgs: Map[String, Set[String]] = Map.empty)
 
-case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: Double, akkaClient: Boolean) {
+case class ContainerPoolConfig(userMemory: ByteSize,
+                               concurrentPeekFactor: Double,
+                               akkaClient: Boolean,
+                               clusterManagedResources: Boolean) {
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
     s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
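The new `clusterManagedResources` flag is the only API change here; everything else keys off it. A minimal sketch of the two configurations (constructor call only; the values are illustrative, not defaults):

```scala
import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
import org.apache.openwhisk.core.entity.size._

// Invoker-managed capacity (previous behavior): launch decisions compare
// pool consumption against userMemory.
val localConfig = ContainerPoolConfig(1024.MB, 0.5, akkaClient = false, clusterManagedResources = false)

// Cluster-managed capacity: userMemory no longer gates launches; the pool
// defers to clusterCanLaunch(), driven by NodeStats updates (see below).
val clusterConfig = ContainerPoolConfig(1024.MB, 0.5, akkaClient = false, clusterManagedResources = true)
```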
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/utils/Events.scala b/common/scala/src/main/scala/org/apache/openwhisk/utils/Events.scala
new file mode 100644
index 00000000000..e82306dd966
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/utils/Events.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.utils
+
+/** NodeStats captures potential resource availability within a cluster. */
+case class NodeStats(mem: Double, cpu: Double, ports: Int)
+case class NodeStatsUpdate(stats: Map[String, NodeStats])
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index c1d4bf717b8..e82b6af1f3f 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -39,8 +39,9 @@ whisk {
 
   container-pool {
     user-memory: 1024 m
-    concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
-    akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
+    concurrent-peek-factor: 0.5 # Factor used to limit message peeking: 0 < factor <= 1.0 - a larger number improves concurrent processing, but increases the risk of message loss during an invoker crash
+    akka-client: false # If true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
+    cluster-managed-resources: false # If false, container-pool.user-memory determines the pool capacity for launching containers; otherwise ContainerPool.clusterCanLaunch() is used
   }
 
   kubernetes {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 390eea84c40..ba755900a22 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -22,7 +22,10 @@ import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.connector.MessageFeed
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.utils.NodeStats
+import org.apache.openwhisk.utils.NodeStatsUpdate
 import scala.collection.immutable
+import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration._
 import scala.util.Try
 
@@ -32,6 +35,15 @@ case object Free extends WorkerState
 
 case class WorkerData(data: ContainerData, state: WorkerState)
 
+/**
+ * A Reservation indicates resources allocated to a container, but possibly not yet launched. Its size may be negative, for a container stop.
+ * Pending: allocated from this invoker's point of view, but not yet started/stopped by the cluster manager.
+ * Scheduled: started/stopped by the cluster manager, but not yet reflected in NodeStats, so it must still be considered when allocating resources.
+ */
+sealed abstract class Reservation(val size: Long)
+case class Pending(override val size: Long) extends Reservation(size)
+case class Scheduled(override val size: Long) extends Reservation(size)
+
 /**
  * A pool managing containers to run actions on.
  *
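To make the Pending/Scheduled split concrete, here is a rough worked trace — illustrative only, not part of the patch — of how the pool accounts for a 256 MB start and a 256 MB stop that the cluster manager has not yet reflected in NodeStats:

```scala
// Pending(+256): createContainer() reserved memory; the cluster manager has not yet acted on it.
// Scheduled(+256): the container started (NeedWork/ContainerStarted below), but NodeStats lag behind.
// Pending(-256): a container stop was requested; its capacity returns once the stop lands.
val reservations: List[Reservation] = List(Pending(256), Scheduled(256), Pending(-256))

// Both kinds keep blocking launch decisions until a differing NodeStatsUpdate prunes the Scheduled ones.
val outstandingMB = reservations.map(_.size).sum // 256
```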
@@ -70,14 +82,30 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   var runBuffer = immutable.Queue.empty[Run]
   val logMessageInterval = 10.seconds
 
-  prewarmConfig.foreach { config =>
-    logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
-      TransactionId.invokerWarmup)
-    (1 to config.count).foreach { _ =>
-      prewarmContainer(config.exec, config.memoryLimit)
+  /** cluster state tracking */
+  private var clusterReservations: Map[ActorRef, Reservation] = Map.empty
+  var clusterActionHostStats = Map.empty[String, NodeStats] //track the most recent node stats per action host (a host that is able to run action containers)
+
+  var prewarmsInitialized = false
+
+  def initPrewarms() = {
+    prewarmConfig.foreach { config =>
+      logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
+        TransactionId.invokerWarmup)
+      (1 to config.count).foreach { _ =>
+        prewarmContainer(config.exec, config.memoryLimit)
+      }
     }
   }
 
+  //if the cluster manages resources, subscribe to NodeStats update events
+  if (poolConfig.clusterManagedResources) {
+    logging.info(this, "subscribing to NodeStats updates")
+    context.system.eventStream.subscribe(self, classOf[NodeStatsUpdate])
+  } else {
+    initPrewarms()
+  }
+
   def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
     val namespaceName = r.msg.user.namespace.name
     val actionName = r.action.name.name
@@ -89,6 +117,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       LoggingMarkers.INVOKER_CONTAINER_START(containerState),
       s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId",
       akka.event.Logging.InfoLevel)
+    if (poolConfig.clusterManagedResources) {
+      logging.info(
+        this,
+        s"node stats ${clusterActionHostStats} reserved ${clusterReservations.size} containers ${reservedSize}MB " +
+          s"${reservedStartCount} pending starts ${reservedStopCount} pending stops " +
+          s"${scheduledStartCount} scheduled starts ${scheduledStopCount} scheduled stops")
+    }
   }
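The pool only learns about cluster capacity through `NodeStatsUpdate` events on the actor system's event stream (subscribed above). A hypothetical stats source — e.g. an adapter polling the cluster manager, not included in this patch — would publish:

```scala
import akka.actor.ActorSystem
import org.apache.openwhisk.utils.{NodeStats, NodeStatsUpdate}

// Hypothetical publisher: host names and numbers are illustrative.
def publishNodeStats(system: ActorSystem): Unit = {
  val stats = Map(
    "action-host-1" -> NodeStats(mem = 2048.0, cpu = 4.0, ports = 100),
    "action-host-2" -> NodeStats(mem = 1024.0, cpu = 2.0, ports = 100))
  system.eventStream.publish(NodeStatsUpdate(stats))
}
```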
@@ -98,6 +133,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
+      implicit val tid: TransactionId = r.msg.transid
       // Check if the message is resent from the buffer. Only the first message on the buffer can be resent.
       // It is guaranteed, that only the first message on the buffer is resent.
       val isResentFromBuffer = runBuffer.nonEmpty && runBuffer.dequeueOption.exists(_._1.msg == r.msg)
@@ -106,8 +142,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       // It is guaranteed, that only the first message on the buffer is resent.
       if (runBuffer.isEmpty || isResentFromBuffer) {
         val createdContainer =
-          // Is there enough space on the invoker for this action to be executed.
-          if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) {
+          // Is there enough space on the invoker (or the cluster manager) for this action to be executed?
+          if (poolConfig.clusterManagedResources || hasPoolSpaceFor(
+                poolConfig,
+                busyPool,
+                r.action.limits.memory.megabytes.MB)) {
             // Schedule a job to a warm container
             ContainerPool
               .schedule(r.action, r.msg.user.namespace.name, freePool)
@@ -116,7 +155,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
               // There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container.
               // Is there enough space to create a new container or do other containers have to be removed?
-              if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
+              if (hasPoolSpaceFor(poolConfig, busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
                 takePrewarmContainer(r.action)
                   .map(container => (container, "prewarmed"))
                   .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
@@ -125,8 +164,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
                 // Remove a container and create a new one for the given job
                 ContainerPool
                   // Only free up the amount, that is really needed to free up
-                  .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB)
-                  .map(removeContainer)
+                  .remove(
+                    freePool,
+                    if (!poolConfig.clusterManagedResources) { //do not allow overprovisioning when the cluster manages resources
+                      Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB
+                    } else {
+                      r.action.limits.memory.megabytes.MB
+                    })
+                  .map { a =>
+                    //decrease the reserved (not yet stopped container) memory tracker
+                    addReservation(a, -r.action.limits.memory.megabytes)
+                    removeContainer(a)
+                  }
                   // If the list had at least one entry, enough containers were removed to start the new container. After
                   // removing the containers, we are not interested anymore in the containers that have been removed.
                   .headOption
@@ -228,10 +277,15 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
 
     // Container is prewarmed and ready to take work
     case NeedWork(data: PreWarmedData) =>
+      //move the reservation to Scheduled; it is pruned on a later NodeStatsUpdate
+      releaseReservation(sender())
       prewarmedPool = prewarmedPool + (sender() -> data)
 
     // Container got removed
     case ContainerRemoved =>
+      //move the reservation to Scheduled; it is pruned on a later NodeStatsUpdate
+      releaseReservation(sender())
+
       // if container was in free pool, it may have been processing (but under capacity),
       // so there is capacity to accept another job request
      freePool.get(sender()).foreach { f =>
@@ -250,23 +304,70 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // 1. Container errored while resuming a warm container, could not process the job, and sent the job back
     // 2. The container aged, is destroying itself, and was assigned a job which it had to send back
     // 3. The container aged and is destroying itself
+    // 4. The container is paused, and is being removed to make room for new containers
     // Update the free/busy lists but no message is sent to the feed since there is no change in capacity yet
     case RescheduleJob =>
+      releaseReservation(sender())
       freePool = freePool - sender()
       busyPool = busyPool - sender()
+
+    case NodeStatsUpdate(stats) =>
+      //clear Scheduled reservations (leave Pending) IFF stats are changing (in case of residual stats that have not accommodated the reservations' impacts)
+      pruneReserved(stats)
+      logging.info(
+        this,
+        s"received node stats ${stats} reserved/scheduled ${clusterReservations.size} containers ${reservedSize}MB")
+      clusterActionHostStats = stats
+      if (!prewarmsInitialized) { //we assume that when stats are received, we should start up prewarm containers
+        prewarmsInitialized = true
+        logging.info(this, "initializing prewarm pool after stats received")
+        initPrewarms()
+      }
+
+    case ContainerStarted => //only used for receiving post-start from a cold container
+      //move the reservation to Scheduled; it is pruned on a later NodeStatsUpdate
+      releaseReservation(sender())
   }
 
+  /** reservation adjustments */
+  def addReservation(ref: ActorRef, size: Long): Unit = {
+    clusterReservations = clusterReservations + (ref -> Pending(size))
+  }
+  def releaseReservation(ref: ActorRef): Unit = {
+    clusterReservations.get(ref).foreach { r =>
+      clusterReservations = clusterReservations + (ref -> Scheduled(r.size))
+    }
+  }
+  def pruneReserved(newStats: Map[String, NodeStats]) = {
+    //don't prune until all nodes have updated stats
+    if (clusterActionHostStats != newStats && clusterActionHostStats.keySet.subsetOf(newStats.keySet)) {
+      clusterReservations = clusterReservations.collect { case (key, p: Pending) => (key, p) }
+    }
+  }
+
   /** Creates a new container and updates state accordingly. */
   def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
     val ref = childFactory(context)
     val data = MemoryData(memoryLimit)
+    //increase the reserved (not yet started container) memory tracker
+    addReservation(ref, memoryLimit.toMB)
     freePool = freePool + (ref -> data)
     ref -> data
   }
 
   /** Creates a new prewarmed container */
-  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit =
-    childFactory(context) ! Start(exec, memoryLimit)
+  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = {
+    //when using cluster managed resources, we can only create prewarms when allowed by the cluster
+    if (!poolConfig.clusterManagedResources || hasPoolSpaceFor(poolConfig, freePool, memoryLimit)(
+          TransactionId.invokerWarmup)) {
+      val ref = childFactory(context)
+      ref ! Start(exec, memoryLimit)
+      //increase the reserved (not yet started container) memory tracker
+      addReservation(ref, memoryLimit.toMB)
+    } else {
+      logging.warn(this, "cannot create additional prewarm")
+    }
+  }
 
   /**
    * Takes a prewarm container out of the prewarmed pool
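Because prewarm startup now waits for the first stats report when `clusterManagedResources` is enabled, an integration test has to seed the event stream before expecting any `Start` messages. A rough akka-testkit sketch — assuming the existing test fixtures (`factory`, `feed`, `exec`, `containers`) and the `clusterConfig` sketched earlier; not part of this patch:

```scala
// Pool created with cluster-managed resources: no prewarm containers yet,
// because prewarmsInitialized stays false until stats arrive.
val pool = system.actorOf(ContainerPool.props(factory, clusterConfig, feed, List(PrewarmingConfig(1, exec, 256.MB))))

// Seeding the first NodeStatsUpdate triggers initPrewarms() in the handler above.
system.eventStream.publish(NodeStatsUpdate(Map("action-host-1" -> NodeStats(2048.0, 4.0, 100))))
// containers(0).expectMsg(Start(exec, 256.MB)) // the prewarm is requested only now
```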
@@ -306,12 +407,79 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   /**
    * Calculate if there is enough free memory within a given pool.
    *
+   * @param poolConfig The ContainerPoolConfig, which indicates who governs the resource accounting
    * @param pool The pool, that has to be checked, if there is enough free memory.
    * @param memory The amount of memory to check.
+   * @param tid The transaction id, used when logging the capacity decision
    * @return true, if there is enough space for the given amount of memory.
    */
-  def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
-    memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
+  def hasPoolSpaceFor[A](poolConfig: ContainerPoolConfig, pool: Map[A, ContainerData], memory: ByteSize)(
+    implicit tid: TransactionId): Boolean = {
+    if (poolConfig.clusterManagedResources) {
+      clusterCanLaunch(memory)
+    } else {
+      memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
+    }
+  }
+
+  def reservedSize = clusterReservations.values.collect { case p: Pending => p.size }.sum
+  def reservedStartCount = clusterReservations.values.count {
+    case p: Pending => p.size >= 0
+    case _          => false
+  }
+  def reservedStopCount = clusterReservations.values.count {
+    case p: Pending => p.size < 0
+    case _          => false
+  }
+  def scheduledStartCount = clusterReservations.values.count {
+    case p: Scheduled => p.size >= 0
+    case _            => false
+  }
+  def scheduledStopCount = clusterReservations.values.count {
+    case p: Scheduled => p.size < 0
+    case _            => false
+  }
+
+  private var clusterResourcesAvailable
+    : Boolean = false //tracked so that we log whenever cluster resource availability flips
+
+  def clusterCanLaunch(memory: ByteSize)(implicit tid: TransactionId): Boolean = {
+    //make sure there is at least one node with unreserved mem > memory
+    val canLaunch = clusterHasPotentialMemoryCapacity(memory.toMB, clusterReservations.values.map(_.size).toList) //consider all reservations blocking until they are removed during NodeStatsUpdate
+    //log only when the value changes
+    if (canLaunch != clusterResourcesAvailable) {
+      if (canLaunch) {
+        logging.info(
+          this,
+          s"cluster can launch action with ${memory.toMB}MB reserved:${reservedSize} freepool: ${memoryConsumptionOf(freePool)}")
+      } else {
+        logging.warn(
+          this,
+          s"cluster cannot launch action with ${memory.toMB}MB reserved:${reservedSize} freepool: ${memoryConsumptionOf(freePool)}")
+      }
+    }
+    clusterResourcesAvailable = canLaunch
+    canLaunch
+  }
+
+  /** Return true to indicate there is an expectation that there is "room" to launch a task with these memory/cpu/ports specs */
+  def clusterHasPotentialMemoryCapacity(memory: Double, reserve: List[Long]): Boolean = {
+    //copy the NodeStats into a list sorted by mem (ascending), then deduct the reservations;
+    //a sorted list (not a Map, which would drop the ordering) lets each reservation match the smallest node that fits
+    val availableResources = ListBuffer.empty ++ clusterActionHostStats.toList.sortBy(_._2.mem)
+
+    var unmatched = 0
+
+    reserve.foreach { p =>
+      //for each reservation, find the first node with enough remaining memory and deduct it in place
+      availableResources.indexWhere(_._2.mem > p) match {
+        case -1 => unmatched += 1
+        case i =>
+          val (host, stats) = availableResources(i)
+          availableResources.update(i, host -> stats.copy(mem = stats.mem - p))
+      }
+    }
+    unmatched == 0 && availableResources.exists(_._2.mem > memory)
+  }
 }
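A worked example of the first-fit check above — illustrative only, with two nodes and one outstanding 256 MB reservation:

```scala
val snapshot = Map(
  "a" -> NodeStats(mem = 300.0, cpu = 2.0, ports = 10),
  "b" -> NodeStats(mem = 600.0, cpu = 4.0, ports = 10))

// Sorted ascending by mem, the 256 MB reservation lands on "a" (the smallest fit),
// leaving a = 44 MB and b = 600 MB. With those stats:
// clusterHasPotentialMemoryCapacity(512.0, List(256L)) == true  -> "b" still fits 512 MB
// clusterHasPotentialMemoryCapacity(768.0, List(256L)) == false -> no single node retains that much headroom
```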
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 7419bf3c0a8..67ce23ee124 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -176,6 +176,7 @@ case object RescheduleJob // job is sent back to parent and could not be processed
 case class PreWarmCompleted(data: PreWarmedData)
 case class InitCompleted(data: WarmedData)
 case object RunCompleted
+case object ContainerStarted
 
 /**
  * A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -285,7 +286,7 @@ class ContainerProxy(
       // also update the feed and active ack; the container cleanup is queued
       // implicitly via a FailureMessage which will be processed later when the state
       // transitions to Running
-      val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, false, response)
+      val activation = ContainerProxy.constructWhiskActivation(job, None, Interval.zero, response)
       sendActiveAck(
         transid,
         activation,
@@ -334,7 +335,9 @@ class ContainerProxy(
   when(Running) {
     // Intermediate state, we were able to start a container
     // and we keep it in case we need to destroy it.
-    case Event(completed: PreWarmCompleted, _) => stay using completed.data
+    case Event(completed: PreWarmCompleted, _) =>
+      context.parent ! ContainerStarted
+      stay using completed.data
 
     // Init was successful
     case Event(completed: InitCompleted, _: PreWarmedData) =>
@@ -581,22 +584,12 @@ class ContainerProxy(
           val initRunInterval = initInterval
             .map(i => Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
             .getOrElse(runInterval)
-          ContainerProxy.constructWhiskActivation(
-            job,
-            initInterval,
-            initRunInterval,
-            runInterval.duration >= actionTimeout,
-            response)
+          ContainerProxy.constructWhiskActivation(job, initInterval, initRunInterval, response)
         }
       }
       .recover {
         case InitializationError(interval, response) =>
-          ContainerProxy.constructWhiskActivation(
-            job,
-            Some(interval),
-            interval,
-            interval.duration >= actionTimeout,
-            response)
+          ContainerProxy.constructWhiskActivation(job, Some(interval), interval, response)
         case t =>
           // Actually, this should never happen - but we want to make sure to not miss a problem
           logging.error(this, s"caught unexpected error while running activation: ${t}")
           ContainerProxy.constructWhiskActivation(
             job,
             None,
             Interval.zero,
-            false,
             ActivationResponse.whiskError(Messages.abnormalRun))
       }
@@ -719,7 +711,6 @@ object ContainerProxy {
   def constructWhiskActivation(job: Run,
                                initInterval: Option[Interval],
                                totalInterval: Interval,
-                               isTimeout: Boolean,
                                response: ActivationResponse) = {
     val causedBy = Some {
       if (job.msg.causedBySequence) {
@@ -757,7 +748,6 @@ object ContainerProxy {
           Parameters(WhiskActivation.limitsAnnotation, job.action.limits.toJson) ++
             Parameters(WhiskActivation.pathAnnotation, JsString(job.action.fullyQualifiedName(false).asString)) ++
             Parameters(WhiskActivation.kindAnnotation, JsString(job.action.exec.kind)) ++
-            Parameters(WhiskActivation.timeoutAnnotation, JsBoolean(isTimeout)) ++
             causedBy ++ initTime
       })
   }
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 5f682d71b4e..2207e52dd16 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -17,6 +17,8 @@
 
 package org.apache.openwhisk.core.invoker
 
+import akka.Done
+import akka.actor.CoordinatedShutdown
 import java.nio.charset.StandardCharsets
 import java.time.Instant
 
@@ -96,7 +98,11 @@ class InvokerReactive(
         "--ulimit" -> Set("nofile=1024:1024"),
         "--pids-limit" -> Set("1024")) ++ logsProvider.containerParameters)
   containerFactory.init()
-  sys.addShutdownHook(containerFactory.cleanup())
+
+  CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "invokerCleanup") { () =>
+    containerFactory.cleanup()
+    Future.successful(Done)
+  }
 
   /** Initialize needed databases */
   private val entityStore = WhiskEntityStore.datastore()
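Replacing `sys.addShutdownHook` with a CoordinatedShutdown task means cleanup now runs at a defined point (before-service-unbind) in Akka's shutdown sequence rather than racing other JVM hooks. If cleanup ever becomes asynchronous, the task can return its own `Future` instead of `Future.successful(Done)` — a minimal sketch, with a hypothetical helper name:

```scala
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import scala.concurrent.Future

// Sketch: CoordinatedShutdown waits for the returned Future (bounded by the phase timeout).
def registerCleanup(system: ActorSystem)(cleanup: () => Future[Done]): Unit =
  CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "invokerCleanup") { () =>
    cleanup()
  }
```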
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 225a79bd842..abf09666f4f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -81,7 +81,7 @@ class MesosContainerFactoryTest
   }
 
   // 80 slots, each 265MB
-  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false)
+  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, false)
   val actionMemory = 265.MB
   val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 431b3f2218e..6d11872c2fa 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -125,7 +125,7 @@ class ContainerPoolTests
     (containers, factory)
   }
 
-  def poolConfig(userMemory: ByteSize) = ContainerPoolConfig(userMemory, 0.5, false)
+  def poolConfig(userMemory: ByteSize) = ContainerPoolConfig(userMemory, 0.5, false, false)
 
   behavior of "ContainerPool"
 
@@ -809,4 +809,5 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
     pool = pool - 'first
     ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('second)
   }
+
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 866ee68c187..12657816bc4 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -133,6 +133,9 @@ class ContainerProxyTests
   def run(machine: ActorRef, currentState: ContainerState) = {
     machine ! Run(action, message)
     expectMsg(Transition(machine, currentState, Running))
+    if (currentState == Uninitialized) {
+      expectMsg(ContainerStarted)
+    }
     expectWarmed(invocationNamespace.name, action)
     expectMsg(Transition(machine, Running, Ready))
   }
@@ -197,7 +200,7 @@ class ContainerProxyTests
       (transid: TransactionId, activation: WhiskActivation, context: UserContext) => Future.successful(())
   }
 
-  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false)
+  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, false)
 
   behavior of "ContainerProxy"
 
@@ -449,6 +452,7 @@ class ContainerProxyTests
 
     machine ! Run(noLogsAction, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerStarted)
     expectWarmed(invocationNamespace.name, noLogsAction)
     expectMsg(Transition(machine, Running, Ready))
 
@@ -730,6 +734,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerStarted)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -778,6 +783,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerStarted)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -816,6 +822,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerStarted)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -853,6 +860,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerStarted)
     expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -984,6 +992,7 @@ class ContainerProxyTests
     // Start running the action
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
+    expectMsg(ContainerStarted)
 
     // Schedule the container to be removed
     machine ! Remove