The Design of NativeTask
========================
Author: Binglin Chang ([email protected])
Introduction
------------
NativeTask is a high performance C++ API & runtime for Hadoop MapReduce. It is
called *NativeTask* because it is a *native* computing unit that focuses only
on data processing, which is exactly what a *Task* does in the Hadoop MapReduce
context.
In other words, NativeTask is not responsible for resource management, job
scheduling, or fault tolerance. Those are all managed by the original Hadoop
components as before, unchanged. But the actual data processing and computation,
which consume most of the cluster resources, are delegated to this highly
efficient data processing unit.
NativeTask is designed to be very fast, with a native C++ API, so more
efficient data analysis applications can be built upon it, such as the LLVM
based query execution engine mentioned in Google's
[Tenzing](http://research.google.com/pubs/pub37200.html).
This is in fact the main objective of NativeTask: to provide an efficient
native Hadoop framework, so that much more efficient data analysis tools can
be built upon it:
* Data warehousing tools using state of the art query execution techniques
found in parallel DBMSs, such as compression, vectorization, dynamic
compilation, etc. These techniques are easier to implement in
native code; most existing implementations are written in C/C++, for
example Vectorwise and Vertica.
* High performance data mining/machine learning libraries. Most of these
algorithms are CPU intensive, involve a lot of numerical computation,
or have already been implemented in native languages. A native runtime
permits better performance and makes it easier to port these algorithms to
Hadoop.
From the user's perspective, NativeTask is a lot like Hadoop Pipes: using the
header files and dynamic libraries provided in the NativeTask library, you
compile your application or class library to a dynamic library rather than an
executable program (because we use JNI), then use a Submitter tool to submit
your job to the Hadoop cluster like Streaming or Pipes do. Tutorials and manuals
are not available yet, but you can read the examples in src/main/native/examples.
Features
--------
1. High performance, more cost effective for your Hadoop cluster;
2. C++ API, so users can develop native applications or apply more
aggressive optimizations not available or convenient in Java,
like SSE/AVX instructions, LLVM, GPU computing, coprocessors, etc.;
3. Support for no sort: by removing sort, the shuffle stage barrier can be
eliminated, yielding better data processing throughput;
4. Support for a foldl style API, much faster for aggregation queries;
5. Binary based MapReduce API, no serialization/deserialization overhead;
6. Compatible with Hadoop 0.20-0.23 (needs the task-delegation patch).
Why is it Fast?
---------------
That's the topic people are most interested in, but before explaining the
technical details of NativeTask, the more appropriate question to begin with is:
*Is Hadoop fast enough?*
Actually, no.
It is common to see a well written C++ program process 1GB of data
in just a few seconds, but it may take a MapReduce task minutes to process the
same data, and many studies have shown that Hadoop MapReduce is not as
efficient as traditional parallel DBMSs for analytical workloads.
On the other hand, Hadoop does better at scalability and fault tolerance.
Although it is not efficient enough today, I believe there are no technical
limitations preventing Hadoop from reaching the same performance as hand
written native programs. So:
*How fast can it get?*
Let's do some computation. For example, consider a commodity server:

    Dell PowerEdge C2100
    CPU: 2 * 6 core Xeon5600
    Memory: 48GB
    Disk: 12 * 2TB SATA

This server can run 12 tasks in parallel, each task using 1 core (2 threads),
4GB of memory, and 1 SATA disk. A typical map task data flow and its ideal
speed would be:

    Read data from HDFS    100MB/s (data local task)
    Decompression          700-2000MB/s, ratio 2-5x (snappy or lz4)
    RecordReader+Mapper    2000MB/s (LineRecordReader+IdenticalMapper)
    Sort                   300-600MB/s (varies a lot, faster if key/value are large)
    Compression            250-500MB/s (varies a lot, depending on data type)
    Write to local disk    100MB/s (2000MB/s with page cache)

One thing to notice here is that with lightweight compression enabled, disk is
no longer the bottleneck; system throughput is increasingly determined by
raw CPU cost.
So if everything is perfect, a map task should handle 1GB (250MB compressed) of
data in:

    Read + Decompression   2.5s
    RecordReader+Mapper    0.5s
    Sort                   2s
    Compression+Write      3s
    Total                  8s

So it is 1GB/8s = 125MB/s. Furthermore, for selection+filter+join/aggregation
queries, sort is not needed, the output size is much smaller than the input
size, and each core has 2 threads to use in one task. Combining all these
factors, it is possible to process 1GB of data in just 3s,
that's about 333MB/s. For the whole server, it is 12 * 333MB/s = 4GB/s.
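
The estimate above is simple enough to check mechanically. The following is a minimal sketch, not part of NativeTask: the helper names `totalSeconds` and `throughputMBps` are hypothetical, and 1GB is taken as 1000MB, as the 125MB/s figure implies.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Sum of the per-stage times (in seconds) of one map task.
inline double totalSeconds(const std::vector<double>& stageSeconds) {
  return std::accumulate(stageSeconds.begin(), stageSeconds.end(), 0.0);
}

// Throughput in MB/s for inputMB of uncompressed input handled in `seconds`.
inline double throughputMBps(double inputMB, double seconds) {
  assert(seconds > 0.0);
  return inputMB / seconds;
}
```

With the stage times listed above, `totalSeconds({2.5, 0.5, 2.0, 3.0})` gives 8 and `throughputMBps(1000.0, 8.0)` gives 125. For the no sort case, `throughputMBps(1000.0, 3.0)` gives about 333, which is roughly 4GB/s when multiplied by 12 tasks.
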
This means that under the best conditions (totally balanced scheduling,
perfect data locality, no slow nodes or failures), a 25 node cluster with 10GbE
should:
* Complete a 1TB Terasort in 58 seconds (27s map + 10s shuffle + 21s reduce),
if input, map output, and final output are all compressed (Terasort is an IO
test by default and does not allow compression, but it can serve as a
typical MapReduce framework test).
* Answer a simple aggregation query against a 1TB dataset in 10 seconds.
Sure, there are lots of assumptions in the arguments above, but again there
are no technical limits in any stage of the whole processing flow.
With this processing throughput, it is possible to set up a Hadoop based data
warehouse at very low cost compared to commercial data warehousing solutions,
but with comparable performance. The server mentioned above costs
about 10-20K$ per node, with 8TB (3x replication)/24TB (decompressed) capacity,
that is 1-2K$ per core, 1-2K$/TB. As hardware costs continue to decrease,
this cost will continue to drop.
Although this sounds amazing, there is a long way to go to get there.
Currently, a well written Hadoop map task can process 1GB of data in about
40-120s, which is roughly 8-25MB/s; Hive/Pig tasks may take much longer because
of their high level abstractions. This is clearly far from the maximum possible
speed (100-300MB/s). This leads to the next question:
*Why does Hadoop not perform well enough? How can it be improved?*
Here are some of the top reasons (but not all):
1. I/O bottleneck. Most Hadoop workloads are data intensive, so if no
compression is used for input, mid-output, and output, I/O (disk,
network) can be a bottleneck.
The solution is to use compression everywhere. Luckily there are excellent
general purpose lightweight compression algorithms out there: snappy & lz4.
With a 2x-5x compression ratio (actually much higher for Hadoop workload
data types), effective I/O bandwidth is 2x-5x the real I/O bandwidth.
Another thing worth mentioning is the high speed network. Today's servers are
much more powerful than those of a few years ago, with more and more cores and
RAM per node, so a server can run more tasks concurrently, and high speed
networks like 10~40GbE will become the standard setup for Hadoop clusters.
Whether the current Hadoop network stack (jetty/netty based) can sustain such
high throughput is also questionable.
2. Inefficient implementation. This inefficiency lies everywhere:
* Map side sort: the current sort can be 10x slower than a well written sort,
because the current sort implementation suffers from poor cache locality
and is not partition based. This will likely be improved in the latest
Hadoop version but it is still not optimal.
* Serialization/Deserialization: this leads to unavoidable object creation,
lots of small buffer copies, heavy stream abstractions, primitive
type boxing/unboxing, suboptimal compare operations, etc.
Ser/Deser is overused both at the MapReduce framework level and at the query
execution level (Hive/Pig); this is the main reason for Hadoop's poor
data processing throughput. There were
[discussions](https://issues.apache.org/jira/browse/MAPREDUCE-326)
about this long ago, but no progress has been made yet.
Here is my thought: at the MR framework level a pure binary interface is
sufficient and efficient for a query execution engine built upon it,
or, even more aggressively: don't use the MR API, just use the task input
split & data redistribution utility (shuffle) provided by the MR framework.
At the query execution level, ser/deser is not necessary either. The most
efficient way is to use some sort of schema to describe the data, use a
C struct like binary representation to store the data, then use LLVM to
directly generate native code based on the schema and the logical query plan.
This can lead to a big boost in processing throughput; Google has
reported a 6x-12x throughput boost using LLVM in Tenzing.
* Shuffle: Hadoop 0.23 has made many optimizations to shuffle (netty, batch
fetch, etc.), but it can be optimized further (for example, shuffle in
the latest Hadoop version is still slower than Baidu's internal version).
When sort is not needed, there are more optimizations to exploit.
And surely there will be a lot of tuning work to fully utilize high
speed Ethernet too.
* Data locality. This is one of the main advantages of parallel DBMSs over
Hadoop: with advanced data partitioning, indexing, and sophisticated
query plans, most data is processed locally and data movement is
reduced to a minimum. Hive has done some similar optimizations, but
more can be done, and some optimizations need a more flexible
computing model beyond MapReduce.
* Scheduling & startup overhead. This has a big impact on small jobs and
multi-iteration jobs.
3. Inflexible programming paradigm. MapReduce is a very general data
processing model; this gives it its strength, but also limits its
performance. For some specific tasks, there are more efficient methods
to adopt. There are many examples in the
[Tenzing](http://research.google.com/pubs/pub37200.html) paper, and
there has been a lot of recent research on improving query performance for
MapReduce. Hive has done many optimizations at the application level,
but some framework level optimizations/interfaces are needed, such as
hash aggregation with no sort for aggregation queries, map-side join
with a dictionary server, chained MapReduce jobs (combining the reducer with
the mapper of the next MR job), etc.
These factors lead directly to the design principles of NativeTask:
1. Native implementation.
I'm fully aware that Java is very efficient; in my
experience, Java is very efficient for normal tasks, and Java has certain
runtime optimization techniques which are much more difficult for
C/C++ to realize. For example, it is very difficult to do dynamic
optimizations such as lock coarsening or virtual function inlining in C++.
But there are some tasks/optimizations, which I believe are essential for
this project, that are better done in a native runtime:
* _Compression_ Nearly all of the fastest compression algorithms are
written in native code. Currently Hadoop uses JNI to call these
libraries in a bulk processing manner, but there is still some
overhead in crossing the JNI boundary, especially when decompression
is very fast (>1GB/s). And some techniques, like lazy
decompression and direct operations on compressed data, do not fit
bulk processing.
* _SSE/SIMD_ This is similar to compression: currently Hadoop uses
JNI to leverage SSE optimizations such as the CRC checksum. But again
it is not a general solution.
* _LLVM_ As mentioned before, one of the main objectives of this project
is to provide a native runtime to support high level query execution
engines, and it is almost certain that LLVM will be used. Because LLVM is
a native C++ library, C++ is more suitable.
2. Avoid serialization and memory copy.
As mentioned before, serialization has a lot of overhead. To get maximum
throughput, it is better to abandon serialization, or to introduce a
serialization method that can operate directly on serialized data, or to
avoid object creation and memory copy. Again this is hard or not user
friendly in Java, but convenient and straightforward in native code, for
example with a C struct like data representation. In addition, when the whole
data flow is on the native side (CRC checksum, decompression, reader, process,
writer, compression, CRC checksum), a lot of small memory copies can and should
be eliminated. So the interface and underlying processing flow are designed
to eliminate most memory copies.
3. Keep it simple.
This project mainly focuses on pure data processing. Unlike typical
distributed systems, there shouldn't be many complex things involved, such
as multi-threaded programming & synchronization, high level abstractions, or
complex system programming. For example, this project tries to avoid the
asynchronous output collector, io stream abstractions and other complex
things existing in the current MapReduce design.
4. Less concern for compatibility.
As mentioned before, the main objective of this project is to build high
level data analysis tools/libraries upon it. Compatibility should be
constrained at a higher level (such as the query language level), while
permitting more flexibility at the lower level, so we can experiment with
various things. The new MRv2/YARN framework also permits us to experiment
with new frameworks. Finally, since this project is at a very early stage, lots
of things will certainly go through radical changes during development.
Design & Implementation
-----------------------
NativeTask consists of two major parts: the Java side and the native side. The
Java side is responsible for bypassing the normal Java data flow and delegating
the data processing to the native side, and the native side does the actual
computation. The Java side and the native side communicate with each other
using JNI, in a synchronous, batch processing (block based) way. This is
different from the IPC mechanisms used in
[Streaming](http://hadoop.apache.org/common/docs/r1.0.0/streaming.html)
and
[Pipes](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/pipes/package-summary.html).
Sockets and pipes are fast enough for data processing, but they consume
a lot of CPU and introduce multi-threaded programming and asynchronous
processing.
### Task delegation
To bypass the normal Java data flow, NativeTask introduces a task delegation
interface, which inserts the bypassing logic at the beginning of MapTask
and ReduceTask (this needs modification to the current
MapReduce source code). The bypassing logic checks whether a delegator
is configured in JobConf; if there is one, it uses the configured delegator
to run the task, bypassing the original logic.
The delegation interface looks like this:

    MapTask:    void run(TaskAttemptID taskID,
                         JobConf job,
                         TaskUmbilicalProtocol umbilical,
                         DelegateReporter reporter,
                         Object split)
    ReduceTask: void run(TaskAttemptID taskID,
                         JobConf job,
                         TaskUmbilicalProtocol umbilical,
                         DelegateReporter reporter,
                         RawKeyValueIterator rIter)

For MapTask, split information is needed; currently only FileSplit is
supported by the native RecordReader. For ReduceTask, shuffle and merge are
still done on the Java side unchanged, so a RawKeyValueIterator is passed to
the delegator. A future native implementation of shuffle and merge will
certainly perform better. I have proposed another possible
(and more general) solution, [Extensible Task(MAPREDUCE-3246)](https://issues.apache.org/jira/browse/MAPREDUCE-3246),
to try to make tasks extensible, but in practice I found the delegation
interface more convenient because there is still a lot of work that can't be
done on the native side right now. Anyway, these are minor issues, since both
are easy to refactor.
Currently delegation supports two modes of dataflow:
* Native Mapper/Reducer only: compatible with existing InputFormat/OutputFormat
and RecordReader/Writer; Key/Value pairs are passed to/from the native side in
batches.
The dataflow of a typical MapTask:
RecordReader -> Serialize -> [DirectByteBuffer] -> Native Mapper -> Native
Output Collector(Sort & Spill)
The dataflow of a typical ReduceTask:
RawKeyValueIterator -> [DirectByteBuffer] -> Native Reducer ->
[DirectByteBuffer] -> Deserialize -> RecordWriter
* Native Mapper/Reducer with native RecordReader/Writer: currently
InputFormat/OutputFormat still exist for input split and output
commit, but RecordReader/Writer are native, so a native task can implement
RecordReader/Writer to read input or write output directly, yielding
better performance and flexibility.
The dataflow of a typical MapTask:
Input Split -> Native RecordReader -> Native Mapper -> Native Output
Collector
The dataflow of a typical ReduceTask:
RawKeyValueIterator -> [DirectByteBuffer] -> Native Reducer -> Native
RecordWriter
### Small Batch Processing
As described before, the Java side and the native side pass serialized K/V data
in a block based batch processing pattern, rather than record by record. This
is because JNI calls have considerable overhead, and batch processing minimizes
the number of JNI calls. The block size is about 32KB~128KB, smaller than
the L2 cache.
The JNI based batch processing is implemented in the Java class
NativeBatchProcessor and the native C++ class BatchHandler. The JNI details are
isolated in these 2 classes, so the other parts of the project don't need to
deal with the complexity of JNI.
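
To illustrate the block based pattern, here is a minimal sketch of a writer that packs records into a fixed-size block and hands over the whole block in one call. It is not the actual NativeBatchProcessor/BatchHandler code: the class name `BatchWriter`, the record layout, and the callback (which stands in for the single JNI crossing per block) are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: packs length-prefixed key/value pairs into a
// fixed-size block and passes the block on in one call when it fills up.
class BatchWriter {
 public:
  typedef std::function<void(const char* data, size_t len)> FlushFn;

  BatchWriter(size_t capacity, FlushFn flush)
      : buffer_(capacity), used_(0), flush_(std::move(flush)) {}

  // Append one record as [keyLen][valueLen][key bytes][value bytes].
  void put(const std::string& key, const std::string& value) {
    const size_t need = 2 * sizeof(uint32_t) + key.size() + value.size();
    assert(need <= buffer_.size());  // oversized records are out of scope here
    if (used_ + need > buffer_.size()) flush();
    writeLen(static_cast<uint32_t>(key.size()));
    writeLen(static_cast<uint32_t>(value.size()));
    writeBytes(key.data(), key.size());
    writeBytes(value.data(), value.size());
  }

  // Hand the current block to the consumer and start a new one.
  void flush() {
    if (used_ == 0) return;
    flush_(buffer_.data(), used_);
    used_ = 0;
  }

 private:
  void writeLen(uint32_t n) { writeBytes(&n, sizeof(n)); }
  void writeBytes(const void* p, size_t n) {
    std::memcpy(buffer_.data() + used_, p, n);
    used_ += n;
  }

  std::vector<char> buffer_;
  size_t used_;
  FlushFn flush_;
};
```

With a 64 byte block and 16 byte records, ten calls to `put` followed by a final `flush` cross the boundary three times instead of ten; with 32KB~128KB blocks the reduction is correspondingly larger.
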
### Class Library
One problem with C++ is its lack of reflection, so it is difficult to set up
the mapper, reducer, record reader, and writer in JobConf on the client side
and create them dynamically in the task. Pipes uses static linking; unlike
Pipes, NativeTask uses something more dynamic, a class library based structure.
A typical application based on NativeTask consists of several dynamic
libraries (as class libraries), for example:

    [Task JVM]
    |
    delegation
    |
    |--load-> [libnativetask.so]
    |--load-> [userlibrary.so]
    |--load-> [application.so]
    |
    create native objects
    |
    run mapper/reducer
    |
    |----------------|
    done()

NativeTask uses a few template tricks to realize a very simple equivalent
of Hadoop's ReflectionUtils.newInstance().
Think of .so libraries as class libraries (like .jar files): every .so library
has an entry function to create C++ objects of the classes in this library.
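
The idea can be sketched as a name-to-creator table filled in through a function template. This is only an illustration under assumed names (`NativeObject`, `ClassLibrary`, `createObject`, `DemoMapper`), not the actual NativeTask code; in a real shared library the table would presumably be reached through an exported entry function, which is omitted here.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical base type for anything a task can instantiate by name.
class NativeObject {
 public:
  virtual ~NativeObject() {}
  virtual const char* type() const = 0;
};

typedef NativeObject* (*ObjectCreator)();

// The template generates one creator function per registered class.
template <typename T>
NativeObject* createObject() {
  return new T();
}

// Per-library table mapping class names to creator functions.
class ClassLibrary {
 public:
  template <typename T>
  void registerClass(const std::string& name) {
    creators_[name] = &createObject<T>;
  }

  // Returns a new object owned by the caller, or nullptr if unknown.
  NativeObject* create(const std::string& name) const {
    std::map<std::string, ObjectCreator>::const_iterator it =
        creators_.find(name);
    return it == creators_.end() ? nullptr : it->second();
  }

 private:
  std::map<std::string, ObjectCreator> creators_;
};

// Example user class, for illustration only.
class DemoMapper : public NativeObject {
 public:
  const char* type() const override { return "DemoMapper"; }
};
```

After `lib.registerClass<DemoMapper>("Demo.DemoMapper")`, a task that only knows the configured class name can call `lib.create("Demo.DemoMapper")`, which plays the role that reflection plays on the Java side.
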
The dynamic library libnativetask.so is the NativeTask runtime, but it
also serves as a class library, with some predefined Mapper/Reducer,
Partitioner and RecordReader/Writer classes, such as IdenticalMapper/Reducer,
HashPartitioner, TotalOrderPartitioner, LineRecordReader/Writer, etc.
The drawback of dynamic linking is the poor ABI compatibility of C++, but since
this is an open source project that mainly targets Linux and homogeneous
computing environments, and based on my experience with HCE (Hadoop C++
Extension), this is not a serious problem.
### IO buffers and Compression
To minimize buffer copies, two lightweight io buffers are introduced:
ReadBuffer & AppendBuffer. These are different from the decorator pattern based
Java & Hadoop IO streams: ReadBuffer & AppendBuffer are implemented to
inline the most frequently invoked methods, and add a code path to avoid one
buffer copy when supporting compression/decompression. This doesn't mean
NativeTask doesn't use decorator based streams, but they are only used in batch
mode, such as for file read/write and CRC checksum.
It is much easier to add a compression codec in native code; currently snappy,
lz4 and gzip have been integrated into NativeTask.
### Task Dataflow
The dataflow and main logic of the map/reduce task are almost the same as in
the original implementation; the differences are in the implementation details.
The general difference is that the native implementation tends to be simpler
and so is easier to optimize, and the mapper/reducer, reader/writer API is
designed to make zero copy possible.
#### Map Output Collector
This part contributes a lot of the performance gains. As mentioned before, the
sort implementation of the current Hadoop is suboptimal, so a different,
partition based sort & spill method is used. The main components of this method
are described below.
Basically, the map output collector is a partitioned key/value buffer. The
mapper emits key/value pairs and a partition number is generated using the
partitioner. The map output collector finds a PartitionBucket to put this
key/value pair into. A PartitionBucket has an array of MemoryBlocks to hold KV
pairs; if the last MemoryBlock is full, it allocates a new MemoryBlock from the
MemoryPool, and if there is not enough memory in the MemoryPool, a spill is
triggered.
MemoryPool holds the buffer of size io.sort.mb and tracks current buffer usage.
Notice that this buffer only occupies virtual memory, not RSS (memory really
used), if the memory is not actually accessed; this is better than Java because
Java initializes arrays.
MemoryBlock is a small chunk of memory backed by MemoryPool, used by
PartitionBucket. The default size of a MemoryBlock equals ceil(io.sort.mb /
partition / 4 / MIN_BLOCK_SIZE) * MIN_BLOCK_SIZE. Currently MIN_BLOCK_SIZE
equals 32K, and the max size of a MemoryBlock is 1M; it should be dynamically
tuned according to partition number & io.sort.mb in the future.
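
As a worked example of the block size formula, the sketch below computes the default size using integer arithmetic. The function name `defaultBlockSize` and the `MAX_BLOCK_SIZE` constant name are assumptions for illustration; the 32K and 1M values come from the text above.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

const uint64_t MIN_BLOCK_SIZE = 32 * 1024;    // 32K, as stated above
const uint64_t MAX_BLOCK_SIZE = 1024 * 1024;  // 1M cap, as stated above

// ceil(sortBufferBytes / partitions / 4 / MIN_BLOCK_SIZE) * MIN_BLOCK_SIZE,
// capped at MAX_BLOCK_SIZE. The ceiling is taken with integer arithmetic.
inline uint64_t defaultBlockSize(uint64_t sortBufferBytes, uint32_t partitions) {
  assert(partitions > 0);
  const uint64_t divisor =
      static_cast<uint64_t>(partitions) * 4 * MIN_BLOCK_SIZE;
  const uint64_t units = (sortBufferBytes + divisor - 1) / divisor;
  return std::min(units * MIN_BLOCK_SIZE, MAX_BLOCK_SIZE);
}
```

For io.sort.mb = 300M and 100 partitions the formula gives 24 units of 32K, that is 768K. For io.sort.mb = 1200M and 200 partitions it gives 1536K, which is capped at 1M. For a small buffer spread over many partitions (100M over 1000 partitions) it falls back to a single 32K unit.
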
The purpose of MemoryBlock is to reduce CPU cache misses. When sorting large
indirectly addressed KV pairs, the sort time is dominated by random RAM
reads, so MemoryBlock is used to give each bucket relatively contiguous
memory.
PartitionBucket stores the KV pairs for a partition. It has two arrays:

    vector<MemoryBlock *> blocks
        blocks used by this bucket
    vector<uint32_t> offsets
        KV pair start offset in MemoryPool

The offsets vector is not under memory control (in io.sort.mb) yet, but in
practice it doesn't affect the memory footprint too much.
This approach does not work well when the partition number & Key/Value size are
large, but this is a rare case, and it can be improved; for example, we
can use MemoryPool directly (disabling MemoryBlock) if io.sort.mb/partition
number is too small.
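
The collect path described in this section can be modelled in a few dozen lines. The sketch below is a simplified illustration, not the actual implementation: the class layout, the record format, `NO_BLOCK`, and the accessor names are assumptions, and the spill only resets the buffers where a real one would sort and write every bucket.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

const uint32_t NO_BLOCK = UINT32_MAX;  // marker for "pool exhausted"

// Hypothetical pool: one buffer handed out in fixed-size blocks.
class MemoryPool {
 public:
  MemoryPool(uint32_t capacity, uint32_t blockSize)
      : data_(capacity), blockSize_(blockSize), used_(0) {}
  // Start offset of a fresh block, or NO_BLOCK if none is left.
  uint32_t allocateBlock() {
    if (used_ + blockSize_ > data_.size()) return NO_BLOCK;
    const uint32_t start = used_;
    used_ += blockSize_;
    return start;
  }
  char* at(uint32_t offset) { return data_.data() + offset; }
  uint32_t blockSize() const { return blockSize_; }
  void reset() { used_ = 0; }
 private:
  std::vector<char> data_;
  uint32_t blockSize_;
  uint32_t used_;
};

struct PartitionBucket {
  std::vector<uint32_t> blocks;   // start offsets of blocks used by this bucket
  std::vector<uint32_t> offsets;  // start offset of each KV pair in the pool
  uint32_t cursor;                // next free byte in the last block
  uint32_t limit;                 // end of the last block
  PartitionBucket() : cursor(0), limit(0) {}
};

class MapOutputCollector {
 public:
  MapOutputCollector(uint32_t capacity, uint32_t blockSize, uint32_t partitions)
      : pool_(capacity, blockSize), buckets_(partitions), spills_(0) {}

  // Store one record as [keyLen][valueLen][key][value] in its partition.
  void collect(uint32_t partition, const std::string& key,
               const std::string& value) {
    const uint32_t klen = static_cast<uint32_t>(key.size());
    const uint32_t vlen = static_cast<uint32_t>(value.size());
    const uint32_t need = 8u + klen + vlen;
    assert(need <= pool_.blockSize());
    PartitionBucket& b = buckets_[partition];
    if (b.cursor + need > b.limit) {  // last block is full, or none yet
      uint32_t start = pool_.allocateBlock();
      if (start == NO_BLOCK) {        // pool exhausted: spill, then retry
        spill();
        start = pool_.allocateBlock();
        assert(start != NO_BLOCK);
      }
      b.blocks.push_back(start);
      b.cursor = start;
      b.limit = start + pool_.blockSize();
    }
    char* p = pool_.at(b.cursor);
    std::memcpy(p, &klen, 4);
    std::memcpy(p + 4, &vlen, 4);
    std::memcpy(p + 8, key.data(), klen);
    std::memcpy(p + 8 + klen, value.data(), vlen);
    b.offsets.push_back(b.cursor);
    b.cursor += need;
  }

  size_t recordCount(uint32_t p) const { return buckets_[p].offsets.size(); }
  size_t blockCount(uint32_t p) const { return buckets_[p].blocks.size(); }
  int spillCount() const { return spills_; }

 private:
  // A real spill would sort and write every bucket; this one only resets.
  void spill() {
    ++spills_;
    for (size_t i = 0; i < buckets_.size(); ++i) buckets_[i] = PartitionBucket();
    pool_.reset();
  }

  MemoryPool pool_;
  std::vector<PartitionBucket> buckets_;
  int spills_;
};
```

Because every bucket fills its own blocks, the records of one partition stay close together in memory, which is the cache locality argument made above.
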
### Map Side Sort
Since the map output buffer is partitioned, we can sort each partition
separately; this is different from Java's single buffer approach. By doing so,
sort can be much faster, because sorting one big array is much slower than
sorting many small arrays; small arrays also mean fewer cache misses; and the
partition number does not need to be compared in the sort. My tests have shown
a 10x-20x speedup in sort performance.
Currently only the binary comparator is supported, because it is efficient and
sufficient for most applications. Fixed length key comparison and user defined
compare functions may be useful; they can be implemented in the future.
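
A minimal sketch of per-partition sorting with a bytewise comparator follows. It uses `std::sort` and `std::memcmp` as stand-ins, so it says nothing about the sort algorithm NativeTask actually uses; `KeyRef`, `makeKey`, `binaryLess` and `sortPartitions` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical view of a key stored somewhere in the output buffer.
struct KeyRef {
  const char* data;
  uint32_t length;
};

inline KeyRef makeKey(const std::string& s) {
  KeyRef k = {s.data(), static_cast<uint32_t>(s.size())};
  return k;
}

// Bytewise (binary) comparison; a shorter key sorts before a longer key
// that starts with the same bytes.
inline bool binaryLess(const KeyRef& a, const KeyRef& b) {
  const uint32_t n = std::min(a.length, b.length);
  const int c = std::memcmp(a.data, b.data, n);
  return c != 0 ? c < 0 : a.length < b.length;
}

// Sort every partition independently. The partition number never takes part
// in a comparison because records are already grouped by partition.
inline void sortPartitions(std::vector<std::vector<KeyRef> >& partitions) {
  for (std::vector<KeyRef>& p : partitions) {
    std::sort(p.begin(), p.end(), binaryLess);
  }
}
```

Each inner vector is small compared with the whole output buffer, so a sort over it touches far less memory than one sort over all records.
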
### No Sort Dataflow
No sort dataflow is easy to implement on the native map side: just do not sort
each PartitionBucket. Since the combiner relies on grouping KV pairs together,
the combiner is not supported in no sort dataflow, but combining can be done in
the mapper logic in many cases. Originally I planned to implement a grouping
dataflow that does support the combiner, but after sort was optimized, there
seems to be very little benefit in supporting grouping.
Since reduce side shuffle and merge are not implemented yet, no sort dataflow
on the reduce side is implemented in Java. A patch has been submitted to
[MAPREDUCE-3246](https://issues.apache.org/jira/browse/MAPREDUCE-3246)
with both map and reduce side implementations.
### Parallel Spill
Since the map output KV buffer is partitioned, parallel sort and spill become
possible, but this needs some changes to the original Hadoop code, so I have
left it unimplemented. For example, suppose a map task has 100 reducers;
instead of spilling to one file, we spill to one directory:

    output
    |- partition0-49.out
    |_ partition50-99.out

Then sorting, combining, spilling, and compression can all be done in parallel,
to fully utilize CPU resources and reduce task execution time.
#### Reduce Task
Shuffle and merge are not implemented yet, so there is nothing special here.
Two new interfaces are introduced in the combiner and reducer stages, so you can
use the mapper or [folder](https://issues.apache.org/jira/browse/MAPREDUCE-3247)
interface in the combiner and/or reducer stage. These two interfaces are both
passive interfaces, which are suitable in no sort dataflow for implementing
aggregation style workloads. The Mapper API is for users who want to manage
their hashtable by themselves; the Folder API is for users who want the
framework to manage the hashtable for them. This work is experimental and not
finished yet.
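
To make the foldl style concrete, here is a minimal sketch of a framework-managed hash table driven by a user-supplied fold function. It is not the Folder API itself, which is still experimental: `FoldAggregator`, `sumFold`, and the use of `std::unordered_map` with `int64_t` values are assumptions for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical foldl-style aggregator: the framework owns the hash table
// and calls the user-supplied fold function once per incoming value.
template <typename State, typename Fold>
class FoldAggregator {
 public:
  FoldAggregator(State initial, Fold fold) : initial_(initial), fold_(fold) {}

  // state[key] = fold(state[key], value); no sorting or grouping is needed.
  void add(const std::string& key, int64_t value) {
    auto it = table_.find(key);
    if (it == table_.end()) it = table_.emplace(key, initial_).first;
    it->second = fold_(it->second, value);
  }

  const std::unordered_map<std::string, State>& result() const {
    return table_;
  }

 private:
  State initial_;
  Fold fold_;
  std::unordered_map<std::string, State> table_;
};

// Example fold function: integer sum, as a word count would use.
inline int64_t sumFold(int64_t acc, int64_t v) { return acc + v; }
```

Values can arrive in any order, which is why this style fits the no sort dataflow: the user only writes the fold function, and the table management stays inside the framework.
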
### Usability and Others
To increase usability, a few classes are built into the NativeTask library:

    LineRecordReader/LineRecordWriter
    IdenticalMapper/IdenticalReducer
    HashPartitioner
    TotalOrderPartitioner

More Reader/Writers will be added, to support other Input/OutputFormats such
as SequenceFile and RCFile.
I also implemented Terasort & WordCount, bundled with the NativeTask library,
to make performance tests easier.
There is an example in the "example" directory, a simple version of Hadoop
Streaming, which serves as a relatively complex demo.
Quite a few utility classes available in Java are missing in C++, so
I had to re-implement them, such as synchronization utils, process & pipes,
random generator, etc. Some of them are copied and modified from the JDK
and google-leveldb.
This project uses a lot of open source projects from Google: snappy, gtest,
cityhash, leveldb, and probably sparsehash for hash aggregation implementations
in the future. Another project is LZ4; I'm quite impressed by its simplicity
and amazing speed.
Performance Experiments
-----------------------
I tested hadoop-1.0 and NativeTask using simple MapReduce applications, Terasort
and WordCount, on a 16 node cluster (15 worker nodes).
### Cluster Configuration
The test cluster has 16 nodes connected by 1Gb Ethernet. Each node has:

    CPU: Xeon(R) CPU E5645 * 2, 2.4GHz, 12 core, 24 thread
    Memory: 32GB
    Disk: 12 * 1T SATA
    JDK: 1.6 u23
    Map Task: 7
    Reduce Task: 7

I use Hadoop version 1.0 patched with the task delegation patch.
The namenode and jobtracker are deployed on the same node; datanodes and
tasktrackers are deployed on the other 15 nodes. So the whole cluster has 105
map slots and 105 reduce slots. Block size is configured to 256MB.
The NativeTask library is compiled with gcc version 3.4.5, because it is the
only compiler available in the test environment. This compiler is very old and
probably generates poor native code. On my own computer, a Macbook Pro with gcc
version 4.2.1 (Apple Inc. build 5659), the result is much better (50%-70%
faster); the CPU of my computer is an Intel Core i5 2.3GHz, which should have
similar performance to the Xeon E5645. Anyway, I suggest that anyone who is
interested compile the code, run it in their own environment, and let me know.
I don't think I will have the resources and time to do large scale tests in the
near future :(
### Test Application
Standard Terasort is actually an IO test and doesn't allow compression, but for
the purpose of this experiment, to evaluate data processing throughput, snappy
compression is used for input, mid-output and final output; this moves
the bottleneck from disk and network IO to CPU. This test focuses on pure
framework performance: key/value pairs are passed directly through the mapper
and reducer, without object creation or copying.
WordCount is a simple aggregation workload, and there is some computation at
the application level. The original WordCount demo implementation is
inefficient, involving lots of type casts, object creation and copying. I made
an optimized version using the same implementation as in NativeTask; both test
results are included.
Here are some characteristics of Terasort and WordCount:
<table>
<tr>
<th></th>
<th> Terasort </th>
<th> WordCount </th>
</tr>
<tr>
<th>Key/value size (bytes)</th>
<td>100</td>
<td>8-16</td>
</tr>
<tr>
<th>Combiner</th>
<td>No</td>
<td>Yes</td>
</tr>
<tr>
<th>Input</th>
<td>200G(44G compressed)</td>
<td>100G(52G compressed)</td>
</tr>
<tr>
<th>MapTask</th>
<td>200(1G/task)</td>
<td>200(500M/task)</td>
</tr>
<tr>
<th>ReduceTask</th>
<td>200</td>
<td>100</td>
</tr>
<tr>
<th>Compression Ratio</th>
<td>about 0.2</td>
<td>about 0.5</td>
</tr>
<tr>
<th>Input/Output</th>
<td>1:1</td>
<td>1:0(almost)</td>
</tr>
</table>
### Test data generation
Input data generation commands:
Terasort
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar teragen 2000000000 /tera200G-snappy
WordCount
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar randomtextwriter -Dtest.randomtextwrite.total_bytes=100000000000 -Dtest.randomtextwrite.bytes_per_map=500000000 -outFormat org.apache.hadoop.mapred.TextOutputFormat /text100G-snappy
Tests execution commands:
Terasort Java
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar terasort /tera200G-snappy /terasort200G-java
Terasort NativeTask
bin/hadoop jar lib/hadoop-nativetask-0.1.0.jar terasort /tera200G-snappy /terasort200G-nt
WordCount Java
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar wordcount /text100G-snappy /wordcount-100G-java
WordCount Java Optimized
bin/hadoop jar hadoop-examples-1.0.1-SNAPSHOT.jar wordcount -Dwordcount.enable.fast.mapper=true /text100G-snappy /wordcount-100G-java-opt
WordCount NativeTask
bin/hadoop jar lib/hadoop-nativetask-0.1.0.jar -reader NativeTask.LineRecordReader -writer NativeTask.TextIntRecordWriter -mapper NativeTask.WordCountMapper -reducer NativeTask.IntSumReducer -combiner NativeTask.IntSumReducer -input /text100G-snappy -output /wordcount-100G-nt
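As a quick sanity check, the input sizes listed in the characteristics table can be derived from the generation commands. The sketch below is only illustrative arithmetic: it assumes teragen's fixed 100-byte rows and that randomtextwriter splits `total_bytes` into one generator map per `bytes_per_map` chunk. The variable names are made up for this example.

```python
# Illustrative arithmetic only; constant names are hypothetical.
TERAGEN_ROWS = 2_000_000_000              # row count passed to teragen
TERAGEN_ROW_BYTES = 100                   # assumption: fixed 100-byte rows

WORDCOUNT_TOTAL_BYTES = 100_000_000_000   # test.randomtextwrite.total_bytes
WORDCOUNT_BYTES_PER_MAP = 500_000_000     # test.randomtextwrite.bytes_per_map

# Uncompressed Terasort input size in bytes.
terasort_bytes = TERAGEN_ROWS * TERAGEN_ROW_BYTES

# Number of generator maps, hence number of ~500M input files for WordCount.
wordcount_maps = WORDCOUNT_TOTAL_BYTES // WORDCOUNT_BYTES_PER_MAP

print(f"Terasort input: {terasort_bytes / 1e9:.0f} GB")
print(f"WordCount input files: {wordcount_maps}")
```

Both values agree with the table: 200G of Terasort input, and 200 WordCount inputs of 500M each.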
### Test Result
Terasort
<table>
<tr>
<th>Terasort 200G(io.sort.mb=1200M, no merge)
200Map,200Reduce</th>
<th>Total Time(s)</th>
<th>Map Avg(s)</th>
<th>Map Best(s)</th>
<th>Sort(s)</th>
<th>Shuffle Avg(s)</th>
<th>Shuffle Best(s)</th>
<th>Reduce Avg(s)</th>
<th>Reduce Best(s)</th>
<th>Map CPU(ms)</th>
<th>Reduce CPU(ms)</th>
<th>Map Memory(M)</th>
<th>Reduce Memory(M)</th>
</tr>
<tr>
<th>java</th>
<td>220</td>
<td>51</td>
<td>47</td>
<td>23.336</td>
<td>31</td>
<td>20</td>
<td>20</td>
<td>14</td>
<td>10357020</td>
<td>11466330</td>
<td>292001</td>
<td>338160</td>
</tr>
<tr>
<th>native</th>
<td>139</td>
<td>15</td>
<td>14</td>
<td>3.476</td>
<td>30</td>
<td>20</td>
<td>17</td>
<td>11</td>
<td>295510</td>
<td>10595440</td>
<td>259581</td>
<td>336060</td>
</tr>
<tr>
<th>ratio</th>
<td>1.583</td>
<td>3.4</td>
<td>3.36</td>
<td>6.71</td>
<td>1.03</td>
<td>1</td>
<td>1.176</td>
<td>1.273</td>
<td>3.504</td>
<td>1.082</td>
<td>1.125</td>
<td>1.006</td>
</tr>
</table>
WordCount
<table>
<tr>
<th>WordCount 100G(io.sort.mb=300M)
200Map, 100Reduce</th>
<th>Total Time(s)</th>
<th>Merge Segments</th>
<th>Map Avg(s)</th>
<th>Map Best(s)</th>
<th>Sort(s)</th>
<th>Shuffle Avg(s)</th>
<th>Shuffle Best(s)</th>
<th>Reduce Avg(s)</th>
<th>Reduce Best(s)</th>
<th>Map CPU(ms)</th>
<th>Reduce CPU(ms)</th>
<th>Map Memory(M)</th>
<th>Reduce Memory(M)</th>
</tr>
<tr>
<th>java</th>
<td>266</td>
<td>5</td>
<td>124</td>
<td>117</td>
<td>45</td>
<td>8</td>
<td>8</td>
<td>1</td>
<td>1</td>
<td>25324990</td>
<td>410990</td>
<td>211082</td>
<td>21153</td>
</tr>
<tr>
<th>java optimized</th>
<td>243</td>
<td>5</td>
<td>112</td>
<td>95</td>
<td>46</td>
<td>8</td>
<td>8</td>
<td>1</td>
<td>1</td>
<td>22909200</td>
<td>412430</td>
<td>104078</td>
<td>21054</td>
</tr>
<tr>
<th>native</th>
<td>55</td>
<td>4</td>
<td>17</td>
<td>16</td>
<td>5.52</td>
<td>8</td>
<td>8</td>
<td>1</td>
<td>1</td>
<td>3287460</td>
<td>443890</td>
<td>104350</td>
<td>21706</td>
</tr>
<tr>
<th>ratio (java optimized / native)</th>
<td>4.42</td>
<td>-</td>
<td>6.59</td>
<td>5.93</td>
<td>8.33</td>
<td>1</td>
<td>1</td>
<td>1</td>
<td>1</td>
<td>6.869</td>
<td>0.939</td>
<td>0.997</td>
<td>0.970</td>
</tr>
</table>
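The ratio rows in the two tables are plain quotients of the baseline and native measurements. The following sketch recomputes a few of them from the table values. It assumes that the WordCount ratio row uses the "java optimized" row as its baseline, which is what the numbers suggest; function and variable names are illustrative only.

```python
# Recompute a few "ratio" cells as baseline / native.
# Values are copied from the two result tables above.
terasort = {            # column: (java, native)
    "Total Time(s)": (220, 139),
    "Map Avg(s)": (51, 15),
    "Sort(s)": (23.336, 3.476),
    "Reduce Avg(s)": (20, 17),
}
wordcount = {           # column: (java optimized, native), assumed baseline
    "Total Time(s)": (243, 55),
    "Map Avg(s)": (112, 17),
    "Sort(s)": (46, 5.52),
}


def ratios(table):
    """Return {column: baseline / native}, rounded to two decimals."""
    return {name: round(base / native, 2)
            for name, (base, native) in table.items()}


terasort_ratios = ratios(terasort)
wordcount_ratios = ratios(wordcount)

for job, result in (("Terasort", terasort_ratios),
                    ("WordCount", wordcount_ratios)):
    for name, value in result.items():
        print(f"{job:10s} {name:15s} {value:.2f}x")
```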
### Result Analysis
#### Map Task
Map tasks show large performance gains, because the map side is fully native
and has a relatively efficient implementation of sort and spill. The speedup
is higher in WordCount than in Terasort because the KV size in Terasort is
much larger than in WordCount, so WordCount processes more records for the
same amount of input. The framework has a constant overhead for each record,
and sort performance depends on record count, so the smaller the records are
(that is, the more records there are), the larger NativeTask's speed
advantage.
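To make the record-count argument concrete, here is a rough back-of-the-envelope sketch. It assumes, purely for illustration, that each record accounts for as many input bytes as the key/value size listed in the characteristics table (100 for Terasort, 8-16 for WordCount); the real WordCount record count depends on the generated text. All names are hypothetical.

```python
GB = 1_000_000_000  # one decimal gigabyte of uncompressed input, in bytes


def records_per_gb(bytes_per_record):
    """Approximate number of records produced from 1 GB of input."""
    return GB // bytes_per_record


# Assumption: record size equals the key/value size from the table.
terasort_records = records_per_gb(100)
wordcount_records_min = records_per_gb(16)   # largest WordCount records
wordcount_records_max = records_per_gb(8)    # smallest WordCount records

print(f"Terasort : {terasort_records:,} records per GB")
print(f"WordCount: {wordcount_records_min:,} to "
      f"{wordcount_records_max:,} records per GB")
print(f"WordCount has {wordcount_records_min / terasort_records:.2f}x to "
      f"{wordcount_records_max / terasort_records:.2f}x more records per GB")
```

Under these assumptions WordCount handles several times more records per byte of input, so any fixed per-record cost weighs proportionally more there.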
#### Reduce Task
The reduce side does not change much, about 8% in the Terasort test case. This
is because reduce-side shuffle and merge are still done in Java; shuffle and
merge take most of the CPU resources and execution time in a reduce task, and
there are extra serialization overheads when crossing the JNI boundary.
After shuffle and merge (or maybe just merge) are implemented natively,
similar (perhaps smaller) performance gains are expected.
As mentioned before, the shuffle implementation is suboptimal in hadoop-1.0;
although the current trunk version has improved shuffle performance a lot,
it can still be optimized. Finally, this test environment only uses a 1GbE
network; a better whole-job speedup can be expected with a high-speed network
such as 10GbE.
#### Compiler Factor
As I said before, the NativeTask library used in the experiment is probably
suboptimal. For example, a native WordCount task unit test runs in about 11s
on my laptop and 16s in the test environment; a native Terasort task unit test
runs in about 9s on my laptop and 14s in the test environment. Here are some
logs generated by the tests:
On my laptop:
12/01/04 17:35:30 INFO Native Mapper with MapOutputCollector, RecordReader: NativeTask.LineRecordReader Combiner: NativeTask.IntSumReducer Partitioner: default
12/01/04 17:35:33 INFO Spill 0 [0,100) collect: 1.515s sort: 1.192s spill: 0.227s, record: 12841142, key: 1000, block: 400, size 17855, real: 18895
12/01/04 17:35:36 INFO Spill 1 [0,100) collect: 1.226s sort: 1.154s spill: 0.223s, record: 12778865, key: 1000, block: 400, size 17855, real: 18907
12/01/04 17:35:39 INFO Spill 2 [0,100) collect: 1.463s sort: 1.167s spill: 0.224s, record: 12748890, key: 1000, block: 400, size 17855, real: 18894
12/01/04 17:35:40 INFO Sort 3 [0,100) time: 0.699
12/01/04 17:35:41 INFO Merge 4 segments: record 0, key: 1000, size 17855, real 18958, time: 0.383
On test environment:
12/01/04 15:54:56 INFO Native Mapper with MapOutputCollector, RecordReader: NativeTask.LineRecordReader Combiner: NativeTask.IntSumReducer Partitioner: default
12/01/04 15:55:01 INFO Spill 0 [0,100) collect: 2.426s sort: 1.557s spill: 0.352s, record: 12841142, key: 1000, block: 400, size 17855, real: 18895
12/01/04 15:55:05 INFO Spill 1 [0,100) collect: 2.097s sort: 1.507s spill: 0.287s, record: 12778865, key: 1000, block: 400, size 17855, real: 18907
12/01/04 15:55:09 INFO Spill 2 [0,100) collect: 2.077s sort: 1.506s spill: 0.399s, record: 12748890, key: 1000, block: 400, size 17855, real: 18894
12/01/04 15:55:11 INFO Sort 3 [0,100) time: 0.951
12/01/04 15:55:11 INFO Merge 4 segments: record 0, key: 1000, size 17855, real 18958, time: 0.491
On the other hand, the same Java task unit tests run at about the same speed
on my laptop and in the test environment. So it is very likely a compiler
issue; excluding this factor, NativeTask should have an extra speed advantage
of about 40%-60%.
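The gap between the two environments can be read directly off the spill lines in the logs above. The sketch below sums the collect, sort and spill times of the three spills in each log and compares them. The log excerpts are trimmed copies of the lines shown above; the regular expression and helper names are illustrative, not part of NativeTask.

```python
import re

# Trimmed copies of the "Spill" lines from the two logs above.
LAPTOP_LOG = """\
Spill 0 [0,100) collect: 1.515s sort: 1.192s spill: 0.227s
Spill 1 [0,100) collect: 1.226s sort: 1.154s spill: 0.223s
Spill 2 [0,100) collect: 1.463s sort: 1.167s spill: 0.224s
"""
TEST_ENV_LOG = """\
Spill 0 [0,100) collect: 2.426s sort: 1.557s spill: 0.352s
Spill 1 [0,100) collect: 2.097s sort: 1.507s spill: 0.287s
Spill 2 [0,100) collect: 2.077s sort: 1.506s spill: 0.399s
"""

PHASES = ("collect", "sort", "spill")
SPILL_RE = re.compile(r"collect: ([\d.]+)s sort: ([\d.]+)s spill: ([\d.]+)s")


def phase_totals(log):
    """Sum the collect, sort and spill seconds over all spill lines."""
    totals = [0.0, 0.0, 0.0]
    for match in SPILL_RE.finditer(log):
        for i, seconds in enumerate(match.groups()):
            totals[i] += float(seconds)
    return totals


laptop = phase_totals(LAPTOP_LOG)
test_env = phase_totals(TEST_ENV_LOG)

for name, fast, slow in zip(PHASES, laptop, test_env):
    print(f"{name:8s} laptop {fast:.3f}s  test env {slow:.3f}s  "
          f"ratio {slow / fast:.2f}")

# Overall slowdown of the test environment across the three phases.
slowdown = sum(test_env) / sum(laptop)
print(f"overall ratio {slowdown:.2f}")
```

For these log lines the test environment comes out roughly 1.45x slower over the three phases, which is in line with the 40%-60% estimate.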
Conclusion and Future Work
--------------------------
Generally, NativeTask outperforms the original MapReduce framework: about
3x-7x for map tasks, 1x-1.1x for reduce tasks, and 1.5x-5x for the whole job.
If the compiler hypothesis holds, the speedup could be 4.5x-12x for map tasks,
and the whole-job speedup should be correspondingly larger.
The main reasons for NativeTask's high performance are avoiding serialization,
avoiding heavy abstraction, better use of compression, and the speed advantage
of C++ over Java. Since this project is at a very early stage, I expect more
improvements in the future. As mentioned before, the throughput of a single
map task can possibly reach 300MB/s; currently NativeTask achieves about
50-100MB/s, so there is room for improvement.
NativeTask only addresses some aspects of Hadoop's inefficiency. Other aspects
such as shuffle, data locality, and scheduling & startup overhead are outside
the scope of this project, but may become dominant factors in some workloads.
These aspects are better addressed at a higher level, for example in data
warehousing tools like Hive or BSP systems like Giraph.
The next steps for this project are to integrate the no-sort dataflow, support
the folder API, implement reduce-side shuffle and merge, and add parallel sort
and spill.
Again, the main objective of this project is to provide an efficient native
Hadoop framework, so that much more efficient data analysis tools can be
built upon it, with performance matching that of commercial systems.
I am thinking of a modified version of Hive that transforms its physical query
plan into LLVM IR and then runs on top of NativeTask. According to Google's
Tenzing paper and the current status of Hive and NativeTask, a 10x speedup for
Hive is entirely possible, and with more advanced techniques that already
exist in commercial databases, it is possible to reach performance comparable
to commercial data warehousing products.
Another possible direction is a Hadoop distribution for a single fat node or a
very small cluster. Most analytical workloads at small companies are TB scale;
only a few large companies really need to scale to PB. With manycore
processors and very dense disk storage, a commodity server in the near future
can have the same computing power and capacity as today's small Hadoop
cluster. A single-fat-node Hadoop can perform many optimizations that are
impossible in distributed mode: there is no network bottleneck and data can be
shared directly. Combined with the performance boost of NativeTask, small
workloads won't need a cluster anymore. In the future, perhaps every data
analyst can use Hadoop to analyze TBs of data on their own computer, and
anyone who wants more processing power can just connect to the cloud and
submit the same Hadoop application unchanged.
If anyone has similar thoughts and wants to start open source projects or
realize them in existing projects, please let me know :)
Useful Links
------------
For more information about vectorization or dynamic compilation:
* [Efficiently Compiling Efficient Query Plans for Modern Hardware](http://www.vldb.org/pvldb/vol4/p539-neumann.pdf)
* [MonetDB/X100: Hyper-pipelining query execution](http://homepages.cwi.nl/~boncz/x100.html)
An interesting article about future hardware trends and programming models:
* [http://herbsutter.com/welcome-to-the-jungle/](http://herbsutter.com/welcome-to-the-jungle/)