1. Improved Performance
a. Whole Stage Code Generation – Tungsten Engine
Apache Spark 2.0 ships with the second-generation Tungsten engine, which improves Spark execution by optimizing jobs for CPU and memory efficiency through a technique called "whole-stage code generation". With this technique, Spark emits optimized bytecode at runtime that collapses an entire query into a single function, eliminating virtual function calls and keeping intermediate data in CPU registers.
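Conceptually, whole-stage code generation replaces the classic iterator ("Volcano") evaluation model with one fused loop. The plain-Scala sketch below is illustrative only, not Spark's actual generated code, but it shows why the fused form is faster:

```scala
// Volcano-style evaluation: each operator is an Iterator, so every row pays
// for virtual next() dispatch as it flows between operators.
def volcanoSum(n: Long): Long = {
  val scan: Iterator[Long] = (0L until n).iterator   // "range" operator
  val proj: Iterator[Long] = scan.map(identity)      // "project" operator
  proj.foldLeft(0L)(_ + _)                           // "aggregate" operator
}

// Whole-stage-generated style: the operators are fused into one tight loop,
// and the running sum lives in a local variable (effectively a CPU register).
def fusedSum(n: Long): Long = {
  var sum = 0L
  var i = 0L
  while (i < n) { sum += i; i += 1 }
  sum
}
```

Both functions compute the same result; the fused loop simply avoids the per-row dispatch overhead.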
The examples below compare performance in Spark 2.0 against Spark 1.6. All examples run on Spark 2.0; to simulate Spark 1.6, we turn off whole-stage code generation, which results in a code path similar to Spark 1.6.
i. Benchmark Setup
We define the following benchmark function to calculate the time taken by a function to execute.
def benchmark(name: String)(f: => Unit): Unit = {
  val startTime = System.nanoTime
  f
  val endTime = System.nanoTime
  println(s"Time taken in $name: " +
    (endTime - startTime).toDouble / 1000000000 + " seconds")
}
ii. Sum up 1 billion numbers using Spark 1.6
// This config turns off whole stage code generation, effectively changing the execution path to be similar to Spark 1.6.
spark.conf.set("spark.sql.codegen.wholeStage", false)
benchmark("Spark 1.6") {
  spark.range(1000L * 1000 * 1000).selectExpr("sum(id)").show()
}
Time taken in Spark 1.6: 15.565444737 seconds
iii. Sum up 1 billion numbers using Spark 2.0
spark.conf.set("spark.sql.codegen.wholeStage", true)
benchmark("Spark 2.0") {
  spark.range(1000L * 1000 * 1000).selectExpr("sum(id)").show()
}
Time taken in Spark 2.0: 0.675835036 seconds
iv. Join 1 billion records using Spark 1.6
spark.conf.set("spark.sql.codegen.wholeStage", false)
benchmark("Spark 1.6") {
  spark.range(1000L * 1000 * 1000)
    .join(spark.range(1000L).toDF(), "id").count()
}
Time taken in Spark 1.6: 58.126652348 seconds
v. Join 1 billion records using Spark 2.0
spark.conf.set("spark.sql.codegen.wholeStage", true)
benchmark("Spark 2.0") {
  spark.range(1000L * 1000 * 1000)
    .join(spark.range(1000L).toDF(), "id").count()
}
Time taken in Spark 2.0: 0.987448172 seconds
The table below summarizes the results:

| Primitive | Spark 1.6 (seconds) | Spark 2.0 (seconds) |
|-----------|---------------------|---------------------|
| Sum       | 15.6                | 0.68                |
| Join      | 58.12               | 0.98                |
b. Faster Group-by aggregation
Group-by aggregates are sped up by around 3-5x by using an in-memory hash map that acts as a "cache" for extremely fast key-value lookups while evaluating aggregates.
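The idea can be sketched in plain Scala. This is a simplified illustration, not Spark's actual aggregation code: each key's running sum is kept in an in-memory hash map, so each input row costs a single key lookup.

```scala
import scala.collection.mutable

// Hash-based group-by aggregation: one map lookup per input row.
def hashGroupBySum(rows: Seq[(Long, Long)]): Map[Long, Long] = {
  val buckets = mutable.HashMap.empty[Long, Long]
  for ((k, v) <- rows)
    buckets.update(k, buckets.getOrElse(k, 0L) + v)
  buckets.toMap
}

// Mirrors the "(id & 65535) as k" style of grouping used in the benchmark
// above, with a smaller mask for illustration.
val grouped = hashGroupBySum((0L until 1000L).map(id => (id & 15, id)))
```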
Using the same benchmark function defined above:
Using Spark 1.6
benchmark("Spark 1.6 Groupby") {
  sqlContext.range(1000L * 1000 * 100)
    .selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}
Time taken in Spark 1.6 Groupby: 9.419302176 seconds
Using Spark 2.0
benchmark("Spark 2.0 Groupby") {
  spark.range(1000L * 1000 * 100)
    .selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
}
Time taken in Spark 2.0 Groupby: 2.265611999 seconds
c. Catalog based partition handling
In previous versions of Spark, the first read of a table can be slow if the number of partitions is very large, because Spark must first discover which partitions exist. The initial query is blocked until Spark loads metadata for all of the table's partitions; for a heavily partitioned table, recursively scanning the filesystem to discover this metadata can take many minutes.
As of Spark 2.1, table partition metadata is stored in the system catalog, and partitions not required by a query are pruned there. This avoids having to list files in partitions the query never touches.
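The planning step this enables can be sketched in plain Scala. The types and names below are hypothetical, not Spark internals: the point is that with partition metadata in a catalog, the query's partition predicate can be applied before any filesystem listing happens.

```scala
// Hypothetical catalog entry: partition column values plus a storage location.
case class Partition(spec: Map[String, String], location: String)

// Prune the catalog down to the partitions a query actually needs.
def prunePartitions(catalog: Seq[Partition],
                    predicate: Map[String, String] => Boolean): Seq[Partition] =
  catalog.filter(p => predicate(p.spec))

val catalog = Seq(
  Partition(Map("dt" -> "2017-01-01"), "/warehouse/t/dt=2017-01-01"),
  Partition(Map("dt" -> "2017-01-02"), "/warehouse/t/dt=2017-01-02"),
  Partition(Map("dt" -> "2017-01-03"), "/warehouse/t/dt=2017-01-03")
)

// Only the matching partition's files would ever be listed.
val needed = prunePartitions(catalog, _.get("dt").contains("2017-01-02"))
```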
2. SQL support
a. Native DDL command implementation.
As of Spark 1.6, most DDL commands are delegated directly to Hive, which can cause missing functionality, failures with unhelpful error messages, or inconsistent behavior. In Spark 2.0, DDL commands are implemented natively, with no dependence on Hive.
b. Support all TPC-DS queries
Spark 1.6 supports only 55 of the 99 TPC-DS queries; as of Spark 2.0, all 99 can be run. Below are some of the key changes in Spark 2.0 that came out of this effort.
• Support for CUBE/ROLLUP/GROUPING functions
Example:
val sales = Seq(
  ("Warsaw", 2016, 100),
  ("Warsaw", 2017, 200),
  ("Boston", 2015, 50),
  ("Boston", 2016, 150),
  ("Toronto", 2017, 50)
).toDF("city", "year", "amount")
sales.registerTempTable("sales")
Spark 1.6
sqlContext.sql("select city, SUM(amount) as Total from sales group by rollup(city)").show
org.apache.spark.sql.AnalysisException: undefined function rollup; line 1 pos 61
Spark 2.0
spark.sql("select city, SUM(amount) as Total from sales group by rollup(city)").show
+-------+-----+
| city|Total|
+-------+-----+
| null| 550|
|Toronto| 50|
| Boston| 200|
| Warsaw| 300|
+-------+-----+
• Subquery support
In earlier versions of Spark, SQL subqueries were supported only in the FROM clause, so a statement such as the following would fail:
select customerid from sparkbug where customerid in (select customerid from sparkbug where customerid in (2, 3))
As of Spark 2.0, subqueries are supported in the SELECT, WHERE, and HAVING clauses; predicate subqueries (EXISTS, IN) are supported only in the WHERE clause.
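The semantics of the IN-predicate subquery above can be sketched in plain Scala; the data here is made up for illustration, with a simple Seq standing in for the sparkbug table.

```scala
val customerIds = Seq(1, 2, 3, 4)                        // sparkbug.customerid
val innerResult = customerIds.filter(Set(2, 3))          // ... where customerid in (2, 3)
val outerResult = customerIds.filter(innerResult.toSet)  // ... where customerid in (subquery)
```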
• Support for window functions in SQLContext
As of Spark 2.0, three types of window functions are supported: ranking functions, analytic functions, and aggregate functions. The following ranking and analytic functions are available:
| Type     | SQL functions                                     |
|----------|---------------------------------------------------|
| Ranking  | rank, dense_rank, percent_rank, ntile, row_number |
| Analytic | cume_dist, first_value, last_value, lag, lead     |
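As a concrete illustration of one of these, the sketch below shows in plain Scala what the rank() ranking function computes within a single window partition ordered descending by a value: ties share a rank, and the next rank skips ahead. This is illustrative only, not Spark's implementation.

```scala
// rank() over one partition, ordered by value descending.
def rankDesc(values: Seq[Int]): Seq[(Int, Int)] = {
  val ordered = values.sorted(Ordering[Int].reverse)
  // The rank of a value is 1 + the index of its first (highest-placed) tie.
  ordered.map(v => (v, ordered.indexOf(v) + 1))
}
```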
3. New Features
a. To remove the confusion over whether to use SQLContext or HiveContext, SparkSession is introduced as the single entry point for the DataFrame and Dataset APIs.
i. SQLContext and HiveContext are deprecated.
ii. In the REPL (spark-shell), spark is available as a SparkSession.
b. To combine RDD-style operations and type safety with DataFrame-style query optimization, the DataFrame and Dataset APIs are unified into Dataset for Java and Scala.
c. The CSV data source implementation is now built in, so there is no longer a need to include the external Spark CSV module JAR when submitting applications. Instead, use CSV as a data source provider when you read or write CSV tables.
val df = spark.read.option("header", true).csv(path)