diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala index dd9ea1fbbc9..fc34a2fbe76 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaClusterContainerResourceManager.scala @@ -186,12 +186,13 @@ class AkkaClusterContainerResourceManager(system: ActorSystem, implicit val ec = context.dispatcher val mediator = DistributedPubSub(system).mediator val replicator = DistributedData(system).replicator + implicit val myAddress = DistributedData(system).selfUniqueAddress mediator ! Put(self) //allow point to point messaging based on the actor name: use Send(/user/) to send messages to me in the cluster //subscribe to invoker ids changes (need to setup additional keys based on each invoker arriving) replicator ! Subscribe(InvokerIdsKey, self) //add this invoker to ids list - replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_ + (myId)) + replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_ :+ (myId)) logging.info(this, "subscribing to NodeStats updates") system.eventStream.subscribe(self, classOf[NodeStatsUpdate]) @@ -212,7 +213,7 @@ class AkkaClusterContainerResourceManager(system: ActorSystem, private def cleanup() = { //remove this invoker from ids list logging.info(this, s"stopping invoker ${myId}") - replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_ - myId) + replicator ! Update(InvokerIdsKey, ORSet.empty[Int], WriteLocal)(_.remove(myId)) } override def receive: Receive = { @@ -224,8 +225,8 @@ class AkkaClusterContainerResourceManager(system: ActorSystem, logging.info( this, s"invoker ${myId} (self) has ${reservations.size} reservations (${reservations.map(_.size.toMB).sum}MB)") - replicator ! Update(myReservationsKey, LWWRegister[List[Reservation]](List.empty), WriteLocal)(reg => - reg.withValue(reservations)) + replicator ! Update(myReservationsKey, LWWRegister[List[Reservation]](myAddress, List.empty), WriteLocal)( + reg => reg.withValueOf(reservations)) } //update this invokers idles seen by other invokers; including only idles past the idleGrace period val idleGraceInstant = Instant.now().minusSeconds(poolConfig.clusterManagedIdleGrace.toSeconds) @@ -238,8 +239,8 @@ class AkkaClusterContainerResourceManager(system: ActorSystem, logging.info( this, s"invoker ${myId} (self) has ${lastUnused.size} unused (${lastUnused.map(_.size.toMB).sum}MB)") - replicator ! Update(myUnusedKey, LWWRegister[List[RemoteContainerRef]](List.empty), WriteLocal)(reg => - reg.withValue(idles)) + replicator ! Update(myUnusedKey, LWWRegister[List[RemoteContainerRef]](myAddress, List.empty), WriteLocal)( + reg => reg.withValueOf(idles)) } case UpdateSuccess => //nothing (normal behavior) case f: UpdateFailure[_] => //log the failure