72

For a set of dataframes

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

to union all of them I do

df1.unionAll(df2).unionAll(df3)

Is there a more elegant and scalable way of doing this for any number of dataframes, for example from

Seq(df1, df2, df3) 

5 Answers

128

For pyspark you can do the following:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

It's also worth noting that the columns must be in the same order in every dataframe in the list for this to work, because the union matches columns by position rather than by name. This can silently give unexpected results if the column orders differ!

If you are using pyspark 2.3 or greater, you can use unionByName so you don't have to reorder the columns.
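
One way to guard against the column-order pitfall is to compare the column lists before reducing. The sketch below is plain Python and works on lists of column names, such as those returned by each dataframe's `columns` attribute. The helper name `mismatched_column_orders` is made up for illustration and is not part of PySpark.

```python
def mismatched_column_orders(columns_per_df):
    """Return indices of column lists that differ from the first one.

    Hypothetical helper: pass [df.columns for df in dfs].
    An empty result means every frame lists the same columns in the same order.
    """
    if not columns_per_df:
        return []
    expected = list(columns_per_df[0])
    return [
        i for i, cols in enumerate(columns_per_df)
        if list(cols) != expected
    ]


# Stand-in column lists; with real dataframes use [df.columns for df in dfs].
column_lists = [["id", "x"], ["id", "x"], ["x", "id"]]
bad = mismatched_column_orders(column_lists)
print(bad)
```

If the result is non-empty, the frames at those positions could be reordered with a `select` before the union, or `unionByName` could be used instead.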


6 Comments

Please remember the point mentioned in bold.
Using Python's reduce means that the operations don't occur in parallel though, correct?
How can I add a parameter like allowMissingColumns=True?
DataFrame.unionAll is now deprecated. Use DataFrame.union instead
Wouldn't this be counterproductive to using spark as the reduce will write to disk?
74

The simplest solution is to reduce with union (unionAll in Spark < 2.0):

val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)

This is relatively concise and shouldn't move data out of off-heap storage, but it extends the lineage with each union, and plan analysis then takes non-linear time. This can be a problem if you try to merge a large number of DataFrames.

You can also convert to RDDs and use SparkContext.union:

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}

It keeps the lineage short and the analysis cost low, but otherwise it is less efficient than merging DataFrames directly.

5 Comments

Thanks for all these approaches!
Is this as simple in Scala? What would it be?
How would the equivalent of this code be in pySpark?
How is the performance if there are lots (say, more than 20) of DataFrames?
Also curious about the performance for a large number of DataFrames.
2

You can add parameters like allowMissingColumns by using reduce with a lambda:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1, df2]
df = reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), dfs)
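
The reason a lambda is needed: `reduce` calls its function with exactly two positional arguments (the accumulated result and the next item), so a keyword such as `allowMissingColumns` has to be bound beforehand. A minimal plain-Python sketch of that behaviour follows. The `Frame` class, its `union_by_name` method and the `allow_missing` flag are made-up stand-ins for illustration only, not Spark API.

```python
from functools import reduce


class Frame:
    """Toy stand-in for a DataFrame: just a set of column names."""

    def __init__(self, columns):
        self.columns = set(columns)

    def union_by_name(self, other, allow_missing=False):
        # Mimics the idea of a name-based union that rejects
        # differing columns unless explicitly allowed.
        if not allow_missing and self.columns != other.columns:
            raise ValueError("column sets differ")
        return Frame(self.columns | other.columns)


frames = [Frame(["id", "x"]), Frame(["id", "y"]), Frame(["id", "z"])]

# reduce(Frame.union_by_name, frames) would raise ValueError here,
# because reduce only ever supplies (accumulated, next_item)
# and the flag keeps its default value.
merged = reduce(lambda a, b: a.union_by_name(b, allow_missing=True), frames)
print(sorted(merged.columns))
```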

1 Comment

I could not understand why we use a lambda here. I tried without it and an error was thrown. Basically, how does the lambda make the difference? Can you please elaborate?
1

Under the hood, Spark flattens union expressions, so it takes longer when the unions are chained linearly.

The best solution would be for Spark to have a union function that supports multiple DataFrames.

But the following code might speed up the union of multiple DataFrames (or Datasets) somewhat.

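  import scala.reflect.ClassTag
  import org.apache.spark.sql.Dataset

  // Union any number of Datasets using a balanced (pairwise) reduction.
  def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
      binaryReduce[Dataset[T]](datasets, _.union(_))
  }

  // Reduce pairwise: each pass combines neighbours, halving the number of elements.
  def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
      if (ts.isEmpty) {
         throw new IllegalArgumentException("cannot reduce an empty collection")
      }
      val array = ts.toArray
      var size = array.length
      while (size > 1) {
         val newSize = (size + 1) / 2
         for (i <- 0 until newSize) {
             val index = i * 2
             val index2 = index + 1
             if (index2 >= size) {
                array(i) = array(index)  // odd element left over, carry it forward
             } else {
                array(i) = op(array(index), array(index2))
             }
         }
         size = newSize
      }
      array(0)
  }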
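
For readers asking about PySpark, the same pairwise idea can be sketched in plain Python. The function below is generic over the combining operation, so with DataFrames one would presumably pass something like `lambda a, b: a.union(b)`. The name `pairwise_reduce` is made up here, and the example uses strings only to make the resulting tree shape visible. Whether this actually speeds up plan analysis is not measured here.

```python
def pairwise_reduce(items, op):
    """Combine items in a balanced, tree-like order instead of left to right."""
    level = list(items)
    if not level:
        raise ValueError("cannot reduce an empty sequence")
    while len(level) > 1:
        merged = []
        # Combine neighbours two at a time.
        for i in range(0, len(level) - 1, 2):
            merged.append(op(level[i], level[i + 1]))
        # An odd element at the end is carried over unchanged.
        if len(level) % 2 == 1:
            merged.append(level[-1])
        level = merged
    return level[0]


# Strings stand in for DataFrames so the nesting is visible.
shape = pairwise_reduce(["a", "b", "c", "d", "e"], lambda l, r: f"({l}+{r})")
print(shape)
```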


0

In case some dataframes have missing columns, one can use a partially applied function:

from functools import partial, reduce
from pyspark.sql import DataFrame

# Union dataframes by name (missing columns filled with null) 
union_by_name = partial(DataFrame.unionByName, allowMissingColumns=True)
df_output = reduce(union_by_name, [df1, df2, ...])
