The explain() function in Spark is used to display the execution plan of a DataFrame or Dataset operation. It provides detailed information about how Spark will execute a query, including the logical and physical plans. This is particularly useful for debugging, optimizing performance, and understanding the underlying execution process.


1. Syntax

PySpark:

df.explain(mode="extended")

Spark SQL:

EXPLAIN [EXTENDED | CODEGEN | COST | FORMATTED] SELECT ...;

2. Parameters

  • mode (optional): Specifies the level of detail in the execution plan. Available in Spark 3.0 and later; earlier versions accept only a boolean argument (e.g., df.explain(True) for the extended plan). Options include:
    • "simple": Displays only the physical plan (default).
    • "extended": Displays both the logical and physical plans.
    • "codegen": Displays the physical plan and the generated code (if whole-stage code generation applies).
    • "cost": Displays the optimized logical plan with statistics (e.g., estimated sizes), when they are available (see the sketch after this list).
    • "formatted": Splits the output into a physical plan outline and a section with details for each operator.

3. Key Components of the Execution Plan

  • Logical Plan: Represents the high-level transformation steps (e.g., filters, joins, aggregations). In the extended output it appears in three stages:
    • Parsed Logical Plan: The initial plan built from the query, before analysis and optimization.
    • Analyzed Logical Plan: The parsed plan after column and table references have been resolved.
    • Optimized Logical Plan: The analyzed plan after Spark’s Catalyst optimizer applies its rules (e.g., predicate pushdown, constant folding).
  • Physical Plan: Represents the low-level execution steps (e.g., scans, shuffles, exchanges) that Spark will actually run.
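
explain() prints the plan to the console rather than returning it. If you need the plan as a string (for logging or tests), one common approach is to run the EXPLAIN statement itself through spark.sql(), which returns a one-row DataFrame whose plan column holds the text. A minimal sketch, assuming a temporary view named people has been registered (e.g., with df.createOrReplaceTempView("people"), which the SQL examples below also rely on):

PySpark:

# Capture the plan text instead of printing it
plan_text = spark.sql(
    "EXPLAIN EXTENDED SELECT * FROM people WHERE Age > 30"
).collect()[0].plan
print(plan_text)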

4. Examples

Example 1: Simple Execution Plan

PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExplainExample").getOrCreate()

# Create DataFrame
data = [("Anand", 25), ("Bala", 30), ("Kavitha", 28), ("Raj", 35)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Register a temporary view so the Spark SQL statements can query the same data as "people"
df.createOrReplaceTempView("people")

# Display the simple execution plan
df.filter("Age > 30").explain()

Spark SQL:

EXPLAIN SELECT * FROM people WHERE Age > 30;

Output:

== Physical Plan ==
*(1) Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- Scan ExistingRDD[Name#0,Age#1]

Example 2: Extended Execution Plan

PySpark:

# Display the extended execution plan
df.filter("Age > 30").explain(mode="extended")

Spark SQL:

EXPLAIN EXTENDED SELECT * FROM people WHERE Age > 30;

Output:

== Parsed Logical Plan ==
'Filter ('Age > 30)
+- Project [Name#0, Age#1]
   +- LogicalRDD [Name#0, Age#1], false

== Analyzed Logical Plan ==
Name: string, Age: int
Filter (Age#1 > 30)
+- Project [Name#0, Age#1]
   +- LogicalRDD [Name#0, Age#1], false

== Optimized Logical Plan ==
Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- LogicalRDD [Name#0, Age#1], false

== Physical Plan ==
*(1) Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- Scan ExistingRDD[Name#0,Age#1]

Example 3: Codegen Execution Plan

PySpark:

# Display the codegen execution plan
df.filter("Age > 30").explain(mode="codegen")

Spark SQL:

EXPLAIN CODEGEN SELECT * FROM people WHERE Age > 30;

Output:

== Physical Plan ==
*(1) Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- Scan ExistingRDD[Name#0,Age#1]

Generated code:
...
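
The generated Java code for whole-stage code generation typically runs to hundreds of lines, which is why it is truncated here; it is mainly useful when investigating code generation issues.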

Example 4: Formatted Execution Plan

PySpark:

# Display the formatted execution plan
df.filter("Age > 30").explain(mode="formatted")

Spark SQL:

EXPLAIN FORMATTED SELECT * FROM people WHERE Age > 30;

Output:

== Physical Plan ==
* Filter (2)
+- * Scan ExistingRDD (1)

(1) Scan ExistingRDD [codegen id : 1]
Output [2]: [Name#0, Age#1]

(2) Filter [codegen id : 1]
Input [2]: [Name#0, Age#1]
Condition : (isnotnull(Age#1) AND (Age#1 > 30))

Example 5: Explaining a Join Operation

PySpark:

# Create another DataFrame
departments_data = [(101, "Sales"), (102, "HR"), (103, "Finance")]
departments_columns = ["DeptID", "DeptName"]

departments_df = spark.createDataFrame(departments_data, departments_columns)

# Register a temporary view so the Spark SQL statement can query it as "departments"
departments_df.createOrReplaceTempView("departments")

# Perform a join and explain the plan (the Age = DeptID condition is not
# meaningful for this data; it is only here to produce a join in the plan)
joined_df = df.join(departments_df, df["Age"] == departments_df["DeptID"])
joined_df.explain(mode="extended")

Spark SQL:

EXPLAIN EXTENDED 
SELECT * 
FROM people 
JOIN departments 
ON people.Age = departments.DeptID;

Output:

== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
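
With DataFrames as small as these, the physical plan will typically show a BroadcastHashJoin (one side is copied to every executor); inputs larger than the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) would instead produce a SortMergeJoin preceded by Exchange (shuffle) nodes.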

Example 6: Explaining an Aggregation

PySpark:

from pyspark.sql.functions import sum

# Perform an aggregation and explain the plan
aggregated_df = df.groupBy("Name").agg(sum("Age").alias("TotalAge"))
aggregated_df.explain(mode="extended")

Spark SQL:

EXPLAIN EXTENDED 
SELECT Name, SUM(Age) AS TotalAge 
FROM people 
GROUP BY Name;

Output:

== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
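
For a groupBy followed by SUM, the physical plan typically shows a partial HashAggregate, an Exchange that shuffles rows by the grouping key (hashpartitioning on Name), and a final HashAggregate that merges the partial results; the Exchange is the step to watch when the number of groups is large.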

5. Common Use Cases

  • Debugging complex queries.
  • Identifying performance bottlenecks (e.g., shuffles, expensive operations).
  • Verifying that optimizations (e.g., predicate pushdown, join reordering) are applied; a sketch of checking predicate pushdown follows this list.
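
A minimal sketch of that last use case, using a throwaway Parquet copy of the df DataFrame from Example 1 (the /tmp/people_parquet path is only an illustrative placeholder): when Spark pushes the filter down into the file scan, the FileScan node in the physical plan lists it under PushedFilters.

PySpark:

# Write the data out as Parquet, then read it back with a filter
df.write.mode("overwrite").parquet("/tmp/people_parquet")
parquet_df = spark.read.parquet("/tmp/people_parquet")

# In the physical plan, look at the FileScan parquet node: a pushed-down
# predicate appears in its PushedFilters list, e.g.
# PushedFilters: [IsNotNull(Age), GreaterThan(Age,30)]
parquet_df.filter("Age > 30").explain()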

6. Performance Considerations

  • Use explain() to analyze and optimize queries, especially for large datasets.
  • Look for expensive operations like shuffles, wide transformations, or full table scans.
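
As an illustration of spotting shuffles, the sketch below compares the same join with and without a broadcast hint, reusing df and departments_df from Example 5; the exact plan depends on data sizes and configuration:

PySpark:

from pyspark.sql.functions import broadcast

# On large inputs, a plain join typically produces a SortMergeJoin with
# Exchange (shuffle) nodes on both sides
df.join(departments_df, df["Age"] == departments_df["DeptID"]).explain()

# A broadcast hint replaces the shuffle with a BroadcastExchange and a
# BroadcastHashJoin
df.join(broadcast(departments_df), df["Age"] == departments_df["DeptID"]).explain()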

7. Key Takeaways

  1. The explain() function is used to display the execution plan of a DataFrame or Dataset operation.
  2. It supports multiple modes (simple, extended, codegen, cost, formatted) for different levels of detail.
  3. Calling explain() only generates and prints the query plan; it does not execute the query, so it is inexpensive to run.
  4. In Spark SQL, similar functionality can be achieved using EXPLAIN.