The distinct() command in Spark removes duplicate rows from a DataFrame. It returns a new DataFrame containing only the unique rows, where rows are compared across all columns; to deduplicate on a subset of columns, use dropDuplicates() instead. This is particularly useful when you need to eliminate redundant data and ensure that each row in your dataset is unique.


1. Syntax

PySpark:

df.distinct()

Spark SQL:

SELECT DISTINCT * FROM table_name;

2. Parameters

  • The distinct() method takes no parameters. It always compares entire rows (all columns); to deduplicate on a subset of columns, use dropDuplicates() instead.

3. Return Type

  • Returns a new DataFrame with duplicate rows removed.

4. Examples

Example 1: Removing Duplicate Rows from a DataFrame

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DistinctExample").getOrCreate()

data = [("Anand", 25), ("Bala", 30), ("Kavitha", 28), ("Anand", 25)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Removing duplicate rows
df_distinct = df.distinct()
df_distinct.show()

Spark SQL:

SELECT DISTINCT * FROM people;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
|Kavitha| 28|
+-------+---+

Example 2: Removing Duplicates Based on a Subset of Columns

PySpark:

from pyspark.sql.functions import col

data = [("Anand", 25, "M"), ("Bala", 30, "F"), ("Kavitha", 28, "F"), ("Anand", 25, "M")]
columns = ["Name", "Age", "Gender"]

df = spark.createDataFrame(data, columns)

# Selecting 'Name' and 'Age', then removing duplicates
# (note: the result contains only the selected columns)
df_distinct_subset = df.select("Name", "Age").distinct()
df_distinct_subset.show()

Spark SQL:

SELECT DISTINCT Name, Age FROM people;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
|Kavitha| 28|
+-------+---+

Example 3: Counting Distinct Rows

PySpark:

# Counting the number of distinct rows
distinct_count = df.distinct().count()
print("Number of distinct rows:", distinct_count)

Spark SQL:

SELECT COUNT(*) FROM (SELECT DISTINCT * FROM people);

Output:

Number of distinct rows: 3
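As a plain-Python analogy (not Spark code), counting distinct rows amounts to taking the size of a set of row tuples:

```python
# Plain-Python analogy: rows as tuples, distinct count via a set
rows = [("Anand", 25), ("Bala", 30), ("Kavitha", 28), ("Anand", 25)]

distinct_count = len(set(rows))
print("Number of distinct rows:", distinct_count)  # prints 3
```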

Example 4: Removing Duplicates with Null Values

PySpark:

data = [("Anand", 25), ("Bala", None), ("Kavitha", 28), ("Anand", 25), ("Bala", None)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Removing duplicates (including rows with null values)
df_distinct = df.distinct()
df_distinct.show()

Spark SQL:

SELECT DISTINCT * FROM people;

Output:

+-------+----+
|   Name| Age|
+-------+----+
|  Anand|  25|
|   Bala|null|
|Kavitha|  28|
+-------+----+
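Note that distinct() treats null values as equal to each other, so the two (Bala, null) rows collapse into one. A minimal plain-Python analogy, with None standing in for null:

```python
# Plain-Python analogy: None (like SQL null) compares equal to itself,
# so the duplicate ("Bala", None) rows collapse into a single entry
rows = [("Anand", 25), ("Bala", None), ("Kavitha", 28),
        ("Anand", 25), ("Bala", None)]

seen, deduped = set(), []
for row in rows:
    if row not in seen:
        seen.add(row)
        deduped.append(row)

print(deduped)  # [('Anand', 25), ('Bala', None), ('Kavitha', 28)]
```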

Example 5: Using dropDuplicates() for Subset of Columns

PySpark:

# Using the DataFrame from Example 1, remove duplicates based on specific columns
# (unlike select(...).distinct(), dropDuplicates keeps every column in the result)
df_drop_duplicates = df.dropDuplicates(["Name", "Age"])
df_drop_duplicates.show()

Spark SQL (approximate equivalent; SELECT DISTINCT returns only the listed columns, whereas dropDuplicates() keeps all columns):

SELECT DISTINCT Name, Age FROM people;

Output:

+-------+---+
|   Name|Age|
+-------+---+
|  Anand| 25|
|   Bala| 30|
| Kavitha| 28|
+-------+---+
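The key difference from select(...).distinct() is that dropDuplicates() retains one full row per duplicate key. A plain-Python sketch of that behavior, assuming first occurrence wins (Spark itself does not guarantee which row is kept) and an order-preserving dict (Python 3.7+):

```python
# Plain-Python sketch of dropDuplicates(["Name", "Age"]):
# deduplicate on a key built from selected fields, but keep the whole row
rows = [("Anand", 25, "M"), ("Bala", 30, "F"),
        ("Kavitha", 28, "F"), ("Anand", 25, "M")]

first_by_key = {}
for row in rows:
    key = (row[0], row[1])  # the "Name", "Age" subset
    first_by_key.setdefault(key, row)  # keep the first full row per key

deduped = list(first_by_key.values())
print(deduped)  # all three columns survive, unlike select(...).distinct()
```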

Example 6: Counting Distinct Values in a Column

PySpark:

from pyspark.sql.functions import countDistinct

# Counting distinct values in the 'Name' column
distinct_name_count = df.select(countDistinct("Name")).collect()[0][0]
print("Number of distinct names:", distinct_name_count)

Spark SQL:

SELECT COUNT(DISTINCT Name) FROM people;

Output:

Number of distinct names: 3

Example 7: Removing Duplicates with Complex Data

PySpark:

data = [("Anand", 25, "M"), ("Bala", 30, "F"), ("Kavitha", 28, "F"), ("Anand", 25, "M"), ("Bala", 30, "F")]
columns = ["Name", "Age", "Gender"]

df = spark.createDataFrame(data, columns)

# Removing duplicates
df_distinct = df.distinct()
df_distinct.show()

Spark SQL:

SELECT DISTINCT * FROM people;

Output:

+-------+---+------+
|   Name|Age|Gender|
+-------+---+------+
|  Anand| 25|     M|
|   Bala| 30|     F|
|Kavitha| 28|     F|
+-------+---+------+

5. Common Use Cases

  • Cleaning datasets by removing duplicate records.
  • Ensuring data integrity by enforcing uniqueness.
  • Preparing data for aggregation or analysis.

6. Performance Considerations

  • Use distinct() judiciously on large datasets: it triggers a full shuffle of the data, which can be expensive.
  • Consider dropDuplicates() when you only need uniqueness over a subset of columns; it compares rows on just those columns while still returning all columns.

7. Key Takeaways

  1. The distinct() command is used to remove duplicate rows from a DataFrame.
  2. distinct() always compares entire rows; to deduplicate on a subset of columns, use select(...).distinct() or dropDuplicates().
  3. In Spark SQL, similar functionality can be achieved using SELECT DISTINCT.
  4. distinct() triggers a shuffle, so performance on large datasets depends on sensible partitioning and on caching the deduplicated result if it is reused.