Window Functions in Spark
Window functions in Spark are used to perform calculations across a set of rows that are related to the current row. Unlike aggregate functions, which return a single value for a group of rows, window functions return a value for each row in the DataFrame. They are particularly useful for tasks like ranking, cumulative sums, moving averages, and more.
1. Syntax
PySpark:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lead, lag, sum, avg
window_spec = Window.partitionBy(partition_cols).orderBy(order_cols)
df.withColumn("new_column", window_function().over(window_spec))
Spark SQL:
SELECT *,
window_function() OVER (PARTITION BY partition_cols ORDER BY order_cols) AS new_column
FROM table_name;
2. Key Components
- Window Specification: Defines the partitioning, ordering, and frame for the window function.
  - partitionBy(): Groups rows into partitions (similar to GROUP BY).
  - orderBy(): Orders rows within each partition.
  - rowsBetween() or rangeBetween(): Defines the window frame (e.g., current row, preceding rows, following rows); see the sketch after this list.
- Window Functions:
  - Ranking functions: row_number(), rank(), dense_rank().
  - Analytic functions: lead(), lag().
  - Aggregate functions: sum(), avg(), min(), max().
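To illustrate the difference between the two frame types, the sketch below contrasts rowsBetween(), which counts physical rows, with rangeBetween(), which works on the values of the ordering column. It assumes the employees DataFrame df built in Example 1 below, and the frame bounds and column names RowsSum and RangeSum are purely illustrative choices.
from pyspark.sql.window import Window
from pyspark.sql.functions import sum
# Row-based frame: the current row plus the one physically preceding row
rows_spec = Window.partitionBy("Department").orderBy("Salary").rowsBetween(-1, Window.currentRow)
# Range-based frame: all rows whose Salary lies within 1000 below the current row's Salary
range_spec = Window.partitionBy("Department").orderBy("Salary").rangeBetween(-1000, Window.currentRow)
df.withColumn("RowsSum", sum("Salary").over(rows_spec)) \
  .withColumn("RangeSum", sum("Salary").over(range_spec)) \
  .show()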
3. Common Window Functions
- Ranking Functions:
  - row_number(): Assigns a unique sequential number to each row within a partition.
  - rank(): Assigns a rank to each row, with gaps for ties.
  - dense_rank(): Assigns a rank to each row, without gaps for ties.
- Analytic Functions:
  - lead(): Accesses the value of a column in the next row.
  - lag(): Accesses the value of a column in the previous row.
- Aggregate Functions:
  - sum(), avg(), min(), max(): Perform aggregations over a window of rows.
4. Examples
Example 1: Using row_number() for Ranking
PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()
# Create DataFrame
data = [("Anand", "Sales", 3000),
        ("Bala", "Sales", 4000),
        ("Kavitha", "HR", 3500),
        ("Raj", "HR", 4500)]
columns = ["Name", "Department", "Salary"]
df = spark.createDataFrame(data, columns)
# Define window specification
window_spec = Window.partitionBy("Department").orderBy("Salary")
# Add a row number column
df_with_row_number = df.withColumn("RowNumber", row_number().over(window_spec))
df_with_row_number.show()
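The Spark SQL versions in this and the later examples query a table named employees; one way to make them runnable against the DataFrame above is to register it as a temporary view (the view name is simply the one assumed by the SQL snippets):
# Register the DataFrame as a temporary view so the SQL examples can query it
df.createOrReplaceTempView("employees")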
Spark SQL:
SELECT *,
ROW_NUMBER() OVER (PARTITION BY Department ORDER BY Salary) AS RowNumber
FROM employees;
Output:
+-------+----------+------+---------+
|   Name|Department|Salary|RowNumber|
+-------+----------+------+---------+
|  Anand|     Sales|  3000|        1|
|   Bala|     Sales|  4000|        2|
|Kavitha|        HR|  3500|        1|
|    Raj|        HR|  4500|        2|
+-------+----------+------+---------+
Example 2: Using rank() and dense_rank()
PySpark:
from pyspark.sql.functions import rank, dense_rank
# Add rank and dense_rank columns
df_with_rank = df.withColumn("Rank", rank().over(window_spec)) \
                 .withColumn("DenseRank", dense_rank().over(window_spec))
df_with_rank.show()
Spark SQL:
SELECT *,
RANK() OVER (PARTITION BY Department ORDER BY Salary) AS Rank,
DENSE_RANK() OVER (PARTITION BY Department ORDER BY Salary) AS DenseRank
FROM employees;
Output:
+-------+----------+------+----+---------+
|   Name|Department|Salary|Rank|DenseRank|
+-------+----------+------+----+---------+
|  Anand|     Sales|  3000|   1|        1|
|   Bala|     Sales|  4000|   2|        2|
|Kavitha|        HR|  3500|   1|        1|
|    Raj|        HR|  4500|   2|        2|
+-------+----------+------+----+---------+
Example 3: Using lead() and lag()
PySpark:
from pyspark.sql.functions import lead, lag
# Add lead and lag columns
df_with_lead_lag = df.withColumn("NextSalary", lead("Salary").over(window_spec)) \
                     .withColumn("PreviousSalary", lag("Salary").over(window_spec))
df_with_lead_lag.show()
Spark SQL:
SELECT *,
LEAD(Salary) OVER (PARTITION BY Department ORDER BY Salary) AS NextSalary,
LAG(Salary) OVER (PARTITION BY Department ORDER BY Salary) AS PreviousSalary
FROM employees;
Output:
+-------+----------+------+----------+--------------+
|   Name|Department|Salary|NextSalary|PreviousSalary|
+-------+----------+------+----------+--------------+
|  Anand|     Sales|  3000|      4000|          null|
|   Bala|     Sales|  4000|      null|          3000|
|Kavitha|        HR|  3500|      4500|          null|
|    Raj|        HR|  4500|      null|          3500|
+-------+----------+------+----------+--------------+
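Building on the output above, a small follow-up sketch (the SalaryGap column name is just an illustrative choice) uses the PreviousSalary column to compute how much each salary differs from the previous one in the same department:
from pyspark.sql.functions import col
# Difference between each salary and the previous salary within the department
df_with_gap = df_with_lead_lag.withColumn("SalaryGap", col("Salary") - col("PreviousSalary"))
df_with_gap.show()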
Example 4: Using Aggregate Functions with Window
PySpark:
from pyspark.sql.functions import sum
# Add a cumulative sum column
df_with_cumulative_sum = df.withColumn("CumulativeSum", sum("Salary").over(window_spec))
df_with_cumulative_sum.show()
Spark SQL:
SELECT *,
SUM(Salary) OVER (PARTITION BY Department ORDER BY Salary) AS CumulativeSum
FROM employees;
Output:
+-------+----------+------+-------------+
|   Name|Department|Salary|CumulativeSum|
+-------+----------+------+-------------+
|  Anand|     Sales|  3000|         3000|
|   Bala|     Sales|  4000|         7000|
|Kavitha|        HR|  3500|         3500|
|    Raj|        HR|  4500|         8000|
+-------+----------+------+-------------+
Example 5: Using Window Frames
PySpark:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum
# Define a window frame (e.g., current row and preceding row)
window_spec = Window.partitionBy("Department").orderBy("Salary").rowsBetween(-1, 0)
# Add a moving sum column
df_with_moving_sum = df.withColumn("MovingSum", sum("Salary").over(window_spec))
df_with_moving_sum.show()
Spark SQL:
SELECT *,
SUM(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS MovingSum
FROM employees;
Output:
+-------+----------+------+---------+
|   Name|Department|Salary|MovingSum|
+-------+----------+------+---------+
|  Anand|     Sales|  3000|     3000|
|   Bala|     Sales|  4000|     7000|
|Kavitha|        HR|  3500|     3500|
|    Raj|        HR|  4500|     8000|
+-------+----------+------+---------+
Example 6: Calculating a Moving Average
PySpark:
from pyspark.sql.functions import avg
# Define a window frame for moving average (e.g., current row and 2 preceding rows)
window_spec = Window.partitionBy("Department").orderBy("Salary").rowsBetween(-2, 0)
# Add a moving average column
df_with_moving_avg = df.withColumn("MovingAvg", avg("Salary").over(window_spec))
df_with_moving_avg.show()
Spark SQL:
SELECT *,
AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS MovingAvg
FROM employees;
Output:
+-------+----------+------+---------+
|   Name|Department|Salary|MovingAvg|
+-------+----------+------+---------+
|  Anand|     Sales|  3000|   3000.0|
|   Bala|     Sales|  4000|   3500.0|
|Kavitha|        HR|  3500|   3500.0|
|    Raj|        HR|  4500|   4000.0|
+-------+----------+------+---------+
Example 7: Calculating Percentiles
PySpark:
from pyspark.sql.functions import percent_rank
# Redefine the window without an explicit frame (ranking functions require the default frame)
window_spec = Window.partitionBy("Department").orderBy("Salary")
# Add a percentile rank column
df_with_percentile = df.withColumn("PercentileRank", percent_rank().over(window_spec))
df_with_percentile.show()
Spark SQL:
SELECT *,
PERCENT_RANK() OVER (PARTITION BY Department ORDER BY Salary) AS PercentileRank
FROM employees;
Output:
+-------+----------+------+--------------+
|   Name|Department|Salary|PercentileRank|
+-------+----------+------+--------------+
|  Anand|     Sales|  3000|           0.0|
|   Bala|     Sales|  4000|           1.0|
|Kavitha|        HR|  3500|           0.0|
|    Raj|        HR|  4500|           1.0|
+-------+----------+------+--------------+
5. Common Use Cases
- Ranking and row numbering (e.g., selecting the top N records per group; see the sketch after this list).
- Calculating cumulative sums or moving averages.
- Accessing values from adjacent rows (e.g., comparing current and previous values).
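For example, here is a minimal sketch of the top-N use case with N = 1 (the names top_spec, top_earners, and rn are illustrative), reusing the employees DataFrame from the examples above and ordering salaries in descending order so that row number 1 is the highest earner in each department:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, row_number
# Rank employees within each department by salary, highest first
top_spec = Window.partitionBy("Department").orderBy(desc("Salary"))
# Keep only the top earner per department and drop the helper column
top_earners = df.withColumn("rn", row_number().over(top_spec)) \
                .filter(col("rn") == 1) \
                .drop("rn")
top_earners.show()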
6. Performance Considerations
- Use window functions judiciously on large datasets, as they involve shuffling and sorting.
- Optimize partitioning and ordering to reduce the amount of data processed in each window; see the sketch below.
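As a rough illustration of the second point (the filter condition here is purely hypothetical), projecting only the columns the window needs and filtering rows before applying the window reduces the data that has to be shuffled and sorted:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
# Trim the input first so less data is shuffled into each window partition
trimmed = df.select("Name", "Department", "Salary").filter(col("Salary") > 0)
ranked = trimmed.withColumn("RowNumber",
                            row_number().over(Window.partitionBy("Department").orderBy("Salary")))
ranked.show()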
7. Key Takeaways
- Window functions perform calculations across a set of rows related to the current row.
- They support partitioning, ordering, and window frames for advanced calculations.
- Window functions can be resource-intensive for large datasets, as they involve shuffling and sorting.
- In Spark SQL, similar functionality can be achieved using OVER clauses.
- They work efficiently on large datasets when combined with proper partitioning and caching.