Abstract
MySQL is widely used as the storage database behind a huge number of business systems. In the era of big data, we urgently need to analyze this massive data, but running the analysis directly on MySQL is clearly unrealistic and would affect the operational stability of the business systems. If we want to analyze the data in real time, we need to replicate it in real time to a system better suited to OLAP. This article introduces a CDC tool, canal, which is open-sourced by Alibaba and widely used in Alibaba's production systems. It simulates a MySQL slave node and obtains the changed binlog in real time. We will deliver the binlog captured by canal to Kafka for downstream systems to consume.
This article is based on Ubuntu 16.04 LTS
Environment
- Java 8+
- A ZooKeeper cluster
- A Kafka cluster
If the ZooKeeper and Kafka clusters have not been built yet, please refer to:
Build ZooKeeper cluster under Linux
Build kafka cluster under Linux
1, Source MySQL configuration
1. Enable Binlog write function
For a self-built MySQL instance, you first need to enable binlog writing and set the binlog format to ROW mode. my.cnf is configured as follows:
$ vim /etc/my.cnf

[mysqld]
log-bin=mysql-bin   # Enable binlog
binlog-format=ROW   # Select ROW mode
server_id=1         # MySQL replication requires server_id to be defined, and it must not duplicate canal's slaveId

# Restart the MySQL database
$ service mysql restart
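To confirm the settings took effect after the restart, a quick check from the MySQL client:

mysql> SHOW VARIABLES LIKE 'log_bin';        -- should be ON
mysql> SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW
mysql> SHOW MASTER STATUS;                   -- shows the current binlog file and position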
2. Create and authorize a canal user
Grant the MySQL account that canal connects with the permissions required to act as a MySQL slave. If a suitable account already exists, you can grant the permissions directly:
> CREATE USER canal IDENTIFIED BY 'canal';
> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- > GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
> FLUSH PRIVILEGES;
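To double-check, the granted privileges can be listed with:

> SHOW GRANTS FOR 'canal'@'%';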
2, Install ZooKeeper
For details, please refer to: Build ZooKeeper cluster under Linux
1. Start zkServer on all nodes
$ zkServer.sh start&
2. View node status
$ zkServer.sh status
3, Install Kafka
For details, please refer to: Build kafka cluster under Linux
1. Start kafka on all nodes
# Start the Kafka cluster in the background (all three nodes need to be started)
$ cd /usr/local/kafka_2.13-2.7.0/bin    # Enter the bin directory of Kafka
$ ./kafka-server-start.sh -daemon ../config/server.properties
# Check whether Kafka has started
$ jps
2. Create and view topics
$ cd /usr/local/kafka_2.13-2.7.0/bin    # Enter the bin directory of Kafka

# Create a topic
$ ./kafka-topics.sh --create --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --replication-factor 2 --partitions 1 --topic hello_canal
# Explanation
# --create means create a topic
# --zookeeper 192.168.1.113:2181,... lists the zk cluster nodes
# --replication-factor 2 sets the number of replicas
# --partitions 1 sets the number of partitions
# --topic hello_canal sets the topic name to hello_canal

# List all topics
$ ./kafka-topics.sh --list --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181

# Describe the specified topic
$ ./kafka-topics.sh --describe --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --topic hello_canal
Topic: hello_canal  PartitionCount: 1  ReplicationFactor: 2  Configs:
    Topic: hello_canal  Partition: 0  Leader: 0  Replicas: 0,2  Isr: 0,2
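Optionally, a quick sanity check of the new topic is to produce and consume a test message (a sketch; any broker address from the cluster will do, and the test lines are plain text rather than canal messages):

# Produce a test message (type a line, then press Ctrl+C to exit)
$ ./kafka-console-producer.sh --bootstrap-server 192.168.1.113:9092 --topic hello_canal
# Consume it from another terminal
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.1.113:9092 --topic hello_canal --from-beginning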
3. Verify that the Kafka cluster is started successfully
# Start zkCli (the ZooKeeper client) on any zk node
$ sh $ZOOKEEPER_HOME/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]
# Seeing all three broker ids means the three Kafka nodes have started normally
4, Install canal server
(1) Download and unzip
1. Download
Go to Canal's official release page and download the latest package: canal.deployer-latest.tar.gz
$ cd /data
$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
2. Decompress
$ mkdir /usr/local/canal
$ tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
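After extraction, the canal home directory should look roughly like the following (the exact contents may differ slightly between releases):

$ ls /usr/local/canal
bin  conf  lib  logs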
(2) Modify the configuration files
1. Modify instance configuration file
Canal ships with a default instance directory conf/example; here it is assumed that it has been copied or renamed to conf/hello_canal (and that canal.destinations in canal.properties points to hello_canal).

$ vim /usr/local/canal/conf/hello_canal/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change this to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change this to your own database information
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# mq config
canal.mq.topic=hello_canal
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
canal.instance.connectionCharset specifies the character encoding of the database, expressed as a Java charset name, e.g. UTF-8, GBK, ISO-8859-1.
If the system has only one CPU, canal.instance.parser.parallel needs to be set to false.
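For reference, the table filter canal.instance.filter.regex above uses canal's comma-separated schema\\.table regular-expression format; a few hypothetical variations (example schema and table names):

# Match every table in every schema (the value used above)
canal.instance.filter.regex = .*\\..*
# Only tables in the test schema
#canal.instance.filter.regex = test\\..*
# Only specific tables, separated by commas
#canal.instance.filter.regex = test\\.fk,test\\.user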
2. Modify canal configuration file
$ vim /usr/local/canal/conf/canal.properties

# ...
# Options: tcp (default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq cluster configuration. If your MQ is deployed as a cluster, list the ip:port of every node below
canal.mq.servers = 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092
canal.mq.retries = 0
# In flatMessage mode this value can be increased, but do not exceed the MQ message body size limit
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# In flatMessage mode please increase this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal batch size, 50K by default. Due to Kafka's maximum message body limit, do not exceed 1M (stay below 900K)
canal.mq.canalBatchSize = 50
# Timeout for canal get data, in ms; empty means wait indefinitely
canal.mq.canalGetTimeout = 100
# Whether to use the flat json message format
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# Whether Kafka message delivery uses transactions
canal.mq.transaction = false
(3) Start canal
1. Start
$ cd /usr/local/canal/
$ sh bin/startup.sh
2. View server logs
$ vim /usr/local/canal/logs/canal/canal.log
2021-02-22 15:45:24.422 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2021-02-22 15:45:24.559 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2021-02-22 15:45:24.624 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2021-02-22 15:45:24.834 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2021-02-22 15:45:30.351 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
3. View the log of instance
$ vim /usr/local/canal/logs/hello_canal/hello_canal.log
2021-02-22 16:54:24.284 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2021-02-22 16:54:24.308 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [hello_canal/instance.properties]
2021-02-22 16:54:25.143 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2021-02-22 16:54:25.144 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [hello_canal/instance.properties]
2021-02-22 16:54:26.586 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-hello_canal
2021-02-22 16:54:26.642 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2021-02-22 16:54:26.642 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2021-02-22 16:54:27.057 [destination = hello_canal , address = ubuntu-master.com/192.168.1.113:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-02-22 16:54:27.176 [destination = hello_canal , address = ubuntu-master.com/192.168.1.113:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-02-22 16:54:27.179 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
4. Shut down
$ cd /usr/local/canal/
$ sh bin/stop.sh
5, Verify canal data synchronization
(1) View via Kafka consumer
1. Launch Kafka consumer
Create a consumer on another server:
$ cd /usr/local/kafka_2.13-2.7.0/bin
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092 --topic hello_canal --from-beginning
Note: starting from Kafka 2.2, the --zookeeper parameter of the kafka-topics.sh script is marked as deprecated, and the --bootstrap-server parameter is recommended instead.
The port accordingly changes from the ZooKeeper port 2181 to the Kafka port 9092.
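For example, the topic commands used earlier could also be written in the newer style:

$ ./kafka-topics.sh --list --bootstrap-server 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092
$ ./kafka-topics.sh --describe --bootstrap-server 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092 --topic hello_canal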
2. Modify data in the source MySQL database
mysql> use test;
mysql> insert into fk values(13,'hello_canal',19);
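The insert above assumes a table test.fk already exists; a hypothetical definition consistent with the message shown below would be:

mysql> CREATE TABLE fk (id INT(10) UNSIGNED NOT NULL PRIMARY KEY, name VARCHAR(100), age INT(11));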
3. Consumer window output
{"data":[{"id":"13","name":"hello_canal","age":"19"}],"database":"test","es":1614252283000,"id":2,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(100)","age":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4},"table":"fk","ts":1614252283248,"type":"INSERT"}
This indicates that canal has successfully captured the binlog of the changed data on the source MySQL and delivered it to the hello_canal topic of the Kafka cluster.
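If jq is installed, a quick way to pull out just the key fields from these flat JSON messages (a sketch, not required for the setup):

$ ./kafka-console-consumer.sh --bootstrap-server 192.168.1.113:9092 --topic hello_canal --from-beginning | jq '{type, table, data}'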
6, Problems encountered
(1) canal synchronizes MySQL binlog to Kafka; after startup, the instance log reports TimeoutException: Failed to update metadata after 60000 ms
1. Detailed error message:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2. Possible causes and solutions:
Cause 1: in the Kafka configuration file config/server.properties, listeners=PLAINTEXT://your.host.name:9092 and advertised.listeners=PLAINTEXT://your.host.name:9092 are not configured, so canal cannot establish a socket connection to Kafka.
Solution: add the two configurations above and restart Kafka.
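For example, in each broker's config/server.properties (substituting that broker's own address; shown here for the 192.168.1.113 node):

listeners=PLAINTEXT://192.168.1.113:9092
advertised.listeners=PLAINTEXT://192.168.1.113:9092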
Cause 2: in the configuration file conf/canal.properties, the Kafka server list (canal.mq.servers = x.x.x.x:9092) did not include all the Kafka nodes. The error occurred because only one Kafka node was configured while I had started three Kafka nodes in cluster mode.
Solution: in conf/canal.properties, change it to canal.mq.servers = x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092, then restart canal.
(2) canal cannot be stopped
1. Detailed error message:
bin/stop.sh: 52: kill: No such process
bin/stop.sh: 58: [: unexpected operator
bin/stop.sh: 63: bin/stop.sh: let: not found
2. Cause and solution:
Error: let: not found
Ubuntu points /bin/sh to the dash interpreter by default. dash is a stripped-down version of bash whose features are far less powerful and rich, and it does not support let or i++.
Solution: run sudo dpkg-reconfigure dash and select "No" to replace dash with bash.
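A minimal sketch of the fix and a quick verification:

$ sudo dpkg-reconfigure dash   # choose "No" when prompted
$ ls -l /bin/sh                # should now point to bash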
References
[1] Canal QuickStart[https://github.com/alibaba/canal/wiki/QuickStart]
[2] Canal Kafka RocketMQ QuickStart[https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart]
[3] Using Canal to deliver MySQL Binlog to Kafka[https://www.jianshu.com/p/93d9018e2fa1]
[4] canal real-time synchronization of MySQL table data to Kafka[https://www.cnblogs.com/zpan2019/p/13323035.html]
For more articles on big data, distributed systems, storage, blockchain, and Linux, please follow my WeChat official account: asympTech progressive line laboratory.