PySpark is the Python API for Apache Spark, exposing the Spark programming model to Python. With PySpark, you can easily harness Spark's engine to process large amounts of data efficiently.

One of the most common data operations is joining data from multiple sources. PySpark provides various join methods to allow you to combine data from multiple DataFrames or tables.

In this comprehensive guide, we will explore the various types of joins supported in PySpark and how to use them with examples.

DataFrames for Examples

Let's first create two sample DataFrames that we will use for the join examples.

# Import SparkSession 
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
            .appName("joins-example") \
            .getOrCreate()

# Create students DataFrame
students_df = spark.createDataFrame([
    ("001", "John", 15, "New York"), 
    ("002", "Sam", 16, "California"),
    ("003", "Sarah", 17, None), 
    ("004", "Cindy", 18, "Texas")  
], ["id", "name", "age", "state"])

# Create scores DataFrame 
scores_df = spark.createDataFrame([
    ("001", 90), 
    ("002", 95),
    ("001", 85),
    ("003", 98),
    ("005", 75)
], ["id", "score"]) 

The students_df DataFrame contains some simple student data with columns for ID, name, age and state.

The scores_df contains test scores per student ID.

Now let's explore the various join options.

Inner Join

An inner join returns only the rows from both DataFrames where the join condition is satisfied.

This means it will return rows with matching IDs from students_df and scores_df.

join_df = students_df.join(scores_df, "id", "inner") 
join_df.show()

This performs an inner join on the "id" column:

+---+-----+---+----------+-----+
| id| name|age|     state|score|
+---+-----+---+----------+-----+
|001| John| 15|  New York|   90|
|001| John| 15|  New York|   85|
|002|  Sam| 16|California|   95|
|003|Sarah| 17|      null|   98|
+---+-----+---+----------+-----+

We can see that it has matched rows from both DataFrames where the ID exists in both. Students without scores are not included.

Left Outer Join

A left outer join returns all rows from the left DataFrame (first DataFrame) regardless of whether the join condition is met. Rows from the right DataFrame (second DataFrame) are included where the join condition is satisfied.

join_df = students_df.join(scores_df, "id", "left_outer")
join_df.show() 

This left outer joins on ID:

+---+-----+---+----------+-----+
| id| name|age|     state|score|
+---+-----+---+----------+-----+
|001| John| 15|  New York|   90|
|001| John| 15|  New York|   85|
|002|  Sam| 16|California|   95|
|003|Sarah| 17|      null|   98|
|004|Cindy| 18|     Texas| null|
+---+-----+---+----------+-----+

We can see all rows from students_df are included, along with the matched rows from scores_df. Students without scores have null for the score column.

Right Outer Join

A right outer join is the opposite of a left outer join. It returns all rows from the right DataFrame with rows from the left DataFrame included where the join condition is met.

join_df = students_df.join(scores_df, "id", "right_outer")
join_df.show()

Right outer join on ID:

+---+-----+----+----------+-----+
| id| name| age|     state|score|
+---+-----+----+----------+-----+
|001| John|  15|  New York|   90|
|001| John|  15|  New York|   85|
|002|  Sam|  16|California|   95|
|003|Sarah|  17|      null|   98|
|005| null|null|      null|   75|
+---+-----+----+----------+-----+

Here all rows from scores_df are included, with rows from students_df joined where available. Students without data have null values.

Full Outer Join

A full outer join combines the results of left and right outer joins. It returns all rows from both the left and right DataFrames, with null values for missing rows on either side.

join_df = students_df.join(scores_df, "id", "full_outer")
join_df.show()  

Full outer join on ID:

+---+-----+----+----------+-----+
| id| name| age|     state|score|
+---+-----+----+----------+-----+
|004|Cindy|  18|     Texas| null|
|001| John|  15|  New York|   90|
|001| John|  15|  New York|   85|
|002|  Sam|  16|California|   95|
|003|Sarah|  17|      null|   98|
|005| null|null|      null|   75|
+---+-----+----+----------+-----+

As we can see, the full outer join output contains all records from both sides, students_df and scores_df in this case.

Left Semi Join

A left semi join returns only rows from the left DataFrame where the join condition is met. Unlike an inner join, it does not return columns from the right DataFrame.

join_df = students_df.join(scores_df, "id", "left_semi")   
join_df.show()  

Left semi join on ID:

+---+-----+---+----------+
| id| name|age|     state|
+---+-----+---+----------+
|001| John| 15|  New York|
|002|  Sam| 16|California|
|003|Sarah| 17|      null|
+---+-----+---+----------+

Here we only get the matching rows from the left students_df DataFrame.

Left Anti Join

A left anti join returns only the rows from the left DataFrame that have no match in the right DataFrame. It is the complement of a left semi join.

join_df = students_df.join(scores_df, "id", "left_anti")
join_df.show()   

Left anti join on ID:

+---+-----+---+-----+
| id| name|age|state|
+---+-----+---+-----+
|004|Cindy| 18|Texas|
+---+-----+---+-----+

The above left anti join selects students that do not have scores, i.e. students whose IDs are not present in scores_df.

Cross Join

A cross join performs a Cartesian product between two DataFrames: it matches each row from the left DataFrame with every row from the right DataFrame.

For large DataFrames this can generate an extremely large result, so use it with caution.

Here's an example:

join_df = students_df.crossJoin(scores_df)
join_df.show()  

Cross join:

+---+-----+---+----------+---+-----+
| id| name|age|     state| id|score|
+---+-----+---+----------+---+-----+
|001| John| 15|  New York|001|   90|
|001| John| 15|  New York|002|   95|
|001| John| 15|  New York|001|   85|
|001| John| 15|  New York|003|   98|
|001| John| 15|  New York|005|   75|
|002|  Sam| 16|California|001|   90|
|002|  Sam| 16|California|002|   95|
|002|  Sam| 16|California|001|   85|
|002|  Sam| 16|California|003|   98|
|002|  Sam| 16|California|005|   75|
|003|Sarah| 17|      null|001|   90|
|003|Sarah| 17|      null|002|   95|
|003|Sarah| 17|      null|001|   85|
|003|Sarah| 17|      null|003|   98|
|003|Sarah| 17|      null|005|   75|
|004|Cindy| 18|     Texas|001|   90|
|004|Cindy| 18|     Texas|002|   95|
|004|Cindy| 18|     Texas|001|   85|
|004|Cindy| 18|     Texas|003|   98|
|004|Cindy| 18|     Texas|005|   75|
+---+-----+---+----------+---+-----+

As you can see, it generates the Cartesian product, matching every row from the left DataFrame (students_df) with every row from the right DataFrame (scores_df).

So use cross joins cautiously on large datasets.
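A quick sanity check before running a cross join: the output row count is simply the product of the two input row counts.

```python
# Cross join output size = left row count x right row count.
# With the example DataFrames above: 4 students x 5 score rows.
left_rows = 4
right_rows = 5
print(left_rows * right_rows)  # 20 rows, matching the output shown above
```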

Joining Multiple DataFrames

All the join types we have covered can be used to combine more than two DataFrames.

For example, to join 3 DataFrames:

df1.join(df2, "id").join(df3, "id") 

Each call to join returns a new DataFrame, which can in turn be joined to another DataFrame. Chaining join calls in this way lets you combine any number of DataFrames.

Join Conditions

So far, all the join examples have joined on the ID column. However, joins can also use more complex conditions built from column expressions.

For example:

df1.join(df2, (df1.col1 == df2.col1) & (df1.col2 > 10))  

This joins on an AND of two conditions: an equality check between the two DataFrames' col1 columns, and a numeric condition on df1.col2.

Any valid Spark SQL expression can be used as a join condition in this way, supporting quite advanced join logic.

Conclusion

That covers the main joins offered by PySpark SQL and DataFrames. Some key points:

  • Inner joins match only common rows across both DataFrames
  • Left/Right outer joins retain all rows from one side (left/right DataFrame)
  • Full outer joins preserve rows from both DataFrames
  • Semi joins and anti joins allow filtering one DataFrame based on presence/absence of matches from another DataFrame
  • Cross joins compute a Cartesian product, matching every row on one side with every row on the other

By mastering these joins you can combine and analyze data from multiple sources in your PySpark applications and pipelines.

I hope you found this PySpark SQL joins guide useful! Let me know if you have any other questions.
