From 6d174246aa66658654094c7c1c5913a4ac5b73a6 Mon Sep 17 00:00:00 2001 From: Tyson Norris Date: Thu, 27 Jun 2019 15:06:17 -0700 Subject: [PATCH] review feedback --- .../core/containerpool/ContainerPool.scala | 98 ++++++++-------- .../core/invoker/InvokerReactive.scala | 12 +- .../test/ContainerPoolTests.scala | 106 +++++++++++------- 3 files changed, 118 insertions(+), 98 deletions(-) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 17b0c9464fb..1750b381817 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -59,25 +59,21 @@ case object EmitMetrics * @param feed actor to request more work from * @param prewarmConfig optional settings for container prewarming * @param poolConfig config for the ContainerPool - * @param resMgr ContainerResourceManager impl + * @param resMgrFactory factory for ContainerResourceManager impl */ class ContainerPool(instanceId: InvokerInstanceId, childFactory: ActorRefFactory => ActorRef, feed: ActorRef, prewarmConfig: List[PrewarmingConfig] = List.empty, poolConfig: ContainerPoolConfig, - resMgr: Option[ContainerResourceManager]) + resMgrFactory: (ActorRef) => ContainerResourceManager) extends Actor { import ContainerPool.memoryConsumptionOf implicit val logging = new AkkaLogging(context.system.log) implicit val ec = context.dispatcher - val resourceManager = resMgr.getOrElse(if (poolConfig.clusterManagedResources) { - new AkkaClusterContainerResourceManager(context.system, instanceId, self, poolConfig) - } else { - new LocalContainerResourceManager(self) - }) + val resourceManager = resMgrFactory(self) var freePool = immutable.Map.empty[ActorRef, ContainerData] var busyPool = immutable.Map.empty[ActorRef, ContainerData] @@ -113,9 +109,9 @@ class ContainerPool(instanceId: InvokerInstanceId, r.msg.transid.mark( this, LoggingMarkers.INVOKER_CONTAINER_START(containerState), - s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId", + s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId ${resourceManager + .activationStartLogMessage()}", akka.event.Logging.InfoLevel) - resourceManager.activationStartLogMessage() } def receive: Receive = { @@ -133,15 +129,13 @@ class ContainerPool(instanceId: InvokerInstanceId, // next request to process // It is guaranteed, that only the first message on the buffer is resent. if (runBuffer.isEmpty || isResentFromBuffer) { + val warmContainer = ContainerPool + .schedule(r.action, r.msg.user.namespace.name, freePool) val createdContainer = // Is there enough space on the invoker (or the cluster manager) for this action to be executed. - if (poolConfig.clusterManagedResources || hasPoolSpaceFor( - poolConfig, - busyPool, - r.action.limits.memory.megabytes.MB)) { + if (warmContainer.isDefined || hasPoolSpaceFor(poolConfig, busyPool, r.action.limits.memory.megabytes.MB)) { // Schedule a job to a warm container - ContainerPool - .schedule(r.action, r.msg.user.namespace.name, freePool) + warmContainer .map(container => (container, container._2.initingState)) //warmed, warming, and warmingCold always know their state .orElse( // There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container. @@ -245,10 +239,6 @@ class ContainerPool(instanceId: InvokerInstanceId, // Add this request to the buffer, as it is not there yet. runBuffer = runBuffer.enqueue(r) } - if (!poolConfig.clusterManagedResources) { - // As this request is the first one in the buffer, try again to execute it. - self ! Run(r.action, r.msg, retryLogDeadline) - } //cannot do this in cluster managed resources, since it will introduce a tight loop } } else { // There are currently actions waiting to be executed before this action gets executed. @@ -309,7 +299,7 @@ class ContainerPool(instanceId: InvokerInstanceId, processBuffer() sender() } - //if container was neither free or busy, + //if container was neither free or busy, (should never happen, just being defensive) if (foundFree.orElse(foundBusy).isEmpty) { processBuffer() } @@ -342,43 +332,23 @@ class ContainerPool(instanceId: InvokerInstanceId, //remove each ref, IFF it is still not in use, and has not been used for the idle grade period ContainerPool.findIdlesToRemove(poolConfig.clusterManagedIdleGrace, freePool, refs).foreach(removeContainer) case EmitMetrics => - logging.info( - this, - s"metrics invoker (self) has ${runBuffer.size} buffered (${runBuffer.map(_.action.limits.memory.megabytes).sum}MB)") - - MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size) - MetricEmitter.emitGaugeMetric( - LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE, - runBuffer.map(_.action.limits.memory.megabytes).sum) - val containersInUse = inUse - MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size) - MetricEmitter.emitGaugeMetric( - LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE, - containersInUse.map(_._2.memoryLimit.toMB).sum) - MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size) - MetricEmitter.emitGaugeMetric( - LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE, - prewarmedPool.map(_._2.memoryLimit.toMB).sum) + emitMetrics() } /** Buffer processing in cluster managed resources means to send the first item in runBuffer; * In non-clustered case, it means signalling MessageFeed (since runBuffer is processed in tight loop). * */ def processBuffer() = { - if (poolConfig.clusterManagedResources) { - //if next runbuffer item has not already been resent, send it - runBuffer.dequeueOption match { - case Some((run, _)) => //run the first from buffer - //avoid sending dupes - if (!resent.contains(run.msg.activationId)) { - resent = resent + run.msg.activationId - self ! run - } - case None => //feed me! - feed ! MessageFeed.Processed - } - } else { //TODO: should this ever be used? - feed ! MessageFeed.Processed + //if next runbuffer item has not already been resent, send it + runBuffer.dequeueOption match { + case Some((run, _)) => //run the first from buffer + //avoid sending dupes + if (!resent.contains(run.msg.activationId)) { + resent = resent + run.msg.activationId + self ! run + } + case None => //feed me! + feed ! MessageFeed.Processed } } @@ -461,6 +431,26 @@ class ContainerPool(instanceId: InvokerInstanceId, val unused = freePool.filter(_._2.activeActivationCount == 0) resourceManager.updateUnused(unused) } + + def emitMetrics() = { + logging.info( + this, + s"metrics invoker (self) has ${runBuffer.size} buffered (${runBuffer.map(_.action.limits.memory.megabytes).sum}MB)") + + MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size) + MetricEmitter.emitGaugeMetric( + LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE, + runBuffer.map(_.action.limits.memory.megabytes).sum) + val containersInUse = inUse + MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size) + MetricEmitter.emitGaugeMetric( + LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE, + containersInUse.map(_._2.memoryLimit.toMB).sum) + MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size) + MetricEmitter.emitGaugeMetric( + LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE, + prewarmedPool.map(_._2.memoryLimit.toMB).sum) + } } object ContainerPool { @@ -597,9 +587,9 @@ object ContainerPool { factory: ActorRefFactory => ActorRef, poolConfig: ContainerPoolConfig, feed: ActorRef, - prewarmConfig: List[PrewarmingConfig] = List.empty, - resMgr: Option[ContainerResourceManager] = None) = - Props(new ContainerPool(instanceId, factory, feed, prewarmConfig, poolConfig, resMgr)) + resMgrFactory: ActorRef => ContainerResourceManager, + prewarmConfig: List[PrewarmingConfig] = List.empty) = + Props(new ContainerPool(instanceId, factory, feed, prewarmConfig, poolConfig, resMgrFactory)) } /** Contains settings needed to perform container prewarming. */ diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 30b604083e1..f49f363a37a 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -17,8 +17,7 @@ package org.apache.openwhisk.core.invoker -import akka.Done -import akka.actor.CoordinatedShutdown +import akka.actor.ActorRef import java.nio.charset.StandardCharsets import java.time.Instant @@ -39,7 +38,6 @@ import org.apache.openwhisk.http.Messages import org.apache.openwhisk.spi.SpiLoader import pureconfig._ import spray.json._ - import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -207,8 +205,14 @@ class InvokerReactive( }.toList } + val resMgrFactory = if (poolConfig.clusterManagedResources) { pool: ActorRef => + new AkkaClusterContainerResourceManager(actorSystem, instance, pool, poolConfig) + } else { pool: ActorRef => + new LocalContainerResourceManager(pool) + } private val pool = - actorSystem.actorOf(ContainerPool.props(instance, childFactory, poolConfig, activationFeed, prewarmingConfigs)) + actorSystem.actorOf( + ContainerPool.props(instance, childFactory, poolConfig, activationFeed, resMgrFactory, prewarmingConfigs)) /** Is called when an ActivationMessage is read from Kafka */ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = { diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala index 98a7bd3b6f8..b64c891d8a2 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala @@ -34,6 +34,7 @@ import akka.testkit.ImplicitSender import akka.testkit.TestActorRef import akka.testkit.TestKit import akka.testkit.TestProbe +import common.StreamLogging import common.WhiskProperties import java.util.concurrent.atomic.AtomicInteger import org.apache.openwhisk.common.PrintStreamLogging @@ -59,7 +60,8 @@ class ContainerPoolTests with FlatSpecLike with Matchers with BeforeAndAfterAll - with MockFactory { + with MockFactory + with StreamLogging { override def afterAll = TestKit.shutdownActorSystem(system) @@ -134,7 +136,7 @@ class ContainerPoolTests val instanceId = InvokerInstanceId(0, userMemory = 1024.MB) behavior of "ContainerPool" - + val resMgrFactory = (pool: ActorRef) => new LocalContainerResourceManager(pool) /* * CONTAINER SCHEDULING * @@ -145,7 +147,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) @@ -160,7 +163,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) @@ -176,7 +180,8 @@ class ContainerPoolTests val feed = TestProbe() // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) // Note that the container doesn't respond, thus it's not free to take work @@ -190,7 +195,8 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -206,7 +212,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with slots for 2 actions with default memory limit. - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(512.MB), feed.ref)) + val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(512.MB), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) pool ! runMessageDifferentAction // 2 * stdMemory taken -> full @@ -228,7 +234,8 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 active slot but 2 slots in total - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref, resMgrFactory)) // Run the first container pool ! runMessage @@ -254,7 +261,8 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -269,7 +277,8 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory), feed.ref, resMgrFactory)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, RescheduleJob) // emulate container failure ... @@ -282,7 +291,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref, resMgrFactory)) // Start first action pool ! runMessage // 1 * stdMemory taken @@ -316,7 +326,13 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(instanceId, factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + .props( + instanceId, + factory, + poolConfig(0.MB), + feed.ref, + resMgrFactory, + List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) } @@ -332,6 +348,7 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, + resMgrFactory, List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) @@ -352,6 +369,7 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, + resMgrFactory, List(PrewarmingConfig(1, alternativeExec, memoryLimit)))) containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind))) @@ -373,6 +391,7 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, + resMgrFactory, List(PrewarmingConfig(1, exec, alternativeLimit)))) containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit))) @@ -387,7 +406,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) // container0 is created and used pool ! runMessage @@ -417,7 +437,8 @@ class ContainerPoolTests // Pool with 512 MB usermemory val pool = - system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref, resMgrFactory)) // Send action that blocks the pool pool ! runMessageLarge @@ -449,7 +470,8 @@ class ContainerPoolTests val feed = TestProbe() // Pool with 512 MB usermemory - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref, resMgrFactory)) // Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with each 265 MB and Action 1 with 512 MB) pool ! runMessage @@ -503,7 +525,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) // container0 is created and used pool ! runMessageConcurrent @@ -527,7 +550,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) // container0 is created and used pool ! runMessageConcurrent @@ -543,7 +567,8 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) + val pool = system.actorOf( + ContainerPool.props(instanceId, factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref, resMgrFactory)) // container0 is created and used pool ! runMessageConcurrent @@ -593,8 +618,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) //prewarms are not started immediately containers(0).expectNoMessage //prewarms must be started explicitly (e.g. by the ContainerResourceManager) @@ -637,8 +662,9 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, - List(PrewarmingConfig(3, exec, memoryLimit)), //configure 3 prewarms, but only allow 2 to start - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(3, exec, memoryLimit)) //configure 3 prewarms, but only allow 2 to start + )) //prewarms are not started immediately containers(0).expectNoMessage @@ -668,8 +694,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) (resMgr .canLaunch(_: ByteSize, _: Long, _: ContainerPoolConfig, _: Boolean)) @@ -707,8 +733,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory, idleGrace = idleGrace), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) val warmed = warmedData() (resMgr .canLaunch(_: ByteSize, _: Long, _: ContainerPoolConfig, _: Boolean)) @@ -818,8 +844,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory, true), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) //these will get buffered since allowLaunch is false pool ! run1 @@ -858,8 +884,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory, true), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) //these will get buffered since allowLaunch is false pool ! run1 @@ -885,8 +911,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) val warmed = warmedData() (resMgr .canLaunch(_: ByteSize, _: Long, _: ContainerPoolConfig, _: Boolean)) @@ -923,8 +949,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) (resMgr .canLaunch(_: ByteSize, _: Long, _: ContainerPoolConfig, _: Boolean)) .expects(memoryLimit, 0, *, false) @@ -958,8 +984,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory, idleGrace = idleGrace), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) val warmed = warmedData() //will only be released if lastUsed is after idleGrace (resMgr @@ -1026,8 +1052,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory, idleGrace = idleGrace), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) val warmed = warmedData(lastUsed = Instant.now().minusSeconds(idleGrace.toSeconds - 3)) //will only be released if lastUsed is after idleGrace pool ! runMessageConcurrent @@ -1069,8 +1095,8 @@ class ContainerPoolTests factory, poolConfig(MemoryLimit.stdMemory, true), feed.ref, - List(PrewarmingConfig(1, exec, memoryLimit)), - Some(resMgr))) + _ => resMgr, + List(PrewarmingConfig(1, exec, memoryLimit)))) val run1 = createRunMessage(action, invocationNamespace) val run2 = createRunMessage(action, invocationNamespace)