Flink1.10 task submission process analysis

stay Flink1.10 task submission process analysis (I) This paper analyzes the process analysis from flick run to task submission to the cluster. For different submission modes, Flink uses different pipelineexecutors. This paper analyzes the process of submitting tasks to the yarn cluster based on the yarn per job mode. (Note: Based on 1.10.1 analysis)


Then, according to the analysis in the previous part, the final task submission is to be execute d by PipelineExecutor, and the selection of PipelineExecutor is determined according to different submission modes, that is, execution Determined by the target parameter, the executor of yarnjobclusterexecution type will be selected for yarnjob per job.

public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
   public static final String NAME = "yarn-per-job";
   public YarnJobClusterExecutor() {
      super(new YarnClusterClientFactory());

Its implementation is relatively simple and important. In its constructor, YarnClusterClientFactory is used to create YarnClusterDescriptor, which contains some information of submitting yarn, such as YarnClient, yarn configuration, queue of submitting yarn, etc. It inherits the abstractjobclusterexecution. The abstract task is submitted to the executor, and the execute is also executed by abstractjobclusterexecution:

public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

   private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
   //It represents YarnClusterClientFactory
   private final ClientFactory clusterClientFactory;

   public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
      this.clusterClientFactory = checkNotNull(clusterClientFactory);

   //Execute task submission
   //pipeline stands for StreamGraph
   public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
      //Convert StreamGraph to JobGraph
      final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
     //Create some information for submitting tasks: YarnClusterDescriptor
      try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
        //Encapsulate the configuration information in the ExecutionConfigAccessor
         final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
         //It contains the description of the resources required to submit the task: memory size and parallelism 
         final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
         //Submit task
         final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
               .deployJobCluster(clusterSpecification, jobGraph, 
                                 //Whether separate mode is adopted
         LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

         return CompletableFuture.completedFuture(
               new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));

The cluster specification describes the resource size required to submit tasks to the cluster. For the allocation mode, it is recommended to read the official website flink1 10 memory management mechanism for better understanding. The task is finally handed over to YarnClusterDescriptor deploy.

Deploy process

The deploy process represents the process of interacting with yarn, clusterdescriptor Deployjobcluster will call the internal deployInternal method:

private ClusterClientProvider<ApplicationId> deployInternal(
      ClusterSpecification clusterSpecification,
      String applicationName,
      String yarnClusterEntrypoint,
      @Nullable JobGraph jobGraph,
      boolean detached) throws Exception {
    //.....  Some checks will be done: check whether the yarn queue exists and check the configuration
    //Verify resource size, etc
   ApplicationReport report = startAppMaster(


The most important is startAppMaster. Start an AppMaster process on yarn, where yarnClusterEntrypoint represents the entry class of the process, that is, the startup entry class of JobMaster: org apache. flink. yarn. entrypoint. Yarnjobclusterentrypoint can also be seen on the machine process of the cluster. If we see this process, we can know that it represents the process of JobMaster.

The process of startAppMaster is relatively long, and it will be broken down one by one here:

private ApplicationReport startAppMaster(
      Configuration configuration,
      String applicationName,
      String yarnClusterEntrypoint,
      JobGraph jobGraph,
      YarnClient yarnClient,
      YarnClientApplication yarnApplication,
      ClusterSpecification clusterSpecification) throws Exception {

   // ------------------ Initialize the file systems -------------------------


   //Get homeDir, which indicates the upload path of jar and log configuration, generally on hdfs
   //Its path is / user/hadoop, (the current user represented by hadoop)
   final FileSystem fs = FileSystem.get(yarnConfiguration);
   final Path homeDir = fs.getHomeDirectory();
   //Description information submitted to yarn
   ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
   // Will be uploaded to the hdfs file and added to the classpath
   Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
   // It will only be uploaded to hdfs, but will not be added to the classpath
   Set<File> shipOnlyFiles = new HashSet<>();
   for (File file : shipFiles) {

   final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
   if (logConfigFilePath != null) {
      systemShipFiles.add(new File(logConfigFilePath));
   //Flink_ The files under home / lib are added to systemShipFiles, and the files specified through - yt are also in it

   //Flink_ Add the files under home / plugins to shipOnlyFiles

   final ApplicationId appId = appContext.getApplicationId();

   // ZK ha related configuration
   String zkNamespace = getZookeeperNamespace();
   // no user specified cli argument for namespace?
   if (zkNamespace == null || zkNamespace.isEmpty()) {
      // namespace defined in config? else use applicationId as default.
      zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));

   configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

   if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
      // activate re-execution of failed applications

   } else {
      // set number of application retries to 1 in the default case

  //userJarFiles represents the user jar
   final Set<File> userJarFiles = (jobGraph == null)
         // not per-job submission
         ? Collections.emptySet()
         // add user code jars from the provided JobGraph
         : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());

   //cache files need to be uploaded to hdfs, which is generally used in file sharing
   if (jobGraph != null) {
      for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
         org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
         // only upload local files
         if (!path.getFileSystem().isDistributedFS()) {
            Path localPath = new Path(path.getPath());
            Tuple2<Path, Long> remoteFileInfo =
               Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, entry.getKey());
            jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());


   //Indicates the resource file required to start the appMaster, which will be downloaded from hdfs
   final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
   // Security settings for accessing hdfs
   final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
   // Resource files required to start taskExecutor
   StringBuilder envShipFileList = new StringBuilder();

   //Several uploadAndRegisterFiles methods upload systemShipFiles, shipOnlyFiles and user jar to hdfs

   if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {

   // normalize classpath by sorting
   Collections.sort(systemClassPaths); //Some classpath sorting of the system
   Collections.sort(userClassPaths); //User classpath sorting

   // classPathBuilder: store classpath information
   StringBuilder classPathBuilder = new StringBuilder();
      * Build files: user, log file, log path

   final Path yarnFilesDir = getYarnFilesDir(appId);
   FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
   fs.setPermission(yarnFilesDir, permission); // set permission for path.

    *A pile of safety related configurations in the middle

  //Start YarnJobClusterEntrypoint according to the executed java command information 
   final ContainerLaunchContext amContainer = setupApplicationMasterContainer(

   if (UserGroupInformation.isSecurityEnabled()) {
      // set HDFS delegation tokens when security is enabled
      LOG.info("Adding delegation token to the AM container.");
      Utils.setTokensFor(amContainer, paths, yarnConfiguration);


   // Setup CLASSPATH and environment variables for ApplicationMaster
   final Map<String, String> appMasterEnv = new HashMap<>();
    * Configure the environment variable parameters into appMasterEnv, which is used when starting YarnJobClusterEntrypoint,
    * For example: classpath, Hadoop user, appId, etc


    // There is also a pile of configuration information for setting the submission task queue and yarn task name

   // add a hook to clean up in case deployment fails
   Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
   LOG.info("Submitting application master " + appId);
   //Submit task  

    *  Get task status

The process of this part is relatively long. To sum up, it mainly includes the following points:

  1. shipFiles, plugins, userJar, logFile, flick-conf.yaml, job Upload files such as graph to hdfs
  2. Build classpath, ha ZK configuration, security configuration and jobMaster startup command required for startup
  3. Submit task to yarn

After successful startup on yarn, you can see launch in the working directory of JobMaster_ container. SH is a file that contains all the environment variable parameter settings and startup commands made in startAppMaster.


This article mainly introduces the task submission process of yarn per job. Combined with the analysis of the previous two articles, we should now master how to submit tasks through API. I think there are two important points: one is to analyze and configure parameters, and the other is to select an appropriate PipelineExecutor to submit tasks.

Posted by jbachris on Mon, 18 Apr 2022 14:49:51 +0930