Flink jobs are commonly submitted with the `flink run` command. If we want to submit jobs through an API instead, we first need to understand what `flink run` actually does. This article analyzes the submission process through the source code. (Note: based on Flink 1.10.1.)
Submission entry
Looking at the bin/flink script, you can see that the submission entry class is `org.apache.flink.client.cli.CliFrontend`, and the arguments passed to it are the arguments after the `flink` command. Check its main method:
```java
public static void main(final String[] args) {
	EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

	// 1. find the configuration directory: $FLINK_HOME/conf
	final String configurationDirectory = getConfigurationDirectoryFromEnv();

	// 2. load flink-conf.yaml
	final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

	// 3. initialize the parameter parsers for all submission modes
	final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
		configuration,
		configurationDirectory);

	try {
		// initialize the execution entry point
		final CliFrontend cli = new CliFrontend(
			configuration,
			customCommandLines);

		SecurityUtils.install(new SecurityConfiguration(cli.configuration));
		int retCode = SecurityUtils.getInstalledContext()
				// parseParameters dispatches to a different flow per action: run, info, list, modify, etc.
				.runSecured(() -> cli.parseParameters(args));
		System.exit(retCode);
	} catch (Throwable t) {
		final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
		LOG.error("Fatal error while running command line interface.", strippedThrowable);
		strippedThrowable.printStackTrace();
		System.exit(31);
	}
}
```
`CustomCommandLine` is the interface for parsing command-line parameters. It has two implementations: `FlinkYarnSessionCli`, which parses per-job and session mode parameters, and `DefaultCLI`, which parses standalone mode parameters. Note that the program selects the appropriate parser based on the given options: each parser is matched through its `isActive` method, and the chosen parser's `applyCommandLineOptionsToConfiguration` is then called to parse the parameters.
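The "first active parser wins" pattern can be sketched in plain Java. This is a minimal illustration, not Flink's actual classes; the names `YarnCli`, `DefaultCli`, and `select` are stand-ins for `FlinkYarnSessionCli`, `DefaultCLI`, and the selection loop in `CliFrontend`:

```java
import java.util.List;

public class CommandLineSelectionDemo {

    interface CustomCommandLine {
        boolean isActive(String[] args);
        String name();
    }

    // Stand-in for FlinkYarnSessionCli: active when "-m yarn-cluster" is passed.
    static class YarnCli implements CustomCommandLine {
        public boolean isActive(String[] args) {
            for (int i = 0; i < args.length - 1; i++) {
                if (args[i].equals("-m") && args[i + 1].equals("yarn-cluster")) {
                    return true;
                }
            }
            return false;
        }
        public String name() { return "yarn"; }
    }

    // Stand-in for DefaultCLI: always active, so it acts as the fallback.
    static class DefaultCli implements CustomCommandLine {
        public boolean isActive(String[] args) { return true; }
        public String name() { return "default"; }
    }

    // First active parser wins, which is why the always-active
    // fallback must be registered last.
    static CustomCommandLine select(List<CustomCommandLine> candidates, String[] args) {
        for (CustomCommandLine cli : candidates) {
            if (cli.isActive(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command line matched.");
    }

    public static void main(String[] args) {
        List<CustomCommandLine> clis = List.of(new YarnCli(), new DefaultCli());
        System.out.println(select(clis, new String[]{"-m", "yarn-cluster", "job.jar"}).name()); // yarn
        System.out.println(select(clis, new String[]{"job.jar"}).name()); // default
    }
}
```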
The run process
```java
protected void run(String[] args) throws Exception {
	LOG.info("Running 'run' command.");

	// options for the run command, including savepoint restore options
	final Options commandOptions = CliFrontendParser.getRunCommandOptions();
	// wrap the arguments in a CommandLine
	final CommandLine commandLine = getCommandLine(commandOptions, args, true);

	// a ProgramOptions object holding the jar path, the user program's entry class,
	// the user program's arguments, the classpath, etc.
	final ProgramOptions programOptions = new ProgramOptions(commandLine);

	// help command
	if (commandLine.hasOption(HELP_OPTION.getOpt())) {
		CliFrontendParser.printHelpForRun(customCommandLines);
		return;
	}

	if (!programOptions.isPython()) {
		// Java program should be specified a JAR file
		if (programOptions.getJarFilePath() == null) {
			throw new CliArgsException("Java program should be specified a JAR file.");
		}
	}

	// represents the program: its jar, arguments, and other information
	final PackagedProgram program;
	try {
		LOG.info("Building program from JAR file");
		program = buildProgram(programOptions);
	} catch (FileNotFoundException e) {
		throw new CliArgsException("Could not build the program from JAR file.", e);
	}

	// the jars the program needs, mainly the user jar
	final List<URL> jobJars = program.getJobJarAndDependencies();
	// build the effective configuration; the selected parser contributes
	// configuration values according to the given parameters
	final Configuration effectiveConfiguration =
			getEffectiveConfiguration(commandLine, programOptions, jobJars);

	LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

	try {
		executeProgram(effectiveConfiguration, program);
	} finally {
		program.deleteExtractedLibraries();
	}
}
```
In the getEffectiveConfiguration method, a parameter parser is selected according to the given parameters. For example, with `-m yarn-cluster` (per-job mode), the FlinkYarnSessionCli parser is selected. During this step an important configuration value is set: `execution.target`, the target executor, which determines which of the following executors later submits the job: yarn-session, yarn-per-job, or remote. Its value is derived from the submission mode.
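The mapping from submission mode to `execution.target` can be sketched as follows. This is an illustrative simplification, not Flink's code; the helper `resolveExecutionTarget` and its two flags are hypothetical, while the three target strings are the ones named above:

```java
public class ExecutionTargetDemo {

    // yarnMode: the YARN command line was active (e.g. "-m yarn-cluster" was given)
    // perJob:   no existing session is reused, so a dedicated cluster is started per job
    static String resolveExecutionTarget(boolean yarnMode, boolean perJob) {
        if (yarnMode) {
            return perJob ? "yarn-per-job" : "yarn-session";
        }
        // standalone submission to an existing cluster
        return "remote";
    }

    public static void main(String[] args) {
        System.out.println(resolveExecutionTarget(true, true));   // yarn-per-job
        System.out.println(resolveExecutionTarget(true, false));  // yarn-session
        System.out.println(resolveExecutionTarget(false, false)); // remote
    }
}
```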
The executeProgram process
The executeProgram method directly calls `ClientUtils.executeProgram`:
```java
public static void executeProgram(
		PipelineExecutorServiceLoader executorServiceLoader,
		Configuration configuration,
		PackagedProgram program) throws ProgramInvocationException {
	checkNotNull(executorServiceLoader);
	final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
	final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
	try {
		Thread.currentThread().setContextClassLoader(userCodeClassLoader);

		LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

		// the context used to create the execution environment for the user program
		ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
			executorServiceLoader,
			configuration,
			userCodeClassLoader);
		// the factory is assigned to a static field of ExecutionEnvironment
		ContextEnvironment.setAsContext(factory);
		try {
			// invoke the user program's main method
			program.invokeInteractiveModeForExecution();
		} finally {
			ContextEnvironment.unsetContext();
		}
	} finally {
		Thread.currentThread().setContextClassLoader(contextClassLoader);
	}
}
```
`PipelineExecutorServiceLoader` is used to select the executor; see the earlier article on Flink 1.10 task submission and the factory-based SPI mechanism.
ContextEnvironmentFactory creates the context execution environment in which the program runs. It can be understood as encapsulating how the program interacts with the outside world: the submission mode (per-job, standalone, etc.), the required resources, and so on. Depending on its type, a different StreamExecutionEnvironment is created (see below). For client-side submission, an ExecutionEnvironment of type ContextEnvironment is created.
Main submission process
The program.invokeInteractiveModeForExecution method calls the user program's main method, in which `StreamExecutionEnvironment.getExecutionEnvironment` obtains the appropriate StreamExecutionEnvironment:
```java
// StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
	// threadLocalContextEnvironmentFactory and contextEnvironmentFactory are empty
	// by default, in which case createStreamExecutionEnvironment is called
	return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
		.map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
		.orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}

private static StreamExecutionEnvironment createStreamExecutionEnvironment() {
	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	if (env instanceof ContextEnvironment) {
		return new StreamContextEnvironment((ContextEnvironment) env);
	} else if (env instanceof OptimizerPlanEnvironment) {
		return new StreamPlanEnvironment(env);
	} else {
		return createLocalEnvironment();
	}
}
```
```java
// ExecutionEnvironment.java
public static ExecutionEnvironment getExecutionEnvironment() {
	return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
		.map(ExecutionEnvironmentFactory::createExecutionEnvironment)
		// in local mode, create a LocalEnvironment
		.orElseGet(ExecutionEnvironment::createLocalEnvironment);
}
```
As analyzed in `ClientUtils.executeProgram` above, `ContextEnvironment.setAsContext(factory)` assigns the factory to threadLocalContextEnvironmentFactory and contextEnvironmentFactory, so `contextEnvironmentFactory.createExecutionEnvironment` is called and returns a ContextEnvironment.
Finally, `StreamExecutionEnvironment.getExecutionEnvironment` returns a StreamExecutionEnvironment object that internally wraps the ContextEnvironment object.
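The "install a factory, then resolve it" pattern used by setAsContext / getExecutionEnvironment can be sketched in isolation. This is a simplified illustration with hypothetical names, using strings in place of environment objects: the client installs a factory before invoking the user's main method, so the user's call to getExecutionEnvironment transparently returns the client's environment:

```java
import java.util.Optional;

public class ContextFactoryDemo {

    interface EnvironmentFactory {
        String createExecutionEnvironment();
    }

    // set by the client before the user program runs, null otherwise
    private static EnvironmentFactory contextFactory;

    static void setAsContext(EnvironmentFactory factory) { contextFactory = factory; }
    static void unsetContext() { contextFactory = null; }

    // mirrors getExecutionEnvironment(): use the installed factory if present,
    // otherwise fall back to a local environment
    static String getExecutionEnvironment() {
        return Optional.ofNullable(contextFactory)
                .map(EnvironmentFactory::createExecutionEnvironment)
                .orElse("LocalEnvironment");
    }

    public static void main(String[] args) {
        // without an installed factory (e.g. running main() directly in the IDE):
        System.out.println(getExecutionEnvironment()); // LocalEnvironment

        // the client installs a factory before calling the user's main():
        setAsContext(() -> "ContextEnvironment");
        try {
            System.out.println(getExecutionEnvironment()); // ContextEnvironment
        } finally {
            unsetContext();
        }
    }
}
```

This is why the same user program runs locally in an IDE but submits to a cluster when launched through `flink run`: the environment it gets depends on who invoked its main method.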
The execute process
After the user code in the main method builds the job, it calls `StreamExecutionEnvironment.execute`, which in turn calls `executeAsync(StreamGraph)`:
```java
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
	checkNotNull(streamGraph, "StreamGraph cannot be null.");
	checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

	// select the matching factory based on the submission mode
	final PipelineExecutorFactory executorFactory =
		executorServiceLoader.getExecutorFactory(configuration);

	checkNotNull(
		executorFactory,
		"Cannot find compatible factory for specified execution.target (=%s)",
		configuration.get(DeploymentOptions.TARGET));

	// use the selected executor to submit the job
	CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
		.getExecutor(configuration)
		.execute(streamGraph, configuration);

	try {
		JobClient jobClient = jobClientFuture.get();
		jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
		return jobClient;
	} catch (Throwable t) {
		jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
		ExceptionUtils.rethrow(t);

		// make javac happy, this code path will not be reached
		return null;
	}
}
```
As mentioned in the previous article, all PipelineExecutorFactory implementations are loaded through the SPI mechanism, and the one matching the `execution.target` value described above is selected. For yarn-per-job that is YarnJobClusterExecutorFactory, which finally yields an executor of type YarnJobClusterExecutor that submits the job to YARN.
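The factory-matching step can be sketched as follows. This is an illustrative sketch, not Flink's code: in Flink the candidates come from `java.util.ServiceLoader` (the SPI mechanism), while here a fixed list keeps the example self-contained; the `named` helper is hypothetical:

```java
import java.util.List;
import java.util.Optional;

public class ExecutorFactoryDemo {

    interface PipelineExecutorFactory {
        boolean isCompatibleWith(String executionTarget);
        String factoryName();
    }

    // build a factory that serves exactly one execution.target value
    static PipelineExecutorFactory named(String target, String name) {
        return new PipelineExecutorFactory() {
            public boolean isCompatibleWith(String t) { return target.equals(t); }
            public String factoryName() { return name; }
        };
    }

    // mirrors executorServiceLoader.getExecutorFactory(configuration):
    // return the first factory compatible with the configured target
    static Optional<PipelineExecutorFactory> getExecutorFactory(
            List<PipelineExecutorFactory> candidates, String executionTarget) {
        return candidates.stream()
                .filter(f -> f.isCompatibleWith(executionTarget))
                .findFirst();
    }

    public static void main(String[] args) {
        List<PipelineExecutorFactory> factories = List.of(
                named("yarn-per-job", "YarnJobClusterExecutorFactory"),
                named("yarn-session", "YarnSessionClusterExecutorFactory"),
                named("remote", "RemoteExecutorFactory"));

        System.out.println(getExecutorFactory(factories, "yarn-per-job")
                .map(PipelineExecutorFactory::factoryName)
                .orElse("none")); // YarnJobClusterExecutorFactory
    }
}
```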
Summary
This article analyzed the process from invoking `flink run` to submitting a job to the cluster. It can be simplified into three steps:
- Select the appropriate parameter parser (CustomCommandLine) to parse the command-line arguments;
- Select the appropriate execution context (StreamExecutionEnvironment);
- Select the appropriate pipeline executor (PipelineExecutor).
The next article will analyze the concrete submission process of the yarn-per-job mode.