Writing serializable closures
When you pass a closure to an RDD operation, Spark serializes it and ships it to the executors. Every variable the closure captures must therefore be serializable, which is not always the case. Consider this example:

    class Foo {
      val factor = 3.14159
      val log = new Log(...) // not serializable

      def multiply(rdd: RDD[Int]) = {
        rdd.map(x => x * factor).reduce(...)
      }
    }

The closure passed to map refers to factor. Since factor is a field of Foo, the closure actually reads this.factor and so captures the whole enclosing Foo instance. That instance also holds the unserializable field log, so a NotSerializableException is thrown at runtime. A workaround is to copy factor into a local variable:

    class Foo {
      val factor = 3.14159
      val log = new Log(...) // not serializable

      def multiply(rdd: RDD[Int]) = {
        // Only factor2 will be serialized.
        val factor2 = factor
        rdd.map(x => x * factor2).reduce(...)
      }
    }

Serialization is a general issue for distributed programs written on the JVM. A future version of Scala may introduce a "serialization-safe" mechanism for defining closures for this purpose.
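The difference between the two versions of Foo can be checked without a cluster. The following is a minimal sketch that uses plain Java serialization as a stand-in for what Spark does when it ships a closure. The empty Log class, the methods capturesThis and capturesLocal, and the helper isSerializable are hypothetical names introduced only for this illustration, and the sketch assumes Scala 2.12 or later, where function literals compile to serializable lambdas.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the non-serializable Log class from the text.
class Log

class Foo {
  val factor = 3.14159
  val log = new Log // not serializable

  // Reads the field, so the closure captures `this` (and with it `log`).
  def capturesThis: Int => Double = x => x * factor

  // Copies the field into a local first, so only a Double is captured.
  def capturesLocal: Int => Double = {
    val factor2 = factor
    x => x * factor2
  }
}

object ClosureCheck {
  // Returns true if plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val foo = new Foo
    println(isSerializable(foo.capturesThis))
    println(isSerializable(foo.capturesLocal))
  }
}
```

Under these assumptions the first check is expected to fail and the second to succeed. Spark performs a similar check on the driver before launching tasks, so an offending closure is reported before any work is sent to the executors.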
Preventing the driver program from OOM
1. Operations on RDDs include transformations and actions. The output of a transformation stays distributed across the workers, whereas an action transfers its output to the driver program, which can cause an OOM error when the output is large. A rule of thumb is to avoid actions that may return unbounded output, such as collect, countByKey, and collectAsMap. If you need to probe a large RDD, use operations such as sample, count, and take to obtain bounded results. You can also write the output directly from the workers to HDFS with saveAsTextFile and load it later.
2. Sometimes you get an OOM error not because your RDDs don't fit in memory, but because the working set of one of your tasks is too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, and this table can be large. The simplest fix is to increase the number of tasks so that each task’s input set is smaller. Spark reuses one executor across many tasks and launches tasks cheaply, so you can safely increase the number of tasks beyond the number of cores in your cluster.
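Both points can be sketched in a few lines. The example below is a rough illustration, assuming spark-core is on the classpath and a local[2] master is acceptable for experimenting. The object DriverSafeProbe, its methods, the sample data, and the outputPath parameter are hypothetical names chosen for this sketch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DriverSafeProbe {
  // Bounded probes: each call returns a fixed amount of data to the driver.
  def probe(rdd: RDD[(String, Int)], n: Int): (Long, Array[(String, Int)]) = {
    val total = rdd.count() // a single Long
    val head = rdd.take(n)  // at most n records
    (total, head)
  }

  // Writes the records from the workers instead of collecting them.
  // outputPath is a hypothetical destination, e.g. a directory on HDFS.
  def writeFromWorkers(rdd: RDD[(String, Int)], outputPath: String): Unit =
    rdd.saveAsTextFile(outputPath)

  // An explicit partition count spreads the shuffle over numTasks reduce
  // tasks, so each task aggregates a smaller slice of the keys.
  def sumByKey(rdd: RDD[(String, Int)], numTasks: Int): RDD[(String, Int)] =
    rdd.reduceByKey(_ + _, numTasks)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("driver-safe-probe").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(1 to 100000).map(i => (s"key${i % 1000}", 1))
      val (total, head) = probe(pairs, 5)
      println(s"total=$total, first records=${head.mkString(", ")}")
      val sums = sumByKey(pairs, 64)
      println(s"reduce tasks: ${sums.getNumPartitions}")
    } finally sc.stop()
  }
}
```

The partition count of 64 is arbitrary. In practice it is a value to tune upward until individual tasks stop running out of memory.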
Avoiding imbalanced join operations
1. Suppose you want to inner join two RDDs in Spark: one very small with $n$ partitions and the other very large with $m \gg n$ partitions. This operation shuffles the data of the large RDD into $n$ partitions, so it is carried out by at most $n$ executors regardless of the number of worker nodes. A better way to do the inner join in this case is to first broadcast the small RDD to the nodes on which the large RDD is distributed and then perform the join on each executor. This not only avoids the shuffle but also preserves the parallelism of the larger RDD (with $m$ output partitions instead of $n$). The price is replicating and broadcasting the smaller RDD around the cluster.
2. If you want to left join a very small RDD with a very large RDD, it may help to first filter the large RDD for only those entries that share keys with the small RDD. This filtering step reduces the data that must be shuffled over the network and may thus speed up the overall execution.
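The two join strategies above can be sketched as follows. This is one possible illustration rather than a definitive implementation: it assumes spark-core is on the classpath, that the small RDD fits in the memory of the driver and of each executor, and that its keys are unique (collectAsMap keeps only one value per key). The object BroadcastJoin, its method names, and the key and value types are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BroadcastJoin {
  // Map-side inner join: the small RDD is collected into a map, broadcast to
  // every executor, and looked up while scanning the large RDD. Nothing is
  // shuffled, and the result keeps the partitions of the large RDD.
  def innerJoin(sc: SparkContext,
                large: RDD[(Int, String)],
                small: RDD[(Int, String)]): RDD[(Int, (String, String))] = {
    val lookup = sc.broadcast(small.collectAsMap())
    large.flatMap { case (k, v) =>
      // Emit one joined record if the key exists in the small side, else none.
      lookup.value.get(k).map(w => (k, (v, w))).iterator
    }
  }

  // Left join of the small RDD with the large one: the large RDD is first
  // reduced to the keys that occur in the small RDD, so less data is shuffled.
  def leftJoinFiltered(sc: SparkContext,
                       small: RDD[(Int, String)],
                       large: RDD[(Int, String)]): RDD[(Int, (String, Option[String]))] = {
    val keys = sc.broadcast(small.keys.collect().toSet)
    val relevant = large.filter { case (k, _) => keys.value.contains(k) }
    small.leftOuterJoin(relevant)
  }
}
```

Both helpers trade network shuffle for a broadcast of the small side, so they only pay off while that side stays small.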
Note that balancing tricks of this kind are not needed for reduce-like operations such as groupByKey and reduceByKey, where the number of partitions of the largest parent RDD is used.

References and resources
1. The official guidelines for tuning Spark
2. How-to: Tune Your Apache Spark Jobs at Cloudera
3. CME 323: Distributed Algorithms and Optimization at Stanford Univ.