<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>Parallel Processing for Distributed Computing in R, Python, Matlab, and C</title>
<script type="text/javascript">
window.onload = function() {
var imgs = document.getElementsByTagName('img'), i, img;
for (i = 0; i < imgs.length; i++) {
img = imgs[i];
// center an image if it is the only element of its parent
if (img.parentElement.childElementCount === 1)
img.parentElement.style.textAlign = 'center';
}
};
</script>
<!-- Styles for R syntax highlighter -->
<style type="text/css">
pre .operator,
pre .paren {
color: rgb(104, 118, 135)
}
pre .literal {
color: #990073
}
pre .number {
color: #099;
}
pre .comment {
color: #998;
font-style: italic
}
pre .keyword {
color: #900;
font-weight: bold
}
pre .identifier {
color: rgb(0, 0, 0);
}
pre .string {
color: #d14;
}
</style>
<!-- R syntax highlighter -->
<script type="text/javascript">
var hljs=new function(){function m(p){return p.replace(/&/gm,"&").replace(/</gm,"<")}function f(r,q,p){return RegExp(q,"m"+(r.cI?"i":"")+(p?"g":""))}function b(r){for(var p=0;p<r.childNodes.length;p++){var q=r.childNodes[p];if(q.nodeName=="CODE"){return q}if(!(q.nodeType==3&&q.nodeValue.match(/\s+/))){break}}}function h(t,s){var p="";for(var r=0;r<t.childNodes.length;r++){if(t.childNodes[r].nodeType==3){var q=t.childNodes[r].nodeValue;if(s){q=q.replace(/\n/g,"")}p+=q}else{if(t.childNodes[r].nodeName=="BR"){p+="\n"}else{p+=h(t.childNodes[r])}}}if(/MSIE [678]/.test(navigator.userAgent)){p=p.replace(/\r/g,"\n")}return p}function a(s){var r=s.className.split(/\s+/);r=r.concat(s.parentNode.className.split(/\s+/));for(var q=0;q<r.length;q++){var p=r[q].replace(/^language-/,"");if(e[p]){return p}}}function c(q){var p=[];(function(s,t){for(var r=0;r<s.childNodes.length;r++){if(s.childNodes[r].nodeType==3){t+=s.childNodes[r].nodeValue.length}else{if(s.childNodes[r].nodeName=="BR"){t+=1}else{if(s.childNodes[r].nodeType==1){p.push({event:"start",offset:t,node:s.childNodes[r]});t=arguments.callee(s.childNodes[r],t);p.push({event:"stop",offset:t,node:s.childNodes[r]})}}}}return t})(q,0);return p}function k(y,w,x){var q=0;var z="";var s=[];function u(){if(y.length&&w.length){if(y[0].offset!=w[0].offset){return(y[0].offset<w[0].offset)?y:w}else{return w[0].event=="start"?y:w}}else{return y.length?y:w}}function t(D){var A="<"+D.nodeName.toLowerCase();for(var B=0;B<D.attributes.length;B++){var C=D.attributes[B];A+=" "+C.nodeName.toLowerCase();if(C.value!==undefined&&C.value!==false&&C.value!==null){A+='="'+m(C.value)+'"'}}return A+">"}while(y.length||w.length){var v=u().splice(0,1)[0];z+=m(x.substr(q,v.offset-q));q=v.offset;if(v.event=="start"){z+=t(v.node);s.push(v.node)}else{if(v.event=="stop"){var p,r=s.length;do{r--;p=s[r];z+=("</"+p.nodeName.toLowerCase()+">")}while(p!=v.node);s.splice(r,1);while(r<s.length){z+=t(s[r]);r++}}}}return z+m(x.substr(q))}function j(){function q(x,y,v){if(x.compiled){return}var u;var s=[];if(x.k){x.lR=f(y,x.l||hljs.IR,true);for(var w in x.k){if(!x.k.hasOwnProperty(w)){continue}if(x.k[w] instanceof Object){u=x.k[w]}else{u=x.k;w="keyword"}for(var r in u){if(!u.hasOwnProperty(r)){continue}x.k[r]=[w,u[r]];s.push(r)}}}if(!v){if(x.bWK){x.b="\\b("+s.join("|")+")\\s"}x.bR=f(y,x.b?x.b:"\\B|\\b");if(!x.e&&!x.eW){x.e="\\B|\\b"}if(x.e){x.eR=f(y,x.e)}}if(x.i){x.iR=f(y,x.i)}if(x.r===undefined){x.r=1}if(!x.c){x.c=[]}x.compiled=true;for(var t=0;t<x.c.length;t++){if(x.c[t]=="self"){x.c[t]=x}q(x.c[t],y,false)}if(x.starts){q(x.starts,y,false)}}for(var p in e){if(!e.hasOwnProperty(p)){continue}q(e[p].dM,e[p],true)}}function d(B,C){if(!j.called){j();j.called=true}function q(r,M){for(var L=0;L<M.c.length;L++){if((M.c[L].bR.exec(r)||[null])[0]==r){return M.c[L]}}}function v(L,r){if(D[L].e&&D[L].eR.test(r)){return 1}if(D[L].eW){var M=v(L-1,r);return M?M+1:0}return 0}function w(r,L){return L.i&&L.iR.test(r)}function K(N,O){var M=[];for(var L=0;L<N.c.length;L++){M.push(N.c[L].b)}var r=D.length-1;do{if(D[r].e){M.push(D[r].e)}r--}while(D[r+1].eW);if(N.i){M.push(N.i)}return f(O,M.join("|"),true)}function p(M,L){var N=D[D.length-1];if(!N.t){N.t=K(N,E)}N.t.lastIndex=L;var r=N.t.exec(M);return r?[M.substr(L,r.index-L),r[0],false]:[M.substr(L),"",true]}function z(N,r){var L=E.cI?r[0].toLowerCase():r[0];var M=N.k[L];if(M&&M instanceof Array){return M}return false}function F(L,P){L=m(L);if(!P.k){return L}var r="";var O=0;P.lR.lastIndex=0;var M=P.lR.exec(L);while(M){r+=L.substr(O,M.index-O);var 
N=z(P,M);if(N){x+=N[1];r+='<span class="'+N[0]+'">'+M[0]+"</span>"}else{r+=M[0]}O=P.lR.lastIndex;M=P.lR.exec(L)}return r+L.substr(O,L.length-O)}function J(L,M){if(M.sL&&e[M.sL]){var r=d(M.sL,L);x+=r.keyword_count;return r.value}else{return F(L,M)}}function I(M,r){var L=M.cN?'<span class="'+M.cN+'">':"";if(M.rB){y+=L;M.buffer=""}else{if(M.eB){y+=m(r)+L;M.buffer=""}else{y+=L;M.buffer=r}}D.push(M);A+=M.r}function G(N,M,Q){var R=D[D.length-1];if(Q){y+=J(R.buffer+N,R);return false}var P=q(M,R);if(P){y+=J(R.buffer+N,R);I(P,M);return P.rB}var L=v(D.length-1,M);if(L){var O=R.cN?"</span>":"";if(R.rE){y+=J(R.buffer+N,R)+O}else{if(R.eE){y+=J(R.buffer+N,R)+O+m(M)}else{y+=J(R.buffer+N+M,R)+O}}while(L>1){O=D[D.length-2].cN?"</span>":"";y+=O;L--;D.length--}var r=D[D.length-1];D.length--;D[D.length-1].buffer="";if(r.starts){I(r.starts,"")}return R.rE}if(w(M,R)){throw"Illegal"}}var E=e[B];var D=[E.dM];var A=0;var x=0;var y="";try{var s,u=0;E.dM.buffer="";do{s=p(C,u);var t=G(s[0],s[1],s[2]);u+=s[0].length;if(!t){u+=s[1].length}}while(!s[2]);if(D.length>1){throw"Illegal"}return{r:A,keyword_count:x,value:y}}catch(H){if(H=="Illegal"){return{r:0,keyword_count:0,value:m(C)}}else{throw H}}}function g(t){var p={keyword_count:0,r:0,value:m(t)};var r=p;for(var q in e){if(!e.hasOwnProperty(q)){continue}var s=d(q,t);s.language=q;if(s.keyword_count+s.r>r.keyword_count+r.r){r=s}if(s.keyword_count+s.r>p.keyword_count+p.r){r=p;p=s}}if(r.language){p.second_best=r}return p}function i(r,q,p){if(q){r=r.replace(/^((<[^>]+>|\t)+)/gm,function(t,w,v,u){return w.replace(/\t/g,q)})}if(p){r=r.replace(/\n/g,"<br>")}return r}function n(t,w,r){var x=h(t,r);var v=a(t);var y,s;if(v){y=d(v,x)}else{return}var q=c(t);if(q.length){s=document.createElement("pre");s.innerHTML=y.value;y.value=k(q,c(s),x)}y.value=i(y.value,w,r);var u=t.className;if(!u.match("(\\s|^)(language-)?"+v+"(\\s|$)")){u=u?(u+" "+v):v}if(/MSIE [678]/.test(navigator.userAgent)&&t.tagName=="CODE"&&t.parentNode.tagName=="PRE"){s=t.parentNode;var p=document.createElement("div");p.innerHTML="<pre><code>"+y.value+"</code></pre>";t=p.firstChild.firstChild;p.firstChild.cN=s.cN;s.parentNode.replaceChild(p.firstChild,s)}else{t.innerHTML=y.value}t.className=u;t.result={language:v,kw:y.keyword_count,re:y.r};if(y.second_best){t.second_best={language:y.second_best.language,kw:y.second_best.keyword_count,re:y.second_best.r}}}function o(){if(o.called){return}o.called=true;var r=document.getElementsByTagName("pre");for(var p=0;p<r.length;p++){var q=b(r[p]);if(q){n(q,hljs.tabReplace)}}}function l(){if(window.addEventListener){window.addEventListener("DOMContentLoaded",o,false);window.addEventListener("load",o,false)}else{if(window.attachEvent){window.attachEvent("onload",o)}else{window.onload=o}}}var 
e={};this.LANGUAGES=e;this.highlight=d;this.highlightAuto=g;this.fixMarkup=i;this.highlightBlock=n;this.initHighlighting=o;this.initHighlightingOnLoad=l;this.IR="[a-zA-Z][a-zA-Z0-9_]*";this.UIR="[a-zA-Z_][a-zA-Z0-9_]*";this.NR="\\b\\d+(\\.\\d+)?";this.CNR="\\b(0[xX][a-fA-F0-9]+|(\\d+(\\.\\d*)?|\\.\\d+)([eE][-+]?\\d+)?)";this.BNR="\\b(0b[01]+)";this.RSR="!|!=|!==|%|%=|&|&&|&=|\\*|\\*=|\\+|\\+=|,|\\.|-|-=|/|/=|:|;|<|<<|<<=|<=|=|==|===|>|>=|>>|>>=|>>>|>>>=|\\?|\\[|\\{|\\(|\\^|\\^=|\\||\\|=|\\|\\||~";this.ER="(?![\\s\\S])";this.BE={b:"\\\\.",r:0};this.ASM={cN:"string",b:"'",e:"'",i:"\\n",c:[this.BE],r:0};this.QSM={cN:"string",b:'"',e:'"',i:"\\n",c:[this.BE],r:0};this.CLCM={cN:"comment",b:"//",e:"$"};this.CBLCLM={cN:"comment",b:"/\\*",e:"\\*/"};this.HCM={cN:"comment",b:"#",e:"$"};this.NM={cN:"number",b:this.NR,r:0};this.CNM={cN:"number",b:this.CNR,r:0};this.BNM={cN:"number",b:this.BNR,r:0};this.inherit=function(r,s){var p={};for(var q in r){p[q]=r[q]}if(s){for(var q in s){p[q]=s[q]}}return p}}();hljs.LANGUAGES.cpp=function(){var a={keyword:{"false":1,"int":1,"float":1,"while":1,"private":1,"char":1,"catch":1,"export":1,virtual:1,operator:2,sizeof:2,dynamic_cast:2,typedef:2,const_cast:2,"const":1,struct:1,"for":1,static_cast:2,union:1,namespace:1,unsigned:1,"long":1,"throw":1,"volatile":2,"static":1,"protected":1,bool:1,template:1,mutable:1,"if":1,"public":1,friend:2,"do":1,"return":1,"goto":1,auto:1,"void":2,"enum":1,"else":1,"break":1,"new":1,extern:1,using:1,"true":1,"class":1,asm:1,"case":1,typeid:1,"short":1,reinterpret_cast:2,"default":1,"double":1,register:1,explicit:1,signed:1,typename:1,"try":1,"this":1,"switch":1,"continue":1,wchar_t:1,inline:1,"delete":1,alignof:1,char16_t:1,char32_t:1,constexpr:1,decltype:1,noexcept:1,nullptr:1,static_assert:1,thread_local:1,restrict:1,_Bool:1,complex:1},built_in:{std:1,string:1,cin:1,cout:1,cerr:1,clog:1,stringstream:1,istringstream:1,ostringstream:1,auto_ptr:1,deque:1,list:1,queue:1,stack:1,vector:1,map:1,set:1,bitset:1,multiset:1,multimap:1,unordered_set:1,unordered_map:1,unordered_multiset:1,unordered_multimap:1,array:1,shared_ptr:1}};return{dM:{k:a,i:"</",c:[hljs.CLCM,hljs.CBLCLM,hljs.QSM,{cN:"string",b:"'\\\\?.",e:"'",i:"."},{cN:"number",b:"\\b(\\d+(\\.\\d*)?|\\.\\d+)(u|U|l|L|ul|UL|f|F)"},hljs.CNM,{cN:"preprocessor",b:"#",e:"$"},{cN:"stl_container",b:"\\b(deque|list|queue|stack|vector|map|set|bitset|multiset|multimap|unordered_map|unordered_set|unordered_multiset|unordered_multimap|array)\\s*<",e:">",k:a,r:10,c:["self"]}]}}}();hljs.LANGUAGES.r={dM:{c:[hljs.HCM,{cN:"number",b:"\\b0[xX][0-9a-fA-F]+[Li]?\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"number",b:"\\b\\d+(?:[eE][+\\-]?\\d*)?L\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"number",b:"\\b\\d+\\.(?!\\d)(?:i\\b)?",e:hljs.IMMEDIATE_RE,r:1},{cN:"number",b:"\\b\\d+(?:\\.\\d*)?(?:[eE][+\\-]?\\d*)?i?\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"number",b:"\\.\\d+(?:[eE][+\\-]?\\d*)?i?\\b",e:hljs.IMMEDIATE_RE,r:1},{cN:"keyword",b:"(?:tryCatch|library|setGeneric|setGroupGeneric)\\b",e:hljs.IMMEDIATE_RE,r:10},{cN:"keyword",b:"\\.\\.\\.",e:hljs.IMMEDIATE_RE,r:10},{cN:"keyword",b:"\\.\\.\\d+(?![\\w.])",e:hljs.IMMEDIATE_RE,r:10},{cN:"keyword",b:"\\b(?:function)",e:hljs.IMMEDIATE_RE,r:2},{cN:"keyword",b:"(?:if|in|break|next|repeat|else|for|return|switch|while|try|stop|warning|require|attach|detach|source|setMethod|setClass)\\b",e:hljs.IMMEDIATE_RE,r:1},{cN:"literal",b:"(?:NA|NA_integer_|NA_real_|NA_character_|NA_complex_)\\b",e:hljs.IMMEDIATE_RE,r:10},{cN:"literal",b:"(?:NULL|TRUE|FALSE|T|F|Inf|NaN)\\b",e:hljs.IMMEDIATE_RE,r:1},{cN:"ide
ntifier",b:"[a-zA-Z.][a-zA-Z0-9._]*\\b",e:hljs.IMMEDIATE_RE,r:0},{cN:"operator",b:"<\\-(?!\\s*\\d)",e:hljs.IMMEDIATE_RE,r:2},{cN:"operator",b:"\\->|<\\-",e:hljs.IMMEDIATE_RE,r:1},{cN:"operator",b:"%%|~",e:hljs.IMMEDIATE_RE},{cN:"operator",b:">=|<=|==|!=|\\|\\||&&|=|\\+|\\-|\\*|/|\\^|>|<|!|&|\\||\\$|:",e:hljs.IMMEDIATE_RE,r:0},{cN:"operator",b:"%",e:"%",i:"\\n",r:1},{cN:"identifier",b:"`",e:"`",r:0},{cN:"string",b:'"',e:'"',c:[hljs.BE],r:0},{cN:"string",b:"'",e:"'",c:[hljs.BE],r:0},{cN:"paren",b:"[[({\\])}]",e:hljs.IMMEDIATE_RE,r:0}]}};
hljs.initHighlightingOnLoad();
</script>
<style type="text/css">
body, td {
font-family: sans-serif;
background-color: white;
font-size: 13px;
}
body {
max-width: 800px;
margin: auto;
padding: 1em;
line-height: 20px;
}
tt, code, pre {
font-family: 'DejaVu Sans Mono', 'Droid Sans Mono', 'Lucida Console', Consolas, Monaco, monospace;
}
h1 {
font-size:2.2em;
}
h2 {
font-size:1.8em;
}
h3 {
font-size:1.4em;
}
h4 {
font-size:1.0em;
}
h5 {
font-size:0.9em;
}
h6 {
font-size:0.8em;
}
a:visited {
color: rgb(50%, 0%, 50%);
}
pre, img {
max-width: 100%;
}
pre {
overflow-x: auto;
}
pre code {
display: block; padding: 0.5em;
}
code {
font-size: 92%;
border: 1px solid #ccc;
}
code[class] {
background-color: #F8F8F8;
}
table, td, th {
border: none;
}
blockquote {
color:#666666;
margin:0;
padding-left: 1em;
border-left: 0.5em #EEE solid;
}
hr {
height: 0px;
border-bottom: none;
border-top-width: thin;
border-top-style: dotted;
border-top-color: #999999;
}
@media print {
* {
background: transparent !important;
color: black !important;
filter:none !important;
-ms-filter: none !important;
}
body {
font-size:12pt;
max-width:100%;
}
a, a:visited {
text-decoration: underline;
}
hr {
visibility: hidden;
page-break-before: always;
}
pre, blockquote {
padding-right: 1em;
page-break-inside: avoid;
}
tr, img {
page-break-inside: avoid;
}
img {
max-width: 100% !important;
}
@page :left {
margin: 15mm 20mm 15mm 10mm;
}
@page :right {
margin: 15mm 10mm 15mm 20mm;
}
p, h2, h3 {
orphans: 3; widows: 3;
}
h2, h3 {
page-break-after: avoid;
}
}
</style>
</head>
<body>
<h1>Parallel Processing for Distributed Computing in R, Python, Matlab, and C</h1>
<h2>Parallelization tools in a distributed memory (multiple machine) context</h2>
<p>Chris Paciorek, Department of Statistics, UC Berkeley</p>
<h1>0) This Tutorial</h1>
<p>This tutorial covers strategies for using parallel processing in R, Python, Matlab (briefly), and C on multiple machines, in which the various processes must interact across a network linking the machines. </p>
<p>This tutorial assumes you have access to two or more servers on which to parallelize your computation, potentially via a Linux cluster managed via scheduling software such as SLURM, and that MPI, R, and Python are installed on the machines. </p>
<p>Alternatively, you may be able to start a virtual cluster on Amazon Web Services using CfnCluster. If using CfnCluster, we recommend using a virtual machine developed here at Berkeley, <a href="http://bce.berkeley.edu">the Berkeley Common Environment (BCE)</a>. BCE is a virtual Linux machine - basically it is a Linux computer that you can run within your own computer, regardless of whether you are using Windows, Mac, or Linux. This provides a common environment so that things behave the same for all of us. Please follow the instructions related to CfnCluster at the <a href="http://bce.berkeley.edu/install.html">BCE install page</a>. </p>
<p>This tutorial assumes you have a working knowledge of either R, Python, or C. </p>
<p>Materials for this tutorial, including the R Markdown file and associated code files that were used to create this document, are available on GitHub at <a href="https://github.com/berkeley-scf/tutorial-parallel-distributed">https://github.com/berkeley-scf/tutorial-parallel-distributed</a>. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:</p>
<pre><code class="bash">git clone https://github.com/berkeley-scf/tutorial-parallel-distributed
</code></pre>
<p>To create this HTML document, simply compile the corresponding R Markdown file in R as follows (the following will work from within BCE after cloning the repository as above).</p>
<pre><code class="bash">Rscript -e "library(knitr); knit2html('parallel-dist.Rmd')"
</code></pre>
<p>This tutorial by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.</p>
<h1>1) Types of parallel processing</h1>
<p>There are two basic flavors of parallel processing (leaving aside
GPUs): distributed memory and shared memory. With shared memory, multiple
processors (which I'll call cores) share the same memory. With distributed
memory, you have multiple nodes, each with their own memory. You can
think of each node as a separate computer connected by a fast network. </p>
<h2>1.1) Some useful terminology:</h2>
<ul>
<li><em>cores</em>: We'll use this term to mean the different processing
units available on a single node.</li>
<li><em>nodes</em>: We'll use this term to mean the different computers,
each with their own distinct memory, that make up a cluster or supercomputer.</li>
<li><em>processes</em>: computational tasks executing on a machine; multiple
processes may be executing at once. A given program may start up multiple
processes at once. Ideally we have no more processes than cores on
a node.</li>
<li><em>threads</em>: multiple paths of execution within a single process;
the OS sees the threads as a single process, but one can think of
them as 'lightweight' processes. Ideally when considering the processes
and their threads, we would have no more processes and threads combined
than cores on a node.</li>
<li><em>forking</em>: child processes are spawned that are identical to
the parent, but with different process IDs and their own memory.</li>
<li><em>sockets</em>: some of R's parallel functionality involves creating
new R processes (e.g., starting processes via <em>Rscript</em>) and
communicating with them via a communication technology called sockets.</li>
</ul>
<h2>1.2) Distributed memory and an overview of the topics in this tutorial</h2>
<p>Parallel programming for distributed memory parallelism requires passing
messages between the different nodes. The standard protocol for doing
this is MPI, of which there are various versions, including <em>openMPI</em>, which we'll use here.</p>
<p>The R package <em>Rmpi</em> implements MPI in R. The <em>pbdR</em> packages for R also implement MPI as well as distributed linear algebra.</p>
<p>Python has a package <em>mpi4py</em> that allows use of MPI within Python.</p>
<p>In both R and Python, there are also easy ways to do embarrassingly parallel calculations (such as simple parallel for loops) across multiple machines, with MPI and similar tools used behind the scenes to manage the worker processes.</p>
<p>Matlab has its own system for distributed computation, called the Distributed Computing Server (DCS), requiring additional licensing above the standard Matlab installation. </p>
<p>This tutorial will cover:</p>
<ul>
<li>simple parallelization of embarrassingly parallel computations (in R, Python, and Matlab) without writing code that explicitly uses MPI;</li>
<li>distributed linear algebra using the pbdR front-end to the <em>ScaLapack</em> package; and</li>
<li>using MPI explicitly (in R, Python and C).</li>
</ul>
<h2>1.3) Other types of parallel processing</h2>
<p>We won't cover any of these in this material.</p>
<h3>Shared memory parallelization</h3>
<p>For shared memory parallelism, each core is accessing the same memory
so there is no need to pass information (in the form of messages)
between different machines. But in some programming contexts one needs
to be careful that activity on different cores doesn't mistakenly
overwrite places in memory that are used by other cores. Threading is a form of shared memory parallelism.</p>
<p>This tutorial will not cover shared memory parallelization, as it is covered in <a href="https://github.com/berkeley-scf/tutorial-parallel-basics">a separate tutorial</a>.</p>
<p>For information about working with random numbers in a parallel computation, please see that same tutorial, as the discussion applies to both shared and distributed memory. </p>
<h3>GPUs</h3>
<p>GPUs (Graphics Processing Units) are processing units originally designed
for rendering graphics on a computer quickly. This is done by having
a large number of simple processing units for massively parallel calculation.
The idea of general purpose GPU (GPGPU) computing is to exploit this
capability for general computation. In spring 2016, I gave a <a href="http://statistics.berkeley.edu/computing/gpu">workshop on using GPUs</a>.</p>
<p>Most researchers don't program for a GPU directly but rather use software (often machine learning software such as Tensorflow or Caffe) that has been programmed to take advantage of a GPU if one is available.</p>
<h3>Spark and Hadoop</h3>
<p>Spark and Hadoop are systems for implementing computations in a distributed
memory environment, using the MapReduce approach. </p>
<h1>2) Starting MPI-based jobs</h1>
<p>Code that explicitly uses MPI, as well as code using MPI under the hood, such as <em>foreach</em> with <em>doMPI</em> in R and pbdR, requires that you start your process(es) in a special way via the <em>mpirun</em> command. Note that <em>mpirun</em>, <em>mpiexec</em> and <em>orterun</em> are synonyms under <em>openMPI</em>. </p>
<p>The basic requirements for starting such a job are that you specify the number of processes you want to run and that you indicate what machines those processes should run on. Those machines should be networked together such that MPI can ssh to the various machines without any password required.</p>
<h2>2.1) Running an MPI job under SLURM</h2>
<p>There are two ways to tell <em>mpirun</em> the machines on which to run the worker processes.</p>
<p>First, we can pass the machine names directly, replicating the name
if we want multiple processes on a single machine. In the example here, these are machines accessible to me, and you would need to replace those names with the names of machines you have access to. You'll need to <a href="http://statistics.berkeley.edu/computing/sshkeys">set up SSH keys</a> so that you can access the machines without a password.</p>
<pre><code class="bash">mpirun --host smeagol,radagast,arwen,arwen -np 4 hostname
</code></pre>
<pre><code>## smeagol
## radagast
## arwen
## arwen
</code></pre>
<p>Alternatively, we can create a file with the relevant information.</p>
<pre><code class="bash">echo 'smeagol slots=1' > .hosts
echo 'radagast slots=1' >> .hosts
echo 'arwen slots=2' >> .hosts
mpirun -machinefile .hosts -np 4 hostname
</code></pre>
<pre><code>## smeagol
## radagast
## arwen
## arwen
</code></pre>
<p><strong>If you are running your code as part of a job submitted to SLURM, you generally won't need to pass the <em>machinefile</em> or <em>np</em> arguments as MPI will get that information from SLURM.</strong> So you can simply do:</p>
<pre><code>mpirun hostname
</code></pre>
<p>Note that on a CfnCluster-based EC2 VM, you could run your job through SLURM, or you can directly use the node names, which can be seen by invoking <code>sinfo</code> and looking at the <em>NODELIST</em> column. </p>
<p>To limit the number of threads for each process, we can tell <em>mpirun</em>
to export the value of <em>OMP_NUM_THREADS</em> to the processes. E.g., calling a C program, <em>quad_mpi</em>:</p>
<pre><code>export OMP_NUM_THREADS=2
mpirun -machinefile .hosts -np 4 -x OMP_NUM_THREADS quad_mpi
</code></pre>
<p>In the examples above, I illustrated with a simple bash command (hostname) and with a compiled C program, but one would similarly
use the -machinefile flag when starting R, Python, or your own C program via mpirun.</p>
<p>There are additional details involved in carefully controlling how processes are allocated to nodes, but the default arguments for mpirun should do a reasonable job in many situations. </p>
<p>Also, I've had inconsistent results in terms of having the correct number of workers start up on each of the machines specified, depending on whether I specify the number of workers implicitly via the hosts information (without specifying -np), explicitly via -np, or both. You may want to check that the right number of workers is running on each host. </p>
<h1>3) Basic parallelization across nodes</h1>
<p>Here we'll see the use of high-level packages in R, Python, and Matlab that hide the details of communication between nodes. </p>
<h2>3.1) R</h2>
<h3>3.1.1) <em>foreach</em> with the <em>doMPI</em> and <em>doSNOW</em> backends</h3>
<p>Just as we used <em>foreach</em> in a shared memory context, we can
use it in a distributed memory context as well, and R will handle
everything behind the scenes for you. </p>
<h4><em>doMPI</em></h4>
<p>Start R through the <em>mpirun</em> command as discussed above, either
as a batch job or for interactive use. We'll ask for only 1 process
because the worker processes will be started automatically from within R (using the machine information passed to mpirun).</p>
<pre><code>mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
mpirun -machinefile .hosts -np 1 R --no-save
</code></pre>
<p>Here's R code for using <em>doMPI</em> (which relies on <em>Rmpi</em>) as the back-end to <em>foreach</em>.
If you call <em>startMPIcluster</em> with no arguments, it will start
one fewer worker process than the total number of slots across hosts given to mpirun,
so your R code will be more portable. </p>
<pre><code class="r">## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.out
## unless running within a SLURM job, in which case you should do:
## mpirun R CMD BATCH --no-save file.R file.out
library(Rmpi)
library(doMPI)
cl = startMPIcluster() # by default will start one fewer slave
# than elements in .hosts
registerDoMPI(cl)
clusterSize(cl) # just to check
results <- foreach(i = 1:200) %dopar% {
out = mean(rnorm(1e6))
}
closeCluster(cl)
mpi.quit()
</code></pre>
<pre><code class="bash">mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
cat doMPI.out
</code></pre>
<pre><code>## > ## @knitr doMPI
## >
## > ## you should have invoked R as:
## > ## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.out
## > ## unless running within a SLURM job, in which case you should do:
## > ## mpirun R CMD BATCH --no-save file.R file.out
## >
## > library(Rmpi)
## > library(doMPI)
## Loading required package: foreach
## Loading required package: iterators
## >
## > cl = startMPIcluster() # by default will start one fewer slave
## 3 slaves are spawned successfully. 0 failed.
## > # than elements in .hosts
## >
## > registerDoMPI(cl)
## > clusterSize(cl) # just to check
## [1] 3
## >
## > results <- foreach(i = 1:200) %dopar% {
## + out = mean(rnorm(1e6))
## + }
## >
## > closeCluster(cl)
## >
## > mpi.quit()
</code></pre>
<p>A caution concerning Rmpi/doMPI: when you invoke <code>startMPIcluster()</code>,
all the slave R processes become 100% active and stay active until
the cluster is closed. In addition, when <em>foreach</em> is actually
running, the master process also becomes 100% active. So using this
functionality involves some inefficiency in CPU usage. This inefficiency
is not seen with a sockets cluster (Section 3.1.4) nor when using other
Rmpi functionality - i.e., starting slaves with <em>mpi.spawn.Rslaves</em>
and then issuing commands to the slaves.</p>
<p>If you specified <code>-np</code> with more than one process, then, as with the C-based
MPI job above, you can control the threading via OMP_NUM_THREADS
and the -x flag to <em>mpirun</em>. Note that this only works when the
R processes are directly started by <em>mpirun</em>, which they are
not if you set -np 1. The <em>maxcores</em> argument to <em>startMPIcluster()</em>
does not seem to function (perhaps it does on other systems).</p>
<p>Sidenote: You can use <em>doMPI</em> on a single node, which might be useful for avoiding
some of the conflicts between R's forking functionality and openBLAS that
can cause R to hang when using <em>foreach</em> with <em>doParallel</em>.</p>
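<p>For instance, here's a minimal sketch of that single-node usage (the file name and the worker count of 4 are just for illustration); you would invoke it as <code>mpirun -np 1 R CMD BATCH --no-save doMPI-local.R doMPI-local.out</code>:</p>
<pre><code class="r">library(Rmpi)
library(doMPI)
## spawn 4 workers on the local machine only
cl <- startMPIcluster(count = 4)
registerDoMPI(cl)
results <- foreach(i = 1:100) %dopar% {
mean(rnorm(1e5))
}
closeCluster(cl)
mpi.quit()
</code></pre>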
<h4><em>doSNOW</em></h4>
<p>The <em>doSNOW</em> backend has the advantage that it doesn't need to have MPI installed on the system. MPI can be tricky to install and keep working, so this is an easy approach to using <em>foreach</em> across multiple machines.</p>
<p>Simply start R as you usually would. </p>
<p>Here's R code for using <em>doSNOW</em> as the back-end to <em>foreach</em>. Make sure to use the <code>type = "SOCK"</code> argument or <em>doSNOW</em> will actually use MPI behind the scenes. </p>
<pre><code class="r">library(doSNOW)
machines = c(rep("beren.berkeley.edu", 1),
rep("gandalf.berkeley.edu", 1),
rep("arwen.berkeley.edu", 2))
cl = makeCluster(machines, type = "SOCK")
cl
registerDoSNOW(cl)
n <- 1e6
fun = function(i, n)
out = mean(rnorm(n))
nTasks <- 120
print(system.time(out <- foreach(i = 1:nTasks) %dopar% {
outSub <- fun(i, n) # referencing n here lets foreach detect and copy it to the workers
outSub # this will become part of the out object
}))
stopCluster(cl) # good practice, but not strictly necessary
</code></pre>
<h4>Loading packages and accessing variables within your parallel tasks</h4>
<p>When using <em>foreach</em> with multiple machines, you need to use the <em>.packages</em> argument (or load the package in the code being run in parallel) to load any packages needed in the code. You do not need to explicitly export variables from the master process to the workers. Rather, <em>foreach</em> determines which variables in the global environment of the master process are used in the code being run in parallel and makes copies of those in each worker process. Note that these variables are read-only on the workers and cannot be modified (if you try to do so, you'll notice that <em>foreach</em> actually did not make copies of the variables that your code tries to modify). </p>
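<p>For example, here's a minimal sketch (the package name and the variable <em>n</em> are just for illustration), assuming a backend has already been registered as above:</p>
<pre><code class="r">n <- 1e6
results <- foreach(i = 1:20, .packages = 'MASS') %dopar% {
## MASS is loaded on each worker because of .packages;
## n is detected in the master's global environment and copied to each worker
mean(mvrnorm(10, rep(0, 2), diag(2))) + mean(rnorm(n))
}
</code></pre>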
<h3>3.1.2) Using pbdR</h3>
<p>There is a project to enhance R's capability for distributed
memory processing called <a href="http://r-pbd.org">pbdR</a>. For an extensive tutorial, see the
<a href="https://github.com/wrathematics/pbdDEMO/blob/master/inst/doc/pbdDEMO-guide.pdf?raw=true">pbdDEMO vignette</a>.
<em>pbdR</em> is designed for
SPMD processing in batch mode, which means that you start up multiple
processes in a non-interactive fashion using mpirun. The same code
runs in each R process so you need to have the code behavior depend
on the process ID.</p>
<p><em>pbdR</em> provides the following capabilities:</p>
<ul>
<li>the ability to do some parallel apply-style computations (this section),</li>
<li>the ability to do distributed linear algebra by interfacing to <em>ScaLapack</em> (see Section 4), and</li>
<li>an alternative to <em>Rmpi</em> for interfacing with MPI (see Section 5).</li>
</ul>
<p>Personally, I think the second of the three is the most exciting, as
it provides functionality not otherwise readily available in R or, more generally,
in other easily accessible software.</p>
<p>Let's see parallel-apply style computations in pbdR.</p>
<p>Here's some basic syntax for doing a distributed <em>apply()</em> on
a matrix that exists on only one of the processes (the master, rank 0). So in this case, the matrix is not initially distributed to the workers – that is done as part of the <em>pbdApply</em> computation. (One can also use <em>pbdApply</em> on matrices that are already distributed, and this is of course recommended for large matrices – see Section 4.) </p>
<p>As mentioned above, pbdR code is always run in batch mode, with the same code running on all of the processes. This means that you often need to explicitly build in logic about which process should execute a given piece of code, including print statements. Here the check for <code>comm.rank() == 0</code> allows us to only create the matrix and call some print statements on the master node (rank 0).</p>
<pre><code class="r">## you should have invoked R as:
## mpirun -machinefile .hosts -np 4 R CMD BATCH --no-save pbd-apply.R pbd-apply.out
## unless running within a SLURM job, in which case you should do:
## mpirun R CMD BATCH --no-save pbd-apply.R pbd-apply.out
library(pbdMPI, quiet = TRUE )
init()
nrows <- 1e6
if(comm.rank()==0) {
x <- matrix(rnorm(nrows*50), nrow = nrows)
}
sm <- comm.timer(out <- pbdApply(x, 1, mean, pbd.mode = 'mw', rank.source = 0))
if(comm.rank()==0) {
print(out[1:5])
print(sm)
}
finalize()
</code></pre>
<pre><code class="bash">mpirun -machinefile .hosts -np 4 Rscript pbd-apply.R > pbd-apply.out
cat pbd-apply.out
</code></pre>
<pre><code>## [1] 0.17686351 -0.12216986 0.04345966 -0.06581673 0.07439472
## min mean max
## 7.81000 12.99175 14.74900
</code></pre>
<p>In this case it's a fair amount slower to parallelize the calculation than just to do it in R using <em>rowMeans()</em>, because of the overhead of communication (including passing the data) with the workers.</p>
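<p>For reference, the serial comparison one might time on the master process would be something like:</p>
<pre><code class="r">if(comm.rank() == 0) {
## serial equivalent of the distributed pbdApply above
print(system.time(outSerial <- rowMeans(x)))
}
</code></pre>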
<h3>3.1.3) Using parallel apply functionality in Rmpi</h3>
<p><em>Rmpi</em> is a package that provides MPI capabilities from R, including low-level MPI type calls (see Section 5). It also provides high-level wrapper functions that use MPI behind the scenes, including parallel apply functionality for operating on lists (and vectors) with functions such as <em>mpi.parSapply</em>. </p>
<p>The documentation (see <code>help(mpi.parSapply)</code>) describes a number of confusingly-named functions. It appears that they are basically multi-node versions of the analogous <em>parSapply</em> and related functions. </p>
<pre><code class="r">## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
## unless running within a SLURM job, in which case you should do:
## mpirun R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
library(Rmpi)
## on my system, this fails unless explicitly
## ask for one fewer slave than total number of slots across hosts
mpi.spawn.Rslaves(nslaves = mpi.universe.size()-1)
myfun <- function(i) {
set.seed(i)
mean(rnorm(1e7))
}
x <- seq_len(25)
# parallel sapply-type calculations on a vector
system.time(out <- mpi.parSapply(x, myfun))
system.time(out <- mpi.applyLB(x, myfun))
nrows <- 10000
x <- matrix(rnorm(nrows*50), nrow = nrows)
# parallel apply on a matrix
out <- mpi.parApply(x, 1, mean)
mpi.close.Rslaves()
mpi.quit()
</code></pre>
<pre><code class="bash">mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save mpi.parSapply.R mpi.parSapply.out
cat mpi.parSapply.out
</code></pre>
<pre><code>## > ## @knitr mpi.parSapply
## >
## > ## you should have invoked R as:
## > ## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
## > ## unless running within a SLURM job, in which case you should do:
## > ## mpirun R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
## >
## > library(Rmpi)
## > ## on my system, this fails unless explicitly
## > ## ask for one fewer slave than total number of slots across hosts
## > mpi.spawn.Rslaves(nslaves = mpi.universe.size()-1)
## 3 slaves are spawned successfully. 0 failed.
## master (rank 0, comm 1) of size 4 is running on: smeagol
## slave1 (rank 1, comm 1) of size 4 is running on: radagast
## slave2 (rank 2, comm 1) of size 4 is running on: arwen
## slave3 (rank 3, comm 1) of size 4 is running on: arwen
## >
## > myfun <- function(i) {
## + set.seed(i)
## + mean(rnorm(1e7))
## + }
## >
## > x <- seq_len(25)
## > # parallel sapply-type calculations on a vector
## > system.time(out <- mpi.parSapply(x, myfun))
## user system elapsed
## 5.612 8.032 13.644
## > system.time(out <- mpi.applyLB(x, myfun))
## user system elapsed
## 5.400 7.044 12.441
## >
## > nrows <- 10000
## > x <- matrix(rnorm(nrows*50), nrow = nrows)
## > # parallel apply on a matrix
## > out <- mpi.parApply(x, 1, mean)
## >
## > mpi.close.Rslaves()
## [1] 1
## > mpi.quit()
</code></pre>
<p>In some cases, it may be useful to specify <em>job.num</em> when the number of tasks is bigger than the number of worker processes to ensure load-balancing.</p>
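<p>For instance, here's a sketch continuing the example above (the particular task and job counts are arbitrary):</p>
<pre><code class="r">x <- seq_len(200)
## chunk the 200 tasks into 60 jobs rather than one job per worker
out <- mpi.parSapply(x, myfun, job.num = 60)
</code></pre>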
<h3>3.1.4) Using sockets</h3>
<p>One can also set up a cluster with the worker processes communicating via sockets. You just need to specify
a character vector with the machine names as the input to <em>makeCluster()</em>. A nice thing about this is that it avoids the complications of needing MPI installed.</p>
<pre><code class="r">library(parallel)
machines = c(rep("beren.berkeley.edu", 1),
rep("gandalf.berkeley.edu", 1),
rep("arwen.berkeley.edu", 2))
cl = makeCluster(machines)
cl
</code></pre>
<pre><code>## socket cluster with 4 nodes on hosts 'beren.berkeley.edu', 'gandalf.berkeley.edu', 'arwen.berkeley.edu'
</code></pre>
<pre><code class="r">n = 1e7
clusterExport(cl, c('n'))
fun = function(i)
out = mean(rnorm(n))
result <- parSapply(cl, 1:20, fun)
result[1:5]
</code></pre>
<pre><code>## [1] 1.431600e-04 6.146156e-04 -8.718859e-05 8.976951e-05 1.152365e-04
</code></pre>
<pre><code class="r">stopCluster(cl) # not strictly necessary
</code></pre>
<p>Note the use of <em>clusterExport</em>, needed to make variables in the master process available to the workers; this involves making a copy of each variable for each worker process. You'd also need to load, on each worker, any packages used in the code being run in parallel. </p>
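<p>For example, here's a minimal sketch (the package name is just an illustration) of loading a package on all of the workers with <em>clusterEvalQ</em>:</p>
<pre><code class="r">## load the package on each worker
clusterEvalQ(cl, library(MASS))
## now functions from that package can be used in the parallelized code
result <- parSapply(cl, 1:20, function(i) mean(mvrnorm(10, rep(0, 2), diag(2))))
</code></pre>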
<h3>3.1.5) The <em>partools</em> package</h3>
<p><em>partools</em> is a somewhat new package developed by Norm Matloff at UC-Davis. He has the perspective that Spark/Hadoop are not the right tools in many cases when doing statistics-related work and has developed some simple tools for parallelizing computation across multiple nodes, also referred to as <em>Snowdoop</em>. The tools make use of the key idea in Hadoop of a distributed file system and distributed data objects but avoid the complications of trying to ensure fault tolerance, which is critical only on very large clusters of machines.</p>
<p>I won't go into details, but <em>partools</em> allows you to split up your data across multiple nodes and then read the data into R in parallel across R sessions running on those nodes, all controlled from a single master R session. You can then do operations on the subsets and gather results back to the master session as needed. One point that confused me in the <em>partools</em> vignette is that it shows how to split up a dataset that you can read into your R session, but it's not clear what one does if the dataset is too big to read into a single R session. </p>
<h2>3.2) Python</h2>
<h3>3.2.1) IPython parallel</h3>
<p>One can use IPython's parallelization tools in a context with multiple nodes, though the setup to get the worker processes is a bit more involved when you have multiple nodes. For details on using IPython parallel on a single node, see the <a href="https://github.com/berkeley-scf/tutorial-parallel-basics">parallel basics tutorial appendix</a>. </p>
<p>If we are using the SLURM scheduling software, here's how we start up the worker processes:</p>
<pre><code class="bash">ipcontroller --ip='*' &
sleep 25
# next line will start as many ipengines as we have SLURM tasks
# because srun is a SLURM command
srun ipengine &
sleep 45 # wait until all engines have successfully started
</code></pre>
<p>We can then run IPython to split up our computational tasks across the engines.</p>
<pre><code class="python">import numpy as np
np.random.seed(0)
n = 500
p = 50
X = np.random.normal(0, 1, size = (n, p))
Y = X[: , 0] + pow(abs(X[:,1] * X[:,2]), 0.5) + X[:,1] - X[:,2] + np.random.normal(0, 1, n)
def looFit(index, Ylocal, Xlocal):
rf = rfr(n_estimators=100)
fitted = rf.fit(np.delete(Xlocal, index, axis = 0), np.delete(Ylocal, index))
pred = rf.predict(np.array([Xlocal[index, :]]))
return(pred[0])
from ipyparallel import Client
c = Client()
c.ids
dview = c[:]
dview.block = True
dview.apply(lambda : "Hello, World")
lview = c.load_balanced_view()
lview.block = True
dview.execute('from sklearn.ensemble import RandomForestRegressor as rfr')
dview.execute('import numpy as np')
mydict = dict(X = X, Y = Y, looFit = looFit)
dview.push(mydict)
nSub = 50 # for illustration only do a subset
# need a wrapper function because map() only operates on one argument
def wrapper(i):
return(looFit(i, Y, X))
import time
time.time()
pred = lview.map(wrapper, range(nSub))
time.time()
print(pred[0:10])
# import pylab
# import matplotlib.pyplot as plt
# plt.plot(Y, pred, '.')
# pylab.show()
</code></pre>
<p>To finish up, we need to shut down the cluster of workers:</p>
<pre><code class="bash">ipcluster stop
</code></pre>
<p>To start the engines in a context outside of SLURM (provided all machines share a filesystem), you should be able to ssh to each machine and run <code>ipengine &</code> once for each worker process you want to start, as follows. In some, but not all, cases (depending on how the network is set up) you may not need the <code>--location</code> flag. </p>
<pre><code class="bash">ipcontroller --ip='*' --location=URL_OF_THIS_MACHINE &
sleep 25
nengines=8
ssh other_host "for (( i = 0; i < ${nengines}; i++ )); do ipengine & done"
sleep 45 # wait until all engines have successfully started
</code></pre>
<h3>3.2.2) <em>pp</em> package</h3>
<p>Another way to parallelize across multiple nodes, one that requires more manual setup and doesn't integrate as well with scheduling software like SLURM, is to use the pp package (also useful for parallelizing on a single machine, as discussed in the <a href="https://github.com/berkeley-scf/tutorial-parallel-basics">parallel basics tutorial appendix</a>). </p>
<p>Assuming that the pp package is installed on each node (e.g., <code>sudo apt-get install python-pp</code> on an Ubuntu machine), you need to start up a ppserver process on each node. E.g., if <code>$nodes</code> is a UNIX environment variable containing the names of the worker nodes and you want to start 2 workers per node:</p>
<pre><code class="bash">nodes='smeagol radagast beren arwen'
for node in $nodes; do
# cd /tmp is because of issue with starting ppserver in home directory
# -w says how many workers to start on the node
ssh $node "cd /tmp && ppserver -s mysecretphrase -t 120 -w 2 &" &
done
</code></pre>
<p>Now in our Python code we create a server object and submit jobs to it; the server object manages farming out the tasks. Note that this will run interactively in IPython or as a script from UNIX, but there have been times when I was not able to run it interactively in the base Python interpreter. Also note that while we are illustrating this as basically another parallelized for loop, the individual jobs can be whatever calculations you want, so the function (in this case it's always <em>pi_code.sample</em>) could change from job to job.</p>
<pre><code class="python">import numpy.random
import pp
import time
import pi_code # provided in pi_code.py
samples_per_slice = 10000000
num_slices = 24*20
# remember to start ppserver on worker nodes
# assume 'hosts' contains the names of the nodes on which you
# started ppserver
nprocsPerNode = 2
hosts = ['smeagol', 'radagast', 'beren', 'arwen']
ppservers = hosts * nprocsPerNode
print ppservers
# put ncpus=0 here or it will start workers locally too
job_server = pp.Server(ncpus = 0, ppservers = tuple(ppservers), secret = 'mysecretphrase')
inputs = [(i, samples_per_slice) for i in xrange(num_slices)]
t0 = time.time()
jobs = [job_server.submit(pi_code.sample, invalue, modules = ('numpy.random',)) for invalue in inputs]
results = [job() for job in jobs]
t1 = time.time()
print "Pi is roughly %f" % (4.0 * sum(results) / (num_slices*samples_per_slice))
print "Time elapsed: ", t1 - t0
</code></pre>
<pre><code class="bash">python python-pp.py > python-pp.out
cat python-pp.out
</code></pre>
<pre><code>['smeagol', 'radagast', 'beren', 'arwen', 'smeagol', 'radagast', 'beren', 'arwen']
Pi is roughly 3.141567
Time elapsed: 32.0389587879
</code></pre>
<p>The -t flag used when starting ppserver should ensure that the server processes are removed, but if you need to do it manually, this should work:</p>
<pre><code class="bash">for node in $nodes; do
killall ppserver
done
</code></pre>
<h2>3.3) Matlab</h2>
<p>To use Matlab across multiple nodes, you need to have the Matlab Distributed Computing Server (DCS). If it is installed, one can set up Matlab so that <em>parfor</em> will distribute its work across multiple nodes. Details may vary depending on how DCS is installed on your system. </p>
<h1>4) Distributed linear algebra in R using pbdR</h1>
<h2>4.1) Distributed linear algebra example</h2>
<p>Here's how you would set up a distributed matrix and do linear
algebra on it. Note that when working with large matrices, you would
generally want to construct the matrices (or read from disk) in a
parallel fashion rather than creating the full matrix on one worker.
For simplicity in the example, I construct the matrix, <em>x</em>, on the master
and then create the distributed version of the matrix, <em>dx</em>, with <em>as.ddmatrix</em>.</p>
<p>Here's the code in <em>pbd-linalg.R</em>.</p>
<pre><code class="r">library(pbdDMAT, quiet = TRUE )
n <- 4096*2
# if you are putting multiple processes on node
# you may want to prevent threading of the linear algebra:
# library(RhpcBLASctl)
# blas_set_num_threads(1)
# (or do by passing OMP_NUM_THREADS to mpirun)
init.grid()
if(comm.rank()==0) print(date())
# pbd allows for parallel I/O, but here
# we keep things simple and distribute
# an object from one process
if(comm.rank() == 0) {
x <- rnorm(n^2)
dim(x) <- c(n, n)
} else x <- NULL
dx <- as.ddmatrix(x)
timing <- comm.timer(sigma <- crossprod(dx))
if(comm.rank()==0) {
print(date())
print(timing)
}
timing <- comm.timer(out <- chol(sigma))
if(comm.rank()==0) {
print(date())
print(timing)
}
finalize()
</code></pre>
<p>As before we run the job in batch mode via mpirun:</p>
<pre><code class="bash">export OMP_NUM_THREADS=1
mpirun -machinefile .hosts -np 4 -x OMP_NUM_THREADS Rscript pbd-linalg.R > pbd-linalg.out
cat pbd-linalg.out
</code></pre>
<pre><code>## Using 2x2 for the default grid size
##
## [1] "Sat Oct 17 12:05:38 2015"
## [1] "Sat Oct 17 12:06:54 2015"
## min mean max
## 48.086 50.806 52.585
## [1] "Sat Oct 17 12:08:10 2015"
## min mean max
## 76.47000 76.51125 76.53300
</code></pre>
<p>You may want to set the <em>bldim</em> argument to <em>as.ddmatrix</em>. That determines
the size of the submatrices (aka 'blocks') into which the overall matrix is split. Generally, multiple
submatrices are owned by an individual worker process. For example, to use 100x100
blocks, you'd have</p>
<pre><code>dx <- as.ddmatrix(x, bldim = c(100, 100))
</code></pre>
<p>In general, you don't
want the blocks too big as the work may not be well load-balanced, or too small as
that may have a higher computational cost in terms of latency and communication.
My experiments suggest that it's worth exploring block sizes of 10x10 through 1000x1000 (if you have square matrices). </p>
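<p>For example, here's a sketch (with arbitrary candidate sizes) of timing the crossproduct under a few block sizes, following the setup in <em>pbd-linalg.R</em>:</p>
<pre><code class="r">for(b in c(10, 100, 1000)) {
## redistribute x (held on rank 0) using b x b blocks and time the operation
dx <- as.ddmatrix(x, bldim = c(b, b))
timing <- comm.timer(sigma <- crossprod(dx))
if(comm.rank() == 0) {
print(b)
print(timing)
}
}
</code></pre>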
<p>As a quick, completely non-definitive point of comparison, doing the
crossproduct and Cholesky for the 8192x8192 matrix on 3 EC2 nodes
(2 cores per node) with -np 6 took 39 seconds for each operation,
while doing it with two threads on the master node took 64 seconds (crossproduct)
and 23 seconds (Cholesky). While that is a single test, some other experiments
I've done also haven't shown much speedup in using multiple nodes with pbdR compared
to simply using a threaded BLAS on one machine. So you may need to get fairly big matrices
that won't fit in memory on a single machine before it's worthwhile
to do the computation in distributed fashion using pbdR.</p>
<h2>4.2) Constructing a distributed matrix in parallel</h2>
<p>pbdR has functionality for reading in parallel from a parallel file
system such as Lustre (available on Berkeley's Savio cluster). Things
are a bit more complicated if that's not the case. Here's some code that
illustrates how to construct a distributed matrix from constituent column blocks.
First create a distributed version of the
matrix using a standard R matrix with each process owning a block of
columns (I haven't yet gotten the syntax to work for blocks of rows). Then create a
pbd version of that distributed matrix and finally convert the
distributed matrix to a standard pbd block structure on which the
linear algebra can be done efficiently. </p>
<pre><code class="r">library(pbdDMAT, quiet = TRUE)
init.grid()
nprocs <- comm.size()
nrows <- 10000
ncolsPerBlock <- nrows/nprocs
# each process has a block of columns as an R matrix
subdata <- matrix(rnorm(nrows * ncolsPerBlock), ncol = ncolsPerBlock)
# now construct the distributed matrix object
tmp <- ddmatrix(data = subdata, nrow = nrows, ncol = nrows,
bldim = c(nrows, ncolsPerBlock), ICTXT = 1)
# now rearrange the blocks for better linear algebra performance
dx <- redistribute(tmp, bldim = c(100, 100), ICTXT = 0)
finalize ()
</code></pre>
<p>The code above creates the submatrices within the R sessions, but one could also read in from separate files, one per process.</p>
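<p>For example, here's a minimal sketch (the file names are hypothetical) in which each process reads its own column block from a separate file before constructing the distributed matrix as above:</p>
<pre><code class="r">## each process reads its own block, e.g., from files block_0.rds, block_1.rds, ...
subdata <- readRDS(paste0('block_', comm.rank(), '.rds'))
tmp <- ddmatrix(data = subdata, nrow = nrows, ncol = nrows,
bldim = c(nrows, ncolsPerBlock), ICTXT = 1)
dx <- redistribute(tmp, bldim = c(100, 100), ICTXT = 0)
</code></pre>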
<p>The code in <em>redistribute-test.R</em> demonstrates that constructing the full matrix
from column-wise blocks with this syntax works correctly. </p>
<h1>5) MPI</h1>
<h2>5.1) MPI Overview</h2>
<p>There are multiple MPI implementations, of which <em>openMPI</em> and
<em>mpich</em> are the most common. We'll use <em>openMPI</em> here.</p>
<p>In MPI programming, the same code runs on all the machines. This is
called SPMD (single program, multiple data). As we saw a bit with the pbdR code, one
invokes the same code (same program) multiple times, but the behavior
of the code can be different based on querying the rank (ID) of the
process. Since MPI operates in a distributed fashion, any transfer
of information between processes must be done explicitly via send
and receive calls (e.g., <em>MPI_Send</em>, <em>MPI_Recv</em>, <em>MPI_Isend</em>,
and <em>MPI_Irecv</em>). (The “MPI_” prefix is for C code; C++ just has
<em>Send</em>, <em>Recv</em>, etc.)</p>
<p>The latter two of these functions (<em>MPI_Isend</em> and <em>MPI_Irecv</em>)
are so-called non-blocking calls. One important concept to understand
is the difference between blocking and non-blocking calls. Blocking
calls wait until the call finishes, while non-blocking calls return
and allow the code to continue. Non-blocking calls can be more efficient,
but can lead to problems with synchronization between processes. </p>