(16) , spring cloud stream message driven

Fried stewed sugar chestnut 2021-10-14 05:06:00

Stream Why was it introduced

common MQ( Message middleware ):

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

Is there a new technology born , Let's stop focusing on specific MQ The details of the , We just need to use a way of adapting binding , Automatically give us all kinds of MQ Switch inside .( Be similar to Hibernate)

Cloud Stream What is it? ? Mask the differences of underlying message middleware , Reduce switching costs , Unified messaging Programming model .

Stream What is it and Binder Introduce

Official documents 1

Official documents 2

Cloud Stream Chinese instruction manual

What is? Spring Cloud Stream?

The official definition of Spring Cloud Stream It's a framework for building message driven microservices .

Application through inputs perhaps outputs Come and Spring Cloud Stream in binder Object interaction .

Through our configuration binding( binding ), and Spring Cloud Stream Of binder Object is responsible for interacting with message middleware . therefore , We just need to figure out how to deal with Spring Cloud Stream Interaction makes it easy to use message driven methods . By using Spring Integration To connect the message broker middleware for message event driven .
Spring Cloud Stream For some of the supplier's message middleware products, personalized automatic configuration is provided , Cited release - subscribe 、 Consumer groups 、 Three core concepts of zoning .

Currently only supported RabbitMQ、 Kafka.

image-20210916122806681

Stream Design idea

standard MQ

img

  • producer / Consumers rely on information media to transmit information content
  • News has to go through specific channels - News channel Message Channel
  • How is the message in the message channel consumed , Who is responsible for sending and receiving - News channel MessageChannel Sub interface of SubscribableChannel, from MessageHandler Message processor subscribed to .

Why Cloud Stream?

For example, we use RabbitMQ and Kafka, Due to the differences in the architecture of the two message oriented middleware , image RabbitMQ Yes exchange,kafka Yes Topic and Partitions Partition .

img

The differences of these middleware lead to our actual project development, which has caused us some difficulties , If we use one of the two message queues , The business requirements behind , I want to migrate to another message queue , There is no doubt that this is a disaster , A lot of things have to be pulled down and redone , Because it's coupled to our system , Now Spring Cloud Stream It provides us with — A way to decouple .

Stream How can we unify the underlying differences ?

Without the concept of a binder , our SpringBoot When an application wants to directly interact with message middleware , Because the original intention of each message middleware is different , There will be great differences in their implementation details by defining binders as the middle layer , Perfect isolation between application and message middleware details . By exposing unified Channel passageway , It makes the application no longer need to consider different message middleware implementations .

By defining the binder Binder As the middle layer , Realize the isolation between application and message middleware details .

Binder

  • INPUT Corresponding to the consumer
  • OUTPUT Corresponding to the producer

img

Stream The message communication mode in follows the release - A subscription model

Topic Broadcast the theme

  • stay RabbitMQ Namely Exchange
  • stay Kakfa The middle is Topic

Stream Introduction to common coding annotations

img

img

  • Binder - It's very convenient to connect middleware , Shielding differences .

  • Channel - passageway , Is a queue Queue An abstraction of , In the message communication system, it is the medium to store and forward , adopt Channel Configure the queue .

  • Source and Sink - It can be simply understood that the reference object is Spring Cloud Stream Oneself , from Stream To publish a message is to output , To receive a message is to input .

    code API And common notes

image-20210916125206221

Case description

Get ready RabbitMQ Environmental Science

Three new sub modules will be built in the project

  • cloud-stream-rabbitmq-provider8801, As a producer, send message module
  • cloud-stream-rabbitmq-consumer8802, As a message receiving module
  • cloud-stream-rabbitmq-consumer8803, As a message receiving module

Stream Message driven producers

newly build Module:cloud-stream-rabbitmq-provider8801

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2021</artifactId>
<groupId>com.ylc.cloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- Basic configuration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

YML

server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # Configure the... To be bound here rabbitmq Service information ;
defaultRabbit: # Represents the name of the definition , Used on binding Integrate
type: rabbit # Message component type
environment: # Set up rabbitmq Related environment configuration of
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # Integration of services
output: # This name is the name of a channel
destination: studyExchange # Indicates what to use Exchange Name definition
content-type: application/json # Set message type , This time is json, Text is set to “text/plain”
binder: defaultRabbit # Set the specific settings of the message service to be bound
eureka:
client: # Client to Eureka Registered configuration
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # Set the interval between heartbeats ( The default is 30 second )
lease-expiration-duration-in-seconds: 5 # If it's more than 5 In seconds ( The default is 90 second )
instance-id: send-8801.com # Display host name in message list
prefer-ip-address: true # The access path becomes IP Address

Main startup class

package com.ylc.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}

Business class

1. Send message interface

package com.ylc.cloud.service;
public interface IMessageProvider {
public String send();
}

2. Send message interface implementation class

package com.ylc.cloud.service.Imp;
import com.ylc.cloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) // Define the push pipeline for messages
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // Message delivery pipeline
@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}

3.Controller

package com.ylc.cloud.controller;
import com.ylc.cloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}

test

image-20210916131653272

Stream Message driven consumers

newly build Module:cloud-stream-rabbitmq-consumer8802

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2021</artifactId>
<groupId>com.ylc.cloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Basic configuration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

YML

server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # Configure the... To be bound here rabbitmq Service information ;
defaultRabbit: # Represents the name of the definition , Used on binding Integrate
type: rabbit # Message component type
environment: # Set up rabbitmq Related environment configuration of
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # Integration of services
input: # This name is the name of a channel
destination: studyExchange # Indicates what to use Exchange Name definition
content-type: application/json # Set message type , This time for the object json, If it's text, set “text/plain”
binder: defaultRabbit # Set the specific settings of the message service to be bound
eureka:
client: # Client to Eureka Registered configuration
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # Set the interval between heartbeats ( The default is 30 second )
lease-expiration-duration-in-seconds: 5 # If it's more than 5 In seconds ( The default is 90 second )
instance-id: receive-8802.com # Display host name in message list
prefer-ip-address: true # The access path becomes IP Address

Main startup class

package com.ylc.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}

Business class

package com.ylc.cloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println(" consumer 1 Number ,-----> Received news : "+message.getPayload()+"\t port: "+serverPort);
}
}

test

  • start-up EurekaMain7001
  • start-up StreamMQMain8801
  • start-up StreamMQMain8802
  • 8801 send out 8802 receive messages

image-20210916141908784

Stream The news of repeated consumption

according to 8802, Clone a copy to run 8803 - cloud-stream-rabbitmq-consumer8803.

start-up

  • RabbitMQ
  • Service registration - 8801
  • Message production - 8801
  • News consumption - 8802
  • News consumption - 8802

There are two problems after running

  1. There is the problem of repeated consumption
  2. Message persistence problem

consumption

  • http://localhost:8801/sendMessage

  • At present, it is 8802/8803 All received at the same time , There is a problem of repeated consumption

    image-20210916143634574

    image-20210916143622779

  • How to solve : Grouping and persistence properties group( important )

Actual production cases

For example, in the following scenario , We do cluster deployment for order system , Will come from RabbitMQ Get order information , Then if an order is obtained by two services at the same time , That would cause data errors , We have to avoid this situation . Then we can Use Stream Message grouping in .

img

Pay attention to Stream In the same group Many of the consumers in are competitive , It can guarantee that the message will only be consumed by one of the applications once . Different groups can be fully consumed ( Repeated consumption ).

Just be in the same group , It can guarantee that the message will only be consumed by one of the applications once .

Stream And group Solve the repeated consumption of messages

principle

Microservice applications are placed in the same group in , It can guarantee that the message will only be consumed by one of the applications once .

Different groups It can be consumed repeatedly , Same group There will be competition in , Only one of them can consume .

8802/8803 All become different groups ,group Two different

modify 8002、8003 Of YAL file :

Add groups , Put them in the same group

image-20210916144232991

Conclusion : Multiple microservice instances of the same group , Only one gets it at a time

By default, consumers in the same queue are competitive , Who's fast, who grabs

Stream The message is persistent

stop it 8802/8803 And get rid of 8802 The grouping group: A_Group,8803 The grouping group: A_Group Not removed .

8801 Send... First 4 Message to the RabbitMq.

Start... First 8802, No grouping property configuration , There's no news from the backstage .

Restart 8803, There are group property configurations , It came out backstage MQ News on .( Message persistence embodiment )

The principle is ,exchange Data is sent to the queue , because 02 No grouping is set for restart , The queue is recreated and listens , and 03 Or listen to the original queue .

Please bring the original link to reprint ,thank
Similar articles

2021-10-14