Big Data | Spark API: RDD, DataFrame, Dataset
  1. References
  2. Notes
  3. Create RDD, DataFrame, Dataset
  4. Read/Write supported options
  5. Read files (Text, CSV, JSON, Parquet, ORC)
    1. Supported methods of spark.read
    2. Read CSV files
    3. Read JSON files
    4. Read Text, Parquet, ORC files
    5. spark.read.format("...").load("...")
  6. Create schema
  7. Add schema to a DataFrame (change the schema of a DataFrame)
  8. Provide schema while reading CSV files
  9. Write Dataset/DataFrame to Text, CSV, JSON, Parquet, ORC files
    1. Supported methods of [Dataset/DataFrame].write
    2. [Dataset/DataFrame].write
    3. [Dataset/DataFrame].write.format("...").save("...")
  10. Use coalesce and repartition to manage partitions
  11. Dataset methods

  1. References
    Spark SQL, DataFrames and Datasets Guide: https://spark.apache.org/docs/2.4.3/sql-getting-started.html

    See this page for the API of Dataset: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html

    See this page for the API of RDD: https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html

    See this page for details on how to use spark-shell: spark-shell
  2. Notes
    RDD (Resilient Distributed Dataset) is the basic abstraction in Spark.
    RDD is an immutable distributed collection of elements partitioned across the nodes of the cluster that can be operated on in parallel (using a low-level API that allows applying transformations to and performing actions on the RDD).

    DataFrame is an immutable, partitioned collection of elements that can be operated on in parallel.
    Elements are organized into named columns (similar to a relational table).

    Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations.
    Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row (Dataset[Row]).
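
    A minimal sketch of this relationship, assuming a spark-shell session (where spark.implicits._ is already imported) and a hypothetical User case class introduced only for illustration: a DataFrame can be converted to a typed Dataset with as, and back to a DataFrame with toDF.

    scala> case class User(id: Int, name: String, pwd: String)
    #defined class User
    scala> val df = Seq(User(1, "user1name", "user1pwd")).toDF
    #df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
    scala> val ds = df.as[User]
    #ds: org.apache.spark.sql.Dataset[User] = [id: int, name: string ... 1 more field]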
  3. Create RDD, DataFrame, Dataset
    Create a simple test text file:
    $ vi test1.txt
    1,user1name,user1pwd
    2,user2name,user2pwd
    3,user3name,user3pwd

    • Create RDD from text file:

      scala> val rddFile = sc.textFile("test1.txt")
      #rddFile: org.apache.spark.rdd.RDD[String] = test1.txt MapPartitionsRDD[3] at textFile at <console>:24

    • Create Dataset from text file:

      scala> val dsFile = spark.read.textFile("test1.txt")
      #dsFile: org.apache.spark.sql.Dataset[String] = [value: string]

    • Create DataFrame from text file:

      scala> val dfFile = spark.read.text("test1.txt")
      #dfFile: org.apache.spark.sql.DataFrame = [value: string]

    • Create DataFrame from RDD:

      scala> val dfFileFromRDD = rddFile.toDF
      #dfFileFromRDD: org.apache.spark.sql.DataFrame = [value: string]
      scala> dfFileFromRDD.show
      +--------------------+
      |               value|
      +--------------------+
      |1,user1name,user1pwd|
      |2,user2name,user2pwd|
      |3,user3name,user3pwd|
      +--------------------+

    • Create DataFrame from Dataset:

      scala> val dfFileFromDataSet = dsFile.toDF
      #dfFileFromDataSet: org.apache.spark.sql.DataFrame = [value: string]
      scala> dfFileFromDataSet.show
      +--------------------+
      |               value|
      +--------------------+
      |1,user1name,user1pwd|
      |2,user2name,user2pwd|
      |3,user3name,user3pwd|
      +--------------------+

    • Create DataFrame from sequence:

      scala> import org.apache.spark.sql.types._
      #import org.apache.spark.sql.types._
      scala> import org.apache.spark.sql._
      #import org.apache.spark.sql._
      scala> val sequence = Seq(
        Row(1, "user1name", "user1pwd"),
        Row(2, "user2name", "user2pwd"),
        Row(3, "user3name", "user3pwd")
      )
      #sequence: Seq[org.apache.spark.sql.Row] = List([1,user1name,user1pwd], [2,user2name,user2pwd], [3,user3name,user3pwd])
      scala> val schema = StructType(
        List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("pwd", StringType, true)
        )
      )
      #schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
      scala> val rddSequence = sc.parallelize(sequence)
      #rddSequence: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[82] at parallelize at <console>:41
      scala> val dfSequence = spark.createDataFrame(rddSequence, schema)
      #dfSequence: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
      scala> dfSequence.printSchema
      root
       |-- id: integer (nullable = true)
       |-- name: string (nullable = true)
       |-- pwd: string (nullable = true)
      scala> dfSequence.show
      +--+---------+--------+
      |id|     name|     pwd|
      +--+---------+--------+
      | 1|user1name|user1pwd|
      | 2|user2name|user2pwd|
      | 3|user3name|user3pwd|
      +--+---------+--------+
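
    • Create Dataset from sequence (a minimal sketch, reusing the hypothetical User case class from the Notes section; toDS relies on spark.implicits._, which spark-shell imports automatically):

      scala> val dsSequence = Seq(User(1, "user1name", "user1pwd"), User(2, "user2name", "user2pwd"), User(3, "user3name", "user3pwd")).toDS
      #dsSequence: org.apache.spark.sql.Dataset[User] = [id: int, name: string ... 1 more field]

      dsSequence.show prints the same three rows as dfSequence.show above.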
  4. Read/Write supported options
    To find the list of supported options for reading and writing the different file formats, visit the links below (Scala source code); a combined example follows the list:
    • Text Options

    • CSV Options

    • JSON Options

    • Parquet Options

    • Orc Options
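
    Multiple options can be combined on a single read, either by chaining option calls or by passing a map to options. A minimal sketch on test1.txt using the CSV options sep and inferSchema (the expected result is that the first column is detected as an integer):

    scala> val dfWithOptions = spark.read.options(Map("sep" -> ",", "inferSchema" -> "true")).csv("test1.txt")
    #dfWithOptions: org.apache.spark.sql.DataFrame = [_c0: int, _c1: string ... 1 more field]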
  5. Read files (Text, CSV, JSON, Parquet, ORC)
    1. Supported methods of spark.read
      scala> spark.read.<tab>

      • text
      • textFile
      • csv
      • json
      • parquet
      • orc

      • load

      • option
      • options

      • format

      • schema

      • jdbc

      • table
    2. Read CSV files
      scala> val dfCSVFile = spark.read.csv("test1.txt")
      #dfCSVFile: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]
      scala> dfCSVFile.show
      +---+---------+--------+
      |_c0|      _c1|     _c2|
      +---+---------+--------+
      |  1|user1name|user1pwd|
      |  2|user2name|user2pwd|
      |  3|user3name|user3pwd|
      +---+---------+--------+
    3. Read JSON files
      • Single line:

        $ vi test1.json
        {"id": "1", "name": "user1name", "pwd": "user1pwd"}
        {"id": "2", "name": "user2name", "pwd": "user2pwd"}
        {"id": "3", "name": "user3name", "pwd": "user3pwd"}

        scala> val dfJSONFile = spark.read.json("test1.json")
        #dfJSONFile: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
        scala> dfJSONFile.printSchema
        root
         |-- id: string (nullable = true)
         |-- name: string (nullable = true)
         |-- pwd: string (nullable = true)
        scala> dfJSONFile.show
        +--+---------+--------+
        |id|     name|     pwd|
        +--+---------+--------+
        | 1|user1name|user1pwd|
        | 2|user2name|user2pwd|
        | 3|user3name|user3pwd|
        +--+---------+--------+

      • Multi line:

        $ vi test2.json
        [
        {"id": "1", "name": "user1name", "pwd": "user1pwd"},
        {"id": "2", "name": "user2name", "pwd": "user2pwd"},
        {"id": "3", "name": "user3name", "pwd": "user3pwd"}
        ]

        scala> val dfJSONFile = spark.read.option("multiline", "true").json("test2.json")
        #dfJSONFile: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
        scala> dfJSONFile.printSchema
        root
         |-- id: string (nullable = true)
         |-- name: string (nullable = true)
         |-- pwd: string (nullable = true)
        scala> dfJSONFile.show
        +--+---------+--------+
        |id|     name|     pwd|
        +--+---------+--------+
        | 1|user1name|user1pwd|
        | 2|user2name|user2pwd|
        | 3|user3name|user3pwd|
        +--+---------+--------+

      • Character encoding (charset):

        By default, Spark detects the character encoding, but you can explicitly specify the character encoding of a file using the charset option.

        scala> val dfJSONFile = spark.read.option("charset", "UTF-8").option("multiline", "true").json("test2.json")
        #dfJSONFile: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
    4. Read Text, Parquet, ORC files
      • Read Text file:

        scala> spark.read.text("test1.txt")
        #res0: org.apache.spark.sql.DataFrame = [value: string]

        scala> spark.read.textFile("test1.txt")
        #res0: org.apache.spark.sql.Dataset[String] = [value: string]

      • Read Parquet file:

        scala> spark.read.parquet("test1.parquet")
        #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

      • Read ORC file:

        scala> spark.read.orc("test1.orc")
        #res0: org.apache.spark.sql.DataFrame = [id: struct<string: string>, name: struct<string: string> ... 1 more field]
    5. spark.read.format("...").load("...")
      • Read Text file:

        scala> spark.read.format("text").load("test1.txt")
        #res0: org.apache.spark.sql.DataFrame = [value: string]

      • Read CSV file:

        scala> spark.read.format("csv").load("test1.csv")
        #res0: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]

        scala> spark.read.format("csv").option("header", "true").load("test1.csv")
        #res0: org.apache.spark.sql.DataFrame = [1: string, user1pwd: string ... 1 more field]

      • Read JSON file:

        scala> spark.read.format("json").load("test1-single-line.json")
        #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

        scala> spark.read.format("json").option("multiline", "true").load("test1-multi-line.json")
        #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

      • Read Parquet file:

        scala> spark.read.format("parquet").load("test1.parquet")
        #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

      • Read ORC file:

        scala> spark.read.format("orc").load("test1.orc")
        #res0: org.apache.spark.sql.DataFrame = [id: struct<string: string>, name: struct<string: string> ... 1 more field]
  6. Create schema
    • Create schema using StructType/StructField:

      scala> import org.apache.spark.sql.types._
      #import org.apache.spark.sql.types._
      scala> val schema = StructType(Array(
        StructField("id", StringType, nullable = true),
        StructField("name", StringType, nullable = true),
        StructField("pwd", StringType, nullable = true)
      ))
      #schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))

    • Create schema using StructType/StructField (dynamic fields list + single data type):

      scala> import org.apache.spark.sql.types._
      #import org.apache.spark.sql.types._
      scala> val fieldNames = "id,name,pwd"
      #fieldNames: String = id,name,pwd
      scala> val fields = fieldNames.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
      #fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
      scala> val schema = StructType(fields)
      #schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
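
    • Create schema from a DDL string (a sketch; StructType.fromDDL is available in recent Spark versions):

      scala> import org.apache.spark.sql.types._
      #import org.apache.spark.sql.types._
      scala> val schemaFromDDL = StructType.fromDDL("id STRING, name STRING, pwd STRING")
      #schemaFromDDL: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))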
  7. Add schema to a DataFrame (change the schema of a DataFrame)
    scala> val dfCSVFileAddSchema = spark.createDataFrame(dfCSVFile.rdd, schema)
    #dfCSVFileAddSchema: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
    scala> dfCSVFileAddSchema.printSchema
    root
     |-- id: string (nullable = true)
     |-- name: string (nullable = true)
     |-- pwd: string (nullable = true)
    scala> dfCSVFileAddSchema.show
    +--+---------+--------+
    |id|     name|     pwd|
    +--+---------+--------+
    | 1|user1name|user1pwd|
    | 2|user2name|user2pwd|
    | 3|user3name|user3pwd|
    +--+---------+--------+
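
    If the goal is only to rename the columns (keeping the existing data types), toDF with the new column names is a lighter-weight alternative:
    scala> val dfCSVFileRenamed = dfCSVFile.toDF("id", "name", "pwd")
    #dfCSVFileRenamed: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]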
  8. Provide schema while reading CSV files
    • Provide schema while reading a CSV file:

      scala> val dfCSVFileWithSchema = spark.read.schema(schema).csv("test1.txt")
      #dfCSVFileWithSchema: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
      scala> dfCSVFileWithSchema.printSchema
      root
       |-- id: string (nullable = true)
       |-- name: string (nullable = true)
       |-- pwd: string (nullable = true)

    • Use the header of the CSV file as the schema:

      $ vi test2.txt
      id,pwd,name
      1,user1pwd,user1name
      2,user2pwd,user2name
      3,user3pwd,user3name

      scala> val dfCSVFileWithHeader = spark.read.option("header", "true").csv("test2.txt")
      #dfCSVFileWithHeader: org.apache.spark.sql.DataFrame = [id: string, pwd: string ... 1 more field]
      scala> dfCSVFileWithHeader.printSchema
      root
       |-- id: string (nullable = true)
       |-- pwd: string (nullable = true)
       |-- name: string (nullable = true)
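
    • Let Spark infer the column types (a sketch; inferSchema triggers an extra pass over the data, and the expected result is that id is detected as an integer):

      scala> val dfCSVFileInferSchema = spark.read.option("header", "true").option("inferSchema", "true").csv("test2.txt")
      #dfCSVFileInferSchema: org.apache.spark.sql.DataFrame = [id: int, pwd: string ... 1 more field]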
  9. Write Dataset/DataFrame to Text, CSV, JSON, Parquet, ORC files
    Let's use the DataFrame dfSequence created from a sequence (see "Create DataFrame from sequence:" above).
    The same examples can be applied to Dataset.

    1. Supported methods of [Dataset/DataFrame].write
      scala> dfSequence.write.<tab>

      • text
      • csv
      • json
      • parquet
      • orc

      • save
      • saveAsTable
      • insertInto

      • option
      • options

      • mode
      • format

      • partitionBy
      • bucketBy

      • sortBy

      • jdbc
    2. [Dataset/DataFrame].write
      • Write DataFrame to Text file:

        scala> dfSequence.write.text("test/text")
        #org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 3 columns.;

        To fix the error above, either use the csv method (see write.csv below) or use the following custom code (but be aware that this simple concatenation does not quote or escape values that contain commas):
        scala> dfSequence.map(line => line.mkString(",")).write.text("test/text")

        To see the generated files:
        $ ls -al test/text/
        -rw-r--r-- 1 spark spark   part-00002-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt
        -rw-r--r-- 1 spark spark   .part-00002-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt.crc
        -rw-r--r-- 1 spark spark   part-00005-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt
        -rw-r--r-- 1 spark spark   .part-00005-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt.crc
        -rw-r--r-- 1 spark spark   part-00007-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt
        -rw-r--r-- 1 spark spark   .part-00007-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt.crc
        -rw-r--r-- 1 spark spark   _SUCCESS
        -rw-r--r-- 1 spark spark   ._SUCCESS.crc

      • Write DataFrame to CSV file:

        scala> dfSequence.write.csv("test/csv")

      • Write DataFrame to JSON file:

        scala> dfSequence.write.json("test/json")

      • Write DataFrame to Parquet file:

        scala> dfSequence.write.parquet("test/parquet")

      • Write DataFrame to ORC file:

        scala> dfSequence.write.orc("test/orc")
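
      • Write DataFrame with a save mode (a sketch; by default the write fails if the target path already exists, mode("overwrite") replaces it, and the header option adds a header line to the CSV output):

        scala> dfSequence.write.mode("overwrite").option("header", "true").csv("test/csv")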
    3. [Dataset/DataFrame].write.format("...").save("...")
      • Write DataFrame to CSV file:

        scala> dfSequence.write.format("csv").save("test/csv")

      • Write DataFrame to JSON file:

        scala> dfSequence.write.format("json").save("test/json")

      • Write DataFrame to Parquet file:

        scala> dfSequence.write.format("parquet").save("test/parquet")

      • Write DataFrame to ORC file:

        scala> dfSequence.write.format("orc").save("test/orc")
  10. Use coalesce and repartition to manage partitions
    Writing the DataFrame results in one output file per partition on disk.
    To check the number of partitions of the DataFrame:
    scala> dfSequence.rdd.partitions.size
    #res0: Int = 8

    To write the DataFrame to a single file, use either coalesce or repartition (coalesce only merges existing partitions and avoids a full shuffle, while repartition performs a full shuffle):
    scala> dfSequence.coalesce(1).write.csv("test/csv")

    scala> dfSequence.repartition(1).write.csv("test/csv")
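
    To verify that the result now fits in a single partition:
    scala> dfSequence.coalesce(1).rdd.partitions.size
    #res0: Int = 1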
  11. Dataset methods
    Let's use the DataFrame dfSequence created from a sequence (see "Create DataFrame from sequence:" above).
    The same examples can be applied to a Dataset.
    The examples below that use col, lit, and desc assume import org.apache.spark.sql.functions._ (the $"..." syntax relies on spark.implicits._, which spark-shell imports automatically).

    • Schema:
      • <dataset>.printSchema: Prints the schema to the console in a nice tree format.

      • <dataset>.dtypes: Returns all column names and their data types as an array.

      • <dataset>.schema: Returns the schema of this Dataset.

      • <dataset>.columns: Returns all column names as an array.
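
        A quick check of two of these methods on dfSequence (expected output for the id/name/pwd schema defined above):
        scala> dfSequence.dtypes
        #res0: Array[(String, String)] = Array((id,IntegerType), (name,StringType), (pwd,StringType))
        scala> dfSequence.columns
        #res0: Array[String] = Array(id, name, pwd)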

    • Columns:
      • <dataset>.withColumn(columnName, column): Returns a new Dataset by adding a column or replacing the existing column that has the same name.

        scala> dfSequence.withColumn("new_column",functions.lit(1))

      • <dataset>.withColumnRenamed(existingColumnName, newColumnName): Returns a new Dataset with a column renamed.

        scala> dfSequence.withColumn("new_column", lit(1)).withColumnRenamed("new_column", "num")

      • <dataset>.drop(columnNames): Returns a new Dataset with columns dropped.

        scala> dfSequence.drop("name")

    • Rows:
      • <dataset>.count: Returns the number of rows in the Dataset.

      • <dataset>.show: Displays the top 20 rows of Dataset in a tabular form.

      • <dataset>.head: Returns the first row.

      • <dataset>.first: Returns the first row.

      • <dataset>.take(n): Returns the first n rows in the Dataset.

      • <dataset>.limit(n): Returns a new Dataset by taking the first n rows.

      • <dataset>.distinct: Returns a new Dataset that contains only the unique rows from this Dataset.
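
        A quick check of some of these methods on dfSequence (expected output for the three-row DataFrame used above):
        scala> dfSequence.count
        #res0: Long = 3
        scala> dfSequence.take(2)
        #res0: Array[org.apache.spark.sql.Row] = Array([1,user1name,user1pwd], [2,user2name,user2pwd])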

    • Select:
      • <dataset>.select(columnName, columnNames): Selects a set of columns.

        scala> dfSequence.select("id")

        scala> dfSequence.select(col("id"), col("name"))

    • Filter:
      • <dataset>.filter(condition): Filters rows using the given condition.

        scala> dfSequence.filter($"id" > 1)

        scala> dfSequence.filter(col("id") > 1)

    • Sort:
      • <dataset>.sort(columns): Returns a new Dataset sorted by the given expressions.

        scala> dfSequence.sort(desc("id"))

      • <dataset>.orderBy(columns): Returns a new Dataset sorted by the given expressions.

        scala> dfSequence.orderBy(desc("id"))

    • Views:
      • <dataset>.createTempView(viewName): Creates a local temporary view using the given name.

        scala> dfSequence.createTempView("myView")
        scala> spark.sql("select id from myView")
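
        To execute the query and display its result (it prints a single id column with the values 1, 2, 3):

        scala> spark.sql("select id from myView").show

        Note: createOrReplaceTempView can be used instead of createTempView to avoid an error when the view name already exists.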