Flink 1.10 task submission process analysis

Flink jobs are most commonly submitted with the flink run command. If we want to submit jobs through the API instead, we first need to understand what flink run actually does. This article analyzes the submission process through the source code. (Note: the analysis is based on Flink 1.10.1.)

Submission entry

Looking at the bin/flink script, you can see that the submission entry class is org.apache.flink.client.cli.CliFrontend, and the arguments passed in are those following the flink command. Check the main method:

public static void main(final String[] args) {
   EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

   // 1. find the configuration directory ($FLINK_HOME/conf)
   final String configurationDirectory = getConfigurationDirectoryFromEnv();

   // 2. load flink-conf.yaml
   final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

   // 3. initialize the parameter parsers for all submission modes
   final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
      configuration,
      configurationDirectory);

   try {
      // initialize the execution entry point
      final CliFrontend cli = new CliFrontend(
         configuration,
         customCommandLines);

      SecurityUtils.install(new SecurityConfiguration(cli.configuration));
      // parseParameters dispatches on the action: run, info, list, modify, etc.
      int retCode = SecurityUtils.getInstalledContext()
            .runSecured(() -> cli.parseParameters(args));
      System.exit(retCode);
   } catch (Throwable t) {
      final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
      LOG.error("Fatal error while running command line interface.", strippedThrowable);
      strippedThrowable.printStackTrace();
      System.exit(31);
   }
}

CustomCommandLine is the interface for command-line parameter parsing. There are two implementations: FlinkYarnSessionCli, which parses per-job and YARN session mode parameters, and DefaultCLI, which parses standalone mode parameters. The program selects the appropriate parser based on the options passed in: it matches each candidate through its isActive method, and then calls applyCommandLineOptionsToConfiguration to parse the parameters.
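The selection logic described above can be sketched as follows. This is a minimal, self-contained illustration using hypothetical stand-in types, not Flink's actual classes: each parser declares via isActive whether it can handle the given arguments, and the first active one wins. DefaultCLI always returns true, so it acts as the fallback.

```java
import java.util.Arrays;
import java.util.List;

public class CommandLineSelection {

    interface CustomCommandLine {
        boolean isActive(List<String> args);
        String name();
    }

    // hypothetical simplification: active when "-m yarn-cluster" was passed
    static class FlinkYarnSessionCli implements CustomCommandLine {
        public boolean isActive(List<String> args) {
            int i = args.indexOf("-m");
            return i >= 0 && i + 1 < args.size() && "yarn-cluster".equals(args.get(i + 1));
        }
        public String name() { return "yarn"; }
    }

    // the default cli is always active, so it must be checked last
    static class DefaultCLI implements CustomCommandLine {
        public boolean isActive(List<String> args) { return true; }
        public String name() { return "default"; }
    }

    // pick the first parser whose isActive matches the arguments
    static CustomCommandLine validateAndGetActiveCommandLine(
            List<CustomCommandLine> candidates, List<String> args) {
        return candidates.stream()
                .filter(cli -> cli.isActive(args))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No command line active"));
    }

    public static void main(String[] args) {
        List<CustomCommandLine> clis =
                Arrays.asList(new FlinkYarnSessionCli(), new DefaultCLI());
        // prints "yarn": the YARN options activate FlinkYarnSessionCli
        System.out.println(validateAndGetActiveCommandLine(
                clis, Arrays.asList("-m", "yarn-cluster", "job.jar")).name());
        // prints "default": no YARN options, DefaultCLI is the fallback
        System.out.println(validateAndGetActiveCommandLine(
                clis, Arrays.asList("job.jar")).name());
    }
}
```

Note that the registration order matters: the YARN cli is registered before the default cli, so the always-active fallback never shadows it.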

RUN process

protected void run(String[] args) throws Exception {
   LOG.info("Running 'run' command.");

   // options for the run action, including savepoint recovery parameters
   final Options commandOptions = CliFrontendParser.getRunCommandOptions();
   // encapsulate the arguments in a CommandLine
   final CommandLine commandLine = getCommandLine(commandOptions, args, true);

   // ProgramOptions holds the jar path, user program entry class, user program arguments, classpath, etc.
   final ProgramOptions programOptions = new ProgramOptions(commandLine);

   // help command
   if (commandLine.hasOption(HELP_OPTION.getOpt())) {
      CliFrontendParser.printHelpForRun(customCommandLines);
      return;
   }

   if (!programOptions.isPython()) {
      // Java program should be specified a JAR file
      if (programOptions.getJarFilePath() == null) {
         throw new CliArgsException("Java program should be specified a JAR file.");
      }
   }

   // represents the program, including the jar, arguments and other information
   final PackagedProgram program;
   try {
      LOG.info("Building program from JAR file");
      program = buildProgram(programOptions);
   } catch (FileNotFoundException e) {
      throw new CliArgsException("Could not build the program from JAR file.", e);
   }

   // the jars required by the program, mainly the user jar package
   final List<URL> jobJars = program.getJobJarAndDependencies();
   // the effective configuration, derived by the parser that matches the given options
   final Configuration effectiveConfiguration =
         getEffectiveConfiguration(commandLine, programOptions, jobJars);

   LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

   try {
      executeProgram(effectiveConfiguration, program);
   } finally {
      program.deleteExtractedLibraries();
   }
}

In the getEffectiveConfiguration method, a parameter parser is selected according to the given options. For example, if -m yarn-cluster is passed (per-job mode), the FlinkYarnSessionCli parser is chosen. An important parameter is configured during this step: execution.target, the target executor, which determines which of the following executors submits the job: yarn-session, yarn-per-job, or remote. Its value thus depends on the submission mode used.
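For reference, the submission commands that activate each mode might look like the following (the entry class and jar path are placeholders):

```shell
# per-job mode: "-m yarn-cluster" activates FlinkYarnSessionCli,
# which leads to execution.target=yarn-per-job
flink run -m yarn-cluster -c com.example.MyJob /path/to/job.jar

# session mode: "-yid" (an existing YARN application id, placeholder here)
# also activates FlinkYarnSessionCli, with execution.target=yarn-session
flink run -yid application_1650000000000_0001 -c com.example.MyJob /path/to/job.jar

# standalone mode: no YARN options, DefaultCLI is used (execution.target=remote)
flink run -c com.example.MyJob /path/to/job.jar
```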

Execute Program process

The executeProgram method directly calls the ClientUtils.executeProgram method:

public static void executeProgram(
      PipelineExecutorServiceLoader executorServiceLoader,
      Configuration configuration,
      PackagedProgram program) throws ProgramInvocationException {
   checkNotNull(executorServiceLoader);
   final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
   final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
   try {
      Thread.currentThread().setContextClassLoader(userCodeClassLoader);

      LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

      // the context in which the user program executes
      ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
         executorServiceLoader,
         configuration,
         userCodeClassLoader);
      // the factory is assigned to a variable in the ExecutionEnvironment
      ContextEnvironment.setAsContext(factory);

      try {
         // invoke the main method of the user program
         program.invokeInteractiveModeForExecution();
      } finally {
         ContextEnvironment.unsetContext();
      }
   } finally {
      Thread.currentThread().setContextClassLoader(contextClassLoader);
   }
}

PipelineExecutorServiceLoader is used for Executor selection; refer to the previous article on Flink 1.10 task submission, the factory pattern, and the SPI mechanism.

ContextEnvironmentFactory is used to create the context execution environment in which the program runs. It can be understood as encapsulating how the program interacts with the outside world, such as per-job versus standalone mode and the required resources. Depending on the environment, different StreamExecutionEnvironment subtypes are created (see below). For client-side submission, an ExecutionEnvironment of type ContextEnvironment is created.

Main submission process

The program.invokeInteractiveModeForExecution method invokes the main method of the user program, in which StreamExecutionEnvironment.getExecutionEnvironment obtains the appropriate StreamExecutionEnvironment:

public static StreamExecutionEnvironment getExecutionEnvironment() {
   // threadLocalContextEnvironmentFactory and contextEnvironmentFactory are
   // null by default, in which case createStreamExecutionEnvironment is called
   return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
      .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
      .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}

private static StreamExecutionEnvironment createStreamExecutionEnvironment() {
   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   if (env instanceof ContextEnvironment) {
      return new StreamContextEnvironment((ContextEnvironment) env);
   } else if (env instanceof OptimizerPlanEnvironment) {
      return new StreamPlanEnvironment(env);
   } else {
      return createLocalEnvironment();
   }
}

public static ExecutionEnvironment getExecutionEnvironment() {
   return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
      .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
      // in local mode, create a LocalEnvironment
      .orElseGet(ExecutionEnvironment::createLocalEnvironment);
}

According to the analysis of ClientUtils.executeProgram above, ContextEnvironment.setAsContext(factory) assigns the factory to both threadLocalContextEnvironmentFactory and contextEnvironmentFactory, so contextEnvironmentFactory.createExecutionEnvironment is then called and returns a ContextEnvironment.

Finally, StreamExecutionEnvironment.getExecutionEnvironment returns a StreamExecutionEnvironment object (a StreamContextEnvironment) that internally wraps the ContextEnvironment object.
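The context-factory mechanism above can be sketched with a minimal, self-contained example (hypothetical stand-in types, not Flink's actual classes): the client installs a factory via setAsContext, and getExecutionEnvironment resolves the thread-local factory first, then the global one, falling back to a local environment when neither is set.

```java
import java.util.Optional;

public class ContextFactoryPattern {

    interface EnvironmentFactory {
        String createExecutionEnvironment();
    }

    private static final ThreadLocal<EnvironmentFactory> threadLocalFactory = new ThreadLocal<>();
    private static EnvironmentFactory globalFactory;

    // mirrors ContextEnvironment.setAsContext(factory): both the thread-local
    // and the global factory fields are assigned
    static void setAsContext(EnvironmentFactory factory) {
        threadLocalFactory.set(factory);
        globalFactory = factory;
    }

    static void unsetContext() {
        threadLocalFactory.remove();
        globalFactory = null;
    }

    // mirrors Utils.resolveFactory: the thread-local factory takes
    // precedence over the global one
    static Optional<EnvironmentFactory> resolveFactory(
            ThreadLocal<EnvironmentFactory> threadLocal, EnvironmentFactory global) {
        EnvironmentFactory local = threadLocal.get();
        return Optional.ofNullable(local != null ? local : global);
    }

    static String getExecutionEnvironment() {
        return resolveFactory(threadLocalFactory, globalFactory)
                .map(EnvironmentFactory::createExecutionEnvironment)
                .orElseGet(() -> "LocalEnvironment");
    }

    public static void main(String[] args) {
        // no factory installed yet: falls back to the local environment
        System.out.println(getExecutionEnvironment()); // prints "LocalEnvironment"
        // after the client installs the context, the factory is used instead
        setAsContext(() -> "ContextEnvironment");
        System.out.println(getExecutionEnvironment()); // prints "ContextEnvironment"
        unsetContext();
    }
}
```

This is why code calling getExecutionEnvironment behaves differently when run from the IDE (no factory installed, local environment) versus through flink run (the client has installed a ContextEnvironmentFactory first).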

Execute process

After the user code in the main method has built the job, it calls the StreamExecutionEnvironment.execute method, which in turn calls the executeAsync(StreamGraph) method:

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
   checkNotNull(streamGraph, "StreamGraph cannot be null.");
   checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

   // select the matching factory based on the submission mode (execution.target)
   final PipelineExecutorFactory executorFactory =
      executorServiceLoader.getExecutorFactory(configuration);

   checkNotNull(
      executorFactory,
      "Cannot find compatible factory for specified execution.target (=%s)",
      configuration.get(DeploymentOptions.TARGET));

   // obtain the appropriate executor from the factory and submit the job
   CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
      .getExecutor(configuration)
      .execute(streamGraph, configuration);

   try {
      JobClient jobClient = jobClientFuture.get();
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
      return jobClient;
   } catch (Throwable t) {
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
      ExceptionUtils.rethrow(t);

      // make javac happy, this code path will not be reached
      return null;
   }
}

As mentioned in the previous article, all PipelineExecutorFactory implementations are loaded via the SPI mechanism, and the matching factory is the one compatible with the execution.target value discussed above. For yarn per-job mode this is YarnJobClusterExecutorFactory, which finally yields an Executor of type YarnJobClusterExecutor to submit the job to YARN.
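The factory-matching step can be sketched as follows. This is a minimal, self-contained illustration with hypothetical stand-in types; in Flink the candidate factories are discovered through java.util.ServiceLoader (the SPI mechanism) rather than a hand-built list, and each factory declares which execution.target it is compatible with.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ExecutorFactoryMatching {

    interface PipelineExecutorFactory {
        String name();
        boolean isCompatibleWith(Map<String, String> configuration);
    }

    // build a factory compatible with one execution.target value
    static PipelineExecutorFactory named(String name, String target) {
        return new PipelineExecutorFactory() {
            public String name() { return name; }
            public boolean isCompatibleWith(Map<String, String> conf) {
                return target.equals(conf.get("execution.target"));
            }
        };
    }

    // mirrors PipelineExecutorServiceLoader.getExecutorFactory(configuration):
    // scan the loaded factories and return the one matching execution.target
    static PipelineExecutorFactory getExecutorFactory(
            List<PipelineExecutorFactory> loaded, Map<String, String> configuration) {
        return loaded.stream()
                .filter(f -> f.isCompatibleWith(configuration))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "Cannot find compatible factory for specified execution.target"));
    }

    public static void main(String[] args) {
        List<PipelineExecutorFactory> factories = Arrays.asList(
                named("RemoteExecutorFactory", "remote"),
                named("YarnSessionClusterExecutorFactory", "yarn-session"),
                named("YarnJobClusterExecutorFactory", "yarn-per-job"));
        // per-job submission selects the YARN job-cluster factory
        System.out.println(getExecutorFactory(
                factories, Map.of("execution.target", "yarn-per-job")).name());
    }
}
```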


This article has analyzed the process from the start of flink run to submission to the cluster. It can be simplified into three steps:

  • Select the appropriate parameter parser (CustomCommandLine) to parse the command-line arguments;
  • Select the appropriate execution context (StreamExecutionEnvironment);
  • Select the appropriate pipeline executor (PipelineExecutor).

The next article will analyze the specific submission process of the yarn per-job mode.

Posted by redRemedy on Mon, 18 Apr 2022 14:44:45 +0930