1. DataFrameReader
One important goal of Spark SQL is to improve how data is read, so it adds a dedicated framework for reading external data sources, called DataFrameReader.
For example, through the code `val reader: DataFrameReader = spark.read`, you can see that `spark.read` is of type `DataFrameReader`.
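As a minimal sketch of that entry point (the SparkSession setup below simply mirrors the examples later in this section):

```scala
import org.apache.spark.sql.{DataFrameReader, SparkSession}

val spark = SparkSession.builder().master("local[6]").appName("reader1").getOrCreate()

// spark.read returns a DataFrameReader, the entry point for reading external data sources
val reader: DataFrameReader = spark.read
```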
`DataFrameReader` consists of the following components:

Component | Explanation |
---|---|
`schema` | Structure information. Because a `Dataset` is structured, `Schema` information is needed when reading data; it may be obtained from the external data source or specified explicitly |
`option` | Parameters for connecting to the external data source, such as the URL for `JDBC`, or whether to read the `header` when reading a `CSV` file |
`format` | The format of the external data source, such as `csv`, `jdbc`, `json`, etc. |
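For instance, instead of letting Spark infer the schema, the structure can be passed in explicitly through the `schema` method. A minimal sketch, where the column names and file path are illustrative only:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative schema; the actual column names depend on the file being read
val csvSchema = StructType(List(
  StructField("year", IntegerType),
  StructField("month", IntegerType),
  StructField("station", StringType)
))

spark.read
  .schema(csvSchema)              // supply the structure instead of inferring it
  .option("header", value = true) // still skip the header line
  .csv("../dataset/some_structured_file.csv") // illustrative path
```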
`DataFrameReader` can be used in two ways: one is to use the `load` method to load data, with `format` specifying the format; the other is to use the encapsulated methods, such as `csv`, `json`, `jdbc`, and so on.
```scala
/**
 * First look at Reader: two ways of reading
 * 1. The load method
 * 2. The encapsulated methods
 */
@Test
def reader2(): Unit = {
  val spark = SparkSession.builder().master("local[6]").appName("reader2").getOrCreate()

  // First form
  spark.read
    .format("csv")                       // Tell the program this is a csv file
    .option("header", value = true)      // Tell the program the first line of the file is a header
    .option("inferSchema", value = true) // Tell the program to infer the field types from the file content
    .load("../dataset/BeijingPM20100101_20151231.csv")
    .show()

  // Second form
  spark.read
    .option("header", value = true)
    .option("inferSchema", value = true)
    .csv("../dataset/BeijingPM20100101_20151231.csv")
    .show()
}
```
2. DataFrameWriter
`DataFrameWriter` consists of the following components:
Component | Explanation |
---|---|
`source` | The write target, file format, etc., set through the `format` method |
`mode` | Write mode. For example, if the target table already exists and you write data to it through `DataFrameWriter`, should the table be overwritten or appended to? Set through the `mode` method |
`extraOptions` | Parameters for the external data source, such as the `JDBC` URL, set through `option`/`options` |
`partitioningColumns` | Partitioning similar to Hive, used when saving tables. This is not RDD partitioning but file/table partitioning, set through `partitionBy` |
`bucketColumnNames` | Bucketing similar to Hive, used when saving tables, set through `bucketBy` |
`sortColumnNames` | The columns used for sorting, set through `sortBy` |
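The later examples only demonstrate `format`, `mode`, `option` and `partitionBy`, so here is a minimal sketch of the components working together, assuming a `df` like the one read in the earlier reader examples; note that `bucketBy` and `sortBy` only take effect together with `saveAsTable`, and the table, column and option values below are illustrative only:

```scala
// Minimal sketch of the DataFrameWriter components; names are illustrative
df.write
  .format("parquet")                 // source: target format
  .mode("overwrite")                 // mode: write mode
  .option("compression", "snappy")   // extraOptions: extra parameters
  .partitionBy("year", "month")      // partitioningColumns
  .bucketBy(10, "day")               // bucketColumnNames (requires saveAsTable)
  .sortBy("hour")                    // sortColumnNames (requires bucketBy)
  .saveAsTable("beijing_pm_bucketed")
```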
`mode` specifies the write mode, such as overwriting the original data set or appending to the end of the original data set.
Scala object representation | String representation | Explanation |
---|---|---|
`SaveMode.ErrorIfExists` | `"error"` | When saving the `DataFrame` to the `source`, if the target already exists, an error is raised |
`SaveMode.Append` | `"append"` | When saving the `DataFrame` to the `source`, if the target already exists, the data is appended to the file or `Table` |
`SaveMode.Overwrite` | `"overwrite"` | When saving the `DataFrame` to the `source`, if the target already exists, the target is completely overwritten with the data in the `DataFrame` |
`SaveMode.Ignore` | `"ignore"` | When saving the `DataFrame` to the `source`, if the target already exists, the `DataFrame` data is not saved and the target dataset is not modified, similar to `CREATE TABLE IF NOT EXISTS` |
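The two representations are interchangeable; a minimal sketch, assuming a `df` like the one read below and an illustrative output path:

```scala
import org.apache.spark.sql.SaveMode

// The Scala object form and the string form specify the same behaviour
df.write.mode(SaveMode.Append).json("../dataset/Beijing_PM_out")
df.write.mode("append").json("../dataset/Beijing_PM_out")
```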
`DataFrameWriter` can also be used in two ways: one is to use `format` together with `save`; the other is to use the encapsulated methods, such as `csv`, `json`, `saveAsTable`, and so on.
For example: read a CSV file and store it in JSON format.
```scala
val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate()
val df = spark.read.option("header", value = true).csv("../dataset/BeijingPM20100101_20151231.csv")

// Encapsulated method
df.write.json("../dataset/Beijing_PM1")

// format + save
df.write.format("json").save("../dataset/Beijing_PM2")
```
3. Parquet
When will Parquet be used?
In ETL, Spark often plays the role of the T, that is, data cleaning and data transformation.
To store complex data while maintaining good performance and a good compression ratio, Parquet is usually a good choice.
Therefore, data collected from external systems may well arrive in Parquet format, and Spark needs to support Parquet files when it reads and transforms that data.
Read and write Parquet format files
Read a CSV file, write it out as a Parquet file, and then read the Parquet file back.
```scala
import org.apache.spark.sql.SaveMode

// Read the csv file
val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate()
val df = spark.read.option("header", value = true).csv("../dataset/BeijingPM20100101_20151231.csv")

// Write the data in parquet format
df.write
  .format("parquet")        // Can also be left unspecified; the default is parquet
  .mode(SaveMode.Overwrite) // Write mode: Overwrite (cover), Ignore (do nothing if files exist), Append (add), ...
  .save("../dataset/Beijing_PM3")

// Read the parquet format file
spark.read.load("../dataset/Beijing_PM3") // The default read format is parquet, and a folder can be specified directly
  .show()
```
As you can see, when no format is specified, the default format for both reading and writing is parquet.
4. Partitioning
When writing Parquet, you can specify partitioning. After partitioning, the folder names carry the partition information, such as year=2010 and month=1.
Spark supports partitioning when writing files: you can designate a column as a partition column, just like in Hive. Hive matters here because Spark often works together with Hive, so Spark supports partitioned reading and writing.
```scala
// 1. Read the data
val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate()
val df = spark.read.option("header", value = true).csv("../dataset/BeijingPM20100101_20151231.csv")

// 2. Write the file with partitioning
df.write
  .partitionBy("year", "month") // Partition by year and month
  .save("../dataset/Beijing_PM4")

// 3. Read the file
spark.read
  // .parquet("../dataset/Beijing_PM4/year=2010/month=1") // Reading a partition folder directly loses the partition information
  .parquet("../dataset/Beijing_PM4") // Automatically discover partitions
  .printSchema()                     // Print the schema information
```
5. JSON
In ETL, Spark often plays the role of the T, that is, data cleaning and data transformation.
In business systems, JSON is a very common data format and is frequently used for front-end/back-end interaction. Data obtained from business systems is therefore very likely to be in JSON format, so Spark needs to be able to read JSON format files.
It is easy to read and write JSON in Spark.
JSON read / write
```scala
// Write each row of the DataFrame out as a line of JSON
df.write.json("dataset/beijing_pm5.json")

// Read the JSON file back
spark.read.json("dataset/beijing_pm5.json").show()
```
It should be noted that the JSON file written this way is not a single complete JSON document. Each line is a separate JSON object, a format also known as a JSON Lines file.
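Conversely, if the input is one multi-line JSON document (for example a large JSON array) rather than JSON Lines, Spark's `multiLine` read option can handle it. A minimal sketch, with an illustrative file path:

```scala
// Read a file containing a single multi-line JSON document instead of JSON Lines
spark.read
  .option("multiLine", value = true)
  .json("../dataset/some_multiline.json") // illustrative path
  .show()
```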
JSON format
```scala
// Convert the contents of the csv file that was read directly into JSON format
df.toJSON.show()

// Spark can read JSON information from a Dataset[String] of JSON-format strings and turn it into a DataFrame
// Read a DataFrame of JSON directly from an RDD
val jsonRdd = df.toJSON.rdd
spark.read.json(jsonRdd).show()
```
6. Connect to MySQL
```scala
package com.thorine

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

/**
 * There are two modes of operating on MySQL: running locally and submitting to the cluster
 * Run locally when writing data to MySQL, and use the cluster when reading
 */
object MySQLWrite {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession object
    val spark = SparkSession.builder().master("local[6]").appName("MySQLWrite").getOrCreate()

    // Read the data and create a DataFrame
    // 1. Create the schema structure information
    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("gpa", FloatType)
      )
    )

    // 2. Read the file
    val df = spark.read
      .schema(schema)
      .option("delimiter", "\t") // Specify the separator
      .csv("dataset/studenttab10k")

    // Process the data
    val resultDF = df.where(" age < 30 ")

    // Write the data to MySQL
    resultDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://bigdata3:3306/spark01")
      .option("dbtable", "student")
      .option("user", "spark01")
      .option("password", "spark01")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
```
SparkSQL does not directly provide an API or parameters for filtering the data with SQL when reading, but there is a workaround through `dbtable`, which specifies the name of the target table. Because SQL can be written inside `dbtable`, a subquery can be used.
Attribute | Meaning |
---|---|
`partitionColumn` | Specifies the column to partition by. Only numeric columns can be used; the ID column is usually chosen |
`lowerBound`, `upperBound` | Parameters that determine the stride. Data between `lowerBound` and `upperBound` is distributed across the partitions; data less than `lowerBound` goes into the first partition, and data greater than `upperBound` goes into the last partition |
`numPartitions` | Number of partitions |
spark.read.format("jdbc") .option("url", "jdbc:mysql://node01:3306/spark_test") .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu") .option("user", "spark") .option("password", "Spark123!") .option("partitionColumn", "age") .option("lowerBound", 1) .option("upperBound", 60) .option("numPartitions", 10) .load() .show()