RabbitMQ -- Message Middleware

1, What is middleware?

Middleware is the software between operating system and application program. Middleware is an independent system software or service program, with which distributed application software can share resources among different technologies.

There are many middleware products, such as redis, ElasticSearch, RabbitMQ, ActiveMQ, Kafka, MyCat, sharding JDBC and so on. These are collectively referred to as middleware.

2, Why do you need middleware?

Let's not talk about redis and es. They are all designed to improve the search speed. Here we focus on message middleware.
In the early single system architecture of all in one, there was no need for message middleware, or even if message middleware was used in the single system architecture, it was rarely used. Common message middleware include ActiveMQ, RabbitMQ, Kafka, RocketMQ, etc. Message middleware technology is mainly used to solve some problems in distributed architecture system

  1. Cross system data transfer reduces the coupling between different systems in the distributed architecture
    For example: system A sends data to three BCD systems, and sends data through interface call. What if the E system also needs this data? What if the C system is not needed now? The person in charge of A system almost collapsed

    After using message oriented middleware: A only needs to send messages to mq, and other systems will read the messages, thus relieving the call coupling between A and other systems
  2. Asynchronous processing mechanism to improve the user experience.
    In the single architecture, the waiting time for a user to initiate a request is 3+300+450+200=953ms

    After using message middleware, the request message can be sent to mq, and then the result can be returned directly. Other systems can read mq messages to achieve the effect of asynchronous processing requests. At this time, it only takes 3+5=8ms
  3. High concurrent traffic peak clipping. Sometimes a system usually has 30 concurrent requests per second, but in a certain period of time, the number of concurrent requests will reach more than 5K. At this time, a large number of 5K requests flow into msql at the same time, which may lead to database crash.

    At this time, if an mq middleware is introduced, 5K requests will be sent to mq first, and then system A will pull 2K requests per second from mq for processing, which greatly alleviates the pressure of the database( Don't worry about message backlog in mq, because this high concurrency of 5K requests per second is transient, while system A has been pulling at the rate of 2K requests per second, so as long as the peak period is over, messages will be consumed immediately, and there will be no backlog.)
  4. In addition, message oriented middleware can effectively solve the problem of distributed transaction, which is described in detail in the chapter of distributed transaction below.

3, Several core components of message middleware

Since rabbitMq adopts AMQP protocol, in order to better understand AMQP, this knowledge point of network protocol is consolidated again

Network protocol is a set of rules, standards or conventions established for data exchange in computer network.
In short, in the whole computer network, each computer entity has to communicate with each other. Because the character set of each computer terminal is different, the input commands between the two computers are also different. However, in order to enable the communication between the two computers, it is required that each computer should implement a set of standard character set, Then the standard character set is used to enter the network for transmission, so as to achieve the purpose of communication between computers.

Network protocol is divided into many layers, and different layers correspond to different protocols

For example, TCP/UDP is a transport layer protocol,
TCP is also the most commonly used transport layer protocol. The three handshake protocol is characterized by reliable and stable transmission, but slow transmission and low efficiency. Applicable scenarios: file transfer, mail sending, normal network page access, etc.
The characteristic of UDP is that it doesn't need to establish a connection after three handshakes like TCP to transmit. Its characteristic is simple and fast transmission, but its disadvantage is also obvious, that is, it is unreliable and unstable, which has great requirements for the network environment. Applicable scenarios, phone calls, video calls, etc. (so when the network is bad, the phone calls will not be heard clearly or the video will get stuck)

HTTP protocol is an application layer protocol, also known as hypertext transfer protocol. This should not be confused with TCP/UDP. It is a protocol at two different levels.
For example, the network layer protocol is equivalent to the road, the transport layer protocol is equivalent to the car running on the road, and the application layer Protocol HTTP is the person in the car. These layers of network protocol are progressive from bottom to top.

AMQP full name: Advanced message queuing protocol. Similar to http, it is also a development standard of application layer protocol, which is designed for message oriented middleware. Generally speaking, every message middleware has its own application layer protocol, and the message transfer protocols similar to AMQP include MQTT protocol, OpenMessage protocol, Kafka protocol, etc.

Interview question: why does message middleware not use http protocol directly, but use another protocol?

  1. Because the http request header and response header are relatively complex, including cookie, data encryption and decryption, status code, response code and other additional functions, but for a message, we do not need to be so complex, and there is no need for it. In fact, it is responsible for data transmission, storage and distribution. We must pursue high performance. Try to be concise and fast.
  2. In most cases, http is mostly short links. In the actual interaction process, a request to the response is likely to be interrupted. After the interruption, it will not be persistent, which will cause the loss of the request. This is not conducive to the business scenario of message oriented middleware, because message oriented middleware may be a long-term process of obtaining messages. In case of problems and failures, data or messages need to be persisted. The purpose is to ensure the high reliability and robust operation of messages and data.

In fact, the simple problem is that the http protocol is relatively complex. What we need to do in message middleware is to deliver message data, which is in pursuit of high speed and high performance. Another is that most http links are short, once interrupted, it will cause the loss of data. Message middleware is a long-term process to obtain messages. If there is a problem, it is necessary to persist messages immediately and achieve high reliability.

Message queue persistence: data will not be lost after the server is restarted. Similar to Redis, RabbitMQ also saves data in a file, so I don't want to give too much overview on persistence( Persistence in RabbitMQ requires that you declare persistence when you create a switch or queue

Message distribution strategy: after the message arrives in the queue, how to get the message and how to consume the message.

Note: This is about message distribution strategy, which refers to how consumers consume messages in the queue. The following several working modes of MQ, such as fanout, direct and topic, refer to the mode in which the switch sends messages to the queue!

4, Introduction and installation of RabbitMQ

RabbitMQ is an open source AMQP protocol implementation based on Erlang language, supporting a variety of client (Language). It is used to store and forward messages in distributed system. It has the characteristics of high availability, high scalability and ease of use.

Location of message middleware in the system:

RabbitMQ installation steps (the following is the installation on Linux):

Rabbitmq is developed in Erlang language, so you must provide Erlang environment and install Erlang first. Before installation, you should note that Erlang version should correspond to rabbitmq version. If not, incompatibility will occur later. The corresponding relationship table can be found on rabbitmq official website https://www.rabbitmq.com/which-erlang.html

Due to the slow download speed of the official website, download the rpm package locally and copy it to the linux system

  1. rpm -Uvh erlang-solutions-2.0-1.noarch.rpm
  2. yum install -y erlang
  3. erl -v view erlang version, the version appears, indicating that the installation of erlang is successful!

Install socat: because rabbitmq needs this plug-in

  1. yum install -y socat

Install rabbitmq:

  1. rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm is installed
  2. Systemctl start rabbitmq server start rabbitmq
  3. Systemctl status rabbitmq server checks the status of rabbitmq and shows that active started successfully!

By default, rabbitmq does not install the graphical management interface. We need to install it manually

  1. rabbitmq-plugins enable rabbitmq_management install web management plug-in
  2. After installing systemctl restart rabbitmq server, restart the service

The port of the web management plug-in is 15672, and the browser can access it by inputting ip + port number

Note: rabbitmq has a default account: guest/guest, but this account can only be accessed on the local host machine. Since rabbitmq is installed on a remote server, we need to add a user who can log in remotely.

  1. rabbitmqctl add_user admin admin new user
  2. rabbitmqctl set_ user_ Tags administrator set user permissions (the highest level of administrator, equivalent to root user)

The user permissions have the following levels:

  1. administrator can log in to the console, view all the information, and manage rabbitmq
  2. The monitor logs in to the console to view all the information
  3. The policy maker logs in to the console and specifies the policy
  4. managment ordinary administrator login console

After login, the following interface appears:

The above is the general installation process of rabbitmq. Let's start learning rabbitmq.

5, Several working modes of RabbitMQ

From the introduction of the official website, there are the following modes:



Since the sixth and seventh modes are rarely used, we only discuss the first five modes.

Simple mode

This is the simplest of several working modes, that is, producers produce messages to a specified queue, and consumers consume the messages. But it should be noted that this is not without a switch. You should know that all messages received by the queue are first sent to the switch by the producer and then to the queue by the switch. The reason why there is no switch in the figure above is that the default switch is used.

The following uses the most native code to realize the whole process of sending and consuming messages in simple mode:

public static void main(String[] args) {

      // 1. Create a connection factory
      ConnectionFactory connectionFactory = new ConnectionFactory();
      // 2. Set connection properties
      connectionFactory.setHost("139.196.255.42");
      connectionFactory.setPort(5672);
      connectionFactory.setVirtualHost("/hhl");
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("admin");

      Connection connection = null;
      Channel channel = null;


      // 3. Get the connection from the factory
      try {
          connection = connectionFactory.newConnection("producer");
          // 4. Get the channel from the connection. rabbitmq is operated through the channel. Later, explain in detail why
          channel = connection.createChannel();

          // 5. Create a Queue
           /*
           *  If the queue does not exist, it is created
           *  Rabbitmq It is not allowed to create two identical queue names, otherwise an error will be reported.
           *
           *  @params1:  queue The name of the queue
           *  @params2:  durable Is the queue persistent
           *  @params3:  exclusive Whether it is exclusive, that is, whether it is private. If it is true, the current queue will be locked, other channels will not be accessible, and the connection will be closed automatically
           *  @params4:  autoDelete Whether to delete automatically. When the last consumer disconnects, whether to delete the message automatically.
           *  @params5:  arguments You can set the additional parameters of the queue, the validity period of the queue, the maximum length of the message, the message life cycle of the queue, and so on.
           * */
          channel.queueDeclare("queue1",false,false,false,null);

          // 6. Create a message to send
          String message = "Hello Simple !";

          // 7. Send message to rabbitmq server
          // @params1: switch exchange
          // @params2: queue name / routing (if switch is specified, this parameter is route; If the switch is not specified to be empty (the default switch is used), this parameter is the queue name)
          // @params3: property configuration (also some conditions, for example, the switches of headers type are specially configured with properties)
          // @params4: send message content

          // Generally speaking, the switch must be specified in the actual development!
          channel.basicPublish("","queue1",null,message.getBytes());
          System.out.println("Message sent successfully!");

      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          if(channel!=null && channel.isOpen()){
              try {
                  channel.close();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
          if(connection!=null){
              try {
                  connection.close();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }

  }

Log in to rabbitmq management interface to view the message. The message is sent successfully and has been stored in queue 1

Next, create the code of the consumer and consume the message

public static void main(String[] args) {
        // 1. Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection properties
        connectionFactory.setHost("139.196.255.42");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/hhl");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3. Get the connection from the factory
            connection = connectionFactory.newConnection("consumer");
            // 4. Get the channel from the connection
            channel = connection.createChannel();

            channel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try {
                        System.out.println("Receive queue queue1 News from:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("Accept message exception...");
                }
            });

            System.out.println("Start receiving message...");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

Message successfully consumed:

That's how to create the simple pattern.

Work mode: it is mainly divided into polling mode and fair distribution mode

As can be seen from the figure above, this mode is mainly aimed at how multiple consumers consume messages in the same queue when there are multiple consumers in a queue. In fact, this mode is quite special. Other modes refer to the mechanism of the switch sending messages to the queue. work mode mainly refers to the mechanism of how multiple consumers consume messages in the same queue. This concept must be removed.

First of all, we need to make it clear that a message can only be consumed by one consumer once. We can't say that a message can be consumed by two or more consumers at the same time.

So the problem arises. For example, if there are 10 messages in a queue and two consumers are listening to the queue at the same time, how will these 10 messages be allocated to the two consumers?
This uses the work mode. There are two consumption modes. First, the polling mode:
Polling mode means that 10 messages are allocated to two consumers one by one, each with 5 messages. Polling mode is also a default consumption mode when there are multiple consumers in rabbitmq.

The producer just sends 10 messages to the queue
Consumer C1

// Consumer 1
public static void main(String[] args) {
         // 1. Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection properties
        connectionFactory.setHost("139.196.255.42");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/hhl");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3. Get the connection from the factory
            connection = connectionFactory.newConnection("consumer");
            // 4. Get the channel from the connection
            channel = connection.createChannel();

            // There is no need to set anything for polling distribution. The default is polling distribution and @ param2 true is auto response
            channel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try {
                        System.out.println("Message received from queue:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(100); // Polling mode has nothing to do with performance, so no matter how long it is set here, the same number of messages will be consumed evenly
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("Accept message exception...");
                }
            });

            System.out.println("Start receiving message...");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

Consumer C2

public static void main(String[] args) {
         // 1. Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection properties
        connectionFactory.setHost("139.196.255.42");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/hhl");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3. Get the connection from the factory
            connection = connectionFactory.newConnection("consumer");
            // 4. Get the channel from the connection
            channel = connection.createChannel();


            channel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try {
                        System.out.println("Message received from queue:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("Accept message exception...");
                }
            });

            System.out.println("Start receiving message...");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

At the same time, start consumer C1 and C2 to view the consumption situation of the console

It can be seen that the number of messages received by two consumers is not different because of the length of time required to consume messages, that is, they consume messages one by one.

Fair mode: the above polling mode has some disadvantages. If the performance, CPU, hardware or network of two consumer machines are different, we should not let them get the same number of messages, but should consume more for those with good performance and less for those with poor performance. This is fair mode.
Since the polling mode is the default, the fairness mode requires some configuration to enable the fairness mode, mainly in the consumer section. The automatic response needs to be turned off and the manual response needs to be turned on.

public static void main(String[] args) {
         // 1. Create a connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection properties
        connectionFactory.setHost("139.196.255.42");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/hhl");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3. Get the connection from the factory
            connection = connectionFactory.newConnection("consumer");
            // 4. Get the channel from the connection
            channel = connection.createChannel();

            Channel finalChannel = channel;


            // Fair distribution needs to pay attention to abc
            // a. qos index is defined to read several messages at a time. This value should be considered according to memory, CPU, total number of messages and other factors
            finalChannel.basicQos(1);
            //  b. If @ param2 is set to false, auto answer will be turned off
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try {
                        System.out.println("Message received from queue:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(100);

                        // c. Turn on manual answer
                        finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("Accept message exception...");
                }
            });

            System.out.println("Start receiving message...");
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

To view the console output:


As you can see, because consumer 1 has a short sleep time, it gets 8 more messages, while consumer 2 only gets 2 messages, which is fair mode. Those with good performance naturally consume more messages.

Fanout mode: (also called publish subscribe mode, broadcast mode)

As can be seen from the figure above, different from the simple and work modes above, there is a switch in this mode, which means that the switch needs to be specified instead of the default switch in this mode.

Note: fanout mode, including the following routing mode and topics mode, refers to the mechanism that the switch sends messages to the queue. It should be different from the above work mode!!!

In this mode, we need to create a switch and a queue first, and then bind the relationship between the switch and the queue (this process is completed in the web interface of rabbitmq, which will be implemented in the code when spring boot integrates rabbitmq). The producer can send messages to the switch, and the switch will send messages to the specified queue according to different modes.

Producer Code: (all other codes are the same, only the differences are shown here)

 // 7. Send message to rabbitmq server

   String exchange="fanout_exchange"; // Specify the switch name and use fanout type switch
   String routing_Key=""; // Routing key and fanout switch are sent to all bound queues, so it is meaningless to specify routing here

   // In fact, the difference between these message modes lies in this method, switch and route
   channel.basicPublish(exchange,routing_Key,null,message.getBytes());

   System.out.println("Message sent successfully!");

For example, the switch fanout_exchange binds three queues, queue1, queue2, and queue3. Then the same message will be sent to the three queues at once.

Consumers don't show it. There are messages in several queues. Just consume.

Direct mode: (routing mode)

To put it simply, add a where condition to the fanout mode. Each queue has a routing key. When the direct switch adds a routing key condition to send a message, the message will be sent to the queue that meets the key.

 // 5. Create Queue (the relationship between switch and Queue and their binding has been created in rabbitmq console, which is omitted here)


            // *******************Create switch and queue with code and bind start to each other******************************

            // Create switch @ params1 switch name @ params2 switch type @ params3 persistent
            channel.exchangeDeclare("direct_exchange","direct",true);
            // Create queue
            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);
            // Relationship between binding queue and switch @ params1 queue name @ params2 switch name @ params3 route key corresponding to queue
            channel.queueBind("queue5","direct_exchange","email");
            channel.queueBind("queue6","direct_exchange","sms");
            channel.queueBind("queue7","direct_exchange","wechat");

            // *******************Create switches and queues with code and bind them to each other******************************



            // 6. Create a message to send
            String message = "Hello Direct !";

            // 7. Send message to rabbitmq server

            String exchange="direct_exchange"; // Specify the switch name
            String routing_Key1="email"; // Routing key
            String routing_Key2="sms";
            String routing_Key3="wechat";

            // In fact, the difference between these message modes lies in this method, switch and route
            channel.basicPublish(exchange,routing_Key1,null,message.getBytes());
            channel.basicPublish(exchange,routing_Key2,null,message.getBytes());
            channel.basicPublish(exchange,routing_Key3,null,message.getBytes());

            System.out.println("Message sent successfully!");

Topics: (theme mode)

topic mode is also called wildcard mode. Different from routing mode, routing mode is equivalent to the condition of where followed by =. topic mode is to change = to like, which is similar to fuzzy matching.

// 7. Send message to rabbitmq server

    String exchange="topic_exchange"; // Specify the switch name
    String routing_Key="com.user.order"; // Routing key is similar to direct. topic is replaced by rule here

    // In fact, the difference between these message modes lies in this method, switch and route
    channel.basicPublish(exchange,routing_Key,null,message.getBytes());

    System.out.println("Message sent successfully!");

Note the difference between the wildcard # and * characters

#: represents one or more or no characters
*: represents that there is and only one character (must be and only one character)

6, SpringBoot integrates with RabbitMQ

In the actual development, we must use rabbitmq in the springboot project. Here is how to use rabbitmq in the springboot project

Create a springboot project and introduce corresponding dependencies

<!-- rabbitMQ rely on -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

To write a configuration file:

server:
  port: 8080
spring:
  rabbitmq:
    username: admin
    password: admin
     # This is actually used to distinguish. For example, multiple databases can be established under a mysql server, and there can be multiple hosts under a rabbitmq server. There are corresponding switches and queues under each host
    virtual-host: /hhl 
    host: 000.000.000.00
    port: 5672

Take direct mode as an example

  1. Create a rabbitmq configuration class, create switches and queues in the configuration class, and bind the relationship between them
/**
 * This configuration class is used for
 * 1.Create queue
 * 2.Create switch
 * 3.The relationship between binding queue and switch
 */
@Configuration
public class RabbitConfig {

    // Create queue
    @Bean
    public Queue emailQueue(){
        return new Queue("email.direct.queue",true);
    }
    @Bean
    public Queue smsQueue(){
        return new Queue("sms.direct.queue",true);
    }
    @Bean
    public Queue wechatQueue(){
        return new Queue("wechat.direct.queue",true);
    }

    // Create switch
    @Bean
    public DirectExchange directOrderExchange(){
        return new DirectExchange("direct_order_exchange",true,false);
    }

    // Binding switch and queue
    @Bean
    public Binding bindingFanout1(){
        return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email");
    }
    @Bean
    public Binding bindingFanout2(){
        return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms");
    }
    @Bean
    public Binding bindingFanout3(){
        return BindingBuilder.bind(wechatQueue()).to(directOrderExchange()).with("wechat");
    }

}
  1. Producer sends message
	 @Autowired
    private RabbitTemplate rabbitTemplate;

    private String exchangeName = "direct_order_exchange";
    private String routingKey1 = "email";
    private String routingKey2 = "sms";
    private String routingKey3 = "wechat";


    public void addOrder(Long userID, Long productID, int num){

        String msg = "ID by"+userID+"Successful order! commodity id by"+productID+",The quantity is"+num+"The order number is:"+ UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend(exchangeName,routingKey1,msg);
        rabbitTemplate.convertAndSend(exchangeName,routingKey2,msg);
        rabbitTemplate.convertAndSend(exchangeName,routingKey3,msg);
    }
  1. Consumer News
@Component
public class ConsumerService {

	// Here, just use the @ RabbitListener annotation to configure the listening queue. When there are messages in the queue, they will be consumed immediately
    @RabbitListener(queues = {"email.direct.queue"})
    public void getEmailQueueMessage(String message){
        System.out.println("email Consumer news:"+message);
    }

    @RabbitListener(queues = {"sms.direct.queue"})
    public void getSmsQueueMessage(String message){
        System.out.println("sms Consumer news:"+message);
    }

	//You can also create a queue switch on the consumer side and bind the relationship. The following is to perform these actions in the way of annotation. Of course, it is better to use the configuration class
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.topic.queue",autoDelete = "false"),
        exchange = @Exchange(value = "topic.order.exchange",type = ExchangeTypes.TOPIC),
        key = "#.email.*"
    ))
    public void getMessage(String message){
        System.out.println("wechat Consumer news:"+message);
    }
    
}

7, The content of RabbitMQ graphical management Web interface

Overview page:

Switch page:

Switch details page:

Queue page:

Queue details page:

Admin page:

8, RabbitMQ advanced

The above contents are the most basic working modes and using steps of rabbitmq. Now let's start with some other advanced knowledge in rabbitmq, which are also very important!

Message confirmation mechanism: after the producer sends a message to the broker, is the message successfully received by the broker, or is the message successfully sent? This requires a message confirmation mechanism, that is, after the producer sends a message, the broker gives the producer a feedback on whether the message has been successfully received.

Implementation steps:
rabbitmq adds a new line of configuration:

spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /hhl
    host: 139.196.255.42
    port: 5672
    publisher-confirm-type: correlated  # This is the configuration

Publisher confirm type is the configuration of the publish confirmation property. It has the following values:

  1. NONE: disabled. It is disabled by default
  2. Corelated: after a message is successfully released to the switch, a callback method will be triggered, that is, the message confirmation mechanism will be enabled
  3. SIMPLE: after testing, there are two effects: one is that the callback method will be triggered just like the corelated value; the other is that after the message is published successfully, the rabbitTemplate is used to call the waitForConfirms or waitForConfirmsOrDie method to wait for the broker node to return the sending result, and the next step logic will be determined according to the returned result, It should be noted that if the waitForConfirmsOrDie method returns false, the channel will be closed, and then the message cannot be sent to the broker

Define a class to implement the callback function interface of RabbitTemplate, that is, RabbitTemplate will call back after executing the method of sending message, so as to confirm whether the message is sent successfully.

public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if(b){
            System.out.println("Message sent successfully!");
        }else{
            System.out.println("Message sending failed!");
        }
    }
}

Send message:

public void makeOrderTopic(){
        String orderId = UUID.randomUUID().toString();
        // send message
        
        // Set message confirmation mechanism
        rabbitTemplate.setConfirmCallback(new MessageConfirmCallback());
        rabbitTemplate.convertAndSend("direct_order_exchange","email",orderId);
    }

Trigger callback function after sending successfully:

Expiration time (TTL)
RabbitMQ can set expiration times for queues and messages, respectively

Setting the queue expiration time: it is very simple, that is, adding a parameter x-message-ttl when creating the queue. After setting, if the message reaches the queue and is not consumed within the specified expiration time, the message will be automatically deleted.

@Configuration
public class TTLRabbitConfig {

    @Bean
    public Queue ttlQueue(){
        Map<String,Object> args = new HashMap<>();
        // Set ttl queue expiration, the message will expire after 9 seconds, just add this parameter
        args.put("x-message-ttl",9000);
        return new Queue("ttl_direct_queue",true,false,false,args);
    }
}

Set message expiration time:

public void addMessageTTL(){

        // Set the expiration time of a message
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000"); // Set the message to expire in 10 seconds
                message.getMessageProperties().setContentType("UTF-8");
                return message;
            }
        };

        rabbitTemplate.convertAndSend("fanout_order_exchange","","hello ttlMessage",messagePostProcessor);

    }

Note: when the expiration time is set for both the queue and the message, the shorter one will prevail. That is, if the queue is set to expire in 10 seconds and a message is set to expire in 5 seconds, then the message will expire in 5 seconds after it is put into the queue

Dead letter queue (DLX): dead letter exchange

A problem can be raised from the above expired queue. It is too violent to delete the messages in the queue directly after they have expired. After deletion, they can not be found. Therefore, we can declare a switch to receive these expired messages. The switch is called dead letter switch, and the queue bound under the switch is called dead letter queue.

@Configuration
public class TTLRabbitConfig {

    @Bean
    public Queue ttlQueue(){
        Map<String,Object> args = new HashMap<>();
        // Set ttl queue expiration, the message will expire after 9 seconds, just add this parameter
        args.put("x-message-ttl",9000);
        // To specify the dead letter queue is to specify which switch I want to send the expired message to after the message has expired, and the designated switch will send the message to the queue (dead letter queue)
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        // Since the specified switch is of direct type, the routing should also be configured here. Of course, if it is in fanout mode, there is no need to configure it here
        args.put("x-dead-letter-routing-key","dead");
        return new Queue("ttl_direct_queue",true,false,false,args);
    }
    @Bean
    public DirectExchange directOrderExchange2(){
        return new DirectExchange("ttl_direct_exchange",true,false);
    }

    @Bean
    public Binding bindingttl(){
        return BindingBuilder.bind(ttlQueue()).to(directOrderExchange2()).with("ttl");
    }

    //Note: the following dead letter queue and dead letter switch are ordinary queues and switches,
    // It's just that after the messages in other queues have expired, it's specified that the expired messages should be put into the switch below, and then sent to the queue by this switch,
    // Therefore, it is called dead letter queue and switch. In fact, it is a common switch. The emphasis is on the above queue to specify the configuration of dead letter queue

    // Create a dead letter queue
    @Bean
    public Queue deadQueue(){
        return new Queue("dead_direct_queue",true,false,false);
    }
    // Create dead letter switch
    @Bean
    public DirectExchange directDeadExchange(){
        return new DirectExchange("dead_direct_exchange",true,false);
    }
    // Binding to each other
    @Bean
    public Binding bindingDead(){
        return BindingBuilder.bind(deadQueue()).to(directDeadExchange()).with("dead");
    }

}

Note: the dead letter switch and dead letter queue are ordinary switches and queues. The comments in the above code are very clear.

When will a message enter the dead letter queue?

  1. Message rejected
  2. Message expired
  3. The queue has reached the maximum length

Distributed transaction

Distributed transaction is a problem that must be solved in distributed architecture system, and it is also the most frequently asked problem in interview. There are several solutions to distributed transaction

  1. Two phase commit (2PC) needs the support of database manufacturers, and the java components include atomikos.
  2. Compensation services (TCC) Yan Xuan, Ali, ant financial services.
  3. Local message table (asynchronous guarantee) such as: Alipay, WeChat pay the initiative to query the payment status, the form of the bill.
  4. MQ transaction message asynchronous scenario has strong generality and expansibility.

Here is mainly to explain how to use message middleware RabbitMQ to solve the problem of distributed transaction. This should be the most commonly used method to solve the problem of distributed transaction~

What is distributed transaction?

In short, in a distributed system, an operation may call multiple services. These services are located on different servers, and each server has a corresponding different database. Distributed transaction is to ensure that all the services to be called will succeed or fail after an operation. In essence, distributed transaction is to ensure the data consistency of different databases.

For example, compared with the following order operations, the order system and the inventory system are two different systems. Placing an order is an operation. In this operation, you need to add an order in the order system and reduce an inventory in the inventory system. Then these two steps must either succeed at one time or fail at the same time, This is the problem of distributed transaction.

The following is a diagram to show the problems to be solved by Distributed Accounting Firms:

To solve the distributed transaction problem based on MQ, the overall design idea is as follows:


As can be seen from the figure above, the key of MQ to solve distributed transactions is to add MQ middleware between the two systems.

Since middleware is added between the two systems, we need to consider the problem of high reliability in message sending and consumption. Let's first look at how to solve the problem of high reliability in message production

The general idea is that after placing an order in the order system, an order record must be added to the order table, and then we need to send a message of a new order to MQ. At this time, we don't know whether the message is sent successfully. If the message fails to be sent, we don't deal with it. Then MQ has no message, let alone the distribution center, It doesn't know that it has added an order.
Therefore, we must first ensure that MQ must receive the message generated by the new order. The solution is to combine a redundant table with the message response mechanism mentioned above.
After a new order record is generated from the order table, a new order record is also generated to the redundant table. The order record of the redundant table has a status field, status, which is 0 by default, that is, the field to record whether the message is sent successfully. After the message is sent to MQ, MQ responds to the order system through the response mechanism. If the message is received successfully, then this field status is changed to 1, Indicates that the message has been successfully sent to MQ.

If MQ fails to receive a message, it will not change this value. You can regularly check the record of status=0 in the redundancy table in the system. These are all because the message has not been sent successfully. After finding it, it will be sent again. In this way, high reliability of message production can be achieved.

After solving the problem of high reliability of message generation, we will solve the problem of high reliability of message consumption

After consumers get the message, they execute the corresponding business logic. For example, in the above example, the order center successfully sends the order message to MQ. Consumers listen to the queue in MQ, get the message for consumption, and complete the logic of the distribution center. Of course, this is normal. But if the consumer has a problem with the code in the business logic of consumption and reports an error, what will happen at this time? Will messages be lost or sent all the time? This is the problem of high consumption reliability

Simulate an error:

@RabbitListener(queues = {"email.direct.queue"})
    public void getEmailQueueMessage(String message){
        System.out.println("email The consumption of news began");
        System.out.println(1/0);// Manual manufacturing exception
        System.out.println("email Consumer news:"+message);
    }

After testing, it is found that the message retrial mechanism will be triggered after the consumer's consumption message reports an error. Without configuring the retrial mechanism, it will cause a dead loop and trigger the code loop of the consumption message all the time.

There are several ways to solve the dead cycle

  1. Control the number of retries + dead letter queue
rabbitmq:
    port: 5672
    host: 00.000.000.00
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  # Here is to start the manual ack and let the program control the resend, delete and transfer of MQ messages
        retry:
          enabled: true  # Turn on and try again
          max-attempts: 3  #max retries 
          initial-interval: 2000ms  #Time between retries
  1. try+catch + manual ack
  2. try+catch + manual ack + dead letter queue + manual intervention

In fact, the above method is also used to solve the problem of high reliability of consumption, but the best one is the third one. The third one is explained in detail below.

	@RabbitListener(queues = {"order.queue"})
	// The tag in the parameter is the unique id of a message and can be used as the id of the message
    public void messageconsumer(String ordermsg, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            
            System.out.println("received MQ The news is that: " + ordermsg + ",count = " + count++);
            
            Order order = JsonUtil.string2Obj(ordermsg, Order.class);
            
            String orderId = order.getOrderId();
            // 4: Save the waybill (idempotence is involved here, that is, after the message consumption fails, it will be retried. For example, the method in the following service will be executed in a loop all the time,
            //Then we need to consider multiple loops. This method can't save the order data every time. For example, we can set the primary key id for the order table, or use distributed locks to solve the idempotent problem.)
            dispatchService.dispatch(orderId);
            
           
            System.out.println(1 / 0); //There is an exception
            
			// 5: Manual ack tells mq that the message has been consumed normally. Note that here is basic ACK, the response after normal consumption
            channel.basicAck(tag, false);
            
        } catch (Exception ex) {
            // After catching an exception, you should also respond. Here is the method of basic NACK to respond
            //@param1: message tag @ param2: false multiple processing @ param3: whether to resend the request
            // Parameter 3 false will not resend, but will send the message to the dead letter queue
            // Parameter 3 true will retransmit, that is, it will fall into a dead loop (note that the configuration of retransmission times will be invalid at this time)
            channel.basicNack(tag, false, false);// Dead letter queue
        }
    }

After the consumption error, manually respond, put the message into the dead letter queue, and then another consumer listens to the private message queue

@RabbitListener(queues = {"dead.order.queue"})
    public void messageconsumer(String ordermsg, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        try {
            // 1: Gets the message of the message queue
            System.out.println("received MQ The news is that: " + ordermsg );
            //  2: Get order service information
            Order order = JsonUtil.string2Obj(ordermsg, Order.class);
            // 3: Get order id
            String orderId = order.getOrderId();
            // On idempotency
            //int count = countOrderById(orderId);
            // 4: Save waybill
            //if(count==0)dispatchService.dispatch(orderId);
            //if(count>0)dispatchService.updateDispatch(orderId);
             dispatchService.dispatch(orderId);
            // 3: Manual ack tells mq that the message has been consumed normally
            channel.basicAck(tag, false);
        } catch (Exception ex) {
			// If there is still an error in the processing logic of the dead letter queue, manual intervention is necessary
            System.out.println("manual intervention ");
            System.out.println("SMS alert");
            System.out.println("Transfer messages to other storage at the same time DB");
            channel.basicNack(tag, false,false);
        }
    }

The above is the code implementation of try+catch + manual ack + dead letter queue + manual intervention to solve the problem of high consumption reliability. The general flow chart is as follows:

The above is the method of using MQ to solve distributed transactions, and its focus is to deal with two aspects: high reliability of producer's production message and high reliability of consumer's consumption message.

This method based on MQ has the following advantages and disadvantages

advantage:

  1. Strong versatility
  2. Convenient expansion
  3. The coupling degree is low, and the scheme is relatively mature

Disadvantages:

  1. Based on message middleware, it is only suitable for asynchronous scenarios
  2. Messages will delay processing and need to be tolerated by the business

In fact, in the actual development, we should try our best to avoid distributed transactions, because no matter how good the processing method is, it can not guarantee 100% error free.

Tags: Distribution queue Middleware RabbitMQ

Posted by textbox on Wed, 09 Jun 2021 04:50:18 +0930