1. The RabbitMQ workflow:
The producer sends a message to an exchange;
The exchange is bound to queues by binding rules and uses the message's routing key to decide which queue stores the message;
The consumer takes the message from the queue and consumes it;
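The workflow above can be pictured with a small in-memory model. This is only a sketch: the class DirectExchangeModel and its methods are hypothetical names invented for illustration, not part of RabbitMQ or Spring AMQP, and it assumes direct-exchange semantics (the routing key must equal the binding key).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a direct exchange; not the RabbitMQ client API.
class DirectExchangeModel {
    // binding key -> queues bound with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Bind a queue to the exchange under a binding key.
    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copy the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received it.
    int publish(String routingKey, String body) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, List.of());
        for (Queue<String> q : targets) {
            q.add(body);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        Queue<String> queue = new ArrayDeque<>();
        exchange.bind(queue, "Direct-RoutingKey-01");

        exchange.publish("Direct-RoutingKey-01", "Hello"); // reaches the queue
        exchange.publish("other-key", "ignored");          // no matching binding

        // The consumer takes the message from the queue.
        System.out.println(queue.poll()); // prints "Hello"
    }
}
```

Only the message whose routing key matches a binding key reaches the queue; the second publish call reaches no queue at all.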
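2. Integrate RabbitMQ with Spring Boot
1. Add the dependencies to the Spring Boot project
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Needed for RabbitTemplate and @RabbitListener -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>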
2. Configure RabbitMQ in application.properties
# IP address of the RabbitMQ server
spring.rabbitmq.host=192.168.33.xx
# RabbitMQ service port
spring.rabbitmq.port=5672
# RabbitMQ virtual host
spring.rabbitmq.virtual-host=/
# Username
spring.rabbitmq.username=guest
# Password
spring.rabbitmq.password=guest
# Deliver at most one unacknowledged message to a consumer at a time
spring.rabbitmq.listener.simple.prefetch=1
# Automatic acknowledgement
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# Turn on automatic retry
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of attempts
spring.rabbitmq.listener.simple.retry.max-attempts=5
# Maximum retry interval
spring.rabbitmq.listener.simple.retry.max-interval=20000ms
# Initial retry interval
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
# Multiplier: each retry interval is the previous one times the multiplier,
# so 3s, 6s, 12s, then 24s, which exceeds the 20s maximum and is capped at 20s
spring.rabbitmq.listener.simple.retry.multiplier=2
# Do not requeue rejected messages
spring.rabbitmq.listener.simple.default-requeue-rejected=false
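The retry arithmetic in the comments above can be checked with a few lines of Java. This is a sketch of the calculation only, not Spring's implementation: RetryBackoff and intervals are hypothetical names, and it assumes that max-attempts counts the first delivery, so five attempts leave four waits between them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the backoff arithmetic; not a Spring class.
class RetryBackoff {
    // Waits (in ms) between consecutive attempts: the first wait is initialMs,
    // each later wait is the previous one times multiplier, capped at maxMs.
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        double next = initialMs;
        // Assumption: maxAttempts includes the first try, so there are maxAttempts - 1 waits.
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add((long) Math.min(next, maxMs));
            next = next * multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values taken from the properties above.
        System.out.println(intervals(3000, 2.0, 20000, 5)); // prints [3000, 6000, 12000, 20000]
    }
}
```

With these settings a failing message is attempted five times over roughly 41 seconds before the listener gives up on it.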
3. Configure the exchange, queue, and binding key
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // Durable direct exchange that is not auto-deleted
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("DirectExchange-01", true, false);
    }

    // Durable queue
    @Bean
    public Queue directQueue() {
        return QueueBuilder.durable("DirectQueue-01").build();
    }

    // Bind the queue to the exchange with the binding key
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("Direct-RoutingKey-01");
    }
}
4. Write the consumer code
The process of the consumer receiving the message:
- The consumer connects to the RabbitMQ Broker, establishes a connection (Connection), and opens a channel (Channel).
- The consumer asks the RabbitMQ Broker to consume messages from the corresponding queue; it may register a callback function and do other preparatory work.
- The consumer waits for the RabbitMQ Broker to deliver messages from that queue and receives them.
- The consumer acknowledges (acks) the received message.
- RabbitMQ deletes the corresponding acknowledged message from the queue.
- Close the channel.
- Close the connection.
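The deliver, acknowledge, and delete steps above can be pictured with a small bookkeeping model. It is a sketch under simplifying assumptions (one queue, one consumer); AckQueueModel and its methods are hypothetical names, not the RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of deliver/ack bookkeeping on the broker side; not the RabbitMQ client API.
class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();      // waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();   // delivered, not yet acked
    private long nextTag = 1;

    void enqueue(String body) {
        ready.addLast(body);
    }

    // Deliver the next ready message; returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    // Ack: the broker deletes the message.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Reject: put the message back at the head of the queue, or drop it.
    void reject(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    // Messages the broker still holds (ready plus unacknowledged).
    int stored() {
        return ready.size() + unacked.size();
    }

    public static void main(String[] args) {
        AckQueueModel queue = new AckQueueModel();
        queue.enqueue("Hello");
        long tag = queue.deliver();
        System.out.println(queue.stored()); // prints 1: delivered but still held
        queue.ack(tag);
        System.out.println(queue.stored()); // prints 0: deleted after the ack
    }
}
```

The message stays with the broker until the ack arrives, which is why an unacknowledged message can be delivered again if the consumer fails.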
package com.example.consumer;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectConsumer {

    // Method-level @RabbitListener is enough here; @RabbitHandler is only
    // needed when @RabbitListener is placed on the class.
    @RabbitListener(queues = "DirectQueue-01")
    public void process(Message message) {
        // Decode the body with an explicit charset instead of the platform default
        String text = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(text);
    }
}
5. Write the producer code
The process for the producer to send a message:
- The producer connects to RabbitMQ, establishes a TCP connection (Connection), and opens a channel (Channel).
- The producer declares an exchange and sets its properties, such as the exchange type and whether it is durable.
- The producer declares a queue and sets its properties, such as whether it is exclusive, durable, or automatically deleted.
- The producer binds the exchange and the queue with a binding key (bindingKey).
- The producer sends a message to the RabbitMQ Broker; the message carries the routing key (routingKey), the exchange name, and other information.
- The exchange looks for a matching queue based on the received routingKey.
- If a match is found, the message is stored in the corresponding queue.
- If no match is found, the message is discarded or returned to the producer, depending on the properties the producer configured.
- Close the channel.
- Close the connection.
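The found and not-found branches above can be sketched as a small model. In RabbitMQ, the property that decides whether an unroutable message comes back to the producer is the mandatory flag on publish. Everything else here (UnroutableModel, Outcome, bindQueue) is a hypothetical name for illustration, not the RabbitMQ or Spring AMQP API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of how an exchange treats routable and unroutable messages.
class UnroutableModel {
    enum Outcome { ROUTED, RETURNED, DISCARDED }

    // binding key -> contents of the queue bound with that key
    private final Map<String, List<String>> queues = new HashMap<>();
    // messages handed back to the producer
    private final List<String> returned = new ArrayList<>();

    void bindQueue(String bindingKey) {
        queues.putIfAbsent(bindingKey, new ArrayList<>());
    }

    Outcome publish(String routingKey, String body, boolean mandatory) {
        List<String> queue = queues.get(routingKey);
        if (queue != null) {
            queue.add(body);           // match found: store the message
            return Outcome.ROUTED;
        }
        if (mandatory) {
            returned.add(body);        // no match, mandatory: hand it back
            return Outcome.RETURNED;
        }
        return Outcome.DISCARDED;      // no match, not mandatory: drop it
    }

    List<String> returnedMessages() {
        return returned;
    }

    public static void main(String[] args) {
        UnroutableModel exchange = new UnroutableModel();
        exchange.bindQueue("Direct-RoutingKey-01");
        System.out.println(exchange.publish("Direct-RoutingKey-01", "Hello", false)); // prints ROUTED
        System.out.println(exchange.publish("wrong-key", "Hello", true));             // prints RETURNED
        System.out.println(exchange.publish("wrong-key", "Hello", false));            // prints DISCARDED
    }
}
```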
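package com.example.provider;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class DirectProvider {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        // Encode the body with an explicit charset instead of the platform default
        Message message = new Message("Hello".getBytes(StandardCharsets.UTF_8));
        // Publish to the exchange with the routing key that matches the binding
        rabbitTemplate.send("DirectExchange-01", "Direct-RoutingKey-01", message);
    }
}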
6. Test class
package com.example;

import com.example.provider.DirectProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class APPTests {

    @Autowired
    DirectProvider directProvider;

    @Test
    void test() throws InterruptedException {
        directProvider.send();
        // Give the listener time to receive the message before the test ends
        Thread.sleep(5000);
    }
}