From d89a24d6ef64ba401d4f3e4c123652f8d7524aa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=9Cst=C3=BCn=20Ergenoglu?= Date: Mon, 1 Aug 2022 01:36:01 +0300 Subject: [PATCH] Stop running actors on graceful shutdown --- cluster/identitylookup/disthash/manager.go | 6 ++++++ cluster/identitylookup/disthash/placement_actor.go | 14 ++++++++++++++ cluster/identitylookup/partition/manager.go | 6 ++++++ .../identitylookup/partition/placement_actor.go | 14 ++++++++++++++ 4 files changed, 40 insertions(+) diff --git a/cluster/identitylookup/disthash/manager.go b/cluster/identitylookup/disthash/manager.go index fc6f3487b..7a3653d40 100644 --- a/cluster/identitylookup/disthash/manager.go +++ b/cluster/identitylookup/disthash/manager.go @@ -46,6 +46,12 @@ func (pm *Manager) Start() { func (pm *Manager) Stop() { system := pm.cluster.ActorSystem system.EventStream.Unsubscribe(pm.topologySub) + + err := system.Root.PoisonFuture(pm.placementActor).Wait() + if err != nil { + plog.Error("Failed to shutdown partition placement actor", log.Error(err)) + } + plog.Info("Stopped PartitionManager") } diff --git a/cluster/identitylookup/disthash/placement_actor.go b/cluster/identitylookup/disthash/placement_actor.go index a19c65483..cec19d797 100644 --- a/cluster/identitylookup/disthash/placement_actor.go +++ b/cluster/identitylookup/disthash/placement_actor.go @@ -29,6 +29,11 @@ func (p *placementActor) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { case *actor.Started: plog.Info("Placement actor started") + case *actor.Stopping: + plog.Info("Placement actor stopping") + p.onStopping(ctx) + case *actor.Stopped: + plog.Info("Placement actor stopped") case *actor.Terminated: p.onTerminated(msg, ctx) case *clustering.ActivationRequest: @@ -54,6 +59,15 @@ func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context) } } +func (p *placementActor) onStopping(ctx actor.Context) { + for _, meta := range p.actors { + err := ctx.PoisonFuture(meta.PID).Wait() + if err != nil { + plog.Error("Failed to poison actor", log.String("identity", meta.ID.Identity), log.Error(err)) + } + } +} + func (p *placementActor) onActivationRequest(msg *clustering.ActivationRequest, ctx actor.Context) { key := msg.ClusterIdentity.AsKey() meta, found := p.actors[key] diff --git a/cluster/identitylookup/partition/manager.go b/cluster/identitylookup/partition/manager.go index 1d1220f89..10617801f 100644 --- a/cluster/identitylookup/partition/manager.go +++ b/cluster/identitylookup/partition/manager.go @@ -52,6 +52,12 @@ func (pm *Manager) Start() { func (pm *Manager) Stop() { system := pm.cluster.ActorSystem system.EventStream.Unsubscribe(pm.topologySub) + + err := system.Root.PoisonFuture(pm.placementActor).Wait() + if err != nil { + plog.Error("Failed to shutdown partition placement actor", log.Error(err)) + } + plog.Info("Stopped PartitionManager") } diff --git a/cluster/identitylookup/partition/placement_actor.go b/cluster/identitylookup/partition/placement_actor.go index fa4cfc2c1..641bb3760 100644 --- a/cluster/identitylookup/partition/placement_actor.go +++ b/cluster/identitylookup/partition/placement_actor.go @@ -27,6 +27,11 @@ func newPlacementActor(c *clustering.Cluster, pm *Manager) *placementActor { func (p *placementActor) Receive(ctx actor.Context) { switch msg := ctx.Message().(type) { + case *actor.Stopping: + plog.Info("Placement actor stopping") + p.onStopping(ctx) + case *actor.Stopped: + plog.Info("Placement actor stopped") case *actor.Terminated: p.onTerminated(msg, ctx) case *clustering.IdentityHandoverRequest: @@ -52,6 +57,15 @@ func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context) } } +func (p *placementActor) onStopping(ctx actor.Context) { + for _, meta := range p.actors { + err := ctx.PoisonFuture(meta.PID).Wait() + if err != nil { + plog.Error("Failed to poison actor", log.String("identity", meta.ID.Identity), log.Error(err)) + } + } +} + // this is pure, we do not change any state or actually move anything // the requester also provide its own view of the world in terms of members // TLDR; we are not using any topology state from this actor itself