Why Stream was introduced
Common MQs (message middleware):
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
Is there a technology that lets us stop focusing on the details of a specific MQ, so that an adapter-style binding switches between the various MQs for us automatically? (Similar to Hibernate.)
What is Cloud Stream? It masks the differences between the underlying message middleware, reduces switching costs, and provides a unified messaging programming model.
What Stream is, and an introduction to the Binder
Spring Cloud Stream Chinese reference manual
What is Spring Cloud Stream?
Officially, Spring Cloud Stream is defined as a framework for building message-driven microservices.
An application interacts with the binder object in Spring Cloud Stream through inputs or outputs.
We configure the bindings, and the binder object of Spring Cloud Stream is responsible for interacting with the message middleware. Therefore, we only need to understand how to interact with Spring Cloud Stream to use the message-driven approach conveniently. Under the hood it uses Spring Integration to connect to the message broker and drive message events.
Spring Cloud Stream provides tailored auto-configuration for several vendors' message middleware products and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
At the time of writing, only RabbitMQ and Kafka are supported.
Stream design philosophy
Standard MQ
- Producers and consumers rely on a message medium to pass message content
- Messages must travel through a specific channel: the message channel, MessageChannel
- How are messages in the channel consumed, and who is responsible for sending and receiving them? SubscribableChannel, a sub-interface of MessageChannel, is subscribed to by a MessageHandler (message processor).
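The relationship between a channel and its handlers can be sketched in plain Java. This is only an illustration: SketchChannel and its methods are hypothetical stand-ins written for this example, not the Spring MessageChannel, SubscribableChannel, or MessageHandler types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a subscribable message channel (not the Spring type).
class SketchChannel {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    // A handler subscribes to the channel, the way a message handler would.
    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Sending delivers the payload to every subscribed handler
    // and returns the number of deliveries.
    int send(String payload) {
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }
}

class SketchChannelDemo {
    public static void main(String[] args) {
        SketchChannel channel = new SketchChannel();
        List<String> received = new ArrayList<>();
        channel.subscribe(received::add);
        channel.subscribe(p -> System.out.println("handled: " + p));
        channel.send("order-1");
        System.out.println(received);
    }
}
```

The producer only calls send on the channel; who consumes the message is decided entirely by which handlers have subscribed.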
Why Cloud Stream?
For example, suppose we use RabbitMQ and Kafka. The two middleware products differ in architecture: RabbitMQ has exchanges, while Kafka has topics and partitions.
These differences cause trouble in real project development. If we use one of the two message queues and a later business requirement makes us want to migrate to the other, it is without doubt a disaster: a lot has to be torn down and redone, because the middleware is coupled to our system. Spring Cloud Stream gives us a way to decouple.
How does Stream unify the underlying differences?
Without the concept of a binder, a Spring Boot application that talks directly to message middleware has to deal with large differences in implementation details, because each middleware product was designed with a different purpose in mind.
Defining the Binder as a middle layer isolates the application from the details of the message middleware. By exposing a unified Channel to the application, it removes the need to consider the different middleware implementations.
Binder:
- INPUT corresponds to the consumer
- OUTPUT corresponds to the producer
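A minimal sketch of the binder idea, assuming a made-up SketchBinder contract: the application code depends only on the contract and a logical destination name, while each binder decides what that destination means for its broker. None of these types are the real Spring Cloud Stream classes; RabbitLikeBinder and KafkaLikeBinder just record what they would have sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical binder contract: turns a logical destination into something the app can send to.
interface SketchBinder {
    Consumer<String> bindProducer(String destination);
}

// Stand-in for a RabbitMQ binder: the destination maps to an exchange.
class RabbitLikeBinder implements SketchBinder {
    final List<String> log = new ArrayList<>();

    public Consumer<String> bindProducer(String destination) {
        return payload -> log.add("exchange " + destination + " <- " + payload);
    }
}

// Stand-in for a Kafka binder: the destination maps to a topic.
class KafkaLikeBinder implements SketchBinder {
    final List<String> log = new ArrayList<>();

    public Consumer<String> bindProducer(String destination) {
        return payload -> log.add("topic " + destination + " <- " + payload);
    }
}

class BinderSketchDemo {
    // Application code: knows only the binder contract and the logical destination.
    static void publishOrder(SketchBinder binder, String payload) {
        binder.bindProducer("studyExchange").accept(payload);
    }

    public static void main(String[] args) {
        RabbitLikeBinder rabbit = new RabbitLikeBinder();
        KafkaLikeBinder kafka = new KafkaLikeBinder();
        publishOrder(rabbit, "order-1");
        publishOrder(kafka, "order-1");
        System.out.println(rabbit.log);
        System.out.println(kafka.log);
    }
}
```

Switching middleware here means passing a different binder to publishOrder; the method itself does not change, which is the decoupling the binder layer is meant to provide.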
Message communication in Stream follows the publish-subscribe model
Messages are broadcast through a topic
- In RabbitMQ it is an Exchange
- In Kafka it is a Topic
Introduction to common Stream coding annotations
- Binder: connects to the middleware conveniently and shields the differences between products.
- Channel: an abstraction of a queue. In a messaging system it is the medium for storage and forwarding; queues are configured through the Channel.
- Source and Sink: simply put, the point of reference is Spring Cloud Stream itself. Publishing a message from Stream is output; receiving a message is input.
Coding API and common annotations
Case description
Prepare the RabbitMQ environment
Create three new sub-modules in the project
- cloud-stream-rabbitmq-provider8801, the producer module that sends messages
- cloud-stream-rabbitmq-consumer8802, a message-receiving module
- cloud-stream-rabbitmq-consumer8803, a message-receiving module
Stream message-driven producer
Create a new module: cloud-stream-rabbitmq-provider8801
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2021</artifactId>
        <groupId>com.ylc.cloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- Basic configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
YML
server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # RabbitMQ service information to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message middleware type
          environment: # RabbitMQ environment settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration
        output: # Name of the channel
          destination: studyExchange # Name of the Exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for text
          binder: defaultRabbit # The binder (message service) to bind to
eureka:
  client: # Client configuration for registering with Eureka
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (default 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Expire the instance if no heartbeat arrives within 5 seconds (default 90 seconds)
    instance-id: send-8801.com # Host name shown in the instance list
    prefer-ip-address: true # Show the IP address in the access path
Main startup class
package com.ylc.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}
Business classes
1. Message-sending interface
package com.ylc.cloud.service;

public interface IMessageProvider {
    String send();
}
2. Implementation class of the message-sending interface
package com.ylc.cloud.service.Imp;

import com.ylc.cloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class) // Bind the output channel used to push messages
public class MessageProviderImpl implements IMessageProvider
{
    @Resource
    private MessageChannel output; // Message-sending channel

    @Override
    public String send()
    {
        String serial = UUID.randomUUID().toString();
        // Wrap the payload in a Message and send it through the output channel
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: " + serial);
        return serial; // Return the serial so the caller can see what was sent
    }
}
3. Controller
package com.ylc.cloud.controller;

import com.ylc.cloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class SendMessageController
{
    @Resource
    private IMessageProvider messageProvider;

    // Each GET request to /sendMessage sends one message
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}
Test
- Start Eureka 7001
- Start RabbitMQ (see 79_Bus and RabbitMQ environment configuration) and enable the management plugin: rabbitmq-plugins enable rabbitmq_management
- Start 8801
- Visit http://localhost:8801/sendMessage
- The console prints serial: followed by a UUID string
Stream message-driven consumer
Create a new module: cloud-stream-rabbitmq-consumer8802
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2021</artifactId>
        <groupId>com.ylc.cloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- Basic configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
YML
server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # RabbitMQ service information to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message middleware type
          environment: # RabbitMQ environment settings
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration
        input: # Name of the channel
          destination: studyExchange # Name of the Exchange to use
          content-type: application/json # Message type; JSON objects here, use "text/plain" for text
          binder: defaultRabbit # The binder (message service) to bind to
eureka:
  client: # Client configuration for registering with Eureka
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (default 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Expire the instance if no heartbeat arrives within 5 seconds (default 90 seconds)
    instance-id: receive-8802.com # Host name shown in the instance list
    prefer-ip-address: true # Show the IP address in the access path
Main startup class
package com.ylc.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}
Business class
package com.ylc.cloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class) // Bind the input channel used to receive messages
public class ReceiveMessageListenerController
{
    @Value("${server.port}")
    private String serverPort;

    // Called for every message that arrives on the input channel
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message)
    {
        System.out.println("Consumer 1 -----> received message: " + message.getPayload() + "\t port: " + serverPort);
    }
}
Test
- Start EurekaMain7001
- Start StreamMQMain8801
- Start StreamMQMain8802
- 8801 sends a message and 8802 receives it
Repeated consumption of Stream messages
Clone 8802 to create and run 8803: cloud-stream-rabbitmq-consumer8803.
Start
- RabbitMQ
- Service registration - 7001
- Message production - 8801
- Message consumption - 8802
- Message consumption - 8803
Two problems appear after running
- Repeated consumption
- Message persistence
Consumption
- At present, 8802 and 8803 both receive every message, so there is a repeated-consumption problem
- Solution: the group property, which provides grouping and persistence (important)
A real production case
Consider the following scenario: the order system is deployed as a cluster, and every instance gets order information from RabbitMQ. If the same order is picked up by two services at the same time, the data ends up wrong, so we have to avoid this. Message grouping in Stream solves the problem.
Note that in Stream, multiple consumers in the same group compete with each other, which guarantees that a message is consumed only once, by one of the applications. Different groups each receive every message (repeated consumption).
Using Stream groups to solve repeated consumption
Principle
When microservice instances are placed in the same group, a message is guaranteed to be consumed only once, by one of them.
Different groups can each consume the same message (repeated consumption); within a group the consumers compete, and only one of them consumes it.
If 8802 and 8803 are put in different groups (two different group values), both still receive every message.
Modify the YAML files of 8802 and 8803:
Add a group property under the input binding (spring.cloud.stream.bindings.input.group) and give both the same value, for example group: A_Group, so that they are in the same group.
Conclusion: among multiple microservice instances in the same group, only one receives each message.
By default, consumers on the same queue compete for messages, so each message goes to only one of them.
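The grouping rule can be modelled in a few lines of plain Java. This is a simplified sketch, not the RabbitMQ or Spring Cloud Stream implementation: GroupSketch is a hypothetical class, and round-robin delivery inside a group is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an exchange that fans out to one queue per group.
class GroupSketch {
    // group name -> inboxes of the consumers in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> counter used to pick the consumer for the next message
    private final Map<String, Integer> next = new LinkedHashMap<>();

    // Registers a consumer in a group and returns the list that collects what it receives.
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        next.putIfAbsent(group, 0);
        return inbox;
    }

    // Every group receives the message, but only one consumer per group handles it.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> e : groups.entrySet()) {
            List<List<String>> members = e.getValue();
            int i = next.get(e.getKey());
            members.get(i % members.size()).add(message);
            next.put(e.getKey(), i + 1);
        }
    }
}

class GroupSketchDemo {
    public static void main(String[] args) {
        GroupSketch exchange = new GroupSketch();
        // Both consumers join the same group, as 8802 and 8803 do with A_Group.
        List<String> consumer8802 = exchange.subscribe("A_Group");
        List<String> consumer8803 = exchange.subscribe("A_Group");
        for (int i = 1; i <= 4; i++) {
            exchange.publish("message-" + i);
        }
        System.out.println("8802 received " + consumer8802);
        System.out.println("8803 received " + consumer8803);
    }
}
```

With both consumers in A_Group, the four messages are split between them and none is handled twice. Subscribing the second consumer under a different group name instead would give each consumer all four messages, which is the repeated consumption described above.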
Stream message persistence
Stop 8802 and 8803, and remove the group: A_Group setting from 8802; leave group: A_Group on 8803.
8801 first sends 4 messages to RabbitMQ.
Start 8802 first: it has no group property configured, and no messages appear in its console.
Then start 8803: it has the group property configured, and the messages from MQ appear in its console. (This demonstrates message persistence.)
The reason: the exchange delivers data to queues. Because 8802 restarts without a group, it creates a new queue and listens on that one, while 8803 still listens on its original queue.
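The queue behaviour behind this experiment can be sketched as follows. It is a simplified model under stated assumptions: PersistenceSketch and its methods are hypothetical, a grouped consumer is assumed to use a queue that survives while the consumer is offline, and an ungrouped consumer is assumed to get a fresh queue on every start that disappears when it stops.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the queues bound to one exchange.
class PersistenceSketch {
    private final Map<String, List<String>> queues = new HashMap<>();
    private int anonymousCounter = 0;

    // A grouped consumer declares, or re-attaches to, the queue named after its group.
    String startGrouped(String group) {
        queues.putIfAbsent(group, new ArrayList<>());
        return group;
    }

    // An ungrouped consumer gets a brand-new, empty queue on every start.
    String startAnonymous() {
        String name = "anonymous." + (++anonymousCounter);
        queues.put(name, new ArrayList<>());
        return name;
    }

    // Stopping a consumer removes an anonymous queue but keeps a group queue.
    void stop(String queueName) {
        if (queueName.startsWith("anonymous.")) {
            queues.remove(queueName);
        }
    }

    // The exchange copies each message into every queue that currently exists.
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Returns and clears everything waiting in the given queue.
    List<String> drain(String queueName) {
        List<String> queue = queues.get(queueName);
        List<String> out = new ArrayList<>(queue);
        queue.clear();
        return out;
    }
}

class PersistenceSketchDemo {
    public static void main(String[] args) {
        PersistenceSketch broker = new PersistenceSketch();
        String q8802 = broker.startAnonymous();        // 8802 without a group
        String q8803 = broker.startGrouped("A_Group"); // 8803 with group: A_Group
        broker.stop(q8802);
        broker.stop(q8803);
        for (int i = 1; i <= 4; i++) {
            broker.publish("message-" + i);            // sent while both are down
        }
        System.out.println("8802 after restart: " + broker.drain(broker.startAnonymous()));
        System.out.println("8803 after restart: " + broker.drain(broker.startGrouped("A_Group")));
    }
}
```

In this model the restarted 8802 finds an empty queue, while 8803 finds the four messages that were published while it was stopped, matching the behaviour observed in the experiment.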