Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Jun 28, 2019
1 parent f0faa87 commit 6d17424
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,21 @@ case object EmitMetrics
* @param feed actor to request more work from
* @param prewarmConfig optional settings for container prewarming
* @param poolConfig config for the ContainerPool
* @param resMgr ContainerResourceManager impl
* @param resMgrFactory factory for ContainerResourceManager impl
*/
class ContainerPool(instanceId: InvokerInstanceId,
childFactory: ActorRefFactory => ActorRef,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty,
poolConfig: ContainerPoolConfig,
resMgr: Option[ContainerResourceManager])
resMgrFactory: (ActorRef) => ContainerResourceManager)
extends Actor {
import ContainerPool.memoryConsumptionOf

implicit val logging = new AkkaLogging(context.system.log)
implicit val ec = context.dispatcher

val resourceManager = resMgr.getOrElse(if (poolConfig.clusterManagedResources) {
new AkkaClusterContainerResourceManager(context.system, instanceId, self, poolConfig)
} else {
new LocalContainerResourceManager(self)
})
val resourceManager = resMgrFactory(self)

var freePool = immutable.Map.empty[ActorRef, ContainerData]
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
Expand Down Expand Up @@ -113,9 +109,9 @@ class ContainerPool(instanceId: InvokerInstanceId,
r.msg.transid.mark(
this,
LoggingMarkers.INVOKER_CONTAINER_START(containerState),
s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId",
s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId ${resourceManager
.activationStartLogMessage()}",
akka.event.Logging.InfoLevel)
resourceManager.activationStartLogMessage()
}

def receive: Receive = {
Expand All @@ -133,15 +129,13 @@ class ContainerPool(instanceId: InvokerInstanceId,
// next request to process
// It is guaranteed, that only the first message on the buffer is resent.
if (runBuffer.isEmpty || isResentFromBuffer) {
val warmContainer = ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
val createdContainer =
// Is there enough space on the invoker (or the cluster manager) for this action to be executed.
if (poolConfig.clusterManagedResources || hasPoolSpaceFor(
poolConfig,
busyPool,
r.action.limits.memory.megabytes.MB)) {
if (warmContainer.isDefined || hasPoolSpaceFor(poolConfig, busyPool, r.action.limits.memory.megabytes.MB)) {
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
warmContainer
.map(container => (container, container._2.initingState)) //warmed, warming, and warmingCold always know their state
.orElse(
// There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container.
Expand Down Expand Up @@ -245,10 +239,6 @@ class ContainerPool(instanceId: InvokerInstanceId,
// Add this request to the buffer, as it is not there yet.
runBuffer = runBuffer.enqueue(r)
}
if (!poolConfig.clusterManagedResources) {
// As this request is the first one in the buffer, try again to execute it.
self ! Run(r.action, r.msg, retryLogDeadline)
} //cannot do this in cluster managed resources, since it will introduce a tight loop
}
} else {
// There are currently actions waiting to be executed before this action gets executed.
Expand Down Expand Up @@ -309,7 +299,7 @@ class ContainerPool(instanceId: InvokerInstanceId,
processBuffer()
sender()
}
//if container was neither free or busy,
//if container was neither free or busy, (should never happen, just being defensive)
if (foundFree.orElse(foundBusy).isEmpty) {
processBuffer()
}
Expand Down Expand Up @@ -342,43 +332,23 @@ class ContainerPool(instanceId: InvokerInstanceId,
//remove each ref, IFF it is still not in use, and has not been used for the idle grade period
ContainerPool.findIdlesToRemove(poolConfig.clusterManagedIdleGrace, freePool, refs).foreach(removeContainer)
case EmitMetrics =>
logging.info(
this,
s"metrics invoker (self) has ${runBuffer.size} buffered (${runBuffer.map(_.action.limits.memory.megabytes).sum}MB)")

MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE,
runBuffer.map(_.action.limits.memory.megabytes).sum)
val containersInUse = inUse
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
containersInUse.map(_._2.memoryLimit.toMB).sum)
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
prewarmedPool.map(_._2.memoryLimit.toMB).sum)
emitMetrics()
}

/** Buffer processing in cluster managed resources means to send the first item in runBuffer;
* In non-clustered case, it means signalling MessageFeed (since runBuffer is processed in tight loop).
* */
def processBuffer() = {
if (poolConfig.clusterManagedResources) {
//if next runbuffer item has not already been resent, send it
runBuffer.dequeueOption match {
case Some((run, _)) => //run the first from buffer
//avoid sending dupes
if (!resent.contains(run.msg.activationId)) {
resent = resent + run.msg.activationId
self ! run
}
case None => //feed me!
feed ! MessageFeed.Processed
}
} else { //TODO: should this ever be used?
feed ! MessageFeed.Processed
//if next runbuffer item has not already been resent, send it
runBuffer.dequeueOption match {
case Some((run, _)) => //run the first from buffer
//avoid sending dupes
if (!resent.contains(run.msg.activationId)) {
resent = resent + run.msg.activationId
self ! run
}
case None => //feed me!
feed ! MessageFeed.Processed
}
}

Expand Down Expand Up @@ -461,6 +431,26 @@ class ContainerPool(instanceId: InvokerInstanceId,
val unused = freePool.filter(_._2.activeActivationCount == 0)
resourceManager.updateUnused(unused)
}

def emitMetrics() = {
logging.info(
this,
s"metrics invoker (self) has ${runBuffer.size} buffered (${runBuffer.map(_.action.limits.memory.megabytes).sum}MB)")

MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_RUNBUFFER_COUNT, runBuffer.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_RUNBUFFER_SIZE,
runBuffer.map(_.action.limits.memory.megabytes).sum)
val containersInUse = inUse
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_ACTIVE_COUNT, containersInUse.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_ACTIVE_SIZE,
containersInUse.map(_._2.memoryLimit.toMB).sum)
MetricEmitter.emitGaugeMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_COUNT, prewarmedPool.size)
MetricEmitter.emitGaugeMetric(
LoggingMarkers.CONTAINER_POOL_PREWARM_SIZE,
prewarmedPool.map(_._2.memoryLimit.toMB).sum)
}
}

object ContainerPool {
Expand Down Expand Up @@ -597,9 +587,9 @@ object ContainerPool {
factory: ActorRefFactory => ActorRef,
poolConfig: ContainerPoolConfig,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty,
resMgr: Option[ContainerResourceManager] = None) =
Props(new ContainerPool(instanceId, factory, feed, prewarmConfig, poolConfig, resMgr))
resMgrFactory: ActorRef => ContainerResourceManager,
prewarmConfig: List[PrewarmingConfig] = List.empty) =
Props(new ContainerPool(instanceId, factory, feed, prewarmConfig, poolConfig, resMgrFactory))
}

/** Contains settings needed to perform container prewarming. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.openwhisk.core.invoker

import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.ActorRef
import java.nio.charset.StandardCharsets
import java.time.Instant

Expand All @@ -39,7 +38,6 @@ import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._
import spray.json._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -207,8 +205,14 @@ class InvokerReactive(
}.toList
}

val resMgrFactory = if (poolConfig.clusterManagedResources) { pool: ActorRef =>
new AkkaClusterContainerResourceManager(actorSystem, instance, pool, poolConfig)
} else { pool: ActorRef =>
new LocalContainerResourceManager(pool)
}
private val pool =
actorSystem.actorOf(ContainerPool.props(instance, childFactory, poolConfig, activationFeed, prewarmingConfigs))
actorSystem.actorOf(
ContainerPool.props(instance, childFactory, poolConfig, activationFeed, resMgrFactory, prewarmingConfigs))

/** Is called when an ActivationMessage is read from Kafka */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
Expand Down
Loading

0 comments on commit 6d17424

Please sign in to comment.