2021-02-21 Big Data course notes, day 32



Spark day 4 [SparkCore content]

Primary coverage

  1. Spark Master startup source code
  2. Spark Submit task submission source code
  3. Spark Driver startup source code
  4. Spark Application registers and allocates resources
  5. Spark resource scheduling conclusion
  6. Spark task scheduling
  7. Spark secondary sorting problem
  8. Spark group topN problem

Learning objectives

Section 1 Spark Master startup

  1. Spark resource task scheduling object diagram
  2. Cluster startup process
    The Spark cluster is started by running the $SPARK_HOME/sbin/start-all.sh script. Inside start-all.sh, the "start-master.sh" and "start-slaves.sh" scripts are called. In start-master.sh you can see the main class that starts the Master role: "org.apache.spark.deploy.master.Master". start-slaves.sh in turn calls start-slave.sh, and in start-slave.sh you can see the main class that starts the Worker role: "org.apache.spark.deploy.worker.Worker".
  • Master & Worker startup
    The design idea of the Spark framework is that every node starts a Netty-based communication environment, called the RpcEnv communication environment. Before each role starts, its corresponding Endpoint is first registered in the NettyRpcEnv environment, and only then does the role start. The roles include Master, Worker, Driver, Executor, and so on. The following figure shows the process of starting the Master role after start-all.sh launches the cluster. The main class "org.apache.spark.deploy.master.Master" is invoked when the Master role starts. The execution process is as follows:
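
To illustrate the pattern only, the following minimal Java sketch mimics "register the Endpoint first, then start the role". The RpcEnvironment, RpcEndpoint and MasterEndpoint types are simplified, hypothetical stand-ins for Spark's internal NettyRpcEnv and endpoint classes, not the real API:

// Hypothetical, simplified stand-ins for Spark's internal RpcEnv/RpcEndpoint abstractions.
interface RpcEndpoint {
    void onStart();                 // invoked once the endpoint has been registered
    void receive(Object message);   // handles messages sent by other endpoints
}

class RpcEnvironment {
    private final java.util.Map<String, RpcEndpoint> endpoints = new java.util.HashMap<>();

    // Register the endpoint under a name first, then start it -- the order described above.
    void setupEndpoint(String name, RpcEndpoint endpoint) {
        endpoints.put(name, endpoint);
        endpoint.onStart();
    }
}

class MasterEndpoint implements RpcEndpoint {
    @Override public void onStart() { System.out.println("Master endpoint registered and started"); }
    @Override public void receive(Object message) { /* e.g. RegisterWorker, RegisterApplication */ }
}

public class MasterStartupSketch {
    public static void main(String[] args) {
        RpcEnvironment rpcEnv = new RpcEnvironment();          // per-node Netty-style RPC environment
        rpcEnv.setupEndpoint("Master", new MasterEndpoint());  // Worker startup follows the same pattern
    }
}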

Section 2 Spark Submit task submission

  • SparkSubmit task submission
    When a task is submitted, the $SPARK_HOME/bin/spark-submit script calls the org.apache.spark.deploy.SparkSubmit class. When this class executes, its main method first parses the submission parameters, and then it applies to the Master to start the Driver. The code flow is as follows:
  • Start the DriverWrapper class
    After the task is submitted, the client applies to the Master to start the Driver. First a DriverWrapper class is started, which wraps and runs the application submitted by the user. The startup process of the DriverWrapper class is as follows:
  • Register Driver Endpoint and Application with Master
    When the user's code runs, the real Driver role is registered while the new SparkContext is constructed; the role name is "CoarseGrainedScheduler". After the Driver role is registered, an "AppClient" role is registered, and this role registers the Application with the Master. The code flow is as follows:
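
For reference, the registration described above is triggered simply by constructing the SparkContext in the user program. Below is a minimal sketch (the class name is made up, and it assumes the application is submitted to a standalone Master via spark-submit):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class DriverRegistrationDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DriverRegistrationDemo");

        // Constructing the SparkContext is the step that registers the Driver endpoint
        // ("CoarseGrainedScheduler") and, via the AppClient, registers this Application
        // with the Master.
        JavaSparkContext sc = new JavaSparkContext(conf);

        // ... the user's job logic runs here, in the Driver ...

        sc.stop();   // unregisters the Application from the Master
    }
}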

Section 3 Spark resource scheduling source code

  1. Spark resource scheduling source code process
    Spark resource scheduling starts after the Driver has started and the Application has been registered. Spark resource scheduling mainly refers to how the Spark cluster divides the resources of the Worker nodes among the currently submitted Spark applications. The resource scheduling source code is in the schedule() method of the Master.scala class.

  2. Spark resource scheduling source code conclusion
    1) Executors are started spread out across the cluster by default, which is good for the data locality of task computation (see the sketch after this list).
    2) By default (the --executor-cores option is not set when submitting a task), each Worker starts one Executor for the current Application, and that Executor uses all the cores of the Worker and 1G of memory.
    3) If you want to start multiple Executors on one Worker, add the --executor-cores option when submitting the Application.
    4) If --total-executor-cores is not set, one Application will by default use all the cores in the Spark cluster.
    5) Whether an Executor can be started depends not only on cores but also on memory.
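
A small, self-contained sketch of conclusion 1): it mimics the idea of the default "spread out" allocation (round-robin over Workers), not the actual schedule() code, and the Worker and core counts are made up. The real scheduler also checks each Worker's free memory, per conclusion 5):

public class SpreadOutSketch {
    public static void main(String[] args) {
        int[] freeCores = {8, 8, 8};        // hypothetical free cores on three Workers
        int[] assigned = new int[freeCores.length];
        int coresToAssign = 12;             // e.g. --total-executor-cores 12

        // Round-robin ("spread out") assignment: hand out one core at a time,
        // moving to the next Worker after each assignment.
        int pos = 0;
        while (coresToAssign > 0) {
            if (assigned[pos] < freeCores[pos]) {   // this Worker can still take a core
                assigned[pos]++;
                coresToAssign--;
            }
            pos = (pos + 1) % freeCores.length;
        }

        // Prints [4, 4, 4]: the cores are spread across the Workers.
        System.out.println(java.util.Arrays.toString(assigned));
    }
}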

  3. Resource scheduling source code conclusion verification
    Use spark-submit to submit tasks for the demonstration. You can also verify with spark-shell.

1) By default, each Worker starts one Executor for the current Application, and each Executor uses all the cores of its Worker and 1G of memory.

./spark-submit \
--master spark://node1:7077 \
--class org.apache.spark.examples.SparkPi \
../lib/spark-examples-1.6.0-hadoop2.6.0.jar \
10000

2) To start multiple Executors on a Worker, set the --executor-cores parameter to specify the number of cores used by each Executor.

./spark-submit \
--master spark://node1:7077 \
--executor-cores 1 \
--class org.apache.spark.examples.SparkPi \
../lib/spark-examples-1.6.0-hadoop2.6.0.jar \
10000

3) Starting Executors when memory is not sufficient. Whether an Executor starts depends not only on the core parameters but also on whether there is enough memory for the configured cores.

./spark-submit \
--master spark://node1:7077 \
--executor-cores 1 \
--executor-memory 3g \
--class org.apache.spark.examples.SparkPi \
../lib/spark-examples-1.6.0-hadoop2.6.0.jar \
10000

4) --total-executor-cores: how many cores the Application uses in total across the cluster.
Note: a single Executor process cannot be started across multiple nodes of the cluster.

./spark-submit \
--master spark://node1:7077 \
--executor-cores 1 \
--executor-memory 2g \
--total-executor-cores 3 \
--class org.apache.spark.examples.SparkPi \
../lib/spark-examples-1.6.0-hadoop2.6.0.jar \
10000

Section 4 Spark task scheduling source code

Spark task scheduling starts from an action operator of a Spark Application. When an action operator executes, it triggers a job on the chain of RDDs that leads to it, and the job then goes through the stage division process:
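
The following small example (the input file name and word-count logic are made up for illustration, and it assumes the Java 8 lambda-friendly API) marks where scheduling is triggered: transformations only build the RDD lineage, the shuffle introduced by reduceByKey creates a stage boundary, and the action count() is what actually submits the job:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ActionTriggersJob {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionTriggersJob");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Transformations: they only build the RDD lineage, no job is submitted yet.
        JavaRDD<String> words = sc.textFile("words.txt");   // hypothetical input, one word per line
        JavaPairRDD<String, Integer> pairs =
                words.mapToPair(word -> new Tuple2<String, Integer>(word, 1));
        JavaPairRDD<String, Integer> counts =
                pairs.reduceByKey((a, b) -> a + b);          // shuffle dependency => stage boundary

        // Action: this call triggers the job; the DAGScheduler divides it into stages at the
        // shuffle boundary and the TaskScheduler sends each stage's tasks to the Executors.
        long distinctWords = counts.count();
        System.out.println(distinctWords);

        sc.stop();
    }
}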

Section 5 Spark secondary sorting and grouped topN

  1. Secondary sorting
    Many sorting scenarios in big data require sorting by one column first and, when values in that column are equal, sorting by another column; this is the secondary sorting scenario. For example, to find the top 10 active users of a website, the criterion for an active user is the number of days the user logged in during the current quarter. If several users have the same number of login days in the quarter, their total login duration is compared to break the tie and rank them. This is a typical secondary sorting scenario.
    To solve the secondary sorting problem, we can encapsulate the sort columns in a custom key object and implement the corresponding comparison method in that object.
SparkConf sparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");

// Wrap the two sort columns of each line in a SecondSortKey so that
// sortByKey can use its compareTo method.
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<SecondSortKey, String> call(String line) throws Exception {
        String[] splited = line.split(" ");
        int first = Integer.valueOf(splited[0]);
        int second = Integer.valueOf(splited[1]);
        SecondSortKey secondSortKey = new SecondSortKey(first, second);
        return new Tuple2<SecondSortKey, String>(secondSortKey, line);
    }
});

// Sort by the composite key in descending order and print the original lines.
pairSecondRDD.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
        System.out.println(tuple._2);
    }
});


// Composite key: compare by the first column, and by the second column when the first is equal.
public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {

    private static final long serialVersionUID = 1L;
    private int first;
    private int second;

    public int getFirst() {
        return first;
    }
    public void setFirst(int first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public SecondSortKey(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(SecondSortKey o1) {
        if (getFirst() - o1.getFirst() == 0) {
            return getSecond() - o1.getSecond();
        } else {
            return getFirst() - o1.getFirst();
        }
    }
}
  2. Group topN
    When big data is grouped by a key and we need to find the topN records within each group, this is the grouped topN problem.
    There are two ways to solve the grouped topN problem. The first is to group the data and sort each group directly. The second is to use a fixed-length array to hold the topN of each group. The example below uses the fixed-length array approach; a sketch of the sort-within-group approach follows it.
SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = sc.textFile("scores.txt");

// Each line is "className<TAB>score"; map it to a (className, score) pair.
JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Integer> call(String str) throws Exception {
        String[] splited = str.split("\t");
        String clazzName = splited[0];
        Integer score = Integer.valueOf(splited[1]);
        return new Tuple2<String, Integer>(clazzName, score);
    }
});

// Group the scores by class name and keep the top 3 of each group in a fixed-length array.
pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
        String clazzName = tuple._1;
        Iterator<Integer> iterator = tuple._2.iterator();

        Integer[] top3 = new Integer[3];

        while (iterator.hasNext()) {
            Integer score = iterator.next();

            for (int i = 0; i < top3.length; i++) {
                if (top3[i] == null) {
                    // Empty slot: take it and stop.
                    top3[i] = score;
                    break;
                } else if (score > top3[i]) {
                    // Shift the smaller values down and insert the score at position i.
                    for (int j = 2; j > i; j--) {
                        top3[j] = top3[j - 1];
                    }
                    top3[i] = score;
                    break;
                }
            }
        }
        System.out.println("class Name:" + clazzName);
        for (Integer sscore : top3) {
            System.out.println(sscore);
        }
    }
});
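
The first approach mentioned above, sorting the data inside each group, is not shown in the original notes. A minimal sketch of it, reusing the same pairRDD and assuming java.util.List, ArrayList and Collections are imported, could look like this:

// Sketch of the "sort within each group" variant of grouped topN.
pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
        String clazzName = tuple._1;

        // Copy the group's scores into a list and sort it in descending order.
        List<Integer> scores = new ArrayList<Integer>();
        for (Integer score : tuple._2) {
            scores.add(score);
        }
        Collections.sort(scores, Collections.reverseOrder());

        // The first three elements are the top 3 of this group.
        System.out.println("class Name:" + clazzName);
        for (int i = 0; i < Math.min(3, scores.size()); i++) {
            System.out.println(scores.get(i));
        }
    }
});

This variant is easier to read, but it materializes and sorts every score of a group in memory, so the fixed-length array version above scales better for large groups.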

Homework for this section

  1. What is the design idea of the low-level Spark code? Take the Master as an example.
  2. What is the Spark resource scheduling source code process?
  3. What are the Spark resource scheduling source code conclusions?
  4. Write the code for the secondary sorting and grouped topN problems.

