Integrate rabbitmq in SpringBoot

1. First introduce the workflow of RabbitMQ:


The producer sends a message to the switch;
The exchange binds the queue through the rules, and stores the message in the queue through the routing key;
The consumer obtains the message in the queue for consumption;

2. Integrate rabbitmq in SpringBoot

1. Add dependencies in springboot

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

2. Configure MQ in application.properties

#IP address of RabbitMQ
spring.rabbitmq.host=192.168.33.xx
#rabbitmq service port number
spring.rabbitmq.port=5672
#rabbitmq virtual host
spring.rabbitmq.virtual-host=/
#username
spring.rabbitmq.username=guest
#password
spring.rabbitmq.password=guest

spring.rabbitmq.listener.simple.prefetch=1
# auto answer
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# Turn on auto-retry
spring.rabbitmq.listener.simple.retry.enabled=true
# maximum number of retries
spring.rabbitmq.listener.simple.retry.max-attempts=5
# maximum interval time
spring.rabbitmq.listener.simple.retry.max-interval=20000ms
# retry interval
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
# Multiplier Retry interval * Multiplier to get the next retry interval 3s 6s 12s 24s where 24s>20s go 20s
spring.rabbitmq.listener.simple.retry.multiplier=2
# refuse to requeue
spring.rabbitmq.listener.simple.default-requeue-rejected=false

3. Configure switches, queues and bindingKey

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {
    @Bean
    public DirectExchange directExchange(){
        return  new DirectExchange("DirectExchange-01",true,false);
    }

    @Bean
    public Queue directQueue(){
        return   QueueBuilder.durable("DirectQueue-01").build();
    }

    @Bean
    public Binding binding(){

        return  BindingBuilder.bind(directQueue()).to(directExchange()).with("Direct-RoutingKey-01");
    }
}

4. Write the consumer code

The process of the consumer receiving the message:

  1. The consumer connects to the RabbitMQ Broker, establishes a connection (Connection), and opens a channel (Channel).
  2. The consumer requests the RabbitMQ Broker to consume the messages in the corresponding queue, and may set the corresponding callback function and do some preparatory work
  3. Waiting for the RabbitMQ Broker to respond and deliver the message in the corresponding queue, the consumer receives the message.
  4. The consumer acknowledges (acks) the received message.
  5. RabbitMQ deletes the corresponding acknowledged message from the queue.
  6. Close the channel.
  7. Close the connection.
package com.example.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectConsumer {

    @RabbitHandler
    @RabbitListener(queues ="DirectQueue-01" )
    public void  process(Message message){

        String test = new  String(message.getBody());
        System.out.println(test);
    }
}

5. Write producer code

The process for the producer to send a message:

  1. The producer connects to RabbitMQ, establishes a TCP connection (Connection), and opens a channel (Channel)
  2. The producer declares an Exchange (exchange) and sets relevant properties, such as the type of the exchange, whether it is persistent, etc.
  3. The producer declares a queue and sets relevant properties, such as whether it is exclusive, whether it is persistent, whether it is automatically deleted, etc.
  4. The producer binds the exchange and the queue through bindingKey (binding Key)
  5. The producer sends a message to the RabbitMQ Broker, which contains routingKey (routing key), exchange and other information
  6. The corresponding exchange looks for a matching queue based on the received routingKey.
  7. If found, the message sent from the producer is stored in the corresponding queue.
  8. If not found, choose to discard or fall back to the producer according to the properties configured by the producer
  9. Close the channel.
  10. Close the connection.
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

@Component
public class DirectProvider {
    @Resource
    private RabbitTemplate rabbitTemplate;
    public  void  send(){
        Message  message = new Message("Hello".getBytes());
        rabbitTemplate.send("DirectExchange-01","Direct-RoutingKey-01",message);
    }
}

6. Test class

package com.example;

import com.example.provider.DirectProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
class APPTests {


    @Autowired
    DirectProvider directProvider;

    @Test
    void test() throws InterruptedException {

        directProvider.send();
        
        Thread.sleep(5000);
    }
}

Tags: Spring Boot RabbitMQ

Posted by riyaz on Thu, 14 Jul 2022 05:42:34 +0930