Gulimall (Grain Mall) - Distributed Advanced [mall business - RabbitMQ]

  1. Gulimall - Distributed Basics [environment preparation]
  2. Gulimall - Distributed Infrastructure [business preparation]
  3. Gulimall - Distributed Advanced [business writing] (still being updated)
  4. Gulimall - Distributed Advanced - ElasticSearch
  5. Gulimall - Distributed Advanced - distributed locks and caching
  6. The project is hosted on Gitee

For RabbitMQ background, refer to my RabbitMQ notes (they will be published once they are finished).

1. Environment deployment

1. Install RabbitMQ with Docker

[root@hgwtencent ~]# docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
[root@hgwtencent ~]# docker update rabbitmq --restart=always
rabbitmq
  • 4369, 25672: Erlang discovery and clustering ports
  • 5672, 5671: AMQP ports
  • 15672: web management console port
  • 61613, 61614: STOMP protocol ports
  • 1883, 8883: MQTT protocol ports

Open ip:15672 in a browser to reach the web management console, then log in with:

Account: guest
Password: guest

2. RabbitMQ demo

The instructor's goal here is only to make the three exchange types clear, so no detailed notes are kept for this demo.

  • Direct [point-to-point]: the routing key of the message must equal the routing key of the binding between the exchange and the queue

  • Fanout [publish/subscribe]: the message is sent to every queue bound to the exchange

  • Topic [publish/subscribe]:

    • The routing key of the message is pattern-matched against the routing key of the binding; both are made of words separated by dots
    • * (asterisk) stands for exactly one word
    • # (pound sign) stands for zero or more words
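
The topic rules above can be sketched in plain Java. This is only a toy re-implementation for illustration: the class and method names (TopicMatchSketch, matches) are made up here, and RabbitMQ performs the real matching inside the broker.

```java
// Toy model of topic-exchange matching; not RabbitMQ's actual implementation.
class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    private static String[] split(String s) {
        return s.isEmpty() ? new String[0] : s.split("\\.");
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;            // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                    // key used up but the pattern still expects a word
        }
        boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
        return wordMatches && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("hello.*", "hello.java"));       // true
        System.out.println(matches("hello.*", "hello.java.order")); // false
        System.out.println(matches("hello.#", "hello"));            // true
        System.out.println(matches("hello.#", "hello.java.order")); // true
    }
}
```

A direct exchange corresponds to the special case where the pattern contains neither * nor #, so only an exact match succeeds.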

2.1. Creating exchanges and queues and binding them

  1. Create an exchange

  2. Create a queue

  3. Bind the queue to the exchange



2. Integrating RabbitMQ with Spring Boot

2.1. Integrating RabbitMQ with Spring Boot

  • Using RabbitMQ

    • 1. Add the AMQP starter; RabbitAutoConfiguration then takes effect automatically

    • 2. RabbitTemplate, AmqpAdmin, CachingConnectionFactory and RabbitMessagingTemplate are configured automatically in the container
      All properties are bound under the prefix spring.rabbitmq

      @ConfigurationProperties(prefix = "spring.rabbitmq")
      public class RabbitProperties {}
      
    • 3. Set the spring.rabbitmq properties in the configuration file

    • 4. Enable the feature with @EnableRabbit

1. Add the AMQP starter dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Add configuration

spring:
  rabbitmq:
    host: 124.222.223.222
    username: guest
    port: 5672
    virtual-host: /
    password: guest

3. Add the @EnableRabbit annotation to the main startup class

@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(GulimallOrderApplication.class, args);
    }

}

2.2. AmqpAdmin

Use AmqpAdmin to create exchanges, queues, and bindings.

2.2.1. Create an exchange

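/**
 * Create an exchange.
 * Available implementations: TopicExchange, FanoutExchange, DirectExchange.
 * Assumes the test class injects AmqpAdmin (@Autowired AmqpAdmin amqpAdmin)
 * and has a logger named log (for example via Lombok's @Slf4j).
 */
@Test
public void createExchange(){

    /**
     * DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
     *  String name                     exchange name
     *  boolean durable                 whether the exchange survives a broker restart
     *  boolean autoDelete              whether the exchange is deleted automatically once it is no longer used
     *  Map<String, Object> arguments   optional extra arguments
     */
    DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
    amqpAdmin.declareExchange(directExchange);
    log.info("Exchange[{}] created successfully",directExchange.getName());
}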

2.2.2. Create a queue

/**
 * Create a queue
 */
@Test
public void createQueue(){
    /**
     * Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
     *  String name                     queue name
     *  boolean durable                 whether the queue survives a broker restart
     *  boolean exclusive               exclusive: only the connection that declared the queue may use it
     *  boolean autoDelete              whether the queue is deleted automatically once it is no longer used
     *  Map<String, Object> arguments   optional extra arguments
     */
    Queue queue = new Queue("hello-java-queue",true,false,false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[{}] created successfully",queue.getName());
}

2.2.3. Create a binding

/**
 * Create a binding
 */
@Test
public void createBinding() {
    /**
     * Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
     *  String destination                          name of the destination (a queue or an exchange)
     *  Binding.DestinationType destinationType     type of the destination (QUEUE or EXCHANGE)
     *  String exchange                             name of the source exchange
     *  String routingKey                           routing key
     *  Map<String, Object> arguments               optional extra arguments
     * Binds the exchange named by exchange to the destination, using routingKey as the routing key
     */
    Binding binding = new Binding("hello-java-queue",
            Binding.DestinationType.QUEUE,
            "hello-java-exchange",
            "hello.java",
            null);
    amqpAdmin.declareBinding(binding);
    log.info("Binding created successfully");
}

2.3. RabbitTemplate

Use RabbitTemplate to send and receive messages.

2.3.1. Sending messages

/**
 * convertAndSend(String exchange, String routingKey, Object object)
 *  String exchange      name of the exchange
 *  String routingKey    routing key
 *  Object object        message payload; an object is written out by the configured message converter
 */
@Test
public void sendMsgTest(){
  OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
  orderReturnApplyEntity.setId(1L);
  orderReturnApplyEntity.setCreateTime(new Date());
  orderReturnApplyEntity.setReturnName("Ha ha ha");

  // Send the message. With the default converter an object is written out using
  // Java serialization, so its class must implement Serializable.
  rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnApplyEntity);
  log.info("Message[{}] sent successfully!",orderReturnApplyEntity);
}
  1. When the message is an object, it is written out using the serialization mechanism by default, so the class must implement Serializable
  2. Alternatively, the object can be serialized to JSON before it is sent
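
The Serializable requirement in point 1 comes from Java's own serialization mechanism. The following minimal sketch uses only the JDK, with two hypothetical payload classes (PlainPayload and SerializablePayload are not part of the project), to show what happens when the marker interface is missing:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableSketch {

    // Hypothetical payload types used only for this illustration.
    static class PlainPayload {
        String name = "demo";
    }

    static class SerializablePayload implements Serializable {
        private static final long serialVersionUID = 1L;
        String name = "demo";
    }

    /** Tries JDK serialization and reports whether the object could be written. */
    static boolean canSerialize(Object payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return true;
        } catch (NotSerializableException e) {
            return false;                       // the class does not implement Serializable
        } catch (IOException e) {
            throw new IllegalStateException(e); // unexpected for an in-memory stream
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new SerializablePayload())); // true
        System.out.println(canSerialize(new PlainPayload()));        // false
    }
}
```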

Note:

Configure MyRabbitConfig so that messages carrying objects are sent as JSON.

Add the "com.atguigu.gulimall.order.config.MyRabbitConfig" class with the following code:

package com.atguigu.gulimall.order.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

Once MyRabbitConfig is in place, message payloads are serialized to JSON when sent and converted back from JSON when received.

2.3.2. Receiving messages with @RabbitListener and @RabbitHandler

  • @RabbitListener: placed on a class or a method to specify which queues to listen on

  • @RabbitHandler: placed on a method (overloads distinguish different message types)

2.3.2.1. @RabbitListener

To listen for messages, use @RabbitListener; the main startup class must carry @EnableRabbit.

  • @interface RabbitListener
    String[] queues() default {}; the queues to listen on

  • Parameter list:

    • 1. Message message: the native message details (headers + body)
    • 2. T <type of the message sent>: for example, OrderReturnReasonEntity content; specifies the type the message is converted to
    • 3. Channel channel: the channel over which the data is transmitted

/**
 * @interface RabbitListener
 *     String[] queues() default {}; the queues to listen on
 * Parameter list:
 *      1. Message message: the native message details (headers + body)
 *      2. T <type of the message sent>: for example, OrderReturnReasonEntity content
 *      3. Channel channel: the channel over which the data is transmitted
 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message,
                           OrderReturnReasonEntity content,
                           Channel channel){
    // Message headers and properties
    MessageProperties properties = message.getMessageProperties();
    // Message body (raw bytes)
    byte[] body = message.getBody();
    System.out.println("Message received:" + message +"Content:"+content);
}

2.3.2.2. Receiving messages with @RabbitHandler

Put @RabbitListener on the class to specify which queue to listen on, then put @RabbitHandler on the methods; overloads distinguish the different message types.

  • @RabbitListener: placed on the class to specify which queue to listen on

  • @RabbitHandler: placed on a method (overloads distinguish different message types)

Add the "com.atguigu.gulimall.order.controller.RabbitController" class. The code is as follows:

@Slf4j
@RestController
public class RabbitController {
    
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num){
        for (int i = 0; i < num; i++) {
            if (i%2 == 0) {
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(1L);
                orderReturnApplyEntity.setCreateTime(new Date());
                orderReturnApplyEntity.setReturnName("Ha ha ha");
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnApplyEntity);
            } else {
                OrderEntity entity = new OrderEntity();
                entity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
            }
        }
        return "ok";
    }
}

Modify the "com.atguigu.gulimall.order.service.impl.OrderItemServiceImpl" class. The code is as follows:

@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );

        return new PageUtils(page);
    }

    /**
     * @interface RabbitListener
     *     String[] queues() default {}; the queues to listen on
     * Parameter list:
     *      1. Message message: the native message details (headers + body)
     *      2. T <type of the message sent>: e.g. OrderReturnReasonEntity content
     *      3. Channel channel: the channel over which the data is transmitted
     *
     * Queue: many consumers can listen, but each message is delivered to only one of them.
     * Scenarios:
     *      1) Several instances of the order service are running: only one of them receives a given message
     *      2) The next message is delivered only after the current one has been fully processed and the method has returned
     */
    @RabbitHandler
    public void receiveMessage(OrderReturnApplyEntity content){
        System.out.println("Received message:" + content);
    }

    @RabbitHandler
    public void receiveMessage2(OrderEntity content){
        System.out.println("Received message:" + content);
    }
}
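
To make the idea of "overloads distinguish different messages" concrete, here is a toy dispatcher in plain Java that picks a handler by the payload's type. It is only a sketch of the concept, not how Spring AMQP is implemented; the class HandlerDispatchSketch and the stand-in payload types ReturnApply and Order are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class HandlerDispatchSketch {

    // Hypothetical stand-ins for the two payload types sent by RabbitController.
    static class ReturnApply { }
    static class Order { }

    // One entry per "handler method", keyed by the payload type it accepts.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    /** Picks the first registered handler whose parameter type accepts the payload. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalArgumentException("No handler for " + payload.getClass().getName());
    }

    /** Builds a dispatcher with one handler per payload type, like the two @RabbitHandler methods. */
    static HandlerDispatchSketch withDemoHandlers() {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(ReturnApply.class, p -> "return-apply handler");
        dispatcher.register(Order.class, p -> "order handler");
        return dispatcher;
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = withDemoHandlers();
        System.out.println(dispatcher.dispatch(new ReturnApply())); // return-apply handler
        System.out.println(dispatcher.dispatch(new Order()));       // order handler
    }
}
```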

2.4.1. Reliable delivery: publisher confirms

  • Producer to broker (P -> B): ConfirmCallback, invoked once the broker has received the message
    1. spring.rabbitmq.publisher-confirms=true
    2. Set the confirm callback (ConfirmCallback)
  • Exchange to queue (E -> Q): ReturnCallback, invoked when the message could not be routed to a queue
    1. spring.rabbitmq.publisher-returns: true
    2. spring.rabbitmq.template.mandatory: true
    3. Set the return callback (ReturnCallback)

Step 1: modify the configuration file

spring:
  rabbitmq:
    publisher-confirms: true  # Enable publisher confirms (Spring Boot 2.2+ uses publisher-confirm-type: correlated instead)
    publisher-returns: true   # Enable returns for messages that cannot be routed to a queue
    template:
      mandatory: true         # Hand unroutable messages back to the ReturnCallback (asynchronously) instead of dropping them

Step 2: customize the RabbitTemplate

package com.atguigu.gulimall.order.config;

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Use JSON serialization mechanism for message conversion
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }


    /**
     * Customize RabbitTemplate
     *  1. Callback when the broker has received the message
     *      1) spring.rabbitmq.publisher-confirms=true
     *      2) Set the confirm callback (ConfirmCallback)
     *  2. Callback when the message could not be routed to a queue
     *      1) spring.rabbitmq.publisher-returns: true
     *      2) spring.rabbitmq.template.mandatory: true
     *      3) Set the return callback (ReturnCallback)
     *  3. Consumer-side acknowledgement (ensures each message is consumed correctly before the broker deletes it)
     */
    @PostConstruct  // Runs once the MyRabbitConfig object has been created
    public void initRabbitTemplate() {
        // Set the confirm callback
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * Invoked once the broker has handled the publish; ack=true means the broker accepted the message
             * @param correlationData data correlated with the current message (carries its unique id)
             * @param ack whether the broker accepted the message
             * @param cause reason for the failure, if any
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...[correlationData:"+correlationData+"];[ack="+ack+"];[cause="+cause+"]");
            }
        });

        /**
         * Set the callback for messages that could not be routed to a queue
         */
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * Triggered only when the message was not delivered to a queue
             * @param message    details of the message that could not be delivered
             * @param replyCode  reply status code
             * @param replyText  reply text
             * @param exchange   the exchange the message was sent to
             * @param routingKey the routing key used for the message
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
            }
        });
    }
}
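
When each of the two callbacks fires is easy to mix up, so here is a toy model in plain Java. It is a simplification under stated assumptions, not RabbitMQ or Spring code: the class ConfirmReturnSketch is hypothetical, only direct-exchange routing is modelled, every publish is treated as mandatory, and an unknown exchange is modelled simply as a negative confirm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ConfirmReturnSketch {

    // exchange name -> routing keys that have a queue bound (direct-exchange semantics)
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Callbacks in the order they would fire
    final List<String> events = new ArrayList<>();

    void bind(String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>()).add(routingKey);
    }

    /** Publishes with mandatory=true and records which callbacks would fire. */
    void publish(String exchange, String routingKey, String body) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            // Simplification: a publish to an unknown exchange ends in a negative confirm
            events.add("confirm ack=false " + body);
            return;
        }
        if (!keys.contains(routingKey)) {
            // The exchange was reached but no queue matched: the message is returned first
            events.add("returned 312 NO_ROUTE " + body);
        }
        // The broker accepted the message either way, so the confirm is positive
        events.add("confirm ack=true " + body);
    }

    public static void main(String[] args) {
        ConfirmReturnSketch broker = new ConfirmReturnSketch();
        broker.bind("hello-java-exchange", "hello.java");
        broker.publish("hello-java-exchange", "hello.java", "m1");   // routed: confirm only
        broker.publish("hello-java-exchange", "wrong.key", "m2");    // unroutable: return, then confirm
        broker.events.forEach(System.out::println);
    }
}
```

The point of the model is that ack=true in the confirm callback only says the broker took the message; whether it reached a queue is reported separately through the return callback.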

2.4.2. Reliable delivery: consumer acknowledgement

Consumer-side acknowledgement ensures that each message has been consumed correctly before the broker deletes it.

  • Acknowledgement is automatic by default: as soon as a message is received, the client acknowledges it and the server removes it.

    • Problems:
      • 1. A message is received but processing fails. The ACK has already been sent automatically, so the message is lost.

      • 2. Many messages are received and acknowledged automatically, but the consumer goes down after processing only one of them. The remaining messages are lost.

    • Solution: acknowledge manually, sending the acknowledgement only after each message has been processed.
  • Manual ack

    1. spring.rabbitmq.listener.simple.acknowledge-mode=manual
    2. Acknowledge or reject explicitly
      • channel.basicAck(deliveryTag,false); acknowledge: call this when the business logic completed successfully
      • channel.basicNack(deliveryTag,false,true); reject: call this when the business logic failed

Step 1: switch the configuration file to manual acknowledgement

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # Enable manual ack message mode

Step 2: acknowledge manually in the business code

@RabbitHandler
public void receiveMessage(Message message,OrderReturnApplyEntity content, Channel channel){
    System.out.println("Received message:" + content);
    // The delivery tag identifies this delivery on the channel; it is an increasing number
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("Message processing completed!");
    try {
        // For the demo, even tags are acknowledged and odd tags are rejected
        if (deliveryTag%2==0) {
            // Acknowledge: call this when the business logic completed successfully
            channel.basicAck(deliveryTag,false);
            System.out.println("Acknowledged..."+deliveryTag);
        } else {
            /**
             * deliveryTag: the tag of the current delivery
             * multiple: whether to also settle all earlier unacknowledged deliveries
             * requeue:
             *      true: put the message back in the queue
             *      false: discard the message
             */
            channel.basicNack(deliveryTag,false,true);
            System.out.println("Rejected..."+deliveryTag);
        }
    } catch (IOException e) {
        // Network interruption: the message stays unacknowledged
        e.printStackTrace();
    }
}
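
The effect of the deliveryTag, multiple and requeue parameters can be illustrated with a small in-memory model. This is a hedged sketch, not broker code: the class ManualAckSketch and its fields are hypothetical, and the requeue position is simplified to the front of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;

class ManualAckSketch {

    final Deque<String> ready = new ArrayDeque<>();         // messages waiting for delivery
    final TreeMap<Long, String> unacked = new TreeMap<>();  // delivered but not yet settled
    private long nextTag = 1;

    /** Delivers the next ready message and returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.put(tag, ready.removeFirst());
        return tag;
    }

    void basicAck(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple, false);
    }

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        settle(deliveryTag, multiple, requeue);
    }

    private void settle(long tag, boolean multiple, boolean requeue) {
        // multiple=true covers every unsettled tag up to and including this one
        NavigableMap<Long, String> view = multiple
                ? unacked.headMap(tag, true)
                : unacked.subMap(tag, true, tag, true);
        // Walk from the newest tag down so that requeued messages keep their order
        Iterator<String> it = view.descendingMap().values().iterator();
        while (it.hasNext()) {
            String body = it.next();
            if (requeue) {
                ready.addFirst(body);   // simplification: requeue at the front
            }
            it.remove();                // settled either way: no longer unacked
        }
    }

    public static void main(String[] args) {
        ManualAckSketch broker = new ManualAckSketch();
        broker.ready.add("m1");
        broker.ready.add("m2");
        broker.ready.add("m3");
        long t1 = broker.deliver();
        long t2 = broker.deliver();
        long t3 = broker.deliver();
        broker.basicAck(t2, true);          // settles t1 and t2 together
        broker.basicNack(t3, false, true);  // m3 goes back to the queue
        System.out.println("ready=" + broker.ready + " unacked=" + broker.unacked);
    }
}
```

With requeue=false the rejected message would simply be dropped from the model, which corresponds to the "discard" case described in the code comment above.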

Tags: RabbitMQ MQ

Posted by Dan39 on Sat, 16 Apr 2022 19:00:33 +0930