Window Functions in Spark

Window functions in Spark are used to perform calculations across a set of rows that are related to the current row. Unlike aggregate functions, which return a single value for a group of rows, window functions return a value for each row in the DataFrame. They are particularly useful for tasks like ranking, cumulative sums, moving averages, and more.


1. Syntax

PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lead, lag, sum, avg  # and other window-capable functions

window_spec = Window.partitionBy(partition_cols).orderBy(order_cols)
df.withColumn("new_column", window_function().over(window_spec))

Spark SQL:

SELECT *, 
       window_function() OVER (PARTITION BY partition_cols ORDER BY order_cols) AS new_column
FROM table_name;

2. Key Components

  • Window Specification: Defines the partitioning, ordering, and frame for the window function.
    • partitionBy(): Groups rows into partitions (similar to GROUP BY).
    • orderBy(): Orders rows within each partition.
    • rowsBetween() or rangeBetween(): Defines the window frame (e.g., current row, preceding rows, following rows).
  • Window Functions:
    • Ranking functions: row_number(), rank(), dense_rank().
    • Analytic functions: lead(), lag().
    • Aggregate functions: sum(), avg(), min(), max().
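
As a quick illustration of how these pieces fit together (the fuller examples follow in section 4), here is a minimal sketch. It assumes an existing SparkSession and a DataFrame df with Department and Salary columns, like the one built in Example 1 below.

from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sum_  # aliased to avoid shadowing Python's built-in sum

# Partition rows by department, order them by salary, and restrict the
# frame to the previous row and the current row.
window_spec = (
    Window.partitionBy("Department")
          .orderBy("Salary")
          .rowsBetween(-1, Window.currentRow)
)

# Any window function can now be applied over this specification.
df_windowed = df.withColumn("WindowSum", sum_("Salary").over(window_spec))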

3. Common Window Functions

  • Ranking Functions:
    • row_number(): Assigns a unique sequential number to each row within a partition.
    • rank(): Assigns a rank to each row, with gaps for ties.
    • dense_rank(): Assigns a rank to each row, without gaps for ties.
  • Analytic Functions:
    • lead(): Accesses the value of a column in the next row.
    • lag(): Accesses the value of a column in the previous row.
  • Aggregate Functions:
    • sum(), avg(), min(), max(): Perform aggregations over a window of rows.

4. Examples

Example 1: Using row_number() for Ranking

PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()

# Create DataFrame
data = [("Anand", "Sales", 3000), 
        ("Bala", "Sales", 4000), 
        ("Kavitha", "HR", 3500), 
        ("Raj", "HR", 4500)]
columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, columns)

# Define window specification
window_spec = Window.partitionBy("Department").orderBy("Salary")

# Add a row number column
df_with_row_number = df.withColumn("RowNumber", row_number().over(window_spec))
df_with_row_number.show()

Spark SQL:

SELECT *, 
       ROW_NUMBER() OVER (PARTITION BY Department ORDER BY Salary) AS RowNumber
FROM employees;

Output:

+-------+----------+------+---------+
|   Name|Department|Salary|RowNumber|
+-------+----------+------+---------+
|  Anand|     Sales|  3000|        1|
|   Bala|     Sales|  4000|        2|
|Kavitha|        HR|  3500|        1|
|    Raj|        HR|  4500|        2|
+-------+----------+------+---------+
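
A common follow-up to row_number() is keeping only the top N rows of each partition, for example the highest-paid employee per department. The following is a sketch of that pattern (not part of the original example); it reuses df from above and orders the window by descending salary.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Number rows within each department by descending salary,
# then keep only the first row per department.
top_window = Window.partitionBy("Department").orderBy(col("Salary").desc())

top_earners = (
    df.withColumn("RowNumber", row_number().over(top_window))
      .filter(col("RowNumber") == 1)
      .drop("RowNumber")
)
top_earners.show()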

Example 2: Using rank() and dense_rank()

PySpark:

from pyspark.sql.functions import rank, dense_rank

# Add rank and dense_rank columns
df_with_rank = df.withColumn("Rank", rank().over(window_spec)) \
                 .withColumn("DenseRank", dense_rank().over(window_spec))
df_with_rank.show()

Spark SQL:

SELECT *, 
       RANK() OVER (PARTITION BY Department ORDER BY Salary) AS Rank,
       DENSE_RANK() OVER (PARTITION BY Department ORDER BY Salary) AS DenseRank
FROM employees;

Output:

+-------+----------+------+----+----------+
|   Name|Department|Salary|Rank| DenseRank|
+-------+----------+------+----+----------+
|  Anand|     Sales|  3000|   1|         1|
|   Bala|     Sales|  4000|   2|         2|
|Kavitha|        HR|  3500|   1|         1|
|    Raj|        HR|  4500|   2|         2|
+-------+----------+------+----+----------+
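
Because the sample data contains no duplicate salaries, rank() and dense_rank() return identical values above. The sketch below uses hypothetical tied salaries (not part of the original dataset) to make the difference visible: rank() leaves a gap after a tie, dense_rank() does not.

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank

# Hypothetical data with a salary tie in the Sales department (illustration only).
tied_data = [("Anand", "Sales", 3000),
             ("Bala", "Sales", 4000),
             ("Chitra", "Sales", 4000),   # tied with Bala
             ("Deepak", "Sales", 5000)]
tied_df = spark.createDataFrame(tied_data, ["Name", "Department", "Salary"])

tie_window = Window.partitionBy("Department").orderBy("Salary")

# rank() produces 1, 2, 2, 4; dense_rank() produces 1, 2, 2, 3.
tied_df.withColumn("Rank", rank().over(tie_window)) \
       .withColumn("DenseRank", dense_rank().over(tie_window)) \
       .show()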

Example 3: Using lead() and lag()

PySpark:

from pyspark.sql.functions import lead, lag

# Add lead and lag columns
df_with_lead_lag = df.withColumn("NextSalary", lead("Salary").over(window_spec)) \
                     .withColumn("PreviousSalary", lag("Salary").over(window_spec))
df_with_lead_lag.show()

Spark SQL:

SELECT *, 
       LEAD(Salary) OVER (PARTITION BY Department ORDER BY Salary) AS NextSalary,
       LAG(Salary) OVER (PARTITION BY Department ORDER BY Salary) AS PreviousSalary
FROM employees;

Output:

+-------+----------+------+----------+--------------+
|   Name|Department|Salary|NextSalary|PreviousSalary|
+-------+----------+------+----------+--------------+
|  Anand|     Sales|  3000|      4000|          null|
|   Bala|     Sales|  4000|      null|          3000|
|Kavitha|        HR|  3500|      4500|          null|
|    Raj|        HR|  4500|      null|          3500|
+-------+----------+------+----------+--------------+
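
A typical use of lag() is computing the change from the previous row, for example the salary gap between consecutive employees in a department. The sketch below is an illustrative extension of the example above and reuses df and window_spec.

from pyspark.sql.functions import col, lag

# Difference between each salary and the previous one in the same department.
# The first row of each partition has no previous value, so the result is null.
df_with_diff = df.withColumn(
    "SalaryDiff",
    col("Salary") - lag("Salary").over(window_spec)
)
df_with_diff.show()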

Example 4: Using Aggregate Functions with Window

PySpark:

from pyspark.sql.functions import sum

# Add a cumulative sum column
df_with_cumulative_sum = df.withColumn("CumulativeSum", sum("Salary").over(window_spec))
df_with_cumulative_sum.show()

Spark SQL:

SELECT *, 
       SUM(Salary) OVER (PARTITION BY Department ORDER BY Salary) AS CumulativeSum
FROM employees;

Output:

+-------+----------+------+--------------+
|   Name|Department|Salary| CumulativeSum|
+-------+----------+------+--------------+
|  Anand|     Sales|  3000|          3000|
|   Bala|     Sales|  4000|          7000|
|Kavitha|        HR|  3500|          3500|
|    Raj|        HR|  4500|          8000|
+-------+----------+------+--------------+
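
The running total above relies on Spark's default frame: when an ORDER BY is present and no frame is specified, the frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. The same spec can be written explicitly, as in this sketch (reusing df from Example 1).

from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sum_

# Equivalent to the default frame used when only orderBy() is given:
# all rows from the start of the partition up to the current row.
cumulative_window = (
    Window.partitionBy("Department")
          .orderBy("Salary")
          .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("CumulativeSum", sum_("Salary").over(cumulative_window)).show()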

Example 5: Using Window Frames

PySpark:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# Define a window frame (e.g., current row and preceding row)
window_spec = Window.partitionBy("Department").orderBy("Salary").rowsBetween(-1, 0)

# Add a moving sum column
df_with_moving_sum = df.withColumn("MovingSum", sum("Salary").over(window_spec))
df_with_moving_sum.show()

Spark SQL:

SELECT *, 
       SUM(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS MovingSum
FROM employees;

Output:

+-------+----------+------+----------+
|   Name|Department|Salary| MovingSum|
+-------+----------+------+----------+
|  Anand|     Sales|  3000|      3000|
|   Bala|     Sales|  4000|      7000|
|Kavitha|        HR|  3500|      3500|
|    Raj|        HR|  4500|      8000|
+-------+----------+------+----------+
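
rowsBetween() counts physical rows, whereas rangeBetween() is defined on the value of the ORDER BY column. As an illustrative sketch (not part of the original example), the spec below sums all salaries within 1000 of the current row's salary in the same department.

from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sum_

# Frame defined on the ordering value: all rows whose Salary lies between
# (current Salary - 1000) and the current Salary.
range_window = (
    Window.partitionBy("Department")
          .orderBy("Salary")
          .rangeBetween(-1000, Window.currentRow)
)

df.withColumn("RangeSum", sum_("Salary").over(range_window)).show()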

Example 6: Calculating a Moving Average

PySpark:

from pyspark.sql.functions import avg

# Define a window frame for moving average (e.g., current row and 2 preceding rows)
window_spec = Window.partitionBy("Department").orderBy("Salary").rowsBetween(-2, 0)

# Add a moving average column
df_with_moving_avg = df.withColumn("MovingAvg", avg("Salary").over(window_spec))
df_with_moving_avg.show()

Spark SQL:

SELECT *, 
       AVG(Salary) OVER (PARTITION BY Department ORDER BY Salary ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS MovingAvg
FROM employees;

Output:

+-------+----------+------+----------+
|   Name|Department|Salary| MovingAvg|
+-------+----------+------+----------+
|  Anand|     Sales|  3000|    3000.0|
|   Bala|     Sales|  4000|    3500.0|
|Kavitha|        HR|  3500|    3500.0|
|    Raj|        HR|  4500|    4000.0|
+-------+----------+------+----------+
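
Aggregate window functions do not require an orderBy(): with only partitionBy(), the frame defaults to the entire partition, which is useful for comparing each row to a partition-level aggregate without a join. The sketch below is an illustrative extension that computes each employee's salary alongside their department average.

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col

# With no orderBy(), every row in a department sees the same average.
dept_window = Window.partitionBy("Department")

df_with_dept_avg = (
    df.withColumn("DeptAvgSalary", avg("Salary").over(dept_window))
      .withColumn("DiffFromAvg", col("Salary") - col("DeptAvgSalary"))
)
df_with_dept_avg.show()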

Example 7: Calculating Percentiles

PySpark:

from pyspark.sql.functions import percent_rank

# Ranking functions require an ordering-only window spec (no custom frame),
# so redefine it without the rowsBetween() used in the previous examples.
window_spec = Window.partitionBy("Department").orderBy("Salary")

# Add a percentile rank column
df_with_percentile = df.withColumn("PercentileRank", percent_rank().over(window_spec))
df_with_percentile.show()

Spark SQL:

SELECT *, 
       PERCENT_RANK() OVER (PARTITION BY Department ORDER BY Salary) AS PercentileRank
FROM employees;

Output:

+-------+----------+------+--------------+
|   Name|Department|Salary|PercentileRank|
+-------+----------+------+--------------+
|  Anand|     Sales|  3000|           0.0|
|   Bala|     Sales|  4000|           1.0|
|Kavitha|        HR|  3500|           0.0|
|    Raj|        HR|  4500|           1.0|
+-------+----------+------+--------------+
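
Related to percent_rank(), ntile() buckets the rows of each partition into a fixed number of groups, such as quartiles. The sketch below is an illustrative addition; with only two rows per department in the sample data, most buckets stay sparsely filled.

from pyspark.sql.window import Window
from pyspark.sql.functions import ntile

# ntile(4) assigns each row within a partition to one of four buckets
# based on the ordering (here, ascending salary).
ordered_window = Window.partitionBy("Department").orderBy("Salary")

df.withColumn("SalaryQuartile", ntile(4).over(ordered_window)).show()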

5. Common Use Cases

  • Ranking and row numbering (e.g., top N records).
  • Calculating cumulative sums or moving averages.
  • Accessing values from adjacent rows (e.g., comparing current and previous values).

6. Performance Considerations

  • Use window functions judiciously on large datasets, as they involve shuffling and sorting.
  • Optimize partitioning and ordering to reduce the size of the data processed in each window.
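
One way to see this cost is to inspect the physical plan: a windowed query typically shows a Window operator preceded by an Exchange (shuffle) on the partition columns and a Sort on the partition and order columns. A quick check, as a sketch reusing df from the examples:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Print the physical plan; look for Window, Exchange, and Sort operators.
plan_window = Window.partitionBy("Department").orderBy("Salary")
df.withColumn("RowNumber", row_number().over(plan_window)).explain()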

7. Key Takeaways

  1. Window functions perform calculations across a set of rows related to the current row.
  2. They support partitioning, ordering, and window frames for advanced calculations.
  3. Window functions can be resource-intensive for large datasets, as they involve shuffling and sorting.
  4. In Spark SQL, the same functionality is expressed with the OVER clause.
  5. Window functions work efficiently on large datasets when combined with proper partitioning and caching.