The explain() function in Spark displays the execution plan of a DataFrame or Dataset operation. It shows how Spark will execute a query, including the logical and physical plans, which makes it particularly useful for debugging, tuning performance, and understanding the underlying execution process.
1. Syntax
PySpark:
df.explain(mode="extended")
Spark SQL:
EXPLAIN [EXTENDED | CODEGEN | COST | FORMATTED] SELECT ...;
2. Parameters
- mode (optional): Specifies the level of detail in the execution plan (see the sketch after this list). Options include:
  - "simple": Displays only the physical plan (default).
  - "extended": Displays the parsed, analyzed, and optimized logical plans as well as the physical plan.
  - "codegen": Displays the physical plan and the generated code (when whole-stage code generation applies).
  - "cost": Displays the optimized logical plan annotated with cost statistics, when available.
  - "formatted": Displays the physical plan split into an outline and per-node details.
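As a quick sketch of the modes in action (assuming a SparkSession named spark and a DataFrame df with an Age column, as created in the examples below; the mode parameter requires Spark 3.0 or later, while older versions use explain(True) for extended output):
PySpark:
# Print the plan at each level of detail; each call only compiles the plan,
# it does not execute the query.
for mode in ["simple", "extended", "codegen", "cost", "formatted"]:
    print(f"--- mode={mode} ---")
    df.filter("Age > 30").explain(mode=mode)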
3. Key Components of the Execution Plan
- Parsed Logical Plan: The initial plan built directly from the query's high-level transformation steps (e.g., filters, joins, aggregations), before analysis.
- Analyzed Logical Plan: The logical plan after column names and types have been resolved.
- Optimized Logical Plan: The logical plan after Spark's Catalyst optimizer applies its rules (e.g., predicate pushdown, constant folding).
- Physical Plan: The low-level execution steps Spark will actually run (e.g., scans, shuffles, exchanges).
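Note that explain() prints the plan to stdout. If you need the plan as a string (e.g., for logging or tests), one option is to run EXPLAIN through spark.sql(), which returns a one-row DataFrame whose single plan column holds the text. A minimal sketch, assuming the people temp view registered in Example 1 below:
PySpark:
# EXPLAIN returns a one-row DataFrame with a single column named "plan"
plan_text = spark.sql("EXPLAIN EXTENDED SELECT * FROM people WHERE Age > 30").collect()[0].plan
print(plan_text)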
4. Examples
Example 1: Simple Execution Plan
PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ExplainExample").getOrCreate()
# Create DataFrame
data = [("Anand", 25), ("Bala", 30), ("Kavitha", 28), ("Raj", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
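# Register the DataFrame as a temporary view so the Spark SQL
# variants in this section can query it as "people"
df.createOrReplaceTempView("people")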
# Display the simple execution plan
df.filter("Age > 30").explain()
Spark SQL:
EXPLAIN SELECT * FROM people WHERE Age > 30;
Output:
== Physical Plan ==
*(1) Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- Scan ExistingRDD[Name#0,Age#1]
Example 2: Extended Execution Plan
PySpark:
# Display the extended execution plan
df.filter("Age > 30").explain(mode="extended")
Spark SQL:
EXPLAIN EXTENDED SELECT * FROM people WHERE Age > 30;
Output:
== Parsed Logical Plan ==
'Filter ('Age > 30)
+- LogicalRDD [Name#0, Age#1], false
== Analyzed Logical Plan ==
Name: string, Age: int
Filter (Age#1 > 30)
+- LogicalRDD [Name#0, Age#1], false
== Optimized Logical Plan ==
Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- LogicalRDD [Name#0, Age#1], false
== Physical Plan ==
*(1) Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- Scan ExistingRDD[Name#0,Age#1]
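The cost mode has no dedicated example in this section; as a minimal sketch, it annotates the optimized logical plan with Statistics (size in bytes and, when available, row counts). The numbers are estimates unless table statistics have been collected (e.g., via ANALYZE TABLE):
PySpark:
# Shows the optimized logical plan with Statistics(sizeInBytes=..., rowCount=...) annotations
df.filter("Age > 30").explain(mode="cost")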
Example 3: Codegen Execution Plan
PySpark:
# Display the codegen execution plan
df.filter("Age > 30").explain(mode="codegen")
Spark SQL:
EXPLAIN CODEGEN SELECT * FROM people WHERE Age > 30;
Output:
== Physical Plan ==
*(1) Filter (isnotnull(Age#1) AND (Age#1 > 30))
+- Scan ExistingRDD[Name#0,Age#1]
Generated code:
...
Example 4: Formatted Execution Plan
PySpark:
# Display the formatted execution plan
df.filter("Age > 30").explain(mode="formatted")
Spark SQL:
EXPLAIN FORMATTED SELECT * FROM people WHERE Age > 30;
Output:
== Physical Plan ==
* Filter (2)
+- Scan ExistingRDD (1)

(1) Scan ExistingRDD
Output [2]: [Name#0, Age#1]

(2) Filter [codegen id : 1]
Input [2]: [Name#0, Age#1]
Condition : (isnotnull(Age#1) AND (Age#1 > 30))
Example 5: Explaining a Join Operation
PySpark:
# Create another DataFrame
departments_data = [(101, "Sales"), (102, "HR"), (103, "Finance")]
departments_columns = ["DeptID", "DeptName"]
departments_df = spark.createDataFrame(departments_data, departments_columns)
# Perform a join and explain the plan
joined_df = df.join(departments_df, df["Age"] == departments_df["DeptID"])
joined_df.explain(mode="extended")
Spark SQL:
EXPLAIN EXTENDED
SELECT *
FROM people
JOIN departments
ON people.Age = departments.DeptID;
Output:
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
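A common use of explain() on joins is confirming which join strategy Spark picked. As a sketch reusing df and departments_df from above, adding a broadcast hint should make the physical plan show a BroadcastHashJoin node instead of a SortMergeJoin:
PySpark:
from pyspark.sql.functions import broadcast
# Hint Spark to broadcast the small departments table; the physical plan
# should then contain BroadcastHashJoin rather than SortMergeJoin
df.join(broadcast(departments_df), df["Age"] == departments_df["DeptID"]).explain()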
Example 6: Explaining an Aggregation
PySpark:
from pyspark.sql.functions import sum as spark_sum  # alias to avoid shadowing Python's built-in sum
# Perform an aggregation and explain the plan
aggregated_df = df.groupBy("Name").agg(spark_sum("Age").alias("TotalAge"))
aggregated_df.explain(mode="extended")
Spark SQL:
EXPLAIN EXTENDED
SELECT Name, SUM(Age) AS TotalAge
FROM people
GROUP BY Name;
Output:
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
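For aggregations, the key thing to look for is the Exchange node, which marks the shuffle between the partial and final HashAggregate steps. The formatted mode makes this easy to spot (a sketch reusing aggregated_df from above):
PySpark:
# Expect: HashAggregate (partial) -> Exchange hashpartitioning -> HashAggregate (final)
aggregated_df.explain(mode="formatted")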
5. Common Use Cases
- Debugging complex queries.
- Identifying performance bottlenecks (e.g., shuffles, expensive operations).
- Verifying that optimizations (e.g., predicate pushdown, join reordering) are applied.
6. Best Practices
- Use explain() to analyze and optimize queries, especially for large datasets.
- Look for expensive operations such as shuffles, wide transformations, or full table scans; a sketch for checking predicate pushdown follows this list.
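For instance, to verify predicate pushdown against a file source, round-trip the data through Parquet and check the FileScan node's PushedFilters entry. A sketch, where the path /tmp/people.parquet is purely illustrative:
PySpark:
# Write and re-read as Parquet so the scan supports filter pushdown
df.write.mode("overwrite").parquet("/tmp/people.parquet")
parquet_df = spark.read.parquet("/tmp/people.parquet")
# The FileScan node should report something like
# PushedFilters: [IsNotNull(Age), GreaterThan(Age,30)]
parquet_df.filter("Age > 30").explain()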
7. Key Takeaways
- The explain() function displays the execution plan of a DataFrame or Dataset operation.
- It supports multiple modes (simple, extended, codegen, cost, formatted) for different levels of detail.
- Calling explain() is a metadata operation: it compiles the plan but does not execute the query.
- In Spark SQL, the same functionality is available via the EXPLAIN statement.