diff --git a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go index dc4183438..2b7c64721 100644 --- a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go +++ b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go @@ -170,7 +170,15 @@ func (s *Service) Params() *params.KVs { m.Insert("Log_Level", s.LogLevel) } if s.ParsersFile != "" { - m.Insert("Parsers_File", s.ParsersFile) + if s.ParsersFile == "parsers.conf" { + // For backwards compatibility, if the "usual" parsers.conf is + // configured, actually write the fully-qualified path in order + // to not break hot-reload. + // See https://github.com/fluent/fluent-bit/issues/8275. + m.Insert("Parsers_File", "/fluent-bit/etc/parsers.conf") + } else { + m.Insert("Parsers_File", s.ParsersFile) + } } if s.Storage != nil { if s.Storage.Path != "" { diff --git a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go index c47bc9b8c..ea4b7fcb3 100644 --- a/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go +++ b/apis/fluentbit/v1alpha2/clusterfluentbitconfig_types_test.go @@ -19,7 +19,7 @@ var expected = `[Service] Grace 30 Http_Server true Log_Level info - Parsers_File parsers.conf + Parsers_File /fluent-bit/etc/parsers.conf [Input] Name tail Alias input0_alias @@ -104,7 +104,7 @@ var expectedK8s = `[Service] Grace 30 Http_Server true Log_Level info - Parsers_File parsers.conf + Parsers_File /fluent-bit/etc/parsers.conf [Input] Name tail Path /var/log/containers/*.log diff --git a/cmd/fluent-watcher/fluentbit/main.go b/cmd/fluent-watcher/fluentbit/main.go index 4d19fccc5..712e6b6b5 100644 --- a/cmd/fluent-watcher/fluentbit/main.go +++ b/cmd/fluent-watcher/fluentbit/main.go @@ -2,12 +2,12 @@ package main import ( "context" + "errors" "flag" - "math" + "fmt" "os" "os/exec" - "sync" - "sync/atomic" + "os/signal" "syscall" "time" @@ -15,7 +15,7 @@ import ( 
"github.com/fsnotify/fsnotify" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/oklog/run" + "golang.org/x/sync/errgroup" ) const ( @@ -23,276 +23,122 @@ const ( defaultCfgPath = "/fluent-bit/etc/fluent-bit.conf" defaultWatchDir = "/fluent-bit/config" defaultPollInterval = 1 * time.Second - defaultFlbTimeout = 30 * time.Second - - MaxDelayTime = 5 * time.Minute - ResetTime = 10 * time.Minute -) - -var ( - logger log.Logger - cmd *exec.Cmd - flbTerminated chan bool - mutex sync.Mutex - restartTimes int32 - timerCtx context.Context - timerCancel context.CancelFunc ) -var configPath string -var externalPluginPath string -var binPath string -var watchPath string -var poll bool -var exitOnFailure bool -var pollInterval time.Duration -var flbTerminationTimeout time.Duration - func main() { - + var configPath string + var externalPluginPath string + var binPath string + var watchPath string + var poll bool + var pollInterval time.Duration flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.") flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.") flag.StringVar(&externalPluginPath, "e", "", "Path to external plugin (shared lib)") - flag.BoolVar(&exitOnFailure, "exit-on-failure", false, "If fluentbit exits with failure, also exit the watcher.") flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.") flag.BoolVar(&poll, "poll", false, "Use poll watcher instead of ionotify.") flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.") - flag.DurationVar(&flbTerminationTimeout, "flb-timeout", defaultFlbTimeout, "Time to wait for FluentBit to gracefully terminate before sending SIGKILL.") - flag.Parse() - - logger = log.NewLogfmtLogger(os.Stdout) - logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339)) + // Deprecated flags to be removed in one of the next releases. 
+ var exitOnFailure bool + var flbTerminationTimeout time.Duration + flag.BoolVar(&exitOnFailure, "exit-on-failure", false, "Deprecated: This has no effect anymore.") + flag.DurationVar(&flbTerminationTimeout, "flb-timeout", 0, "Deprecated: This has no effect anymore.") - timerCtx, timerCancel = context.WithCancel(context.Background()) - - var g run.Group - { - // Termination handler. - g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM)) - } - { - // Watch the Fluent bit, if the Fluent bit not exists or stopped, restart it. - cancel := make(chan struct{}) - g.Add( - func() error { - - for { - select { - case <-cancel: - return nil - default: - } - - // Start fluent bit if it does not existed. - start() - // Wait for the fluent bit exit. - err := wait() - if exitOnFailure && err != nil { - _ = level.Error(logger).Log("msg", "Fluent bit exited with error; exiting watcher") - return err - } - - timerCtx, timerCancel = context.WithCancel(context.Background()) - - // After the fluent bit exit, fluent bit watcher restarts it with an exponential - // back-off delay (1s, 2s, 4s, ...), that is capped at five minutes. - backoff() - } - }, - func(err error) { - close(cancel) - stop() - resetTimer() - }, - ) - } - { - // Watch the config file, if the config file changed, stop Fluent bit. - watcher, err := newWatcher(poll, pollInterval) - if err != nil { - _ = level.Error(logger).Log("err", err) - return - } - - // Start watcher. - err = watcher.Add(watchPath) - if err != nil { - _ = level.Error(logger).Log("err", err) - return - } - - cancel := make(chan struct{}) - g.Add( - func() error { - - for { - select { - case <-cancel: - return nil - case event := <-watcher.Events(): - if !isValidEvent(event) { - continue - } - // After the config file changed, it should stop the fluent bit, - // and resets the restart backoff timer. 
- if cmd != nil { - _ = level.Info(logger).Log("msg", "Config file changed, stopping Fluent Bit") - stop() - resetTimer() - _ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit") - } - case <-watcher.Errors(): - _ = level.Error(logger).Log("msg", "Watcher stopped") - return nil - } - } - }, - func(err error) { - _ = watcher.Close() - close(cancel) - }, - ) - } - - if err := g.Run(); err != nil { - _ = level.Error(logger).Log("err", err) - os.Exit(1) - } - _ = level.Info(logger).Log("msg", "See you next time!") -} + flag.Parse() -func newWatcher(poll bool, interval time.Duration) (filenotify.FileWatcher, error) { - var err error - var watcher filenotify.FileWatcher + signalCtx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() - if poll { - watcher = filenotify.NewPollingWatcher(interval) - } else { - watcher, err = filenotify.New(interval) - } + logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339)) - if err != nil { - return nil, err + if exitOnFailure { + _ = level.Warn(logger).Log("msg", "--exit-on-failure is deprecated. The process will exit no matter what if fluent-bit exits so this can safely be removed.") } - - return watcher, nil -} - -// Inspired by https://github.com/jimmidyson/configmap-reload -func isValidEvent(event fsnotify.Event) bool { - return event.Op == fsnotify.Create || event.Op == fsnotify.Write -} - -func start() { - mutex.Lock() - defer mutex.Unlock() - - if cmd != nil { - return + if flbTerminationTimeout > 0 { + _ = level.Warn(logger).Log("msg", "--flb-timeout is deprecated. Consider setting the terminationGracePeriod field on the `(Cluster)FluentBit` instance.") } + // First, launch the fluent-bit process. 
+ args := []string{"--enable-hot-reload", "-c", configPath} if externalPluginPath != "" { - cmd = exec.Command(binPath, "-c", configPath, "-e", externalPluginPath) - } else { - cmd = exec.Command(binPath, "-c", configPath) + args = append(args, "-e", externalPluginPath) } + cmd := exec.Command(binPath, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - flbTerminated = make(chan bool, 1) if err := cmd.Start(); err != nil { - _ = level.Error(logger).Log("msg", "start Fluent bit error", "error", err) - cmd = nil - return + _ = level.Error(logger).Log("msg", "failed to start fluent-bit", "error", err) + os.Exit(1) } + _ = level.Info(logger).Log("msg", "fluent-bit started") - _ = level.Info(logger).Log("msg", "Fluent bit started") -} - -func wait() error { - mutex.Lock() - if cmd == nil { - mutex.Unlock() + grp, grpCtx := errgroup.WithContext(context.Background()) + grp.Go(func() error { + // Watch the process. If it exits, we want to crash immediately. + defer cancel() + if err := cmd.Wait(); err != nil { + return fmt.Errorf("failed to run fluent-bit: %w", err) + } return nil - } - mutex.Unlock() - - startTime := time.Now() - - err := cmd.Wait() - if err != nil { - _ = level.Error(logger).Log("msg", "Fluent bit exited", "error", err) - } - cmd = nil - flbTerminated <- true - - // Once the fluent bit has executed for 10 minutes without any problems, - // it should resets the restart backoff timer. - if time.Since(startTime) >= ResetTime { - atomic.StoreInt32(&restartTimes, 0) - } - - return err -} - -func backoff() { - - delayTime := time.Duration(math.Pow(2, float64(atomic.LoadInt32(&restartTimes)))) * time.Second - if delayTime >= MaxDelayTime { - delayTime = MaxDelayTime - } - - _ = level.Info(logger).Log("msg", "backoff", "delay", delayTime) + }) + grp.Go(func() error { + // Watch the config as it's loaded into the pod and trigger a config reload. 
+ var watcher filenotify.FileWatcher + if poll { + watcher = filenotify.NewPollingWatcher(pollInterval) + } else { + var err error + watcher, err = filenotify.NewEventWatcher() + if err != nil { + return fmt.Errorf("failed to open event watcher: %w", err) + } + } - startTime := time.Now() + if err := watcher.Add(watchPath); err != nil { + return fmt.Errorf("failed to watch path %q: %w", watchPath, err) + } - timer := time.NewTimer(delayTime) - defer timer.Stop() + for { + select { + case <-signalCtx.Done(): + return nil + case <-grpCtx.Done(): + return nil + case event := <-watcher.Events(): + if !isValidEvent(event) { + continue + } + _ = level.Info(logger).Log("msg", "Config file changed, reloading...") + if err := cmd.Process.Signal(syscall.SIGHUP); err != nil { + return fmt.Errorf("failed to reload config: %w", err) + } + case err := <-watcher.Errors(): + return fmt.Errorf("failed the watcher: %w", err) + } + } + }) select { - case <-timerCtx.Done(): - _ = level.Info(logger).Log("msg", "context cancel", "actual", time.Since(startTime), "expected", delayTime) - - atomic.StoreInt32(&restartTimes, 0) - - return - case <-timer.C: - _ = level.Info(logger).Log("msg", "backoff timer done", "actual", time.Since(startTime), "expected", delayTime) - - atomic.AddInt32(&restartTimes, 1) - - return - } -} - -func stop() { - - mutex.Lock() - defer mutex.Unlock() - - if cmd == nil || cmd.Process == nil { - _ = level.Info(logger).Log("msg", "Fluent Bit not running. No process to stop.") - return + case <-signalCtx.Done(): + case <-grpCtx.Done(): } - // Send SIGTERM, if fluent-bit doesn't terminate in the specified timeframe, send SIGKILL - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - _ = level.Info(logger).Log("msg", "Error while terminating FluentBit", "error", err) - } else { - _ = level.Info(logger).Log("msg", "Sent SIGTERM to FluentBit, waiting max "+flbTerminationTimeout.String()) + // Always try to gracefully shut down fluent-bit. 
This will allow `cmd.Wait` above to finish + // and thus allow `grp.Wait` below to return. + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil && !errors.Is(err, os.ErrProcessDone) { + _ = level.Error(logger).Log("msg", "Failed to send SIGTERM to fluent-bit", "error", err) + // Do not exit on error here. The process might've died and that's okay. } - select { - case <-time.After(flbTerminationTimeout): - _ = level.Info(logger).Log("msg", "FluentBit failed to terminate gracefully, killing process") - cmd.Process.Kill() - <-flbTerminated - case <-flbTerminated: - _ = level.Info(logger).Log("msg", "FluentBit terminated successfully") + if err := grp.Wait(); err != nil { + _ = level.Error(logger).Log("msg", "Failure during the run time of fluent-bit", "error", err) + os.Exit(1) } } -func resetTimer() { - timerCancel() - atomic.StoreInt32(&restartTimes, 0) +// Inspired by https://github.com/jimmidyson/configmap-reload +func isValidEvent(event fsnotify.Event) bool { + return event.Op&(fsnotify.Create|fsnotify.Write) != 0 } diff --git a/go.mod b/go.mod index b21b2c76f..178beed42 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/oklog/run v1.1.0 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.30.0 + golang.org/x/sync v0.6.0 k8s.io/api v0.26.3 k8s.io/apimachinery v0.27.4 k8s.io/client-go v0.26.3 diff --git a/go.sum b/go.sum index af3c1d1ee..7439d0cf9 100644 --- a/go.sum +++ b/go.sum @@ -408,6 +408,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod 
h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=