[Spring Cloud learning notes] Stream message driven

summary

Shield the differences between the underlying message middleware, reduce the switching cost, and unify the programming model of message


Standard MQ:
Information content is transmitted between producers / consumers through message media
Messages must go through a specific channel





By defining Binder as the middle layer, the isolation between application and message middleware details is realized
The message communication mode in Stream follows the publish subscribe mode


Case description

Three sub modules

The service registry uses Eureka

Message driven producer

1. Create a new Module and introduce dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2020</artifactId>
        <groupId>com.lyh.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--Basic configuration-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

2. Configuration file

server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # Configure rabbitmq service information to bind
        defaultRabbit: # Name of the definition
          type: rabbit # Message component type
          environment: # Related environment configuration
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration processing
        output:
          destination: studyExchange
          content-type: application/json # Message type
          binder: defaultRabbit
# eureka related configuration
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: send-8801.com
    prefer-ip-address: true

3. Main startup class

package com.lyh.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author martin
 * @date 2021/2/16
 **/
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

4. Business class

package com.lyh.springcloud.service.impl;

import com.lyh.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @author martin
 * @date 2021/2/16
 **/

@EnableBinding(Source.class) // Defines the push pipeline for messages
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; // Message sending pipeline

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString().replaceAll("-", "");
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("serial: " + serial);
        return serial;
    }
}

package com.lyh.springcloud.controller;

import com.lyh.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author martin
 * @date 2021/2/16
 **/

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider iMessageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage() {
        return iMessageProvider.send();
    }
}

5. Testing

Message driven consumers

1. Create a new Module and introduce dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2020</artifactId>
        <groupId>com.lyh.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--Basic configuration-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

2. Configuration file application yml

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # Configure rabbitmq service information to bind
        defaultRabbit: # Name of the definition
          type: rabbit # Message component type
          environment: # Related environment configuration
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration processing
        input:
          destination: studyExchange
          content-type: application/json # Message type
          binder: defaultRabbit
# eureka related configuration
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: receive-8802.com
    prefer-ip-address: true


3. Main startup class

package com.lyh.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author martin
 * @date 2021/2/16
 **/

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}

4. Business class

package com.lyh.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author martin
 * @date 2021/2/16
 **/

@Component
@Slf4j
@EnableBinding(Sink.class)
public class ReceiveMessageController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        log.info("consumer\t Port number:" + serverPort + "\t Received message:" + message.getPayload());
    }
}

5. Testing



Producers send messages and consumers can receive messages

Group consumption and persistence

8801 producers sent messages, and 8802 and 8803 consumers received them at the same time. There is a problem of repeated consumption

How to solve it?

The user-defined configuration is grouped. The user-defined configuration is divided into the same group to solve the problem of repeated consumption

Principle: when microservice applications are placed in the same group, it can ensure that messages will only be consumed by one application once. Different groups can be consumed. There will be competition in the same group, and only one of them can be consumed

Custom configuration grouping


In order to achieve high availability and load balancing, distributed microservice applications actually deploy multiple instances
In most cases, when a producer sends a message to a specific micro service, he only wants to be consumed once
To solve the problem of repeated consumption, the concept of consumption group is provided in Spring Cloud Stream

8802 / 8803 implements polling packets. There is only one consumer at a time. Messages sent by 8801 module can only be received by one of 8802 or 8803, so as to avoid repeated consumption
Change 8802 / 8803 into the same group, that is, the group is the same

After grouping is configured, the previous messages can still be obtained after service restart, avoiding message loss and realizing persistence

Tags: Distribution Spring Cloud RabbitMQ

Posted by JTapp on Mon, 18 Apr 2022 22:37:03 +0930