Flink SQL reads and writes Hudi through Hudi HMS Catalog and synchronizes Hive tables (this method is strongly recommended)


Foreword

The previous article, Flink SQL to operate Hudi and synchronize Hive use summary, summarized how to use Flink SQL to read and write Hudi and synchronize Hive tables, and introduced several ways to create tables, but none of them was perfect. This article introduces a relatively complete approach: reading and writing Hudi and synchronizing Hive tables through the Hudi HMS Catalog. The Hudi HMS Catalog here is implemented by the HoodieHiveCatalog mentioned at the end of the previous article, PR: [HUDI-4098]Support HMS for flink HudiCatalog, merged on July 18, 2022. That means it is supported starting from Hudi 0.12.0 (I confirmed that 0.11.1 does not have it), so if you want to use it, you must upgrade to 0.12.0+. This article uses Hudi master 0.13.0-SNAPSHOT.

Benefits of Flink Hudi HMS Catalog

Since this method is recommended, let's talk about its benefits first. The main advantage is that it can create tables directly in Hive, just like Spark SQL does, and the table structure is compatible with Hive SQL and Spark SQL. That is, a table created in the Flink Hudi HMS Catalog can be queried with Flink SQL, Hive SQL, and Spark SQL, and can also be written with both Flink SQL and Spark SQL. This is unlike the methods described in the previous article, where a Hudi table written by Flink SQL could not be used by Hive/Spark directly and was only accessible through the synchronized tables. In addition, in the Flink Hudi HMS Catalog, Hive synchronization is enabled by default, just like in Spark SQL: for a MOR table, the corresponding _ro and _rt tables are created automatically; for a COW table, the synchronized table name is the same as the created table name, so reads and writes go through the same table. In short, creating, reading, and writing tables is consistent with Spark SQL.
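For example, here is a sketch of what you would typically see on the Hive side after writing to a MOR table named test_hudi_flink_mor (the name used later in this article):

-- Hive SQL
show tables;
-- test_hudi_flink_mor      (the table created through the catalog)
-- test_hudi_flink_mor_ro   (read-optimized view, synced by default)
-- test_hudi_flink_mor_rt   (real-time view, synced by default)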

Version

Flink 1.14.3, Hudi master 0.13.0-SNAPSHOT.

This article uses the Flink yarn-session mode; if you have not set it up, you can refer to: Flink SQL client query Hive configuration and problem solving.

Create Flink Hudi HMS Catalog

Let's take a look at how to create a Flink Hudi HMS Catalog:

CREATE CATALOG hudi_catalog WITH (
    'type' = 'hudi',
    'mode' = 'hms',
    'default-database' = 'default',
    'hive.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
    'table.external' = 'true'
);

-- This in effect creates the database test_flink in Hive
create database if not exists hudi_catalog.test_flink;
-- Switch to the database test_flink
use hudi_catalog.test_flink;
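You can verify from the Hive side that this is just an ordinary Hive database (a quick sketch):

-- Hive SQL
show databases;
-- the list should now include test_flink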

Supported configuration items:

catalog.path
default-database
hive.conf.dir
mode                 # optional values: hms, dfs
property-version
table.external       # whether the table is external; default false, i.e. an internal table. 0.12.0 and 0.12.1 do not have this option (they can only create external tables); it is not yet in 0.12.1, so you need to build the latest master code yourself, PR: https://github.com/apache/hudi/pull/6923

You can see that the configuration items are similar to those of the Hive catalog, except that the type is hudi. The mode here must be hms, while its default value is dfs; see the source code analysis below for why. Another point to note: the Hive catalog's option is hive-conf-dir, while Hudi's is hive.conf.dir. They look similar but are different. table.external controls whether the table is an external table; the default is false, i.e. an internal table. However, 0.12.0 and 0.12.1 do not have this option and can only create external tables, which is why I use Hudi master 0.13.0-SNAPSHOT. If you do not need this option, you can use 0.12.1 directly.
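For comparison, a minimal sketch of a dfs-mode catalog (the catalog.path value here is just an illustrative HDFS directory, not from the original setup):

CREATE CATALOG hudi_dfs_catalog WITH (
  'type' = 'hudi',
  'mode' = 'dfs',
  'catalog.path' = 'hdfs://cluster1/tmp/hudi_dfs_catalog'  -- illustrative path
);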

Why mode is hms

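// From Hudi's Flink catalog factory: createCatalog() returns HoodieHiveCatalog when mode = hms, otherwise HoodieCatalog (dfs mode)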
public Catalog createCatalog(Context context) {
  final FactoryUtil.CatalogFactoryHelper helper =
      FactoryUtil.createCatalogFactoryHelper(this, context);
  helper.validate();
  String mode = helper.getOptions().get(CatalogOptions.MODE);
  switch (mode.toLowerCase(Locale.ROOT)) {
    case "hms":
      return new HoodieHiveCatalog(
          context.getName(),
          (Configuration) helper.getOptions());
    case "dfs":
      return new HoodieCatalog(
          context.getName(),
          (Configuration) helper.getOptions());
    default:
      throw new HoodieCatalogException(String.format("Invalid catalog mode: %s, supported modes: [hms, dfs].", mode));
  }
}

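// Definition of the 'mode' option: a string option whose default value is dfs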
public static final ConfigOption<String> MODE = ConfigOptions
    .key("mode")
    .stringType()
    .defaultValue("dfs");

You can see that the default value of mode is dfs, and HoodieHiveCatalog is used only when mode is hms.

MOR table

Build table

CREATE TABLE test_hudi_flink_mor (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink_mor',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf'
);

When using the catalog, the path does not need to be specified. If it is not specified, the path defaults to the Hive database path plus the table name; see the COW table below.

It should be noted that although no Hive-related configuration needs to be set for synchronization (it is synchronized by default), hive_sync.conf.dir still needs to be configured; otherwise the same exception as in the previous article will be reported: WARN hive.metastore [] - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it. org.apache.thrift.transport.TTransportException: null. I actually find this unreasonable, because hive.conf.dir is already configured in the catalog and the two could be shared.

At this point the table has been created in the corresponding Hive database, and the table structure is compatible with Hive, Spark, and Flink at the same time; that is, it can be queried with Hive SQL and read and written with Spark SQL and Flink SQL.

show create table test_hudi_flink_mor;
-- You can verify for yourself that table.external takes effect
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_flink_mor`(                |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `price` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/hudi/test_hudi_flink_mor',          |
|   'primaryKey'='id',                               |
|   'type'='mor')                                    |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/tmp/hudi/test_hudi_flink_mor'   |
| TBLPROPERTIES (                                    |
|   'connector'='hudi',                              |
|   'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',  |
|   'hoodie.datasource.write.hive_style_partitioning'='true',  |
|   'hoodie.datasource.write.keygenerator.class'='org.apache.hudi.keygen.ComplexAvroKeyGenerator',  |
|   'hoodie.datasource.write.recordkey.field'='id',  |
|   'path'='/tmp/hudi/test_hudi_flink_mor',          |
|   'spark.sql.create.version'='spark2.4.4',         |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"integer","nullable":true,"metadata":{}},{"name":"ts","type":"integer","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'table.type'='MERGE_ON_READ',                    |
|   'transient_lastDdlTime'='1667373370')            |
+----------------------------------------------------+

Sync Hive

Insert a few rows of data and see whether Hive synchronization is triggered.

insert into test_hudi_flink_mor values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

Sure enough, synchronization happens by default, and the table structure is the same as before. The synchronized tables are external tables by default; you can control whether a managed table is created instead through the configuration item hoodie.datasource.hive_sync.create_managed_table.
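At this point the same data can be queried from all three engines. A sketch (each statement runs in its respective SQL client; the _ro/_rt names are the synchronized tables mentioned above):

-- Flink SQL (inside hudi_catalog)
select * from hudi_catalog.test_flink.test_hudi_flink_mor;
-- Spark SQL
select * from test_flink.test_hudi_flink_mor;
-- Hive SQL: the synchronized read-optimized and real-time views
select * from test_flink.test_hudi_flink_mor_ro;
select * from test_flink.test_hudi_flink_mor_rt;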

COW table

Build table

CREATE TABLE test_hudi_flink_cow (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf'
);

No path is specified here. Looking at the table structure in Hive, the path is the database path plus the table name: hdfs://cluster1/warehouse/tablespace/managed/hive/test_flink/test_hudi_flink_cow. This is more in line with usual usage habits: one less configuration item, and the paths are unified, easier to manage, and less error-prone.

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_hudi_flink_cow`(                |
|   `_hoodie_commit_time` string,                    |
|   `_hoodie_commit_seqno` string,                   |
|   `_hoodie_record_key` string,                     |
|   `_hoodie_partition_path` string,                 |
|   `_hoodie_file_name` string,                      |
|   `id` int,                                        |
|   `name` string,                                   |
|   `price` int,                                     |
|   `ts` int)                                        |
| PARTITIONED BY (                                   |
|   `dt` string)                                     |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='true',               |
|   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test_flink/test_hudi_flink_cow',  |
|   'primaryKey'='id')                               |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://cluster1/warehouse/tablespace/managed/hive/test_flink/test_hudi_flink_cow' |
| TBLPROPERTIES (                                    |
|   'connector'='hudi',                              |
|   'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',  |
|   'hoodie.datasource.write.hive_style_partitioning'='true',  |
|   'hoodie.datasource.write.keygenerator.class'='org.apache.hudi.keygen.ComplexAvroKeyGenerator',  |
|   'hoodie.datasource.write.recordkey.field'='id',  |
|   'path'='hdfs://cluster1/warehouse/tablespace/managed/hive/test_flink/test_hudi_flink_cow',  |
|   'spark.sql.create.version'='spark2.4.4',         |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"integer","nullable":true,"metadata":{}},{"name":"ts","type":"integer","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='dt',       |
|   'transient_lastDdlTime'='1667375710')            |
+----------------------------------------------------+

Sync Hive

insert into test_hudi_flink_cow values (1,'hudi',10,100,'2022-10-31'),(2,'hudi',10,100,'2022-10-31');

Because the synchronized table name is the same as the created table name, synchronization does not produce a visibly separate table.
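A quick check from Hive or Spark SQL (a sketch; the database name follows the one used above):

-- the COW table is queried under its own name, with no _ro/_rt suffix
select * from test_flink.test_hudi_flink_cow;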

Consistency verification

Write a few rows of data into each table through Spark SQL, and then query with Spark, Hive, and Flink.

insert into test_hudi_flink_mor values (3,'hudi',10,100,'2022-10-31');
insert into test_hudi_flink_mor_ro values (4,'hudi',10,100,'2022-10-31');
insert into test_hudi_flink_mor_rt values (5,'hudi',10,100,'2022-10-31');
insert into test_hudi_flink_cow values (3,'hudi',10,100,'2022-10-31');

After verification, there is no problem with consistency. Unfortunately, Flink SQL query results still do not contain the metadata fields; it is not clear why it was designed this way.
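To illustrate the difference (a sketch; the column list follows the schema shown above):

-- Spark SQL / Hive SQL: the Hudi metadata columns are part of the result
select _hoodie_commit_time, _hoodie_record_key, id, name, price, ts, dt from test_flink.test_hudi_flink_cow;
-- Flink SQL: only the declared columns (id, name, price, ts, dt) come back
select * from hudi_catalog.test_flink.test_hudi_flink_cow;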

Exception resolution

Exception information

Caused by: java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.util.compression.DeflaterPool.ensurePool(Lorg/apache/hudi/org/apache/jetty/util/component/Container;)Lorg/apache/hudi/org/apache/jetty/util/compression/DeflaterPool;
	at org.apache.hudi.org.apache.jetty.websocket.server.WebSocketServerFactory.<init>(WebSocketServerFactory.java:184) ~[hudi-flink1.14-bundle-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]

The cause of the exception is that the jetty version bundled in the Hudi jar is inconsistent with the jetty version in the Hadoop environment, which leads to a conflict. Related PR: [HUDI-4960] Upgrade jetty version for timeline server, which upgrades the jetty version. The solution is to make the jetty version in the Hadoop environment consistent with the one in the Hudi bundle. One way is to keep Flink tasks from depending on the jetty-related jars in the Hadoop environment; here, because HADOOP_CLASSPATH is configured, I could not get this to work after trying for a while. Another way is to upgrade the jetty version in the Hadoop environment, but since many Hadoop components depend on those jars, simply upgrading jetty causes other problems. In the end I had no choice but to roll jetty in Hudi back to the original version; the easiest way is to reset directly to the commit before that PR.

Summary

This article introduced how Flink SQL reads and writes Hudi and synchronizes Hive tables through the Hudi HMS Catalog, and described the benefits of the Hudi HMS Catalog. I think this is the best approach at present, and I strongly recommend it.

Related Reading

This article was published by Dong Kelun on Lun Shao's Blog, under the Attribution-NonCommercial-No Derivatives 3.0 License.

For non-commercial reprints, please indicate the author and source. For commercial reprints, please contact the author himself.

Title of this article: Flink SQL reads and writes Hudi through Hudi HMS Catalog and synchronizes Hive tables (this method is strongly recommended)

Link to this article: https://dongkelun.com/2022/11/02/flinkHudiHmsCatalog/
