Writing a Spark UDF in Java to fix the scientific-notation problem when converting large numbers in Hive tables [bigint, double, float, decimal, etc.] to strings

Background

This is a problem I ran into while developing the platform. When Spark is used to transform data from a Hive table, converting large numbers [bigint, double, float, decimal, etc.] to strings can produce scientific notation. For example:

select cast(col1 as string) as col1_1

123456789.123456789→1.23456789123456789E8

When col1 holds a large value, the scientific-notation problem shows up for some rows. After searching for a long time, I could not find any configuration parameter to turn this behavior off.
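
The behavior is easy to reproduce even without a Hive table: casting a large value to double and then to string already shows it. Below is a minimal local sketch; the class name and the literal value are made up purely for illustration.

package com.zhiyong.demo.udf;

import org.apache.spark.sql.SparkSession;

// Minimal reproduction sketch (illustrative only): no Hive table needed,
// casting a large double to string is enough to trigger scientific notation.
public class SciNotationRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SciNotationRepro")
                .master("local[*]")
                .getOrCreate();

        // Prints something like 1.2345678912345679E8 instead of 123456789.123456789
        spark.sql("select cast(cast(123456789.123456789 as double) as string) as col1_1").show(false);

        spark.stop();
    }
}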

Since this Java class implements the Hive2FTP fixed-length file function, it eventually writes a data file to FTP, and such a file obviously cannot contain scientific notation. To keep the file consistent with the original upstream database and with the data ingested into the Hive data lake, a way had to be found to convert the values that had been turned into scientific notation back into plain numbers.

As a last resort, I wrote this UDF. Testing shows it works as expected, and the performance is acceptable: about 2 minutes to process 30 million rows [not counting the time spent waiting for YARN].

Java Demo

Without further ado, let's start with the pom.xml dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>zhiyong_study</artifactId>
        <groupId>com.zhiyong</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark_study</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.3.0</spark.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!-- Lombok, for its @ annotations -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>

        <!--        MySQL driver package-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Fat-jar packaging plugin (bundles all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- The jar's main class (entry point) can be set here (optional) -->
                                    <!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

On to the code:

package com.zhiyong.demo.udf;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

/**
 * @program: zhiyong_study
 * @description: Use Java to write UDF to solve the scientific notation problem when converting large numbers to strings in Spark
 * @author: zhiyong
 * @create: 2022-10-31 20:23
 **/
public class Num2Str implements UDF1<Object, String> {
    @Override
    public String call(Object num) {
        String string = num.toString();
        boolean flg = false;
        if (string.startsWith("-")) {//only a leading "-" marks a negative value; the "-" after "E" is the exponent sign
            flg = true;
            string = string.substring(1);//strip the sign here and restore it at the end
        }

        if (!string.contains("E")) {//not in scientific notation, nothing to convert
            return flg ? "-" + string : string;
        }

        StringBuilder strb = new StringBuilder();

        String[] split = string.split("E");

        String num1 = split[0];//mantissa
        int num2 = Integer.parseInt(split[1]);//exponent

        if (num2 > 0) {//move the decimal point to the right
            if (num1.contains(".")) {//has a decimal point
                String[] split2 = num1.split("\\.");//split into integer and fractional parts
                if (split2[1].length() > num2) {//enough fractional digits: shift the point within them
                    String sub1 = split2[1].substring(0, num2);
                    String sub2 = split2[1].substring(num2);
                    strb.append(split2[0]).append(sub1).append(".").append(sub2);
                } else {//not enough fractional digits: pad with zeros
                    strb.append(split2[0]).append(split2[1]);
                    for (int i = 0; i < num2 - split2[1].length(); i++) {
                        strb.append("0");
                    }
                }

            } else {//no decimal point
                strb.append(split[0]);
                for (int i = 0; i < num2; i++) {
                    strb.append("0");
                }
            }

        } else if (0 == num2) {//do not move
            strb.append(split[0]);
        } else {//move the decimal point to the left
            num2 = -num2;
            strb.append("0.");
            for (int i = 0; i < num2 - 1; i++) {
                strb.append("0");//placeholder zeros after the decimal point
            }
            strb.append(num1.replace(".", ""));//mantissa digits without the decimal point
        }

        String result = strb.toString();

        if (flg) {
            result = "-" + result;
        }

        /**
         * There is definitely a reason for not using this
         */
        //result = new BigDecimal(num.toString()).toString();

        return result;
    }

    public static void main(String[] args) throws Exception {
        SparkSession sc = SparkSession.builder()
                .appName("Num2Str")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> df1 = sc.sql("select col1 from db1.tb1");//read the source column into a DataFrame

        Num2Str zhiyongUdf1 = new Num2Str();

        sc.udf().register("zhiyongUdf1", zhiyongUdf1, DataTypes.StringType);

        df1.createTempView("temp1");

        /**
         * Writing into a Hive table is fine, but when writing files 123456789.123456789 may become 1.23456789123456789E8,
         * which makes the data seen in the file inconsistent with the Hive table.
         * cast(col1 as string) as col1_1 runs into the scientific-notation problem:
         * large bigint, double, float, decimal, etc. values converted to string may come out in scientific notation.
         */
        Dataset<Row> df2 = sc.sql("select zhiyongUdf1(col1) from temp1");

        df2.show();
        
        // Two alternative ways to write the result out (pick one; the target path must not already exist):
        df2.write().text("hdfs://zhiyong1/temp1/day20221031");

        // or go through the RDD API:
        JavaRDD<Row> rowJavaRDD = df2.toJavaRDD();

        rowJavaRDD.saveAsTextFile("hdfs://zhiyong1/temp1/day20221031");

    }
}

After desensitization, the production-level code is roughly the same as the demo above. This way you get a DataFrame with properly formatted values, and when it is saved as a file you no longer see scientific notation.
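
For a quick local sanity check of the conversion logic, the UDF can also be called directly without a cluster. This is only a sketch: the sample values are made up, and the expected outputs assume the call() logic shown above.

package com.zhiyong.demo.udf;

// Quick local sanity check of Num2Str (illustrative sketch, no Spark cluster needed).
public class Num2StrTest {
    public static void main(String[] args) {
        Num2Str udf = new Num2Str();
        System.out.println(udf.call(1.23456789123456789E8)); // roughly 123456789.12345679 (double precision limits apply)
        System.out.println(udf.call(-1.23E-4));              // -0.000123
        System.out.println(udf.call(9999999999L));           // 9999999999 (no "E", returned unchanged)
    }
}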

Secondary development of platform components is a high-risk job: if you fail to deliver the required functionality, you get fired!!! You get the idea.

Closing words

Spark officially does not recommend writing UDFs: under the hood a UDF runs as ordinary JVM code in the task threads, it gets no Catalyst optimization, and it cannot benefit from Spark's automatically managed memory, so its performance is much worse than Spark's built-in functions. I only did this as a last resort; if the behavior could simply be turned off with a configuration parameter, performance would be much better.
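
A quick way to see the difference is to compare the physical plans: a built-in cast shows up as a Catalyst expression the optimizer can reason about, while the registered UDF appears as an opaque call. The lines below are a sketch that could be appended to the demo's main() method above; they reuse the sc session, the temp1 view and the zhiyongUdf1 registration.

        // Plan-comparison sketch: reuses sc, temp1 and zhiyongUdf1 from the demo's main() above.
        Dataset<Row> builtinCast = sc.sql("select cast(col1 as string) as col1_1 from temp1");
        Dataset<Row> viaUdf = sc.sql("select zhiyongUdf1(col1) as col1_1 from temp1");

        builtinCast.explain(); // the cast is an ordinary Catalyst expression
        viaUdf.explain();      // the UDF appears as an opaque function call Catalyst cannot optimize through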

As for why BigDecimal is not used for conversion, it is inconvenient to disclose.

Please indicate the source: https://lizhiyong.blog.csdn.net/article/details/127624005

