This is ericpony's blog

Thursday, January 15, 2015

Emulating Hadoop in Spark

The map and reduce functions in MapReduce have the following general form:
Map: (K1, V1) → list(K2, V2)
Reduce: (K2, list(V2)) → list(K3, V3)
When a MapReduce program is executed, the Map function is applied in parallel to every key-value pair in the input dataset. Each invocation of Map produces a list of pairs. The execution framework then collects all pairs with the same key from all lists and groups them together, creating one group for each key. After that, the Reduce function is applied in parallel to each group, which in turn produces a collection of key-value pairs. In Hadoop's implementation of MapReduce, the intermediate pairs are sorted by key before they are sent to a reducer, so each reducer receives its groups in key order.
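As a minimal sketch, consider word count: K1 is a line offset, V1 is a line of text, and the intermediate and output keys are words paired with integer counts. The helper names mapFn and reduceFn here are just placeholders; functions of this shape are what the snippets further down plug in.
// A word-count instance of the general form above; plain Scala, no Spark needed.
// Map: (K1, V1) -> list(K2, V2), emitting (word, 1) for each word in a line.
def mapFn(record: (Long, String)): List[(String, Int)] =
  record._2.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toList
// Reduce: (K2, list(V2)) -> list(K3, V3), summing the counts collected for a word.
def reduceFn(group: (String, Iterable[Int])): List[(String, Int)] =
  List((group._1, group._2.sum))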

Despite the suggestive naming, the map and reduce operations in Spark do not directly correspond to the functions of the same name in MapReduce. For example, the Map and Reduce functions in MapReduce can each emit multiple key-value pairs. This behavior corresponds to the flatMap operation in Spark (and in typical FP languages in general):
List(1, 2, 3).map(a => List(a))     // List(List(1), List(2), List(3))
List(1, 2, 3).flatMap(a => List(a)) // List(1, 2, 3)
One naive attempt to emulate Hadoop MapReduce (without combiners) in Spark is to use two flatMap operations for map and reduce, separated by a groupByKey and a sortByKey to perform shuffle and sort:
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)
Here the key type K2 needs an implicit Ordering[K2] in scope (for instance, by extending Scala's Ordered trait) to satisfy the signature of sortByKey. The above code is useful for demonstrating the relationship between Hadoop and Spark, but it should not be applied blindly in practice. For one thing, its semantics differ slightly from Hadoop's MapReduce: sortByKey performs a total sort over all key-value pairs, while Hadoop only sorts keys within each reducer's partition. This issue can be avoided by using repartitionAndSortWithinPartitions, introduced in Spark 1.2, to perform a partial sort:
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput
    .groupByKey()
    .repartitionAndSortWithinPartitions(Partitioner.defaultPartitioner(mapOutput))
Unfortunately, this version is still not as efficient as MapReduce, since Spark performs two shuffles (one for the groupByKey and one for the sort) whereas MapReduce needs only one. It turns out that one cannot precisely and efficiently capture the semantics of Hadoop's MapReduce in the current version of Spark.
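The extra shuffle is easy to see by printing the lineage of the final RDD with toDebugString; assuming the definitions above are in scope, both the groupByKey and the repartitionAndSortWithinPartitions should show up as separate ShuffledRDD stages.
// Quick lineage check; each shuffle appears as its own ShuffledRDD in the output.
println(shuffled.toDebugString)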

Rather than trying to reproduce MapReduce in Spark, it is better to use only the operations that you actually need. For example, if you don't need keys to be sorted, you can omit the sortByKey call and skip sorting altogether, which is not possible in regular Hadoop MapReduce. Similarly, groupByKey is more general than necessary in most cases. If shuffling is needed only for aggregating values, you should use reduceByKey, foldByKey, or aggregateByKey, which are more efficient than groupByKey since they can combine values on the map side, much like Hadoop's combiners. Finally, flatMap may not always be needed either: use map if there is always exactly one return value, or filter if there is zero or one.
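For instance, the word count sketched earlier collapses to a single reduceByKey, which sums partial counts on the map side before the shuffle. This is a rough sketch that assumes lines is an existing RDD[String] of input text and that the usual Spark implicits are in scope:
// Word count with Spark's native operations; `lines` is assumed to be an
// RDD[String] (e.g. produced by sc.textFile).
val counts: RDD[(String, Int)] = lines
  .flatMap(_.split("\\s+").filter(_.nonEmpty)) // several words per line, so flatMap
  .map(word => (word, 1))                      // exactly one pair per word, so map is enough
  .reduceByKey(_ + _)                          // partial sums are combined before the shuffle
// filter covers the zero-or-one case; the threshold 100 below is arbitrary.
val frequent: RDD[(String, Int)] = counts.filter { case (_, n) => n > 100 }
Unlike groupByKey, which materializes every value of a key at the reducer, reduceByKey only keeps one running value per key in each partition, which is exactly the combiner behavior the naive emulation above gives up.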

Consult these two articles from Cloudera for details on translating Hadoop programs into efficient Spark programs: How-to: Translate from MapReduce to Apache Spark, part 1 and part 2.

Further reading

1. Jimmy Lin and Chris Dyer. Data-Intensive Text Processing with MapReduce.
2. Tom White. Hadoop: The Definitive Guide, 4th Edition.
3. K. G. Srinivasa and Anil K. Muppalla. Guide to High Performance Distributed Computing.
