spark-shell:
$ spark-shell

$ vi test1.txt
1,user1name,user1pwd
2,user2name,user2pwd
3,user3name,user3pwd
scala> val rddFile = sc.textFile("test1.txt") #rddFile: org.apache.spark.rdd.RDD[String] = test1.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> val dsFile = spark.read.textFile("test1.txt") #dsFile: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val dfFile = spark.read.text("test1.txt") #dfFile: org.apache.spark.sql.DataFrame = [value: string]
scala> val dfFileFromRDD = rddFile.toDF #dfFileFromRDD: org.apache.spark.sql.DataFrame = [value: string]
scala> dfFileFromRDD.show
+--------------------+
|               value|
+--------------------+
|1,user1name,user1pwd|
|2,user2name,user2pwd|
|3,user3name,user3pwd|
+--------------------+
scala> val dfFileFromDataSet = dsFile.toDF #dfFileFromDataSet: org.apache.spark.sql.DataFrame = [value: string]
scala> dfFileFromDataSet.show
+--------------------+
|               value|
+--------------------+
|1,user1name,user1pwd|
|2,user2name,user2pwd|
|3,user3name,user3pwd|
+--------------------+
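A DataFrame can also be converted back to a typed Dataset with as[T]. A minimal sketch against dfFile above (relies on the implicits spark-shell imports automatically):
scala> val dsFileFromDF = dfFile.as[String]  // Dataset[String] with the same single "value" column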
scala> import org.apache.spark.sql.types._ #import org.apache.spark.sql.types._
scala> import org.apache.spark.sql._ #import org.apache.spark.sql._
scala> val sequence = Seq( Row(1, "user1name", "user1pwd"), Row(2, "user2name", "user2pwd"), Row(3, "user3name", "user3pwd") ) #sequence: Seq[org.apache.spark.sql.Row] = List([1,user1name,user1pwd], [2,user2name,user2pwd], [3,user3name,user3pwd])
scala> val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("pwd", StringType, true) ) ) #schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
scala> val rddSequence = sc.parallelize(sequence) #rddSequence: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[82] at parallelize at <console>:41
scala> val dfSequence = spark.createDataFrame(rddSequence, schema) #dfSequence: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> dfSequence.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- pwd: string (nullable = true)
scala> dfSequence.show
+--+---------+--------+
|id|     name|     pwd|
+--+---------+--------+
| 1|user1name|user1pwd|
| 2|user2name|user2pwd|
| 3|user3name|user3pwd|
+--+---------+--------+
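For simple cases, the same DataFrame can be built from a sequence of tuples with toDF, skipping the explicit Row and StructType definitions above; a minimal sketch (uses the spark-shell implicits):
scala> val dfTuples = Seq((1, "user1name", "user1pwd"), (2, "user2name", "user2pwd"), (3, "user3name", "user3pwd")).toDF("id", "name", "pwd")
scala> dfTuples.printSchema  // columns id, name, pwd, with id inferred as an integer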
spark.read
scala> spark.read.<tab>
text
textFile
csv
json
parquet
orc
load
option
options
format
schema
jdbc
table
scala> val dfCSVFile = spark.read.csv("test1.txt") #dfCSVFile: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]
scala> dfCSVFile.show
+---+---------+--------+
|_c0|      _c1|     _c2|
+---+---------+--------+
|  1|user1name|user1pwd|
|  2|user2name|user2pwd|
|  3|user3name|user3pwd|
+---+---------+--------+
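For delimiters other than a comma, the separator can be set with an option; a sketch assuming a hypothetical semicolon-delimited file test1-semicolon.txt (not one of the files created above):
scala> val dfSemicolon = spark.read.option("sep", ";").csv("test1-semicolon.txt")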
$ vi test1.json
{"id": "1", "name": "user1name", "pwd": "user1pwd"}
{"id": "2", "name": "user2name", "pwd": "user2pwd"}
{"id": "3", "name": "user3name", "pwd": "user3pwd"}
scala> val dfJSONFile = spark.read.json("test1.json") #dfJSONFile: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> dfJSONFile.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pwd: string (nullable = true)
scala> dfJSONFile.show
+--+---------+--------+
|id|     name|     pwd|
+--+---------+--------+
| 1|user1name|user1pwd|
| 2|user2name|user2pwd|
| 3|user3name|user3pwd|
+--+---------+--------+
$ vi test2.json
[
  {"id": "1", "name": "user1name", "pwd": "user1pwd"},
  {"id": "2", "name": "user2name", "pwd": "user2pwd"},
  {"id": "3", "name": "user3name", "pwd": "user3pwd"}
]
scala> val dfJSONFile = spark.read.option("multiline", "true").json("test2.json") #dfJSONFile: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> dfJSONFile.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pwd: string (nullable = true)
scala> dfJSONFile.show
+--+---------+--------+
|id|     name|     pwd|
+--+---------+--------+
| 1|user1name|user1pwd|
| 2|user2name|user2pwd|
| 3|user3name|user3pwd|
+--+---------+--------+
Use the charset option to specify the file encoding:
scala> val dfJSONFile = spark.read.option("charset", "UTF-8").option("multiline", "true").json("test2.json") #dfJSONFile: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> spark.read.text("test1.txt") #res0: org.apache.spark.sql.DataFrame = [value: string]
scala> spark.read.textFile("test1.txt") #res0: org.apache.spark.sql.Dataset[String] = [value: string]
scala> spark.read.parquet("test1.parquet") #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> spark.read.orc("test1.orc") #res0: org.apache.spark.sql.DataFrame = [id: struct<string: string>, name: struct<string: string> ... 1 more field]
spark.read.format("...").load("...")
scala> spark.read.format("text").load("test1.txt") #res0: org.apache.spark.sql.DataFrame = [value: string]
scala> spark.read.format("csv").load("test1.csv") #res0: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 1 more field]
scala> spark.read.format("csv").option("header", "true").load("test1.csv") #res0: org.apache.spark.sql.DataFrame = [1: string, user1name: string ... 1 more field]
scala> spark.read.format("json").load("test1-single-line.json") #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> spark.read.format("json").option("multiline", "true").load("test1-multi-line.json") #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> spark.read.format("parquet").load("test1.parquet") #res0: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> spark.read.format("orc").load("test1.orc") #res0: org.apache.spark.sql.DataFrame = [id: struct<string: string>, name: struct<string: string> ... 1 more field]
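If no format is specified, load falls back to the spark.sql.sources.default setting, which is parquet unless it has been reconfigured; a minimal sketch:
scala> spark.read.load("test1.parquet")  // with default settings, equivalent to spark.read.format("parquet").load("test1.parquet")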
scala> import org.apache.spark.sql.types._ #import org.apache.spark.sql.types._
scala> val schema = StructType(Array( StructField("id", StringType, nullable = true), StructField("name", StringType, nullable = true), StructField("pwd", StringType, nullable = true) )) #schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
scala> import org.apache.spark.sql.types._ #import org.apache.spark.sql.types._
scala> val fieldNames = "id,name,pwd"; #fieldNames: String = id,name,pwd
scala> val fields = fieldNames.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) #fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
scala> val schema = StructType(fields) #schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(name,StringType,true), StructField(pwd,StringType,true))
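In newer Spark versions the same schema can also be built from a DDL-style string; a minimal sketch:
scala> val schemaFromDDL = StructType.fromDDL("id STRING, name STRING, pwd STRING")  // equivalent to the schema built above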
scala> val dfCSVFileAddSchema = spark.createDataFrame(dfCSVFile.rdd, schema) #dfCSVFileAddSchema: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> dfCSVFileAddSchema.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pwd: string (nullable = true)
scala> dfCSVFileAddSchema.show
+--+---------+--------+
|id|     name|     pwd|
+--+---------+--------+
| 1|user1name|user1pwd|
| 2|user2name|user2pwd|
| 3|user3name|user3pwd|
+--+---------+--------+
scala> val dfCSVFileWithSchema = spark.read.schema(schema).csv("test1.txt") #dfCSVFileWithSchema: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
scala> dfCSVFileWithSchema.printSchema
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pwd: string (nullable = true)
$ vi test2.txt
id,pwd,name
1,user1pwd,user1name
2,user2pwd,user2name
3,user3pwd,user3name
scala> val dfCSVFileWithHeader = spark.read.option("header", "true").csv("test2.txt") #dfCSVFileWithHeader: org.apache.spark.sql.DataFrame = [id: string, pwd: string ... 1 more field]
scala> dfCSVFileWithHeader.printSchema
root
 |-- id: string (nullable = true)
 |-- pwd: string (nullable = true)
 |-- name: string (nullable = true)
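With a header present, Spark can also be asked to guess column types instead of defaulting everything to string; a sketch using the inferSchema option on the same test2.txt:
scala> val dfCSVInferred = spark.read.option("header", "true").option("inferSchema", "true").csv("test2.txt")
scala> dfCSVInferred.printSchema  // id should now come back as an integer type; the other columns stay strings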
The examples below use dfSequence, created from a sequence (see "Create DataFrame from sequence:" above).

[Dataset/DataFrame].write
scala> dfSequence.write.<tab>
text
csv
json
parquet
orc
save
saveAsTable
insertInto
option
options
mode
format
partitionBy
bucketBy
sortBy
jdbc
[Dataset/DataFrame].write
scala> dfSequence.write.text("test/text") #org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 3 columns.;
Either use write.csv (see below) or use the following custom code (but be aware that this might not work properly in some cases):
scala> dfSequence.map(line => line.mkString(",")).write.text("test/text")
$ ls -al test/text/
-rw-r--r-- 1 spark spark part-00002-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt
-rw-r--r-- 1 spark spark .part-00002-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt.crc
-rw-r--r-- 1 spark spark part-00005-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt
-rw-r--r-- 1 spark spark .part-00005-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt.crc
-rw-r--r-- 1 spark spark part-00007-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt
-rw-r--r-- 1 spark spark .part-00007-c39afb9a-1909-4f0f-8de6-1a678bf519eb-c000.txt.crc
-rw-r--r-- 1 spark spark _SUCCESS
-rw-r--r-- 1 spark spark ._SUCCESS.crc
scala> dfSequence.write.csv("test/csv")
scala> dfSequence.write.json("test/json")
scala> dfSequence.write.parquet("test/parquet")
scala> dfSequence.write.orc("test/orc")
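Re-running any of these writes against an existing path fails by default (SaveMode.ErrorIfExists). If replacing the output is acceptable, the mode method from the tab completion above can be used; a minimal sketch:
scala> dfSequence.write.mode("overwrite").csv("test/csv")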
[Dataset/DataFrame].write.format("...").save("...")
scala> dfSequence.write.format("csv").save("test/csv")
scala> dfSequence.write.format("json").save("test/json")
scala> dfSequence.write.format("parquet").save("test/parquet")
scala> dfSequence.write.format("orc").save("test/orc")
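partitionBy from the tab-completion list above writes one subdirectory per distinct value of the chosen column (here id=1, id=2, id=3). A sketch; partitioning by a near-unique column like id is only for illustration:
scala> dfSequence.write.partitionBy("id").parquet("test/parquet-by-id")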
coalesce and repartition to manage partitions
scala> dfSequence.rdd.partitions.size #res0: Int = 8
Reduce the number of partitions (and thus the number of output files) with coalesce or repartition:
scala> dfSequence.coalesce(1).write.csv("test/csv")
scala> dfSequence.repartition(1).write.csv("test/csv")
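coalesce avoids a full shuffle and can only lower the partition count, while repartition shuffles the data and can raise or lower it. To confirm the effect, check the partition count again; a sketch:
scala> dfSequence.repartition(1).rdd.partitions.size  // should report 1
scala> dfSequence.coalesce(4).rdd.partitions.size     // should report 4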
The examples below again use dfSequence, created from a sequence (see "Create DataFrame from sequence:" above). The column-based examples also assume the Spark SQL functions are imported:

scala> import org.apache.spark.sql.functions._ #import org.apache.spark.sql.functions._

<dataset>.printSchema: Prints the schema to the console in a nice tree format.

<dataset>.dtypes: Returns all column names and their data types as an array.

<dataset>.schema: Returns the schema of this Dataset.

<dataset>.columns: Returns all column names as an array.

<dataset>.withColumn(columnName, column): Returns a new Dataset by adding a column or replacing the existing column that has the same name.
scala> dfSequence.withColumn("new_column", functions.lit(1))

<dataset>.withColumnRenamed(existingColumnName, newColumnName): Returns a new Dataset with a column renamed.
scala> dfSequence.withColumn("new_column", functions.lit(1)).withColumnRenamed("new_column", "num")

<dataset>.drop(columnNames): Returns a new Dataset with columns dropped.
scala> dfSequence.drop("name")

<dataset>.count: Returns the number of rows in the Dataset.

<dataset>.show: Displays the top 20 rows of the Dataset in tabular form.

<dataset>.head: Returns the first row.

<dataset>.first: Returns the first row.

<dataset>.take(n): Returns the first n rows in the Dataset.

<dataset>.limit(n): Returns a new Dataset by taking the first n rows.

<dataset>.distinct: Returns a new Dataset that contains only the unique rows from this Dataset.

<dataset>.select(columnName, columnNames): Selects a set of columns.
scala> dfSequence.select("id")
scala> dfSequence.select(col("id"), col("name"))

<dataset>.filter(condition): Filters rows using the given condition.
scala> dfSequence.filter($"id" > 1)
scala> dfSequence.filter(col("id") > 1)

<dataset>.sort(columns): Returns a new Dataset sorted by the given expressions.
scala> dfSequence.sort(desc("id"))

<dataset>.orderBy(columns): Returns a new Dataset sorted by the given expressions.
scala> dfSequence.orderBy(desc("id"))

<dataset>.createTempView(viewName): Creates a local temporary view using the given name.
scala> dfSequence.createTempView("myView")
scala> spark.sql("select id from myView")
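Putting a few of these together, a sketch that filters, projects, and orders dfSequence, and then does the same through the temporary view created above:
scala> dfSequence.filter(col("id") > 1).select("id", "name").orderBy(desc("id")).show
scala> spark.sql("select id, name from myView where id > 1 order by id desc").show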