Installing and Configuring Kafka 3.2.0 on CentOS 7 (with a SpringBoot Connection Test)

Contents

1, Install the JDK (skip if already installed)

2, Download Kafka

3, Configure Kafka

4, Start Kafka

5, SpringBoot connection test

1, Install the JDK (skip if already installed)

Kafka depends on the JDK, so install the JDK first.

[1] Uninstall jdk

Sometimes an existing JDK needs to be uninstalled for various reasons, so the steps are recorded here as well.

 # View the installed jdk
 rpm -qa|grep -i jdk

 # Uninstall jdk (uninstall according to your own package name)
 rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.332.b09-1.el7_9.x86_64
 rpm -e --nodeps java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64
 rpm -e --nodeps java-1.8.0-openjdk-devel-1.8.0.332.b09-1.el7_9.x86_64
 # After uninstalling, check the jdk version; if the command is not found, the uninstallation was successful
 java -version

[2] Install jdk

Download from the official site (Java Downloads | Oracle): find the tar.gz package at the end of the page, download it, and upload it to the server for installation and configuration.

 # Extract (my extraction directory is /software/jdk/jdk1.8.0_333)
 tar -zxvf jdk-8u333-linux-x64.tar.gz
 ​
 # Open the configuration file for configuration
 vim /etc/profile
 ​
 # Add the following configuration (adjust the paths to your own installation)
 export JAVA_HOME=/software/jdk/jdk1.8.0_333
 export JRE_HOME=$JAVA_HOME/jre
 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
 export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
 ​
 # Make the configuration file effective
 source /etc/profile
 ​
 # Check whether the installation is successful
 java -version

2, Download Kafka

Official website: Apache Downloads

Download the matching version with wget. If the download is too slow, you can download it on Windows with a download manager and then upload it to the server.

 wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz --no-check-certificate

Extract the archive after uploading

 tar -zxvf kafka_2.13-3.2.0.tgz

Note: the extraction path should not be too long, otherwise errors may be reported on startup later, so here I rename kafka_2.13-3.2.0 to kafka

 mv kafka_2.13-3.2.0 kafka

The directory structure after decompression is as follows:
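
For reference, listing the renamed directory should show roughly the standard Kafka binary layout (the exact contents may differ slightly between versions):

 # List the extracted (renamed) directory
 ls /software/kafka
 
 # Typical contents
 # LICENSE  NOTICE  bin  config  libs  licenses  site-docs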

3, Configure Kafka

This covers only a single-node Kafka setup, not a cluster configuration.

[1] Configure the log path of kafka

Open the server.properties file under the config directory and modify the configuration

 # Open the configuration file
 vim config/server.properties
 
 # Configure the log storage location (a kafka-logs folder will be created in this directory at startup)
 log.dirs=/software/kafka/kafka-logs

[2] Configure external network access

Still in the server.properties file, find the following settings (around line 34) and modify them

 # Uncomment this line
 listeners=PLAINTEXT://:9092
 ​
 # Uncomment this line and change the address to your own server's IP
 advertised.listeners=PLAINTEXT://42.104.249.49:9092
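
Once the broker is up (section 4 below), you can check that it is reachable through the advertised listener, for example with the kafka-broker-api-versions.sh tool shipped in bin/ (use your own server IP):

 # Query the broker through the advertised address; a list of supported API versions means the listener is reachable
 bin/kafka-broker-api-versions.sh --bootstrap-server 42.104.249.49:9092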

[3] Configure zookeeper data path

First create a zookeeper folder under the extracted directory to store the data, then open the zookeeper.properties file under the config directory and modify the configuration

 # Open the configuration file
 vim config/zookeeper.properties
 
 # Configure the data storage location (the zookeeper folder created above)
 dataDir=/software/kafka/zookeeper

[4] Open the firewall port

 # Open port 9092
 firewall-cmd --zone=public --add-port=9092/tcp --permanent
 
 # Reload the firewall configuration
 firewall-cmd --reload

Note that in addition to opening the port on CentOS, the port must also be opened in the cloud security group. For example, I use Tencent Cloud here, so the corresponding port has to be opened in the security group on the Tencent Cloud console
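
As a quick local sanity check, you can list the ports opened in the firewall and, once Kafka is running, confirm that the broker is listening on 9092:

 # Confirm 9092 is among the opened firewall ports
 firewall-cmd --zone=public --list-ports
 
 # After Kafka has started, confirm the broker is listening on 9092
 ss -lntp | grep 9092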

4, Start Kafka

 # Start zookeeper
 bin/zookeeper-server-start.sh config/zookeeper.properties
 ​
 # Start kafka
 bin/kafka-server-start.sh config/server.properties
 ​
 # Produce messages (this creates a topic named testTopic)
 bin/kafka-console-producer.sh --topic testTopic --bootstrap-server localhost:9092
 ​
 # Consume messages (open a second terminal to listen on the testTopic topic; messages sent from the producer terminal will appear here)
 bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server localhost:9092

If the consumer terminal receives the messages, the installation and configuration were successful
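
The console producer above creates testTopic automatically (assuming automatic topic creation is enabled, which is the default). If you prefer, the topic can also be created and inspected explicitly with the kafka-topics.sh tool:

 # Create the topic explicitly (single partition, replication factor 1 on a single node)
 bin/kafka-topics.sh --create --topic testTopic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
 
 # Describe the topic to confirm it exists
 bin/kafka-topics.sh --describe --topic testTopic --bootstrap-server localhost:9092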

5, SpringBoot connection test

The directory structure is as follows:
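
A minimal project layout for this test might look like the following; the project and package names (kafka-demo, com.example.kafkademo) are placeholders, and only pom.xml, application.yml, and the two classes shown below matter:

 kafka-demo
 ├── pom.xml
 └── src/main
     ├── java/com/example/kafkademo
     │   ├── KafkaDemoApplication.java              # standard @SpringBootApplication entry point
     │   ├── controller/KafkaProducerController.java
     │   └── consumer/KafkaConsumer.java
     └── resources/application.yml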

[1] Add the pom dependencies

 <!-- Spring for Apache Kafka: provides the KafkaTemplate and @KafkaListener used below
      (no version needed when the SpringBoot parent manages dependencies) -->
 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>3.2.0</version>
 </dependency>
 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
 </dependency>

[2] Configure application.yml

Set bootstrap-servers to the IP of your own server

 spring:
   kafka:
     # consumer
     consumer:
       group-id: foo
       auto-offset-reset: earliest
       bootstrap-servers: 42.104.249.49:9092
     # producer
     producer:
       bootstrap-servers: 42.104.249.49:9092
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer

[3] Produce messages

Messages are produced through a controller endpoint to make testing easy

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 @RestController
 public class KafkaProducerController {
 
     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;
 
     @GetMapping("/send")
     public void send() {
         // testTopic is the same topic as in the terminal test above, which makes testing easy; "onestar" is the message to send
         kafkaTemplate.send("testTopic", "onestar");
     }
 }

[4] Consume messages with a listener

 import java.util.Optional;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
 @Component
 @Slf4j
 public class KafkaConsumer {
     /**
      * Listens on the "testTopic" topic; the consumer group defaults to the group-id in the configuration file
      */
     @KafkaListener(topics = {"testTopic"})
     public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
         Optional<?> optional = Optional.ofNullable(consumerRecord.value());
         if (optional.isPresent()) {
             Object msg = optional.get();
             log.info("message:{}", msg);
         }
     }
 }

[5] Testing

For convenience, keep the console consumer terminal from the previous section open, then run the application and access http://localhost:8080/send to test
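
For example, assuming the application runs locally on the default port 8080, the endpoint can be triggered from the command line:

 # Trigger the producer endpoint (assumes the app is running on localhost:8080)
 curl http://localhost:8080/send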

The application log shows that the message was consumed, and the console consumer terminal also prints the message
