We all know that when you have to process a massive amount of data, Apache Spark is a perfect choice. Apache Spark is a distributed processing framework easy to use and designed for performing a lot of computations and stream processing. However, it can present a range of problems if not properly optimized.
A Spark job can be optimized by many techniques, so let’s dig deeper into those techniques one by one.
By default, Spark uses Java Serialization and serialization plays an important role in the performance, especially when the data is moved over the network or when we want to reduce memory usage by storing spark RDDs in serialized form. We recommend using Kryo Serialization which is more compact and 10x faster than Java Serialization. To set Kryo, we need to set the serializer properties:
# Kryo Serialization is much faster than the default Java Serialization conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Kryo Serialization does not support all Serializable types and it requires you to register the classes you’ll use.
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
Both contribute to better performance when you want to access a data set repeatedly. They are synonymous, but persist() provides more control over where (memory or disk) and how (serialized or unserialized) your data is stored.
val df = spark.range(1 * 5000000).toDF("id").withColumn("square", $"id" * $"id") df.cache() val size = df.count() // materialize the cache println(size) //first action is slower println(df.count()) // second call gets data from the cache and it’s faster
It is good to know that not all use cases require cache or persist. In some scenarios, it is faster to execute the DAG from the beginning instead of caching at a certain transformation step. Also, be careful when your data set is too big to fit in memory. As well, in some cases when the DataFrame is not used anymore and more computations follow it would be useful to call async nonblocking unpersist.
df.unpersist(false)
We have noticed a common mistake, especially among junior developers, that when they want to read files that are spread across many folders, they read those files sequentially on each folder, instead of reading all the files at once.
val dates = Seq("2020-07-07", "2020-07-09") val df = dates.map(_.split("-")) .map { case Array(y, m, d) => spark.read.json(s"/data/exchange-rates/y=$y/m=$m/d=$d") }.reduce(_ union _)
val dates = Seq("2020-07-07", "2020-07-09") val filesPaths = dates.map(_.split("-")) .map { case Array(y, m, d) => s"/data/exchange-rates/y=$y/m=$m/d=$d" } val df = spark.read.json(filesPaths: _*)
When you have to process .csv or .json files and you don’t specify the schema, then Spark will go through the input to determine the input schema. To avoid the extra scan, you can explicitly specify the schema using a config file:
val conf = ConfigFactory.parseResources("schema.conf") val jsonSchema = conf.root().render(ConfigRenderOptions.concise) val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType] val df = spark.read.schema(schema).json(filesPaths: _*)
When you have multiple small files, another approach is to read only one file to deduce the schema and apply the same schema to the rest of the files:
val jsonSchema = spark.read.json(filesPaths(0)).schema.json val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType] val df = spark.read.schema(schema).json(filesPaths: _*)
Coalesce should be preferred over repartition to reduce the number of partitions because it avoids a shuffle, but the number of tasks depends on the number of partitions of the output of the stage. If there are heavy computations performed upstream, you would like them to be performed in parallel, so it may be better to use repartition(1) that adds a shuffle step but keeps the execution in parallel, rather than coalesce(1) that will execute on only one node.
df.doSth().coalesce(10).write(...) //faster df.doSth().repartition(10).write(...) //slower
We have seen the most important and common tuning configurations. Spark has many configurations for tuning and you can find them in the official documentation. Another important optimization with a significant effect on execution time is the join operations tuning, but I will cover this in a future article.
Lucian Neghina is an experienced big data architect, leading eSolutions’ big data team. With a passion for open source technologies, Lucian has extensive experience in designing and implementing complex big data projects, being also a Certified Google Cloud Platform Architect.
Got a question or need advice? We're just one click away.