MySQL canal Kafka data replication details


MySQL is widely used in the storage database of massive business. In the era of big data, we urgently need to analyze the massive data, but it is obviously unrealistic to analyze the big data on MySQL, which will affect the operation stability of the business system. If we want to analyze these data in real time, we need to copy them to the data system suitable for OLAP in real time. This paper introduces a CDC tool, canal, which is open source by Alibaba and widely used in Alibaba's production system. It simulates MySQL Slave node and obtains the changed binlog in real time. We will post the binlog obtained by canal to kafka for subsequent system consumption.

This article is based on Ubuntu 16.04 LTS

Environmental description

  • Java 8+
  • Build ZooKeeper cluster
  • Set up Kafka cluster

If ZooKeeper cluster and Kafka cluster are not built, please refer to:

Build ZooKeeper cluster under Linux

Build kafka cluster under Linux

1, Source MySQL configuration

1. Enable Binlog write function

For self built MySQL, you need to turn on the Binlog write function first, and configure Binlog format to ROW mode, my CNF is configured as follows

$ vim /etc/my.cnf

log-bin=mysql-bin # Enable binlog
binlog-format=ROW # Select ROW mode
server_id=1 # MySQL replacement configuration needs to be defined and cannot be duplicate with the slaveId of canal

#Restart MySQL database
$ service mysql restart

2. Create and authorize canal Users

Authorize the canal linked MySQL account to have the permission to act as a MySQL slave. If there is an existing account, you can grant it directly

> CREATE USER canal IDENTIFIED BY 'canal';  
-->  GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

2, Install ZooKeeper

For details, please refer to: Build ZooKeeper cluster under Linux

1. Start zkServer on all nodes

$ start&

2. View node status

$ status

3, Install KafKa

For details, please refer to: Build kafka cluster under Linux

1. Start kafka on all nodes

#Start the Kafka cluster from the background (all three need to be started)
$ cd /usr/local/kafka_2.13-2.7.0/bin #Enter the bin directory of kafka 
$ ./ -daemon ../config/

#Check whether kafka is started
$ jps

2. Creating and viewing topics

$ cd /usr/local/kafka_2.13-2.7.0/bin #Enter the bin directory of kafka 

#Create Topic
$ ./ --create --zookeeper,, --replication-factor 2 --partitions 1 --topic hello_canal
# --Create means create
# --The parameter after zookeeper is the cluster node of zk
# --Replication factor 2 indicates the number of replicas
# --partitions 1 indicates the number of partitions
# --topic hello_canal indicates that the topic name is hello_canal

#To view the list of topic s:
$ ./ --list --zookeeper,,

#To view the specified topic:
$ ./ --describe --zookeeper,, --topic hello_canal
Topic: hello_canal	PartitionCount: 1	ReplicationFactor: 2	Configs: 
	Topic: hello_canal	Partition: 0	Leader: 0	Replicas: 0,2	Isr: 0,2

3. Verify that the Kafka cluster is started successfully

# Start zkCli(zookeeper client) on any zk node

$ [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]

# Three nodes can see fkaka ID indicates that the three kafka nodes are started normally

4, Install canal server

(1) Download and unzip

1. Download

Go to the official website of Canal and download the latest compressed package: Canal deployer-latest. Tar gz

$ cd /data
$ wget

2. Decompress

$ mkdir /usr/local/canal

$ tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

(2) Modify profile

1. Modify instance configuration file

$ vim /usr/local/canal/conf/hello_canal/

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info, which needs to be changed to its own database information
canal.instance.master.address = = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = = 
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password, which needs to be changed to its own database information
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName = 
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
# mq config
# dynamic topic route by schema or table regex,mytest2\\..*,.*\\..*

canal.instance.connectionCharset represents the encoding method of the database, corresponding to the encoding type in java, such as UTF-8, GBK, ISO-8859-1

If the system is a cpu, you need to set canal instance. parser. Parallel is set to false

2. Modify canal configuration file

$ vim /usr/local/canal/conf/

# ...
# Optional: TCP (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster configuration. If your mq has already done cluster configuration, you need to write all the IP: ports of all nodes below =,, = 0
# In the flagMessage mode, you can increase the value, but do not exceed the upper limit of MQ message body size = 16384 = 1048576
# In flatMessage mode, please increase the value, 50-200 is recommended = 1 = 33554432
# The batch size of Canal is 50K by default. Due to the maximum message body limit of kafka, please do not exceed 1m (below 900K) = 50
# Timeout time of Canal get data, unit: MS; if it is blank, it is unlimited timeout = 100
# Is it a flat json format object = false = none = all
# Whether kafka message delivery uses transactions = false

(3) Start canal

1. Start

$ cd /usr/local/canal/
$ sh bin/

2. View server logs

$ vim /usr/local/canal/logs/canal/canal.log
2021-02-22 15:45:24.422 [main] INFO - ## set default uncaught exception handler
2021-02-22 15:45:24.559 [main] INFO - ## load canal configurations
2021-02-22 15:45:24.624 [main] INFO - ## start the canal server.
2021-02-22 15:45:24.834 [main] INFO - ## start the canal server[]
2021-02-22 15:45:30.351 [main] INFO - ## the canal server is running now ......

3. View the log of instance

$ vim /usr/local/canal/logs/hello_canal/hello_canal.log
2021-02-22 16:54:24.284 [main] INFO - Loading properties file from class path resource []
2021-02-22 16:54:24.308 [main] INFO - Loading properties file from class path resource [hello_canal/]
2021-02-22 16:54:25.143 [main] INFO - Loading properties file from class path resource []
2021-02-22 16:54:25.144 [main] INFO - Loading properties file from class path resource [hello_canal/]
2021-02-22 16:54:26.586 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-hello_canal
2021-02-22 16:54:26.642 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2021-02-22 16:54:26.642 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2021-02-22 16:54:27.057 [destination = hello_canal , address = , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-22 16:54:27.176 [destination = hello_canal , address = , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-22 16:54:27.179 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

4. Shut down

$ cd /usr/local/canal/
$ sh bin/

5, Check the synchronization of Canal data

(1) View via Kafka consumer

1. Launch Kafka consumer

Create a consumer on another server:

$ cd /usr/local/kafka_2.13-2.7.0/bin
$ ./ --bootstrap-server,, --topic hello_canal --from-beginning

Note: starting from version 2.2, Kafka topic The − − zookeeper parameter in the SH script is marked "obsolete", and the − − bootstrap server parameter is recommended.

The port is also changed from the previous zookeeper communication port 2181 to kafka communication port 9092.

2. Modify data on the source mysql database

mysql> use test;
mysql> insert into fk values(13,'hello_canal',19);

3. Consumer window output

{"data":[{"id":"13","name":"hello_canal","age":"19"}],"database":"test","es":1614252283000,"id":2,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"fk","ts":1614252283248,"type":"INSERT"}

It indicates that canal has successfully captured the binlog of the source MySQL change data and delivered it to the Hello of kafka cluster_ In the canal theme.

6, Problems encountered

(1) Canadian synchronizes mysql binlog to kafka. After startup, the instance log reports TimeoutException: Failed to update metadata after 60000 ms

1. Detailed error reporting information:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2. Possible causes and solutions of error reporting:

Error reason 1: kafka configuration file config / server In properties, listeners = plaintext: / / your host. Name: 9092 and advertisement listeners=PLAINTEXT:// is not configured, which makes canal unable to communicate with kafka in socket.

Solution: supplement the above two configurations and restart kafka.

Error reason 2: configuration file conf / canal kafka. In properties bootstrap. Servers = x.x.x.x: 9092 does not match all kafka nodes. The error is reported because there is only one kafka node, and I started three kafka nodes in cluster mode.

Solution: modify conf / canal Kafka. In properties bootstrap. Servers = x.x.x.1:9092, x.x.x.2:9092, x.x.x.3:9092. Restart canal.

(2) canal cannot stop

1. Detailed error reporting information:

bin/ 52: kill: No such process

bin/ 58: [: unexpected operator
bin/ 63: bin/ let: not found

2. Error reporting reason and scheme:

Error: let: not found

Because ubuntu points to the bin/dash interpreter by default. Dash is a castrated version of bash, and its functions are far less powerful and rich than bash. And dash does not support let and i + +

Solution: sudo dpkg reconfigure dash, select "No" to replace dash with bash.

reference resources

[1] Canal QuickStart[]

[2] Canal Kafka RocketMQ QuickStart[]

[3] Using Canal to deliver MySQL Binlog to Kafka[]

[4] canal real-time synchronization of mysql table data to Kafka[]

For more articles on big data, distributed, storage, block chain, Linux, please pay attention to my WeChat official account: asympTech progressive line laboratory.

Tags: MySQL Big Data Zookeeper kafka canal

Posted by jyhm on Fri, 15 Apr 2022 02:34:48 +0930