Objectives

  1. Read from CSV files
  2. Read from JSON files
  3. Write DataFrame to files
  4. Write DataFrame to tables
  5. Write DataFrame to Delta tables

Methods:

  • DataFrameReader: read, csv, json, parquet, table, format, load, option, schema
  • DataFrameWriter: write, csv, json, parquet, table, format, save, option, mode, saveAsTable
  • StructType: toDDL

Spark Types:

  • Types: StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType, BooleanType

DataFrameReader

  • The DataFrameReader class is used to read data from various sources (e.g., CSV, JSON, Parquet, Delta) into a DataFrame.
  • Interface used to load a DataFrame from external data sources.
  • The DataFrameReader class is available as the read attribute of the SparkSession object.

spark.read.parquet("/mnt/tables/sales")

  • The above command reads the Parquet files from the specified path and returns a DataFrame.
  • The read attribute returns a DataFrameReader object that can be used to read data from various sources, either through format-specific methods such as csv, json, and parquet, or through the generic format and load methods sketched below.
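
As a small sketch, the same Parquet read can also be expressed with the generic format and load methods from the Methods list above:

# Generic reader API: equivalent to spark.read.parquet("/mnt/tables/sales").
sales_df = spark \
    .read \
    .format("parquet") \
    .load("/mnt/tables/sales")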

Read from CSV files

  • Read from CSV with the DataFrameReader class’ csv method.
file_path = "/mnt/tables/sales.csv"
sales_df = spark \
    .read \
    .option("sep", "\t") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(file_path)

sales_df.printSchema()

When you execute the above code, you will see two jobs triggered even though no action has been called. The two options below require Spark to scan the file before any action runs (to pick up the column names from the header and to infer each column's type), which is why the jobs are executed.

    .option("header", "true") \ 
    .option("inferSchema", "true") \

Another way to write the same code is to pass the options as keyword arguments to the csv method:

file_path = "/mnt/tables/sales.csv"
sales_df = spark.read.csv(file_path, sep="\t", header=True, inferSchema=True)
sales_df.printSchema()

Define the schema explicitly using the StructType and StructField classes.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
sales_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_date", DateType(), True)
])
sales_df = spark.read.csv(file_path, sep="\t", header=True, schema=sales_schema)
  • The above code reads the CSV file from the specified path and returns a DataFrame with the specified schema.
  • The StructType class is used to define the schema of the DataFrame.
  • The StructField class is used to define the fields of the schema.
  • The StringType, IntegerType, DoubleType, and DateType classes are used to define the data types of the fields.
  • The True parameter in the StructField constructor indicates that the field is nullable.
  • This time no jobs are triggered, because the schema is defined explicitly and Spark does not need to scan the file to infer it. The schema can also be attached with the reader's schema method, as sketched below.
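
As a minimal sketch, the explicit schema can also be attached with the DataFrameReader's schema method instead of the schema keyword argument:

# Builder-style read: attach the StructType schema before calling csv().
sales_df = spark \
    .read \
    .schema(sales_schema) \
    .option("sep", "\t") \
    .option("header", "true") \
    .csv(file_path)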

There is an alternative way to define the schema using DDL syntax.

ddl_schema = """
    order_id STRING,
    customer_id STRING,
    product_id STRING,
    quantity INT,
    price DOUBLE,
    order_date DATE
"""
sales_df = spark.read.csv(file_path, sep="\t", header=True, schema=ddl_schema)

This code does exactly the same thing as the previous code, but uses DDL syntax to define the schema.

  • The ddl_schema variable contains the DDL syntax for the schema.
  • The schema parameter in the csv method is set to the ddl_schema variable.
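
The reader's schema method accepts a DDL-formatted string as well as a StructType, so the same builder-style sketch applies here:

# schema() takes either a StructType or a DDL-formatted string.
sales_df = spark \
    .read \
    .schema(ddl_schema) \
    .option("sep", "\t") \
    .option("header", "true") \
    .csv(file_path)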

Read from JSON files

  • Read from JSON with the DataFrameReader class’ json method.
file_path = "/mnt/tables/sales.json"
sales_df = spark \
    .read \
    .option("inferSchema", "true") \
    .json(file_path)
sales_df.printSchema()
  • The above code reads the JSON file from the specified path and returns a DataFrame.
  • This will trigger a job because the inferSchema option is used. If the schema is defined explicitly, no job is triggered, as sketched below.
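
A minimal sketch of the explicit-schema variant, assuming the JSON records carry the same fields as the sales schema defined earlier:

# Reusing the explicit schema avoids the schema-inference job.
sales_df = spark \
    .read \
    .schema(sales_schema) \
    .json(file_path)
sales_df.printSchema()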

You can use the Scala StructType method toDDL to convert a schema to DDL syntax. This is useful when you want the DDL-formatted string for ingesting CSV or JSON but do not want to write out the StructType variant of the schema by hand. The toDDL method is not available in Python, but you can call it in Scala and then copy the DDL-formatted string into your Python code.

val sales_file_path = "/mnt/tables/sales.json"
val sales_df_schema = spark.read.option("inferSchema", "true").json(sales_file_path).schema.toDDL
println(sales_df_schema)
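
Back in Python, the printed DDL string can be pasted in and passed as the schema. A sketch, where the field list is only a placeholder for whatever toDDL printed:

# Paste the output of toDDL here; the fields below are placeholders.
json_ddl_schema = "order_id STRING, customer_id STRING, product_id STRING, quantity INT, price DOUBLE, order_date DATE"
sales_df = spark.read.schema(json_ddl_schema).json("/mnt/tables/sales.json")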