- Gulimall - Distributed Basics [environment preparation]
- Gulimall - Distributed Infrastructure [business preparation]
- Gulimall - Distributed Advanced [business development] (continuously updated)
- Gulimall - Distributed Advanced - ElasticSearch
- Gulimall - Distributed Advanced - Distributed locks and caching
- The project is hosted on Gitee
Reference: my RabbitMQ notes (to be published once the project is written up)
1, Environment deployment
1. Install RabbitMQ with Docker
[root@hgwtencent ~]# docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
[root@hgwtencent ~]# docker update --restart=always rabbitmq
- 4369, 25672: Erlang discovery and clustering ports
- 5672, 5671: AMQP ports
- 15672: web management console port
- 61613, 61614: STOMP protocol ports
- 1883, 8883: MQTT protocol ports
Browse to ip:15672 to open the web management console and log in with:
Username: guest
Password: guest
2. RabbitMQ demo
The instructor's goal here is for everyone to understand the three exchange types, so no detailed notes are recorded.
- Direct [point-to-point]: the routing key in the message must equal the routing key of the binding between the exchange and the queue
- Fanout [publish/subscribe]: the message is sent to every queue bound to the exchange
- Topic [publish/subscribe]: the routing key in the message is pattern-matched against the routing key of the binding between the exchange and the queue
  - * (asterisk) stands for exactly one word
  - # (pound sign) stands for zero or more words
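The Topic matching rule above can be sketched in plain Java. This is only an illustrative model of the rule (words separated by dots, `*` for one word, `#` for zero or more words), not RabbitMQ's actual implementation; the class name `TopicMatcher` and its `matches` method are hypothetical.

```java
// Illustrative model of topic-exchange matching; not RabbitMQ's own code.
class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // Words are separated by dots; -1 keeps empty trailing words.
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible length.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("hello.*", "hello.java"));       // true
        System.out.println(matches("hello.*", "hello.java.world")); // false
        System.out.println(matches("hello.#", "hello"));            // true
    }
}
```

A pattern without wildcards behaves like a Direct binding, which is why the same matcher also accepts `matches("hello.java", "hello.java")`.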
2.1. Creating exchanges and queues, and demonstrating their binding
- Create an exchange
- Create a queue
- Bind the queue to the exchange
2, Integrating RabbitMQ with Spring Boot
2.1. Integrating RabbitMQ with Spring Boot
Using RabbitMQ:
- 1. Add the AMQP starter; RabbitAutoConfiguration then takes effect automatically.
- 2. The auto-configuration registers RabbitTemplate, AmqpAdmin, CachingConnectionFactory and RabbitMessagingTemplate in the container. All properties are bound under the spring.rabbitmq prefix:
  @ConfigurationProperties(prefix = "spring.rabbitmq")
  public class RabbitProperties {}
- 3. Configure the spring.rabbitmq properties in the configuration file.
- 4. Enable the feature with @EnableRabbit.
1. Add the AMQP starter dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Add the configuration
spring:
  rabbitmq:
    host: 124.222.223.222
    username: guest
    port: 5672
    virtual-host: /
    password: guest
3. Add the @EnableRabbit annotation to the main application class
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(GulimallOrderApplication.class, args);
    }
}
2.2. AmqpAdmin
Use AmqpAdmin to declare an Exchange, a Queue, and a Binding.
2.2.1. Create an exchange
/**
 * Create an exchange.
 * Available types: TopicExchange, FanoutExchange, DirectExchange
 */
@Test
public void createExchange() {
    /*
     * DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
     *   name        exchange name
     *   durable     whether the exchange is persistent
     *   autoDelete  whether the exchange is deleted automatically
     *   arguments   additional arguments
     */
    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
    log.info("Exchange[{}] created successfully", directExchange.getName());
}
2.2.2. Create a queue
/**
 * Create a queue.
 */
@Test
public void createQueue() {
    /*
     * Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
     *   name        queue name
     *   durable     whether the queue is persistent
     *   exclusive   exclusive: the queue can be used by only one connection at a time
     *   autoDelete  whether the queue is deleted automatically
     *   arguments   additional arguments
     */
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[{}] created successfully", queue.getName());
}
2.2.3. Create a binding
/**
 * Create a binding.
 */
@Test
public void createBinding() {
    /*
     * Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)
     *   destination      name of the destination (a queue or an exchange)
     *   destinationType  destination type (QUEUE or EXCHANGE)
     *   exchange         exchange name
     *   routingKey       routing key
     *   arguments        additional arguments
     * Binds the exchange named by exchange to the destination, using routingKey as the routing key.
     */
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
    amqpAdmin.declareBinding(binding);
    log.info("Binding created successfully");
}
2.3. RabbitTemplate
Use RabbitTemplate to send and receive messages.
2.3.1. Sending messages
/**
 * convertAndSend(String exchange, String routingKey, Object object)
 *   exchange    exchange name
 *   routingKey  routing key
 *   object      the message; if it is an object, it is written out through the serialization mechanism
 */
@Test
public void sendMsgTest() {
    OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
    orderReturnApplyEntity.setId(1L);
    orderReturnApplyEntity.setCreateTime(new Date());
    orderReturnApplyEntity.setReturnName("Ha ha ha");
    // 1. Send the message. With the default converter an object is written out using Java serialization, so it must implement Serializable.
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnApplyEntity);
    log.info("Message[{}] sent successfully!", orderReturnApplyEntity);
}
- When the message is an object, it is written out using the serialization mechanism by default, so the object must implement Serializable.
- Alternatively, the object can be serialized to JSON.
Note:
Configure MyRabbitConfig so that object messages are sent as JSON.
Add the "com.atguigu.gulimall.order.config.MyRabbitConfig" class with the following code:
package com.atguigu.gulimall.order.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
With MyRabbitConfig in place, messages are serialized to JSON when sent and deserialized from JSON when received.
2.3.2. Receiving messages with @RabbitListener and @RabbitHandler
- @RabbitListener: placed on a class or method; specifies which queues to listen on
- @RabbitHandler: placed on a method (overloads distinguish different message types)
2.3.2.1. @RabbitListener
Listen for messages with @RabbitListener; the main application class must be annotated with @EnableRabbit.
- @interface RabbitListener
  - String[] queues() default {}; the queues to listen on
- Parameter list:
  - 1. Message message: the raw message details (headers + body)
  - 2. T (the type of the message sent), e.g. OrderReturnReasonEntity content; specifies the type the message is received as
  - 3. Channel channel: the channel currently carrying the data
/**
 * @interface RabbitListener
 *   String[] queues() default {};  the queues to listen on
 * Parameter list:
 *   1. Message message: the raw message details (headers + body)
 *   2. T (the type of the message sent), e.g. OrderReturnReasonEntity content
 *   3. Channel channel: the channel currently carrying the data
 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {
    // Message headers
    MessageProperties properties = message.getMessageProperties();
    // Message body
    byte[] body = message.getBody();
    System.out.println("Message received: " + message + " Content: " + content);
}
2.3.2.2. Receiving messages with @RabbitHandler
Put @RabbitListener on the class to specify which queue to listen on, then put @RabbitHandler on overloaded methods to distinguish the different message types.
- @RabbitListener: placed on the class; specifies which queue to listen on
- @RabbitHandler: placed on a method (overloads distinguish different message types)
Add the "com.atguigu.gulimall.order.controller.RabbitController" class. The code is as follows:
@Slf4j
@RestController
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendMq(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        for (int i = 0; i < num; i++) {
            if (i % 2 == 0) {
                // Even iterations send an OrderReturnApplyEntity
                OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
                orderReturnApplyEntity.setId(1L);
                orderReturnApplyEntity.setCreateTime(new Date());
                orderReturnApplyEntity.setReturnName("Ha ha ha");
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnApplyEntity);
            } else {
                // Odd iterations send an OrderEntity
                OrderEntity entity = new OrderEntity();
                entity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", entity);
            }
        }
        return "ok";
    }
}
Modify the "com.atguigu.gulimall.order.service.impl.OrderItemServiceImpl" class. The code is as follows:
@Service("orderItemService")
@RabbitListener(queues = {"hello-java-queue"})
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );
        return new PageUtils(page);
    }

    /**
     * @interface RabbitListener
     *   String[] queues() default {};  the queues to listen on
     * Parameter list:
     *   1. Message message: the raw message details (headers + body)
     *   2. T (the type of the message sent), e.g. OrderReturnReasonEntity content
     *   3. Channel channel: the channel currently carrying the data
     *
     * Queue: many consumers can listen, but each message is received by only one of them.
     * Scenarios:
     *   1) Several instances of the order service are running: only one client receives a given message
     *   2) The next message is received only after the current one is fully processed and the method has returned
     */
    @RabbitHandler
    public void receiveMessage(OrderReturnApplyEntity content) {
        System.out.println("Received message:" + content);
    }

    @RabbitHandler
    public void receiveMessage2(OrderEntity content) {
        System.out.println("Received message:" + content);
    }
}
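The idea that @RabbitHandler picks a method by the type of the received payload can be modelled in plain Java as a dispatch table keyed by class. This is a rough sketch of the concept only, not how Spring AMQP is implemented; `HandlerDispatchSketch` and the two stripped-down entity classes inside it are hypothetical stand-ins for the project's real classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Conceptual model of "one listener class, several handlers chosen by payload type".
class HandlerDispatchSketch {

    // Hypothetical, stripped-down stand-ins for the project's entities.
    static class OrderEntity {
        final String orderSn;
        OrderEntity(String orderSn) { this.orderSn = orderSn; }
    }

    static class OrderReturnApplyEntity {
        final long id;
        OrderReturnApplyEntity(long id) { this.id = id; }
    }

    private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();
    final List<String> log = new ArrayList<>();

    /** Registers a handler for one payload type (the role an @RabbitHandler method plays). */
    <T> void register(Class<T> type, Consumer<T> handler) {
        handlers.put(type, payload -> handler.accept(type.cast(payload)));
    }

    /** Routes a payload to the handler registered for its exact class. */
    void dispatch(Object payload) {
        Consumer<Object> handler = handlers.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No handler for " + payload.getClass().getSimpleName());
        }
        handler.accept(payload);
    }

    /** Builds a dispatcher with one handler per entity type. */
    static HandlerDispatchSketch demo() {
        HandlerDispatchSketch d = new HandlerDispatchSketch();
        d.register(OrderReturnApplyEntity.class, e -> d.log.add("return-apply:" + e.id));
        d.register(OrderEntity.class, e -> d.log.add("order:" + e.orderSn));
        return d;
    }

    public static void main(String[] args) {
        HandlerDispatchSketch d = demo();
        d.dispatch(new OrderEntity("sn-1"));
        d.dispatch(new OrderReturnApplyEntity(1L));
        System.out.println(d.log);
    }
}
```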
2.4.1. Reliable delivery: publisher confirms
- The broker receives the message (P -> B): ConfirmCallback
  - spring.rabbitmq.publisher-confirms=true
  - Set the confirm callback (ConfirmCallback)
- The message fails to reach a queue (E -> Q): ReturnCallback
  - spring.rabbitmq.publisher-returns=true
  - spring.rabbitmq.template.mandatory=true
  - Set the return callback (ReturnCallback)
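To make the difference between the two callbacks concrete, here is a small in-memory model in plain Java. It assumes a single direct exchange and records which callback would fire for each publish; it only imitates the broker's behaviour and uses none of the Spring AMQP API. The class `PublisherCallbackSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory imitation of one direct exchange, to show when each callback fires.
class PublisherCallbackSketch {

    // routing key -> contents of the queue bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Callback events in the order they would be observed by the publisher.
    final List<String> events = new ArrayList<>();

    /** Binds a queue to the exchange under the given routing key. */
    void bind(String routingKey) {
        bindings.put(routingKey, new ArrayList<>());
    }

    /** Publishes a message and records the callbacks the publisher would see. */
    void publish(String routingKey, String body, boolean mandatory) {
        List<String> queue = bindings.get(routingKey);
        if (queue == null) {
            // E -> Q failed: with mandatory=true the message is returned to the publisher,
            // otherwise it is silently dropped.
            if (mandatory) {
                events.add("return:" + body);
            }
        } else {
            queue.add(body);
        }
        // P -> B succeeded: the broker confirms receipt even if the message was unroutable.
        events.add("confirm:ack=true:" + body);
    }

    public static void main(String[] args) {
        PublisherCallbackSketch broker = new PublisherCallbackSketch();
        broker.bind("hello.java");
        broker.publish("hello.java", "m1", true); // routed: confirm only
        broker.publish("wrong.key", "m2", true);  // unroutable: return, then confirm
        System.out.println(broker.events);
    }
}
```

The point of the model is that a positive confirm only says the broker received the message; whether it reached a queue is reported separately through the return path.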
Step 1: modify the configuration file
spring:
  rabbitmq:
    publisher-confirms: true   # Enable publisher confirms (Spring Boot 2.2+ replaces this with publisher-confirm-type: correlated)
    publisher-returns: true    # Enable the callback for messages that fail to reach a queue
    template:
      mandatory: true          # Return unroutable messages to the sender asynchronously instead of dropping them
Step 2: customize the RabbitTemplate
package com.atguigu.gulimall.order.config;

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Use JSON serialization for message conversion.
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Customize the RabbitTemplate.
     * 1. Callback when the broker receives the message
     *    1) spring.rabbitmq.publisher-confirms=true
     *    2) Set the confirm callback (ConfirmCallback)
     * 2. Callback when the message fails to reach a queue
     *    1) spring.rabbitmq.publisher-returns=true
     *    2) spring.rabbitmq.template.mandatory=true
     *    3) Set the return callback (ReturnCallback)
     * 3. Consumer acknowledgement (ensures every message is consumed correctly before the broker deletes it)
     */
    @PostConstruct // Runs after the MyRabbitConfig object has been created
    public void initRabbitTemplate() {
        // Set the confirm callback
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * As soon as the message reaches the broker, ack=true.
             * @param correlationData data uniquely associated with the current message (its unique id)
             * @param ack             whether the message was received successfully
             * @param cause           reason for the failure
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...[correlationData:" + correlationData + "];[ack=" + ack + "];[cause=" + cause + "]");
            }
        });

        // Set the callback for messages that fail to reach a queue
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * Triggered whenever a message is not delivered to the specified queue.
             * @param message    details of the message that failed to be delivered
             * @param replyCode  reply status code
             * @param replyText  reply text
             * @param exchange   the exchange the message was sent to
             * @param routingKey the routing key used for the message
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message[" + message + "]==>replyCode[" + replyCode + "]==>replyText[" + replyText
                        + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]");
            }
        });
    }
}
2.4.2. Reliable delivery: consumer acknowledgement
Consumer acknowledgement ensures that every message is consumed correctly before the broker deletes it.
- 1) The default is automatic acknowledgement: as soon as a message is received, the client acknowledges it automatically and the server removes it.
  - Problems:
    - 1. A message is received but processing fails. The ACK has already been sent automatically, so the message is lost.
    - 2. Many messages are received and acknowledged automatically, but the consumer goes down after processing only one of them. The remaining messages are lost.
  - Solution: acknowledge manually, sending the acknowledgement only after each message has been processed.
- 2) Manual acknowledgement
  - spring.rabbitmq.listener.simple.acknowledge-mode=manual
  - Acknowledge explicitly in code:
    - channel.basicAck(deliveryTag,false); accept: sign for the message once the business logic has completed successfully
    - channel.basicNack(deliveryTag,false,true); reject: refuse the message when the business logic fails
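The effect of basicAck and of basicNack with requeue set to true or false can be illustrated with a tiny in-memory queue. This is a simplified model written for this note: it ignores the `multiple` flag and real channels, and the class `ManualAckSketch` and its methods are hypothetical names that only mirror the calls above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model of manual acknowledgement on a single queue.
class ManualAckSketch {

    private final Deque<String> ready = new ArrayDeque<>();      // waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();   // delivered, not yet acknowledged
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Delivers the next ready message and returns its delivery tag, or -1 if the queue is empty. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body); // the broker keeps the message until it is acknowledged
        return tag;
    }

    /** Returns the body of a delivered but unacknowledged message, or null. */
    String bodyOf(long deliveryTag) {
        return unacked.get(deliveryTag);
    }

    /** Positive acknowledgement: the message is removed for good. */
    void basicAck(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    /** Negative acknowledgement: requeue=true puts the message back, false discards it. */
    void basicNack(long deliveryTag, boolean requeue) {
        String body = unacked.remove(deliveryTag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int readyCount() { return ready.size(); }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.publish("a");
        long tag = queue.deliver();
        queue.basicNack(tag, true); // processing failed: the message goes back to the queue
        System.out.println(queue.readyCount());
    }
}
```

In this model a message that is delivered but never acknowledged stays in the unacked map, which mirrors why manual mode avoids the message loss described for automatic acknowledgement.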
Step 1: switch the configuration file to manual acknowledgement
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual   # Enable manual ack mode
Step 2: acknowledge manually in the business code
@RabbitHandler
public void receiveMessage(Message message, OrderReturnApplyEntity content, Channel channel) {
    System.out.println("Received message:" + content);
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("Message processing completed!");
    try {
        if (deliveryTag % 2 == 0) {
            // Ack: sign for the message once the business logic has completed successfully
            channel.basicAck(deliveryTag, false);
            System.out.println("Signed for the goods..." + deliveryTag);
        } else {
            /*
             * deliveryTag: delivery tag of the current message, an auto-incrementing number within the channel
             * multiple:    whether to handle messages in batch
             * requeue:     true = put the message back on the queue; false = discard the message
             */
            channel.basicNack(deliveryTag, false, true);
            System.out.println("Refused to sign the goods..." + deliveryTag);
        }
    } catch (IOException e) {
        // Network interruption
    }
}