What is predicate pushdown?

Predicate pushdown is an optimization technique in Apache Spark where the filtering logic (predicates) is pushed closer to the data source.
Instead of Spark loading all the data into memory and applying the filters, the filtering happens directly at the data source (e.g., databases, Parquet files, ORC files, etc.), reducing the amount of data Spark needs to process.

Why?

Predicate pushdown is essential because:

  1. Efficiency: It minimizes the amount of data read from the storage layer, leading to faster query execution.
  2. Reduced Network Traffic: When querying external data sources, pushing filters reduces data transfer over the network.
  3. Resource Optimization: Less data being read translates to lower memory and CPU usage in Spark clusters.
  4. Cost Savings: For cloud storage or databases with usage-based pricing, processing less data lowers costs.

How?

Predicate pushdown works when queries or transformations are written so that Spark's Catalyst optimizer can place the filters early in the execution plan, ideally inside the data-source scan itself. For example:

  • Use the filter or where transformations on DataFrames, or WHERE clauses in Spark SQL, with predicates built from column expressions rather than opaque UDFs.
  • Use file formats such as Parquet and ORC that support predicate pushdown through their built-in column statistics; row-based formats such as Avro benefit far less because they cannot skip data blocks.
  • When connecting to external data sources (e.g., JDBC, Hive), ensure the predicates are supported by the source system and properly translated by Spark (see the sketch after this list).
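
As a concrete illustration of the last point, here is a minimal sketch of a JDBC read where a filter Spark can translate is executed as a WHERE clause inside the database; the connection URL, table name, and credentials are placeholders, not a real system:

# Hypothetical connection details -- adjust for your environment
orders_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/sales")
    .option("dbtable", "public.orders")
    .option("user", "reporting")
    .option("password", "placeholder")
    .load()
)

# Spark translates this filter into a WHERE clause executed by the database,
# so only matching rows are sent over the network
recent_orders = orders_df.filter("order_date >= '2023-12-01'")
recent_orders.show()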

Example without predicate pushdown

In this example, the filter condition is wrapped in a Python UDF. Catalyst cannot look inside an arbitrary Python function, so the filter cannot be pushed to the data source: Spark reads everything and filters row by row afterwards. (Simply calling .filter() after .read.parquet() does not by itself prevent pushdown, because Spark builds a lazy plan and pushes eligible filters into the scan wherever they appear; it is the opaque predicate that defeats it here.)

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Reading the Parquet file
transactions_df = spark.read.parquet("/path/to/transactions")

# The condition is hidden inside a Python UDF, so it cannot be pushed to the
# Parquet reader; Spark evaluates it only after every row has been read
is_india = udf(lambda country: country == 'India', BooleanType())
filtered_df = transactions_df.filter(is_india(transactions_df['country']))

filtered_df.show()

Why this is inefficient:

  • The entire dataset is read into Spark, including rows for all countries.
  • Filtering happens inside Spark, not at the data source, and every row is shipped to a Python worker to evaluate the UDF.
  • This leads to increased I/O, memory usage, and processing time.

Example with predicate pushdown

To enable predicate pushdown, express the filter as a built-in column expression or SQL string that Catalyst can translate into a data-source predicate. Only the required data is then loaded into Spark:

# Reading the Parquet file with predicate pushdown
filtered_df = spark.read.parquet("/path/to/transactions") \
    .filter("country = 'India'")  # Pushes the filter to the Parquet reader

filtered_df.show()

Why this is efficient:

  • Spark pushes the filter (country = 'India') to the Parquet file reader.
  • Row groups whose statistics rule out country = 'India' are skipped, and non-matching rows in the remaining row groups are dropped during the scan.
  • Reduces I/O, memory usage, and processing time.
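
One way to confirm that pushdown actually happened is to inspect the physical plan: for Parquet scans, Spark lists the predicates it pushed down under PushedFilters in the scan node. A quick check on the example above:

# Print the physical plan; the Parquet scan node should list the predicate
# under "PushedFilters" when pushdown applies
filtered_df = spark.read.parquet("/path/to/transactions") \
    .filter("country = 'India'")

filtered_df.explain()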

When?

Predicate pushdown should be used when:

  1. Large Datasets: Filtering early is critical to avoid reading unnecessary data.
  2. Supported Data Sources: Use data sources that allow predicates to be applied efficiently at the source level.
  3. Resource Constraints: Optimize cluster performance by reducing unnecessary memory and compute usage.
  4. Data Latency Considerations: For time-sensitive queries, reducing I/O and network transfer through predicate pushdown helps meet latency targets.

Best Use Case

Consider a retail analytics scenario where a company needs to analyze sales data stored in Parquet files for specific product categories over the last month. Without predicate pushdown, Spark would load all the data and filter it in memory. By pushing the predicates (product_category = 'Electronics' AND sale_date >= '2023-12-01') into the Parquet scan, Spark retrieves only the relevant rows, leading to faster and more efficient execution. Predicate pushdown also shines when querying databases, where filtering billions of records down to a few thousand at the source can drastically reduce load times and compute costs.
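
A minimal sketch of that retail query, assuming the sales data sits in Parquet at a hypothetical /path/to/sales with product_category and sale_date columns:

from pyspark.sql import functions as F

# Hypothetical path and column names for the retail scenario
sales_df = spark.read.parquet("/path/to/sales")

# Both predicates are built-in column expressions, so Spark can push them
# into the Parquet scan and skip row groups that cannot match
electronics_dec = sales_df.filter(
    (F.col("product_category") == "Electronics") &
    (F.col("sale_date") >= "2023-12-01")
)
electronics_dec.show()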