forked from erikfrey/bashreduce
-
Notifications
You must be signed in to change notification settings - Fork 6
/
br
executable file
·183 lines (164 loc) · 4.82 KB
/
br
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/bin/bash
# bashreduce: mapreduce in bash
# Print the option synopsis on stderr and terminate the script with status 2.
#   $1 - program name shown in the synopsis
#   $2 - optional; any non-empty value also prints the per-option help
usage() {
  local prog=$1 want_details=$2

  printf '%s\n' \
    "Usage: $prog [-h '<host>[ <host>][...]'] [-f]" \
    " [-m <map>] [-r <reduce>] [-M <merge>]" \
    " [-i <input>] [-o <output>] [-c <column>] [-S <sort-mem-MB>]" \
    " [-t <tmp>] [-?]" >&2

  # Short form: the synopsis is all that was asked for
  [[ -n "$want_details" ]] || exit 2

  printf '%s\n' \
    " -h hosts to use; repeat hosts for multiple cores" \
    " (defaults to contents of /etc/br.hosts)" \
    " -f send filenames, not data, over the network," \
    " implies each host has a mirror of the dataset" \
    " -m map program (effectively defaults to cat)" \
    " -r reduce program" \
    " -M merge program" \
    " -i input file or directory (defaults to stdin)" \
    " -o output file (defaults to stdout)" \
    " -c column used by sort (defaults to 1)" \
    " -S memory to use for sort (defaults to 256M)" \
    " -t tmp directory (defaults to /tmp)" \
    " -? this help message" >&2
  exit 2
}
# Defaults; each one can be overridden by the matching command-line option
hosts=            # -h: worker hosts
filenames=false   # -f: send filenames instead of data
map=              # -m: map program
reduce=           # -r: reduce program
merge=            # -M: merge program
input=            # -i: input file or directory
output=           # -o: output file
column=1          # -c: column used by sort
sort_mem=256M     # -S: memory to use for sort
tmp=/tmp          # -t: tmp directory
# Script name for messages. Parameter expansion strips the directory part
# without word-splitting $0, so an install path containing spaces is safe.
program=${0##*/}
# Parse command-line options, overriding the defaults set earlier in the script.
while getopts "h:fm:r:M:i:o:c:S:t:?" name; do
  case "$name" in
    h) hosts=$OPTARG ;;
    f) filenames=true ;;
    m) map=$OPTARG ;;
    r) reduce=$OPTARG ;;
    M) merge=$OPTARG ;;
    i) input=$OPTARG ;;
    o) output=$OPTARG ;;
    c) column=$OPTARG ;;
    S) sort_mem=$OPTARG ;;
    t) tmp=$OPTARG ;;
    # getopts sets name to a literal '?' for -? and for any option it rejects;
    # the pattern is escaped so it matches only that character, and the
    # program name is quoted so it always arrives as a single argument.
    \?) usage "$program" MOAR ;;
    *) usage "$program" ;;
  esac
done
# If -h wasn't given, fall back to the host list in /etc/br.hosts
if [[ -z "$hosts" ]]; then
  if [[ -e /etc/br.hosts ]]; then
    hosts=$(</etc/br.hosts)
  else
    # Diagnostics go to stderr: stdout is the default destination for job
    # output, and usage itself already writes to stderr.
    echo "$program: must specify -h or provide /etc/br.hosts" >&2
    usage "$program"
  fi
fi
# Empty (or create) the shared error log so this run starts from a clean slate
cp /dev/null $tmp/br_stderr

# Rewrite the map and reduce programs as pipeline fragments ("| cmd 2>>log")
# that are spliced into the worker command line later on.
if [[ -n "$map" ]]; then
  map="| $map 2>>$tmp/br_stderr"
fi
# With -f the workers receive filenames, so each file is expanded
# (zcat, falling back to cat) ahead of any map program.
if [[ $filenames == true ]]; then
  map="| xargs -n1 sh -c 'zcat \$0 2>>$tmp/br_stderr || cat \$0 2>>$tmp/br_stderr' $map"
fi
if [[ -n "$reduce" ]]; then
  reduce="| $reduce 2>>$tmp/br_stderr"
fi

# Per-job scratch locations, made unique with a generated UUID:
# jobpath holds the local in/ and out/ directories, nodepath is used on the workers.
jobid=$(uuidgen)
jobpath=$tmp/br_job_$jobid
nodepath=$tmp/br_node_$jobid
mkdir -p $jobpath/in $jobpath/out
# Each host entry gets two consecutive TCP ports, starting at 8192.
# host_idx numbers the workers; out_files collects the local fifos that feed them.
port_in=8192
port_out=$(($port_in + 1))
host_idx=0
out_files=
for host in $hosts; do
# One pair of local fifos per worker, both named after its index
mkfifo $jobpath/{in,out}/$host_idx
# Listen for work (remote): a background nc writes whatever arrives on
# $port_out into $nodepath/$host_idx; its pid is the second field of `jobs -l`
ssh -n $host "mkdir -p $nodepath/"
pid=$(ssh -n $host "nc -l -p $port_out >$nodepath/$host_idx \
2>>$tmp/br_stderr </dev/null & jobs -l" \
| awk {'print $2'})
# Do work (remote): follow that file (tail is given the listener's pid via
# --pid), sort on the key column, run the map/reduce fragments, and serve
# the result on $port_in.
# LC_ALL='$LC_ALL' sits inside a double-quoted string, so the local value is
# expanded here and sent to the worker.
# NOTE(review): sort runs before $map in this pipeline -- confirm that ordering is intended.
ssh -n $host "tail -s0.1 -f --pid=$pid $nodepath/$host_idx \
2>>$tmp/br_stderr </dev/null \
| LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp -k$column,$column \
2>>$tmp/br_stderr \
$map $reduce \
| nc -q0 -l -p $port_in >>$tmp/br_stderr &"
# Receive results (local): read this worker's output from $port_in into the in/ fifo
nc $host $port_in >$jobpath/in/$host_idx &
# Send work (local): stream the out/ fifo to the worker's listener on $port_out
nc -q0 $host $port_out <$jobpath/out/$host_idx &
out_files="$out_files $jobpath/out/$host_idx"
# Advance to the next port pair and worker index
port_in=$(($port_in + 2))
port_out=$(($port_in + 1))
host_idx=$(($host_idx + 1))
done
# Turn -i into a command prefix (ending in a pipe) that produces the job input.
# When -i is neither a regular file nor a directory the prefix is empty.
if [[ -f "$input" ]]; then
  # Single file: try zcat first, fall back to plain cat
  input="sh -c 'zcat $input 2>>$tmp/br_stderr || cat $input 2>>$tmp/br_stderr' |"
elif [[ ! -d "$input" ]]; then
  input=
elif [[ $filenames == false ]]; then
  # Directory, data mode: list every file and expand each one locally
  input="find $input -type f | xargs -n1 sh -c 'zcat \$0 2>>$tmp/br_stderr || cat \$0 2>>$tmp/br_stderr' |"
else
  # Directory, filename mode (-f): only the file list is produced
  input="find $input -type f |"
fi
# Partition local input to the remote workers.
# Prefer the brp helper: first from PATH, then from ./brutils. `command -v` is
# a shell builtin, so the lookup does not depend on an external `which`.
if command -v brp >>"$tmp/br_stderr"; then
  BRP=brp
elif [[ -f brutils/brp ]]; then
  BRP=brutils/brp
fi
if [[ -n "$BRP" ]]; then
  # brp receives "-", the sort column minus one, and the per-worker output fifos
  eval "$input $BRP - $(($column - 1)) $out_files"
else
  # Use awk if we don't have brp.
  # Seeding rand() with the key column sends every row with the same key to
  # the same worker fifo.
  # This relies on awk keeping its output files open until it is done
  # (believed to be universal).
  # The END block also writes a zero-length string to every fifo, in case
  # some fifo received no rows at all.
  eval "$input awk '{
      srand(\$$column);
      print \$0 >>\"$jobpath/out/\"int(rand() * $host_idx);
    }
    END {
      for (i = 0; i != $host_idx; ++i)
        printf \"\" >>\"$jobpath/out/\"i;
    }'"
fi
# Merge output from hosts into one stream.
# Use the -M program if one was given; otherwise prefer brm and fall back to
# sort -m. In every case, a non-empty -o pipes the result through pv into that file.
# `command -v` is a shell builtin, so the lookup does not depend on an
# external `which`.
if command -v brm >>"$tmp/br_stderr"; then
  BRM=brm
elif [[ -f brutils/brm ]]; then
  BRM=brutils/brm
fi
if [[ -n "$merge" ]]; then
  # Concatenate every result fifo and hand the stream to the merge program
  eval "find $jobpath/in -type p | xargs cat \
    | $merge 2>>$tmp/br_stderr ${output:+| pv >$output}"
elif [[ -n "$BRM" ]]; then
  # brm receives "-", the sort column minus one, and the list of result fifos
  eval "$BRM - $(($column - 1)) $(find $jobpath/in/ -type p | xargs) \
    ${output:+| pv >$output}"
else
  # Use sort -m if we don't have brm.
  # sort -m creates tmp files if too many input files are specified;
  # brm doesn't do this.
  eval "sort -k$column,$column -S$sort_mem -m $jobpath/in/* \
    ${output:+| pv >$output}"
fi
# Cleanup: remove the local job directory, then the node directory on each worker
rm -rf -- "$jobpath"
for host in $hosts; do
  # -n stops ssh from reading this script's stdin, matching the other ssh
  # invocations in this script
  ssh -n "$host" "rm -rf $nodepath"
done
# TODO: is there a safe way to kill subprocesses upon fail?
# this seems to work: /bin/kill -- -$$