-
Notifications
You must be signed in to change notification settings - Fork 9
/
parallel.Rnw
1015 lines (738 loc) · 40.4 KB
/
parallel.Rnw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
%% LyX 2.0.6 created this file. For more info, see http://www.lyx.org/.
%% Do not edit unless you really know what you are doing.
\documentclass[12pt]{article}
\usepackage[T1]{fontenc}
\usepackage[latin9]{inputenc}
\usepackage[letterpaper]{geometry}
\geometry{verbose,tmargin=1in,bmargin=1in,lmargin=1in,rmargin=1in}
\usepackage{setspace}
\onehalfspacing
\usepackage[unicode=true]
{hyperref}
\usepackage{breakurl}
\makeatletter
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% User specified LaTeX commands.
\usepackage{/accounts/gen/vis/paciorek/latex/paciorek-asa,times,graphics}
\input{/accounts/gen/vis/paciorek/latex/paciorekMacros}
%\renewcommand{\baselinestretch}{1.5}
\usepackage[unicode=true]{hyperref}
\hypersetup{unicode=true, pdfusetitle,
bookmarks=true,bookmarksnumbered=false,bookmarksopen=false,
breaklinks=false,pdfborder={0 0 1},backref=false,colorlinks=true,}
\makeatother
\begin{document}
\title{An Introduction to Parallel Processing in R, \\
Including Use on Clusters and in the Cloud}
\author{Chris Paciorek\\
Department of Statistics\\
University of California, Berkeley}
\maketitle
<<setup, include=FALSE, cache=FALSE>>=
options(replace.assign=TRUE, width=55)
@
<<read-chunk, echo=FALSE>>=
read_chunk('parallel.R')
read_chunk('doMPIexample.R')
read_chunk('RmpiExample.R')
read_chunk('parallel.sh')
read_chunk('starcluster.sh')
read_chunk('pbd-mpi.R')
read_chunk('pbd-apply.R')
read_chunk('pbd-linalg.R')
@
\section{How to follow and try out this material}
Note that my examples here will be silly toy examples for the purpose
of keeping things simple and focused on the parallelization approaches.
The demo code embedded in this document is available in the various
.R and .sh files in the Github repository, \href{https://github.com/berkeley-scf/parallelR-biostat-2015}{https://github.com/berkeley-scf/parallelR-biostat-2015}.
This document was created using Sweave.
I will do demos on an Ubuntu Linux virtual machine (VM), on the biostat
cluster, and on Amazon's AWS.
We'll use the \href{http://bce.berkeley.edu}{BCE Virtual Machine}.
You can run this on your laptop (see the \href{http://bce.berkeley.edu/install.html}{BCE installation instructions}),
and I encourage you to do so to follow along. Note: to allow for parallelization,
before starting the BCE VM, go to \texttt{Machine > Settings}, select
\texttt{System} and increase the number of processors. You may also
want to increase the amount of memory.
We'll also use BCE as the basis for the virtual machines we start
on Amazon's AWS.
\section{Overview of parallel processing computers}
There are two basic flavors of parallel processing (leaving aside
GPUs): distributed memory and shared memory. With shared memory, multiple
processors (which I'll call cores) share the same memory. With distributed
memory, you have multiple nodes, each with their own memory. You can
think of each node as a separate computer connected by a fast network.
\subsection{Some useful terminology:}
\begin{itemize}
\item \emph{cores}: We'll use this term to mean the different processing
units available on a single node.
\item \emph{nodes}: We'll use this term to mean the different computers,
each with their own distinct memory, that make up a cluster or supercomputer.
\item \emph{processes}: computational tasks executing on a machine. A given
program may start up multiple processes at once. Ideally we have no
more processes than cores on a node.
\item \emph{thread}s: multiple paths of execution within a single process;
the OS sees the threads as a single process, but one can think of
them as 'lightweight' processes. Ideally when considering the processes
and their threads, we would have no more processes and threads combined
than cores on a node.
\item \emph{forking}: child processes are spawned that are identical to
the parent, but with different process id's and their own memory.
\item \emph{sockets}: some of R's parallel functionality involves creating
new R processes (e.g., starting processes via \emph{Rscript}) and
communicating with them via a communication technology called sockets.
\end{itemize}
\subsection{Shared memory}
For shared memory parallelism, each core is accessing the same memory
so there is no need to pass information (in the form of messages)
between different machines. But in some programming contexts one needs
to be careful that activity on different cores doesn't mistakenly
overwrite places in memory that are used by other cores.
The shared memory parallelism approaches that we'll cover are:
\begin{enumerate}
\item threaded linear algebra and
\item simple parallelization of embarrassingly parallel computations.
\end{enumerate}
\paragraph*{Threading}
Threads are multiple paths of execution within a single process. Using
\emph{top} to monitor a job that is executing threaded code, you'll
see the process using more than 100\% of CPU. When this occurs, the
process is using multiple cores, although it appears as a single process
rather than as multiple processes. In general, threaded code will
detect the number of cores available on a machine and make use of
them. However, you can also explicitly control the number of threads
available to a process.
\subsection{Distributed memory}
Parallel programming for distributed memory parallelism requires passing
messages between the different nodes. The standard protocol for doing
this is MPI, of which there are various versions, including \emph{openMPI}.
The Python package \emph{mpi4py} implements MPI in Python and the
R package \emph{Rmpi} implements MPI in R.
Some of the distributed memory approaches that we'll cover are:
\begin{enumerate}
\item simple parallelization of embarrassingly parallel computations,
\item using MPI for explicit distributed memory processing, and
\item distributed linear algebra.
\end{enumerate}
\subsection{Other type of parallel processing}
We won't cover either of the following in this material.
\subsubsection{GPUs}
GPUs (Graphics Processing Units) are processing units originally designed
for rendering graphics on a computer quickly. This is done by having
a large number of simple processing units for massively parallel calculation.
The idea of general purpose GPU (GPGPU) computing is to exploit this
capability for general computation.
In spring 2014, I gave a \href{http://statistics.berkeley.edu/computing/gpu}{workshop on using GPUs}.
One easy way to use a GPU is on an Amazon EC2 virtual machine.
\subsubsection{Spark and Hadoop}
Spark and Hadoop are systems for implementing computations in a distributed
memory environment, using the MapReduce approach. In fall 2014 I gave
a \href{http://statistics.berkeley.edu/computing/spark}{workshop on Spark}.
One easy way to use Spark is on a cluster of Amazon EC2 virtual machines.
\section{Basic suggestions for parallelizing your code}
The easiest situation is when your code is embarrassingly parallel,
which means that the different tasks can be done independently and
the results collected. When the tasks need to interact, things get
much harder. Much of the material here is focused on embarrassingly
parallel computation.
The following are some basic principles/suggestions for how to parallelize
your computation.
\begin{itemize}
\item If you can do your computation on the cores of a single node using
shared memory, that will be faster than using the same number of cores
(or even somewhat more cores) across multiple nodes. Similarly, jobs
with a lot of data/high memory requirements that one might think of
as requiring Hadoop may in some cases be much faster if you can find
a single machine with a lot of memory.
\begin{itemize}
\item That said, if you would run out of memory on a single node, then you'll
need to use distributed memory.
\end{itemize}
\item If you have nested loops, you generally only want to parallelize at
one level of the code. That said, there may be cases in which it is
helpful to do both. Keep in mind whether your linear algebra is being
threaded. Often you will want to parallelize over a loop and not use
threaded linear algebra. (That said, if you have multiple nodes, you
might have one task per node and use threaded linear algebra to exploit
the cores on each node.)
\item Often it makes sense to parallelize the outer loop when you have nested
loops.
\item You generally want to parallelize in such a way that your code is
load-balanced and does not involve too much communication.
\begin{itemize}
\item If you have very few tasks, particularly if the tasks take different
amounts of time, often some processors will be idle and your code
poorly load-balanced.
\item If you have very many tasks and each one takes little time, the communication
overhead of starting and stopping the tasks will reduce efficiency.
\end{itemize}
\end{itemize}
I'm happy to help discuss specific circumstances, so just email [email protected].
The new Berkeley Research Computing (BRC) initiative is also providing
consulting on efficient parallelization strategies. If my expertise
is not sufficient, I can help you get assistance from BRC.
\paragraph{Static vs. dynamic assignment of tasks}
Some of R's parallel functions allow you to say whether the tasks
can be divided up and allocated to the workers at the beginning or
whether tasks should be assigned individually as previous tasks complete.
E.g., the \emph{mc.preschedule} argument in \emph{mclapply()} and
the \emph{.scheduling} argument in \emph{parLapply()}.
Basically if you have many tasks that each take similar time, you
want to preschedule to reduce communication. If you have few tasks
or tasks with highly variable completion times, you don't want to
preschedule, to improve load-balancing.
\section{Threaded linear algebra and the BLAS }
The BLAS is the library of basic linear algebra operations (written
in Fortran or C). A fast BLAS can greatly speed up linear algebra
relative to the default BLAS on a machine. Some fast BLAS libraries
are Intel's \emph{MKL}, AMD's \emph{ACML}, and the open source (and
free) \emph{openBLAS} (formerly \emph{GotoBLAS}). For the Mac, there
is \emph{vecLib} BLAS. All of these BLAS libraries are now threaded
- if your computer has multiple cores and there are free resources,
your linear algebra will use multiple cores, provided your program
is linked against the specific BLAS and provided OMP\_NUM\_THREADS
is not set to one. (Macs make use of VECLIB\_MAXIMUM\_THREADS rather
than OMP\_NUM\_THREADS.)
\subsection{Fixing the number of threads (cores used)}
In general, if you want to limit the number of threads used, you can
set the OMP\_NUM\_THREADS environment variable on UNIX machine. This
can be used in the context of R or C code that uses BLAS or your own
threaded C code, but this does not work with Matlab. In the UNIX shell,
you'd do this as follows (e.g. to limit to 3 cores):
\texttt{export OMP\_NUM\_THREADS=3 \# bash}
\texttt{setenv OMP\_NUM\_THREADS 3 \# tcsh}
If you are running R, you'd need to do this in your shell session
before invoking R.
\subsection{Using threading in R}
Threading in R is limited to linear algebra, for which R calls external
BLAS and LAPACK libraries.
Here's some code that when run in an R executable linked to a threaded
BLAS illustrates the speed of using a threaded BLAS:
<<RlinAlg, cache=TRUE, eval=TRUE>>=
@
\subsubsection{Setting up R with a fast BLAS}
R on the Biostat cluster is linked to openBLAS. So you should be able
to use OMP\_NUM\_THREADS to control the number of threads used for
linear algebra on the Biostat cluster.
In general, the \href{http://cran.r-project.org/manuals.html}{R installation manual}
gives information on how to link R to a fast BLAS. On Ubuntu Linux,
if you install openBLAS as follows, the \emph{/etc/alternatives} system
will set \emph{/usr/lib/libblas.so} to point to openBLAS. This is
what is done in the BCE virtual machine. By default on Ubuntu, R will
use the system BLAS, so it will as a result use openBLAS.
<<etc-alternatives, eval=FALSE, engine='bash'>>=
@
To use a fast, threaded BLAS enabled on your own Mac, do the following
as the administrative user:
<<MacBLAS, eval=FALSE, engine='bash'>>=
@
\subsubsection{Important warnings about use of threaded BLAS}
\paragraph{Conflict between openBLAS and some parallel functionality in R}
There are conflicts between forking in R and threaded BLAS that in
some cases affect \emph{foreach} (when using the \emph{multicore}
and \emph{parallel} backends), \emph{mclapply()}, and (only if \emph{cluster()}
is set up with forking (not the default)) \emph{par\{L,S,\}apply()}.
The result is that if linear algebra is used within your parallel
code, R hangs. This affects (under somewhat different circumstances)
both ACML and openBLAS.
To address this, before running an R job that does linear algebra,
you can set OMP\_NUM\_THREADS to 1 to prevent the BLAS from doing
threaded calculations. Alternatively, you can use MPI as the parallel
backend (via \emph{doMPI} in place of \emph{doMC} or \emph{doParallel}
-- see Section \ref{sec:Distributed-memory}). You may also be able
to convert your code to use \emph{par\{L,S,\}apply() }{[}with the
default PSOCK type{]} and avoid \emph{foreach} entirely.
\paragraph{Conflict between threaded BLAS and R profiling}
There is also a conflict between threaded BLAS and R profiling, so
if you are using \emph{Rprof()}, you may need to set OMP\_NUM\_THREADS
to one. This has definitely occurred with openBLAS; I'm not sure about
other threaded BLAS libraries.
\paragraph{Speed and threaded BLAS}
In many cases, using multiple threads for linear algebra operations
will outperform using a single thread, but there is no guarantee that
this will be the case, in particular for operations with small matrices
and vectors. Testing with openBLAS suggests that sometimes a job may
take more time when using multiple threads; this seems to be less
likely with ACML. This presumably occurs because openBLAS is not doing
a good job in detecting when the overhead of threading outweights
the gains from distributing the computations. You can compare speeds
by setting OMP\_NUM\_THREADS to different values. In cases where threaded
linear algebra is slower than unthreaded, you would want to set OMP\_NUM\_THREADS
to 1.
Therefore I recommend that you test any large jobs to compare performance
with a single thread vs. multiple threads. Only if you see a substantive
improvement with multiple threads does it make sense to have OMP\_NUM\_THREADS
be greater than one.
\section{Basic shared memory parallel programming in R}
\subsection{foreach}
A simple way to exploit parallelism in R when you have an embarrassingly
parallel problem (one where you can split the problem up into independent
chunks) is to use the \emph{foreach} package to do a for loop in parallel.
For example, bootstrapping, random forests, simulation studies, cross-validation
and many other statistical methods can be handled in this way. You
would not want to use \emph{foreach} if the iterations were not independent
of each other.
The \emph{foreach} package provides a \emph{foreach} command that
allows you to do this easily. \emph{foreach} can use a variety of
parallel ``back-ends''. It can use \emph{Rmpi} to access cores in
a distributed memory setting as discussed in Section \ref{sec:Distributed-memory}
or the \emph{parallel} or \emph{multicore} packages to use shared
memory cores. When using \emph{parallel} or \emph{multicore} as the
back-end (these are equivalent to each other from the user perspective),
you should see multiple processes (as many as you registered; ideally
each at 100\%) when you look at \emph{top}. The multiple processes
are created by forking or using sockets; this is discussed a bit more
later in this document.
<<foreach, eval=FALSE, tidy=FALSE>>=
@
The result of \emph{foreach} will generally be a list, unless \emph{foreach}
is able to put it into a simpler R object. Note that \emph{foreach}
also provides some additional functionality for collecting and managing
the results that mean that you don't have to do some of the bookkeeping
you would need to do if writing your own for loop.
You can debug by running serially using \emph{\%do\%} rather than
\emph{\%dopar\%}. Note that you may need to load packages within the
\emph{foreach} construct to ensure a package is available to all of
the calculations.
\textbf{Caution}: Note that I didn't pay any attention to possible
danger in generating random numbers in separate processes. More on
this issue in the section on RNG (Section \ref{sec:RNG}).
\subsection{parallel apply (parallel package)}
The \emph{parallel} package has the ability to parallelize the various
\emph{apply()} functions (\emph{apply}, \emph{lapply}, \emph{sapply},
etc.) and parallelize vectorized functions. It's a bit hard to find
the \href{http://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf}{vignette}
for the parallel package because parallel is not listed as one of
the contributed packages on CRAN.
Here are examples of some of the parallel apply-style commands.
<<parallelApply, eval=TRUE, tidy=FALSE, cache=TRUE>>=
@
Note that some R packages can directly interact with the parallelization
packages to work with multiple cores. E.g., the \emph{boot} package
can make use of the \emph{multicore} package directly.
\subsection{Using mcparallel() to manually parallelize individual tasks}
One can use \emph{mcparallel()} in the \emph{parallel} package to
send different chunks of code to different processes. Here we would
need to manage the number of tasks so that we don't have more tasks
than available cores.
<<mcparallel, eval=TRUE, cache=TRUE>>=
@
Note that \emph{mcparallel()} also allows the use of the \emph{mc.set.seed}
argument as with \emph{mclapply()}.
Note that on the cluster, one should create only as many parallel
blocks of code as were requested when submitting the job.
\subsection{Running on the Biostat cluster}
Here's how you would submit an R job that uses shared memory on a
single node (in this case requesting four cores of a maximum of 8
on a given core):
\texttt{qsub -pe smp 4 job.sh}
\emph{job.sh} would have the various lines starting with \#\$ as in
the biostat cluster documentation and your R command would look like:
\texttt{R CMD BATCH -{}-no-save file.R file.out}
You can make use of the UNIX environment variable NSLOTS in your code
to programmatically control the number of the number of cores your
code uses (e.g., with foreach, mclapply, parLapply, etc.). To read
this variable in R do:
\texttt{nCores <- Sys.getenv('NSLOTS')}
\section{Distributed memory\label{sec:Distributed-memory}}
\subsection{Use of MPI as a black box for embarrassingly parallel computation}
\subsubsection{foreach with MPI in R on a single node}
One reason to use Rmpi on a single node is that in some contexts the
threaded BLAS (particularly openBLAS) conflicts with \emph{multicore}/\emph{parallel}'s
forking functionality. This can cause \emph{foreach} to hang when
used with the \emph{doParallel} or \emph{doMC} parallel back ends
(also with \emph{mclapply()}) and OMP\_NUM\_THREADS set to a value
greater than one.
Here's R code for using \emph{Rmpi} as the back-end to \emph{foreach}.
<<Rmpi-foreach-oneNode, cache=TRUE>>=
@
A caution concerning \emph{Rmpi/doMPI}: when you invoke \emph{startMPIcluster()},
all the slave R processes become 100\% active and stay active until
the cluster is closed. In addition, when \emph{foreach} is actually
running, the master process also becomes 100\% active. So using this
functionality involves some inefficiency in CPU usage. This inefficiency
is not seen with a sockets cluster (see next) nor when using other
\emph{Rmpi} functionality - i.e., starting slaves with \emph{mpi.spawn.Rslaves()}
and then issuing commands to the slaves.
\subsubsection{foreach in R on multiple nodes}
\emph{{[}Note that at this point in the demo, we'll switch to use
of a BCE-based EC2 virtual cluster, as we need access to multiple
nodes. Note to self - just do starcluster sshmaster -u oski mycluster
from laptop BCE.{]}}
For this, we need to start R through the \emph{mpirun} command so
that inter-node communication is handled by MPI. The \texttt{-machinefile}
argument tells MPI what machines (nodes) to use for the computation.
\texttt{mpirun -machinefile .hosts -np 4 R CMD BATCH -{}-no-save doMPIexample.R
doMPIexample.Rout}
\texttt{mpirun -machinefile .hosts -np 4 R -{}-no-save \# for interactive
use}
When running on the Biostat cluster you can simply do:
\texttt{mpirun -np 1 R CMD BATCH -{}-no-save doMPIexample.R doMPIexample.Rout}
Here's the R code for using \emph{Rmpi} as the back-end to \emph{foreach}.
If you call \emph{startMPIcluster()} with no arguments, it will start
up one fewer worker processes than the value indicated by the np flag,
so your R code will be more portable. Also you can leave off the -np
and mpirun will start up as many processes as there are lines in the
.hosts file.
<<Rmpi-foreach-multipleNodes, eval=FALSE, cache=TRUE>>=
@
CAUTION: Note that the above with \texttt{-np 4 }should in general
work with \texttt{-np 1} as well (and does on the biostat cluster).
However \texttt{-np 1} does not work in the Amazon cluster nor the
SCF (though it has in the past). I'm in the process of investigating
this.
\subsubsection{Sockets in R example}
One can also set up a cluster via sockets. You just need to specify
a character vector with the machine names as the input to \emph{makeCluster()}.
Here we can just start R as we normally would, without using \emph{mpirun}.
<<sockets-multipleNodes, eval=FALSE, cache=TRUE>>=
@
\subsection{MPI basics}
There are multiple MPI implementations, of which \emph{openMPI} and
\emph{mpich} are very common.
In MPI programming, the same code runs on all the machines. This is
called SPMD (single program, multiple data). As we saw above, one
invokes the same code (same program) multiple times, but the behavior
of the code can be different based on querying the rank (ID) of the
process. Since MPI operates in a distributed fashion, any transfer
of information between processes must be done explicitly via send
and receive calls (e.g., \emph{MPI\_Send}, \emph{MPI\_Recv}, \emph{MPI\_Isend},
and \emph{MPI\_Irecv}). (The ``MPI\_'' is for C code; C++ just has
\emph{Send}, \emph{Recv}, etc.)
The latter two of these functions (\emph{MPI\_Isend} and \emph{MPI\_Irecv})
are so-called non-blocking calls. One important concept to understand
is the difference between blocking and non-blocking calls. Blocking
calls wait until the call finishes, while non-blocking calls return
and allow the code to continue. Non-blocking calls can be more efficient,
but can lead to problems with synchronization between processes.
In addition to send and receive calls to transfer to and from specific
processes, there are calls that send out data to all processes (\emph{MPI\_Scatter}),
gather data back (\emph{MPI\_Gather}) and perform reduction operations
(\emph{MPI\_Reduce}).
Debugging MPI/\emph{Rmpi} code can be tricky because communication
can hang, error messages from the workers may not be seen or readily
accessible and it can be difficult to assess the state of the worker
processes.
\subsubsection{Using Rmpi to write code for distributed computation}
R users can use \emph{Rmpi} to interface with MPI.
To start your R job do the following. Here one requests just a single
process, as R will manage the task of spawning slaves.
\texttt{mpirun -machinefile .hosts -np 1 R CMD BATCH -{}-no-save RmpiExample.R
RmpiExample.Rout}
Here's some example code that uses actual \emph{Rmpi} syntax (as opposed
to \emph{foreach} with Rmpi as the back-end). This code runs in a
master-slave paradigm where the master starts the slaves and invokes
commands on them.
<<Rmpi-usingMPIsyntax, cache=TRUE>>=
@
CAUTION: for some reason the \emph{mpirun} syntax above is not distributing
workers to the other nodes listed in the hosts file on the Amazon
cluster and the SCF, but it should work on the Biostat cluster. This
has worked in the past for me, so I'm not sure what is going on, but
it may be a difference in the MPI version or something in the latest
Ubuntu (Ubuntu 14 vs. Ubuntu 12).
You can use Rmpi on a single node, for example for debugging or prototyping.
In this case you can just start R without using mpirun.
Note that if you do this in interactive mode, some of the usual functionality
of command line R (tab completion, scrolling for history) is not enabled
and errors will cause R to quit. This occurs because passing things
through \emph{mpirun} causes R to think it is not running interactively.
\subparagraph{Note on supercomputers}
Note: in some cases a cluster/supercomputer will be set up so that
\emph{Rmpi} is loaded and the worker processes are already started
when you start R. In this case you wouldn't need to load \emph{Rmpi}
or use \emph{mpi.spawn.Rslaves(). }You can always check \emph{mpi.comm.size()}
to see if the workers are already set up.
\subsubsection{Using \emph{pbdR} for distributed computation}
There is a relatively new effort to enhance R's capability for distributed
memory processing called \emph{pbdR}. \emph{pbdR} is designed for
SPMD processing in batch mode, which means that you start up multiple
processes in a non-interactive fashion using mpirun. The same code
runs in each R process so you need to have the code behavior depend
on the process ID, as we saw with \emph{Rmpi} above.
\emph{pbdR} provides the following capabilities:
\begin{itemize}
\item an alternative to \emph{Rmpi} for interfacing with MPI,
\item the ability to do some parallel apply-style computations, and
\item the ability to do distributed linear algebra by interfacing to \emph{ScaLapack.}
\end{itemize}
Personally, I think the last of the three is the most exciting as
it's a functionality not readily available in R or even more generally
in other readily-accessible software.
\emph{pbdR} is installed on the nodes in our example EC2 cluster,
as detailed in Section \ref{sub:Starting-a-Cluster}.
Let's see some basic examples of using pbdR.
One starts pbdR via mpirun as follows:
\texttt{mpirun -machinefile .hosts -np 4 Rscript file.R > file.out}
\paragraph{Interfacing with MPI}
Here's an example of distributing an embarrassingly parallel calculation
(estimating an integral via Monte Carlo - in this case estimating
the value of $\pi$).
<<pbd-mpi, cache=TRUE, eval=FALSE>>=
@
\paragraph{Parallel apply}
Here's some basic syntax for doing a distributed \emph{apply()} on
a matrix that is on one of the workers (i.e., the matrix is not distributed).
<<pbd-apply, cache=TRUE, eval=FALSE>>=
@
\paragraph{Distributed linear algebra}
And here's how you would set up a distributed matrix and do linear
algebra on it. Note that when working with large matrices, you would
generally want to construct the matrices (or read from disk) in a
parallel fashion rather than creating the full matrix on one worker.
<<pbd-linalg, cache=TRUE, eval=FALSE>>=
@
As a quick, completely non-definitive point of comparison, doing the
crossproduct and Cholesky for the 8192x8192 matrix on 3 EC2 nodes
(2 cores per node) with -np 6 took 39 seconds for each operation,
while doing with two threads on the master node took 64 seconds (crossproduct)
and 23 seconds (Cholesky).
\subsection{Running on the Biostat cluster}
Here's how you would run distributed R jobs based on \emph{mpirun}
on the Biostat cluster
\texttt{qsub -pe orte 12 job.sh}
\emph{job.sh} would have the various lines starting with \#\$ as in
the biostat cluster documentation.
For Rmpi/doMPI your R command inside \emph{job.sh} would look like
this, where R will start the worker processes (hence the \texttt{-np
1})
\texttt{mpirun -np 1 R CMD BATCH file.R file.out}
or for \emph{pbdR} where \emph{mpirun} starts the workers in SPMD
fashion:
\texttt{mpirun -np 12 Rscript file.R > file.out}
In general on the Biostat cluster, you do not need to pass anything
via \texttt{-machinefile} as the queueing software takes care of this.
\section{Using R on Cloud VMs and Cloud-based Clusters}
For Amazon-based machines, we'll use the BCE image as our starting
point and add software to it.
Amazon's service that provides cloud-based virtual machines is called
EC2. To use EC2, you'll need an Amazon account. This might be an account
you pay for with your own credit card, or an account that Biostat
pays for through a grant.
Once you get an account, you'll receive authentication information
in the form of two long strings of letters/numbers, called your access
key ID and secret access key.\textbf{ WARNING: do not share your Amazon
authentication info (AWS\_ACCESS\_KEY\_ID or AWS\_SECRET\_ACCESS\_KEY)
in any public setting, such as a public Github account.} A user in
my class did this and we ended up with thousands of dollars in charges
from hackers starting up instances (probably to mine Bitcoin).
\subsection{Starting an Amazon EC2 Instance}
\paragraph{Point-and-click via the EC2 console}
To start up an EC2 virtual machine based on BCE, follow the instructions
at \href{http://bce.berkeley.edu/install.html}{bce.berkeley.edu/install.html}.
To logon, you'll need to make use of SSH keys. Here's how to get things
going with BCE such that you can logon as the \emph{oski} user. You
obtain the <IP\_address> by clicking on the ``Connect'' button on
the EC2 console.
\begin{verbatim}
# get the modify-for-aws.sh and add-parallel-tools.sh
# scripts from the BCE webpage
# run on the VM to do some setup
ssh -i ~/.ssh/your_private_key_file ubuntu@<IP_address> \
sudo bash modify-for-aws.sh
# now install software for parallel R functionality
ssh -i ~/.ssh/your_private_key_file ubuntu@<IP_address> \
sudo bash add-parallel-tools.sh
# Now you can ssh in directly as the oski user.
ssh -i ~/.ssh/your_private_key_file oski@<IP_address>
\end{verbatim}
Now you can logon to the instance as the \emph{oski} user (or just
do \texttt{su - oski} if you are logged on as root) and do your work.
Once you are logged in to the VM, running parallel R jobs on a single
node should be exactly the same as we've already seen on the BCE VM
running on your laptop.
\paragraph{By scripting}
For those of you in PH290, Aaron Culich of Berkeley Research Computing
has set up a \href{https://github.com/ucberkeley/ph290-bce-2015-spring}{scripted approach}
to start and access an EC2 instance from the command line using a
tool called \emph{vagrant}.
The python \emph{boto} package is also handy for working with EC2.
\subsection{Starting a Cluster of EC2 Instances\label{sub:Starting-a-Cluster}}
We'll use a tool called \href{http://star.mit.edu/cluster/index.html}{Starcluster}
to start EC2 clusters. This manages setting up the networking between
the nodes. You'll need Starcluster installed on your local machine.
The file \emph{starcluster.config} contains some basic settings you
can use to start up an EC2 cluster based on BCE. In particular it
has the AMI IDs for the BCE EC2 images. You'll need to put your own
AWS authentication information in the file (or set these as environment
variables in your shell session). Rename the file as \emph{config}
and put it in a directory called \emph{.starcluster} in your home
directory. Then you can start an EC2 cluster and login as follows:
<<starcluster-start, eval=FALSE, engine='bash'>>=
@
When you start the cluster via \texttt{starcluster start}, you should
see logging information that looks like this, followed by some instructions
about how to restart, stop, and login to your cluster.
\begin{verbatim}
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.6)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to [email protected]
>>> Validating cluster template settings...
>>> Cluster template settings are valid
>>> Starting cluster...
>>> Launching a 4-node cluster...
>>> Creating security group @sc-mycluster...
Reservation:r-e344eee9
>>> Waiting for instances to propagate...
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Waiting for cluster to come up... (updating every 30s)
>>> Waiting for all nodes to be in a 'running' state...
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Waiting for SSH to come up on all nodes...
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Waiting for cluster to come up took 1.591 mins
>>> The master node is ec2-52-10-188-152.us-west-2.compute.amazonaws.com
>>> Configuring cluster...
>>> Running plugin starcluster.clustersetup.DefaultClusterSetup
>>> Configuring hostnames...
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Creating cluster user: oski (uid: 1001, gid: 1001)
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring scratch space for user(s): oski
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring /etc/hosts on each node
4/4 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Starting NFS server on master
>>> Configuring NFS exports path(s):
/home
>>> Mounting all NFS export path(s) on 3 worker node(s)
3/3 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Setting up NFS took 0.070 mins
>>> Configuring passwordless ssh for root
>>> Configuring passwordless ssh for oski
>>> Configuring cluster took 0.227 mins
>>> Starting cluster took 1.860 mins
\end{verbatim}
The other commands after the \texttt{starcluster start} customize
the EC2 cluster with some parallel R (and Python) tools, as well as
MPI. In particular note that I've created a \emph{.hosts} file for
use with \emph{mpirun}. The host file just contains lines with \emph{master},
\emph{node001}, \emph{node002}, ..., which are the internal names
of the nodes (as named by Starcluster) in the cluster.
\section{Efficiency and Profiling}
\subsection{General tips on efficient R code}
See \href{https://github.com/berkeley-stat243/stat243-fall-2014/blob/master/units/unit6-Rprog.pdf?raw=true}{Section 1 of Unit 6 from Statistics 243}
on efficient R code.
Here are a few high-level principles:
\begin{enumerate}
\item Write vectorized code, not for loops or apply-style statements.
\item Pre-allocate storage rather than allocating into a position in a vector
or list that is beyond the length of that vector/list.
\item Make sure R is linked to a fast BLAS.
\item To get one or more values from a vector or list, use numeric indices
rather than looking up/subsetting by name or boolean values.
\end{enumerate}
\subsection{Benchmarking and profiling}
Again my class notes (see above link) provide more details on the
topics below.
The \emph{system.time()} function allows you to time R code. The rbenchmark
package provides a nice wrapper function, \emph{benchmark()}, that
eases timing of code and comparison of timing of different code chunks.
The \emph{Rprof()} function allows you to see how much time is spent
in different parts of your code, helping to identify bottlenecks.
\emph{Rprof()} works as follows. At set time intervals, it queries
the R process to see what function is being evaluated and writes this
info to disk. Then when the code is finished you can call \emph{summaryRprof()}
to collect those statistics. Note that if your code runs very quickly,
you may not get enough queries accumulated to have useful information.
See the \emph{interval} argument to \emph{Rprof()}. In this case you
may be able to write a loop that repeatedly evaluates your code.
I haven't used it, but Hadley Wickham's \href{https://github.com/hadley/lineprof}{lineprof package}
provides a nice interface that allows you to see timing (as with \emph{Rprof()})
but also memory use by lines of your code. Some details are in \href{http://adv-r.had.co.nz/memory.html\#memory-profiling}{his book}.
\section{RNG \label{sec:RNG}}
The key thing when thinking about random numbers in a parallel context
is that you want to avoid having the same 'random' numbers occur on
multiple processes. On a computer, random numbers are not actually
random but are generated as a sequence of pseudo-random numbers designed
to mimic true random numbers. The sequence is finite (but very long)
and eventually repeats itself. When one sets a seed, one is choosing
a position in that sequence to start from. Subsequent random numbers
are based on that subsequence. All random numbers can be generated
from one or more random uniform numbers, so we can just think about
a sequence of values between 0 and 1.
The worst thing that could happen is that one sets things up in such
a way that every process is using the same sequence of random numbers.
This could happen if you mistakenly set the same seed in each process,
e.g., using \texttt{set.seed(mySeed)} in R on every process.
The naive approach is to use a different seed for each process. E.g.,
if your processes are numbered $id=1,\ldots,p$, with \emph{id} unique
to a process, using \texttt{set.seed(id)} on each process. This is
likely not to cause problems, but raises the danger that two (or more
sequences) might overlap. For an algorithm with dependence on the
full sequence, such as an MCMC, this probably won't cause big problems
(though you likely wouldn't know if it did), but for something like
simple simulation studies, some of your 'independent' samples could
be exact replicates of a sample on another process. Given the period
length of the default generators in R, Matlab and Python, this is
actually quite unlikely, but it is a bit sloppy.
One approach to avoid the problem is to do all your RNG on one process
and distribute the random deviates, but this can be infeasible with
many random numbers.
More generally to avoid this problem, the key is to use an algorithm
that ensures sequences that do not overlap.
\subsection{Ensuring separate sequences in R}
In R, there are two packages that deal with this, \emph{rlecuyer}
and \emph{rsprng}. We'll go over \emph{rlecuyer}, as I've heard that
\emph{rsprng} is deprecated (though there is no evidence of this on
CRAN) and \emph{rsprng} is (at the moment) not available for the Mac.
The L'Ecuyer algorithm has a period of $2^{191}$, which it divides
into subsequences of length $2^{127}$.
\subsubsection{With the parallel package}
Here's how you initialize independent sequences on different processes
when using the \emph{parallel} package's parallel apply functionality
(illustrated here with \emph{parSapply()}.
<<RNG-apply, eval=FALSE>>=
@
If you want to explicitly move from stream to stream, you can use
\emph{nextRNGStream()}. For example:
<<RNGstream, eval=FALSE>>=
@
When using \emph{mclapply()}, you can use the \emph{mc.set.seed} argument
as follows (note that \emph{mc.set.seed} is TRUE by default, so you
should get different seeds for the different processes by default),
but one needs to invoke \texttt{RNGkind(\textquotedbl{}L'Ecuyer-CMRG\textquotedbl{})}
to get independent streams via the L'Ecuyer algorithm.
<<RNG-mclapply, eval=FALSE, tidy=FALSE>>=
@
The documentation for \emph{mcparallel()} gives more information about
reproducibility based on \emph{mc.set.seed}.
\subsubsection{With foreach}
\paragraph*{Getting independent streams}
One question is whether \emph{foreach} deals with RNG correctly. This
is not documented, but the developers (Revolution Analytics) are well
aware of RNG issues. Digging into the underlying code reveals that
the \emph{doMC} and \emph{doParallel} backends both invoke \emph{mclapply()}
and set \emph{mc.set.seed} to TRUE by default. This suggests that
the discussion above r.e. \emph{mclapply()} holds for \emph{foreach}
as well, so you should do \texttt{RNGkind(\textquotedbl{}L'Ecuyer-CMRG\textquotedbl{})}
before your foreach call. For \emph{doMPI}, as of version 0.2, you
can do something like this, which uses L'Ecuyer behind the scenes:\\
\texttt{cl <- makeCluster(nSlots)}~\\
\texttt{setRngDoMPI(cl, seed=0)}
\paragraph*{Ensuring reproducibility}
While using \emph{foreach} as just described should ensure that the
streams on each worker are are distinct, it does not ensure reproducibility
because task chunks may be assigned to workers differently in different
runs and the substreams are specific to workers, not to tasks.
For \emph{doMPI}, you can specify a different RNG substream for each
task chunk in a way that ensures reproducibility. Basically you provide
a list called \emph{.options.mpi} as an argument to \emph{foreach},
with \emph{seed} as an element of the list:
<<RNG-doMPI, eval=TRUE>>=
@
That single seed then initializes the RNG for the first task, and
subsequent tasks get separate substreams, using the L'Ecuyer algorithm,
based on \emph{nextRNGStream()}. Note that the \emph{doMPI} developers
also suggest using the \emph{chunkSize} option (also specified as
an element of \emph{.options.mpi}) when using \emph{seed}. See \texttt{?''doMPI-package''}
for more details.