Hello everyone, I am Bucai Chen~
There was a project with a data volume of up to 50 million rows. The data in the reports was inaccurate, and because the business database and the report database lived in different database instances, plain SQL could not be used to synchronize them. The plan at the time was to synchronize with mysqldump or a stored procedure, but after trying both, neither turned out to be practical:
mysqldump: not only does the backup take time, the import also takes time, and data may still be written to the business database during the backup, so the result is stale as soon as it is produced (synchronizing this way is almost the same as not synchronizing at all).
Stored procedure: far too slow. It is fine when the data volume is small, but when we tried this approach we could only synchronize about 2,000 rows in three hours...
The differences between several common middlewares for heterogeneous data synchronization are as follows:
Alibaba's Canal was introduced in an earlier article: Hands-on! Integrating Spring Boot with Alibaba's open-source middleware Canal for incremental data synchronization!
Today I will introduce another good middleware: DataX
Introduction to DataX
DataX is the open-source version of Alibaba Cloud DataWorks Data Integration, and it is mainly used for offline synchronization between data sources. DataX aims to provide stable and efficient data synchronization among a wide range of heterogeneous data sources (that is, different kinds of databases and storage systems), including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, FTP, and more.
- In order to solve the synchronization problem of heterogeneous data sources, DataX turns the complex mesh of synchronization links into a star-shaped data link, with DataX itself acting as the intermediate transmission carrier that connects the various data sources;
- When you need to access a new data source, you only need to connect this data source to DataX, and it can be seamlessly synchronized with the existing data source.
DataX3.0 framework design
DataX adopts a Framework + Plugin architecture: reading from and writing to data sources are abstracted as Reader/Writer plugins, which are incorporated into the overall synchronization framework.
| Role | Function |
| --- | --- |
| Reader (collection module) | Collects data from the source data source and sends it to the Framework. |
| Writer (write module) | Continuously fetches data from the Framework and writes it to the destination. |
| Framework (intermediary) | Connects Reader and Writer, serving as the data transmission channel between them, and handles core technical issues such as buffering, flow control, concurrency, and data conversion. |
DataX3.0 Core Architecture
DataX calls a single data synchronization run a Job. After DataX receives a Job, it starts a process to carry out the entire synchronization. The DataX Job module is the central management node of a single job and takes care of functions such as data cleaning, subtask splitting, and TaskGroup management.
- After the DataX Job starts, it is split into multiple small Tasks (subtasks) according to the splitting strategy of each source, so that they can be executed concurrently.
- The DataX Job then calls the Scheduler module to reassemble the split Tasks into TaskGroups (task groups) according to the configured concurrency.
- Each Task is started by its TaskGroup. Once started, a Task runs the Reader --> Channel --> Writer threads to complete the synchronization work.
- After the job starts, the Job module monitors the TaskGroups and waits for all of them to complete; the Job then exits with code 0 (the exit code is non-zero on abnormal termination).
DataX scheduling process:
- First, the DataX Job module splits the data into several Tasks according to how the source databases and tables are sharded, and then calculates how many TaskGroups need to be allocated based on the user-configured concurrency;
- Calculation: number of Tasks / number of Channels = number of TaskGroups; finally, each TaskGroup runs its assigned Tasks with the configured concurrency (see the worked example below).
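As a rough illustration (following the example given in the official DataX documentation): suppose a job is submitted with 20 concurrent channels configured, to synchronize a MySQL database split into 100 sharded tables. The Job is first split into 100 Tasks; since each TaskGroup runs 5 channels by default, DataX allocates 20 / 5 = 4 TaskGroups; the 4 TaskGroups evenly divide the 100 Tasks, so each TaskGroup runs 25 Tasks with a concurrency of 5.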
Data Synchronization with DataX
Preparation:
- JDK (1.8 or above, 1.8 recommended)
- Python (both versions 2 and 3 are available)
- Apache Maven 3.x (only needed if you compile and package DataX yourself; not required when using the pre-built tar package)
| Host | Operating system | IP address | Packages |
| --- | --- | --- | --- |
| MySQL-1 | CentOS 7.4 | 192.168.1.1 | jdk-8u181-linux-x64.tar.gz, datax.tar.gz |
| MySQL-2 | CentOS 7.4 | 192.168.1.2 | |
Install the JDK:
Download link: https://www.oracle.com/java/t... (an Oracle account is required)
```bash
[root@MySQL-1 ~]# ls
anaconda-ks.cfg  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# tar zxf jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# ls
anaconda-ks.cfg  jdk1.8.0_181  jdk-8u181-linux-x64.tar.gz
[root@MySQL-1 ~]# mv jdk1.8.0_181 /usr/local/java
[root@MySQL-1 ~]# cat <<'END' >> /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$PATH:"$JAVA_HOME/bin"
END
[root@MySQL-1 ~]# source /etc/profile
[root@MySQL-1 ~]# java -version
```
- Since the Python 2.7 package comes with CentOS 7, no installation is required.
Install DataX software on Linux
```bash
[root@MySQL-1 ~]# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
[root@MySQL-1 ~]# tar zxf datax.tar.gz -C /usr/local/
[root@MySQL-1 ~]# rm -rf /usr/local/datax/plugin/*/._*    # The hidden files must be deleted (important)
```
- If they are not removed, errors like the following may appear: [/usr/local/datax/plugin/reader/._drdsreader/plugin.json] does not exist. Please check your configuration file.
Verify:
```bash
[root@MySQL-1 ~]# cd /usr/local/datax/bin
[root@MySQL-1 ~]# python datax.py ../job/job.json    # Verify that the installation was successful
```
Output:
```
2021-12-13 19:26:28.828 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-13 19:26:28.829 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.060s | All Task WaitReaderTime 0.068s | Percentage 100.00%
2021-12-13 19:26:28.829 [job-0] INFO  JobContainer -
Task start time            : 2021-12-13 19:26:18
Task end time              : 2021-12-13 19:26:28
Total task duration        : 10s
Average task throughput    : 253.91KB/s
Record write speed         : 10000rec/s
Total records read         : 100000
Total read/write failures  : 0
```
Basic use of DataX
View the template for streamreader --> streamwriter:
```bash
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r streamreader -w streamwriter
```
Output:
```
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
    https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
    https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and use
    python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [],
                        "sliceRecordCount": ""
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
```
Write the json file according to the template
```bash
[root@MySQL-1 ~]# cat <<END > test.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [                    # Columns to synchronize (* means all)
                            {
                                "type": "string",
                                "value": "Hello."
                            },
                            {
                                "type": "string",
                                "value": "Hebei Peng Yuyan"
                            }
                        ],
                        "sliceRecordCount": "3"        # Number of records to print per channel
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "utf-8",           # Encoding
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "2"                         # Concurrency (sliceRecordCount * channel = total records printed)
            }
        }
    }
}
END
```
Output: (if you copy the configuration above, remember to remove the # comments, since JSON does not support them). With sliceRecordCount set to 3 and channel set to 2, the two column values are printed 3 × 2 = 6 times.
Install the MySQL database
Install MariaDB on both hosts:
```bash
[root@MySQL-1 ~]# yum -y install mariadb mariadb-server mariadb-libs mariadb-devel   # Install the MariaDB database
[root@MySQL-1 ~]# systemctl start mariadb                                            # Start MariaDB
[root@MySQL-1 ~]# mysql_secure_installation                                          # Initialize
NOTE: RUNNING ALL PARTS OF THIS SCRIPT IS RECOMMENDED FOR ALL MariaDB
      SERVERS IN PRODUCTION USE!  PLEASE READ EACH STEP CAREFULLY!

Enter current password for root (enter for none):   # Press Enter directly
OK, successfully used password, moving on...

Set root password? [Y/n] y                           # Set the root password
New password:
Re-enter new password:
Password updated successfully!
Reloading privilege tables..
 ... Success!

Remove anonymous users? [Y/n] y                      # Remove anonymous users
 ... skipping.

Disallow root login remotely? [Y/n] n                # Allow root to log in remotely
 ... skipping.

Remove test database and access to it? [Y/n] y       # Remove the test database
 ... skipping.

Reload privilege tables now? [Y/n] y                 # Reload privilege tables
 ... Success!
```
1) Prepare the data to be synchronized (the table must exist on both hosts):
```sql
MariaDB [(none)]> create database `course-study`;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> create table `course-study`.t_member(ID int,Name varchar(20),Email varchar(30));
Query OK, 0 rows affected (0.00 sec)
```
Because the DataX program connects to both databases to perform the synchronization, the necessary privileges must be granted on both sides:
```sql
grant all privileges on *.* to root@'%' identified by '123123';
flush privileges;
```
2) Create a stored procedure:
```sql
DELIMITER $$
CREATE PROCEDURE test()
BEGIN
    declare A int default 1;
    while (A < 3000000) do
        insert into `course-study`.t_member values(A, concat("LiSa", A), concat("LiSa", A, "@163.com"));
        set A = A + 1;
    END while;
END $$
DELIMITER ;
```
3) Call the stored procedure (on the source database, to generate test data for verifying the synchronization):
```sql
call test();
```
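Before synchronizing, it is worth sanity-checking the generated data on the source host; for example (the procedure above inserts IDs 1 through 2,999,999):

```sql
-- Run on the source database (192.168.1.1)
select count(*) from `course-study`.t_member;   -- expected: 2999999
```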
Synchronize MySQL data through DataX
1) Generate a template for MySQL to MySQL synchronization:
```bash
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",           # Reader side
                    "parameter": {
                        "column": [],                # Columns to synchronize (* means all columns)
                        "connection": [
                            {
                                "jdbcUrl": [],       # Connection information
                                "table": []          # Table to read from
                            }
                        ],
                        "password": "",              # Connection password
                        "username": "",              # Connection user
                        "where": ""                  # Filter condition
                    }
                },
                "writer": {
                    "name": "mysqlwriter",           # Writer side
                    "parameter": {
                        "column": [],                # Columns to synchronize
                        "connection": [
                            {
                                "jdbcUrl": "",       # Connection information
                                "table": []          # Table to write to
                            }
                        ],
                        "password": "",              # Connection password
                        "preSql": [],                # SQL executed before synchronization
                        "session": [],
                        "username": "",              # Connection user
                        "writeMode": ""              # Operation type
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""                        # Number of concurrent channels
            }
        }
    }
}
```
2) Write the json file:
```bash
[root@MySQL-1 ~]# vim install.json
```
```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ],
                                "table": ["t_member"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ],
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
```
3) Verify
```bash
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py install.json
```
Output:
```
2021-12-15 16:45:15.120 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-15 16:45:15.120 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2999999 records, 107666651 bytes | Speed 2.57MB/s, 74999 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 82.173s | All Task WaitReaderTime 75.722s | Percentage 100.00%
2021-12-15 16:45:15.124 [job-0] INFO  JobContainer -
Task start time            : 2021-12-15 16:44:32
Task end time              : 2021-12-15 16:45:15
Total task duration        : 42s
Average task throughput    : 2.57MB/s
Record write speed         : 74999rec/s
Total records read         : 2999999
Total read/write failures  : 0
```
You can check in the destination database to see if the synchronization is complete.
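For example, a simple check is to compare the row count on the target host with the source (both should report 2,999,999 after the full synchronization above):

```sql
-- Run on the target database (192.168.1.2)
select count(*) from `course-study`.t_member;   -- should match the source: 2999999
select * from `course-study`.t_member limit 5;  -- spot-check a few rows
```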
- The method above amounts to a full synchronization, and when the data volume is large, an interruption partway through the synchronization is very painful;
- So in some cases, incremental synchronization is quite important.
Incremental sync using DataX
When using DataX, the only difference between full synchronization and incremental synchronization is that incremental synchronization filters rows with a where condition (i.e., the reader only selects the data that matches the filter SQL).
1) Write the json file:
```bash
[root@MySQL-1 ~]# vim where.json
```
```json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123123",
                        "column": ["*"],
                        "splitPk": "ID",
                        "where": "ID <= 1888",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                                ],
                                "table": ["t_member"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.2:3306/course-study?useUnicode=true&characterEncoding=utf8",
                                "table": ["t_member"]
                            }
                        ],
                        "password": "123123",
                        "preSql": [
                            "truncate t_member"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "username": "root",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}
```
- The parameters to pay attention to are where (conditional filtering) and preSql (SQL executed before the synchronization starts).
2) Verify:
```bash
[root@MySQL-1 ~]# python /usr/local/datax/bin/datax.py where.json
```
Output:
```
2021-12-16 17:34:38.534 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-12-16 17:34:38.534 [job-0] INFO  StandAloneJobContainerCommunicator - Total 1888 records, 49543 bytes | Speed 1.61KB/s, 62 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.002s | All Task WaitReaderTime 100.570s | Percentage 100.00%
2021-12-16 17:34:38.537 [job-0] INFO  JobContainer -
Task start time            : 2021-12-16 17:34:06
Task end time              : 2021-12-16 17:34:38
Total task duration        : 32s
Average task throughput    : 1.61KB/s
Record write speed         : 62rec/s
Total records read         : 1888
Total read/write failures  : 0
```
View on the target database:
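Since the where condition only selects rows with ID <= 1888, a quick count on the target should return exactly 1888; for example:

```sql
-- Run on the target database (192.168.1.2) after running where.json
select count(*) from `course-study`.t_member;   -- expected: 1888
```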
3) Based on the above data, perform incremental synchronization again:
The key change is the where configuration: `"where": "ID > 1888 AND ID <= 2888"` (incremental synchronization through conditional filtering). You also need to remove the preSql used above, because it truncates the table and would wipe out the data that has already been synchronized.
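As a sketch, the reader section for this second incremental pass might look like the following (only the reader is shown; the writer stays the same as in where.json except that the `"preSql": ["truncate t_member"]` entry is removed):

```json
"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "123123",
        "column": ["*"],
        "splitPk": "ID",
        "where": "ID > 1888 AND ID <= 2888",
        "connection": [
            {
                "jdbcUrl": [
                    "jdbc:mysql://192.168.1.1:3306/course-study?useUnicode=true&characterEncoding=utf8"
                ],
                "table": ["t_member"]
            }
        ]
    }
}
```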