Summary of Joins in MapReduce
>> Copied from a research blog.
MapReduce can perform joins between large datasets, but writing the join code from scratch is fairly involved. The basic problem is to combine the records of two datasets that share a common field, the join key.
Reduce-Side Joins
A reduce-side join is compiled into a single MapReduce job with a map stage and a reduce stage. Each mapper reads records from the join tables and emits (join key, join value) pairs into intermediate files. Hadoop then sorts and merges these pairs by key in what is called the shuffle stage, so that all values sharing a join key arrive at the same reducer. The reducer takes the sorted results as input and does the actual join work. The shuffle is expensive because it has to sort, merge, and move all intermediate data from mappers to reducers, so eliminating the shuffle and reduce stages improves job performance.
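To make the three stages concrete, here is a minimal single-process Python simulation of a reduce-side join. It is a sketch, not Hadoop code: the table names, the (key, payload) record layout, and the practice of tagging each value with its source table are assumptions chosen for illustration, and the sort in `shuffle` stands in for Hadoop's distributed sort-and-merge.

```python
from collections import defaultdict
from itertools import product

# Hypothetical example tables: (join_key, payload) tuples.
orders = [(1, "order-a"), (2, "order-b"), (1, "order-c")]
users = [(1, "alice"), (3, "carol")]

def map_phase(table_name, records):
    """Map stage: emit (join key, (source tag, payload)) for every record."""
    for key, payload in records:
        yield key, (table_name, payload)

def shuffle(pairs):
    """Shuffle stage: sort pairs by key and group the values that share a key."""
    groups = defaultdict(list)
    for key, value in sorted(pairs, key=lambda kv: kv[0]):
        groups[key].append(value)
    return groups

def reduce_phase(key, values, left, right):
    """Reduce stage: split one key's values by source tag and emit their cross product."""
    left_vals = [p for tag, p in values if tag == left]
    right_vals = [p for tag, p in values if tag == right]
    for l, r in product(left_vals, right_vals):
        yield key, l, r

def reduce_side_join(left_name, left, right_name, right):
    # Every record of both tables goes through the shuffle, matched or not.
    pairs = list(map_phase(left_name, left)) + list(map_phase(right_name, right))
    result = []
    for key, values in shuffle(pairs).items():
        result.extend(reduce_phase(key, values, left_name, right_name))
    return result

print(reduce_side_join("orders", orders, "users", users))
# [(1, 'order-a', 'alice'), (1, 'order-c', 'alice')]
```

Note that keys 2 and 3 have no partner, yet their records were still shipped through the shuffle; that wasted transfer is what the map-side join and the semi-join try to avoid.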
Map-Side Joins
The motivation for a map-side join is to skip the shuffle and reduce stages and do all the join work in the map stage. This is possible when one of the join tables is small enough (|R|<|S|/r) to fit into memory: every mapper holds the small table in memory and joins each record of its share of the large table against it, so the whole join finishes in the map stage.
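A minimal Python sketch of this in-memory (broadcast hash) join follows, assuming the small table is a list of (key, payload) tuples and the large table has already been divided into hypothetical input splits, one per mapper. Each mapper only probes the in-memory table, so nothing needs to be shuffled or reduced.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Record = Tuple[int, str]

def build_hash_table(small_table: Iterable[Record]) -> Dict[int, List[str]]:
    """Load the small table into memory, indexed by join key."""
    table: Dict[int, List[str]] = {}
    for key, payload in small_table:
        table.setdefault(key, []).append(payload)
    return table

def map_side_join(split: Iterable[Record],
                  small: Dict[int, List[str]]) -> Iterator[Tuple[int, str, str]]:
    """One mapper: probe the in-memory table for each record of its split.

    Matches are emitted directly as final output; there is no reduce stage."""
    for key, payload in split:
        for match in small.get(key, []):
            yield key, payload, match

users = [(1, "alice"), (3, "carol")]
# Hypothetical input splits of the large table, one per mapper.
splits = [[(1, "order-a"), (2, "order-b")], [(1, "order-c"), (3, "order-d")]]

small = build_hash_table(users)  # in a real job, each mapper loads its own copy
output = [row for split in splits for row in map_side_join(split, small)]
print(output)
# [(1, 'order-a', 'alice'), (1, 'order-c', 'alice'), (3, 'order-d', 'carol')]
```

The design only pays off while the hash table fits comfortably in each mapper's memory; otherwise the reduce-side join is the fallback.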
However, this type of map join has a scaling problem. When thousands of mappers read the small join table from HDFS into memory at the same time, reading that table easily becomes the performance bottleneck, and mappers can time out during the read.
To distribute the small table efficiently, Hadoop can put it into the distributed cache before launching the mappers. The distributed cache copies the table to a local cache on each node, so mappers read a local copy instead of all reading the same file from HDFS. After this optimization, the small table no longer has to be read from HDFS by every mapper.
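The effect of the distributed cache can be illustrated with a toy count of HDFS reads. This Python sketch does not use any Hadoop API; it simply assumes a hypothetical cluster where 1000 mappers are spread round-robin over 50 nodes and the cache localizes the file once per node.

```python
from typing import List

def hdfs_reads_without_cache(mapper_nodes: List[str]) -> int:
    """Each mapper opens the small table on HDFS itself: one read per mapper."""
    reads = 0
    for _node in mapper_nodes:
        reads += 1
    return reads

def hdfs_reads_with_cache(mapper_nodes: List[str]) -> int:
    """The file is copied to each node once; later mappers there reuse the local copy."""
    localized = set()
    reads = 0
    for node in mapper_nodes:
        if node not in localized:
            reads += 1  # the distributed cache fetches the file for this node
            localized.add(node)
    return reads

# Hypothetical cluster: mapper i runs on node i % 50.
mappers = [f"node-{i % 50}" for i in range(1000)]
print(hdfs_reads_without_cache(mappers))  # 1000
print(hdfs_reads_with_cache(mappers))     # 50
```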
Semi Joins
A reduce-side join can incur a large network transfer cost, because both tables must be shipped in full to the reducers. In some scenarios, such as log processing, a large portion of the records in the two tables have no matching key in the other table, which means these records are not used by the join. A semi-join avoids sending records over the network that will not be used in the join.
Let X and Y be the two tables, with X smaller than Y. The semi-join implementation has two phases. The first phase collects the join keys of table X and saves them in a separate key file. In the second phase, the key file is distributed to all mappers through the distributed cache, and each mapper outputs a record from X or Y to the reducers only if its key appears in the key file. Every record of X passes this test by construction, so in effect Y is filtered. In this way, records that cannot take part in the join are never sent over the network.
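The two phases can be sketched in Python as below. This is a single-process simulation under stated assumptions: records are hypothetical (key, payload) tuples, a Python set plays the role of the key file, and passing that set to the mapper function stands in for broadcasting it through the distributed cache.

```python
from collections import defaultdict
from typing import Iterable, Set, Tuple

Record = Tuple[int, str]

def phase1_collect_keys(x: Iterable[Record]) -> Set[int]:
    """Phase 1: extract the distinct join keys of the smaller table X."""
    return {key for key, _ in x}

def phase2_map(table_name: str, records: Iterable[Record], key_file: Set[int]):
    """Phase 2 mapper: emit a record only if its key appears in the key file."""
    for key, payload in records:
        if key in key_file:
            yield key, (table_name, payload)

def semi_join(x, y):
    key_file = phase1_collect_keys(x)
    emitted = list(phase2_map("X", x, key_file)) + list(phase2_map("Y", y, key_file))
    # Reduce side: group the surviving records by key and join them.
    groups = defaultdict(lambda: {"X": [], "Y": []})
    for key, (tag, payload) in emitted:
        groups[key][tag].append(payload)
    joined = [(k, a, b) for k, g in groups.items() for a in g["X"] for b in g["Y"]]
    return emitted, joined

x = [(1, "alice"), (3, "carol")]
y = [(1, "click-1"), (2, "click-2"), (4, "click-3"), (1, "click-4")]
emitted, joined = semi_join(x, y)
print(len(emitted), "of", len(x) + len(y), "records shipped")  # 4 of 6 records shipped
print(joined)  # [(1, 'alice', 'click-1'), (1, 'alice', 'click-4')]
```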
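If the key file is too large to load into memory, a Bloom filter over the keys can be used instead. A Bloom filter can report false positives, so a few non-matching records may still be sent, but the reducer's join discards them and the result stays correct.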
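A minimal Bloom filter in Python is sketched below to show why it fits this role. The bit-array size, the number of hash positions, and deriving those positions from one SHA-256 digest by double hashing are all choices made for illustration, not part of the original description. The filter never rejects a key that was added (no false negatives), which is exactly the property the semi-join needs.

```python
import hashlib

class BloomFilter:
    """A minimal Bloom filter with num_bits bits and num_hashes positions per key."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: str):
        # Derive k bit positions from one SHA-256 digest via double hashing.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # forced odd, so never zero
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True may be a false positive.
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

# Phase 1 builds the filter from X's keys instead of an exact key file.
bloom = BloomFilter(num_bits=10_000, num_hashes=5)
for key in ["u1", "u3"]:
    bloom.add(key)

# Phase 2 mappers ship only Y records whose key might be in X.
y = [("u1", "click-1"), ("u2", "click-2"), ("u1", "click-4")]
shipped = [rec for rec in y if bloom.might_contain(rec[0])]
```

Every record with key `u1` is guaranteed to be shipped; a record such as the `u2` one is almost always dropped, and if it slips through as a false positive, the exact join in the reducer removes it.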