Using Spring cloud stream can easily realize the sending and receiving of Kafka messages. The following is a Kafka application that I implemented according to the example of Spring's official website.
This example is to implement a scenario in which a telecommunication company collects user consumption telecommunication services and calculates the cost. Three applications are included.
1. Record the length of time users use telecommunications services:
This application will simulate and generate user bills, including user ID, voice call duration, data traffic information, and send bill information to Kafka.
Create an application named usage-detail-sender-kafka with Spring boot, and select Spring for Apache Kafka, Spring cloud stream, Acutator in the dependencies
Create a new class named UsageDetail with the following code:
package cn.roygao.usagedetailsender; public class UsageDetail { private String userId; private long duration; private long data; public String getUserId() { return this.userId; } public void setUserId(String userId) { this.userId = userId; } public long getDuration() { return this.duration; } public void setDuration(long duration) { this.duration = duration; } public long getData() { return this.data; } public void setData(long data) { this.data = data; } }
Create a new class named UsageDetailSender to implement the function of sending Kafka messages. The code is as follows:
package cn.roygao.usagedetailsender; import java.util.Random; import java.util.function.Supplier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class UsageDetailSender { private String[] users = {"user1", "user2", "user3", "user4", "user5"}; @Bean public Supplier<UsageDetail> sendEvents() { return () -> { UsageDetail usageDetail = new UsageDetail(); usageDetail.setUserId(this.users[new Random().nextInt(5)]); usageDetail.setDuration(new Random().nextInt(300)); usageDetail.setData(new Random().nextInt(700)); return usageDetail; }; } }
This code uses the functional programming pattern, and Supplier implements a function to send Kafka messages.
Add the following configuration in application.properties:
spring.cloud.stream.function.bindings.sendEvents-out-0=output spring.cloud.stream.bindings.output.destination=usage-detail # Spring Boot will automatically assign an unused http port. This may be overridden using external configuration. server.port=0
The first line of this configuration means to take an alias for the output of this Kafka, because the default output of this Supplier is bound to spring.cloud.stream.function.bindings.xxxx-out-0, where xxxx represents this The name of the output function, in this case sendEvents, so we replace the name xxxx-out-0 with one called output.
The second line of configuration is to set the destination message topic of this output to usage-detail
Then run ./mvnw clean package to compile.
2. Calculate the user's charges for using telecommunication services:
This application will subscribe to Kafka's related message topics, receive the user bills generated by application 1, then calculate the related costs, and send the costs to another Kafka message topic.
Create an application named usage-cost-processor-kafka with Spring boot, and select Spring for Apache Kafka, Spring cloud stream, Acutator in the dependencies
Create a new class named UsageDetail, such as the code in Application 1 above.
Create a new class named UsageCostDetail with the following code:
package cn.roygao.usagecostprocessor; public class UsageCostDetail { private String userId; private double callCost; private double dataCost; public String getUserId() { return this.userId; } public void setUserId(String userId) { this.userId = userId; } public double getCallCost() { return this.callCost; } public void setCallCost(double callCost) { this.callCost = callCost; } public double getDataCost() { return this.dataCost; } public void setDataCost(double dataCost) { this.dataCost = dataCost; } @Override public String toString() { return "{" + " userId='" + getUserId() + "'" + ", callCost='" + getCallCost() + "'" + ", dataCost='" + getDataCost() + "'" + "}"; } }
Create a new UsageCostProcessor class to receive the message sent by application 1, calculate the cost, and then send the message to Kafka. The code is as follows:
package cn.roygao.usagecostprocessor; import java.util.function.Function; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class UsageCostProcessor { private double ratePerSecond = 0.1; private double ratePerMB = 0.05; @Bean public Function<UsageDetail, UsageCostDetail> processUsageCost() { return usageDetail -> { UsageCostDetail usageCostDetail = new UsageCostDetail(); usageCostDetail.setUserId(usageDetail.getUserId()); usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond); usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB); return usageCostDetail; }; } }
Function is used here to define a Kafka's receiving and sending, where the first parameter represents the received message, and the second parameter represents the sent message.
Add the following configuration in application.properties:
spring.cloud.stream.function.bindings.processUsageCost-in-0=input spring.cloud.stream.function.bindings.processUsageCost-out-0=output spring.cloud.stream.bindings.input.destination=usage-detail spring.cloud.stream.bindings.output.destination=usage-cost # Spring Boot will automatically assign an unused http port. This may be overridden using external configuration. server.port=0
3. Receive user telecommunication charges:
This application will subscribe to the Kafka topic, receive the cost information sent by application 2 and print it in the log.
Create a new application named usage-cost-logger-kafka, and select Spring for Apache Kafka, Spring cloud stream, Acutator in the dependencies
Create a new class named UsageCostDetail, as shown in the code of application 2 above.
Create a new class named UsageCostLogger with the following code:
package cn.roygao.usagecostlogger; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class UsageCostLogger { private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerKafkaApplication.class); @Bean public Consumer<UsageCostDetail> process() { return usageCostDetail -> { logger.info(usageCostDetail.toString()); }; } }
Consumer is used here to realize the consumption of Kafka messages.
Add the following configuration in application.properties:
spring.cloud.stream.function.bindings.process-in-0=input spring.cloud.stream.bindings.input.destination=usage-cost # Spring Boot will automatically assign an unused http port. This may be overridden using external configuration. server.port=0
Deployment test
After the above three applications are compiled, we can test them. First start Kafka, here I start Kafka by running Docker, and download the following docker-compose file:
curl --silent --output docker-compose.yml \ https://raw.githubusercontent.com/confluentinc/cp-all-in-one/6.1.0-post/cp-all-in-one/docker-compose.yml
Then run docker compose up to start, and then visit localhost:9021 in the browser to view the information of the Kafka console.
Run the above compiled applications respectively, and then you can see the information on the two topics of usage-detail and usage-cost in the Kafka console, and you can see the printed users in the log output by the third application fee information.
The above is a simple example of writing a Kafka application using Spring cloud stream.