Release Confirmation of RabbitMQ

Table of contents

8.1. Release confirm springboot version

8.1.1. Confirmation Mechanism Scheme

8.1.2. Code Architecture Diagram

8.1.3. Configuration files

8.1.4. Add configuration class

8.1.5. Message producers

8.1.6. Callback interface

8.1.7. Message consumers

8.1.8. Results Analysis

8.2. Fallback messages

8.2.1. Mandatory parameters

8.2.2. Message producer code

8.2.3. Callback interface

8.2.4. Results Analysis

8.3. Backup switch

8.3.1. Code Architecture Diagram

8.3.2. Modify configuration class

8.3.3. Alert consumers

8.3.4. Test Considerations

8.3.5. Results Analysis

In the production environment, due to some unknown reasons, the rabbitmq restarts, and the producer message delivery fails during the RabbitMQ restart.

Resulting in message loss requiring manual processing and recovery. So, we started to think about how to deliver RabbitMQ messages reliably?

Especially in such an extreme situation, when the RabbitMQ cluster is unavailable, how to deal with undeliverable messages

8.1. Release confirm springboot version

8.1.1. Confirmation Mechanism Scheme

8.1.2. Code Architecture Diagram

8.1.3. Configuration files

In the configuration file, you need to add

spring.rabbitmq.publisher-confirm-type=correlated

⚫ NONE

Disable publish confirmation mode, is the default

⚫ CORRELATED

The callback method will be triggered after the message is successfully published to the exchange

⚫ SIMPLE

After testing, there are two effects, one of which will trigger the callback method like the CORRELATED value,

Second, use the rabbitTemplate to call the waitForConfirms or waitForConfirmsOrDie method after the message is successfully published

Wait for the broker node to return the sending result, and determine the logic of the next step according to the returned result. The points to note are

If the waitForConfirmsOrDie method returns false, the channel will be closed, and then messages cannot be sent to the broker

8.1.4. Add configuration class

package com.xingchen.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author xingchen
 * @version V1.0
 *  * Publish confirmation back-and-forth solution Add cache test
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Configuration
public class ConfirmConfig {

    //switch
    private static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";
    //queue

    private static final String CONFIRM_QUEUE_NAME = "confirm-queue";
    //ROUTING_KEY

    private static final String CONFIRM_ROUTING_KEY = "confirm-key";

    /**
     * backup switch
     */
    private static final String BACKUP_EXCHANGE_NAME = "backup-exchange";
    /**
     * backup queue
     */
    private static final String BACKUP_QUEUE_NAME = "backup-queue";
    /**
     * alarm queue
     */
    private static final String WARNING_QUEUE_NAME = "warning-queue";

    @Bean("confirmExchange")
    public DirectExchange directExchange() {
        /**Confirm that the switch configures the backup switch to ensure that messages are forwarded to the backup switch after a failure*/
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(CONFIRM_QUEUE_NAME, false, false, false, map);
    }

    @Bean("backupQueue")
    public Queue backupQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(BACKUP_QUEUE_NAME, false, false, false, map);
    }

    @Bean("warningQueue")
    public Queue warningQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(WARNING_QUEUE_NAME, false, false, false, map);
    }

    @Bean
    public Binding queueConfirmBindingExchange(@Qualifier("confirmQueue") Queue queue,
                                               @Qualifier("confirmExchange") Exchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
    }

    @Bean
    public Binding backupConfirmBindingExchange(@Qualifier("backupQueue") Queue queue,
                                                @Qualifier("backupExchange") FanoutExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding warningConfirmBindingExchange(@Qualifier("warningQueue") Queue queue,
                                                 @Qualifier("backupExchange") FanoutExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange);
    }
}

8.1.5. Message producers

package com.xingchen.mq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ConfirmController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendConfirm/{msg}")
    public void sendConfirmMessage(@PathVariable("msg") String msg) {
        /**Declare the formal parameters of the callback*/

        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-key", msg, correlationData);
        log.info("send message as:" + msg);
    }
}

8.1.6. Callback interface

package com.xingchen.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * This annotation will inject a property after other annotations are executed, and this class must be injected into the inner class of rabbitTemplate
     * The inner class is this ConfirmCallback
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        /**At the same time, the queue fallback interface needs to be injected*/
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * @param correlationData Contains the ID and other data information of the message. This needs to be created at the sender, otherwise there is no
     * @param ack             Returns a switch confirmation status true for confirmation false for unconfirmed
     * @param cause           A reason for unacknowledged, if ack is true, this value is null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("message sent successfully, id Yes{} ", id);
        } else {
            log.info("Message sending failed due to{} id for{}", cause, id);
        }
    }
    /**
     * In the process of message delivery, if the switch encounters an unroutable situation, it will return the message to the producer
     *
     * @param message    information
     * @param replyCode  reply status code
     * @param replyText  return reason
     * @param exchange   switch
     * @param routingKey RoutingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("information{},being switched{}fallback, routing Key Yes{},The reason for the return is{}", new String(message.getBody()), exchange
                , routingKey, replyText);
    }
}

8.1.7. Message consumers

package com.xingchen.mq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class ConfirmConsumer {
    @RabbitListener(queues = {"confirm-queue"})
    public void receiveMsg(Message message) {
        log.info("The received message is: " + new String(message.getBody()));
    }
}

8.1.8. Results analysis

It can be seen that two messages are sent, the RoutingKey of the first message is "key1", and the RoutingKey of the second message is

"key2", both messages were successfully received by the exchange, and the confirmation callback from the exchange was also received, but the consumer only received one message, because

The RoutingKey of the second message is inconsistent with the BindingKey of the queue, and no other queue can receive this message, so the second message

The message is simply discarded.

8.2. Fallback messages

8.2.1. Mandatory parameters

When only the producer confirmation mechanism is enabled, the switch will directly send a confirmation message to the message producer after receiving the message, such as

If the message is found to be unroutable, the message will be discarded directly. At this time, the producer does not know that the message is discarded. so how

Let me figure out what to do with messages that cannot be routed? At least let me know, so I can handle it myself. By setting the mandatory parameter

Data can return messages to producers when the destination is unreachable during message delivery.

8.2.2. Message producer code

package com.xingchen.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * This annotation will inject a property after other annotations are executed, and this class must be injected into the inner class of rabbitTemplate
     * The inner class is this ConfirmCallback
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        /**At the same time, the queue fallback interface needs to be injected*/
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * @param correlationData Contains the ID and other data information of the message. This needs to be created at the sender, otherwise there is no
     * @param ack             Returns a switch confirmation status true for confirmation false for unconfirmed
     * @param cause           A reason for unacknowledged, if ack is true, this value is null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("message sent successfully, id Yes{} ", id);
        } else {
            log.info("Message sending failed due to{} id for{}", cause, id);
        }
    }

    /**
     * In the process of message delivery, if the switch encounters an unroutable situation, it will return the message to the producer
     *
     * @param message    information
     * @param replyCode  reply status code
     * @param replyText  return reason
     * @param exchange   switch
     * @param routingKey RoutingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("information{},being switched{}fallback, routing Key Yes{},The reason for the return is{}", new String(message.getBody()), exchange
                , routingKey, replyText);
    }
}

8.2.3. Callback interface

package com.xingchen.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * This annotation will inject a property after other annotations are executed, and this class must be injected into the inner class of rabbitTemplate
     * The inner class is this ConfirmCallback
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        /**At the same time, the queue fallback interface needs to be injected*/
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * @param correlationData Contains the ID and other data information of the message. This needs to be created at the sender, otherwise there is no
     * @param ack             Returns a switch confirmation status true for confirmation false for unconfirmed
     * @param cause           A reason for unacknowledged, if ack is true, this value is null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("message sent successfully, id Yes{} ", id);
        } else {
            log.info("Message sending failed due to{} id for{}", cause, id);
        }
    }

    /**
     * In the process of message delivery, if the switch encounters an unroutable situation, it will return the message to the producer
     *
     * @param message    information
     * @param replyCode  reply status code
     * @param replyText  return reason
     * @param exchange   switch
     * @param routingKey RoutingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("information{},being switched{}fallback, routing Key Yes{},The reason for the return is{}", new String(message.getBody()), exchange
                , routingKey, replyText);
    }
}

8.2.4. Results Analysis

8.3. Backup switch

With the mandatory parameter and the fallback message, we gain awareness of undeliverable messages and have the opportunity to

Unable to be found and processed when delivered. But sometimes, we don't know how to deal with these messages that cannot be routed, at most make a log, and then

Then trigger an alarm, and then manually handle it. Logging these unroutable messages is inelegant, especially when the producer

When the service has multiple machines, manually copying logs will be more troublesome and error-prone. And setting the mandatory parameter will increase the

To add complexity to the producer, logic needs to be added to handle these returned messages. If you don't want to lose messages and don't want to increase the producer's

Complexity, how to do it? In the previous article on setting up the dead letter queue, we mentioned that a dead letter switch can be set up for the queue to store those

Processing failed messages, but these non-routable messages have no chance to enter the queue at all, so the dead letter queue cannot be used to save messages.

In RabbitMQ, there is a backup switch mechanism that can deal with this problem well. What is a backup switch? backup

The switch can be understood as the "spare tire" of the switch in RabbitMQ. When we declare a corresponding backup switch for a certain switch, it is

It is to create a backup tire for it. When the switch receives a non-routable message, it will forward this message to the backup switch.

The backup switch is used for forwarding and processing. Usually the type of backup switch is Fanout, so that all messages can be delivered to the

In the queue, and then we bind a queue under the backup switch, so that all the messages that cannot be routed by the original switch will enter

into this queue. Of course, we can also build an alarm queue and use independent consumers to monitor and alarm.

8.3.1. Code Architecture Diagram

8.3.2. Modify configuration class

package com.xingchen.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author xingchen
 * @version V1.0
 *  * Publish confirmation back-and-forth solution Add cache test
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Configuration
public class ConfirmConfig {

    //switch
    private static final String CONFIRM_EXCHANGE_NAME = "confirm-exchange";
    //queue

    private static final String CONFIRM_QUEUE_NAME = "confirm-queue";
    //ROUTING_KEY

    private static final String CONFIRM_ROUTING_KEY = "confirm-key";

    /**
     * backup switch
     */
    private static final String BACKUP_EXCHANGE_NAME = "backup-exchange";
    /**
     * backup queue
     */
    private static final String BACKUP_QUEUE_NAME = "backup-queue";
    /**
     * alarm queue
     */
    private static final String WARNING_QUEUE_NAME = "warning-queue";

    @Bean("confirmExchange")
    public DirectExchange directExchange() {
        /**Confirm that the switch configures the backup switch to ensure that messages are forwarded to the backup switch after a failure*/
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(CONFIRM_QUEUE_NAME, false, false, false, map);
    }

    @Bean("backupQueue")
    public Queue backupQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(BACKUP_QUEUE_NAME, false, false, false, map);
    }

    @Bean("warningQueue")
    public Queue warningQueue() {
        HashMap<String, Object> map = new HashMap<>(8);
        return new Queue(WARNING_QUEUE_NAME, false, false, false, map);
    }

    @Bean
    public Binding queueConfirmBindingExchange(@Qualifier("confirmQueue") Queue queue,
                                               @Qualifier("confirmExchange") Exchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs();
    }

    @Bean
    public Binding backupConfirmBindingExchange(@Qualifier("backupQueue") Queue queue,
                                                @Qualifier("backupExchange") FanoutExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding warningConfirmBindingExchange(@Qualifier("warningQueue") Queue queue,
                                                 @Qualifier("backupExchange") FanoutExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange);
    }
}

8.3.3. Alert consumers

package com.xingchen.mq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author xingchen
 * @version V1.0
 * @Package com.xingchen.mq.config
 * @date 2022/12/7 11:52
 */
@Component
@Slf4j
public class WarningConsumer {
    @RabbitListener(queues = {"warning-queue"})
    public void receiveWarningMsg(Message message) {
        log.warn("A non-routable message appears:", message);
    }
}

8.3.4. Test Considerations

When restarting the project, we need to delete the original confirm.exchange because we have modified its binding properties, otherwise the following error will be reported:

8.3.5. Results Analysis

When the mandatory parameter and the backup switch can be used together, if both are enabled at the same time, where will the message go? who takes priority

The priority is high, and the above results show that the answer is that the priority of the backup switch is high.

Tags: Java Middleware RabbitMQ java-rabbitmq

Posted by Victorm on Sun, 11 Dec 2022 02:28:14 +1030