This is ericpony's blog
Showing posts with label MapReduce. Show all posts
Showing posts with label MapReduce. Show all posts

Thursday, January 15, 2015

Emulating Hadoop in Spark

The map and reduce functions in MapReduce have the following general form:
Map: (K1, V1) → list(K2, V2)
Reduce: (K2, list(V2)) → list(K3, V3)
When a MapReduce program is executed, the Map function is applied in parallel to every pair in the input dataset. Each invocation of map produces a list of pairs. The execution framework then collects all pairs with the same key from all lists and groups them together, creating one group for each key. After that, the Reduce function is applied in parallel to each group, which in turn produces a collection of key-value pairs. In Hadoop's implementation of MapReduce, the values in the same group are sorted before they are sent to a reducer.

Despite the suggestive naming, map and reduce operations in Spark do not directly correspond to the functions of the same name in MapReduce. For example, Map and Reduce functions in MapReduce can return multiple key-value pairs. This feature can be implemented by the flatMap operation in Spark (and typical FP languages in general):
List(1, 2, 3).map(a => List(a))     // List(List(1), List(2), List(3))
List(1, 2, 3).flatMap(a => List(a)) // List(1, 2, 3)
One naive attempt to emulate Hadoop MapReduce (without combiners) in Spark is to use two flatMap operations for map and reduce, separated by a groupByKey and a sortByKey to perform shuffle and sort:
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)
Here the key type K2 needs to inherit from Scala’s Ordering type to satisfy the signature of sortByKey. The above code is useful to demonstrate the relationship between Hadoop and Spark, but it should not be applied blindly in practice. For one thing, its semantics are slightly different from Hadoop's MapReduce, since sortByKey performs a total sort over all key-value pairs while Hadoop sorts within groups. This issue can be avoided by using repartitionAndSortWithinPartitions in Spark 1.2 to perform a partial sort:
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput
    .groupByKey()
    .repartitionAndSortWithinPartitions(Partitioner.defaultPartitioner(mapOutput))
The improvement is unfortunately still not as efficient, since Spark uses two shuffles (one for the groupByKey and one for the sort) while MapReduce only uses one. It turns out one cannot precisely and efficiently capture the semantics of Hadoop's MapReduce in the current version of Spark.

Rather than trying to reproduce MapReduce in Spark, it is better to use only the operations that you actually need. For example, if you don’t need keys to be sorted, you can omit the sortByKey call to avoid sorting (which is impossible in regular Hadoop MapReduce). Similarly, groupByKey is too general in most cases. If shuffling is needed only for aggregating values, you should use reduceByKey, foldByKey, or aggregateByKey, which are more efficient than groupByKey since they exploit combiners in the map task. Finally, flatMap may not always be needed either. Instead, you can use map if there is always one return value, or use filter if there is zero or one.

Consult these two articles on Cloudera for details of translating Hadoop programs to efficient Spark programs: How-to: Translate from MapReduce to Apache Spark, part 1 and part 2.

Further readings

1. Jimmy Lin, Chris Dyer. Data-Intensive Text Processing with MapReduce.
2. Tom White. Hadoop: The Definitive Guide, 4th Edition.
3. Srinivasa, K.G., Muppalla, Anil K. Guide to High Performance Distributed Computing.

Saturday, June 21, 2014

Verifying the Commutativity of Reduce Functions

MapReduce is a programming model that assumes as input a large, unordered stream of data. Computation essentially proceeds in three steps:
Map A map function is applied to each input, which outputs zero or more intermediate key-value pairs of an arbitrary type.
Shuffle All intermediate key-value pairs are grouped and sorted by key, so that pairs with the same key can be reduced together.
Reduce A reduce function combines values with the same key and produces results associated with that key in the final output.
Note that there is in fact an optional optimization step between map and shuffle that combines values by key on a map node. This is useful as reducing the number of key-value pairs locally before they are shuffled. We ignore this step because it is irrelevant to the topics discussed later.

While records are sorted by key before they reach the reduce function, for any particular key, the order that the values appear is not stable from one run to the next, since they come from different map tasks, which may finish at different times from run to run. As a result, most MapReduce programs are written so as not to depend on the order that the values appear to the reduce function. In other words, an implementation of a reduce function is expected to be commutative with respect to its input. Formally, a reduce function, which we may alternatively call a reducer hereafter, is commutative if for each input list $L$, it returns the same result for each permutation $\sigma$ of that list: $$\forall L, \sigma: reduce(k, L) = reduce(k, \sigma(L))$$ Note that we treat reduce functions in a very high-level manner: A reducer in our mind is just a function that takes a key and a list as input and returns a value as output. Moreover, we may assume without loss of generality that keys and values are all integers, say by considering their hash values instead of their actual values.
The main purpose of this post is to initiate an investigation to model check commutativity of reduce functions. Our modeling language supports Boolean and integer data types, Boolean operations and integer arithmetics, if-then-else and control structures that can be constructed from it, return statement, and a while loop over the iterator of the input list. Moreover, we introduce non-determinism to the language so that we can model uninterpreted function calls, etc. While this model looks like an over-simplification of reducers used in practical MapReduce programs, hopefully it has captured enough ingredients for us to derive non-trivial results that may shed light on more realistic use cases. Our first theorem about this model is as follows.
Theorem. The commutativity of reducers is undecidable.
Proof. We shall reduce the Diophantine problem , i.e., determining the solvability of Diophantine equation systems, which is known to be undecidable in general, to the problem of determining commutativity of reducers. Given a Diophantine equation system $P: P_1(x_1,...,x_k)=0$, ..., $P_n(x_1,...,x_k)=0$, we define a reducer as follows:
public int reduce(int key, Iterator<int> values) {
    int x1, ..., xk;
    if(values.hasNext()) x1 = values.next(); else return 0;
    ...
    if(values.hasNext()) xk = values.next(); else return 0;

    if(P1(x1,...,xk)!=0) return 0;
    ...
    if(Pn(x1,...,xk)!=0) return 0;

    int y1, y2;
    if(values.hasNext()) y1 = values.next(); else return 0;
    if(values.hasNext()) y2 = values.next(); else return 0;
    return y1 - y2;
}
It is clear that if equation system $P$ has no solution, the reduce function always returns zero regardless of its input, i.e., it is commutative. On the other hand, if there is a solution, then its return value depends on its input values as well as the order that they are iterated, i.e., the function is not commutative. Note that the return value $y_1-y_2$ makes the reducer non-commutative even when $P$ has a unique solution with $x_1=...=x_k$. In this way, we reduced the problem of checking the solvability of $P$ to that of checking the commutativity of a reducer. This concludes our proof. $\square$
As verifying commutativity is undecidable, the best hope for us is to derive a semi-algorithm that is effective for some interesting case studies. Our first attempt is to use abstract interpretation, along with counter abstraction for lists, and reduce the verification problem to a reachability problem. Let $t,n\in\mathbb N$ be refinement parameters. A state is a 4-tuple $(L,pc,itr,V)$ where $L$ is an abstract list value, $pc$ is the program counter, $itr$ is the position of iterator, and $V$ is the valuation in abstract domain $[-n, n]$. An abstract list is a prefix of size $t$ followed by a subset of $\{-n,...,n\}$ that represents the "tail". For example, an abstract list $1\rightarrow 2\rightarrow \{1,3\}$ concretizes to set of lists $12\{1,3\}^*$ (in regular expression). We say an abstract list is "likely" a permutation of another abstract list if any of the permutations of its concretization is contained in the concretization of the other abstract list. Finally, a state is called accepting if its $pc$ equals to the line number of some return statement.
Now, given two initial states $(L,pc,itr,V)$ and $(L',pc',itr',V')$ such that $L$ is likely a permutation of $L'$, we carry out two coordinated symbolic executions from each state and see if they can reach accepting states after the same number of steps (note that we have to allow nondeterministic NOP's so that the executions can represent real paths of different lengths even though they are coordinated). We say that the executions reach a bad state if the two paths reach some return statements, say "return x" and "return y", respectively, and $V(x)\neq V'(y)$. If no bad state is reached, then we have proved that the reducer is commutative. Otherwise, we check the bad state we reach and see if it is in effect concrete, i.e., abstract lists do not have tails and all valuations are in $(-n, n)$. If so, then we have proved that the reducer is not commutative. If the bad state is not concrete, then we have found a plausible bug, but it may be a false alarm since abstract states are over-approximation of concrete states. In such case, we need to refine the abstraction, e.g., by increasing parameters $t,n$, and re-run the execution from the beginning. Continuing in this flavor, we can obtain an answer for certain if the process terminates. Of course, it is possible that the process never stops and we just keep doing refinements again and again. This possibility cannot be eliminated, however, since the question we want to answer is undecidable per se.

References and further readings

1. Yu-Fang Chen, Chih-Duo Hong, Nishant Sinha, and Bow-Yaw Wang. "Commutativity of Reducers", TACAS, 2015.
2. A note on Hilbert's 10th Problem by Yuri Matiyasevich, or see this overview paper for a glimpse of the literature.
4. Csallner, Christoph, Leonidas Fegaras, and Chengkai Li. "New ideas track: testing MapReduce-style programs." ACM SIGSOFT, 2011.
Related Posts Plugin for WordPress, Blogger...