
Commit d734496

Polishes the ORC data source
1 parent 2650a42 commit d734496

File tree: 14 files changed (+591 -328 lines)

14 files changed

+591
-328
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 6 additions & 1 deletion
@@ -43,6 +43,8 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
   val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
+  val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
+
   val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
 
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
@@ -143,6 +145,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
 
+  private[spark] def orcFilterPushDown =
+    getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
   /** When true uses verifyPartitionPath to prune the path which is not exists. */
   private[spark] def verifyPartitionPath =
     getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
@@ -254,7 +259,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def dataFrameRetainGroupColumns: Boolean =
     getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean
-
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
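
For context, a minimal sketch of how the new flag can be toggled from user code. This is hypothetical and not part of this diff: `sc` stands for an already-created SparkContext, and the input path is a placeholder.

// Hypothetical usage: enable the new ORC filter pushdown flag at runtime.
// It defaults to "false", matching the getConf default above.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)  // `sc`: an existing SparkContext (assumed)
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

// Predicates on ORC-backed relations may now be pushed down into the ORC reader.
val df = hiveContext.read.format("orc").load("/placeholder/path/to/orc")
df.filter("key > 10").count()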

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala

Lines changed: 4 additions & 57 deletions
@@ -21,10 +21,9 @@ import java.io.File
 
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.Try
 
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{DataFrame, SaveMode}
 
 /**
  * A helper trait that provides convenient facilities for Parquet testing.
@@ -33,54 +32,9 @@ import org.apache.spark.util.Utils
  * convenient to use tuples rather than special case classes when writing test cases/suites.
  * Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
  */
-private[sql] trait ParquetTest {
-  val sqlContext: SQLContext
-
+private[sql] trait ParquetTest extends SQLTestUtils {
   import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
-  import sqlContext.{conf, sparkContext}
-
-  protected def configuration = sparkContext.hadoopConfiguration
-
-  /**
-   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
-   * configurations.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
-    val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
-    (keys, values).zipped.foreach(conf.setConf)
-    try f finally {
-      keys.zip(currentValues).foreach {
-        case (key, Some(value)) => conf.setConf(key, value)
-        case (key, None) => conf.unsetConf(key)
-      }
-    }
-  }
-
-  /**
-   * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
-   * a file/directory is created there by `f`, it will be delete after `f` returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempPath(f: File => Unit): Unit = {
-    val path = Utils.createTempDir()
-    path.delete()
-    try f(path) finally Utils.deleteRecursively(path)
-  }
-
-  /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
+  import sqlContext.sparkContext
 
   /**
    * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
@@ -105,13 +59,6 @@ private[sql] trait ParquetTest {
     withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
   }
 
-  /**
-   * Drops temporary table `tableName` after calling `f`.
-   */
-  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sqlContext.dropTempTable(tableName)
-  }
-
   /**
    * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
    * temporary table named `tableName`, then call `f`. The temporary table together with the

sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import java.io.File
+
+import scala.util.Try
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.util.Utils
+
+trait SQLTestUtils {
+  val sqlContext: SQLContext
+
+  import sqlContext.{conf, sparkContext}
+
+  protected def configuration = sparkContext.hadoopConfiguration
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restore all SQL
+   * configurations.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
+    (keys, values).zipped.foreach(conf.setConf)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConf(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+
+  /**
+   * Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
+   * a file/directory is created there by `f`, it will be delete after `f` returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
+  /**
+   * Drops temporary table `tableName` after calling `f`.
+   */
+  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sqlContext.dropTempTable(tableName)
+  }
+}
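
To show how the extracted helpers compose, here is a hypothetical suite body. The suite name and the TestSQLContext wiring are assumptions for illustration; only the helpers themselves come from this file.

// Hypothetical example, not part of this commit.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}

class ExampleSuite extends org.scalatest.FunSuite with SQLTestUtils {
  // SQLTestUtils only needs a concrete SQLContext; Spark's own suites typically use TestSQLContext.
  val sqlContext: SQLContext = TestSQLContext

  test("config overrides are scoped and temp dirs are cleaned up") {
    withSQLConf("spark.sql.orc.filterPushdown" -> "true") {
      withTempDir { dir =>
        // Anything written under `dir` is deleted when this block returns,
        // and the previous value of the config key (or its unset state) is restored.
      }
    }
  }
}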

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala

Lines changed: 1 addition & 2 deletions
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.hive.orc
 
-
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 
 import org.apache.spark.sql.catalyst.expressions.MutableRow
-import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
+import org.apache.spark.sql.hive.HiveInspectors
 
 /**
  * We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala

Lines changed: 6 additions & 8 deletions
@@ -28,28 +28,25 @@ import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types.StructType
 
 private[orc] object OrcFileOperator extends Logging{
-
   def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
     val conf = config.getOrElse(new Configuration)
     val fspath = new Path(pathStr)
     val fs = fspath.getFileSystem(conf)
     val orcFiles = listOrcFiles(pathStr, conf)
-    OrcFile.createReader(fs, orcFiles(0))
+
+    // TODO Need to consider all files when schema evolution is taken into account.
+    OrcFile.createReader(fs, orcFiles.head)
   }
 
   def readSchema(path: String, conf: Option[Configuration]): StructType = {
     val reader = getFileReader(path, conf)
-    val readerInspector: StructObjectInspector = reader.getObjectInspector
-      .asInstanceOf[StructObjectInspector]
+    val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
     val schema = readerInspector.getTypeName
     HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
   }
 
   def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
-    val reader = getFileReader(path, conf)
-    val readerInspector: StructObjectInspector = reader.getObjectInspector
-      .asInstanceOf[StructObjectInspector]
-    readerInspector
+    getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
   }
 
   def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
@@ -66,6 +63,7 @@ private[orc] object OrcFileOperator extends Logging{
       throw new IllegalArgumentException(
         s"orcFileOperator: path $path does not have valid orc files matching the pattern")
     }
+
     paths
   }
 }
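
For reference, a sketch of how these helpers are driven from inside the `orc` package. `OrcFileOperator` is `private[orc]`, so this only compiles there; the path below is a placeholder.

import org.apache.hadoop.conf.Configuration

// Illustrative only: read the schema of the first ORC file found under a path.
val hadoopConf = new Configuration()
val schema = OrcFileOperator.readSchema("/placeholder/path/to/orc-output", Some(hadoopConf))
println(schema.treeString)  // the resulting StructType, printed as a tree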
