In production, RabbitMQ sometimes restarts for reasons that are not always known, and messages that producers publish during the restart fail to be delivered.
The lost messages then have to be processed and recovered manually. This raises the question of how to deliver RabbitMQ messages reliably.
In particular, in the extreme case where the whole RabbitMQ cluster is unavailable, what should happen to messages that cannot be delivered?
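One common way to keep undelivered messages recoverable is to remember every message until the broker confirms it. The sketch below is only an illustration of that idea under stated assumptions: the class name PendingConfirmStore and its methods are hypothetical, it keeps everything in memory, and it is not part of Spring or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store of messages that are still waiting for a broker confirm. */
class PendingConfirmStore {
    // correlation id -> message body that was sent but not yet confirmed
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Record a message just before it is handed to the broker client. */
    void track(String correlationId, String body) {
        pending.put(correlationId, body);
    }

    /** Called from a confirm callback: an ack clears the entry, a nack keeps it for redelivery. */
    void onConfirm(String correlationId, boolean ack) {
        if (ack) {
            pending.remove(correlationId);
        }
    }

    /** Snapshot of the messages that still need to be sent again, e.g. by a scheduled task. */
    List<String> unconfirmed() {
        return new ArrayList<>(pending.values());
    }

    public static void main(String[] args) {
        PendingConfirmStore store = new PendingConfirmStore();
        store.track("1", "order created");
        store.track("2", "order paid");
        store.onConfirm("1", true);   // broker acknowledged message 1
        store.onConfirm("2", false);  // broker rejected message 2
        System.out.println(store.unconfirmed()); // prints [order paid]
    }
}
```

In a real service the store would probably need to survive restarts (a database table or similar), since an in-memory map is lost together with the producer process.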
8.1. Publisher confirms (Spring Boot version)
8.1.1. Confirmation Mechanism Scheme
8.1.2. Code Architecture Diagram
8.1.3. Configuration files
Add the following to the configuration file:
spring.rabbitmq.publisher-confirm-type=correlated
The property accepts three values:
⚫ NONE
Publisher confirms are disabled. This is the default.
⚫ CORRELATED
The callback method is triggered once the broker has confirmed the message published to the exchange.
⚫ SIMPLE
In testing, this value has two effects. First, it triggers the callback method just as CORRELATED does.
Second, after publishing a message you can call waitForConfirms or waitForConfirmsOrDie on the rabbitTemplate
to wait for the broker node to return the sending result, and decide the next step based on that result.
Note that if waitForConfirmsOrDie does not receive a positive confirmation, the channel is closed and no further messages can be sent to the broker on it.
8.1.4. Add configuration class
package com.xingchen.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * Publisher confirm configuration with a backup exchange as fallback.
 *
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Configuration
public class ConfirmConfig {

    // exchange
    private static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";
    // queue
    private static final String CONFIRM_QUEUE_NAME = "confirm-queue";
    // routing key
    private static final String CONFIRM_ROUTING_KEY = "confirm-key";
    // backup exchange
    private static final String BACKUP_EXCHANGE_NAME = "backup-exchange";
    // backup queue
    private static final String BACKUP_QUEUE_NAME = "backup-queue";
    // alarm queue
    private static final String WARNING_QUEUE_NAME = "warning-queue";

    @Bean("confirmExchange")
    public DirectExchange directExchange() {
        // The confirm exchange declares a backup (alternate) exchange, so that
        // messages it cannot route are forwarded to the backup exchange.
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
                .build();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        // non-durable, non-exclusive, not auto-deleted, no extra arguments
        return new Queue(CONFIRM_QUEUE_NAME, false, false, false, map);
    }

    @Bean("backupQueue")
    public Queue backupQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(BACKUP_QUEUE_NAME, false, false, false, map);
    }

    @Bean("warningQueue")
    public Queue warningQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(WARNING_QUEUE_NAME, false, false, false, map);
    }

    // confirm-queue is bound to confirm-exchange with the routing key confirm-key
    @Bean
    public Binding queueConfirmBindingExchange(@Qualifier("confirmQueue") Queue queue,
                                               @Qualifier("confirmExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
    }

    // backup-queue and warning-queue are both bound to the fanout backup exchange
    @Bean
    public Binding backupConfirmBindingExchange(@Qualifier("backupQueue") Queue queue,
                                                @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding warningConfirmBindingExchange(@Qualifier("warningQueue") Queue queue,
                                                 @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}
8.1.5. Message producers
package com.xingchen.mq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendConfirm/{msg}")
    public void sendConfirmMessage(@PathVariable("msg") String msg) {
        // Correlation data that is handed back to the confirm callback
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-key", msg, correlationData);
        log.info("Sent message: {}", msg);
    }
}
8.1.6. Callback interface
package com.xingchen.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * Note: RabbitTemplate.ReturnCallback is deprecated in newer Spring AMQP
 * versions in favor of RabbitTemplate.ReturnsCallback.
 *
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Runs after dependency injection has finished. ConfirmCallback is an inner
     * interface of RabbitTemplate, so this class has to be registered on the template.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        // The return (fallback) callback has to be registered as well
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * @param correlationData ID and related data of the message; it must be created by the sender, otherwise it is null
     * @param ack             confirmation status from the exchange: true if confirmed, false if not
     * @param cause           reason for a negative confirmation; null when ack is true
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("Message sent successfully, id: {}", id);
        } else {
            log.info("Message sending failed, cause: {}, id: {}", cause, id);
        }
    }

    /**
     * If the exchange cannot route a message during delivery, the message is returned to the producer.
     *
     * @param message    the returned message
     * @param replyCode  reply status code
     * @param replyText  reason for the return
     * @param exchange   exchange name
     * @param routingKey routing key
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.error("Message {} was returned by exchange {}, routing key: {}, reason: {}",
                new String(message.getBody()), exchange, routingKey, replyText);
    }
}
8.1.7. Message consumers
package com.xingchen.mq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class ConfirmConsumer {

    @RabbitListener(queues = {"confirm-queue"})
    public void receiveMsg(Message message) {
        log.info("The received message is: " + new String(message.getBody()));
    }
}
8.1.8. Results analysis
Two messages are sent: the RoutingKey of the first message is "key1" and the RoutingKey of the second message is "key2".
Both messages are successfully received by the exchange, and the confirm callback from the exchange is received for both, but the consumer receives only one message.
The RoutingKey of the second message does not match the BindingKey of the queue, and no other queue can accept it,
so the second message is simply discarded.
8.2. Fallback messages
8.2.1. Mandatory parameters
When only the publisher confirm mechanism is enabled, the exchange sends a confirmation to the producer as soon as it receives a message.
If the message then turns out to be unroutable, it is discarded directly, and the producer never learns that it was dropped.
So how can the producer deal with messages that cannot be routed, or at least be notified so that it can handle them itself?
Setting the mandatory parameter makes the broker return a message to the producer when its destination is unreachable during delivery.
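The effect of the mandatory parameter can be modeled without a broker. The following is a toy in-memory sketch, not RabbitMQ or Spring client code: the class MandatoryRoutingModel and its methods are hypothetical, and queues are plain lists. It shows that the confirm is positive in all three cases, while only the mandatory publish hands the unroutable message back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a direct exchange; all names are hypothetical. */
class MandatoryRoutingModel {
    // binding key -> queue (a plain list standing in for a real queue)
    private final Map<String, List<String>> bindings = new HashMap<>();
    // messages handed back to the producer, as a return callback would receive them
    private final List<String> returnedToProducer = new ArrayList<>();

    void bind(String bindingKey, List<String> queue) {
        bindings.put(bindingKey, queue);
    }

    /**
     * Publishes a message and returns the simulated publisher confirm.
     * The confirm is true whenever the exchange accepted the message,
     * whether or not any queue received it.
     */
    boolean publish(String routingKey, String body, boolean mandatory) {
        List<String> queue = bindings.get(routingKey);
        if (queue != null) {
            queue.add(body);                 // routed normally
        } else if (mandatory) {
            returnedToProducer.add(body);    // unroutable: returned to the producer
        }
        // unroutable and not mandatory: silently dropped
        return true;
    }

    List<String> returned() {
        return returnedToProducer;
    }

    public static void main(String[] args) {
        MandatoryRoutingModel exchange = new MandatoryRoutingModel();
        List<String> confirmQueue = new ArrayList<>();
        exchange.bind("key1", confirmQueue);

        exchange.publish("key1", "first", false);   // reaches confirmQueue
        exchange.publish("key2", "second", false);  // confirmed, then dropped
        exchange.publish("key2", "third", true);    // confirmed, then returned

        System.out.println("queue=" + confirmQueue + " returned=" + exchange.returned());
    }
}
```

The model deliberately returns true from every publish: a positive confirm only says that the exchange took the message, which is why a separate return path is needed to learn about routing failures.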
8.2.2. Message producer code
This step reuses the MyCallBack class listed in 8.1.6, which registers itself on the RabbitTemplate as both the confirm callback and the return callback. For unroutable messages to be handed back, the template must also publish with the mandatory flag set, for example via rabbitTemplate.setMandatory(true), or via the spring.rabbitmq.template.mandatory=true property together with spring.rabbitmq.publisher-returns=true.
8.2.3. Callback interface
The callback class is the same MyCallBack listed in 8.1.6. Its returnedMessage method receives each message the exchange could not route, together with the reply code, reply text, exchange name and routing key, and logs it.
8.2.4. Results Analysis
8.3. Backup exchange
With the mandatory parameter and returned messages, we become aware of undeliverable messages and get a chance to detect and handle them when delivery fails. Often, though, we do not know what to do with unroutable messages beyond writing a log entry, raising an alarm, and then handling them by hand. Logging unroutable messages is inelegant, especially when the producer service runs on several machines, where copying logs manually is tedious and error-prone. Setting the mandatory parameter also makes the producer more complex, because logic has to be added to handle the returned messages. What if we want neither to lose messages nor to complicate the producer? The earlier article on dead letter queues showed that a dead letter exchange can be set on a queue to hold messages whose processing failed, but unroutable messages never get into a queue at all, so a dead letter queue cannot be used to save them.
RabbitMQ has a backup exchange mechanism (the alternate-exchange argument) that handles this problem well. A backup exchange can be understood as the "spare tire" of an exchange. When we declare a backup exchange for an exchange, we give it a spare: whenever the exchange receives a message it cannot route, it forwards the message to the backup exchange, which then forwards and handles it. The backup exchange is usually of type fanout, so that every message is delivered to all queues bound to it. We then bind a queue to the backup exchange, so that all messages the original exchange could not route end up in this queue. We can also create an alarm queue and use a separate consumer for monitoring and alerting.
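The forwarding rule described above can be sketched as a small in-memory model. This is an illustration only, under the assumption that queues are plain lists; the class BackupExchangeModel and its methods are hypothetical and do not correspond to any RabbitMQ or Spring API. The queue variables mirror the confirm-queue, backup-queue and warning-queue used in this chapter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a direct exchange with a fanout backup exchange; all names are hypothetical. */
class BackupExchangeModel {
    // bindings of the primary direct exchange: binding key -> queue
    private final Map<String, List<String>> directBindings = new HashMap<>();
    // queues bound to the fanout backup exchange (fanout ignores routing keys)
    private final List<List<String>> backupQueues = new ArrayList<>();

    void bind(String bindingKey, List<String> queue) {
        directBindings.put(bindingKey, queue);
    }

    void bindToBackup(List<String> queue) {
        backupQueues.add(queue);
    }

    /** Routes by key if possible; otherwise forwards the message to every backup queue. */
    void publish(String routingKey, String body) {
        List<String> queue = directBindings.get(routingKey);
        if (queue != null) {
            queue.add(body);
            return;
        }
        for (List<String> backupQueue : backupQueues) {
            backupQueue.add(body);
        }
    }

    public static void main(String[] args) {
        List<String> confirmQueue = new ArrayList<>();
        List<String> backupQueue = new ArrayList<>();
        List<String> warningQueue = new ArrayList<>();

        BackupExchangeModel exchange = new BackupExchangeModel();
        exchange.bind("confirm-key", confirmQueue);
        exchange.bindToBackup(backupQueue);
        exchange.bindToBackup(warningQueue);

        exchange.publish("confirm-key", "routable");
        exchange.publish("wrong-key", "unroutable");

        System.out.println("confirm=" + confirmQueue
                + " backup=" + backupQueue + " warning=" + warningQueue);
    }
}
```

Because the backup exchange is modeled as a fanout, one unroutable message lands in both the backup queue (for later recovery) and the warning queue (for alerting), which is the reason for binding two queues to it.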
8.3.1. Code Architecture Diagram
8.3.2. Modify configuration class
The configuration class is the ConfirmConfig listed in 8.1.4. The parts that matter here are the alternate-exchange argument on confirm-exchange, which points to backup-exchange, and the fanout backup-exchange itself with backup-queue and warning-queue bound to it.
8.3.3. Alert consumers
package com.xingchen.mq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class WarningConsumer {

    @RabbitListener(queues = {"warning-queue"})
    public void receiveWarningMsg(Message message) {
        // The {} placeholder is required, otherwise the message body is not logged
        log.warn("A non-routable message appeared: {}", new String(message.getBody()));
    }
}
8.3.4. Test Considerations
Before restarting the project, delete the existing confirm-exchange, because its arguments have changed. Otherwise declaring the exchange fails: RabbitMQ refuses to redeclare an existing exchange with different arguments and reports a PRECONDITION_FAILED channel error.
8.3.5. Results Analysis
The mandatory parameter and the backup exchange can be used together. If both are enabled at the same time, where does an unroutable message go, and which of the two takes priority?
The test results show that the backup exchange takes priority.