
Question

  1. How is the number of partitions decided by Spark?
  2. Do I need to specify the number of available CPU cores somewhere explicitly so that the number of partitions will be the same (such as the numSlices argument of the parallelize method, which would require updating the program whenever the number of cores changes)?

Background

Installed a Spark cluster as described under Environment, with no changes to spark-env.sh, spark-defaults.conf, or the SparkConf object in the programs.

For an N Queen program, the number of partitions was 2 and only one node was assigned tasks. For a word count program, the number of partitions was 22 and tasks were allocated to all nodes. spark-submit was used for both programs.

Programs

N Queen

val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }

val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))

val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()

Word Count

val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))

val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)
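
As an aside (not part of the original program): reduceByKey also accepts an explicit partition count for the shuffled RDD, which is one way to control the post-shuffle partitioning independently of the input. A hedged sketch, using an arbitrary value of 24:

// Sketch only: size the shuffle output explicitly (24 is an arbitrary example value)
val countsExplicit = words.reduceByKey(_ + _, 24)  // RDD with 24 partitions after the shuffle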

Environment

Spark 2.0.1 (3 nodes with 4 CPUs each) on Ubuntu 14.04.
Standalone deployment (neither YARN nor Mesos)

1 Answer


Found the relevant information in How-to: Tune Your Apache Spark Jobs (Part 2):

How is this number determined? The way Spark groups RDDs into stages is described in the previous post. (As a quick reminder, transformations like repartition and reduceByKey induce stage boundaries.) The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.
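
For illustration, a minimal sketch (assuming a spark-shell session, where sc already exists) that checks these rules via partitions.size:

val a = sc.parallelize(1 to 100, 4)      // 4 partitions
val b = sc.parallelize(1 to 100, 6)      // 6 partitions
println(a.coalesce(2).partitions.size)   // 2  -- coalesce can only reduce the count
println(a.union(b).partitions.size)      // 10 -- union sums its parents' counts
println(a.cartesian(b).partitions.size)  // 24 -- cartesian multiplies them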

What about RDDs with no parents? RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that’s used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.
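
A minimal sketch of the same point, again assuming a spark-shell session on this cluster (the HDFS path is the one from the question):

println(sc.parallelize(1 to 1000).partitions.size)      // = sc.defaultParallelism, i.e. spark.default.parallelism when set
println(sc.parallelize(1 to 1000, 12).partitions.size)  // 12 -- an explicit argument wins
// textFile partitions come from the underlying InputFormat (roughly one per HDFS block);
// the optional second argument is only a lower-bound hint (minPartitions):
println(sc.textFile("hdfs:/user/wynadmin/sfpd.csv", 24).partitions.size)  // typically >= 24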

Setting the spark.default.parallelism option fixed the symptom:

--conf spark.default.parallelism=24

Setting it to 12 (the same as the number of cores) led to uneven usage of the nodes.
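
The same value can also be set in spark-defaults.conf or in the SparkConf itself; a sketch of the programmatic variant (which takes precedence over both the spark-submit flag and spark-defaults.conf):

val sparkConf = new SparkConf()
  .setAppName("NQueen")
  .set("spark.files.overwrite", "true")
  .set("spark.default.parallelism", "24")  // same effect as --conf on spark-submit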
