Skip to content

Commit

Permalink
Fix lock and remove closed field
Browse files Browse the repository at this point in the history
  • Loading branch information
satotake committed Jan 1, 2023
1 parent d304287 commit 2e2980f
Showing 1 changed file with 26 additions and 32 deletions.
58 changes: 26 additions & 32 deletions watcher/filenotify/eventwatcher_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,39 @@ type fsEventsWatcher struct {
events chan fsnotify.Event
errors chan error
mu sync.Mutex
closed bool
done chan bool
}

func (w *fsEventsWatcher) Events() <-chan fsnotify.Event {
return w.events
select {
case <-w.done:
return nil
default:
return w.events
}
}

func (w *fsEventsWatcher) Errors() <-chan error {
return w.errors
}

func (w *fsEventsWatcher) Add(path string) error {
w.mu.Lock()
defer w.mu.Unlock()

if w.closed {
select {
case <-w.done:
return errFSEventsWatcherClosed
default:
}

abs, err := filepath.Abs(path)
if err != nil {
return err
}

if _, found := w.streams[abs]; found {
w.mu.Lock()
_, found := w.streams[abs]
w.mu.Unlock()

if found {
return fmt.Errorf("already registered: %s", abs)
}

Expand Down Expand Up @@ -124,7 +131,9 @@ func (w *fsEventsWatcher) add(path string) error {
path,
make(chan bool),
}
w.mu.Lock()
w.streams[path] = stream
w.mu.Unlock()
go func(stream *eventStream) {
stream.Start()
stream.Flush(true)
Expand Down Expand Up @@ -198,7 +207,6 @@ func (s *eventStream) convertEvent(e fsevents.Event) (fsnotify.Event, error) {
}

func (s *eventStream) sendEvent(e fsevents.Event) error {

w := s.watcher
ne, err := s.convertEvent(e)
if err != nil {
Expand All @@ -207,19 +215,12 @@ func (s *eventStream) sendEvent(e fsevents.Event) error {
if ne.Op == 0 {
return nil
}
select {
case <-w.done:
return fmt.Errorf("closed")
case w.events <- ne:
}
w.events <- ne
return nil
}

func (w *fsEventsWatcher) sendErr(e error) error {
select {
case w.errors <- e:
}
return nil
func (w *fsEventsWatcher) sendErr(e error) {
w.errors <- e
}

func (w *fsEventsWatcher) hasParentEventStreamPath(path string) bool {
Expand All @@ -242,8 +243,6 @@ func (w *fsEventsWatcher) getChildEventStreamPaths(path string) (children []stri
}

func (w *fsEventsWatcher) Remove(path string) error {
w.mu.Lock()
defer w.mu.Unlock()
abs, err := filepath.Abs(path)
if err != nil {
return err
Expand All @@ -254,19 +253,15 @@ func (w *fsEventsWatcher) Remove(path string) error {
func (w *fsEventsWatcher) removePaths(paths []string) error {
for _, p := range paths {
if err := w.remove(p); err != nil {
err = w.sendErr(err)
if err != nil {
return err
}
return err
}
}
return nil
}

func (w *fsEventsWatcher) remove(path string) error {
if w.closed {
return nil
}
w.mu.Lock()
defer w.mu.Unlock()

stream, exists := w.streams[path]
if !exists {
Expand All @@ -278,13 +273,12 @@ func (w *fsEventsWatcher) remove(path string) error {
}

func (w *fsEventsWatcher) Close() error {
w.mu.Lock()
defer w.mu.Unlock()

if w.closed {
select {
case <-w.done:
return nil
default:
}
w.closed = true

close(w.done)
for path := range w.streams {
err := w.remove(path)
Expand Down

0 comments on commit 2e2980f

Please sign in to comment.