summary
Shield the differences between the underlying message middleware, reduce the switching cost, and unify the programming model of message
Standard MQ:
Information content is transmitted between producers / consumers through message media
Messages must go through a specific channel
By defining Binder as the middle layer, the isolation between application and message middleware details is realized
The message communication mode in Stream follows the publish subscribe mode
Case description
Three sub modules
The service registry uses Eureka
Message driven producer
1. Create a new Module and introduce dependencies
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud2020</artifactId> <groupId>com.lyh.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--Basic configuration--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
2. Configuration file
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # Configure rabbitmq service information to bind defaultRabbit: # Name of the definition type: rabbit # Message component type environment: # Related environment configuration spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # Service integration processing output: destination: studyExchange content-type: application/json # Message type binder: defaultRabbit # eureka related configuration eureka: client: service-url: defaultZone: http://localhost:7003/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: send-8801.com prefer-ip-address: true
3. Main startup class
package com.lyh.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author martin * @date 2021/2/16 **/ @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
4. Business class
package com.lyh.springcloud.service.impl; import com.lyh.springcloud.service.IMessageProvider; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; import java.util.UUID; /** * @author martin * @date 2021/2/16 **/ @EnableBinding(Source.class) // Defines the push pipeline for messages @Slf4j public class IMessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // Message sending pipeline @Override public String send() { String serial = UUID.randomUUID().toString().replaceAll("-", ""); output.send(MessageBuilder.withPayload(serial).build()); log.info("serial: " + serial); return serial; } }
package com.lyh.springcloud.controller; import com.lyh.springcloud.service.IMessageProvider; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author martin * @date 2021/2/16 **/ @RestController public class SendMessageController { @Resource private IMessageProvider iMessageProvider; @GetMapping("/sendMessage") public String sendMessage() { return iMessageProvider.send(); } }
5. Testing
Message driven consumers
1. Create a new Module and introduce dependencies
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud2020</artifactId> <groupId>com.lyh.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--Basic configuration--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
2. Configuration file application yml
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # Configure rabbitmq service information to bind defaultRabbit: # Name of the definition type: rabbit # Message component type environment: # Related environment configuration spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # Service integration processing input: destination: studyExchange content-type: application/json # Message type binder: defaultRabbit # eureka related configuration eureka: client: service-url: defaultZone: http://localhost:7003/eureka instance: lease-renewal-interval-in-seconds: 2 lease-expiration-duration-in-seconds: 5 instance-id: receive-8802.com prefer-ip-address: true
3. Main startup class
package com.lyh.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author martin * @date 2021/2/16 **/ @SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }
4. Business class
package com.lyh.springcloud.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * @author martin * @date 2021/2/16 **/ @Component @Slf4j @EnableBinding(Sink.class) public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { log.info("consumer\t Port number:" + serverPort + "\t Received message:" + message.getPayload()); } }
5. Testing
Producers send messages and consumers can receive messages
Group consumption and persistence
8801 producers sent messages, and 8802 and 8803 consumers received them at the same time. There is a problem of repeated consumption
How to solve it?
The user-defined configuration is grouped. The user-defined configuration is divided into the same group to solve the problem of repeated consumption
Principle: when microservice applications are placed in the same group, it can ensure that messages will only be consumed by one application once. Different groups can be consumed. There will be competition in the same group, and only one of them can be consumed
Custom configuration grouping
In order to achieve high availability and load balancing, distributed microservice applications actually deploy multiple instances
In most cases, when a producer sends a message to a specific micro service, he only wants to be consumed once
To solve the problem of repeated consumption, the concept of consumption group is provided in Spring Cloud Stream
8802 / 8803 implements polling packets. There is only one consumer at a time. Messages sent by 8801 module can only be received by one of 8802 or 8803, so as to avoid repeated consumption
Change 8802 / 8803 into the same group, that is, the group is the same
After grouping is configured, the previous messages can still be obtained after service restart, avoiding message loss and realizing persistence