\documentclass[12pt,english]{book}
\usepackage[utf8]{inputenc}
\usepackage{float}
\usepackage[T1]{fontenc}
\usepackage[english]{babel}
\usepackage{CJKutf8}
\usepackage[breaklinks]{hyperref}
\usepackage{graphicx}
\usepackage[table,xcdraw]{xcolor}
\usepackage{listings}
\graphicspath{{images/}}
\restylefloat{table}
\title{Chapter 2\\Hadoop, Hive, Spark with examples}
\date{2020-09-24}
\author{Gaetan Robert Lescouflair\\Sergio Simonian}
\begin{document}
\pagenumbering{gobble}
\maketitle
\newpage
\pagenumbering{arabic}
\setcounter{chapter}{2}
\setcounter{secnumdepth}{3}
\setlength\arrayrulewidth{1pt}
\lstset{escapeinside={\%}{)}}
\section{Introduction}
As soon as we have more data than can be fit in one machine or we want to process more than a single machine can handle in a reasonable time, we tend to redesign our systems to work in a distributed manner.
While a distributed system can scale horizontally (by adding more machines), it comes with new challenges to tackle.
How to optimally distribute the workload across (many different) machines?
How to ensure that the system does not interrupt or produce wrong results if a machine fails (fault tolerance) or becomes unavailable (partition tolerance)?
Also, a distributed system has to serve multiple users and run several applications concurrently.
In that case, how to manage file access rights and resource usage?
Moreover, how to make the distributed system appear as a single coherent system to the end-users?
This chapter will explore how Hadoop, Hive, and Spark handle these challenges and provide us with a simple way to set up a distributed system featuring distributed storage and parallel processing.
\section{Hadoop}
\emph{Apache Hadoop} is an open-source, cross-platform framework written in Java for distributed data storage and parallel data processing.
Doug Cutting and Mike Cafarella created Hadoop, inspired by two research papers from Google: "The Google File System" (2003) and "MapReduce: Simplified Data Processing on Large Clusters" (2004).
It scales up from one machine to large clusters of thousands of machines with different hardware capacities (disk, CPU, RAM, and bandwidth).
Hadoop plays an essential role in Big Data distributed computing and storage, ranging from structured to unstructured data.
The machines in a Hadoop cluster work together to behave as if they were a single system.
Leading providers of Big Data Database Management Systems have implemented the Hadoop platform in their enterprise solutions.
For example, Oracle's "Big Data Appliance"
\footnote{\url{https://docs.oracle.com/en/bigdata/big-data-appliance/5.1/bigug/concepts.html\#GUID-8D18CCDF-D5EB-421B-9E5D-13027856EDA0}}
, Microsoft's "Polybase"
\footnote{\url{https://docs.microsoft.com/en-us/sql/relational-databases/polybase/get-started-with-polybase}}
and IBM's "BigInsights"
\footnote{\url{https://www.ibm.com/support/knowledgecenter/en/SSPT3X\_4.0.0/com.ibm.swg.im.infosphere.biginsights.product.doc/doc/bi\_editions.html}}
.
\begin{figure}[ht]
\centering
\includegraphics[width=\linewidth]{hadoop1vshadoop2.png}
\caption[Differences between Hadoop 1.0 and Hadoop 2.0]{Differences between Hadoop 1.0 and Hadoop 2.0 \footnotemark}
\label{fig:differenceBetweenHadoop1and2}
\end{figure}
\footnotetext{\url{https://infinitescript.com/wordpress/wp-content/uploads/2014/08/Differences-between-Hadoop-1-and-2.png}}
At the time of this writing, the latest version of Hadoop is 3.3.1.
In its first version (Figure \ref{fig:differenceBetweenHadoop1and2}), the essential components are the MapReduce model, which is responsible for the distributed processing and management of cluster resources, and HDFS (Hadoop Distributed File System) for the distributed storage.
In the second version of Hadoop (Figure \ref{fig:differenceBetweenHadoop1and2}), the MapReduce model is used only for distributed processing, and YARN (Yet Another Resource Negotiator) has become the cluster resource manager.
This change in architecture allows a whole ecosystem to grow around Hadoop, including other frameworks capable of performing distributed data processing while adding new abstractions and new ways of using Hadoop at the application and execution levels.
Finally, in version 3, improvements are introduced to reduce storage costs while maintaining fault tolerance and optimizing resource management for even greater scalability.
In the following sections of this chapter, we will look in more detail at how MapReduce, HDFS, YARN, and the ecosystem around Hadoop work, show how to install Hadoop and run MapReduce jobs, and finally present Apache Hive and Apache Spark with usage examples.
\section{MapReduce}
\emph{MapReduce} is a paradigm designed to simplify parallel data processing on large clusters.
Its principle is based on the "divide and conquer" technique - it divides the computation into sub-processes and runs them in parallel on the cluster.
A standard MapReduce program reads data from HDFS, splits it into parts, assigns a key to each part, groups these parts by their keys, and computes a summary for each group.
\paragraph{The four main steps of a MapReduce process:}\mbox{}\\
A MapReduce process consists of several steps. Here are the four main steps in their corresponding order:
\begin{itemize}
\item
\textbf{Split}: Split input data into multiple fragments to form subsets of data according to a delimiter such as a space, comma, semicolon, new line, or any other logical rule.
\item
\textbf{Map}: Map each of the fragments into a new subset where the elements form key-value pairs.
\item
\textbf{Shuffle}: Group all the key-value pairs by their respective keys.
\item
\textbf{Reduce}: Perform a calculation on each group of values and output a possibly smaller set of values.
\end{itemize}
\paragraph{The general structure of a MapReduce process is in this form:}\mbox{}\\
Map (key 1, value 1) -> list(key 2, value 2) \\
Reduce (key 2, list(value 2)) -> list(key 3, value 3)
\begin{figure}[ht]
\centering
\includegraphics[width=\linewidth]{mapReduceSchema.png}
\caption{MapReduce process steps illustrated with the word counter example}
\label{fig:wordCountExample}
\end{figure}
\paragraph{Let us take a closer look at the MapReduce process steps with a word count example (Figure \ref{fig:wordCountExample})}\mbox{}\\
In the following table (Table ~\ref{tbl:wordCountExample}), we see the shape of the input and output data for the different steps.
\begin{table}[H]
\begin{tabular}{|p{1.4cm}|p{3.4cm}|p{1.5cm}|p{3.3cm}|p{2cm}|}
\hline
\rowcolor[HTML]{CBCEFB}
Step & Input & Input type & Output & Output type
\\ \hline
Split &
Hello Hadoop Welcome HDFS \par Hello Yarn Bye Yarn &
Text file &
(1; "Hello Hadoop Welcome HDFS") \par (2; "Hello Yarn Bye Yarn") &
Fragments of the input file in form of key-value pairs
\\ \hline
\rowcolor[HTML]{EDEDED}
Map &
(1; "Hello Hadoop Welcome HDFS") \par (2; "Hello Yarn Bye Yarn") &
Key-value pairs &
(Hello;1), (Hadoop;1), (Welcome;1), (HDFS;1), (Hello; 1), (Yarn;1), (Bye;1), (Yarn;1) &
Key-value pairs
\\ \hline
Shuffle &
(Hello;1), (Hadoop;1), (Welcome;1), (HDFS;1), (Hello; 1), (Yarn;1), (Bye;1), (Yarn;1) &
Key-value pairs &
[(Hello;1)(Hello;1)], [(Hadoop;1)], [(Welcome;1)], [(HDFS;1)], [(Yarn;1)(Yarn;1)], [(Bye;1)] &
Groups of key-value pairs by key
\\ \hline
\rowcolor[HTML]{EDEDED}
Reduce &
[(Hello;1)(Hello;1)], [(Hadoop;1)], [(Welcome;1)], [(HDFS;1)], [(Yarn;1)(Yarn;1)], [(Bye;1)] &
Groups of key-value pairs by key &
(Hello;2), (Hadoop;1), (Welcome;1), (HDFS;1), (Yarn;2), (Bye;1) &
Subset of key-value pairs
\\ \hline
\end{tabular}
\caption{MapReduce input/output data in the word count example}
\label{tbl:wordCountExample}
\end{table}
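As a rough single-machine analogy (and not how Hadoop actually executes the job), the same word count can be sketched with standard Unix tools, where \textbf{sort} plays the role of the shuffle step and \textbf{uniq -c} that of the reduce step; here \textbf{input.txt} is assumed to contain the two example lines from the table above.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
# split/map: emit one word per line
# shuffle: sort groups identical words together
# reduce: count the size of each group
tr ' ' '\n' < input.txt | sort | uniq -c
\end{lstlisting}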
\paragraph{Hadoop and MapReduce}\mbox{}\\
In general, Hadoop executes each MapReduce process step in a distributed manner.
To see how Hadoop distributes the split step, we will first look at how Hadoop stores its data in the following section.
\section{HDFS}
HDFS stands for Hadoop Distributed File System.
As its name indicates, it is a distributed file system used by Hadoop.
From a user perspective, it is similar to other filesystems such as Ext4, FAT32, NTFS, and HFS+.
However, its internal functioning is very different.
Due to its distributed nature, it can store large amounts of data.
HDFS divides each file it stores into fixed-size blocks, distributes them over the entire cluster, and replicates each of them (by default three times) across the cluster to ensure fault tolerance.
In case a node in the cluster becomes unavailable, there will be, for each data block it held, two other nodes that still hold a replica of that block.
All of this is handled transparently, giving the end-user the impression of a regular single-machine filesystem.
\paragraph{HDFS Architecture}\mbox{}\\
HDFS is a master-worker architecture composed of two main daemons: NameNode and DataNode:
\begin{itemize}
\item
The \textbf{NameNode} daemon is the master of the HDFS cluster.
It stores metadata about all files and directories present on HDFS (their paths, data block IDs, access rights) and keeps track of all changes done to them.
The NameNode persists this information on its local host OS file system in two types of files: \emph{fsimage} and \emph{edit-logs}.
The \emph{fsimage} contains the state of the file system at a given time, and the \emph{edit-logs} record every change in the file system metadata since the creation of the last \emph{fsimage}.
The NameNode also keeps track of the locations of blocks and replicas on the cluster.
All interactions like downloading/uploading/listing/creating/deleting/moving/copying files on HDFS first go through the NameNode.
In order to serve clients as quickly as possible, the NameNode daemon keeps all metadata in memory (RAM) and only persists metadata changes in the edit-logs.
In case of a crash, when the NameNode restarts, it loads the last \emph{fsimage} in memory and applies the changes from the \emph{edit-logs} to restore its previous state.
However, to keep the file system consistent for all clients, there can only be one active NameNode on the cluster.
Unfortunately, this constraint creates a single point of failure.
If the NameNode becomes unavailable, the whole HDFS cluster cannot be used by the clients anymore.
Fortunately, Hadoop provides a way to ensure NameNode High Availability by running one or multiple Standby NameNodes that keep their state in sync with the active one and can take over its role in case of a failure.
Moreover, failover from the Active NameNode to a Standby NameNode can be relatively quick and transparent to the clients.
\item
The \textbf{DataNode} daemon is a worker of the HDFS cluster.
It runs on every cluster node, except usually the NameNode host, and is responsible for storing and managing data blocks.
Each DataNode performs block creation, deletion, and replication upon instruction from the NameNode and serves read and write requests from the file system's clients.
It also periodically sends Heartbeats and block reports to the NameNode to confirm that it is alive and healthy.
The DataNodes communicate as well with each other to perform data replication.
\end{itemize}
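A quick way to see the cluster from the NameNode's point of view is the \textbf{hdfs dfsadmin -report} command, which lists the registered DataNodes together with their capacity, usage, and state. This is only a usage sketch; it assumes a running HDFS cluster, such as the one we set up later in this chapter.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hdfs dfsadmin -report
\end{lstlisting}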
\paragraph{Reading and Writing files on HDFS}\mbox{}\\
\begin{figure}[t]
\centering
\includegraphics[width=\linewidth]{hdfsArch.png}
\caption[HDFS Architecture]{HDFS Architecture \footnotemark}
\end{figure}
\footnotetext{\url{https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html}}
When a filesystem client wants to read a file in HDFS, it has to ask the NameNode where to retrieve the data blocks for the file.
The NameNode will respond with a list of DataNodes for each block.
Then the client has to contact the DataNodes directly to retrieve the corresponding data blocks.
Finally, the filesystem client reconstructs the original file by merging the retrieved data blocks.
Writing files to HDFS is quite similar.
The client splits the original file into blocks and asks the NameNode where to store them.
The NameNode will respond with a list of DataNodes for each block.
Then the client has to contact the DataNodes directly to send the corresponding file data blocks.
Next, the DataNodes will replicate the received blocks and send an acknowledgment to the client.
Finally, the client will notify the NameNode about the completion of the write operation.
Fortunately, for the end-user, reading and writing files on HDFS is abstracted by a command-line interface (\textbf{hadoop fs} \footnote{\url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html}}) and a Java library (\textbf{org.apache.hadoop.fs}\footnote{\url{https://hadoop.apache.org/docs/r3.3.1/api/org/apache/hadoop/fs/package-summary.html}}).
\paragraph{Using the HDFS command-line interface}\mbox{}\\
Here we will demonstrate the usage of the Hadoop command-line interface with some examples.
To create nested directories, we can use the \textbf{-mkdir} argument followed by \textbf{-p} and the path of the directories.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hadoop fs -mkdir -p /user/hdoop/example
\end{lstlisting}
Next, we will create two files in our local filesystem and upload them to HDFS in our example directory using the \textbf{-put} argument.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
echo "Hello Hadoop Welcome HDFS" > test_file_1.txt
echo "Hello Yarn Bye Yarn" > test_file_2.txt
hadoop fs -put test_file_1.txt test_file_2.txt /user/hdoop/example
\end{lstlisting}
To list the content of our directory, we can use the \textbf{-ls} argument.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hadoop fs -ls /user/hdoop/example
\end{lstlisting}
To print the content of a file on the standard output, we can use the \textbf{-cat} argument.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hadoop fs -cat /user/hdoop/example/test_file_1.txt
\end{lstlisting}
Finally, to download a file back to our local filesystem, we can use the \textbf{-get} option.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hadoop fs -get /user/hdoop/example/test_file_2.txt ./downloaded_file.txt
\end{lstlisting}
Note that the \textbf{hadoop fs} command has many more arguments providing additional functionality.
We have merely shown some of the most common and simple ones.
For an exhaustive list of all available arguments, refer to the official Hadoop documentation.\footnote{\url{https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html}}
\paragraph{MapReduce and HDFS}\mbox{}\\
In a nutshell, files stored on HDFS are split into fixed-size blocks, replicated, and spread over the Hadoop cluster.
This prior setup allows Hadoop to distribute the processing of MapReduce programs on cluster nodes that already possess the required data (data-locality).
Let us now revisit the word count MapReduce example.
In the first MapReduce step, we want to split our input file by line, and we
have mentioned that Hadoop will perform this task in a distributed manner.
Hadoop starts by computing the number of split tasks to spawn and then selects cluster nodes to perform these tasks based on their current resource availability, configured policies, and other factors, with a preference for nodes that already possess the required data.
By default, the number of split operation tasks is equal to the number of data blocks of the input file.
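The number of blocks an input file occupies on HDFS (and the DataNodes holding its replicas) can be inspected with the \textbf{hdfs fsck} tool; a small usage sketch, applied to one of the test files uploaded earlier:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hdfs fsck /user/hdoop/example/test_file_1.txt -files -blocks -locations
\end{lstlisting}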
However, to accomplish this distribution of work, Hadoop needs a way to coordinate its tasks, which is the subject of the next section.
\section{YARN}
YARN is Hadoop's resource manager that distributes tasks to all the machines in the Hadoop cluster and tracks the status of the running tasks.
\begin{figure}[ht]
\centering
\includegraphics[width=10cm]{yarnArch.png}
\caption[YARN architecture]{YARN architecture \footnotemark}
\label{fig:YARNarchitecture}
\end{figure}
\footnotetext{Source : \url{https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html}}
YARN also follows a master-worker architecture composed of two main daemons, the ResourceManager (master) and the NodeManager (worker), as shown in Figure~\ref{fig:YARNarchitecture}:
\begin{itemize}
\item
The \textbf{ResourceManager} keeps track of and manages the available resources in the cluster. It also receives application submissions from clients.
\item
The \textbf{NodeManager} runs on each node of the cluster, except usually the ResourceManager node, and is responsible for providing execution containers. The containers are execution environments with bounded resources (such as RAM and CPU) in which tasks run (for example, map tasks, reduce tasks, or the ApplicationMaster).
\end{itemize}
When YARN receives an application submission from a client, the ResourceManager allocates a container on one of the NodeManagers to run the application's ApplicationMaster.
The ApplicationMaster then negotiates further containers with the ResourceManager and coordinates the execution of the application's tasks (for example, the map and reduce tasks of a MapReduce job) inside them.
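Once the cluster is up, the state of the NodeManagers and of submitted applications can be inspected from the command line. The following is a usage sketch, assuming YARN has already been started as described later in this chapter.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
# list the NodeManagers registered with the ResourceManager
yarn node -list
# list the applications currently known to the ResourceManager
yarn application -list
\end{lstlisting}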
\section{Hadoop Ecosystem}
\textbf{Hadoop} is a framework surrounded by a range of tools and technologies which form the Hadoop ecosystem (see Figure ~\ref{fig:HadoopEco}).
Before starting to work with Hadoop, it is vital to understand its environment.
Each tool can play a substantial role in different parts of a Big Data Project.
HDFS, YARN, and MapReduce are the foundation of the Hadoop ecosystem. Most tools of the Hadoop ecosystem are open-source projects from the Apache Software Foundation. However, there are proprietary solutions too.
The tools in the ecosystem have data ingestion, storage, and analysis, as well as system maintenance, as their primary purpose.
\begin{figure}[ht]
\centering
\includegraphics[width=10cm]{hadoopEco.png}
\caption[Hadoop ecosystem]{Hadoop ecosystem \footnotemark}
\label{fig:HadoopEco}
\end{figure}
\footnotetext{Source : \url{https://cdn.edureka.co/blog/wp-content/uploads/2016/10/HADOOP-ECOSYSTEM-Edureka.png}}
The number of tools around Hadoop is constantly increasing. In this section, we will take a look at the most commonly used tools currently on the market in the field of Big Data.
\subsection{Hive}
Apache Hive \footnote{See https://hive.apache.org/} is a Data Warehousing Framework that allows you to read, write and manage large volumes of data in a distributed environment from an SQL-like interface.
\subsection{Spark}
Apache Spark \footnote{See https://spark.apache.org/} is a framework for performing data analysis using in-memory data processing in a distributed environment.
\subsection{Sqoop}
Apache Sqoop \footnote{See http://sqoop.apache.org/} is an ETL (Extract, Transform, Load) tool designed to efficiently perform bulk data transfers between Hadoop and structured data stores (relational databases, CSV files, ...).
\subsection{Hbase}
Apache HBase \footnote{See https://hbase.apache.org/} is a Hadoop database with the ability to manage read and write access to large volumes of data in a random and real-time manner. HBase is capable of maintaining large tables that can contain millions of columns.
\subsection{Pig}
Apache Pig \footnote{See https://pig.apache.org/} is a platform for performing data analysis on large volumes of data. It offers a high-level language, named Pig Latin, with command structures similar to SQL. When compiled, it produces sequences of Map and Reduce jobs that can be parallelized on Hadoop.
\subsection{Zookeeper}
Apache Zookeeper \footnote{See https://zookeeper.apache.org/} is a centralized coordination service for distributed environments. It maintains configuration information and provides distributed synchronization, naming, and group management services.
% Maintains data across a distributed system in a consistent manner
% For example, it can keep track of information that must be in sync across the cluster
% - Which node is the master / - What task are assigned to which workers / - Which workers are currently available
% - What tasks are assigned to which workers
% - Which workers are currently available
% Can be used as a tool that applications can use to recover from partial failures in a cluster
% An integral part of HBase, High-Availability MapReduce, Drill, Storm, Solr, and much more
% (Master election) In High-Availability single master systems (HBase / YARN / HDFS) - can keep track of who the master node is, detect when the master is down, trigger a new master election for the standby master nodes and assure only one new master node is elected.
% One node registers itself as the master and holds a "lock" on that data. Other nodes cannot become master until the lock is released. Only one node is allowed to hold the lock at a time.
% (Crash detection) Can detect and notify the application of Worker node crashes - then the application can redistribute the work load.
% "Ephemeral" data on a node's availability goes away if the node disconnects or fails to refresh itself (heartbeat) after some timeout period.
% (Group management) keep track of what workers are available in your pool
% (Store Metadata), which has to be consistent across the entire cluster, like a list of outstanding tasks and assignments.
% Detect network failures (partitioning).
% But instead of providing a specific API tackling these problems - Zookeeper is much more general - it provides a very consistent little distributed file system that any application in the distributed system can read and write. Using this approach pushes the logic of dealing with those failures to the individual applications.
% Replace the concept of file with znode, and you pretty much got it!
% Zookeepers API:
% Create, delete, exists, setData, getData, getChildren
% To avoid continuous polling, clients can register for notifications on a znode.
% Persistent znodes - remain stored until explicitly deleted
% Ephemeral znodes go away if the client that created it crashes or loses its connection to Zookeeper.
% Zookeeper architecture image from zookeeper.apache.org
% ZK clients (maintains a list of ZK servers addresses to) connect to one of the ZK servers (in a distributed manner to distribute de read load), which form a ZK Ensemble. ZK Ensemble replicates the data among its nodes.
% When a client writes to the ZK ensemble - it waits for confirmation while the date is replicated in a configured number of ZK servers (zookeeper quorum) (to guarantee consistency). Split Brain problem - when a part of the cluster has different information than another part. (Availability trade-off of the CAP theorem)
\subsection{Ambari}
Apache Ambari \footnote{See https://ambari.apache.org/} is a management tool that simplifies the provisioning, configuration, management, and monitoring of services in Hadoop clusters.
\subsection{Oozie}
Apache Oozie \footnote{See http://oozie.apache.org/} is an event scheduling and triggering system for Hadoop.
It can be thought of as a clock or alarm service internal to Hadoop.
It can execute a set of jobs one after the other or trigger them based on the availability of data.
The jobs launched can be MapReduce, Pig, Hive, Sqoop, or Java program tasks, among many others.
\subsection{Apache Solr and Lucene}
Apache Solr and Apache Lucene \footnote{See https://solr.apache.org/} are two services that are used for search and indexing in the Hadoop environment.
They are suitable for implementing information systems that require full-text search.
Lucene is a core component, and Solr is built around it, adding even more functionality.
\subsection{Kafka}
Apache Kafka \footnote{See https://kafka.apache.org/} is a distributed messaging system for publishing, subscribing to, and recording data streams.
It allows the creation of a data distribution pipeline between systems or applications.
\subsection{Storm}
Apache Storm \footnote{See https://storm.apache.org/} is a data stream processing system for use cases such as real-time analytics, machine learning, and continuous operations monitoring.
\subsection{Flume}
Apache Flume \footnote{See https://flume.apache.org/} is a distributed service for collecting, aggregating, and transferring large volumes of semi-structured or unstructured data from online streams into HDFS.
\subsection{Drill}
Apache Drill \footnote{See https://drill.apache.org/} is a schema-free SQL query engine for Hadoop, NoSQL, and Cloud Storage.
It supports a variety of NoSQL databases and is capable of performing join queries between multiple data sources.
\subsection{Mahout}
Apache Mahout \footnote{See https://mahout.apache.org/} provides an environment for the development of Machine Learning applications at scale.
\subsection{Impala}
Apache Impala \footnote{See https://impala.apache.org/} and Presto \footnote{See https://prestodb.io/} are SQL query engines designed for Big Data.
They are capable of processing petabytes of data very quickly.
For more information on Impala, see ``Impala: A Modern, Open-Source SQL Engine for Hadoop''. \footnote{M. Kornacker et al., ``Impala: A Modern, Open-Source SQL Engine for Hadoop,'' in CIDR, 2015, vol. 1, p. 9.}
For Presto, see ``Presto: Interacting with petabytes of data at Facebook''. \footnote{``Presto: Interacting with petabytes of data at Facebook.'' [Online]. https://www.facebook.com/notes/facebookengineering/presto-interacting-with-petabytes-of-data-atfacebook/10151786197628920.}
\section{Hadoop 3.3.1 cluster installation on Linux Ubuntu 20.04.1 LTS}
This section shows the installation and configuration of an Apache Hadoop version 3.3.1 cluster with YARN. As shown in the following diagram (Figure~\ref{fig:clusterSchema}), the installation will be on three machines: one Master node (hdmaster) and two Worker nodes (hdworker1 and hdworker2).
However, because this example setup is a pretty small cluster and the Hadoop master services will not consume much processing power on the Master node, we will also use the Master node as a Worker node.
\begin{figure}[ht]
\centering
\includegraphics[width=\linewidth]{clusterSchema.png}
\caption{Example Cluster Schema}
\label{fig:clusterSchema}
\end{figure}
The machines used for this example installation are interconnected through a network switch, run the Ubuntu 20.04.1 LTS operating system, and have about 4 GB of RAM, 200 GB of free hard drive space, and Intel i3 processors.
\subsection{Prerequisites}
In this section, we will see the initial setup for the operation of Apache Hadoop and some best practices.
Since Hadoop is a Java-based platform, it needs a Java Virtual Machine (JVM) to run.
Hadoop 3.3.1 can run on Java 8 or 11.
We will install Java 8 because many Hadoop ecosystem components only support Java versions up to 8.
Another important aspect is that Hadoop uses SSH (Secure Shell) to connect to the cluster nodes.
Moreover, to provide better isolation and security between Hadoop services, it is recommended to create dedicated users.
\subsubsection{Install Java version 8}
Firstly, we will install Java 8 on all cluster nodes.
In the terminal:
{\parindent 0pt Update and upgrade packages:}
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo apt update
sudo apt upgrade
\end{lstlisting}
Install Java 8 OpenJDK Development Kit:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo apt install openjdk-8-jdk
\end{lstlisting}
Check the Java version:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
java -version
\end{lstlisting}
The output should look similar to this:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
openjdk version "1.8.0_275"
OpenJDK Runtime Environment (build 1.8.0_275-8u275-b01-0ubuntu1~20.04-b01)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)
\end{lstlisting}
\subsubsection{Hostname configuration}
It is important to determine the hostnames and IP addresses associated with each machine from the start.
Our master node is named "hdmaster", and our worker nodes are "hdworker1" and "hdworker2".
Make sure that each node has a static IP address so that it does not change over time.
On Ubuntu 20.04, this can be done through the Netplan configuration files located under /etc/netplan/.
Add the \textbf{IP addresses and hostnames} corresponding to each node to the \textbf{/etc/hosts} file.
We are using the \textbf{vim} text editor for this task.
However, any other text editor could do it as well.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo vim /etc/hosts
\end{lstlisting}
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
# IPs and Hostnames for Hadoop configuration
192.168.2.120 hdmaster
192.168.2.121 hdworker1
192.168.2.122 hdworker2
\end{lstlisting}
\subsubsection{Create a Hadoop user for HDFS and MapReduce access}
We will create a non-root Hadoop user and group named \textbf{hdoop} on each cluster node. Note that in production, using separate users for each Hadoop service is preferable because it provides better isolation and security.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo adduser hdoop
\end{lstlisting}
\subsubsection{SSH installation}
In the terminal:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo apt install ssh
\end{lstlisting}
Next, we will set up passwordless SSH access for the Hadoop user.
Generate an SSH key pair (protected by a passphrase) on the master node:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
su - hdoop
ssh-keygen -t rsa -b 4096 -m pem
\end{lstlisting}
Then copy the SSH public key from the master node to the workers and to localhost to enable SSH access without a password.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
ssh-copy-id -i $HOME/.ssh/id_rsa.pub hdoop@hdworker1
ssh-copy-id -i $HOME/.ssh/id_rsa.pub hdoop@hdworker2
ssh-copy-id -i $HOME/.ssh/id_rsa.pub hdoop@localhost
\end{lstlisting}
Finally, load the passphrase for the SSH key into memory with ssh-agent:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
ssh-agent $SHELL
ssh-add
\end{lstlisting}
To check that the Hadoop user has gained passwordless access to localhost and the worker nodes, we will attempt to connect to each node.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
ssh hdoop@localhost
exit
ssh hdoop@hdworker1
exit
ssh hdoop@hdworker2
exit
\end{lstlisting}
\subsection{Hadoop installation}
The binary version of Apache Hadoop can be downloaded from the official website (https://hadoop.apache.org/).
We choose the "/usr/local/" directory for the installation.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
cd /usr/local/
\end{lstlisting}
Download the Hadoop archive file:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo wget https://miroir.univ-lorraine.fr/apache/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
\end{lstlisting}
Unarchive the newly downloaded file (here hadoop-3.3.1.tar.gz)
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo tar xzf hadoop-3.3.1.tar.gz
\end{lstlisting}
Give the hdoop user the ownership of the directory:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo chown hdoop:hdoop -R /usr/local/hadoop-3.3.1
\end{lstlisting}
We will also create a symbolic link to our installation directory, which will be useful when upgrading the Hadoop version in the future:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo ln -s hadoop-3.3.1 hadoop
\end{lstlisting}
\paragraph{Setting up Hadoop environment variables}\mbox{}\\
There are various environment variables to configure the Hadoop installation.
Some notable environment variables are:
\begin{itemize}
\item
The \textbf{JAVA\_HOME} variable informs Hadoop where to find the Java installation.
\item
The \textbf{HADOOP\_HOME} variable holds the absolute path to the Hadoop installation and the \textbf{HADOOP\_CONF\_DIR} variable points to the directory containing the Hadoop configuration files.
Hadoop ecosystem tools often require these variables to be set in order to find Hadoop libraries and configurations.
\item
The \textbf{HADOOP\_OPTS} variable specifies JVM (Java Virtual Machine) options to use when starting Hadoop services.
\end{itemize}
We will define these environment variables for all cluster nodes in the \textbf{.profile} file in the home directory of the \textit{hdoop} user. In this way, the shell will define our environment variables on each subsequent login.
First, switch to the hdoop user:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
su - hdoop
\end{lstlisting}
Add Hadoop environment variables to the end of the hdoop user profile file.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /home/hdoop/.profile
\end{lstlisting}
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
## BEGIN -- HADOOP ENVIRONMENT VARIABLES
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export PATH=$PATH:$JAVA_HOME/bin
## END -- HADOOP ENVIRONMENT VARIABLES
\end{lstlisting}
Make the change of the profile file active immediately:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
source /home/hdoop/.profile
\end{lstlisting}
Update the Hadoop environment configuration file
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /usr/local/hadoop-3.3.1/etc/hadoop/hadoop-env.sh
\end{lstlisting}
Find the line containing "export JAVA\_HOME=" and update it to:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
\end{lstlisting}
Check if the Hadoop command is now defined:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hadoop version
\end{lstlisting}
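If the environment variables are set correctly, the first line of the output should report the installed release, similar to the following (the remaining lines with build details are omitted here):
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
Hadoop 3.3.1
\end{lstlisting}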
Hadoop is in its default configuration at this stage, which means it is in "Standalone" mode.
\subsection{Configuring Hadoop in fully-distributed mode}
To change Hadoop's default configuration, we can use site-specific configuration files located by default at \textbf{\$HADOOP\_HOME/etc/hadoop}.
Adding properties to files in this directory, for example \textbf{mapred-site.xml}, \textbf{yarn-site.xml}, or \textbf{capacity-scheduler.xml}, tells Hadoop to use the listed values instead of the defaults.
\subsubsection{Edit the "workers" file}
The Hadoop master daemons need to know the hostnames or IP addresses of the worker nodes to communicate with and manage them. To provide this information to the Hadoop master daemons, we will edit the "workers" file.
In the master node:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /usr/local/hadoop/etc/hadoop/workers
\end{lstlisting}
Insert all worker hostnames or IP addresses (one per line):
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hdworker1
hdworker2
localhost
\end{lstlisting}
Note that by adding \textbf{localhost} to this file, we also use our Master node as a Worker node.
\subsubsection{Edit the "core-site.xml" file}
The \textbf{core-site.xml} file enables us to overwrite Hadoop's default configuration properties from \textbf{core-default.xml}.
Hadoop's official website provides more details about configurable values in the core-site.xml and the set of default values.
\footnote{See https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-common/core-default.xml}
We will set the default file system property to HDFS and point it to our master node for our installation.
In the master node and the worker nodes:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /usr/local/hadoop/etc/hadoop/core-site.xml
\end{lstlisting}
Insert this property between the opening (<configuration>) and closing (</configuration>) tags.
\begin{lstlisting}[language=xml, frame=single, basicstyle=\footnotesize]
<property>
<name>fs.default.name</name>
<value>hdfs://hdmaster:9000</value>
</property>
\end{lstlisting}
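Once this configuration is in place and the environment variables are loaded, the effective value of a configuration key can be checked with the \textbf{hdfs getconf} command; a small usage sketch:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hdfs getconf -confKey fs.default.name
\end{lstlisting}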
\subsubsection{Creation of Hadoop's data directories}
Optionally, we can define the directories where HDFS DataNodes will store their local data blocks and where the NameNode will store its \textit{edit-logs} and \textit{fsimage}.
In our example we will use the "/usr/local/tmp\_hadoop/namenode" and "/usr/local/tmp\_hadoop/datanode" directories.
First, we will create the directories, and then in the following sections, we will present the corresponding Hadoop configuration.
In the master node (NameNode):
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo mkdir -p /usr/local/tmp_hadoop/hdfs/namenode
sudo mkdir -p /usr/local/tmp_hadoop/hdfs/datanode
sudo chown -R hdoop:hdoop /usr/local/tmp_hadoop/
\end{lstlisting}
In the worker nodes (DataNodes):
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
sudo mkdir -p /usr/local/tmp_hadoop/hdfs/datanode
sudo chown -R hdoop:hdoop /usr/local/tmp_hadoop/
\end{lstlisting}
\subsubsection{Edit the "hdfs-site.xml" file}
The \textbf{hdfs-site.xml} file enables us to overwrite the default configuration for the HDFS client from \textbf{hdfs-default.xml}.
In our example, we will configure the block replication factor to 3 and indicate that the NameNode should store its local files (fsimage/edit-logs) in the data directory we created in the previous section ("Creation of Hadoop's data directories").
For more details about what is configurable in the \textbf{hdfs-site.xml} see Hadoop's official website.
\footnote{See https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml}
In the master node and the worker nodes:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /usr/local/hadoop/etc/hadoop/hdfs-site.xml
\end{lstlisting}
Insert these properties between the opening (<configuration>) and closing (</configuration>) tags.
\begin{lstlisting}[language=xml, frame=single, basicstyle=\footnotesize]
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/usr/local/tmp_hadoop/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/usr/local/tmp_hadoop/hdfs/datanode</value>
</property>
\end{lstlisting}
\subsubsection{Edit the "yarn-site.xml" file}
The \textbf{yarn-site.xml} file enables us to overwrite the default configuration for YARN from \textbf{yarn-default.xml}.
In our example, we will configure the hostnames and ports used by the Resource Manager and Node Managers.
We will also configure the auxiliary shuffle service and reduce the minimum container memory allocation value.
Furthermore, we will enable log aggregation to store container logs on HDFS.
For more details about what is configurable in the \textbf{yarn-site.xml} see Hadoop's official website.
\footnote{https://hadoop.apache.org/docs/r3.3.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml}
In the master node and the worker nodes:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /usr/local/hadoop/etc/hadoop/yarn-site.xml
\end{lstlisting}
Insert these properties between the opening (<configuration>) and closing (</configuration>) tags.
\begin{lstlisting}[language=xml, frame=single, basicstyle=\footnotesize]
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hdmaster</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>hdmaster:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>hdmaster:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>hdmaster:8031</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
</property>
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
\end{lstlisting}
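With log aggregation enabled as above, the container logs of a finished application can later be retrieved from HDFS with the \textbf{yarn logs} command. In the following sketch, the application id placeholder would be obtained from \textbf{yarn application -list} or from the ResourceManager web UI:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
yarn logs -applicationId <application_id>
\end{lstlisting}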
\subsubsection{Edit the "mapred-site.xml" file}
The \textbf{mapred-site.xml} file enables us to overwrite Hadoop's default configuration properties from \textbf{mapred-default.xml}.
Hadoop's official website provides more details about what is configurable in the \textbf{mapred-site.xml} along with the default values.
\footnote{http://hadoop.apache.org/docs/r3.3.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml}
We will indicate that we want to use YARN as the runtime framework for executing our MapReduce jobs for our installation. We will also indicate where to search for related jar files and packages for our MapReduce applications.
In the master node and the worker nodes:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
vim /usr/local/hadoop/etc/hadoop/mapred-site.xml
\end{lstlisting}
Insert these properties between the opening (<configuration>) and closing (</configuration>) tags.
\begin{lstlisting}[language=xml, frame=single, basicstyle=\footnotesize]
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_MAPRED_HOME</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_MAPRED_HOME</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_MAPRED_HOME</value>
</property>
\end{lstlisting}
\subsubsection{Format the NameNode}
In the master node, before starting HDFS for the first time, we need to format the NameNode:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
hdfs namenode -format
\end{lstlisting}
\subsection{Starting the Hadoop daemons on the cluster}
\subsubsection{Starting the HDFS daemons}
To start HDFS, Hadoop provides a shell script named \textbf{start-dfs.sh}.
In the master node:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
start-dfs.sh
\end{lstlisting}
Run this command to check that the HDFS daemons started.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
jps
\end{lstlisting}
You should get something similar to this
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
40161 NameNode
40708 Jps
40549 SecondaryNameNode
\end{lstlisting}
In the worker nodes:
The \textbf{start-dfs.sh} script connects to the worker nodes through SSH and starts the DataNode daemons, so no further action is required there.
To check that the worker nodes have properly started, we can connect to them with SSH and run this command:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
jps
\end{lstlisting}
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
8561 Jps
7753 DataNode
\end{lstlisting}
\subsubsection{Starting the YARN daemon}
Similar to HDFS, Hadoop provides a script to start YARN, named \textbf{start-yarn.sh}.
In the master node:
Run this command to start YARN:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
start-yarn.sh
\end{lstlisting}
Check that the YARN ResourceManager has started
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
jps
\end{lstlisting}
The output should be similar to this
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
8128 ResourceManager
8561 Jps
7604 NameNode
7964 SecondaryNameNode
\end{lstlisting}
In the worker nodes:
Here too, the \textbf{start-yarn.sh} script connects to the worker nodes through SSH and starts the NodeManager daemons.
No further actions are required.
To check that the NodeManager has started, connect to the worker nodes and run this command:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
jps
\end{lstlisting}
The output should be similar to this
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
8561 Jps
8249 NodeManager
7753 DataNode
\end{lstlisting}
To stop the Hadoop daemons, run the \textbf{stop-yarn.sh} and \textbf{stop-dfs.sh} scripts as the Hadoop user (hdoop in our example).
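On the master node (as the hdoop user), this amounts to the following two commands, stopping YARN first and then HDFS:
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
stop-yarn.sh
stop-dfs.sh
\end{lstlisting}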
\paragraph{Configuring YARN and MapReduce for optimal resource management}\mbox{}\\
It is crucial to find the right balance so that the system can manage shared resources and memory usage optimally.
One configuration may work well for one application and not for another.
In many cases, applications fail because the memory requested by the ApplicationMaster and the other containers exceeds the available capacity.
Requesting resource-heavy containers may result in the application being accepted for execution but left waiting in a queue, or being abruptly stopped during execution.
The following tables describe the default and current values of some relevant configuration properties in their respective files.
\paragraph{mapred-site.xml}\mbox{}\\
\begin{table}[H]
\begin{tabular}{llll}
\rowcolor[HTML]{4472C4}
{\color[HTML]{FFFFFF} Property name} & {\color[HTML]{FFFFFF} Default value} & {\color[HTML]{FFFFFF} Current value} & {\color[HTML]{FFFFFF} Description} \\
\rowcolor[HTML]{D9E2F3}
mapreduce.map.memory.mb & 1204 & 256 & \\
mapreduce.reduce.memory.mb & 3072 & 256 & \\
\rowcolor[HTML]{D9E2F3}
mapreduce.map.java.opts & -Xmx900m & -Xmx205m & \\
mapreduce.reduce.java.opts & -Xmx2560m & -Xmx205m & \\
\rowcolor[HTML]{D9E2F3}
yarn.app.mapreduce.am.resource.mb & 1536 & 768 & \\
yarn.app.mapreduce.am.command-opts & -Xmx1024m & -Xmx615m &
\end{tabular}
\caption{mapred-site.xml memory-related properties (default and configured values)}
\end{table}
\paragraph{yarn-site.xml}\mbox{}\\
\begin{table}[H]
\begin{tabular}{llll}
\rowcolor[HTML]{4472C4}
{\color[HTML]{FFFFFF} Property name} & {\color[HTML]{FFFFFF} Default value} & {\color[HTML]{FFFFFF} Current value} & {\color[HTML]{FFFFFF} Description} \\
\rowcolor[HTML]{D9E2F3}
yarn.nodemanager.resource.memory-mb & & 2048 & \\
yarn.scheduler.minimum-allocation-mb & 1024 & 256 & \\
\rowcolor[HTML]{D9E2F3}
yarn.scheduler.maximum-allocation-mb & 8192 & 1408 & \\
yarn.scheduler.minimum-allocation-vcores & 1 & 1 & \\
\rowcolor[HTML]{D9E2F3}
yarn.scheduler.maximum-allocation-vcores & 32 & 4 & \\
yarn.scheduler.increment-allocation-mb & & 128 & \\
\rowcolor[HTML]{D9E2F3}
yarn.nodemanager.vmem-check-enabled & true & false & \\
yarn.nodemanager.pmem-check-enabled & true & true &
\end{tabular}
\caption{yarn-site.xml memory-related properties (default and configured values)}
\end{table}
\paragraph{Hadoop MapReduce example program}\mbox{}\\
This section will showcase how to compile and run a MapReduce program for Hadoop.
We will use a typical introductory WordCount example.
In the example, we will use the \textbf{test\_file\_1.txt} and \textbf{test\_file\_2.txt} files stored in the \textbf{/user/hdoop/example} HDFS directory, which we created earlier.
First, we will create a directory in our local filesystem to put our .java program.
\begin{lstlisting}[language=bash, frame=single, basicstyle=\footnotesize]
mkdir ~/word_count
cd ~/word_count/
\end{lstlisting}
Next we will save the following Java code in a file named \textbf{WordCount.java} in our \textbf{word\_count} directory.
\paragraph{WordCount.java} \footnote{Source: https://hadoop.apache.org/docs/r3.3.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html}
\begin{lstlisting}[language=java, frame=single, basicstyle=\footnotesize, breaklines=true, postbreak=\mbox{\textcolor{red}{$\hookrightarrow$}\space}]
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();