Flink SQL Client
Flink's Table & SQL API makes it possible to work with queries written in the SQL language, but these queries need to be embedded within a table program that is written in either Java or Scala. Moreover, these programs need to be packaged with a build tool before being submitted to a cluster. This more or less limits the usage of Flink to Java/Scala programmers.
The SQL Client aims to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.
from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html
The gist of the passage above is that Flink's Table & SQL API can only be used by embedding queries in a program. That is fine for highly customized programs, but most everyday jobs are relatively simple and their query logic is highly repetitive, so a dedicated SQL client is the more practical offering.
This is similar to Hive and Pig in the early days and Spark SQL later on: each provides an interactive mode, and some can even run as a standalone SQL service process with its own protocol (HiveServer2, for example). The current Flink version (1.12.1), however, only provides embedded mode; the official documentation lists a gateway mode among the future plans for the SQL Client:
Limitations & Future
The current SQL Client only supports embedded mode. In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway, see more in FLIP-24 and FLIP-91.
from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sqlClient.html
Still, given the strengths of Flink's processing architecture and its currently very active community, there is good reason to expect more from the Flink SQL Client in the future.
Why use SQL Client
As mentioned in the previous article, group X at my company has a small cluster (a micro cluster, really 😂) built with Ambari. The built-in Hive SQL is fine for day-to-day processing, but because the architect plans to rebuild the computing system on Flink, I am trying it out here as part of the research. In essence, Flink SQL inherits Flink's stream-based model: a data set is treated as an unbounded stream, and a long-running SQL pipeline can be defined through a configuration file, for example to do ETL work.
Operating environment
Flink is not deployed in the Ambari-managed cluster itself. Previously, Flink jobs were run externally, that is, from a separate client machine with network access to the cluster, so that machine has nothing Hive-related installed. According to the official documentation, the following is needed:
Configuration
- Copy the production hive-site.xml configuration file to the XXX/hive-conf directory
hive-site.xml mainly defines the metastore database connection and the Hive Metastore service URL, so that Flink can obtain catalog information when executing SQL.
Demo from https://www.cnblogs.com/qiu-hua/p/13938738.html
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://hadoop102:3306/metastore?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.cj.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
    <description>password to use against metastore database</description>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
  <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.server2.thrift.bind.host</name>
    <value>192.168.1.122</value>
  </property>
  <property>
    <name>hive.metastore.event.db.notification.api.auth</name>
    <value>false</value>
  </property>
  <property>
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <!-- Metastore's IP address on pc -->
  </property>
  <!-- Spark dependency location (note: port 8020 must match the NameNode port) -->
</configuration>
- Copy the relevant jars from the production Hive deployment to XXX/flink-hive-jars
libfb303-0.9.3.jar
hive-metastore-3.1.0.3.1.4.0-315.jar
hive-exec-3.1.0.3.1.4.0-315.jar
- Download the Flink Hive connector and put it in the XXX/flink-hive-jars directory
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.1/flink-connector-hive_2.12-1.12.1.jar
- Environment configuration file for the Flink SQL Client
In the conf directory of the Flink installation, copy sql-client-defaults.yaml to sql-client-hive.yaml.
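The environment file also needs a Hive catalog entry that points at the hive-conf directory. A minimal sketch of what that might look like, wrapped in a shell function so the paths stay explicit. The function name `write_hive_env`, the catalog name `myhive`, and both directory arguments are placeholders of mine rather than values from this setup; the keys follow the Hive catalog example in the Flink 1.12 documentation as I understand it.

```shell
# Sketch: write a minimal SQL Client environment file that registers a Hive catalog.
# $1 = directory to write sql-client-hive.yaml into (hypothetical, e.g. ./conf)
# $2 = directory that contains hive-site.xml (hypothetical, e.g. XXX/hive-conf)
write_hive_env() {
  conf_dir="$1"
  hive_conf_dir="$2"
  mkdir -p "$conf_dir"
  # Unquoted EOF so that $hive_conf_dir is expanded inside the heredoc.
  cat > "$conf_dir/sql-client-hive.yaml" <<EOF
catalogs:
  - name: myhive              # placeholder catalog name
    type: hive
    hive-conf-dir: $hive_conf_dir

execution:
  planner: blink
  type: batch                 # batch queries, as with Hive
  result-mode: table
  current-catalog: myhive
  current-database: default
EOF
}

# Example call (paths are placeholders):
# write_hive_env ./conf /path/to/hive-conf
```

This writes a fresh file instead of editing the copy of sql-client-defaults.yaml. Missing sections should fall back to defaults, but I have not verified that on 1.12.1, so merging these entries into the copied file by hand is the safer route.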
Running
The example in Flink's official documentation uses a cluster running in standalone mode, which I did not need to set up.
The SQL Client scripts are also located in the binary directory of Flink. In the future, a user will have two possibilities of starting the SQL Client CLI either by starting an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment only the embedded mode is supported. You can start the CLI by calling:
./bin/sql-client.sh embedded
Here I pass my own environment file and specify the directory containing the jar files that Flink SQL needs:
./bin/sql-client.sh embedded -d conf/sql-client-hive.yaml -l xxx/flink-hive-jars
Command history file path: /root/.flink-sql-history
[Flink SQL Client ASCII-art banner, BETA]
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL>
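Before launching, it can help to confirm that the directory passed with -l really contains the jars listed earlier. A small sketch under that assumption: the function name `missing_hive_jars` is made up for illustration, and the file name patterns are simply derived from the jar names in this article, not from any official checklist.

```shell
# Sketch: report which of the jars named in this article are missing from a directory.
# Prints one "missing: <pattern>" line per absent jar; returns 0 only if all are present.
missing_hive_jars() {
  dir="$1"
  status=0
  for pattern in 'libfb303-*.jar' 'hive-metastore-*.jar' 'hive-exec-*.jar' 'flink-connector-hive_*.jar'; do
    # $pattern is left unquoted on purpose so the shell expands the glob;
    # ls fails when nothing matches.
    if ! ls "$dir"/$pattern >/dev/null 2>&1; then
      echo "missing: $pattern"
      status=1
    fi
  done
  return "$status"
}

# Example call (directory is a placeholder):
# missing_hive_jars xxx/flink-hive-jars
```

The check only looks at file names, so it will not catch a jar built for a different Hive or Scala version.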
Because I did not yet understand how the Flink SQL Client runs queries, submitting a statement directly reported an error. The log shows a failure to connect to the Flink cluster's REST endpoint, so a Flink cluster has to be started first.
### sql client error
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused

### Error reported in log
~
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused
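The address in the log, localhost:8081, is where the client expected the cluster's REST endpoint. A quick way to see whether anything is listening there before opening the SQL Client might look like the sketch below. The function name `rest_reachable` is mine, and the check relies on bash's /dev/tcp pseudo-device, so it assumes bash rather than a plain POSIX sh. It only proves that a TCP port is open, not that the listener is actually Flink.

```shell
#!/usr/bin/env bash
# Sketch: check whether something accepts TCP connections on the JobManager REST port.
# Host and port default to the address from the error log (localhost:8081).
rest_reachable() {
  local host="${1:-localhost}" port="${2:-8081}"
  # The subshell opens a TCP connection on fd 3 and closes it on exit;
  # a refused connection makes the redirection, and so the function, fail.
  (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null
}

if rest_reachable localhost 8081; then
  echo "REST endpoint reachable"
else
  echo "nothing listening on localhost:8081, start a cluster first"
fi
```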
Start Cluster
./bin/yarn-session.sh -n4 -jm1024 -tm 4096 -s 2
Then re-enter the SQL Client and execute the official example, select 'hello world';
SQL Query Result (Table)
Table program finished.    Page: Last of 1    Updated: 10:54:00.624

EXPR$0
hello world

Q Quit       + Inc Refresh    G Goto Page    N Next Page    O Open Row
R Refresh    - Dec Refresh    L Last Page    P Prev Page
Problems encountered
- My original goal with Flink SQL was to run batch queries against tables, as with Hive, so the SQL Client environment file is configured with execution type = batch.
SQL Query Result (Table)
Table program finished.    Page: Last of 1    Updated: 10:59:23.426

 c         f
11  20210105
 1  20210105
 3  20210105
 1  20210105
 1  20210105
19  20210105
 1  20210105
 9  20210105
 5  20210105
 7  20210105

Q Quit       + Inc Refresh    G Goto Page    N Next Page    O Open Row
R Refresh    - Dec Refresh    L Last Page    P Prev Page
Changing some settings on the command line, however, produces Metastore-related errors:
SET execution.result-mode=tableau;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
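I do not know the root cause of this error, but one thing worth checking is which metastore address the client is actually using. The sketch below prints the hive.metastore.uris value from a hive-site.xml. The function name `metastore_uri` is hypothetical, and the extraction is plain text matching with awk rather than real XML parsing, so it assumes the <name> element comes before its <value>.

```shell
# Sketch: print the hive.metastore.uris value from a hive-site.xml file.
# Text-based extraction, not an XML parser; assumes <name> precedes <value>.
metastore_uri() {
  awk '
    {
      line = $0
      if (!found) {
        # Skip lines until the property name appears, then drop what precedes it.
        i = index(line, "<name>hive.metastore.uris</name>")
        if (i == 0) next
        found = 1
        line = substr(line, i)
      }
      if (match(line, /<value>[^<]*<\/value>/)) {
        # Strip the 7-character <value> and the 8-character </value> tags.
        print substr(line, RSTART + 7, RLENGTH - 15)
        exit
      }
    }
  ' "$1"
}

# Example call (path is a placeholder):
# metastore_uri XXX/hive-conf/hive-site.xml
```

A value such as thrift://localhost:9083, as in the demo file above, only works on the machine that runs the metastore itself; on a separate client machine it would have to name the metastore host.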