The join() command in Spark combines two DataFrames based on a common column or key. It works like a SQL JOIN and supports several join types, such as inner, outer, left, right, and cross. This is particularly useful when you need to combine datasets for analysis or processing.


1. Syntax

PySpark:

df1.join(df2, join_condition, join_type)

Spark SQL:

SELECT * 
FROM table1 
JOIN table2 
ON join_condition;

2. Parameters

  • df2: The DataFrame to join with.
  • join_condition: A condition specifying how the DataFrames should be joined, given either as a column name (or list of names) shared by both DataFrames or as an explicit column expression such as df1.col_name == df2.col_name (see the sketch after this list).
  • join_type (optional): The type of join to perform. Default is inner. Options include:
    • inner: Returns rows with matching keys in both DataFrames.
    • outer/full: Returns all rows from both DataFrames, with null where there is no match.
    • left/left_outer: Returns all rows from the left DataFrame and matching rows from the right DataFrame.
    • right/right_outer: Returns all rows from the right DataFrame and matching rows from the left DataFrame.
    • cross: Returns the Cartesian product of both DataFrames.
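
When the key columns do not share a name, or you want to keep both key columns in the result, the join condition can also be passed as an explicit column expression. A minimal sketch, assuming the employees_df and departments_df DataFrames created in Example 1 below:

# Explicit join condition: keeps the DeptID column from both sides,
# unlike the column-name form, which merges them into one column
cond = employees_df["DeptID"] == departments_df["DeptID"]
joined_df = employees_df.join(departments_df, cond, "left")

# Drop the duplicate key column from the right side if it is not needed
joined_df = joined_df.drop(departments_df["DeptID"])
joined_df.show()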

3. Return Type

  • Returns a new DataFrame containing the combined data based on the join condition and type.

4. Examples

Example 1: Inner Join

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

# Create DataFrames
employees_data = [("Anand", 101), ("Bala", 102), ("Kavitha", 103)]
departments_data = [(101, "Sales"), (102, "HR"), (104, "Finance")]

employees_columns = ["Name", "DeptID"]
departments_columns = ["DeptID", "DeptName"]

employees_df = spark.createDataFrame(employees_data, employees_columns)
departments_df = spark.createDataFrame(departments_data, departments_columns)

# Inner join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "inner")
joined_df.show()

Spark SQL:

SELECT * 
FROM employees 
INNER JOIN departments 
ON employees.DeptID = departments.DeptID;

Output:

+------+-------+--------+
|DeptID|   Name|DeptName|
+------+-------+--------+
|   101|  Anand|   Sales|
|   102|   Bala|      HR|
+------+-------+--------+

Example 2: Left Join

PySpark:

# Left join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "left")
joined_df.show()

Spark SQL:

SELECT * 
FROM employees 
LEFT JOIN departments 
ON employees.DeptID = departments.DeptID;

Output:

+------+-------+--------+
|DeptID|   Name|DeptName|
+------+-------+--------+
|   101|  Anand|   Sales|
|   102|   Bala|      HR|
|   103|Kavitha|    null|
+------+-------+--------+

Example 3: Right Join

PySpark:

# Right join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "right")
joined_df.show()

Spark SQL:

SELECT * 
FROM employees 
RIGHT JOIN departments 
ON employees.DeptID = departments.DeptID;

Output:

+------+-----+--------+
|DeptID| Name|DeptName|
+------+-----+--------+
|   101|Anand|   Sales|
|   102| Bala|      HR|
|   104| null| Finance|
+------+-----+--------+

Example 4: Full Outer Join

PySpark:

# Full outer join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "outer")
joined_df.show()

Spark SQL:

SELECT * 
FROM employees 
FULL OUTER JOIN departments 
ON employees.DeptID = departments.DeptID;

Output:

+------+-------+--------+
|DeptID|   Name|DeptName|
+------+-------+--------+
|   101|  Anand|   Sales|
|   102|   Bala|      HR|
|   103|Kavitha|    null|
|   104|   null| Finance|
+------+-------+--------+

Example 5: Cross Join

PySpark:

# Cross join (Cartesian product)
joined_df = employees_df.crossJoin(departments_df)
joined_df.show()

Spark SQL:

SELECT * 
FROM employees 
CROSS JOIN departments;

Output:

+-------+------+------+--------+
|   Name|DeptID|DeptID|DeptName|
+-------+------+------+--------+
|  Anand|   101|   101|   Sales|
|  Anand|   101|   102|      HR|
|  Anand|   101|   104| Finance|
|   Bala|   102|   101|   Sales|
|   Bala|   102|   102|      HR|
|   Bala|   102|   104| Finance|
|Kavitha|   103|   101|   Sales|
|Kavitha|   103|   102|      HR|
|Kavitha|   103|   104| Finance|
+-------+------+------+--------+

Example 6: Joining on Multiple Columns

PySpark:

# Create DataFrames
employees_data = [("Anand", 101, "M"), ("Bala", 102, "F"), ("Kavitha", 103, "F")]
departments_data = [(101, "Sales", "Chennai"), (102, "HR", "Bangalore"), (104, "Finance", "Mumbai")]

employees_columns = ["Name", "DeptID", "Gender"]
departments_columns = ["DeptID", "DeptName", "Location"]

employees_df = spark.createDataFrame(employees_data, employees_columns)
departments_df = spark.createDataFrame(departments_data, departments_columns)

# Join using a list of column names (only DeptID is shared here;
# see the multi-column sketch after this example)
joined_df = employees_df.join(departments_df, ["DeptID"], "inner")
joined_df.show()

Spark SQL:

SELECT * 
FROM employees 
INNER JOIN departments 
ON employees.DeptID = departments.DeptID;

Output:

+------+-------+------+--------+---------+
|DeptID|   Name|Gender|DeptName| Location|
+------+-------+------+--------+---------+
|   101|  Anand|     M|   Sales|  Chennai|
|   102|   Bala|     F|      HR|Bangalore|
+------+-------+------+--------+---------+
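
To join on more than one key, include every shared column name in the list. A minimal sketch, assuming a hypothetical second common column named Location existed in both DataFrames (it does not in the data above):

# Hypothetical: join on two shared key columns
joined_df = employees_df.join(departments_df, ["DeptID", "Location"], "inner")
joined_df.show()

The Spark SQL equivalent would simply extend the ON clause, e.g. ON employees.DeptID = departments.DeptID AND employees.Location = departments.Location.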

Example 7: Using Broadcast Join for Small DataFrames

PySpark:

from pyspark.sql.functions import broadcast

# Broadcast the smaller DataFrame for optimization
joined_df = employees_df.join(broadcast(departments_df), "DeptID", "inner")
joined_df.show()

Spark SQL:

SELECT /*+ BROADCAST(departments) */ * 
FROM employees 
INNER JOIN departments 
ON employees.DeptID = departments.DeptID;

Output:

+------+-------+------+--------+---------+
|DeptID|   Name|Gender|DeptName| Location|
+------+-------+------+--------+---------+
|   101|  Anand|     M|   Sales|  Chennai|
|   102|   Bala|     F|      HR|Bangalore|
+------+-------+------+--------+---------+

5. Common Use Cases

  • Combining transactional data with master data (e.g., sales data with product data).
  • Enriching datasets by merging related information (e.g., customer data with order data; see the sketch after this list).
  • Preparing data for machine learning by combining features from multiple tables.
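
As an illustration of the enrichment pattern, the sketch below joins hypothetical orders and customers DataFrames on a shared CustomerID column (the DataFrame and column names are placeholders, reusing the SparkSession from Example 1):

# Hypothetical sketch: enrich order records with customer attributes
orders_data = [(1, "C01", 250.0), (2, "C02", 90.0), (3, "C03", 40.0)]
customers_data = [("C01", "Anand"), ("C02", "Bala")]

orders_df = spark.createDataFrame(orders_data, ["OrderID", "CustomerID", "Amount"])
customers_df = spark.createDataFrame(customers_data, ["CustomerID", "CustomerName"])

# Left join keeps every order, even when the customer record is missing
enriched_df = orders_df.join(customers_df, "CustomerID", "left")
enriched_df.show()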

6. Performance Considerations

  • Use join() judiciously on large datasets: most join strategies shuffle (and may sort) data across the cluster, which can be expensive.
  • Consider broadcasting the smaller DataFrame with broadcast() to avoid a shuffle (see Example 7).
  • Use proper partitioning or bucketing on the join keys to improve join performance; a repartitioning sketch follows this list.
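
One way to apply the partitioning advice is to repartition both inputs on the join key before joining, which can help when the data is skewed or poorly distributed. A minimal sketch, assuming the DataFrames from Example 1 (the partition count of 200 is purely illustrative):

# Co-partition both sides on the join key before joining
emp_part = employees_df.repartition(200, "DeptID")
dept_part = departments_df.repartition(200, "DeptID")

joined_df = emp_part.join(dept_part, "DeptID", "inner")
joined_df.show()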

7. Key Takeaways

  1. The join() command is used to combine two DataFrames based on a common column or key.
  2. It supports various types of joins, including inner, outer, left, right, and cross joins.
  3. Joins can be resource-intensive for large datasets, as they involve shuffling and sorting.
  4. In Spark SQL, similar functionality can be achieved using JOIN clauses.
  5. Joins work efficiently on large datasets when combined with proper partitioning, broadcasting, and caching.