This is ericpony's blog

Friday, January 23, 2015

Deterministic aggregate semantics in Scala and Spark

The aggregate function in Scala has the following method signature:
def aggregate[A](z: => A)(seqOp: (A, B) => A, combOp: (A, A) => A): A
The semantics of aggregate depend on the collection upon which it is invoked. When aggregate is invoked on a sequential traversable collection such as a List, it falls back to foldLeft and ignores the provided combOp. In this case, aggregate behaves just like the ordinary fold in typical functional programming languages such as Lisp and ML. When aggregate is invoked on a parallel traversable collection, however, it performs a two-stage aggregation over the data elements in parallel. In Scala, a parallel collection can be split into partitions that are processed on several processor cores. Invoking aggregate on a parallel collection reduces the collection to a single value by first folding each partition separately using seqOp, and then merging the partial results one by one using combOp.

Note that the Scala API documentation specifies neither how the collection is partitioned nor the order in which the intermediate results are merged. Hence, a programmer who wants the same output whenever aggregate is invoked on the same parallel collection must personally ensure that seqOp is associative and combOp is both associative and commutative.
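The dependence on partitioning can be seen in a small model written in plain Scala. This is only a sketch under stated assumptions: `twoStage` is a hypothetical helper, not the library's implementation, and the partitions are passed in explicitly instead of being chosen by a parallel collection.

```scala
object TwoStageDemo {
  // Hypothetical model of a two-stage aggregation: fold each partition with
  // seqOp, then merge the partial results from left to right with combOp.
  // Assumes at least one partition (reduceLeft fails on an empty Seq).
  def twoStage[A, B](parts: Seq[Seq[B]])(z: => A)(seqOp: (A, B) => A, combOp: (A, A) => A): A =
    parts.map(_.foldLeft(z)(seqOp)).reduceLeft(combOp)

  // Two different partitionings of List("a", "bb", "ccc").
  val split1: Seq[Seq[String]] = Seq(Seq("a", "bb"), Seq("ccc"))
  val split2: Seq[Seq[String]] = Seq(Seq("a"), Seq("bb", "ccc"))

  // seqOp: add the length of each string to the accumulator.
  val countChars: (Int, String) => Int = (acc, s) => acc + s.length

  // combOp = addition: both partitionings give 6.
  val sum1: Int = twoStage[Int, String](split1)(0)(countChars, _ + _)
  val sum2: Int = twoStage[Int, String](split2)(0)(countChars, _ + _)

  // combOp = subtraction (neither associative nor commutative):
  // 3 - 3 = 0 for split1, but 1 - 5 = -4 for split2.
  val diff1: Int = twoStage[Int, String](split1)(0)(countChars, _ - _)
  val diff2: Int = twoStage[Int, String](split2)(0)(countChars, _ - _)
}
```

With addition as combOp the partitioning is invisible in the output; with subtraction the two partitionings of the same list disagree.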

Aggregate vs. fold

The fold function in Scala has the following method signature:
def fold[A1 >: A](z: A1)(combOp: (A1, A1) => A1): A1
One can observe two differences between the signatures of fold and aggregate: 1) fold only needs one operator, while aggregate needs two; 2) fold requires the result type to be a supertype of the element type, while aggregate doesn't. Note that both methods fall back to foldLeft when they are invoked on a sequential collection. One may notice that Scala's aggregate is in effect the ordinary FP fold
fold :  (A → B → A) → A → ([B] → A)
and Scala's fold is just a special form of its aggregate. The idea behind this design is to allow for parallel FP fold. In Scala, fold and aggregate are both done in two stages, namely folding and combining. For fold, the same operator is used for folding and combining, and this requires the supertype relationship between the element and the result type. Without the supertype relationship, it will take two operators to fold and combine, and this is why Scala provides aggregate that takes two operators as arguments.
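The relationship between the two methods can be sketched in plain Scala. The code below is an illustrative model with explicit partitions, not the library source; the names `FoldAsAggregate`, `aggregate` and `fold` are local to the sketch. It shows that fold can be written as aggregate with the same operator in both positions, and that this only type-checks because of the supertype bound.

```scala
object FoldAsAggregate {
  // Hypothetical two-stage model: fold each partition with seqOp,
  // then combine the partial results with combOp.
  def aggregate[A, B](parts: Seq[Seq[B]])(z: A)(seqOp: (A, B) => A, combOp: (A, A) => A): A =
    parts.map(_.foldLeft(z)(seqOp)).foldLeft(z)(combOp)

  // fold reuses one operator for both stages. Passing op as seqOp is
  // accepted only because B <: A1: functions are contravariant in their
  // arguments, so (A1, A1) => A1 conforms to (A1, B) => A1.
  def fold[B, A1 >: B](parts: Seq[Seq[B]])(z: A1)(op: (A1, A1) => A1): A1 =
    aggregate[A1, B](parts)(z)(op, op)

  // fold: the result type is (a supertype of) the element type.
  val numbers: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4))
  val total: Int = fold[Int, Int](numbers)(0)(_ + _)

  // aggregate: the result type (Int) is unrelated to the element type (String).
  val words: Seq[Seq[String]] = Seq(Seq("a", "bb"), Seq("ccc"))
  val chars: Int = aggregate[Int, String](words)(0)((acc, s) => acc + s.length, _ + _)
}
```

Removing the bound `A1 >: B` from `fold` makes the call to `aggregate` fail to compile, which is the typing reason a second operator is needed in the general case.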

Aggregate in Spark

Spark implements its own aggregate operations for RDD. One can observe from the source code of Spark that the intermediate results of aggregate are always iterated and merged in a fixed order (see the Java, Python and Scala implementations for details). Hence, we don't have to use a commutative combOp to obtain deterministic results from aggregate, provided that at least one of the following conditions holds:
(i) there doesn't exist an empty partition in any partitioning of the collection, or
(ii) z is an identity of combOp, i.e., for all a of type A, combOp(z,a) = combOp(a,z) = a.
Note that this observation is not assured by the official Spark API documentation and is thus not guaranteed to hold in future versions of Spark. However, it is still useful if we want to model the behaviour of a Spark program formally.

Below, we shall try to formalize Spark's implementation of aggregate. While Scala is an impure functional language, we only consider pure functions here for simplicity. Let $R(L,m)$ denote an RDD object obtained from partitioning a list $L \in \mathbb{D}^*$ into $m \ge 1$ sub-lists. Suppose that we have fixed an element $zero \in \mathbb{E}$, as well as operators $seq:{\mathbb{E} \times \mathbb{D} \to \mathbb{E}}$ and $comb:{\mathbb{E} \times \mathbb{E} \to \mathbb{E}}$. An invocation of aggregate is said to be deterministic with respect to $zero$, $seq$ and $comb$ if the output of $$R(L,\cdot).\textrm{aggregate}(zero)(seq,\ comb)$$ only depends on $L$. Define a function $\textrm{foldl}:({\mathbb{E} \times \mathbb{D} \to \mathbb{E}}) \times \mathbb{E} \times \mathbb{D}^* \to \mathbb{E}$ by \begin{eqnarray*} \textrm{foldl}(f,\ a,\ Nil) & = & a \\ \textrm{foldl}(f,\ a,\ b \!::\! bs) & = & \textrm{foldl}(f,\ f(a,b),\ bs), \end{eqnarray*} where $f\in {\mathbb{E} \times \mathbb{D} \to \mathbb{E}}$, $a \in \mathbb{E}$, $b \in \mathbb{D}$ and $bs \in \mathbb{D}^*$. Suppose that $1\le m \le |L|$ and $R(L,m)$ partitions $L$ into $\{L_1, \dots, L_m\}$ such that $L=L_1 \cdots L_m$. According to the source code of Spark, we can define the aggregate operation in Spark as \begin{eqnarray*} R(L,m).\textrm{aggregate}(zero)(seq,\ comb) = \textrm{foldl}(comb,\ zero,\ L'), \end{eqnarray*} where $L' \in \mathbb{E}^*$ is a list of length $m$ and $L'[i] = \textrm{foldl}(seq,\ zero,\ L_i)$ for $i=1,\dots,m$.
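The definition above can be transcribed into plain Scala as a rough sketch. It models only the formula, not Spark itself: the partitions are passed in explicitly, and the object name `SparkAggregateModel` and the sample values are assumptions made for illustration. String concatenation is used as $comb$ because it is associative but not commutative.

```scala
object SparkAggregateModel {
  // Transcription of the definition: L'[i] = foldl(seq, zero, L_i), and the
  // result is foldl(comb, zero, L'). Partitions are given explicitly.
  def aggregate[E, D](parts: List[List[D]])(zero: E)(seq: (E, D) => E, comb: (E, E) => E): E = {
    val partials: List[E] = parts.map(_.foldLeft(zero)(seq))
    partials.foldLeft(zero)(comb)
  }

  // Append a character (seq) and concatenate two strings (comb).
  val append: (String, Char) => String = (acc, c) => acc + c
  val concat: (String, String) => String = _ + _

  // zero = "" is an identity of concat, so condition (ii) holds: both
  // partitionings give "abc", even though one contains an empty partition.
  val a: String = aggregate[String, Char](List(List('a', 'b'), List('c')))("")(append, concat)
  val b: String = aggregate[String, Char](List(List('a'), Nil, List('b', 'c')))("")(append, concat)

  // zero = "#" is not an identity of concat: the output now depends on
  // where the list is split ("##ab#c" versus "##a#bc").
  val c: String = aggregate[String, Char](List(List('a', 'b'), List('c')))("#")(append, concat)
  val d: String = aggregate[String, Char](List(List('a'), List('b', 'c')))("#")(append, concat)
}
```

Because the partial results are merged in partition order, the non-commutative `concat` still produces the same string for every partitioning as long as $zero$ is its identity.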

Note that the partitioning of $L$ is unspecified and may be non-deterministic. Hence, we have to use an associative $comb$ operator if we want to ensure that the result of aggregation is deterministic. Suppose that $comb$ is associative and at least one of conditions (i) and (ii) is satisfied. It turns out that the output of aggregate is deterministic with respect to $zero$, $seq$ and $comb$ if and only if for all lists $L_1$ and $L_2$, $$comb(zero,\ \textrm{foldl}(seq,\ zero,\ L_1L_2)) = comb(\textrm{foldl}(seq,\ zero,\ L_1),\ \textrm{foldl}(seq,\ zero,\ L_2)).$$ While the above condition is undecidable in general, it can be effectively checked for some classes of programs such as FSMs over finite alphabets.
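For concrete operators, the split condition can at least be tested on sample lists. The following is a minimal sketch in plain Scala; `holdsOn` is a hypothetical helper introduced here, and passing the check on a finite set of samples is evidence only, not a proof that the condition holds for all lists.

```scala
object SplitConditionCheck {
  // Checks comb(zero, foldl(seq, zero, L1 L2)) ==
  //        comb(foldl(seq, zero, L1), foldl(seq, zero, L2))
  // for every way of splitting each sample list into a prefix L1 and a suffix L2.
  def holdsOn[E, D](samples: List[List[D]])(zero: E)(seq: (E, D) => E, comb: (E, E) => E): Boolean =
    samples.forall { l =>
      val whole = comb(zero, l.foldLeft(zero)(seq))
      (0 to l.length).forall { i =>
        val (l1, l2) = l.splitAt(i)
        whole == comb(l1.foldLeft(zero)(seq), l2.foldLeft(zero)(seq))
      }
    }

  val samples: List[List[Int]] = List(Nil, List(1), List(1, 2, 3), List(4, 0, 4, 2))

  // Summation with zero = 0 satisfies the condition on all samples.
  val sumOk: Boolean = holdsOn[Int, Int](samples)(0)(_ + _, _ + _)

  // Counting elements with seq but merging with max violates it, although
  // max is associative and commutative: for List(1, 2, 3) split after the
  // first element, the left side is 3 and the right side is max(1, 2) = 2.
  val countMaxOk: Boolean =
    holdsOn[Int, Int](samples)(0)((acc, _) => acc + 1, (x, y) => math.max(x, y))
}
```

The second example illustrates that algebraic properties of $comb$ alone are not sufficient; $seq$ and $comb$ also have to agree with each other.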

Concluding remarks

Aggregate in Spark is the parallelized version of the conventional fold, which has already proved extremely useful in functional programming. It is therefore of practical interest to study the properties and behaviours of the aggregate function. In particular, it is interesting to establish and verify the conditions under which aggregate is deterministic with respect to the provided $zero$, $seq$ and $comb$. We may further investigate this problem in upcoming posts.

Thursday, January 15, 2015

Emulating Hadoop in Spark

The map and reduce functions in MapReduce have the following general form:
Map: (K1, V1) → list(K2, V2)
Reduce: (K2, list(V2)) → list(K3, V3)
When a MapReduce program is executed, the Map function is applied in parallel to every pair in the input dataset. Each invocation of Map produces a list of pairs. The execution framework then collects all pairs with the same key from all lists and groups them together, creating one group for each key. After that, the Reduce function is applied in parallel to each group, which in turn produces a collection of key-value pairs. In Hadoop's implementation of MapReduce, the intermediate pairs are sorted by key before they are sent to a reducer.
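The dataflow described above can be modelled sequentially in plain Scala. This is a sketch for illustration only: `MiniMapReduce` and `run` are hypothetical names, nothing is executed in parallel, and the grouping step stands in for the framework's shuffle without any sorting.

```scala
object MiniMapReduce {
  // Sequential model of the dataflow: apply mapFn to every input pair,
  // group the emitted pairs by key, then apply reduceFn to each group.
  def run[K1, V1, K2, V2, K3, V3](input: Seq[(K1, V1)])(
      mapFn: ((K1, V1)) => Seq[(K2, V2)],
      reduceFn: ((K2, Seq[V2])) => Seq[(K3, V3)]): Seq[(K3, V3)] = {
    val mapped: Seq[(K2, V2)] = input.flatMap(mapFn)
    val grouped: Map[K2, Seq[V2]] =
      mapped.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    grouped.toSeq.flatMap(reduceFn)
  }

  // Word count: input keys are line numbers, input values are lines of text.
  val lines: Seq[(Int, String)] = Seq(1 -> "to be or", 2 -> "not to be")

  val counts: Map[String, Int] = run[Int, String, String, Int, String, Int](lines)(
    { case (_, line) => line.split(" ").toSeq.map(w => w -> 1) }, // Map: emit (word, 1)
    { case (word, ones) => Seq(word -> ones.sum) }                // Reduce: sum the ones
  ).toMap
}
```

Here `counts` maps "to" and "be" to 2, and "or" and "not" to 1.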

Despite the suggestive naming, map and reduce operations in Spark do not directly correspond to the functions of the same name in MapReduce. For example, Map and Reduce functions in MapReduce can return multiple key-value pairs. This feature can be implemented by the flatMap operation in Spark (and typical FP languages in general):
List(1, 2, 3).map(a => List(a))     // List(List(1), List(2), List(3))
List(1, 2, 3).flatMap(a => List(a)) // List(1, 2, 3)
One naive attempt to emulate Hadoop MapReduce (without combiners) in Spark is to use two flatMap operations for map and reduce, separated by a groupByKey and a sortByKey to perform shuffle and sort:
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)
Here an implicit Ordering[K2] must be in scope for the key type K2 to satisfy the signature of sortByKey. The above code is useful to demonstrate the relationship between Hadoop and Spark, but it should not be applied blindly in practice. For one thing, its semantics are slightly different from Hadoop's MapReduce, since sortByKey performs a total sort over all key-value pairs while Hadoop only sorts the keys within each reducer's partition. This issue can be avoided by using repartitionAndSortWithinPartitions, available since Spark 1.2, to perform a partial sort:
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput
    .groupByKey()
    .repartitionAndSortWithinPartitions(Partitioner.defaultPartitioner(mapOutput))
Unfortunately, the improved version is still not as efficient as Hadoop, since Spark uses two shuffles (one for the groupByKey and one for the sort) while MapReduce only uses one. It turns out that one cannot precisely and efficiently capture the semantics of Hadoop's MapReduce in the current version of Spark.

Rather than trying to reproduce MapReduce in Spark, it is better to use only the operations that you actually need. For example, if you don’t need keys to be sorted, you can omit the sortByKey call to avoid sorting (which is impossible in regular Hadoop MapReduce). Similarly, groupByKey is too general in most cases. If shuffling is needed only for aggregating values, you should use reduceByKey, foldByKey, or aggregateByKey, which are more efficient than groupByKey since they exploit combiners in the map task. Finally, flatMap may not always be needed either. Instead, you can use map if there is always one return value, or use filter if there is zero or one.
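The benefit of map-side combining can be illustrated with a toy model in plain Scala. This is a sketch, not Spark code: partitions are ordinary sequences, the "shuffle" is just the number of pairs that would have to be moved, and all names (`CombinerSketch`, `reduceLocally`, `groupThenReduce`, `combineThenReduce`) are hypothetical. It assumes the operator is associative and commutative.

```scala
object CombinerSketch {
  // Reduce the values of each key within one collection of pairs.
  def reduceLocally[K, V](pairs: Seq[(K, V)])(op: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(op) }

  // groupByKey style: every pair crosses the shuffle, then values are reduced.
  // Returns the number of shuffled pairs and the final result.
  def groupThenReduce[K, V](parts: Seq[Seq[(K, V)]])(op: (V, V) => V): (Int, Map[K, V]) = {
    val shuffled: Seq[(K, V)] = parts.flatten
    (shuffled.size, reduceLocally(shuffled)(op))
  }

  // reduceByKey style: combine inside each partition first, so at most one
  // pair per key and partition crosses the shuffle.
  def combineThenReduce[K, V](parts: Seq[Seq[(K, V)]])(op: (V, V) => V): (Int, Map[K, V]) = {
    val shuffled: Seq[(K, V)] = parts.flatMap(p => reduceLocally(p)(op).toSeq)
    (shuffled.size, reduceLocally(shuffled)(op))
  }

  val parts: Seq[Seq[(String, Int)]] =
    Seq(Seq("a" -> 1, "b" -> 1, "a" -> 1), Seq("a" -> 1, "b" -> 1, "b" -> 1))

  val (movedNaive, resultNaive) = groupThenReduce[String, Int](parts)(_ + _)
  val (movedCombined, resultCombined) = combineThenReduce[String, Int](parts)(_ + _)
}
```

Both strategies produce the same counts, but in this toy input the combining variant moves 4 pairs instead of 6; the gap grows with the number of repeated keys per partition.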

Consult these two articles on Cloudera for details of translating Hadoop programs to efficient Spark programs: How-to: Translate from MapReduce to Apache Spark, part 1 and part 2.

Further readings

1. Jimmy Lin, Chris Dyer. Data-Intensive Text Processing with MapReduce.
2. Tom White. Hadoop: The Definitive Guide, 4th Edition.
3. Srinivasa, K.G., Muppalla, Anil K. Guide to High Performance Distributed Computing.