Skip to content

Commit

Permalink
fix: shopware messengers are orphaned on crash or SIGINT
Browse files Browse the repository at this point in the history
  • Loading branch information
pkramme authored and shyim committed Aug 2, 2024
1 parent b2c8cb2 commit 55f0b12
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions cmd/project/project_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package project

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/FriendsOfShopware/shopware-cli/internal/phpexec"
"github.com/FriendsOfShopware/shopware-cli/shop"
Expand All @@ -30,14 +32,14 @@ var projectWorkerCmd = &cobra.Command{
queuesToConsume, _ := cobraCmd.Flags().GetString("queue")
memoryLimit, _ := cobraCmd.Flags().GetString("memory-limit")
timeLimit, _ := cobraCmd.Flags().GetString("time-limit")
gracefulStopLimit, _ := cobraCmd.Flags().GetUint("graceful-stop-limit")

if projectRoot, err = findClosestShopwareProject(); err != nil {
return err
}

if len(args) > 0 {
workerAmount, err = strconv.Atoi(args[0])

if err != nil {
return err
}
Expand Down Expand Up @@ -76,15 +78,38 @@ var projectWorkerCmd = &cobra.Command{
for a := 0; a < workerAmount; a++ {
wg.Add(1)
go func(ctx context.Context, index int) {
defer wg.Done()

for {
cmd := phpexec.ConsoleCommand(cancelCtx, consumeArgs...)
cmd.Dir = projectRoot
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = append(os.Environ(), fmt.Sprintf("MESSENGER_CONSUMER_NAME=%s-%d", baseName, index))
cmd.WaitDelay = time.Second
cmd.Cancel = func() error {
if gracefulStopLimit > 0 {
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
return err
}

now := time.Now()

for time.Since(now) < time.Second*time.Duration(gracefulStopLimit) {
if isProcessStopped(cmd.Process) {
return os.ErrProcessDone
}
time.Sleep(time.Millisecond * 250)
}
}
return cmd.Process.Kill()
}

if err := cmd.Run(); err != nil {
logging.FromContext(ctx).Fatal(err)
if errors.Is(err, context.Canceled) {
break
}
logging.FromContext(ctx).Error(err)
}
}
}(cancelCtx, a)
Expand All @@ -102,14 +127,20 @@ func init() {
projectWorkerCmd.PersistentFlags().String("queue", "", "Queues to consume")
projectWorkerCmd.PersistentFlags().String("memory-limit", "", "Memory Limit")
projectWorkerCmd.PersistentFlags().String("time-limit", "", "Time Limit")
projectWorkerCmd.PersistentFlags().Uint("graceful-stop-limit", 0, "Graceful Stop Limit")
}

func cancelOnTermination(ctx context.Context, cancel context.CancelFunc) {
logging.FromContext(ctx).Infof("setting up a signal handler")
s := make(chan os.Signal, 1)
signal.Notify(s, syscall.SIGTERM)
signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
go func() {
logging.FromContext(ctx).Infof("received SIGTERM %v\n", <-s)
sig := <-s
logging.FromContext(ctx).Infof("received signal %v\n", sig.String())
cancel()
}()
}

func isProcessStopped(p *os.Process) bool {
return errors.Is(p.Signal(syscall.Signal(0)), os.ErrProcessDone)
}

0 comments on commit 55f0b12

Please sign in to comment.