The join() method in PySpark combines two DataFrames based on a common column (key) or a join condition. It works like a SQL join and supports several join types, such as inner, outer, left, right, and cross joins. This is particularly useful when you need to combine datasets for analysis or processing.
1. Syntax
PySpark:
df1.join(df2, on=join_condition, how=join_type)
Spark SQL:
SELECT *
FROM table1
JOIN table2
ON join_condition;
2. Parameters
- df2 (other): The DataFrame to join with.
- join_condition (on, optional): A column name, a list of column names, or a boolean Column expression (e.g., df1.col_name == df2.col_name). When you pass column names, each key column appears only once in the result; an expression keeps the key columns from both sides.
- join_type (how, optional): The type of join to perform. Default is inner. Options include:
  - inner: Returns only rows whose keys match in both DataFrames.
  - outer/full: Returns all rows from both DataFrames, with null where there is no match.
  - left/left_outer: Returns all rows from the left DataFrame and the matching rows from the right DataFrame, with null where there is no match.
  - right/right_outer: Returns all rows from the right DataFrame and the matching rows from the left DataFrame, with null where there is no match.
  - cross: Returns the Cartesian product of both DataFrames.
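To make the semantics of each join type concrete, here is a minimal plain-Python model that joins two lists of dicts on a single key column. It is only a sketch of the rules listed above, not how Spark executes joins; the function name join_rows is hypothetical, and None plays the role of null.

```python
# Plain-Python model of the join types above; not how Spark executes joins.
# Rows are dicts, `key` is the shared join column, and None stands in for null.

def join_rows(left, right, key, how="inner"):
    """Join two lists of dicts on `key` using inner/left/right/outer semantics."""
    if how not in ("inner", "left", "right", "outer"):
        raise ValueError(f"unsupported join type: {how}")
    # Non-key columns of each side (taken from the first row, if any).
    left_cols = [c for c in (left[0] if left else {}) if c != key]
    right_cols = [c for c in (right[0] if right else {}) if c != key]
    out = []
    matched_right = set()  # positions of right rows that found at least one partner
    for l in left:
        matched = False
        for i, r in enumerate(right):
            if l[key] == r[key]:
                matched = True
                matched_right.add(i)
                out.append({key: l[key], **{c: l[c] for c in left_cols},
                            **{c: r[c] for c in right_cols}})
        if not matched and how in ("left", "outer"):
            # Keep the unmatched left row; right-side columns become null.
            out.append({key: l[key], **{c: l[c] for c in left_cols},
                        **{c: None for c in right_cols}})
    if how in ("right", "outer"):
        # Keep unmatched right rows; left-side columns become null.
        for i, r in enumerate(right):
            if i not in matched_right:
                out.append({key: r[key], **{c: None for c in left_cols},
                            **{c: r[c] for c in right_cols}})
    return out


employees = [{"Name": "Anand", "DeptID": 101}, {"Name": "Bala", "DeptID": 102},
             {"Name": "Kavitha", "DeptID": 103}]
departments = [{"DeptID": 101, "DeptName": "Sales"}, {"DeptID": 102, "DeptName": "HR"},
               {"DeptID": 104, "DeptName": "Finance"}]

for how in ("inner", "left", "right", "outer"):
    print(how, sorted(row["DeptID"] for row in join_rows(employees, departments, "DeptID", how)))
```

With the same rows as the examples below, the printed keys are [101, 102] for inner, [101, 102, 103] for left, [101, 102, 104] for right, and [101, 102, 103, 104] for outer.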
3. Return Type
- Returns a new DataFrame containing the combined data based on the join condition and type.
4. Examples
Example 1: Inner Join
PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("JoinExample").getOrCreate()
# Create DataFrames
employees_data = [("Anand", 101), ("Bala", 102), ("Kavitha", 103)]
departments_data = [(101, "Sales"), (102, "HR"), (104, "Finance")]
employees_columns = ["Name", "DeptID"]
departments_columns = ["DeptID", "DeptName"]
employees_df = spark.createDataFrame(employees_data, employees_columns)
departments_df = spark.createDataFrame(departments_data, departments_columns)
# Inner join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "inner")
joined_df.show()
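Spark SQL (assumes the DataFrames are registered as views, e.g. employees_df.createOrReplaceTempView("employees") and departments_df.createOrReplaceTempView("departments")):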
SELECT *
FROM employees
INNER JOIN departments
USING (DeptID);
Output:
+------+-----+--------+
|DeptID| Name|DeptName|
+------+-----+--------+
|   101|Anand|   Sales|
|   102| Bala|      HR|
+------+-----+--------+
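The output above has a single DeptID column, listed first, because the key was passed by name ("DeptID") rather than as an expression. The helper below is a hypothetical plain-Python sketch of that column layout as it appears in these examples; it models the result's schema only and is not a Spark API.

```python
def joined_columns(left_cols, right_cols, keys, by_name=True):
    """Sketch of the column order produced by df1.join(df2, on, how).

    by_name=True models passing column names as `on`: each key column appears
    once, first, followed by the remaining left and right columns.
    by_name=False models an expression join (df1.col == df2.col) or a cross
    join: all left columns, then all right columns, duplicates included.
    """
    if not by_name:
        return list(left_cols) + list(right_cols)
    left_rest = [c for c in left_cols if c not in keys]
    right_rest = [c for c in right_cols if c not in keys]
    return list(keys) + left_rest + right_rest


print(joined_columns(["Name", "DeptID"], ["DeptID", "DeptName"], ["DeptID"]))
# ['DeptID', 'Name', 'DeptName']
print(joined_columns(["Name", "DeptID"], ["DeptID", "DeptName"], ["DeptID"], by_name=False))
# ['Name', 'DeptID', 'DeptID', 'DeptName']
```

The second call corresponds to the column layout of the cross join in Example 5, where no key is merged.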
Example 2: Left Join
PySpark:
# Left join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "left")
joined_df.show()
Spark SQL:
SELECT *
FROM employees
LEFT JOIN departments
USING (DeptID);
Output:
+------+-------+--------+
|DeptID|   Name|DeptName|
+------+-------+--------+
|   101|  Anand|   Sales|
|   102|   Bala|      HR|
|   103|Kavitha|    null|
+------+-------+--------+
Example 3: Right Join
PySpark:
# Right join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "right")
joined_df.show()
Spark SQL:
SELECT *
FROM employees
RIGHT JOIN departments
USING (DeptID);
Output:
+------+-----+--------+
|DeptID| Name|DeptName|
+------+-----+--------+
|   101|Anand|   Sales|
|   102| Bala|      HR|
|   104| null| Finance|
+------+-----+--------+
Example 4: Full Outer Join
PySpark:
# Full outer join on 'DeptID'
joined_df = employees_df.join(departments_df, "DeptID", "outer")
joined_df.show()
Spark SQL:
SELECT *
FROM employees
FULL OUTER JOIN departments
USING (DeptID);
Output:
+------+-------+--------+
|DeptID|   Name|DeptName|
+------+-------+--------+
|   101|  Anand|   Sales|
|   102|   Bala|      HR|
|   103|Kavitha|    null|
|   104|   null| Finance|
+------+-------+--------+
Example 5: Cross Join
PySpark:
# Cross join (Cartesian product)
joined_df = employees_df.crossJoin(departments_df)
joined_df.show()
Spark SQL:
SELECT *
FROM employees
CROSS JOIN departments;
Output:
+-------+------+------+--------+
|   Name|DeptID|DeptID|DeptName|
+-------+------+------+--------+
|  Anand|   101|   101|   Sales|
|  Anand|   101|   102|      HR|
|  Anand|   101|   104| Finance|
|   Bala|   102|   101|   Sales|
|   Bala|   102|   102|      HR|
|   Bala|   102|   104| Finance|
|Kavitha|   103|   101|   Sales|
|Kavitha|   103|   102|      HR|
|Kavitha|   103|   104| Finance|
+-------+------+------+--------+
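A cross join's row count is always the product of the two input sizes, which is why it grows quickly on real data. A minimal plain-Python illustration using the same rows as Example 1, with itertools.product standing in for crossJoin:

```python
from itertools import product

# Same rows as Example 1: (Name, DeptID) and (DeptID, DeptName).
employees = [("Anand", 101), ("Bala", 102), ("Kavitha", 103)]
departments = [(101, "Sales"), (102, "HR"), (104, "Finance")]

# Pair every employee row with every department row, keeping all four columns.
cross = [emp + dept for emp, dept in product(employees, departments)]

print(len(cross))  # 9 == 3 * 3
print(cross[0])    # ('Anand', 101, 101, 'Sales')
```

Two DataFrames of one million rows each would produce a trillion rows the same way, so cross joins are best kept to small inputs.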
Example 6: Joining on a List of Column Names
PySpark:
# Create DataFrames
employees_data = [("Anand", 101, "M"), ("Bala", 102, "F"), ("Kavitha", 103, "F")]
departments_data = [(101, "Sales", "Chennai"), (102, "HR", "Bangalore"), (104, "Finance", "Mumbai")]
employees_columns = ["Name", "DeptID", "Gender"]
departments_columns = ["DeptID", "DeptName", "Location"]
employees_df = spark.createDataFrame(employees_data, employees_columns)
departments_df = spark.createDataFrame(departments_data, departments_columns)
# Join on a list of key columns; every listed column must exist in both DataFrames (only DeptID is shared here)
joined_df = employees_df.join(departments_df, ["DeptID"], "inner")
joined_df.show()
Spark SQL:
SELECT *
FROM employees
INNER JOIN departments
USING (DeptID);
Output:
+------+-----+------+--------+---------+
|DeptID| Name|Gender|DeptName| Location|
+------+-----+------+--------+---------+
|   101|Anand|     M|   Sales|  Chennai|
|   102| Bala|     F|      HR|Bangalore|
+------+-----+------+--------+---------+
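Example 6 passes a one-element list, so it joins on DeptID alone. When the list names several columns, for instance a hypothetical employees_df.join(departments_df, ["DeptID", "Location"], "inner") where both DataFrames carry a Location column, a pair of rows matches only if every listed column is equal. The plain-Python sketch below models that rule with a composite (tuple) key; the rows and the helper name inner_join_on are hypothetical, and it assumes the two sides share no non-key column names.

```python
def inner_join_on(left, right, keys):
    """Inner-join two lists of dicts; rows match only when every key column is equal."""
    # Index the right side by its composite key: a tuple of the key-column values.
    index = {}
    for r in right:
        index.setdefault(tuple(r[k] for k in keys), []).append(r)
    out = []
    for l in left:
        for r in index.get(tuple(l[k] for k in keys), []):
            row = {k: l[k] for k in keys}                              # key columns once, first
            row.update({c: v for c, v in l.items() if c not in keys})  # other left columns
            row.update({c: v for c, v in r.items() if c not in keys})  # other right columns
            out.append(row)
    return out


# Hypothetical rows: unlike Example 6, both sides carry DeptID and Location.
employees = [
    {"Name": "Anand", "DeptID": 101, "Location": "Chennai"},
    {"Name": "Bala", "DeptID": 102, "Location": "Mumbai"},
]
departments = [
    {"DeptID": 101, "Location": "Chennai", "DeptName": "Sales"},
    {"DeptID": 102, "Location": "Bangalore", "DeptName": "HR"},
]

# Bala's DeptID matches, but his Location does not, so only Anand survives.
print(inner_join_on(employees, departments, ["DeptID", "Location"]))
```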
Example 7: Using Broadcast Join for Small DataFrames
PySpark:
from pyspark.sql.functions import broadcast
# Broadcast the smaller DataFrame for optimization
joined_df = employees_df.join(broadcast(departments_df), "DeptID", "inner")
joined_df.show()
Spark SQL:
SELECT /*+ BROADCAST(departments) */ *
FROM employees
INNER JOIN departments
USING (DeptID);
Output:
+------+-----+------+--------+---------+
|DeptID| Name|Gender|DeptName| Location|
+------+-----+------+--------+---------+
|   101|Anand|     M|   Sales|  Chennai|
|   102| Bala|     F|      HR|Bangalore|
+------+-----+------+--------+---------+
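Conceptually, a broadcast hash join builds a hash table from the small DataFrame, sends a copy to every executor, and probes it with each partition of the large DataFrame, so the large side is never shuffled. The plain-Python sketch below models that idea with a list of lists standing in for partitions; the function name broadcast_hash_join and the partition layout are illustrative assumptions, not Spark internals.

```python
def broadcast_hash_join(large_partitions, small, key):
    """Inner join: probe each partition of the large side against one hash table of the small side."""
    # Build phase: index the small side by join key (Spark would broadcast this to every executor).
    hash_table = {}
    for row in small:
        hash_table.setdefault(row[key], []).append(row)
    out = []
    # Probe phase: each partition is joined locally, so the large side never moves.
    for partition in large_partitions:
        for row in partition:
            for match in hash_table.get(row[key], []):
                out.append({**row, **match})  # the shared key has the same value on both sides
    return out


# Example 6 rows; the split into two partitions is an arbitrary illustration.
employee_partitions = [
    [{"Name": "Anand", "DeptID": 101, "Gender": "M"}, {"Name": "Bala", "DeptID": 102, "Gender": "F"}],
    [{"Name": "Kavitha", "DeptID": 103, "Gender": "F"}],
]
departments = [
    {"DeptID": 101, "DeptName": "Sales", "Location": "Chennai"},
    {"DeptID": 102, "DeptName": "HR", "Location": "Bangalore"},
    {"DeptID": 104, "DeptName": "Finance", "Location": "Mumbai"},
]
result = broadcast_hash_join(employee_partitions, departments, "DeptID")
for row in result:
    print(row)
```

This only pays off when the broadcast side fits comfortably in each executor's memory, which is why broadcast() is reserved for small DataFrames.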
5. Common Use Cases
- Combining transactional data with master data (e.g., sales data with product data).
- Enriching datasets by merging related information (e.g., customer data with order data).
- Preparing data for machine learning by combining features from multiple tables.
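6. Performance Considerations
- Use join() judiciously on large datasets: unless one side is broadcast, Spark shuffles both DataFrames by the join key (and sorts them for a sort-merge join), which can be expensive.
- Consider using broadcast() for small DataFrames, so that only the small side is copied to every executor and the large side is not shuffled. Spark also broadcasts automatically when a table's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).
- Spark DataFrames have no indexes; instead, partition (or bucket) data on the join key, especially when the same data is joined repeatedly.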
7. Key Takeaways
- The join() method combines two DataFrames based on a common column (key) or a join condition.
- It supports various types of joins, including inner, outer, left, right, and cross joins.
- Joins can be resource-intensive for large datasets, because they usually shuffle (and often sort) both inputs.
- In Spark SQL, the same functionality is available through JOIN clauses.
- Large joins run more efficiently when data is partitioned on the join key, small DataFrames are broadcast, and DataFrames reused across several joins are cached.