
Commit 43f9028: "invoker clustered resources"
tysonnorris committed Mar 5, 2019 (1 parent a08175f)
Showing 9 changed files with 238 additions and 38 deletions.
@@ -31,7 +31,10 @@ case class ContainerArgsConfig(network: String,
dnsOptions: Seq[String] = Seq.empty,
extraArgs: Map[String, Set[String]] = Map.empty)

case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: Double, akkaClient: Boolean) {
case class ContainerPoolConfig(userMemory: ByteSize,
concurrentPeekFactor: Double,
akkaClient: Boolean,
clusterManagedResources: Boolean) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.utils

/** NodeStats captures potential resource availability within a cluster. */
case class NodeStats(mem: Double, cpu: Double, ports: Int)
case class NodeStatsUpdate(stats: Map[String, NodeStats])
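
ContainerPool (further down in this diff) subscribes to NodeStatsUpdate on the actor system's event stream; a resource watcher would publish snapshots roughly as in this sketch (node names and numbers are made up):

import akka.actor.ActorSystem
import org.apache.openwhisk.utils.{NodeStats, NodeStatsUpdate}

val system = ActorSystem("invoker")

// hypothetical snapshot: free memory (MB), cpu share and free ports per action host
val snapshot = Map(
  "node-a" -> NodeStats(mem = 4096, cpu = 2.0, ports = 128),
  "node-b" -> NodeStats(mem = 2048, cpu = 1.0, ports = 128))

// delivered to every subscriber of classOf[NodeStatsUpdate], including ContainerPool
system.eventStream.publish(NodeStatsUpdate(snapshot))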
5 changes: 3 additions & 2 deletions core/invoker/src/main/resources/application.conf
@@ -39,8 +39,9 @@ whisk {

container-pool {
user-memory: 1024 m
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
concurrent-peek-factor: 0.5 # Factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # If true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
cluster-managed-resources: false # If false, container-pool.user-memory is used to determine pool capacity to launch containers, otherwise, ContainerPool.clusterCanLaunch() is used
}

kubernetes {
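A minimal sketch of overriding the new flag with the standard Typesafe Config API, using the whisk.container-pool path shown above (for example in a test or a deployment overlay):

import com.typesafe.config.ConfigFactory

// overlay the new flag on top of the defaults from application.conf
val config = ConfigFactory
  .parseString("whisk.container-pool.cluster-managed-resources = true")
  .withFallback(ConfigFactory.load())

// the overlay wins over the default of false
assert(config.getBoolean("whisk.container-pool.cluster-managed-resources"))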
@@ -22,7 +22,10 @@ import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.utils.NodeStats
import org.apache.openwhisk.utils.NodeStatsUpdate
import scala.collection.immutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.util.Try

@@ -32,6 +35,15 @@ case object Free extends WorkerState

case class WorkerData(data: ContainerData, state: WorkerState)

/**
* A Reservation tracks resources allocated to a container that may not have been launched (or stopped) yet; the size may be negative for a container stop.
* Pending: allocated from this pool's point of view, but not yet started/stopped by the cluster manager.
* Scheduled: started/stopped by the cluster manager, but not yet reflected in NodeStats, so it must still be considered when allocating resources.
*/
sealed abstract class Reservation(val size: Long)
case class Pending(override val size: Long) extends Reservation(size)
case class Scheduled(override val size: Long) extends Reservation(size)
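
A brief sketch of how the two reservation states are accounted for (the pool keys reservations by ActorRef; plain strings and illustrative MB sizes are used here only to keep the sketch self-contained):

// one pending 256 MB start and one scheduled 128 MB stop
val reservations: Map[String, Reservation] =
  Map("starting-container" -> Pending(256), "stopping-container" -> Scheduled(-128))

// all reservations stay "blocking" until a NodeStatsUpdate reflects them;
// negative sizes (stops) give memory back to the cluster
val blockedMB = reservations.values.map(_.size).sum // 128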

/**
* A pool managing containers to run actions on.
*
@@ -70,14 +82,30 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var runBuffer = immutable.Queue.empty[Run]
val logMessageInterval = 10.seconds

prewarmConfig.foreach { config =>
logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit)
/** cluster state tracking */
private var clusterReservations: Map[ActorRef, Reservation] = Map.empty
var clusterActionHostStats = Map.empty[String, NodeStats] //track the most recent node stats per action host (host that is able to run action containers)

var prewarmsInitialized = false

def initPrewarms() = {
prewarmConfig.foreach { config =>
logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit)
}
}
}

//if resources are cluster managed, subscribe to NodeStats updates (prewarm creation is deferred until the first stats arrive)
if (poolConfig.clusterManagedResources) {
logging.info(this, "subscribing to NodeStats updates")
context.system.eventStream.subscribe(self, classOf[NodeStatsUpdate])
} else {
initPrewarms()
}

def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
val actionName = r.action.name.name
@@ -89,6 +117,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
LoggingMarkers.INVOKER_CONTAINER_START(containerState),
s"containerStart containerState: $containerState container: $container activations: $activeActivations of max $maxConcurrent action: $actionName namespace: $namespaceName activationId: $activationId",
akka.event.Logging.InfoLevel)
if (poolConfig.clusterManagedResources) {
logging.info(
this,
s"node stats ${clusterActionHostStats} reserved ${clusterReservations.size} containers ${reservedSize}MB " +
s"${reservedStartCount} pending starts ${reservedStopCount} pending stops " +
s"${scheduledStartCount} scheduled starts ${scheduledStopCount} scheduled stops")
}
}

def receive: Receive = {
@@ -98,6 +133,7 @@
// their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
// fail for example, or a container has aged and was destroying itself when a new request was assigned)
case r: Run =>
implicit val tid: TransactionId = r.msg.transid
// Check if the message is resent from the buffer. Only the first message on the buffer can be resent.
val isResentFromBuffer = runBuffer.nonEmpty && runBuffer.dequeueOption.exists(_._1.msg == r.msg)

@@ -106,8 +142,11 @@
// It is guaranteed, that only the first message on the buffer is resent.
if (runBuffer.isEmpty || isResentFromBuffer) {
val createdContainer =
// Is there enough space on the invoker for this action to be executed.
if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) {
// Is there enough space on the invoker (or the cluster manager) for this action to be executed.
if (poolConfig.clusterManagedResources || hasPoolSpaceFor(
poolConfig,
busyPool,
r.action.limits.memory.megabytes.MB)) {
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
Expand All @@ -116,7 +155,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
// There was no warm/warming/warmingCold container. Try to take a prewarm container or a cold container.

// Is there enough space to create a new container or do other containers have to be removed?
if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
if (hasPoolSpaceFor(poolConfig, busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
takePrewarmContainer(r.action)
.map(container => (container, "prewarmed"))
.orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
@@ -125,8 +164,18 @@
// Remove a container and create a new one for the given job
ContainerPool
// Only free up the amount, that is really needed to free up
.remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB)
.map(removeContainer)
.remove(
freePool,
if (!poolConfig.clusterManagedResources) { //do not allow overprovision when cluster manages resources
Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB
} else {
r.action.limits.memory.megabytes.MB
})
.map { a =>
//decrease the reserved (not yet stopped container) memory tracker
addReservation(a, -r.action.limits.memory.megabytes)
removeContainer(a)
}
// If the list had at least one entry, enough containers were removed to start the new container. After
// removing the containers, we are not interested anymore in the containers that have been removed.
.headOption
@@ -228,10 +277,15 @@

// Container is prewarmed and ready to take work
case NeedWork(data: PreWarmedData) =>
//stop tracking via reserved
releaseReservation(sender())
prewarmedPool = prewarmedPool + (sender() -> data)

// Container got removed
case ContainerRemoved =>
//stop tracking via reserved
releaseReservation(sender())

// if container was in free pool, it may have been processing (but under capacity),
// so there is capacity to accept another job request
freePool.get(sender()).foreach { f =>
@@ -250,23 +304,70 @@
// 1. Container errored while resuming a warm container, could not process the job, and sent the job back
// 2. The container aged, is destroying itself, and was assigned a job which it had to send back
// 3. The container aged and is destroying itself
// 4. The container is paused, and being removed to make room for new containers
// Update the free/busy lists but no message is sent to the feed since there is no change in capacity yet
case RescheduleJob =>
releaseReservation(sender())
freePool = freePool - sender()
busyPool = busyPool - sender()

case NodeStatsUpdate(stats) =>
//clear Scheduled reservations (leave Pending) only if the stats changed, since residual stats may not yet reflect the reservations' impact
pruneReserved(stats)
logging.info(
this,
s"received node stats ${stats} reserved/scheduled ${clusterReservations.size} containers ${reservedSize}MB")
clusterActionHostStats = stats
if (!prewarmsInitialized) { //we assume that once stats are received, we can start up prewarm containers
prewarmsInitialized = true
logging.info(this, "initializing prewarm pool after stats received")
initPrewarms()
}

case ContainerStarted => //only used for receiving post-start from cold container
//stop tracking via reserved
releaseReservation(sender())
}

/** reservation adjustments */
def addReservation(ref: ActorRef, size: Long): Unit = {
clusterReservations = clusterReservations + (ref -> Pending(size))
}
def releaseReservation(ref: ActorRef): Unit = {
clusterReservations.get(ref).foreach { r =>
clusterReservations = clusterReservations + (ref -> Scheduled(r.size))
}
}
def pruneReserved(newStats: Map[String, NodeStats]) = {
//don't prune until all nodes have updated stats
if (clusterActionHostStats != newStats && clusterActionHostStats.keySet.subsetOf(newStats.keySet)) {
clusterReservations = clusterReservations.collect { case (key, p: Pending) => (key, p) }
}
}

/** Creates a new container and updates state accordingly. */
def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = {
val ref = childFactory(context)
val data = MemoryData(memoryLimit)
//increase the reserved (not yet started container) memory tracker
addReservation(ref, memoryLimit.toMB)
freePool = freePool + (ref -> data)
ref -> data
}

/** Creates a new prewarmed container */
def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit =
childFactory(context) ! Start(exec, memoryLimit)
def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = {
//when using cluster managed resources, we can only create prewarms when allowed by the cluster
if (!poolConfig.clusterManagedResources || hasPoolSpaceFor(poolConfig, freePool, memoryLimit)(
TransactionId.invokerWarmup)) {
val ref = childFactory(context)
ref ! Start(exec, memoryLimit)
//increase the reserved (not yet started container) memory tracker
addReservation(ref, memoryLimit.toMB)
} else {
logging.warn(this, "cannot create additional prewarm")
}
}

/**
* Takes a prewarm container out of the prewarmed pool
Expand Down Expand Up @@ -306,12 +407,79 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
/**
* Calculate if there is enough free memory within a given pool.
*
* @param poolConfig The ContainerPoolConfig, which indicates whether the invoker or the cluster manager governs the resource accounting.
* @param pool The pool that has to be checked for enough free memory.
* @param memory The amount of memory to check.
* @return true, if there is enough space for the given amount of memory.
*/
def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
def hasPoolSpaceFor[A](poolConfig: ContainerPoolConfig, pool: Map[A, ContainerData], memory: ByteSize)(
implicit tid: TransactionId): Boolean = {
if (poolConfig.clusterManagedResources) {
clusterCanLaunch(memory)
} else {
memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB
}
}

def reservedSize = clusterReservations.values.collect { case p: Pending => p.size }.sum
def reservedStartCount = clusterReservations.values.count {
case p: Pending => p.size >= 0
case _ => false
}
def reservedStopCount = clusterReservations.values.count {
case p: Pending => p.size < 0
case _ => false
}
def scheduledStartCount = clusterReservations.values.count {
case p: Scheduled => p.size >= 0
case _ => false
}
def scheduledStopCount = clusterReservations.values.count {
case p: Scheduled => p.size < 0
case _ => false
}

//track availability so that a log entry is emitted only when cluster resources switch between available and unavailable
private var clusterResourcesAvailable: Boolean = false

def clusterCanLaunch(memory: ByteSize)(implicit tid: TransactionId): Boolean = {
//make sure there is at least one node with unreserved mem > memory
val canLaunch = clusterHasPotentialMemoryCapacity(memory.toMB, clusterReservations.values.map(_.size).toList) //consider all reservations blocking till they are removed during NodeStatsUpdate
//log only when changing value
if (canLaunch != clusterResourcesAvailable) {
if (canLaunch) {
logging.info(
this,
s"cluster can launch action with ${memory.toMB}MB reserved:${reservedSize} freepool: ${memoryConsumptionOf(freePool)}")
} else {
logging.warn(
this,
s"cluster cannot launch action with ${memory.toMB}MB reserved:${reservedSize} freepool: ${memoryConsumptionOf(freePool)}")
}
}
clusterResourcesAvailable = canLaunch
canLaunch
}

/** Returns true if there is an expectation that there is "room" to launch a task with these memory/cpu/ports specs. */
def clusterHasPotentialMemoryCapacity(memory: Double, reserve: List[Long]): Boolean = {
//copy the NodeStats, then deduct pending reservations
var availableResources = clusterActionHostStats.toList.sortBy(_._2.mem).toMap //sort by mem to match lowest value
val inNeedReserved = ListBuffer.empty ++ reserve

var unmatched = 0

inNeedReserved.foreach { p =>
//for each pending find an available offer that fits
availableResources.find(_._2.mem > p) match {
case Some(o) =>
availableResources = availableResources + (o._1 -> o._2.copy(mem = o._2.mem - p))
inNeedReserved -= p
case None => unmatched += 1
}
}
unmatched == 0 && availableResources.exists(_._2.mem > memory)
}
}
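
To make the capacity check concrete, here is a standalone sketch with made-up numbers that mirrors clusterHasPotentialMemoryCapacity above: two action hosts with 512 MB and 1024 MB free, one outstanding 256 MB reservation, and a request for a 512 MB container (the sort and unmatched counting are omitted since every reservation finds a host here).

import org.apache.openwhisk.utils.NodeStats

val stats = Map(
  "node-a" -> NodeStats(mem = 512, cpu = 1.0, ports = 16),
  "node-b" -> NodeStats(mem = 1024, cpu = 2.0, ports = 16))
val reservations = List(256L) // one pending 256 MB start

// deduct each reservation from some host that can absorb it,
// then ask whether any host could still fit a 512 MB container
var available = stats
reservations.foreach { r =>
  available.find(_._2.mem > r).foreach {
    case (host, s) => available = available + (host -> s.copy(mem = s.mem - r))
  }
}
val canLaunch = available.exists(_._2.mem > 512) // true: one host still has more than 512 MB free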

(Remaining changed files not shown.)
