Configuration and use of the Flink SQL Client

Flink SQL Client

 

Flink's Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers.

The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.

from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html

In short: the Table & SQL API that Flink provides can only be used by embedding queries inside a program. That is fine for highly customized jobs, but most everyday jobs are fairly simple and repeat the same query logic, so a dedicated SQL client makes a lot of sense.

In fact, this is similar to Hive and Pig in the early days and to Spark SQL later on: an interactive interface, potentially even a standalone SQL service process like HiveServer2 (speaking its own wire protocol). The current version (1.12.1), however, only provides an embedded mode; the official Flink documentation lists the rest in the SQL Client's future plans:

Limitations & Future

The current SQL Client only supports embedded mode. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway, see more in FLIP-24 and FLIP-91.

from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html

However, given the strengths of Flink's processing architecture and its very active community, the Flink SQL Client has a promising future.

Why use the SQL Client

As mentioned in a previous article, team X at the company runs a small (really, a tiny 😂) cluster built with Ambari. The built-in Hive SQL is fine for day-to-day processing, but since the architect plans to rebuild the computing stack on Flink, I did some research along the way. At its core, Flink SQL inherits the stream-based model: a data set is just an unbounded stream, and a long-running SQL pipeline can be defined from a configuration file, which makes it well suited to ETL work, for example.
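As a sketch of what such a long-running pipeline could look like (the catalog, table, and column names below are made up for illustration, not taken from any real setup):

```sql
-- Hypothetical ETL pipeline: continuously move cleaned events
-- from a streaming source table into a Hive-backed sink table.
-- Because the source is unbounded, this INSERT runs indefinitely.
INSERT INTO myhive.etl.cleaned_events
SELECT user_id, event_type, event_time
FROM raw_events
WHERE event_type IS NOT NULL;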

 

Operating environment

Flink is not deployed in the Ambari-managed cluster; previously, Flink jobs were submitted externally, from a separate client machine with network access to the cluster. That machine has no Hive installation either. According to the official documentation, the following is needed.

Configuration

  • Copy the online hive-site.xml configuration file to the xxx/hive-conf directory

hive-site.xml mainly defines the JDBC connection to the metastore database and the Hive Metastore service URL, so that Flink can obtain catalog information when executing SQL.

Demo from https://www.cnblogs.com/qiu-hua/p/13938738.html

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
            <description>JDBC connect string for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.cj.jdbc.Driver</value>
            <description>Driver class name for a JDBC metastore</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>root</value>
            <description>username to use against metastore database</description>
        </property>

        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>123456</value>
            <description>password to use against metastore database</description>
        </property>

    <property>
         <name>hive.metastore.warehouse.dir</name>
         <value>/user/hive/warehouse</value>
         <description>location of default database for the warehouse</description>
    </property>

    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>


    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <property>
        <name>hive.server2.thrift.bind.host</name>
         <value>192.168.1.122</value>
    </property>

<property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
 </property>


    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>


    <property>
            <name>hive.metastore.uris</name>
            <value>thrift://localhost:9083</value> <!-- address of the Metastore on this machine -->
    </property>


<!-- Spark location dependency (note: port 8020 must match the namenode's port) -->

</configuration>
  • Copy the relevant Hive jars from the online deployment to xxx/flink-hive-jars
libfb303-0.9.3.jar
hive-metastore-3.1.0.3.1.4.0-315.jar
hive-exec-3.1.0.3.1.4.0-315.jar
  • Download the Flink Hive connector and put it in the xxx/flink-hive-jars directory
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.1/flink-connector-hive_2.12-1.12.1.jar

 

  • Environment configuration file for Flink SQL

In <flink-install-dir>/conf there is sql-client-defaults.yaml; copy it, sql-client-defaults.yaml -> sql-client-hive.yaml, and edit the copy.
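For reference, here is a minimal sketch of what sql-client-hive.yaml might contain for Flink 1.12 (the catalog name and the hive-conf-dir path are placeholders, not taken from the original setup):

```yaml
# Sketch of sql-client-hive.yaml (names and paths are placeholders).
catalogs:
  - name: myhive                 # arbitrary catalog name
    type: hive
    hive-conf-dir: xxx/hive-conf # directory containing the copied hive-site.xml
    default-database: default

execution:
  type: batch                    # run queries in batch mode, Hive-style
  result-mode: table             # how the CLI displays results
```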

 

Running

The example in Flink's official documentation runs against a standalone-mode cluster, which I don't need here, so I didn't try that configuration.

The SQL Client scripts are also located in the binary directory of Flink. In the future, a user will have two possibilities of starting the SQL Client CLI either by starting an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment only the embedded mode is supported. You can start the CLI by calling:

./bin/sql-client.sh embedded

Here I start it with my own environment file, using -l to point at the directory of jars required by Flink SQL:

./bin/sql-client.sh embedded -d conf/sql-client-hive.yaml -l xxx/flink-hive-jars

Command history file path: /root/.flink-sql-history
                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL>

 

Because I didn't understand Flink SQL's execution model at first, submitting a query directly failed. The log shows that the REST client could not connect to the Flink cluster, so a Flink cluster has to be started first:

### sql client error    
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused


### Error reported in the log
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused

Start the cluster

./bin/yarn-session.sh -n4 -jm1024 -tm 4096 -s 2

You then need to re-enter the SQL Client and run the official example: select 'hello world';

                                                                       SQL Query Result (Table)
 Table program finished.                                                    Page: Last of 1                                                      Updated: 10:54:00.624

                    EXPR$0
               hello world




Q Quit                           + Inc Refresh                    G Goto Page                      N Next Page                      O Open Row
R Refresh                        - Dec Refresh                    L Last Page                      P Prev Page

 

Problems encountered

  • My original intention with Flink SQL here was to run batch queries over tables, as with Hive, so the SQL Client environment file is configured with execution.type: batch.
                                                                       SQL Query Result (Table)
 Table program finished.                                                    Page: Last of 1                                                      Updated: 10:59:23.426

                         c                         f
                        11                  20210105
                         1                  20210105
                         3                  20210105
                         1                  20210105
                         1                  20210105
                        19                  20210105
                         1                  20210105
                         9                  20210105
                         5                  20210105
                         7                  20210105





Q Quit                           + Inc Refresh                    G Goto Page                      N Next Page                      O Open Row
R Refresh                        - Dec Refresh                    L Last Page                      P Prev Page

 

Changing a setting on the command line then surfaces a Metastore-related error:

SET execution.result-mode=tableau;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
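This error usually means the client cannot reach the Metastore thrift service. A likely culprit (an assumption on my part, not something the error message states) is that hive.metastore.uris in the copied hive-site.xml still points at localhost instead of the host where the Metastore actually runs, or that the Metastore service itself is not up (it can be started with `hive --service metastore`). The fragment below shows the relevant property with a placeholder hostname:

```xml
<!-- hive-site.xml: point at the real Metastore host (hostname is a placeholder) -->
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://metastore-host:9083</value>
</property>
```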


Posted by finkrattaz on Sat, 16 Apr 2022 07:43:00 +0930