1, Environment preparation
Add the pom dependency
<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Configure application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 7006
    username: admin
    password: admin
    # Publisher confirm mode (whether the confirm callback method is triggered)
    # NONE: disables publisher confirms; this is the default value
    # CORRELATED: the callback method is triggered after the message is successfully published to the exchange, as shown in example 1
    # SIMPLE: in testing it has two effects. The first is the same as CORRELATED: the callback method is triggered.
    #   The second is that, after publishing, you can call waitForConfirms or waitForConfirmsOrDie on rabbitTemplate,
    #   wait for the broker node to return the sending result, and decide the next step from that result.
    #   Note that if a confirm fails, waitForConfirmsOrDie closes the channel, after which no message can be sent to the broker on it
    publisher-confirm-type: correlated
2, Simple mode: one consumer consumes the messages in one queue
1. Register
package com.rabbmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Simple mode
 *
 * @author He PanFu
 * @date 2021-09-16 12:33:25
 */
@Configuration
public class RabbitSimpleConfig {

    @Bean
    public Queue simpleQueueWx() {
        // Arguments: queue name, whether the queue is durable
        return new Queue("simple_queue", true);
    }
}
2. Add message to queue
@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/simple")
public String sendSimpleMessage() {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("simple_queue", getStringObjectMap());
    }
    return "ok";
}

private Map<String, Object> getStringObjectMap() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "test message, hello!";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String, Object> map = new HashMap<>();
    map.put("messageId", messageId);
    map.put("messageData", messageData);
    map.put("createTime", createTime);
    return map;
}
3. Consume messages
package com.rabbitmq.receiver;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Simple mode: WeChat subscription
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Component
@RabbitListener(queues = "simple_queue")
public class RabbitSimpleWXListener {

    @RabbitHandler
    public void init(Map testMessage) {
        System.out.println("Simple mode (wechat):" + testMessage);
    }
}
4. Display
Remarks:
- @RabbitListener can be used on its own, directly on a method.
- @RabbitListener can also be placed on a class, in which case it must be used together with the @RabbitHandler annotation.
- When @RabbitListener is on the class, a received message is handed to a method annotated with @RabbitHandler. Which method handles it depends on the parameter type produced by the MessageConverter.
3, Working mode (round-robin distribution): multiple consumers consume the messages in the same queue
- Introduction: start multiple consumers. When the producer sends messages to the queue, each message is received by only one consumer. By default RabbitMQ hands messages to the consumers evenly, in round-robin order, and a consumer receives its next message after it has processed the current one.
- Round robin: messages are distributed in turn, that is, the consumers take messages from the queue one at a time, one after another. Round-robin distribution does not take the consumers' speed into account. For example, with 100 messages in the queue and two consumers, C1 and C2, the consumers simply alternate no matter how fast each one is, and each ends up with 50 messages.
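The arithmetic in the example above can be checked with a toy model. This is only a sketch in plain Java, not RabbitMQ code: the class `RoundRobinDemo` and its `distribute` method are hypothetical names, and the model assumes the broker simply hands message i to consumer i modulo the consumer count.

```java
// Toy model of round-robin dispatch; it does not talk to a broker.
class RoundRobinDemo {

    /** Returns how many of the messages each consumer receives. */
    static int[] distribute(int messageCount, int consumerCount) {
        int[] received = new int[consumerCount];
        for (int i = 0; i < messageCount; i++) {
            // Message i goes to the next consumer in turn, regardless of its speed.
            received[i % consumerCount]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] received = distribute(100, 2);
        // Prints: C1=50, C2=50
        System.out.println("C1=" + received[0] + ", C2=" + received[1]);
    }
}
```

Because the speed of a consumer never enters the calculation, a slow consumer ends up with the same share as a fast one.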
1. Register
package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Working mode
 *
 * @author He PanFu
 * @date 2021-09-16 12:33:25
 */
@Configuration
public class RabbitWorkManage {

    /**
     * Work queue
     *
     * @return queue
     */
    @Bean
    public Queue workQueue() {
        return new Queue("work_queue", true);
    }
}
2. Add message to queue
@GetMapping("/work")
public String sendWorkMessage() {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("work_queue", getStringObjectMap());
    }
    return "ok";
}
3. Consume messages
Note: in a production environment you would start multiple consumer services. Here the simulation is simplified to two listeners in one class.
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Working mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitWorkListener {

    @RabbitListener(queues = "work_queue")
    public void messageListener1(Object msg, Channel channel, Message message) throws InterruptedException, IOException {
        TimeUnit.SECONDS.sleep(1);
        log.info("1 Message processing completed, message content:{}", msg);
    }

    @RabbitListener(queues = "work_queue")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        TimeUnit.SECONDS.sleep(2);
        log.info("2 Message processing completed, message content:{}", msg);
    }
}
4. Display
4, Working mode (fair distribution): multiple consumers consume the messages in the same queue
- Fair dispatch: messages are distributed according to each consumer's capacity, so the faster consumer does more of the work. For example, with 90 messages in the queue and two consumers, where C1 takes 1s per message and C2 takes 2s per message, C1 ends up consuming 60 messages and C2 consumes 30.
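The 60/30 split in the example above can be reproduced with a small simulation. This is a sketch in plain Java under stated assumptions, not RabbitMQ code: `FairDispatchDemo` and `distribute` are hypothetical names, each consumer holds at most one unacknowledged message, and when two consumers become free at the same instant the first one is served first.

```java
// Toy model of fair dispatch with at most one in-flight message per consumer.
class FairDispatchDemo {

    /**
     * @param messageCount      messages waiting in the queue
     * @param secondsPerMessage processing time of each consumer, in seconds
     * @return number of messages handled by each consumer
     */
    static int[] distribute(int messageCount, int[] secondsPerMessage) {
        int[] handled = new int[secondsPerMessage.length];
        // freeAt[c] is the time at which consumer c acknowledges its current message.
        long[] freeAt = new long[secondsPerMessage.length];
        for (int m = 0; m < messageCount; m++) {
            // The next message goes to whichever consumer becomes free first
            // (ties go to the lower index, an assumption of this model).
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += secondsPerMessage[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = distribute(90, new int[] {1, 2});
        // Prints: C1=60, C2=30
        System.out.println("C1=" + handled[0] + ", C2=" + handled[1]);
    }
}
```

The only difference from round robin is that a message is handed out when a consumer is ready for it, which is what a prefetch of one combined with manual acknowledgement achieves.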
1. Modify the application.yml file
Enable manual ack and read only one message at a time
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 7006
    username: admin
    password: admin
    # Publisher confirm mode (whether the confirm callback method is triggered)
    # NONE: disables publisher confirms; this is the default value
    # CORRELATED: the callback method is triggered after the message is successfully published to the exchange
    # SIMPLE: in testing it has two effects. The first is the same as CORRELATED: the callback method is triggered.
    #   The second is that, after publishing, you can call waitForConfirms or waitForConfirmsOrDie on rabbitTemplate,
    #   wait for the broker node to return the sending result, and decide the next step from that result.
    #   Note that if a confirm fails, waitForConfirmsOrDie closes the channel, after which no message can be sent to the broker on it
    publisher-confirm-type: correlated
    listener:
      simple:
        # Enable manual acknowledgement
        acknowledge-mode: manual
        # Prefetch one message at a time: the next message is not delivered until the consumer has acknowledged the current one
        prefetch: 1
2. Modify the consumer code
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Working mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitWorkListener {

    @RabbitListener(queues = "work_queue")
    public void messageListener1(Object msg, Channel channel, Message message) throws InterruptedException, IOException {
        log.info("1 Start processing the message. Message content:{}", msg);
        // Fair distribution: acknowledge the message manually
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // Alternatively, reject the message manually. When requeue (the last argument) is true, it is put back on the queue
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }

    @RabbitListener(queues = "work_queue")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        log.info("2 Start processing the message. Message content:{}", msg);
        TimeUnit.SECONDS.sleep(1);
        // Fair distribution: acknowledge the message manually
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
3. Display
5, Publish / subscribe mode: multiple consumers each consume the same message
1. Register
package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Publish / subscribe mode
 *
 * @author He PanFu
 * @date 2022-04-16 17:04:55
 */
@Configuration
public class RabbitmqFanoutConfig {

    /**
     * Declare the exchange. Constructor arguments: name, durable, autoDelete.
     * <p>
     * durable: whether to persist. A durable exchange or queue is stored on disk and still exists after the message broker restarts.
     * autoDelete: whether to delete automatically once it is no longer in use.
     * exclusive (queues only): the queue can be used only by the connection that created it and is deleted when that connection closes. This setting takes precedence over durable.
     *
     * @return publish / subscribe exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange", true, false);
    }

    /**
     * WeChat queue
     *
     * @return queue
     */
    @Bean
    public Queue fanoutQueueWx() {
        return new Queue("fanout_queue_wx", true);
    }

    /**
     * SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue fanoutQueueDx() {
        return new Queue("fanout_queue_dx", true);
    }

    /**
     * Bind the WeChat queue to the exchange
     *
     * @return binding object
     */
    @Bean
    public Binding fanoutBindingWx() {
        return BindingBuilder.bind(fanoutQueueWx()).to(fanoutExchange());
    }

    /**
     * Bind the SMS queue to the exchange
     *
     * @return binding object
     */
    @Bean
    public Binding fanoutBindingDx() {
        return BindingBuilder.bind(fanoutQueueDx()).to(fanoutExchange());
    }
}
2. Add message to queue
@GetMapping("/fanout")
public String sendFanoutMessage() {
    for (int i = 0; i < 10; i++) {
        // A fanout exchange ignores the routing key, so it is left empty
        rabbitTemplate.convertAndSend("fanout_exchange", "", getStringObjectMap(), new CorrelationData(UUID.randomUUID().toString()));
    }
    return "ok";
}
3. Consume messages
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Publish / subscribe mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitmqFanoutListener {

    @RabbitListener(queues = "fanout_queue_wx")
    public void messageListener1(Object msg, Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
        log.info("Subscription mode (wechat), message processing starts, message content:{}", msg);
        // Acknowledge the message manually
        channel.basicAck(tag, false);
    }

    @RabbitListener(queues = "fanout_queue_dx")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        log.info("Subscription mode (SMS), message processing starts, message content:{}", msg);
        // Acknowledge the message manually
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
4. Display
6, Routing mode
1. Register
package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Routing mode
 *
 * @author He PanFu
 * @date 2022-04-16 18:29:03
 */
@Configuration
public class RabbitmqDirectConfig {

    /**
     * Declare the exchange. Constructor arguments: name, durable, autoDelete.
     * <p>
     * durable: whether to persist. A durable exchange or queue is stored on disk and still exists after the message broker restarts.
     * autoDelete: whether to delete automatically once it is no longer in use.
     * exclusive (queues only): the queue can be used only by the connection that created it and is deleted when that connection closes. This setting takes precedence over durable.
     *
     * @return exchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_exchange", true, false);
    }

    /**
     * WeChat queue
     *
     * @return queue
     */
    @Bean
    public Queue directQueueWx() {
        return new Queue("direct_queue_wx", true);
    }

    /**
     * SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue directQueueDx() {
        return new Queue("direct_queue_dx", true);
    }

    /**
     * Bind the WeChat queue to the exchange with routing key "wx"
     *
     * @return binding
     */
    @Bean
    public Binding directBindingWx() {
        return BindingBuilder.bind(directQueueWx()).to(directExchange()).with("wx");
    }

    /**
     * Bind the SMS queue to the exchange with routing key "dx"
     *
     * @return binding
     */
    @Bean
    public Binding directBindingDx() {
        return BindingBuilder.bind(directQueueDx()).to(directExchange()).with("dx");
    }
}
2. Add message to queue
@GetMapping("/direct/{routingKey}")
public String sendDirectMessage(@PathVariable("routingKey") String routingKey) {
    rabbitTemplate.convertAndSend("direct_exchange", routingKey, getStringObjectMap());
    return "ok";
}
3. Consume messages
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Routing mode
 *
 * @author He PanFu
 * @date 2022-04-16 18:32:19
 */
@Slf4j
@Component
public class RabbitmqDirectListener {

    @RabbitListener(queues = "direct_queue_wx")
    public void messageListener1(Object msg, Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
        log.info("Routing mode (wechat), message processing starts, message content:{}", msg);
        // Acknowledge the message manually
        channel.basicAck(tag, false);
    }

    @RabbitListener(queues = "direct_queue_dx")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        log.info("Routing mode (SMS), message processing starts, message content:{}", msg);
        // Acknowledge the message manually
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
4. Display
7, Topic mode
1. Register
package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Topic mode
 *
 * @author He PanFu
 * @date 2022-04-18 09:00:43
 */
@Configuration
public class RabbitmqTopicConfig {

    /**
     * Declare the exchange. Constructor arguments: name, durable, autoDelete.
     * <p>
     * durable: whether to persist. A durable exchange or queue is stored on disk and still exists after the message broker restarts.
     * autoDelete: whether to delete automatically once it is no longer in use.
     * exclusive (queues only): the queue can be used only by the connection that created it and is deleted when that connection closes. This setting takes precedence over durable.
     *
     * @return exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_exchange", true, false);
    }

    /**
     * WeChat queue
     *
     * @return queue
     */
    @Bean
    public Queue topicQueueWx() {
        return new Queue("topic_queue_wx", true);
    }

    /**
     * SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue topicQueueDx() {
        return new Queue("topic_queue_dx", true);
    }

    /**
     * Bind the WeChat queue to the exchange
     * "*" matches exactly one word
     * "#" matches zero or more words
     *
     * @return binding
     */
    @Bean
    public Binding topicBindingWx() {
        return BindingBuilder.bind(topicQueueWx()).to(topicExchange()).with("#.wx.#");
    }

    /**
     * Bind the SMS queue to the exchange
     * "*" matches exactly one word
     * "#" matches zero or more words
     *
     * @return binding
     */
    @Bean
    public Binding topicBindingDx() {
        return BindingBuilder.bind(topicQueueDx()).to(topicExchange()).with("dx.*");
    }
}
2. Add message to queue
@GetMapping("/topic")
public String sendTopicMessage() {
    String messageId = String.valueOf(UUID.randomUUID());
    String messageData = "test message, hello!";
    String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    Map<String, Object> map = new HashMap<>();
    map.put("messageId", messageId);
    map.put("messageData", messageData);
    map.put("createTime", createTime);
    // Send the same message to topic_exchange with several routing keys; the key is also stored in the payload so the listener can log it
    map.put("key", "wx");
    rabbitTemplate.convertAndSend("topic_exchange", "wx", map);
    map.put("key", "wx.www");
    rabbitTemplate.convertAndSend("topic_exchange", "wx.www", map);
    map.put("key", "wx.www.www");
    rabbitTemplate.convertAndSend("topic_exchange", "wx.www.www", map);
    map.put("key", "dx");
    rabbitTemplate.convertAndSend("topic_exchange", "dx", map);
    map.put("key", "dx.www");
    rabbitTemplate.convertAndSend("topic_exchange", "dx.www", map);
    map.put("key", "dx.www.www");
    rabbitTemplate.convertAndSend("topic_exchange", "dx.www.www", map);
    return "ok";
}
3. Consume messages
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Topic mode
 *
 * @author He PanFu
 * @date 2022-04-18 09:18:47
 */
@Slf4j
@Component
public class RabbitmqTopicListener {

    /**
     * WeChat queue listener
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "topic_queue_wx")
    public void messageListener2(Map testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Topic mode (wechat), key: {},testMessage: {}", testMessage.get("key"), testMessage);
        channel.basicAck(tag, false);
    }

    /**
     * SMS queue listener
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "topic_queue_dx")
    public void messageListener1(Map testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Topic mode (SMS), key: {},testMessage: {}", testMessage.get("key"), testMessage);
        channel.basicAck(tag, false);
    }
}
4. Display
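Which of the six routing keys sent above reach which queue follows from the wildcard rules of a topic exchange: "*" stands for exactly one word and "#" for zero or more words. The sketch below applies those rules in plain Java; `TopicMatchDemo` and `matches` are hypothetical names, and the matcher is a simplified model rather than the broker's own implementation.

```java
// Simplified model of topic-exchange pattern matching; it does not talk to a broker.
class TopicMatchDemo {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: a match only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // "#" may absorb zero or more words of the key.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false;
        }
        // "*" absorbs exactly one word; any other pattern word must match literally.
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"wx", "wx.www", "wx.www.www", "dx", "dx.www", "dx.www.www"};
        for (String key : keys) {
            System.out.println(key
                    + " -> topic_queue_wx: " + matches("#.wx.#", key)
                    + ", topic_queue_dx: " + matches("dx.*", key));
        }
    }
}
```

Under this model the three wx keys match only the "#.wx.#" binding, dx.www matches only "dx.*", and dx and dx.www.www match neither binding, so those two messages would not reach either queue.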
8, Parameter mode (headers exchange)
1. Register
package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Parameter mode
 *
 * @author He PanFu
 * @date 2022-04-18 09:53:43
 */
@Configuration
public class RabbitmqHeadersConfig {

    /**
     * Declare the exchange. Constructor arguments: name, durable, autoDelete.
     * <p>
     * durable: whether to persist. A durable exchange or queue is stored on disk and still exists after the message broker restarts.
     * autoDelete: whether to delete automatically once it is no longer in use.
     * exclusive (queues only): the queue can be used only by the connection that created it and is deleted when that connection closes. This setting takes precedence over durable.
     *
     * @return exchange
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers_exchange", true, false);
    }

    /**
     * WeChat queue
     *
     * @return queue
     */
    @Bean
    public Queue headersQueueWx() {
        return new Queue("headers_queue_wx", true);
    }

    /**
     * SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue headersQueueDx() {
        return new Queue("headers_queue_dx", true);
    }

    /**
     * Mailbox queue
     *
     * @return queue
     */
    @Bean
    public Queue headersQueueYx() {
        return new Queue("headers_queue_yx", true);
    }

    /**
     * Bind the WeChat queue to the exchange: the message headers must contain both entries
     *
     * @return binding
     */
    @Bean
    public Binding headersBindingWx() {
        Map<String, Object> map = new HashMap<>();
        map.put("One", "A");
        map.put("Two", "B");
        return BindingBuilder.bind(headersQueueWx()).to(headersExchange()).whereAll(map).match();
    }

    /**
     * Bind the SMS queue to the exchange: the message headers must contain at least one of the entries
     *
     * @return binding
     */
    @Bean
    public Binding headersBindingDx() {
        Map<String, Object> map = new HashMap<>();
        map.put("One", "A");
        map.put("Two", "B");
        return BindingBuilder.bind(headersQueueDx()).to(headersExchange()).whereAny(map).match();
    }

    /**
     * Bind the mailbox queue to the exchange: the header "Three" must equal "C"
     *
     * @return binding
     */
    @Bean
    public Binding headersBindingYx() {
        return BindingBuilder.bind(headersQueueYx()).to(headersExchange()).where("Three").matches("C");
    }
}
2. Add message to queue
@GetMapping("/headers")
public String sendHeadersMessage() {
    // A headers exchange ignores the routing key and routes on the message headers.
    // The header values set here are assumed for illustration; they are chosen to match the bindings registered above.
    MessagePostProcessor processor = message -> {
        message.getMessageProperties().setHeader("One", "A");
        message.getMessageProperties().setHeader("Three", "C");
        return message;
    };
    rabbitTemplate.convertAndSend("headers_exchange", "", getStringObjectMap(), processor);
    return "ok";
}
3. Consume messages
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Parameter mode
 *
 * @author He PanFu
 * @date 2022-04-18 10:27:15
 */
@Slf4j
@Component
public class RabbitmqHeadersListener {

    /**
     * WeChat queue listener
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "headers_queue_wx")
    public void messageListener1(Message testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Parameter mode (wechat), testMessage: {}", testMessage);
        channel.basicAck(tag, false);
    }

    /**
     * SMS queue listener
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "headers_queue_dx")
    public void messageListener2(Message testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Parameter mode (SMS), testMessage: {}", testMessage);
        channel.basicAck(tag, false);
    }

    /**
     * Mailbox queue listener
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "headers_queue_yx")
    public void messageListener3(Message testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Parameter mode (mailbox), testMessage: {}", testMessage);
        channel.basicAck(tag, false);
    }
}
4. Display
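The three bindings registered in this section differ only in how message headers are compared: whereAll requires every listed header to match, whereAny requires at least one, and where(...).matches(...) checks a single header. The sketch below models that comparison in plain Java. `HeadersMatchDemo`, `matchesAll` and `matchesAny` are hypothetical names, and the sample message headers (One=A, Three=C) are assumed for illustration.

```java
import java.util.Map;

// Simplified model of headers-exchange matching; it does not talk to a broker.
class HeadersMatchDemo {

    /** All binding entries must be present in the message headers with equal values. */
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        return binding.entrySet().stream()
                .allMatch(e -> e.getValue().equals(headers.get(e.getKey())));
    }

    /** At least one binding entry must be present in the message headers with an equal value. */
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        return binding.entrySet().stream()
                .anyMatch(e -> e.getValue().equals(headers.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, Object> oneAndTwo = Map.of("One", "A", "Two", "B");
        Map<String, Object> three = Map.of("Three", "C");
        // Assumed sample message headers.
        Map<String, Object> headers = Map.of("One", "A", "Three", "C");

        System.out.println("headers_queue_wx (all of One=A, Two=B): " + matchesAll(oneAndTwo, headers)); // false
        System.out.println("headers_queue_dx (any of One=A, Two=B): " + matchesAny(oneAndTwo, headers)); // true
        System.out.println("headers_queue_yx (Three=C): " + matchesAll(three, headers));                // true
    }
}
```

With these sample headers the message would reach the SMS and mailbox queues but not the WeChat queue, because the Two header is missing.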
9, Dead letter queue
Introduction: "dead letter" is a message mechanism in RabbitMQ. A message in a queue becomes a "dead letter" when any of the following happens:
- The message is negatively acknowledged with channel.basicNack or channel.basicReject, and the requeue parameter is set to false.
- The message stays in the queue longer than the configured TTL.
- The number of messages in the queue exceeds the maximum queue length.
RabbitMQ handles "dead letter" messages specially. If a dead letter exchange is configured for the queue, the message is routed through it to the dead letter queue. If none is configured, the message is discarded.
Note: if you modify the configuration of an existing queue, you need to delete the original queue and register it again.
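The rule above, three possible causes and then either rerouting or discarding, can be written down as a very small model. This is a sketch in plain Java and not RabbitMQ code: `DeadLetterDemo`, its `Cause` enum and `onDeadLetter` are hypothetical names, and two in-memory lists stand in for the dead letter queue and for discarded messages.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of what happens to a message once it becomes a dead letter.
class DeadLetterDemo {

    /** The three causes described in the text. */
    enum Cause { REJECTED_WITHOUT_REQUEUE, TTL_EXPIRED, QUEUE_LENGTH_EXCEEDED }

    final List<String> deadLetterQueue = new ArrayList<>();
    final List<String> discarded = new ArrayList<>();
    private final boolean deadLetterExchangeConfigured;

    DeadLetterDemo(boolean deadLetterExchangeConfigured) {
        this.deadLetterExchangeConfigured = deadLetterExchangeConfigured;
    }

    /** Reroutes the dead letter if a dead letter exchange is configured, otherwise drops it. */
    void onDeadLetter(String message, Cause cause) {
        String entry = message + " (" + cause + ")";
        if (deadLetterExchangeConfigured) {
            deadLetterQueue.add(entry);
        } else {
            discarded.add(entry);
        }
    }

    public static void main(String[] args) {
        DeadLetterDemo withExchange = new DeadLetterDemo(true);
        withExchange.onDeadLetter("m1", Cause.TTL_EXPIRED);
        System.out.println("rerouted: " + withExchange.deadLetterQueue); // rerouted: [m1 (TTL_EXPIRED)]

        DeadLetterDemo withoutExchange = new DeadLetterDemo(false);
        withoutExchange.onDeadLetter("m2", Cause.REJECTED_WITHOUT_REQUEUE);
        System.out.println("discarded: " + withoutExchange.discarded); // discarded: [m2 (REJECTED_WITHOUT_REQUEUE)]
    }
}
```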
1. Register
package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Publish / subscribe mode with TTL and dead letter settings
 *
 * @author He PanFu
 * @date 2022-04-16 17:04:55
 */
@Configuration
public class RabbitmqFanoutTtlConfig {

    /**
     * Declare the exchange. Constructor arguments: name, durable, autoDelete.
     * <p>
     * durable: whether to persist. A durable exchange or queue is stored on disk and still exists after the message broker restarts.
     * autoDelete: whether to delete automatically once it is no longer in use.
     * exclusive (queues only): the queue can be used only by the connection that created it and is deleted when that connection closes. This setting takes precedence over durable.
     *
     * @return publish / subscribe exchange
     */
    @Bean
    public FanoutExchange fanoutTtlExchange() {
        return new FanoutExchange("fanout_ttl_exchange", true, false);
    }

    /**
     * WeChat queue
     *
     * @return queue
     */
    @Bean
    public Queue fanoutTtlQueueWx() {
        Map<String, Object> map = new HashMap<>();
        // Expiration time of messages in this queue, in milliseconds
        map.put("x-message-ttl", 1000);
        // Dead letter settings: after a message expires it is sent to this exchange with this routing key
        map.put("x-dead-letter-exchange", "direct_exchange");
        map.put("x-dead-letter-routing-key", "wx");
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue("fanout_ttl_queue_wx", true, false, false, map);
    }

    /**
     * Bind the WeChat queue to the exchange
     *
     * @return binding object
     */
    @Bean
    public Binding fanoutTtlBindingWx() {
        return BindingBuilder.bind(fanoutTtlQueueWx()).to(fanoutTtlExchange());
    }
}
2. Add message to queue
@GetMapping("/fanout/ttl")
public String sendFanoutTtlMessage() {
    rabbitTemplate.convertAndSend("fanout_ttl_exchange", "", getStringObjectMap());
    return "ok";
}
3. Consume messages
package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Publish / subscribe mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitmqFanoutTtlListener {

    @RabbitListener(queues = "fanout_ttl_queue_wx")
    public void messageListener1(Object msg, Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
        log.info("subscription model TTL(Wechat), message processing starts, message content:{}", msg);
        // Trigger case 1: negative acknowledgement.
        // multiple: when true, all unacknowledged messages up to this delivery tag are handled at once.
        // requeue: when true, the message is put back on the queue; false makes it a dead letter.
        // channel.basicNack(tag, false, false);
        // Trigger case 2: timeout. The sleep keeps this consumer busy, so messages still waiting in the queue outlive the TTL
        TimeUnit.SECONDS.sleep(15);
        channel.basicAck(tag, false);
    }
}
4. Display
5. Extension: set the expiration time on the message itself when adding it to the queue
Note: when both the queue and the message carry an expiration time, the smaller of the two applies.
@GetMapping("/fanout/ttl/5000")
public String sendFanoutTTLMessageMessage() {
    // Set the per-message expiration time, in milliseconds
    MessagePostProcessor processor = message -> {
        message.getMessageProperties().setExpiration("5000");
        message.getMessageProperties().setContentEncoding("UTF-8");
        return message;
    };
    rabbitTemplate.convertAndSend("fanout_ttl_exchange", "", getStringObjectMap(), processor);
    return "ok";
}
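The "smaller one wins" rule for the two expiration times can be expressed as a one-line calculation. The sketch below is plain Java and only illustrates the rule: `EffectiveTtlDemo` and `effectiveTtlMillis` are hypothetical names, and null is used here to mean that no expiration time is set at that level.

```java
// Toy calculation of the expiration time that applies to a message.
class EffectiveTtlDemo {

    /**
     * @param queueTtlMillis   TTL configured on the queue (x-message-ttl), or null if none
     * @param messageTtlMillis expiration set on the message, or null if none
     * @return the TTL that applies, or null if neither is set
     */
    static Long effectiveTtlMillis(Long queueTtlMillis, Long messageTtlMillis) {
        if (queueTtlMillis == null) {
            return messageTtlMillis;
        }
        if (messageTtlMillis == null) {
            return queueTtlMillis;
        }
        // Both are set: the smaller value applies.
        return Math.min(queueTtlMillis, messageTtlMillis);
    }

    public static void main(String[] args) {
        // Queue TTL of 1000 ms and message expiration of 5000 ms, the values used in this section.
        System.out.println(effectiveTtlMillis(1000L, 5000L)); // 1000
    }
}
```

With the values used in this section, the 5000 ms set on the message has no visible effect, because the queue's 1000 ms limit is reached first.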
10, Summary
The specific steps are: register (register the exchange -> register the queue -> bind the exchange and the queue) -> listen -> add messages to the queue