Skip to content

Commit

Permalink
Merge pull request asynkron#700 from rgngl/terminate-grains-on-gracef…
Browse files Browse the repository at this point in the history
…ul-shutdown

Stop running actors on graceful shutdown
  • Loading branch information
rogeralsing authored Oct 2, 2022
2 parents 34209ec + d89a24d commit 880b460
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cluster/identitylookup/disthash/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ func (pm *Manager) Start() {
func (pm *Manager) Stop() {
system := pm.cluster.ActorSystem
system.EventStream.Unsubscribe(pm.topologySub)

err := system.Root.PoisonFuture(pm.placementActor).Wait()
if err != nil {
plog.Error("Failed to shutdown partition placement actor", log.Error(err))
}

plog.Info("Stopped PartitionManager")
}

Expand Down
14 changes: 14 additions & 0 deletions cluster/identitylookup/disthash/placement_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (p *placementActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started:
plog.Info("Placement actor started")
case *actor.Stopping:
plog.Info("Placement actor stopping")
p.onStopping(ctx)
case *actor.Stopped:
plog.Info("Placement actor stopped")
case *actor.Terminated:
p.onTerminated(msg, ctx)
case *clustering.ActivationRequest:
Expand All @@ -54,6 +59,15 @@ func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context)
}
}

func (p *placementActor) onStopping(ctx actor.Context) {
for _, meta := range p.actors {
err := ctx.PoisonFuture(meta.PID).Wait()
if err != nil {
plog.Error("Failed to poison actor", log.String("identity", meta.ID.Identity), log.Error(err))
}
}
}

func (p *placementActor) onActivationRequest(msg *clustering.ActivationRequest, ctx actor.Context) {
key := msg.ClusterIdentity.AsKey()
meta, found := p.actors[key]
Expand Down
6 changes: 6 additions & 0 deletions cluster/identitylookup/partition/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ func (pm *Manager) Start() {
func (pm *Manager) Stop() {
system := pm.cluster.ActorSystem
system.EventStream.Unsubscribe(pm.topologySub)

err := system.Root.PoisonFuture(pm.placementActor).Wait()
if err != nil {
plog.Error("Failed to shutdown partition placement actor", log.Error(err))
}

plog.Info("Stopped PartitionManager")
}

Expand Down
14 changes: 14 additions & 0 deletions cluster/identitylookup/partition/placement_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func newPlacementActor(c *clustering.Cluster, pm *Manager) *placementActor {

func (p *placementActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Stopping:
plog.Info("Placement actor stopping")
p.onStopping(ctx)
case *actor.Stopped:
plog.Info("Placement actor stopped")
case *actor.Terminated:
p.onTerminated(msg, ctx)
case *clustering.IdentityHandoverRequest:
Expand All @@ -52,6 +57,15 @@ func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context)
}
}

func (p *placementActor) onStopping(ctx actor.Context) {
for _, meta := range p.actors {
err := ctx.PoisonFuture(meta.PID).Wait()
if err != nil {
plog.Error("Failed to poison actor", log.String("identity", meta.ID.Identity), log.Error(err))
}
}
}

// this is pure, we do not change any state or actually move anything
// the requester also provide its own view of the world in terms of members
// TLDR; we are not using any topology state from this actor itself
Expand Down

0 comments on commit 880b460

Please sign in to comment.