Parallel Processing for Distributed Computing in R, Python, MATLAB, and C
==============================================================
Parallelization tools in a distributed memory (multiple machine) context
--------------
Chris Paciorek, Department of Statistics, UC Berkeley
```{r setup, include=FALSE}
library(knitr)
library(stringr)
read_chunk('sockets.R')
read_chunk('doMPI.R')
read_chunk('doSNOW.R')
read_chunk('mpi.parSapply.R')
read_chunk('pbd-apply.R')
read_chunk('pbd-linalg.R')
read_chunk('pbd-mpi.R')
read_chunk('Rmpi.R')
read_chunk('pbd-construct.R')
read_chunk('python-ipyparallel.py')
read_chunk('python-pp.py')
read_chunk('example-mpi.py')
```
# 0) This Tutorial
This tutorial covers strategies for using parallel processing in R, Python, MATLAB (briefly), and C on multiple machines, in which the various processes must interact across a network linking the machines.
This tutorial assumes you have access to two or more servers on which to parallelize your computation, potentially via a Linux cluster managed via scheduling software such as SLURM, and that MPI, R, and Python are installed on the machines.
Alternatively, you may be able to start a virtual cluster on Amazon Web Services using CfnCluster. If using CfnCluster, we recommend using a virtual machine developed here at Berkeley, [the Berkeley Common Environment (BCE)](http://bce.berkeley.edu). BCE is a virtual Linux machine - basically it is a Linux computer that you can run within your own computer, regardless of whether you are using Windows, Mac, or Linux. This provides a common environment so that things behave the same for all of us. Please follow the instructions related to CfnCluster at the [BCE install page](http://bce.berkeley.edu/install.html).
This tutorial assumes you have a working knowledge of either R, Python, or C.
Materials for this tutorial, including the R Markdown file and associated code files used to create this document, are available on GitHub at https://github.com/berkeley-scf/tutorial-parallel-distributed. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:
```{r, clone, engine='bash', eval=FALSE}
git clone https://github.com/berkeley-scf/tutorial-parallel-distributed
```
To create this HTML document, compile the corresponding R Markdown file from the command line as follows (this will work from within BCE after cloning the repository as above).
```{r, build-html, engine='bash', eval=FALSE}
Rscript -e "library(knitr); knit2html('parallel-dist.Rmd')"
```
This tutorial by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.
# 1) Types of parallel processing
There are two basic flavors of parallel processing (leaving aside
GPUs): distributed memory and shared memory. With shared memory, multiple
processors (which I'll call cores) share the same memory. With distributed
memory, you have multiple nodes, each with their own memory. You can
think of each node as a separate computer connected by a fast network.
## 1.1) Some useful terminology:
- *cores*: We'll use this term to mean the different processing
units available on a single node.
- *nodes*: We'll use this term to mean the different computers,
each with their own distinct memory, that make up a cluster or supercomputer.
- *processes*: computational tasks executing on a machine; multiple
processes may be executing at once. A given program may start up multiple
processes at once. Ideally we have no more processes than cores on
a node.
- *threads*: multiple paths of execution within a single process;
the OS sees the threads as a single process, but one can think of
them as 'lightweight' processes. Ideally when considering the processes
and their threads, we would have no more processes and threads combined
than cores on a node.
- *forking*: child processes are spawned that are identical to
the parent, but with different process IDs and their own memory.
- *sockets*: some of R's parallel functionality involves creating
new R processes (e.g., starting processes via *Rscript*) and
communicating with them via a communication technology called sockets.
## 1.2) Distributed memory and an overview of the topics in this tutorial
Parallel programming for distributed memory parallelism requires passing
messages between the different nodes. The standard protocol for doing
this is MPI, of which there are various versions, including *openMPI*, which we'll use here.
The R package *Rmpi* implements MPI in R. The *pbdR* packages for R also implement MPI as well as distributed linear algebra.
Python has a package *mpi4py* that allows use of MPI within Python.
In both R and Python, there are also easy ways to do embarrassingly parallel calculations (such as simple parallel for loops) across multiple machines, with MPI and similar tools used behind the scenes to manage the worker processes.
MATLAB has its own system for distributed computation, called the Distributed Computing Server (DCS), requiring additional licensing above the standard MATLAB installation.
This tutorial will cover:
- simple parallelization of embarrassingly parallel computations (in R, Python, and MATLAB) without writing code that explicitly uses MPI;
- distributed linear algebra using the pbdR front-end to the *ScaLapack* package; and
- using MPI explicitly (in R, Python and C).
## 1.3) Other types of parallel processing
We won't cover any of these in this material.
### Shared memory parallelization
For shared memory parallelism, each core is accessing the same memory
so there is no need to pass information (in the form of messages)
between different machines. But in some programming contexts one needs
to be careful that activity on different cores doesn't mistakenly
overwrite places in memory that are used by other cores. Threading is a form of shared memory parallelism.
This tutorial will not cover shared memory parallelization, as it is covered in [a separate tutorial](https://github.com/berkeley-scf/tutorial-parallel-basics).
For information about working with random numbers in a parallel computation, please see that same tutorial, as the discussion applies to both shared and distributed memory.
### GPUs
GPUs (Graphics Processing Units) are processing units originally designed
for rendering graphics on a computer quickly. This is done by having
a large number of simple processing units for massively parallel calculation.
The idea of general purpose GPU (GPGPU) computing is to exploit this
capability for general computation. In spring 2016, I gave a [workshop on using GPUs](http://statistics.berkeley.edu/computing/gpu).
Most researchers don't program for a GPU directly but rather use software (often machine learning software such as Tensorflow or Caffe) that has been programmed to take advantage of a GPU if one is available.
### Spark and Hadoop
Spark and Hadoop are systems for implementing computations in a distributed
memory environment, using the MapReduce approach.
# 2) Starting MPI-based jobs
Code that explicitly uses MPI, as well as code using MPI under the hood, such as *foreach* with *doMPI* in R and pbdR, requires that you start your process(es) in a special way via the *mpirun* command. Note that *mpirun*, *mpiexec* and *orterun* are synonyms under *openMPI*.
The basic requirements for starting such a job are that you specify the number of processes you want to run and that you indicate what machines those processes should run on. Those machines should be networked together such that MPI can ssh to the various machines without any password required.
## 2.1) Running an MPI job under SLURM
There are two ways to tell *mpirun* the machines on which to run the worker processes.
First, we can pass the machine names directly, replicating the name
if we want multiple processes on a single machine. In the example here, these are machines accessible to me, and you would need to replace those names with the names of machines you have access to. You'll need to [set up SSH keys](http://statistics.berkeley.edu/computing/sshkeys) so that you can access the machines without a password.
```{r, mpirun1, engine='bash'}
mpirun --host smeagol,radagast,arwen,arwen -np 4 hostname
```
Alternatively, we can create a file with the relevant information.
```{r, mpirun2, engine='bash'}
echo 'smeagol slots=1' > .hosts
echo 'radagast slots=1' >> .hosts
echo 'arwen slots=2' >> .hosts
mpirun -machinefile .hosts -np 4 hostname
```
**If you are running your code as part of a job submitted to SLURM, you generally won't need to pass the *machinefile* or *np* arguments as MPI will get that information from SLURM.** So you can simply do:
```
mpirun hostname
```
Note that on a CfnCluster-based EC2 VM, you could run your job through SLURM, or you can directly use the node names, which can be seen by invoking `sinfo` and looking at the *NODELIST* column.
To limit the number of threads for each process, we can tell *mpirun*
to export the value of *OMP_NUM_THREADS* to the processes. E.g., calling a C program, *quad_mpi*:
```
export OMP_NUM_THREADS=2
mpirun -machinefile .hosts -np 4 -x OMP_NUM_THREADS quad_mpi
```
In the examples above, I illustrated with a simple bash command (hostname) and with a compiled C program, but one would similarly use the -machinefile flag when starting R or Python via mpirun.
There are additional details involved in carefully controlling how processes are allocated to nodes, but the default arguments for mpirun should do a reasonable job in many situations.
Also, I've had inconsistent results in getting the correct number of workers to start on each of the specified machines, depending on whether I specify the number of workers implicitly via the hosts information (without specifying -np), explicitly via -np, or both. You may want to check that the right number of workers is running on each host.
# 3) Basic parallelization across nodes
Here we'll see the use of high-level packages in R, Python, and MATLAB that hide the details of communication between nodes.
## 3.1) R
### 3.1.1) *foreach* with the *doMPI* and *doSNOW* backends
Just as we used *foreach* in a shared memory context, we can
use it in a distributed memory context as well, and R will handle
everything behind the scenes for you.
#### *doMPI*
Start R through the *mpirun* command as discussed above, either
as a batch job or for interactive use. We'll only ask for 1 process
because the worker processes will be started automatically from within R (but using the machine names information passed to mpirun).
```
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
mpirun -machinefile .hosts -np 1 R --no-save
```
Here's R code for using *doMPI* (which relies on *Rmpi*) as the back-end to *foreach*.
If you call *startMPIcluster* with no arguments, it will start
up one fewer worker process than the number of hosts times slots given to mpirun,
so your R code will be more portable.
```{r, doMPI, eval=FALSE, cache=TRUE}
```
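The contents of *doMPI.R* are pulled in via *read_chunk* in the setup and so aren't shown inline here; as a rough sketch (not the verbatim contents of that file), the pattern looks like this:

```
## foreach with the doMPI backend
library(doMPI)

cl <- startMPIcluster()  # no arguments: one fewer worker than hosts x slots
registerDoMPI(cl)

results <- foreach(i = 1:100, .combine = c) %dopar% {
  mean(rnorm(1e6))  # stand-in for a real embarrassingly parallel task
}

closeCluster(cl)
mpi.quit()
```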
```{r, doMPI-test, engine='bash'}
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
cat doMPI.out
```
A caution concerning Rmpi/doMPI: when you invoke `startMPIcluster()`,
all the slave R processes become 100% active and stay active until
the cluster is closed. In addition, when *foreach* is actually
running, the master process also becomes 100% active. So using this
functionality involves some inefficiency in CPU usage. This inefficiency
is not seen with a sockets cluster (Section 3.1.4) nor when using other
Rmpi functionality - i.e., starting slaves with *mpi.spawn.Rslaves*
and then issuing commands to the slaves.
If you specified `-np` with more than one process then as with the C-based
MPI job above, you can control the threading via OMP_NUM_THREADS
and the -x flag to *mpirun*. Note that this only works when the
R processes are directly started by *mpirun*, which they are
not if you set -np 1. The *maxcores* argument to *startMPIcluster()*
does not seem to function (perhaps it does on other systems).
Sidenote: You can use *doMPI* on a single node, which might be useful for avoiding
some of the conflicts between R's forking functionality and openBLAS that
can cause R to hang when using *foreach* with *doParallel*.
#### *doSNOW*
The *doSNOW* backend has the advantage that it doesn't need to have MPI installed on the system. MPI can be tricky to install and keep working, so this is an easy approach to using *foreach* across multiple machines.
Simply start R as you usually would.
Here's R code for using *doSNOW* as the back-end to *foreach*. Make sure to use the `type = "SOCK"` argument or *doSNOW* will actually use MPI behind the scenes.
```{r, doSNOW, eval=FALSE, cache=TRUE}
```
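As a rough sketch of the pattern in *doSNOW.R* (not the verbatim file; the host names are placeholders):

```
## foreach with the doSNOW backend, communicating via sockets
library(doSNOW)

machines <- c("smeagol", "radagast", "arwen", "arwen")
cl <- makeCluster(machines, type = "SOCK")  # sockets, not MPI
registerDoSNOW(cl)

results <- foreach(i = 1:100, .combine = c) %dopar% {
  mean(rnorm(1e6))
}

stopCluster(cl)
```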
#### Loading packages and accessing variables within your parallel tasks
When using *foreach* with multiple machines, you need to use the *.packages* argument (or load the package in the code being run in parallel) to load any packages needed in the code. You do not need to explicitly export variables from the master process to the workers. Rather, *foreach* determines which variables in the global environment of the master process are used in the code being run in parallel and makes copies of those in each worker process. Note that these variables are read-only on the workers and cannot be modified (if you try to do so, you'll notice that *foreach* actually did not make copies of the variables that your code tries to modify).
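For example, here's a sketch of a *foreach* call that loads a package on the workers via *.packages* (the *fields* package is just a hypothetical stand-in, and a backend is assumed to already be registered):

```
## 'grid' is found in the master's global environment and copied to
## each worker automatically; packages must be named via .packages
grid <- expand.grid(x = 1:10, y = 1:10)
out <- foreach(i = 1:nrow(grid), .combine = c,
               .packages = "fields") %dopar% {
  ## 'grid' is readable here, but assignments to it would not persist
  sum(grid[i, ])
}
```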
### 3.1.2) Using pbdR
There is a project to enhance R's capability for distributed
memory processing called [pbdR](http://r-pbd.org). For an extensive tutorial, see the
[pbdDEMO vignette](https://github.com/wrathematics/pbdDEMO/blob/master/inst/doc/pbdDEMO-guide.pdf?raw=true).
*pbdR* is designed for
SPMD processing in batch mode, which means that you start up multiple
processes in a non-interactive fashion using mpirun. The same code
runs in each R process so you need to have the code behavior depend
on the process ID.
*pbdR* provides the following capabilities:
- the ability to do some parallel apply-style computations (this section),
- the ability to do distributed linear algebra by interfacing to *ScaLapack* (see Section 4), and
- an alternative to *Rmpi* for interfacing with MPI (see Section 5).
Personally, I think the second of the three is the most exciting, as it's functionality not readily available in R or, more generally, in other readily-accessible software.
Let's see parallel-apply style computations in pbdR.
Here's some basic syntax for doing a distributed *apply()* on a matrix that starts out on just one of the processes. So in this case the matrix is not initially distributed to the workers -- that is done as part of the *pbdApply* computation. (One can also use *pbdApply* on matrices that are already distributed, and this is of course recommended for large matrices -- see Section 4.)
As mentioned above, pbdR code is always run in batch mode, with the same code running on all of the processes. This means that you often need to explicitly build in logic about which process should execute a given piece of code, including print statements. Here the check for `comm.rank() == 0` allows us to only create the matrix and call some print statements on the master node (rank 0).
```{r, pbd-apply, cache=TRUE, eval=FALSE}
```
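Since the contents of *pbd-apply.R* are not shown inline here, here's a rough sketch of the pattern (not the verbatim file; I'm assuming the master-worker mode, `pbd.mode = "mw"`, in which the matrix lives on rank 0):

```
## run via mpirun with multiple processes
library(pbdMPI, quietly = TRUE)
init()

if (comm.rank() == 0) {
  x <- matrix(rnorm(1e6), nrow = 1000)  # create the matrix only on rank 0
} else x <- NULL

## distribute the rows of x from rank 0 and apply the function in parallel
out <- pbdApply(x, 1, sum, pbd.mode = "mw")
comm.print(head(out))  # comm.print() prints only on rank 0 by default

finalize()
```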
```{r, pbd-apply-example, engine='bash'}
mpirun -machinefile .hosts -np 4 Rscript pbd-apply.R > pbd-apply.out
cat pbd-apply.out
```
In this case it's a fair amount slower to parallelize the calculation than just to do it in R using *rowSums()*, because of the overhead of communication (including passing the data) with the workers.
### 3.1.3) Using parallel apply functionality in Rmpi
*Rmpi* is a package that provides MPI capabilities from R, including low-level MPI type calls (see Section 5). It also provides high-level wrapper functions that use MPI behind the scenes, including parallel apply functionality for operating on lists (and vectors) with functions such as *mpi.parSapply*.
The documentation (see `help(mpi.parSapply)`) describes a number of confusingly-named functions. They appear to be multi-node versions of the analogous *parSapply* and related functions.
```{r, mpi.parSapply, eval=FALSE}
```
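As a rough sketch of the pattern in *mpi.parSapply.R* (not the verbatim file):

```
## started via mpirun -np 1; workers are spawned from within R
library(Rmpi)
mpi.spawn.Rslaves(nslaves = 3)

## farm the 100 tasks out across the 3 worker processes
out <- mpi.parSapply(1:100, function(i) mean(rnorm(1e6)))

mpi.close.Rslaves()
mpi.quit()
```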
```{r, mpi.parSapply-example, engine='bash'}
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save mpi.parSapply.R mpi.parSapply.out
cat mpi.parSapply.out
```
In some cases, it may be useful to specify *job.num* when the number of tasks is bigger than the number of worker processes to ensure load-balancing.
### 3.1.4) Using sockets
One can also set up a cluster in which the worker processes communicate via sockets. You just need to specify
a character vector with the machine names as the input to *makeCluster()*. A nice thing about this is that it avoids the complications of needing MPI installed.
```{r, sockets, cache=TRUE}
```
Note the use of *clusterExport*, needed to make variables in the master process available to the workers; this involves making a copy of each variable for each worker process. You'd also need to load any packages used in the code being run in parallel in that code.
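Here's a rough sketch of the pattern in *sockets.R* (not the verbatim file; the host names are placeholders):

```
## sockets-based cluster; one R worker per element of 'machines'
library(parallel)
machines <- c("smeagol", "radagast", "arwen", "arwen")
cl <- makeCluster(machines)

n <- 1e6
clusterExport(cl, "n")   # copy 'n' to each worker's global environment

out <- parSapply(cl, 1:100, function(i) mean(rnorm(n)))
stopCluster(cl)
```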
### 3.1.5) The *partools* package
*partools* is a somewhat new package developed by Norm Matloff at UC-Davis. He has the perspective that Spark/Hadoop are not the right tools in many cases when doing statistics-related work and has developed some simple tools for parallelizing computation across multiple nodes, also referred to as *Snowdoop*. The tools make use of the key idea in Hadoop of a distributed file system and distributed data objects but avoid the complications of trying to ensure fault tolerance, which is critical only on very large clusters of machines.
I won't go into details, but *partools* allows you to split up your data across multiple nodes and then read the data into R in parallel across R sessions running on those nodes, all controlled from a single master R session. You can then do operations on the subsets and gather results back to the master session as needed. One point that confused me in the *partools* vignette is that it shows how to split up a dataset that you can read into your R session, but it's not clear what one does if the dataset is too big to read into a single R session.
## 3.2) Python
### 3.2.1) IPython parallel
One can use IPython's parallelization tools in a context with multiple nodes, though the setup to get the worker processes is a bit more involved when you have multiple nodes. For details on using IPython parallel on a single node, see the [parallel basics tutorial appendix](https://github.com/berkeley-scf/tutorial-parallel-basics).
If we are using the SLURM scheduling software, here's how we start up the worker processes:
```{r, ipyparallel-setup, engine='bash', eval=FALSE}
ipcontroller --ip='*' &
sleep 25
# next line will start as many ipengines as we have SLURM tasks
# because srun is a SLURM command
srun ipengine &
sleep 45 # wait until all engines have successfully started
```
We can then run IPython to split up our computational tasks across the engines.
```{r, ipyparallel, engine='python', eval=FALSE}
```
To finish up, we need to shut down the cluster of workers:
```{r, engine='bash', eval=FALSE}
ipcluster stop
```
To start the engines in a context outside of SLURM (provided all machines share a filesystem), you should be able to ssh to each machine and run `ipengine &` for as many worker processes as you want to start, as follows. Depending on how the network is set up, you may not need the `--location` flag.
```{r, ipyparallel-setup2, engine='bash', eval=FALSE}
ipcontroller --ip='*' --location=URL_OF_THIS_MACHINE &
sleep 25
nengines=8
ssh other_host "for (( i = 0; i < ${nengines}; i++ )); do ipengine & done"
sleep 45 # wait until all engines have successfully started
```
### 3.2.2) *pp* package
Another way to parallelize across multiple nodes that uses more manual setup and doesn't integrate as well with scheduling software like SLURM is to use the pp package (also useful for parallelizing on a single machine, as discussed in the [parallel basics tutorial appendix](https://github.com/berkeley-scf/tutorial-parallel-basics)).
Assuming that the pp package is installed on each node (e.g., `sudo apt-get install python-pp` on an Ubuntu machine), you need to start up a ppserver process on each node. E.g., if `$nodes` is a UNIX environment variable containing the names of the worker nodes and you want to start 2 workers per node:
```{r, pp-start, engine='bash', eval=FALSE}
nodes='smeagol radagast beren arwen'
for node in $nodes; do
# cd /tmp is because of issue with starting ppserver in home directory
# -w says how many workers to start on the node
ssh $node "cd /tmp && ppserver -s mysecretphrase -t 120 -w 2 &" &
done
```
Now in our Python code we create a server object and submit jobs to it; the server manages farming the tasks out to the workers. Note that this will run interactively in IPython or as a script from UNIX, but there have been times when I was not able to run it interactively in the base Python interpreter. Also note that while we illustrate this as basically another parallelized for loop, the individual jobs can be whatever calculations you want, so the function (in this case always *pi.sample*) could change from job to job.
```{r, python-pp, engine='python', eval=FALSE}
```
```{r, python-pp-example, engine='bash', eval=FALSE}
python python-pp.py > python-pp.out
cat python-pp.out
```
```
['smeagol', 'radagast', 'beren', 'arwen', 'smeagol', 'radagast', 'beren', 'arwen']
Pi is roughly 3.141567
Time elapsed: 32.0389587879
```
The -t flag used when starting ppserver should ensure that the server processes are removed, but if you need to do it manually, this should work:
```{r, pp-stop, engine='bash', eval=FALSE}
for node in $nodes; do
    # run killall on the remote node, not locally
    ssh $node killall ppserver
done
```
## 3.3) MATLAB
To use MATLAB across multiple nodes, you need to have the MATLAB Distributed Computing Server (DCS). If it is installed, one can set up MATLAB so that *parfor* will distribute its work across multiple nodes. Details may vary depending on how DCS is installed on your system.
# 4) Distributed linear algebra in R using pbdR
## 4.1) Distributed linear algebra example
Here's how you would set up a distributed matrix and do linear
algebra on it. Note that when working with large matrices, you would
generally want to construct the matrices (or read from disk) in a
parallel fashion rather than creating the full matrix on one worker.
For simplicity in the example, I construct the matrix, *x*, on the master
and then create the distributed version of the matrix, *dx*, with *as.ddmatrix*.
Here's the code in *pbd-linalg.R*.
```{r, pbd-linalg, eval=FALSE}
```
As before we run the job in batch mode via mpirun:
```{r, pbd-linalg-example, engine='bash', cache=TRUE}
export OMP_NUM_THREADS=1
mpirun -machinefile .hosts -np 4 -x OMP_NUM_THREADS Rscript pbd-linalg.R > pbd-linalg.out
cat pbd-linalg.out
```
You may want to set the *bldim* argument to *as.ddmatrix*. That determines
the size of the submatrices (aka 'blocks') into which the overall matrix is split. Generally, multiple
submatrices are owned by an individual worker process. For example, to use 100x100
blocks, you'd have
```
dx <- as.ddmatrix(x, bldim = c(100, 100))
```
In general, you don't
want the blocks too big as the work may not be well load-balanced, or too small as
that may have a higher computational cost in terms of latency and communication.
My experiments suggest that it's worth exploring block sizes of 10x10 through 1000x1000 (if you have square matrices).
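Here's a sketch of how one might explore that (assuming, as above, that *x* exists on the master and is NULL on the other processes; run via mpirun):

```
## time a crossproduct across a few block sizes
library(pbdDMAT, quietly = TRUE)
init.grid()

if (comm.rank() == 0) x <- matrix(rnorm(4096 * 4096), 4096) else x <- NULL

for (b in c(10, 100, 1000)) {
  dx <- as.ddmatrix(x, bldim = c(b, b))
  elapsed <- system.time(dd <- crossprod(dx))[3]
  comm.print(c(blocksize = b, seconds = elapsed))
}

finalize()
```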
As a quick, completely non-definitive point of comparison, doing the crossproduct and Cholesky for the 8192x8192 matrix on 3 EC2 nodes (2 cores per node) with -np 6 took 39 seconds for each operation, while doing it with two threads on the master node took 64 seconds (crossproduct) and 23 seconds (Cholesky). While that is a single test, some other experiments I've done also haven't shown much speedup from using multiple nodes with pbdR compared to simply using a threaded BLAS on one machine. So you may need fairly big matrices that won't fit in memory on a single machine before it's worthwhile to do the computation in distributed fashion using pbdR.
## 4.2) Constructing a distributed matrix in parallel
pbdR has functionality for reading in parallel from a parallel file
system such as Lustre (available on Berkeley's Savio cluster). Things
are a bit more complicated if that's not the case. Here's some code that
illustrates how to construct a distributed matrix from constituent column blocks.
First create a distributed version of the
matrix from standard R matrices, with each process owning a block of
columns (I haven't yet gotten the syntax to work for blocks of rows). Then create a
pbd version of that distributed matrix, and finally convert it to a
standard pbd block structure on which the
linear algebra can be done efficiently.
```{r, pbd-construct, eval=FALSE}
```
The code above creates the submatrices within the R sessions, but one could also read in from separate files, one per process.
The code in *redistribute-test.R* demonstrates that constructing the full matrix
from column-wise blocks with this syntax works correctly.
# 5) MPI
## 5.1) MPI Overview
There are multiple MPI implementations, of which *openMPI* and
*mpich* are very common. We'll use *openMPI* here.
In MPI programming, the same code runs on all the machines. This is
called SPMD (single program, multiple data). As we saw a bit with the pbdR code, one
invokes the same code (same program) multiple times, but the behavior
of the code can be different based on querying the rank (ID) of the
process. Since MPI operates in a distributed fashion, any transfer
of information between processes must be done explicitly via send
and receive calls (e.g., *MPI_Send*, *MPI_Recv*, *MPI_Isend*,
and *MPI_Irecv*). (The "MPI_" prefix is for C code; C++ just has
*Send*, *Recv*, etc.)
The latter two of these functions (*MPI_Isend* and *MPI_Irecv*)
are so-called non-blocking calls. One important concept to understand
is the difference between blocking and non-blocking calls. Blocking
calls wait until the call finishes, while non-blocking calls return
and allow the code to continue. Non-blocking calls can be more efficient,
but can lead to problems with synchronization between processes.
In addition to send and receive calls to transfer to and from specific
processes, there are calls that send out data to all processes (*MPI_Scatter*),
gather data back (*MPI_Gather*) and perform reduction operations
(*MPI_Reduce*).
Debugging MPI code can be tricky because communication
can hang, error messages from the workers may not be seen or readily
accessible, and it can be difficult to assess the state of the worker
processes.
## 5.2) Basic syntax for MPI in C
Here's a basic hello world example. The code is also in *mpiHello.c*.
```
// see mpiHello.c
#include <stdio.h>
#include <math.h>
#include <mpi.h>

int main(int argc, char* argv[]) {
    int myrank, nprocs, namelen;
    char process_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    MPI_Get_processor_name(process_name, &namelen);
    printf("Hello from process %d of %d on %s\n",
           myrank, nprocs, process_name);
    MPI_Finalize();
    return 0;
}
```
There are C (*mpicc*) and C++ (*mpic++*) compilers for MPI programs (*mpicxx* and *mpiCC* are synonyms). I'll use the MPI C++ compiler even though the code is all plain C.
```{r, change-hosts1, engine='bash', include=FALSE}
# change hosts back so pdf shows original hosts
echo 'smeagol slots=1' > .hosts
echo 'radagast slots=1' >> .hosts
echo 'arwen slots=2' >> .hosts
```
```{r, mpiHello, engine = 'bash'}
mpicxx mpiHello.c -o mpiHello
cat .hosts # what hosts do I expect it to run on?
mpirun -machinefile .hosts -np 4 mpiHello
```
To actually write real MPI code, you'll need to go learn some of the
MPI syntax. See *quad_mpi.c* and *quad_mpi.cpp*, which
are example C and C++ programs (for approximating an integral via
quadrature) that show some of the basic MPI functions. Compilation
and running are as above:
```{r, quad_mpi, engine = 'bash'}
mpicxx quad_mpi.cpp -o quad_mpi
mpirun -machinefile .hosts -np 4 quad_mpi
```
## 5.3) Using MPI from R via Rmpi or pbdR
### 5.3.1) Rmpi
R users can use Rmpi to interface with MPI.
Here's some example code that uses actual Rmpi syntax (as opposed
to *foreach* with Rmpi as the back-end, where the use of Rmpi was hidden from us).
The syntax is very similar to the MPI C syntax we've already seen.
This code runs in a master-slave paradigm where the master starts
the slaves and invokes commands on them. It may be possible to run
Rmpi in an SPMD fashion where each process runs the same code,
but I haven't investigated this further.
```{r, Rmpi, eval=FALSE}
```
*mpi.bcast.cmd* and *mpi.remote.exec* are quite similar - they execute a function on the workers and can also use arguments on the master as inputs to the function evaluated on the workers (see the ... argument). *mpi.remote.exec* can return the results of the execution to the master.
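Here's a rough sketch of the master-slave pattern (not the verbatim *Rmpi.R*):

```
## started via mpirun -np 1; slaves are spawned from within R
library(Rmpi)
mpi.spawn.Rslaves(nslaves = 3)

n <- 1e6
mpi.bcast.Robj2slave(n)                   # push 'n' to the slaves
out <- mpi.remote.exec(mean(rnorm(n)))    # run on each slave; results return
print(out)

mpi.close.Rslaves()
mpi.quit()
```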
As before, we would start R via *mpirun*, requesting one process, since the workers are started within R via *mpi.spawn.Rslaves*.
```{r, Rmpi-example, engine='bash'}
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save Rmpi.R Rmpi.out
cat Rmpi.out
```
Note that if you do this in interactive mode, some of the usual functionality
of command line R (tab completion, scrolling for history) is not enabled
and errors will cause R to quit. This occurs because passing things
through *mpirun* causes R to think it is not running interactively.
Note: in some cases a cluster/supercomputer will be set up so that
*Rmpi* is loaded and the worker processes are already started
when you start R. In this case you wouldn't need to load *Rmpi*
or use *mpi.spawn.Rslaves*. You can always run `mpi.comm.size()` to see how
many workers are running.
### 5.3.2) pbdMPI in pbdR
Here's an example of distributing an embarrassingly parallel calculation
(estimating an integral via Monte Carlo - in this case estimating
the value of pi).
```{r, pbd-mpi, eval=FALSE}
```
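As a rough sketch of the pattern (not the verbatim *pbd-mpi.R*; the crude per-process seeding here is just for illustration, and real code should use proper parallel random number generation):

```
## run via mpirun with multiple processes
library(pbdMPI, quietly = TRUE)
init()

n <- 1e6                               # samples per process
set.seed(comm.rank())                  # crude seeding, for the sketch only
u <- matrix(runif(2 * n), ncol = 2)
hits <- sum(u[, 1]^2 + u[, 2]^2 < 1)   # points inside the unit quarter-circle

total <- allreduce(hits, op = "sum")   # combine counts across all processes
comm.print(4 * total / (n * comm.size()))

finalize()
```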
```{r, pbd-mpi-example, engine='bash', cache=TRUE}
mpirun -machinefile .hosts -np 4 Rscript pbd-mpi.R > pbd-mpi.out
cat pbd-mpi.out
```
## 5.4) Using MPI from Python via mpi4py
Here's some basic use of MPI within Python.
```{r, mpi4py, engine='python', eval=FALSE}
```
To run the code, we start Python through the mpirun command as done previously.
```{r, mpi4py-example, engine = 'bash'}
mpirun -machinefile .hosts -np 4 python example-mpi.py
```
More generally, you can send, receive, broadcast, gather, etc. as with MPI itself.
*mpi4py* generally does not work interactively.
# 6) Parallelization strategies
The following are some basic principles/suggestions for how to parallelize
your computation.
Should I use one machine/node or many machines/nodes?
- If you can do your computation on the cores of a single node using
shared memory, that will be faster than using the same number of cores
(or even somewhat more cores) across multiple nodes. Similarly, jobs
with a lot of data/high memory requirements that one might think of
as requiring Spark or Hadoop may in some cases be much faster if you can find
a single machine with a lot of memory.
- That said, if you would run out of memory on a single node, then you'll
need to use distributed memory.
What level or dimension should I parallelize over?
- If you have nested loops, you generally only want to parallelize at
one level of the code. That said, there may be cases in which it is
helpful to do both. Keep in mind whether your linear algebra is being
threaded. Often you will want to parallelize over a loop and not use
threaded linear algebra.
- Often it makes sense to parallelize the outer loop when you have nested
loops.
- You generally want to parallelize in such a way that your code is
load-balanced and does not involve too much communication.
How do I balance communication overhead with keeping my cores busy?
- If you have very few tasks, particularly if the tasks take different
amounts of time, often some of the processors will be idle and your code
will be poorly load-balanced.
- If you have very many tasks and each one takes little time, the communication
overhead of starting and stopping the tasks will reduce efficiency.
Should multiple tasks be pre-assigned to a worker process (sometimes called *prescheduling*), or should tasks
be assigned dynamically as previous tasks finish?
- Basically if you have many tasks that each take similar time, you
want to preschedule the tasks to reduce communication. If you have few tasks
or tasks with highly variable completion times, you don't want to
preschedule, to improve load-balancing.
- For R in particular, some of R's parallel functions allow you to say whether the
tasks should be prescheduled. E.g., `library(Rmpi); help(mpi.parSapply)` gives some information.
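As a concrete illustration of that last point, here's a sketch contrasting *parSapply*, which splits the tasks into one chunk per worker up front (prescheduling), with *parSapplyLB*, which hands tasks out dynamically, in R's *parallel* package:

```
library(parallel)
cl <- makeCluster(4)

f <- function(i) { Sys.sleep(runif(1)); i }  # tasks of variable duration

system.time(parSapply(cl, 1:40, f))    # prescheduled: one chunk per worker
system.time(parSapplyLB(cl, 1:40, f))  # load-balanced: one task at a time
stopCluster(cl)
```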