From 2e2980fadb6dbd196b8f3261316424c014ee7fc0 Mon Sep 17 00:00:00 2001 From: satotake Date: Sun, 1 Jan 2023 18:50:44 +0900 Subject: [PATCH] Fix lock and remove `closed` field --- watcher/filenotify/eventwatcher_darwin.go | 58 ++++++++++------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/watcher/filenotify/eventwatcher_darwin.go b/watcher/filenotify/eventwatcher_darwin.go index 2466c919d33..7cc92ae2320 100644 --- a/watcher/filenotify/eventwatcher_darwin.go +++ b/watcher/filenotify/eventwatcher_darwin.go @@ -47,12 +47,16 @@ type fsEventsWatcher struct { events chan fsnotify.Event errors chan error mu sync.Mutex - closed bool done chan bool } func (w *fsEventsWatcher) Events() <-chan fsnotify.Event { - return w.events + select { + case <-w.done: + return nil + default: + return w.events + } } func (w *fsEventsWatcher) Errors() <-chan error { @@ -60,11 +64,10 @@ func (w *fsEventsWatcher) Errors() <-chan error { } func (w *fsEventsWatcher) Add(path string) error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.closed { + select { + case <-w.done: return errFSEventsWatcherClosed + default: } abs, err := filepath.Abs(path) @@ -72,7 +75,11 @@ func (w *fsEventsWatcher) Add(path string) error { return err } - if _, found := w.streams[abs]; found { + w.mu.Lock() + _, found := w.streams[abs] + w.mu.Unlock() + + if found { return fmt.Errorf("already registered: %s", abs) } @@ -124,7 +131,9 @@ func (w *fsEventsWatcher) add(path string) error { path, make(chan bool), } + w.mu.Lock() w.streams[path] = stream + w.mu.Unlock() go func(stream *eventStream) { stream.Start() stream.Flush(true) @@ -198,7 +207,6 @@ func (s *eventStream) convertEvent(e fsevents.Event) (fsnotify.Event, error) { } func (s *eventStream) sendEvent(e fsevents.Event) error { - w := s.watcher ne, err := s.convertEvent(e) if err != nil { @@ -207,19 +215,12 @@ func (s *eventStream) sendEvent(e fsevents.Event) error { if ne.Op == 0 { return nil } - select { - case <-w.done: - return fmt.Errorf("closed") - case w.events <- ne: - } + w.events <- ne return nil } -func (w *fsEventsWatcher) sendErr(e error) error { - select { - case w.errors <- e: - } - return nil +func (w *fsEventsWatcher) sendErr(e error) { + w.errors <- e } func (w *fsEventsWatcher) hasParentEventStreamPath(path string) bool { @@ -242,8 +243,6 @@ func (w *fsEventsWatcher) getChildEventStreamPaths(path string) (children []stri } func (w *fsEventsWatcher) Remove(path string) error { - w.mu.Lock() - defer w.mu.Unlock() abs, err := filepath.Abs(path) if err != nil { return err @@ -254,19 +253,15 @@ func (w *fsEventsWatcher) Remove(path string) error { func (w *fsEventsWatcher) removePaths(paths []string) error { for _, p := range paths { if err := w.remove(p); err != nil { - err = w.sendErr(err) - if err != nil { - return err - } + return err } } return nil } func (w *fsEventsWatcher) remove(path string) error { - if w.closed { - return nil - } + w.mu.Lock() + defer w.mu.Unlock() stream, exists := w.streams[path] if !exists { @@ -278,13 +273,12 @@ func (w *fsEventsWatcher) remove(path string) error { } func (w *fsEventsWatcher) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.closed { + select { + case <-w.done: return nil + default: } - w.closed = true + close(w.done) for path := range w.streams { err := w.remove(path)