Spark 09: SparkSQL data reading and writing operations

1. DataFrameReader

A very important goal of SparkSQL is to improve support for reading data, so SparkSQL adds a new component dedicated to reading external data sources, called DataFrameReader.

For example, from the code val reader: DataFrameReader = spark.read you can see that spark.read is of type DataFrameReader.
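As a minimal illustration (assuming a local SparkSession; the appName is arbitrary), the read method simply returns a DataFrameReader:

    import org.apache.spark.sql.{DataFrameReader, SparkSession}

    val spark: SparkSession = SparkSession.builder().master("local[6]").appName("reader1").getOrCreate()

    // spark.read returns a DataFrameReader, the entry point for reading external data sources
    val reader: DataFrameReader = spark.read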

DataFrameReader consists of the following components:

schema: structure information. Because a Dataset is structured, Schema information is needed when reading data; it can be obtained from the external data source or specified explicitly.

option: parameters for connecting to the external data source, such as the JDBC URL, or whether to read the first line as a header for CSV files.

format: the format of the external data source, such as csv, jdbc, json, etc.
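For instance, all three components can be set on one reader. A minimal sketch (the schema columns and file path here are illustrative assumptions, not the real dataset):

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Hypothetical schema; the real dataset's columns may differ
    val schema = StructType(List(
      StructField("id", IntegerType),
      StructField("name", StringType)
    ))

    spark.read
      .schema(schema)                 // schema: structure information, specified explicitly here
      .option("header", value = true) // option: parameters for the data source
      .format("csv")                  // format: the data source format
      .load("../dataset/example.csv") // hypothetical path
      .show()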

DataFrameReader can be used in two ways: one is to use the load method to load data and format to specify the loading format; the other is to use the shorthand wrapper methods such as csv, json, jdbc, and so on.

/**
   * First look at DataFrameReader: two ways of reading
   *    1. the load method
   *    2. the shorthand (wrapper) methods
   */
  @Test
  def reader2(): Unit ={
    val spark = SparkSession.builder().master("local[6]").appName("reader2").getOrCreate()

    // The first form: format + load
    spark.read
      .format("csv") // Tell the program this is a csv file
      .option("header", value = true) // Tell the program that the first line of the file is a header
      .option("inferSchema", value = true) // Tell the program to infer the field types from the file content
      .load("../dataset/BeijingPM20100101_20151231.csv")
      .show()

    // The second form: the csv shorthand method
    spark.read
      .option("header", value = true)
      .option("inferSchema", value = true)
      .csv("../dataset/BeijingPM20100101_20151231.csv")
      .show()
  }

2. DataFrameWriter

DataFrameWriter consists of the following components:

source: the write target, file format, etc., set through the format method.

mode: the write mode. For example, if a table already exists and data is written to it through DataFrameWriter, should the table be overwritten or should the data be appended? Set through the mode method.

extraOptions: external parameters, such as the JDBC URL, set through option/options.

partitioningColumns: partitioning similar to Hive, used when saving tables. The partition here is not an RDD partition but a file or table partition, set through partitionBy.

bucketColumnNames: bucketing similar to Hive, used when saving tables, set through bucketBy.

sortColumnNames: the columns used for sorting, set through sortBy.
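partitionBy is demonstrated in the partitioning section below. For bucketing and sorting, a minimal sketch might look like this (the bucket and sort columns and the table name are assumptions; note that bucketBy and sortBy currently only work together with saveAsTable):

    df.write
      .bucketBy(4, "day")  // split the data into 4 buckets by the (assumed) day column
      .sortBy("hour")      // sort rows inside each bucket by the (assumed) hour column
      .mode(SaveMode.Overwrite)
      .saveAsTable("beijing_pm_bucketed") // bucketBy/sortBy require saveAsTable rather than save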

mode specifies the write mode, such as overwriting the original data set or appending to the end of it.

Each mode has a Scala object representation and an equivalent string representation:

SaveMode.ErrorIfExists ("error"): when saving the DataFrame to the source, if the target already exists, an error is reported.

SaveMode.Append ("append"): when saving the DataFrame to the source, if the target already exists, the data is appended to the file or table.

SaveMode.Overwrite ("overwrite"): when saving the DataFrame to the source, if the target already exists, the target is completely overwritten with the data in the DataFrame.

SaveMode.Ignore ("ignore"): when saving the DataFrame to the source, if the target already exists, the DataFrame data is not saved and the target data set is not modified, similar to CREATE TABLE IF NOT EXISTS.
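Either representation can be passed to the mode method; a minimal sketch (assuming the df from the examples above and an illustrative output path):

    import org.apache.spark.sql.SaveMode

    // Scala object form
    df.write.mode(SaveMode.Append).json("../dataset/Beijing_PM_append")

    // Equivalent string form
    df.write.mode("append").json("../dataset/Beijing_PM_append")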

 

DataFrameWriter can likewise be used in two ways: one is to use format together with save, and the other is to use the shorthand wrapper methods such as csv, json, saveAsTable, and so on.

For example, read the CSV file and store it in JSON format:

    val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate()
    val df = spark.read.option("header", true).csv("../dataset/BeijingPM20100101_20151231.csv")
    df.write.json("../dataset/Beijing_PM1")
    df.write.format("json").save("../dataset/Beijing_PM2")

3. Parquet

When will Parquet be used?

In ETL, Spark often plays the role of T, that is, data cleaning and data conversion

To store complex data while keeping good performance and compression, Parquet is usually a good choice.

Data collected from external systems may therefore be stored in Parquet, so Spark needs to support Parquet files when it reads and transforms data.

Read and write Parquet format files

Read a csv file, write it out in Parquet format, and then read the Parquet file back:

    // Read the csv file
    val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate()
    val df = spark.read.option("header", true).csv("../dataset/BeijingPM20100101_20151231.csv")

    // Write the data in parquet format
    df.write
      .format("parquet") // Can also be left unspecified; the default is parquet
      .mode(SaveMode.Overwrite) // Write mode is Overwrite; other options include Ignore (do nothing if the target exists), Append, ...
      .save("../dataset/Beijing_PM3")

    // Read the parquet file
    spark.read.load("../dataset/Beijing_PM3") // The default read format is parquet, and a folder can be specified directly
      .show()

As you can see, the default format for reading and writing without specifying format is parquet.
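This default comes from the spark.sql.sources.default configuration, which is parquet unless changed. A minimal sketch showing how it could be overridden (switching it to json here purely as an illustration):

    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("default_format")
      .config("spark.sql.sources.default", "json") // make load/save default to json instead of parquet
      .getOrCreate()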

4. Partitioning

When writing Parquet, partitions can be specified. After partitioning, the folder names contain the partition information, such as year=2010, month=1, etc.

Spark supports partitioning when writing files: a column can be set as the partition column, just like in Hive. Hive is very important and Spark often works together with Hive, so Spark supports partitioned reading and writing.

    // 1. Read the data
    val spark = SparkSession.builder().master("local[6]").appName("write1").getOrCreate()
    val df = spark.read.option("header", value = true).csv("../dataset/BeijingPM20100101_20151231.csv")

    // 2. Write the file, specifying partitions
    df.write
      .partitionBy("year", "month") // partition by year and month
      .save("../dataset/Beijing_PM4")

    // 3. Read the file
    spark.read
      // .parquet("../dataset/Beijing_PM4/year=2010/month=1") // Reading a partition directory directly loses the partition columns
      .parquet("../dataset/Beijing_PM4") // Automatically discover partitions
      .printSchema() // Print the schema information

5. JSON

In ETL, Spark often plays the role of T, that is, data cleaning and data conversion

In business systems, JSON is a very common data format and is often used for front-end/back-end interaction. Data obtained from business systems is therefore likely to be in JSON format, so Spark needs to be able to read JSON files.

It is easy to read and write JSON in Spark.

JSON read / write

    df.write.json("dataset/beijing_pm5.json")

    spark.read.json("dataset/beijing_pm5.json").show()

It should be noted that a JSON file written this way is not a single complete JSON document: each line is a separate JSON object. This is also known as a JSON Lines file.
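Conversely, if the source is a regular multi-line JSON document (for example a top-level JSON array) rather than a JSON Lines file, it can still be read by enabling the multiLine option. A minimal sketch with an assumed path:

    spark.read
      .option("multiLine", value = true) // treat the file as one complete JSON document
      .json("../dataset/complete.json")  // hypothetical path
      .show()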

JSON format

    df.toJSON.show() // Convert the content read from the csv file directly into JSON strings

    // Spark can read JSON information from a Dataset[String] of JSON strings and turn it into a DataFrame
    // Read a JSON DataFrame directly from an RDD
    val jsonRdd = df.toJSON.rdd
    spark.read.json(jsonRdd).show()
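Since the json method also accepts a Dataset[String] directly (reading from an RDD[String] is deprecated in newer Spark versions), the detour through the RDD can be avoided. A minimal alternative sketch:

    // Read JSON directly from a Dataset[String] of JSON strings
    spark.read.json(df.toJSON).show()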

6. Connect to MySQL

package com.thorine

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

/**
 * MySQL can be accessed in two modes: running locally and submitting to the cluster
 * In this example, writing to MySQL runs locally; reading can be done on the cluster
 */
object MySQLWrite {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder().master("local[6]").appName("MySQLWrite").getOrCreate()

    // Read data and create a DataFrame
    // 1. Create the schema (structure information)
    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("gpa", FloatType)
      )
    )

    // 2. Read the file
    val df = spark.read
      .schema(schema)
      .option("delimiter", "\t") // Specify separator
      .csv("dataset/studenttab10k")

    // Process the data
    val resultDF = df.where(" age < 30 ")

    // Write the data to MySQL
    resultDF.write
      .format("jdbc")
      .option("url","jdbc:mysql://bigdata3:3306/spark01")
      .option("dbtable","student")
      .option("user","spark01")
      .option("password", "spark01")
      .mode(SaveMode.Overwrite)
      .save()
  }
}
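For reading the table back, besides the format("jdbc") approach shown in the next snippet, DataFrameReader also provides a jdbc method that takes the connection properties directly. A minimal sketch reusing the connection details assumed in the write example above:

    import java.util.Properties

    val props = new Properties()
    props.setProperty("user", "spark01")
    props.setProperty("password", "spark01")

    spark.read
      .jdbc("jdbc:mysql://bigdata3:3306/spark01", "student", props)
      .show()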

SparkSQL does not directly provide an API or parameters for filtering data with SQL while reading, but there is a workaround: dbtable specifies the name of the target table, and since SQL can be written in dbtable, a subquery can be used to do the filtering.

The following options control how the read is partitioned:

partitionColumn: specifies which column to partition by. Only a numeric column can be used; it is usually an ID column.

lowerBound, upperBound: parameters that determine the partition stride. Data between lowerBound and upperBound is distributed across the partitions; data less than lowerBound goes into the first partition, and data greater than upperBound goes into the last partition.

numPartitions: the number of partitions.

spark.read.format("jdbc")
  .option("url", "jdbc:mysql://node01:3306/spark_test")
  .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
  .option("user", "spark")
  .option("password", "Spark123!")
  .option("partitionColumn", "age")
  .option("lowerBound", 1)
  .option("upperBound", 60)
  .option("numPartitions", 10)
  .load()
  .show()
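To check that the read is actually split as configured, the number of partitions can be inspected. A minimal sketch assuming the DataFrame above was assigned to a value named jdbcDF:

    println(jdbcDF.rdd.getNumPartitions) // expected to match numPartitions, i.e. 10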
 
