The union() command in Spark is used to combine two DataFrames with the same schema (i.e., the same column names and data types) into a single DataFrame. It appends the rows of one DataFrame to another, similar to the SQL UNION ALL operation. If you want to remove duplicates, you can use distinct() after the union.


1. Syntax

PySpark:

df1.union(df2)

Spark SQL:

SELECT * FROM table1
UNION ALL
SELECT * FROM table2;

2. Parameters

  • df2: The DataFrame to union with. It must have the same schema as df1.

3. Return Type

  • Returns a new DataFrame containing all rows from both DataFrames.

4. Examples

Example 1: Basic Union of Two DataFrames

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnionExample").getOrCreate()

# Create DataFrames
data1 = [("Anand", 25), ("Bala", 30)]
data2 = [("Kavitha", 28), ("Raj", 35)]

columns = ["Name", "Age"]

df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# Union of two DataFrames
union_df = df1.union(df2)
union_df.show()

Spark SQL:

SELECT * FROM employees1
UNION ALL
SELECT * FROM employees2;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
|Kavitha| 28|
|    Raj| 35|
+-------+---+

Example 2: Union with Duplicates

PySpark:

# Create DataFrames with duplicate rows
data1 = [("Anand", 25), ("Bala", 30)]
data2 = [("Bala", 30), ("Raj", 35)]

df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)

# Union with duplicates
union_df = df1.union(df2)
union_df.show()

Spark SQL:

SELECT * FROM employees1
UNION ALL
SELECT * FROM employees2;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
|   Bala| 30|
|    Raj| 35|
+-------+---+

Example 3: Union with Removal of Duplicates

PySpark:

# Union with removal of duplicates
union_df = df1.union(df2).distinct()
union_df.show()

Spark SQL:

SELECT * FROM employees1
UNION
SELECT * FROM employees2;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
|    Raj| 35|
+-------+---+

Example 4: Union of DataFrames with Different Column Orders

PySpark:

# Create DataFrames with different column orders
data1 = [("Anand", 25), ("Bala", 30)]
data2 = [(28, "Kavitha"), (35, "Raj")]

df1 = spark.createDataFrame(data1, ["Name", "Age"])
df2 = spark.createDataFrame(data2, ["Age", "Name"])

# Union after reordering columns
union_df = df1.union(df2.select("Name", "Age"))
union_df.show()

Spark SQL:

SELECT Name, Age FROM employees1
UNION ALL
SELECT Name, Age FROM employees2;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
|Kavitha| 28|
|    Raj| 35|
+-------+---+

Example 5: Union of DataFrames with Null Values

PySpark:

# Create DataFrames with null values
data1 = [("Anand", 25), ("Bala", None)]
data2 = [(None, 28), ("Raj", 35)]

df1 = spark.createDataFrame(data1, ["Name", "Age"])
df2 = spark.createDataFrame(data2, ["Name", "Age"])

# Union of DataFrames with null values
union_df = df1.union(df2)
union_df.show()

Spark SQL:

SELECT * FROM employees1
UNION ALL
SELECT * FROM employees2;

Output:

+-------+----+
|   Name| Age|
+-------+----+
|  Anand|  25|
|   Bala|null|
|   null|  28|
|    Raj|  35|
+-------+----+

Example 6: Union of DataFrames with Different Schemas (Error Case)

PySpark:

# Create DataFrames with different schemas
data1 = [("Anand", 25), ("Bala", 30)]
data2 = [("Kavitha", 28, "HR"), ("Raj", 35, "Sales")]

df1 = spark.createDataFrame(data1, ["Name", "Age"])
df2 = spark.createDataFrame(data2, ["Name", "Age", "Department"])

# Attempting to union DataFrames with different schemas will raise an error
try:
    union_df = df1.union(df2)
except Exception as e:
    print("Error:", e)

Output:

Error: Union can only be performed on tables with the same number of columns...

Example 7: Union of DataFrames with Different Column Names (Error Case)

PySpark:

# Create DataFrames with different column names
data1 = [("Anand", 25), ("Bala", 30)]
data2 = [("Kavitha", 28), ("Raj", 35)]

df1 = spark.createDataFrame(data1, ["Name", "Age"])
df2 = spark.createDataFrame(data2, ["FullName", "Age"])

# Attempting to union DataFrames with different column names will raise an error
try:
    union_df = df1.union(df2)
except Exception as e:
    print("Error:", e)

Output:

Error: Union can only be performed on tables with the same column names...

5. Common Use Cases

  • Combining datasets from different time periods (e.g., daily logs, monthly reports).
  • Appending new records to an existing dataset.
  • Merging datasets from multiple sources with the same schema.

6. Performance Considerations

  • Use union() judiciously on large datasets, as it can increase the size of the DataFrame.
  • Use distinct() after union() if you need to remove duplicates, but be aware that it involves shuffling and sorting.

7. Key Takeaways

  1. The union() command is used to combine two DataFrames with the same schema into a single DataFrame.
  2. It appends rows from one DataFrame to another, similar to SQL UNION ALL.
  3. In Spark SQL, similar functionality can be achieved using UNION ALL or UNION (to remove duplicates).
  4. Works efficiently on large datasets as it does not involve data transformation.