RabbitMQ practical guide -- RabbitMQ advanced

1. Backup switch (AE)

Alternate Exchange (AE).

If the producer does not set the mandatory parameter when sending a message, the message will be lost if it is not routed; If the mandatory parameter is set, the programming logic of ReturnListener needs to be added, and the producer's code will become complex.

If you don't want to complicate the producer's programming logic and don't want to lose messages, you can use a backup switch to store unrouted messages in RabbitMQ and process them when needed.

It can be realized by adding the alternate exchange parameter when declaring the switch (calling the channel.exchangeDeclare method), or by means of Policy (see section 6.3 for details). If both are used at the same time, the former has higher priority and will override the Policy setting.

	Map<String, Object> args = new HashMap<String, Object>();
	args.put("alternate-exchange", "myAe");  // Set the backup switch myAe in the normalExchange parameter
	channel.exchangeDeclare("normalExchange", "direct", true, false, args);
	channel.exchangeDeclare("myAe", "fanout", true, false, null); // Declare backup switch myAe
	channel.queueDeclare("normalQueue", true, false, false, null);
	channel.queueBind("normalQueue", "normalExchange", "normalKey");
	channel.queueDeclare("unroutedQueue", true, false, false, null);
	channel.queueBind("unroutedQueue", "myAe", "");

In fact, the backup switch is not much different from the ordinary switch. In order to facilitate use, it is recommended to set it to fanout type.

For backup switches, the following special cases are summarized:

  • If the set backup switch does not exist, there will be no exceptions on the client and RabbitMQ server, and the message will be lost.
  • If the backup switch is not bound to any queue, there will be no exception on the client and RabbitMQ server, and the message will be lost.
  • If the backup switch does not have any matching queues, the client and RabbitMQ server will not have exceptions, and the message will be lost.
  • If the backup switch is used with the mandatory parameter, the mandatory parameter is invalid.

2. Expiration time (TTL)

TTL, short for time to live, is the expiration time. RabbitMQ can set TTL for messages and queues.

2.1 set TTL of message

Once the lifetime of the message in the queue exceeds the set TTL value, it will become a "dead message", and the consumer will no longer receive the message. At present, there are two methods to set the TTL of messages.

  1. The first method is to set the queue attribute, and all messages in the queue have the same expiration time.
    The method of setting message TTL through queue attribute is in channel The queuedeclare method is implemented by adding the x-message-ttl parameter. The unit of this parameter is milliseconds.

        Map<String, Object> argss = new HashMap<String, Object>();
        argss.put("x-message-ttl",6000);
        channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);
    
  2. The second method is to set the message itself separately, and the TTL of each message can be different. If the two methods are used together, the TTL of the message takes the smaller value between the two.
    The method of setting TTL for each message is in channel The attribute parameter of expiration is added to the basicpublish method, and the unit is milliseconds.

    	AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    	builder.deliveryMode(2);//Persistent message
    	builder.expiration("60000");//Set TTL=60000ms
    	AMQP.BasicProperties properties = builder.build();
    	channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());
    

For the first method of setting the TTL attribute of the queue, once the message expires, it will be erased from the queue. In the second method, even if the message expires, it will not be erased from the queue immediately, because whether each message expires is determined before it is delivered to the consumer.

Why are these two methods handled differently?

In the first method, the expired messages in the queue must be at the head of the queue. RabbitMQ only needs to scan the head of the queue regularly to see if there are expired messages. In the second method, the expiration time of each message is different. If you want to delete all expired messages, it is necessary to scan the whole queue, so you might as well wait until the message is about to be consumed to determine whether it expires. If it expires, you can delete it.

2.2 setting TTL of queue

Through channel The x-expires parameter in the queuedeclare method controls how long the queue is unused before it is automatically deleted. Unused means that there are no consumers on the queue, the queue has not been redeclared, and basic has not been called within the expiration period Get command.

RabbitMQ will ensure that the queue is deleted after the expiration time, but it does not guarantee how timely the deletion action is. After RabbitMQ restarts, the expiration time of the persistent queue will be recalculated.

The x-expires parameter used to indicate the expiration time is in milliseconds and subject to the same constraints as x-message-ttl, but cannot be set to 0.

     Map<String, Object> args = new HashMap<String, Object>();
     args.put("x-expires", 1800000);
     channel.queueDeclare("myqueue", false, false, false, args);

3. Dead letter queue

DLX (dead letter exchange) dead letter exchanger. When a message becomes a dead message in a queue, it can be re sent to another switch, which is DLX. The queue bound to DLX is called dead message queue.

Messages become dead letters generally due to the following situations:

  • The message is rejected (Basic.Reject/Basic.Nack), and the request parameter is set to false;
  • Message TTL expires;
  • The queue has reached its maximum length.

DLX is also a normal switch. It is no different from a general switch. It can be specified on any queue. In fact, it is to set the properties of a queue. When there is a dead letter in this queue, RabbitMQ will automatically republish the message to the set DLX, and then it will be routed to another queue, that is, the dead letter queue.

Through in channel Set the x-dead-letter-exchange parameter in the queuedeclare method to add DLX to the queue.

     channel.exchangeDeclare("exchange.dlx", "direct", true);
     channel.exchangeDeclare("exchange.normal", "fanout", true);
     Map<String, Object> args = new HashMap<String, Object>();
     args.put("x-message-ttl", 10000);
     // Set up dead letter exchange DLX for queue
     args.put("x-dead-letter-exchange", "exchange.dlx");
     // You can specify a routing key for DLX. If there is no special designation, the routing key of the original queue will be used
     args.put("x-dead-letter-routing-key", "routingkey"); 
     channel.queueDeclare("queue.normal", true, false, false, args);
     channel.queueBind("queue.normal", "exchange.normal", "");
     channel.queueDeclare("queue.dlx", true, false, false, null);
     channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
     channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());


DLX is a very useful feature for rabbit MQ. It can deal with the situation that the message cannot be correctly consumed by the consumer (the consumer calls Basic.Nack or Basic.Reject) and is placed in the dead letter queue. The subsequent analysis program can analyze the abnormal situation encountered at that time by consuming the content in the dead letter queue, so as to improve and optimize the system.

Handling method of dead letter:

  1. Discard. If it is not very important, you can choose to discard it
  2. Record dead letter receipt, and then conduct subsequent business analysis or processing
  3. Through the dead letter queue, it is processed by the application responsible for listening for dead letters

4. Delay queue

The object stored in the delay queue is the corresponding delay message. The so-called "delay message" refers to that when the message is sent, the consumer does not want to get the message immediately, but waits for a specific time before the consumer can get the message for consumption.

There are many usage scenarios for delay queues, such as:

  • In the order system, a user usually has 30 minutes to pay after placing an order. If the payment is not successful within 30 minutes, the order will be handled abnormally. At this time, the delay queue can be used to process these orders.
  • Users want to remotely control their smart devices at home through mobile phones and work at a specified time. At this time, the user instruction can be sent to the delay queue. When the set time of the instruction is up, the instruction can be pushed to the intelligent device.

In AMQP protocol, or RabbitMQ itself does not directly support the function of delay queue, but the function of delay queue can be simulated through DLX and TTL.

5. Priority queue

Priority queue, as the name suggests, queues with high priority have high priority, and messages with high priority have the privilege of being consumed first.

This can be achieved by setting the x-max-priority parameter of the queue:

        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", 10);
        channel.queueDeclare("queue.priority", true, false, false, args);

The above code demonstrates how to configure the maximum priority of a queue. After that, you need to set the current priority of the message in the message when sending:

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.priority(5);
        AMQP.BasicProperties properties = builder.build();
        channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

In the above code, the priority of the message is set to 5. The default minimum is 0 and the maximum is the maximum priority set for the queue.

Messages with high priority can be consumed preferentially, which is also a prerequisite: if the consumption speed of consumers is greater than that of producers and there is no message accumulation in the Broker, setting the priority of sent messages has no practical significance.

Tags: RabbitMQ

Posted by taiger on Sat, 16 Apr 2022 08:39:32 +0930