How do you join 3 tables in Python?

Spark supports joining multiple (two or more) DataFrames, In this article, you will learn how to use a Join on multiple DataFrames using Spark SQL expression(on tables) and Join operator with Scala example. Also, you will learn different ways to provide Join condition.

In order to explain join with multiple tables, we will use Inner join, this is the default join in Spark and it’s mostly used, this joins two DataFrames/Datasets on key columns, and where keys don’t match the rows get dropped from both datasets.

Before we jump into Spark Join examples, first, let’s create an 


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
4 ,

+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
5,

+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
6 DataFrame tables.

Emp Table


    val emp = Seq((1,"Smith","10"),
    (2,"Rose","20"),
    (3,"Williams","10"),
    (4,"Jones","10"),
    (5,"Brown","40"),
    (6,"Brown","50")
  )
  val empColumns = Seq("emp_id","name","emp_dept_id")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

Yields below output.


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+

Dept Table


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)

Yields below output.


+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

Address Table


  val address = Seq((1,"1523 Main St","SFO","CA"),
    (2,"3453 Orange St","SFO","NY"),
    (3,"34 Warner St","Jersey","NJ"),
    (4,"221 Cavalier St","Newark","DE"),
    (5,"789 Walnut St","Sandiago","CA")
  )
  val addColumns = Seq("emp_id","addline1","city","state")
  val addDF = address.toDF(addColumns:_*)
  addDF.show(false)

Yields below output.


+------+---------------+--------+-----+
|emp_id|addline1       |city    |state|
+------+---------------+--------+-----+
|1     |1523 Main St   |SFO     |CA   |
|2     |3453 Orange St |SFO     |NY   |
|3     |34 Warner St   |Jersey  |NJ   |
|4     |221 Cavalier St|Newark  |DE   |
|5     |789 Walnut St  |Sandiago|CA   |
+------+---------------+--------+-----+

Using Join operator


join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_]): DataFrame

The first join syntax takes, takes


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
7 dataset,

+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
8 and

+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
9 as arguments and we use

+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
8 to provide a join condition. second join syntax takes just dataset and joinExprs and it considers default join as

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
1. The rest of the article uses both syntaxes to join multiple Spark DataFrames.


  //Using Join expression
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner" )
      .join(addDF,empDF("emp_id") === addDF("emp_id"),"inner")
      .show(false)

This joins all 3 tables and returns a new DataFrame with the below result.


+------+--------+-----------+---------+-------+------+---------------+--------+-----+
|emp_id|name    |emp_dept_id|dept_name|dept_id|emp_id|addline1       |city    |state|
+------+--------+-----------+---------+-------+------+---------------+--------+-----+
|1     |Smith   |10         |Finance  |10     |1     |1523 Main St   |SFO     |CA   |
|2     |Rose    |20         |Marketing|20     |2     |3453 Orange St |SFO     |NY   |
|3     |Williams|10         |Finance  |10     |3     |34 Warner St   |Jersey  |NJ   |
|4     |Jones   |10         |Finance  |10     |4     |221 Cavalier St|Newark  |DE   |
|5     |Brown   |40         |IT       |40     |5     |789 Walnut St  |Sandiago|CA   |
+------+--------+-----------+---------+-------+------+---------------+--------+-----+

Alternatively, you can also use


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
2 as jointype and to use this you should import

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
3


  //Using Join expression
  empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),Inner.sql )
      .join(addDF,empDF("emp_id") === addDF("emp_id"),Inner.sql)
      .show(false)

The rest of the article provides a spark Inner Join example using DataFrame


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
4,

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
5 operators and

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
6, all these examples provide the same output as above.

Using Where to provide Join condition

Instead of using a join condition with


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
7 operator, here, we use

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
4 to provide an inner join condition.


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
0

Using Filter to provide Join condition

We can also use


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
5 to provide join condition for Spark Join operations


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
1

Using SQL Expression

Here, we will use the native SQL syntax to do join on multiple tables, in order to use Native SQL syntax, first, we should create a temporary view for all our DataFrames and then use


  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )
  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)
6 to execute the SQL expression.


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
2

Source code of Spark Join on multiple DataFrames


+------+--------+-----------+
|emp_id|name    |emp_dept_id|
+------+--------+-----------+
|1     |Smith   |10         |
|2     |Rose    |20         |
|3     |Williams|10         |
|4     |Jones   |10         |
|5     |Brown   |40         |
|6     |Brown   |50         |
+------+--------+-----------+
3

The complete example is available at GitHub project for reference.

Conclusion

In this Spark article, you have learned how to join multiple DataFrames and tables(creating temporary views) with Scala example and also learned how to use conditions using where filter.

Thanks for reading. If you like it, please do share the article by following the below social links and any comments or suggestions are welcome in the comments sections! 

How do I join 3 data frames in Python?

Pandas merge() function is used to merge multiple Dataframes. We can use either pandas. merge() or DataFrame. merge() to merge multiple Dataframes.

Can I inner join 3 tables?

It is possible to use multiple join statements together to join more than one table at the same time. To do that you add a second INNER JOIN statement and a second ON statement to indicate the third table and the second relationship.

How do you use joins in Python?

Python String join() Method The join() method takes all items in an iterable and joins them into one string. A string must be specified as the separator.

How do you join tables in Pandas?

Key Points.
You can join pandas Dataframes in much the same way as you join tables in SQL..
The concat() function can be used to concatenate two Dataframes by adding the rows of one to the other..
concat() can also combine Dataframes by columns but the merge() function is the preferred way..