-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob.go
151 lines (130 loc) · 3.24 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package fileperf
import (
"context"
"io"
"io/fs"
"path"
"time"
)
// scanJob carries everything executeJob needs to walk a directory tree,
// filter file names, and stream per-file updates back to a file iterator.
type scanJob struct {
	// Internal job state
	root   Dir                   // file system root that the scan walks
	ch     chan<- fileIterUpdate // per-file updates stream to the iterator here; closed by executeJob when done
	cancel context.CancelFunc    // always invoked by executeJob as cleanup when the job finishes

	// External job requirements
	include, exclude []Pattern // file name filters: any exclude match skips the file; when include patterns exist, at least one must match
	sendSkipped      bool      // when true, skipped files are also streamed on ch (marked File.Skipped)

	// Job statistics and tallies
	stats JobStats // running totals, updated as the walk progresses and sent with every update
}
// executeJob walks every entry under job.root, applies the job's
// include/exclude file name filters, reads each matching file to measure
// read performance, and streams a fileIterUpdate for each entry to the
// iterator over job.ch. It always closes job.ch when finished and ends the
// stream with a final update carrying the terminal stream error (io.EOF on
// success) together with the completed statistics.
func executeJob(ctx context.Context, job scanJob) {
	// Close the update channel when finished so the iterator sees the
	// end of the stream even if the final send below is skipped.
	defer close(job.ch)

	// Make sure the cancellation function always gets triggered as clean up
	defer job.cancel()

	// Walk each file in the directory
	err := fs.WalkDir(job.root, ".", func(p string, d fs.DirEntry, dirErr error) error {
		// Stop walking the directory if the job has been cancelled
		if err := ctx.Err(); err != nil {
			return err
		}

		// Ignore the root directory itself, but do not swallow a failure
		// to read it: fs.WalkDir invokes this callback with p == "." and a
		// non-nil error when the root cannot be opened or stat'd, and
		// returning nil here would report a successful, empty scan.
		// Returning dirErr ends the walk and surfaces the error in the
		// final update instead.
		if p == "." {
			return dirErr
		}

		// Prepare the file object with our results. Index is zero-based
		// and counts every entry considered so far, skipped or scanned.
		file := File{
			Root:  job.root,
			Path:  p,
			Index: job.stats.Skipped + job.stats.Scanned,
		}

		// Skip this file if it doesn't pass our file name pattern matching
		// filters
		{
			_, name := path.Split(p)
			skip := false

			// Handle exclusions: any matching exclude pattern skips the
			// entry, regardless of the include list.
			for _, pattern := range job.exclude {
				if pattern.Expression.MatchString(name) {
					skip = true
					break
				}
			}

			// Handle inclusions: when include patterns are present, at
			// least one must match or the entry is skipped.
			if !skip && len(job.include) > 0 {
				matched := false
				for _, pattern := range job.include {
					if pattern.Expression.MatchString(name) {
						matched = true
						break
					}
				}
				if !matched {
					skip = true
				}
			}

			// Record skipped entries and carry on. Skipped entries are
			// only streamed to the iterator when the job asked for them.
			if skip {
				job.stats.Skipped++
				if job.sendSkipped {
					file.Skipped = true
					select {
					case <-ctx.Done():
						return ctx.Err()
					case job.ch <- fileIterUpdate{file: file, stats: job.stats, updated: time.Now()}:
						return nil
					}
				}
				return nil
			}
		}

		// Increment our scanned file count
		job.stats.Scanned++

		// If an error was reported, such as access denied, record it as a
		// scan error
		if dirErr != nil {
			file.Err = dirErr
		} else {
			// Attempt to collect more information about the file.
			// NOTE(review): directory entries are never filtered out with
			// d.IsDir(), so each directory is "scanned" and the
			// fs.ReadFile below fails on it and tallies as an error —
			// confirm this is intended.
			info, err := d.Info()
			if err != nil {
				file.Err = err
			} else {
				file.Name = info.Name()
				file.Size = info.Size()
				file.Mode = info.Mode()
				file.ModTime = info.ModTime()
			}
		}

		// If we haven't encountered an error, attempt to read the file,
		// timing the read and tallying bytes so throughput statistics can
		// be derived by the consumer.
		if file.Err == nil {
			var data []byte
			file.Start = time.Now()
			data, file.Err = fs.ReadFile(job.root, file.Path)
			file.End = time.Now()
			job.stats.ElapsedRead += file.End.Sub(file.Start)
			job.stats.TotalBytes += int64(len(data))
			if file.Err == nil {
				job.stats.Read++
			}
		}

		// Tally error statistics
		if file.Err != nil {
			job.stats.Errors++
		}

		// Send files to the iterator via the job's channel
		select {
		case <-ctx.Done():
			return ctx.Err()
		case job.ch <- fileIterUpdate{file: file, stats: job.stats, updated: time.Now()}:
			return nil
		}
	})

	// If no error was encountered, send io.EOF in the last update so it
	// doesn't get processed as an incoming file update by the file iterator
	if err == nil {
		err = io.EOF
	}

	// Always attempt a final update with the completed statistics. Unlike
	// the original unconditional send, this selects on ctx.Done() so a
	// consumer that stopped receiving after cancellation cannot block this
	// goroutine forever; the deferred close of job.ch still signals the
	// end of the stream to the iterator.
	select {
	case <-ctx.Done():
	case job.ch <- fileIterUpdate{streamErr: err, stats: job.stats, updated: time.Now()}:
	}
}