Skip to content

Commit

Permalink
kqueue: less mutexes
Browse files Browse the repository at this point in the history
closes howeyc#13
  • Loading branch information
nathany committed Sep 24, 2014
1 parent 9e50462 commit 3de491f
Showing 1 changed file with 60 additions and 87 deletions.
147 changes: 60 additions & 87 deletions kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,17 @@ import (
type Watcher struct {
Events chan Event
Errors chan error
done chan bool // Channel for sending a "quit message" to the reader goroutine

kq int // File descriptor (as returned by the kqueue() syscall).
kq int // File descriptor (as returned by the kqueue() syscall).

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Map of watched file descriptors (key: path).
externalWatches map[string]bool // Map of watches added by user of the library.
dirFlags map[string]uint32 // Map of watched directories to fflags used in kqueue.
paths map[int]pathInfo // Map file descriptors to path names for processing kqueue events.
fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events).
done chan bool // Channel for sending a "quit message" to the reader goroutine
isClosed bool // Set to true when Close() is first called

mu sync.Mutex // Mutex for the Watcher itself (isClosed).

wmut sync.Mutex // Protects access to watches.
pmut sync.Mutex // Protects access to paths.
ewmut sync.Mutex // Protects access to externalWatches.

dirmut sync.Mutex // Protects access to dirFlags.
femut sync.Mutex // Protects access to fileExists.
}

type pathInfo struct {
Expand Down Expand Up @@ -81,9 +74,10 @@ func (w *Watcher) Close() error {

// Send "quit" message to the reader goroutine:
w.done <- true
w.wmut.Lock()

w.mu.Lock()
ws := w.watches
w.wmut.Unlock()
w.mu.Unlock()
for name := range ws {
w.Remove(name)
}
Expand All @@ -93,18 +87,18 @@ func (w *Watcher) Close() error {

// Add starts watching the named file or directory (non-recursively).
func (w *Watcher) Add(name string) error {
w.ewmut.Lock()
w.mu.Lock()
w.externalWatches[name] = true
w.ewmut.Unlock()
w.mu.Unlock()
return w.addWatch(name, noteAllEvents)
}

// Remove stops watching the the named file or directory (non-recursively).
func (w *Watcher) Remove(name string) error {
name = filepath.Clean(name)
w.wmut.Lock()
w.mu.Lock()
watchfd, ok := w.watches[name]
w.wmut.Unlock()
w.mu.Unlock()
if !ok {
return fmt.Errorf("can't remove non-existent kevent watch for: %s", name)
}
Expand All @@ -116,32 +110,26 @@ func (w *Watcher) Remove(name string) error {

syscall.Close(watchfd)

w.wmut.Lock()
delete(w.watches, name)
w.wmut.Unlock()
w.dirmut.Lock()
delete(w.dirFlags, name)
w.dirmut.Unlock()
w.pmut.Lock()
w.mu.Lock()
isDir := w.paths[watchfd].isDir
delete(w.watches, name)
delete(w.paths, watchfd)
w.pmut.Unlock()
delete(w.dirFlags, name)
w.mu.Unlock()

// Find all watched paths that are in this directory that are not external.
if isDir {
var pathsToRemove []string
w.pmut.Lock()
w.mu.Lock()
for _, path := range w.paths {
wdir, _ := filepath.Split(path.name)
if filepath.Clean(wdir) == filepath.Clean(name) {
w.ewmut.Lock()
if filepath.Clean(wdir) == name {
if !w.externalWatches[path.name] {
pathsToRemove = append(pathsToRemove, path.name)
}
w.ewmut.Unlock()
}
}
w.pmut.Unlock()
w.mu.Unlock()
for _, name := range pathsToRemove {
// Since these are internal, not much sense in propagating error
// to the user, as that will just confuse them with an error about
Expand All @@ -162,34 +150,29 @@ var keventWaitTime = durationToTimespec(100 * time.Millisecond)
// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
func (w *Watcher) addWatch(name string, flags uint32) error {
var isDir bool
// Make ./name and name equivalent
name = filepath.Clean(name)

w.mu.Lock()
if w.isClosed {
w.mu.Unlock()
return errors.New("kevent instance already closed")
}
w.mu.Unlock()

// Make ./name and name equivalent
name = filepath.Clean(name)

w.wmut.Lock()
watchfd, alreadyWatching := w.watches[name]
w.wmut.Unlock()

var isDir bool

// We already have a watch, but we can still override flags.
if alreadyWatching {
// We already have a watch, but we can still override flags
w.pmut.Lock()
isDir = w.paths[watchfd].isDir
w.pmut.Unlock()
} else {
}
w.mu.Unlock()

if !alreadyWatching {
fi, err := os.Lstat(name)
if err != nil {
return err
}

// don't watch socket
// Don't watch sockets.
if fi.Mode()&os.ModeSocket == os.ModeSocket {
return nil
}
Expand Down Expand Up @@ -227,24 +210,21 @@ func (w *Watcher) addWatch(name string, flags uint32) error {
}

if !alreadyWatching {
w.wmut.Lock()
w.mu.Lock()
w.watches[name] = watchfd
w.wmut.Unlock()

w.pmut.Lock()
w.paths[watchfd] = pathInfo{name: name, isDir: isDir}
w.pmut.Unlock()
w.mu.Unlock()
}

if isDir {
// Watch the directory if it has not been watched before,
// or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)
w.dirmut.Lock()
w.mu.Lock()
watchDir := (flags&syscall.NOTE_WRITE) == syscall.NOTE_WRITE &&
(!alreadyWatching || (w.dirFlags[name]&syscall.NOTE_WRITE) != syscall.NOTE_WRITE)
// Store flags so this watch can be updated later
w.dirFlags[name] = flags
w.dirmut.Unlock()
w.mu.Unlock()

if watchDir {
if err := w.watchDirectoryFiles(name); err != nil {
Expand All @@ -255,18 +235,18 @@ func (w *Watcher) addWatch(name string, flags uint32) error {
return nil
}

// readEvents reads from the kqueue file descriptor, converts the
// received events into Event objects and sends them via the Events channel
// readEvents reads from kqueue and converts the received kevents into
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]syscall.Kevent_t, 10)

for {
// See if there is a message on the "done" channel
select {
case <-w.done:
errno := syscall.Close(w.kq)
if errno != nil {
w.Errors <- os.NewSyscallError("close", errno)
err := syscall.Close(w.kq)
if err != nil {
w.Errors <- os.NewSyscallError("close", err)
}
close(w.Events)
close(w.Errors)
Expand All @@ -284,18 +264,16 @@ func (w *Watcher) readEvents() {

// Flush the events we received to the Events channel
for len(kevents) > 0 {
watchEvent := &kevents[0]
watchfd := int(watchEvent.Ident)
mask := uint32(watchEvent.Fflags)

w.pmut.Lock()
kevent := &kevents[0]
watchfd := int(kevent.Ident)
mask := uint32(kevent.Fflags)
w.mu.Lock()
path := w.paths[watchfd]
w.pmut.Unlock()

w.mu.Unlock()
event := newEvent(path.name, mask)

if path.isDir && !(event.Op&Remove == Remove) {
// Double check to make sure the directory exist. This can happen when
// Double check to make sure the directory exists. This can happen when
// we do a rm -fr on a recursively watched folders and we receive a
// modification event first but the folder has been deleted and later
// receive the delete event
Expand All @@ -312,28 +290,20 @@ func (w *Watcher) readEvents() {
w.Events <- event
}

// Move to next event
kevents = kevents[1:]

if event.Op&Rename == Rename {
if event.Op&Rename == Rename || event.Op&Remove == Remove {
w.Remove(event.Name)
w.femut.Lock()
w.mu.Lock()
delete(w.fileExists, event.Name)
w.femut.Unlock()
w.mu.Unlock()
}
if event.Op&Remove == Remove {
w.Remove(event.Name)
w.femut.Lock()
delete(w.fileExists, event.Name)
w.femut.Unlock()

// Look for a file that may have overwritten this.
// For example, mv f1 f2 will delete f2, then create f2.
fileDir, _ := filepath.Split(event.Name)
fileDir = filepath.Clean(fileDir)
w.wmut.Lock()
w.mu.Lock()
_, found := w.watches[fileDir]
w.wmut.Unlock()
w.mu.Unlock()
if found {
// make sure the directory exists before we watch for changes. When we
// do a recursive watch and perform rm -fr, the parent directory might
Expand All @@ -345,6 +315,9 @@ func (w *Watcher) readEvents() {
}
}
}

// Move to next event
kevents = kevents[1:]
}
}
}
Expand Down Expand Up @@ -385,9 +358,9 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error {
return err
}

w.femut.Lock()
w.mu.Lock()
w.fileExists[filePath] = true
w.femut.Unlock()
w.mu.Unlock()
}

return nil
Expand All @@ -407,9 +380,9 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
// Search for new files
for _, fileInfo := range files {
filePath := filepath.Join(dirPath, fileInfo.Name())
w.femut.Lock()
w.mu.Lock()
_, doesExist := w.fileExists[filePath]
w.femut.Unlock()
w.mu.Unlock()
if !doesExist {
// Send create event
w.Events <- newCreateEvent(filePath)
Expand All @@ -420,19 +393,19 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
return
}

w.femut.Lock()
w.mu.Lock()
w.fileExists[filePath] = true
w.femut.Unlock()
w.mu.Unlock()
}
}

func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) error {
if fileInfo.IsDir() {
// mimic Linux providing delete events for subdirectories
// but preserve the flags used if currently watching subdirectory
w.dirmut.Lock()
w.mu.Lock()
flags := w.dirFlags[name]
w.dirmut.Unlock()
w.mu.Unlock()

flags |= syscall.NOTE_DELETE
return w.addWatch(name, flags)
Expand Down Expand Up @@ -469,7 +442,7 @@ func register(kq int, fds []int, flags int, fflags uint32) error {
return nil
}

// read retrieves pending events
// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) {
n, err := syscall.Kevent(kq, nil, events, timeout)
Expand Down

0 comments on commit 3de491f

Please sign in to comment.