[RabbitMQ, Spring Boot] Spring Boot integrates RabbitMQ

1, Environment preparation

Add the pom dependency

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

Configure application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 7006
    username: admin
    password: admin
    # Publisher confirm mode: controls whether the confirm callback is triggered
    # NONE disables publisher confirms and is the default value
    # CORRELATED triggers the callback method once the broker has confirmed that the message was published to the exchange
    # SIMPLE has been observed to have two effects. One is the same as CORRELATED: the callback method is triggered. The other is that, after publishing, rabbitTemplate can call waitForConfirms or waitForConfirmsOrDie to wait for the broker's result and decide the next step from it. Note that if waitForConfirmsOrDie sees a negative confirm, the channel is closed and no further messages can be sent to the broker on it
    publisher-confirm-type: correlated

2, Simple mode: one consumer consumes messages from one queue

1. Register

package com.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Simple mode
 *
 * @author He PanFu
 * @date 2021-09-16 12:33:25
 */
@Configuration
public class RabbitSimpleConfig {
    
    @Bean
    public Queue simpleQueueWx() {
        // Arguments: queue name, durable flag (true = the queue survives a broker restart)
        return new Queue("simple_queue", true);
    }
}

2. Add message to queue

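    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/simple")
    public String sendSimpleMessage() {
        for (int i = 0; i < 10; i++) {
            // No exchange is named, so the default exchange is used with the queue name as routing key
            rabbitTemplate.convertAndSend("simple_queue", getStringObjectMap());
        }
        return "ok";
    }

    // Builds the sample payload that is sent in all of the examples
    private Map<String, Object> getStringObjectMap() {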
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        return map;
    }

3. Consume messages

package com.rabbitmq.receiver;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;


/**
 * Simple mode: wechat subscription
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Component
@RabbitListener(queues = "simple_queue")
public class RabbitSimpleWXListener {

    @RabbitHandler
    public void init(Map testMessage) {
        System.out.println("Simple mode (wechat):" + testMessage);
    }
}

Remarks:

  • @RabbitListener can be used on its own, directly on a method.
  • @RabbitListener can also be placed on a class, in which case it must be used together with the @RabbitHandler annotation.
  • With @RabbitListener on the class, a received message is handed to one of the @RabbitHandler methods. Which method is used depends on the type of the payload produced by the MessageConverter.
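The dispatch rule in the last remark can be pictured with a small plain-Java model. This is only a sketch and not Spring's implementation: the class `HandlerDispatchDemo` and its methods are invented for this illustration, and each registered function stands in for one @RabbitHandler method.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of class-level @RabbitListener dispatch: the handler is
// chosen by the type of the converted payload. This is not Spring's code.
class HandlerDispatchDemo {

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Plays the role of one @RabbitHandler method for a given payload type.
    <T> void register(Class<T> payloadType, Function<T, String> handler) {
        handlers.put(payloadType, payload -> handler.apply(payloadType.cast(payload)));
    }

    // Picks the first registered handler whose parameter type accepts the payload.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalArgumentException("No handler for " + payload.getClass().getName());
    }

    // Builds a listener with one handler for Map payloads and one for String payloads.
    static HandlerDispatchDemo example() {
        HandlerDispatchDemo listener = new HandlerDispatchDemo();
        listener.register(Map.class, map -> "map handler: " + map.size() + " entries");
        listener.register(String.class, text -> "string handler: " + text);
        return listener;
    }

    public static void main(String[] args) {
        HandlerDispatchDemo listener = example();
        System.out.println(listener.dispatch(Collections.singletonMap("messageId", "1")));
        System.out.println(listener.dispatch("hello"));
    }
}
```

A Map payload goes to the first handler and a String payload to the second, which mirrors why the listener in this section declares a Map parameter: the producer sends a Map.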

3, Working mode (round-robin distribution): multiple consumers consume messages from the same queue

  • Introduction: start multiple consumers. When the producer sends messages to the queue, each message is received by only one consumer. By default RabbitMQ distributes messages to the consumers evenly, in round-robin order. A consumer receives its next message after it has finished processing the current one.
  • Round robin: messages are handed out in turn, one at a time, to the consumers listening on the queue. Round-robin distribution does not take into account how fast each consumer processes messages. For example, with 100 messages in the queue and two consumers, C1 and C2, the messages alternate between the two regardless of their speed, and each consumer ends up with 50 messages.
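The round-robin arithmetic can be checked with a small plain-Java simulation. It is only a sketch of the distribution rule: no broker is involved, and the names `RoundRobinDemo` and `distribute` are made up for this illustration.

```java
import java.util.Arrays;

// Plain-Java model of round-robin dispatch; no RabbitMQ broker is involved.
class RoundRobinDemo {

    // Returns how many messages each consumer receives when messages are
    // handed out strictly in turn, ignoring how fast each consumer is.
    static int[] distribute(int messageCount, int consumerCount) {
        int[] received = new int[consumerCount];
        for (int i = 0; i < messageCount; i++) {
            received[i % consumerCount]++;
        }
        return received;
    }

    public static void main(String[] args) {
        // 100 messages shared by consumers C1 and C2
        System.out.println(Arrays.toString(distribute(100, 2))); // prints [50, 50]
    }
}
```

Consumer speed never appears in the calculation, which is exactly the weakness that the fair distribution mode addresses.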

1. Register

package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Working mode
 *
 * @author He PanFu
 * @date 2021-09-16 12:33:25
 */
@Configuration
public class RabbitWorkManage {
    /**
     * Work queue
     *
     * @return queue
     */
    @Bean
    public Queue workQueue() {
        return new Queue("work_queue", true);
    }
}


2. Add message to queue

    @GetMapping("/work")
    public String sendWorkMessage() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work_queue", getStringObjectMap());
        }
        return "ok";
    }

3. Consume messages

Note: in a production environment you would start multiple consumer services. To keep things simple, two listeners in one class simulate that here.

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


/**
 * Working mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitWorkListener {

    @RabbitListener(queues = "work_queue")
    public void messageListener1(Object msg, Channel channel, Message message) throws InterruptedException, IOException {
        // Simulate a consumer that needs 1 second per message
        TimeUnit.SECONDS.sleep(1);
        log.info("1 Message processing completed, message content:{}", msg);
    }

    @RabbitListener(queues = "work_queue")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        // Simulate a slower consumer that needs 2 seconds per message
        TimeUnit.SECONDS.sleep(2);
        log.info("2 Message processing completed, message content:{}", msg);
    }
}

4, Working mode (fair distribution): multiple consumers consume messages from the same queue

  • Fair dispatch: messages are distributed according to each consumer's processing capacity, so faster consumers do more of the work. For example, with 90 messages in the queue and two consumers, where C1 takes 1 s per message and C2 takes 2 s per message, C1 ends up consuming 60 messages and C2 consuming 30.
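The 60 / 30 split can be reproduced with a small plain-Java simulation. It is a sketch under simplifying assumptions: no broker is involved, each consumer holds at most one unacknowledged message (prefetch of 1), and a free consumer always gets the next message immediately. The names `FairDispatchDemo` and `distribute` are invented for this illustration.

```java
import java.util.Arrays;

// Plain-Java model of fair dispatch with a prefetch of 1; no broker is involved.
class FairDispatchDemo {

    // secondsPerMessage[i] is how long consumer i needs for one message.
    // Each message goes to the consumer that becomes free first
    // (ties go to the consumer with the lower index).
    static int[] distribute(int messageCount, int[] secondsPerMessage) {
        int[] received = new int[secondsPerMessage.length];
        long[] freeAt = new long[secondsPerMessage.length];
        for (int m = 0; m < messageCount; m++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            received[next]++;
            freeAt[next] += secondsPerMessage[next];
        }
        return received;
    }

    public static void main(String[] args) {
        // 90 messages, C1 needs 1 s per message, C2 needs 2 s per message
        System.out.println(Arrays.toString(distribute(90, new int[]{1, 2}))); // prints [60, 30]
    }
}
```

In every 2-second window C1 finishes two messages while C2 finishes one, which gives the 2:1 ratio.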

1. Modify application.yml

Enable manual acknowledgement and let each consumer take only one message at a time

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 7006
    username: admin
    password: admin
    # Publisher confirm mode: controls whether the confirm callback is triggered
    # NONE disables publisher confirms and is the default value
    # CORRELATED triggers the callback method once the broker has confirmed that the message was published to the exchange
    # SIMPLE has been observed to have two effects. One is the same as CORRELATED: the callback method is triggered. The other is that, after publishing, rabbitTemplate can call waitForConfirms or waitForConfirmsOrDie to wait for the broker's result and decide the next step from it. Note that if waitForConfirmsOrDie sees a negative confirm, the channel is closed and no further messages can be sent to the broker on it
    publisher-confirm-type: correlated
    listener:
      simple:
        # Enable manual acknowledgement
        acknowledge-mode: manual
        # Prefetch count of 1: the broker delivers one message at a time and sends the next only after the consumer has acknowledged the current one
        prefetch: 1

2. Transform consumer message code

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


/**
 * Working mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitWorkListener {

    @RabbitListener(queues = "work_queue")
    public void messageListener1(Object msg, Channel channel, Message message) throws InterruptedException, IOException {
        log.info("1 Start processing the message. Message content:{}", msg);
        
        // Fair distribution: acknowledge the message manually
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        // To reject the message instead, use basicNack. When requeue (the last argument) is true, the message is put back in the queue
        // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

    }

    @RabbitListener(queues = "work_queue")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        log.info("2 Start processing the message. Message content:{}", msg);
        
        TimeUnit.SECONDS.sleep(1);
        // Fair distribution: acknowledge the message manually
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5, Publish / subscribe mode: every consumer receives its own copy of the same message

1. Register

package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Publish / subscribe mode
 *
 * @author He PanFu
 * @date 2022-04-16 17:04:55
 */
@Configuration
public class RabbitmqFanoutConfig {

    /**
     * Declare the exchange (constructor arguments: name, durable, autoDelete):
     * <p>
     * durable: whether the exchange is persisted. A durable exchange survives a broker restart; a non-durable one does not.
     * autoDelete: whether to delete automatically. The exchange is removed once nothing is bound to it any more.
     * exclusive: applies to queues only, not exchanges. An exclusive queue can be used only by the connection that declared it and is deleted when that connection closes.
     *
     * @return the publish / subscribe (fanout) exchange
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_exchange", true, false);
    }

    /**
     * Wechat queue
     *
     * @return queue
     */
    @Bean
    public Queue fanoutQueueWx() {
        return new Queue("fanout_queue_wx", true);
    }

    /**
     * SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue fanoutQueueDx() {
        return new Queue("fanout_queue_dx", true);
    }

    /**
     * Bind the WeChat queue to the exchange
     *
     * @return Binding object
     */
    @Bean
    public Binding fanoutBindingWx() {
        return BindingBuilder.bind(fanoutQueueWx()).to(fanoutExchange());
    }

    /**
     * Bind the SMS queue to the exchange
     *
     * @return Binding object
     */
    @Bean
    public Binding fanoutBindingDx() {
        return BindingBuilder.bind(fanoutQueueDx()).to(fanoutExchange());
    }

}


2. Add message to queue

    @GetMapping("/fanout")
    public String sendFanoutMessage() {
        for (int i = 0; i < 10; i++) {
            // A fanout exchange ignores the routing key, so an empty string is passed
            rabbitTemplate.convertAndSend("fanout_exchange", "", getStringObjectMap(), new CorrelationData(UUID.randomUUID().toString()));
        }
        return "ok";
    }
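What the fanout exchange does with each published message can be modelled in a few lines of plain Java. This is only a sketch of the routing rule, with no broker involved; the class `FanoutDemo` and its methods are invented for this illustration, and the queue names are the ones declared in this section.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a fanout exchange; no broker is involved.
class FanoutDemo {

    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queueName) {
        queues.put(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutDemo exchange = new FanoutDemo();
        exchange.bind("fanout_queue_wx");
        exchange.bind("fanout_queue_dx");
        exchange.publish("", "hello");
        System.out.println(exchange.messagesIn("fanout_queue_wx")); // prints [hello]
        System.out.println(exchange.messagesIn("fanout_queue_dx")); // prints [hello]
    }
}
```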

3. Consume messages

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


/**
 * Publish / subscribe mode: queue listening
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitmqFanoutListener {

    @RabbitListener(queues = "fanout_queue_wx")
    public void messageListener1(Object msg, Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
        log.info("Subscription mode (wechat), message processing starts, message content:{}", msg);

        // Manual confirmation message
        channel.basicAck(tag, false);

    }

    @RabbitListener(queues = "fanout_queue_dx")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        log.info("Subscription mode (SMS), message processing starts, message content:{}", msg);

        // Manual confirmation message
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

6, Routing mode

1. Register

package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @author He PanFu
 * @date 2022-04-16 18:29:03
 */
@Configuration
public class RabbitmqDirectConfig {

    /**
     * Declare the exchange (constructor arguments: name, durable, autoDelete):
     * <p>
     * durable: whether the exchange is persisted. A durable exchange survives a broker restart; a non-durable one does not.
     * autoDelete: whether to delete automatically. The exchange is removed once nothing is bound to it any more.
     * exclusive: applies to queues only, not exchanges. An exclusive queue can be used only by the connection that declared it and is deleted when that connection closes.
     *
     * @return the direct exchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_exchange", true, false);
    }

    /**
     * Wechat queue
     *
     * @return queue
     */
    @Bean
    public Queue directQueueWx() {
        return new Queue("direct_queue_wx", true);
    }

    /**
     * SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue directQueueDx() {
        return new Queue("direct_queue_dx", true);
    }

    /**
     * Bind the WeChat queue to the exchange with binding key "wx"
     *
     * @return binding
     */
    @Bean
    public Binding directBindingWx() {
        return BindingBuilder.bind(directQueueWx()).to(directExchange()).with("wx");
    }

    /**
     * Bind the SMS queue to the exchange with binding key "dx"
     *
     * @return binding
     */
    @Bean
    public Binding directBindingDx() {
        return BindingBuilder.bind(directQueueDx()).to(directExchange()).with("dx");
    }

}

2. Add message to queue

    @GetMapping("/direct/{routingKey}")
    public String sendDirectMessage(@PathVariable("routingKey") String routingKey) {
        rabbitTemplate.convertAndSend("direct_exchange", routingKey, getStringObjectMap());
        return "ok";
    }
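How the direct exchange picks a queue for a given routing key can be sketched in plain Java. No broker is involved; the class `DirectRoutingDemo` and its methods are invented for this illustration, while the queue names and binding keys are the ones registered above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a direct exchange; no broker is involved.
class DirectRoutingDemo {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    // A message is delivered to the queues whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectRoutingDemo exchange = new DirectRoutingDemo();
        exchange.bind("direct_queue_wx", "wx");
        exchange.bind("direct_queue_dx", "dx");
        System.out.println(exchange.route("wx"));    // prints [direct_queue_wx]
        System.out.println(exchange.route("dx"));    // prints [direct_queue_dx]
        System.out.println(exchange.route("email")); // prints []
    }
}
```

So calling /direct/wx reaches only the WeChat queue and /direct/dx only the SMS queue. A routing key with no matching binding reaches no queue at all, and by default the broker simply discards such a message.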

3. Consume messages

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Routing mode
 *
 * @author He PanFu
 * @date 2022-04-16 18:32:19
 */
@Slf4j
@Component
public class RabbitmqDirectListener {

    @RabbitListener(queues = "direct_queue_wx")
    public void messageListener1(Object msg, Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
        log.info("Routing mode (wechat), message processing starts, message content:{}", msg);

        // Manual confirmation message
        channel.basicAck(tag, false);

    }

    @RabbitListener(queues = "direct_queue_dx")
    public void messageListener2(Object msg, Channel channel, Message message) throws IOException, InterruptedException {
        log.info("Routing mode (SMS), message processing starts, message content:{}", msg);

        // Manual confirmation message
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

7, Topic mode

1. Register

package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Topic mode
 *
 * @author He PanFu
 * @date 2022-04-18 09:00:43
 */
@Configuration
public class RabbitmqTopicConfig {

    /**
     * Declare the exchange (constructor arguments: name, durable, autoDelete):
     * <p>
     * durable: whether the exchange is persisted. A durable exchange survives a broker restart; a non-durable one does not.
     * autoDelete: whether to delete automatically. The exchange is removed once nothing is bound to it any more.
     * exclusive: applies to queues only, not exchanges. An exclusive queue can be used only by the connection that declared it and is deleted when that connection closes.
     *
     * @return the topic exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_exchange", true, false);
    }

    /**
     * Set wechat queue
     *
     * @return queue
     */
    @Bean
    public Queue topicQueueWx() {
        return new Queue("topic_queue_wx", true);
    }

    /**
     * Set SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue topicQueueDx() {
        return new Queue("topic_queue_dx", true);
    }

    /**
     * Bind the WeChat queue to the exchange
     * "*" matches exactly one word
     * "#" matches zero or more words
     *
     * @return binding
     */
    @Bean
    public Binding topicBindingWx() {
        return BindingBuilder.bind(topicQueueWx()).to(topicExchange()).with("#.wx.#");
    }

    /**
     * Bind the SMS queue to the exchange
     * "*" matches exactly one word
     * "#" matches zero or more words
     *
     * @return binding
     */
    @Bean
    public Binding topicBindingDx() {
        return BindingBuilder.bind(topicQueueDx()).to(topicExchange()).with("dx.*");
    }
}

2. Add message to queue

    @GetMapping("/topic")
    public String sendTopicMessage() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);
        // Publish the same payload with several routing keys to see which bindings match
        map.put("key", "wx");
        rabbitTemplate.convertAndSend("topic_exchange", "wx", map);
        map.put("key", "wx.www");
        rabbitTemplate.convertAndSend("topic_exchange", "wx.www", map);
        map.put("key", "wx.www.www");
        rabbitTemplate.convertAndSend("topic_exchange", "wx.www.www", map);
        map.put("key", "dx");
        rabbitTemplate.convertAndSend("topic_exchange", "dx", map);
        map.put("key", "dx.www");
        rabbitTemplate.convertAndSend("topic_exchange", "dx.www", map);
        map.put("key", "dx.www.www");
        rabbitTemplate.convertAndSend("topic_exchange", "dx.www.www", map);
        return "ok";
    }
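To see which of the six routing keys above reach which queue, the wildcard rules can be written as a small plain-Java matcher. This is only a sketch of the matching rule ("*" is exactly one word, "#" is zero or more words, words are separated by dots), not the broker's implementation; `TopicMatchDemo` and `matches` are names invented for this illustration.

```java
// Plain-Java sketch of topic pattern matching; not the broker's implementation.
class TopicMatchDemo {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // "#" either matches zero words, or swallows one word and tries again
            return match(p, pi + 1, k, ki) || (ki < k.length && match(p, pi, k, ki + 1));
        }
        if (ki == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return match(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"wx", "wx.www", "wx.www.www", "dx", "dx.www", "dx.www.www"};
        for (String key : keys) {
            System.out.println(key + " -> wx queue: " + matches("#.wx.#", key)
                    + ", dx queue: " + matches("dx.*", key));
        }
    }
}
```

With the bindings from this section, the WeChat queue ("#.wx.#") receives wx, wx.www and wx.www.www, while the SMS queue ("dx.*") receives only dx.www, because "*" needs exactly one word after "dx".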

3. Consume messages

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Topic mode
 *
 * @author He PanFu
 * @date 2022-04-18 09:18:47
 */
@Slf4j
@Component
public class RabbitmqTopicListener {

    /**
     * Wechat queue monitoring
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "topic_queue_wx")
    public void messageListener2(Map testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Topic mode (wechat), key: {},testMessage: {}", testMessage.get("key"),testMessage);

        channel.basicAck(tag, false);
    }
    
    /**
     * SMS queue monitoring
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "topic_queue_dx")
    public void messageListener1(Map testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Topic mode (SMS), key: {},testMessage: {}", testMessage.get("key"),testMessage);
        channel.basicAck(tag, false);
    }

}


8, Headers (parameter) mode

1. Register

package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Parameter mode
 *
 * @author He PanFu
 * @date 2022-04-18 09:53:43
 */
@Configuration
public class RabbitmqHeadersConfig {

    /**
     * Declare the exchange (constructor arguments: name, durable, autoDelete):
     * <p>
     * durable: whether the exchange is persisted. A durable exchange survives a broker restart; a non-durable one does not.
     * autoDelete: whether to delete automatically. The exchange is removed once nothing is bound to it any more.
     * exclusive: applies to queues only, not exchanges. An exclusive queue can be used only by the connection that declared it and is deleted when that connection closes.
     *
     * @return the headers exchange
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers_exchange", true, false);
    }

    /**
     * Set wechat queue
     *
     * @return queue
     */
    @Bean
    public Queue headersQueueWx() {
        return new Queue("headers_queue_wx", true);
    }

    /**
     * Set SMS queue
     *
     * @return queue
     */
    @Bean
    public Queue headersQueueDx() {
        return new Queue("headers_queue_dx", true);
    }

    /**
     * Set mailbox queue
     *
     * @return queue
     */
    @Bean
    public Queue headersQueueYx() {
        return new Queue("headers_queue_yx", true);
    }

    /**
     * Bind the WeChat queue to the exchange: the message headers must contain both entries
     *
     * @return binding
     */
    @Bean
    public Binding headersBindingWx() {
        Map<String, Object> map = new HashMap<>();
        map.put("One", "A");
        map.put("Two", "B");
        return BindingBuilder.bind(headersQueueWx()).to(headersExchange()).whereAll(map).match();
    }

    /**
     * Bind the SMS queue to the exchange: the message headers must contain at least one of the entries
     *
     * @return binding
     */
    @Bean
    public Binding headersBindingDx() {
        Map<String, Object> map = new HashMap<>();
        map.put("One", "A");
        map.put("Two", "B");
        return BindingBuilder.bind(headersQueueDx()).to(headersExchange()).whereAny(map).match();
    }

    /**
     * Bind the mailbox queue to the exchange: the header "Three" must have the value "C"
     *
     * @return binding
     */
    @Bean
    public Binding headersBindingYx() {
        return BindingBuilder.bind(headersQueueYx()).to(headersExchange()).where("Three").matches("C");
    }
}
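The difference between whereAll and whereAny in the bindings above can be illustrated with a small plain-Java matcher. It is a sketch of the matching rule only, with no broker involved; `HeadersMatchDemo` and `matches` are names invented for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of headers exchange matching; no broker is involved.
class HeadersMatchDemo {

    // matchAll = true models whereAll (every binding entry must match);
    // matchAll = false models whereAny (one matching entry is enough).
    static boolean matches(Map<String, Object> binding, Map<String, Object> headers, boolean matchAll) {
        int hits = 0;
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (entry.getValue().equals(headers.get(entry.getKey()))) {
                hits++;
            }
        }
        return matchAll ? hits == binding.size() : hits > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("One", "A");
        binding.put("Two", "B");

        Map<String, Object> headers = new HashMap<>();
        headers.put("One", "A");

        System.out.println(matches(binding, headers, true));  // prints false
        System.out.println(matches(binding, headers, false)); // prints true
    }
}
```

A message carrying only the header One=A therefore reaches the SMS queue (whereAny) but not the WeChat queue (whereAll).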

2. Add message to queue

    // Endpoint path and method name are illustrative
    @GetMapping("/headers")
    public String sendHeadersMessage() {
        // A headers exchange ignores the routing key and routes on the message headers instead

        // Headers One=A and Two=B: matches the whereAll binding (wx) and the whereAny binding (dx)
        rabbitTemplate.convertAndSend("headers_exchange", "", getStringObjectMap(), message -> {
            message.getMessageProperties().setHeader("One", "A");
            message.getMessageProperties().setHeader("Two", "B");
            return message;
        });

        // Header One=A only: matches the whereAny binding (dx) but not the whereAll binding (wx)
        rabbitTemplate.convertAndSend("headers_exchange", "", getStringObjectMap(), message -> {
            message.getMessageProperties().setHeader("One", "A");
            return message;
        });

        // Header Three=C: matches only the mailbox binding (yx)
        rabbitTemplate.convertAndSend("headers_exchange", "", getStringObjectMap(), message -> {
            message.getMessageProperties().setHeader("Three", "C");
            return message;
        });
        return "ok";
    }

3. Consume messages

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Parameter mode
 *
 * @author He PanFu
 * @date 2022-04-18 10:27:15
 */
@Slf4j
@Component
public class RabbitmqHeadersListener {

    /**
     * Wechat queue monitoring
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "headers_queue_wx")
    public void messageListener1(Message testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Parameter mode (wechat), testMessage: {}", testMessage);

        channel.basicAck(tag, false);
    }

    /**
     * SMS queue monitoring
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "headers_queue_dx")
    public void messageListener2(Message testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Parameter mode (SMS), testMessage: {}", testMessage);

        channel.basicAck(tag, false);
    }

    /**
     * Mailbox queue listening
     *
     * @param testMessage data
     */
    @RabbitListener(queues = "headers_queue_yx")
    public void messageListener3(Message testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Parameter mode (mailbox), testMessage: {}", testMessage);

        channel.basicAck(tag, false);
    }
}

9, Dead letter queue

Introduction: dead-lettering is a message mechanism in RabbitMQ. A message in a queue becomes a "dead letter" when any of the following happens:

  1. The message is negatively acknowledged with channel.basicNack or channel.basicReject and the requeue parameter is set to false.
  2. The message stays in the queue longer than the configured TTL.
  3. The number of messages in the queue exceeds the maximum queue length.

RabbitMQ handles dead-lettered messages specially. If a dead letter exchange is configured for the queue, the message is republished to it and routed on to the dead letter queue. If none is configured, the message is discarded.
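The three conditions can be acted out with a small in-memory model in plain Java. This is a rough sketch and not how the broker is implemented: the class `DeadLetterDemo` and its methods are invented for this illustration, time is passed in by hand, and for the length limit it assumes that the oldest message is the one that gets dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified in-memory model of the three dead-letter conditions; no broker is involved.
class DeadLetterDemo {

    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();
    private final int maxLength;
    private final long ttlMillis;

    DeadLetterDemo(int maxLength, long ttlMillis) {
        this.maxLength = maxLength;
        this.ttlMillis = ttlMillis;
    }

    void publish(String body, long nowMillis) {
        queue.addLast(new Entry(body, nowMillis + ttlMillis));
        // Condition 3: the queue is too long. Assumption: the oldest message is dropped.
        while (queue.size() > maxLength) {
            deadLetters.add(queue.removeFirst().body);
        }
    }

    // Condition 2: messages whose TTL has elapsed are dead-lettered.
    void expire(long nowMillis) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAt <= nowMillis) {
            deadLetters.add(queue.removeFirst().body);
        }
    }

    // Condition 1: the consumer rejects the head message; requeue = false dead-letters it.
    void reject(boolean requeue) {
        Entry head = queue.pollFirst();
        if (head == null) {
            return;
        }
        if (requeue) {
            queue.addFirst(head);
        } else {
            deadLetters.add(head.body);
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        DeadLetterDemo queue = new DeadLetterDemo(2, 1000);
        queue.publish("a", 0);
        queue.publish("b", 0);
        queue.publish("c", 0);  // over the length limit: "a" is dead-lettered
        queue.reject(false);    // "b" is rejected without requeue
        queue.expire(1000);     // "c" has reached its TTL
        System.out.println(queue.deadLetters()); // prints [a, b, c]
    }
}
```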

Note: the arguments of an existing queue cannot be changed. If you modify a queue's configuration, delete the original queue and declare it again.

1. Register

package com.cyun.demo.rabbitmq.provider;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Dead letter queue: fanout exchange whose queue has a TTL and a dead letter exchange
 *
 * @author He PanFu
 * @date 2022-04-16 17:04:55
 */
@Configuration
public class RabbitmqFanoutTtlConfig {

    /**
     * Declare the exchange (constructor arguments: name, durable, autoDelete):
     * <p>
     * durable: whether the exchange is persisted. A durable exchange survives a broker restart; a non-durable one does not.
     * autoDelete: whether to delete automatically. The exchange is removed once nothing is bound to it any more.
     * exclusive: applies to queues only, not exchanges. An exclusive queue can be used only by the connection that declared it and is deleted when that connection closes.
     *
     * @return the fanout exchange in front of the TTL queue
     */
    @Bean
    public FanoutExchange fanoutTtlExchange() {
        return new FanoutExchange("fanout_ttl_exchange", true, false);
    }

    /**
     * Wechat queue
     *
     * @return queue
     */
    @Bean
    public Queue fanoutTtlQueueWx() {
        Map<String, Object> map = new HashMap<>();
        // Message TTL in milliseconds: a message expires after 1 second in the queue
        map.put("x-message-ttl", 1000);
        // Dead letter exchange: expired messages are republished to this exchange with the routing key below
        map.put("x-dead-letter-exchange", "direct_exchange");
        map.put("x-dead-letter-routing-key", "wx");
        return new Queue("fanout_ttl_queue_wx", true, false, false, map);
    }

    /**
     * Bind the WeChat queue to the exchange
     *
     * @return Binding object
     */
    @Bean
    public Binding fanoutTtlBindingWx() {
        return BindingBuilder.bind(fanoutTtlQueueWx()).to(fanoutTtlExchange());
    }

}

2. Add message to queue

    @GetMapping("/fanout/ttl")
    public String sendFanoutTtlMessage() {
        rabbitTemplate.convertAndSend("fanout_ttl_exchange", "", getStringObjectMap());
        return "ok";
    }

3. Consume messages

package com.cyun.demo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


/**
 * Dead letter queue: listener on the TTL queue
 *
 * @author He PanFu
 * @date 2021-09-16 12:47:32
 */
@Slf4j
@Component
public class RabbitmqFanoutTtlListener {

    @RabbitListener(queues = "fanout_ttl_queue_wx")
    public void messageListener1(Object msg, Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws InterruptedException, IOException {
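        log.info("Subscription mode TTL (wechat), message processing starts, message content:{}", msg);

        // Trigger case 1: negatively acknowledge the message. multiple (second argument): when true, all messages up to and including this delivery tag are nacked at once. requeue (third argument): when true, the message is put back in the queue; when false, it is dead-lettered
        // channel.basicNack(tag,false,false);
        // Trigger case 2: TTL expiry. With prefetch: 1, messages still waiting in the queue while this consumer sleeps exceed the 1 s x-message-ttl and are dead-lettered
        TimeUnit.SECONDS.sleep(15);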
        channel.basicAck(tag, false);
    }

}

4. Extension: set the expiration time when publishing a message

Note: if both the queue TTL and a per-message expiration are set, the smaller of the two applies.

    @GetMapping("/fanout/ttl/5000")
    public String sendFanoutTTLMessageMessage() {
        // Set expiration time
        MessagePostProcessor processor = message -> {
            message.getMessageProperties().setExpiration("5000");
            message.getMessageProperties().setContentEncoding("UTF-8");
            return message;
        };

        rabbitTemplate.convertAndSend("fanout_ttl_exchange", "", getStringObjectMap(), processor);
        return "ok";
    }
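For the values used in this section, the rule "the smaller TTL applies" works out as follows. The snippet is a plain-Java sketch of the arithmetic only; `EffectiveTtlDemo` and `effectiveTtl` are names invented for this illustration.

```java
// Plain-Java sketch of how the effective TTL is chosen; no broker is involved.
class EffectiveTtlDemo {

    // queueTtlMillis: the x-message-ttl argument of the queue
    // messageTtlMillis: the expiration set on the individual message
    static long effectiveTtl(long queueTtlMillis, long messageTtlMillis) {
        return Math.min(queueTtlMillis, messageTtlMillis);
    }

    public static void main(String[] args) {
        // Queue TTL of 1000 ms (fanout_ttl_queue_wx) and message expiration of 5000 ms
        System.out.println(effectiveTtl(1000, 5000)); // prints 1000
    }
}
```

Because the queue TTL of 1000 ms is smaller, a message sent through /fanout/ttl/5000 still expires after about one second. The per-message value would only take effect if it were below the queue TTL.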

10, Summary

The overall steps are: declare (declare the exchange -> declare the queue -> bind the queue to the exchange) -> listen -> publish messages to the queue

Posted by Trader77 on Mon, 18 Apr 2022 21:37:45 +0930