一 消息驱动
1.1 Spring Cloud Stream
Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架,它可以基于Spring Boot来创建独立的,可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及分区这三个核心概念。
1.2 Spring Cloud Stream快速入门
我这里使用RabbitMQ作为演示。
1.2.1 依赖注入
构建一个spring-cloud-stream的工程,引入Spring Cloud Stream对RabbitMQ的支持,具体如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bianjf</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- 引入alibaba-nacos Start -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- 引入alibaba-nacos End -->
<!-- 引入alibaba-nacos-config Start -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- 引入alibaba-nacos-config End -->
<!-- Spring Boot Start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot End -->
<!-- RabbitMQ支持 Start -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- RabbitMQ支持 End -->
<!-- Slf4j日志系统 Start -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.9</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<!-- Slf4j日志系统 End -->
<!-- Alibaba fastjson Start -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<!-- Alibaba fastjson End -->
<!-- 动态生成代码 Start -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
<optional>true</optional>
</dependency>
<!-- 动态生成代码 End -->
</dependencies>
<build>
<plugins>
<!-- Spring Boot启动插件 Start -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- Spring Boot启动插件 End -->
<!-- 编译工具 Start -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 编译工具 End -->
<!-- Maven Install 跳过Test阶段 Start -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Maven Install 跳过Test阶段 End -->
</plugins>
</build>
</project>
1.2.2 创建消费者
创建用于接收来自RabbitMQ消息的消费者SinkReceiver,通过@EnableBinding注解和@StreamListener完成接收消息服务,代码如下:
@Slf4j
@EnableBinding(Sink.class)
public class SinkReceiver {
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
log.info("receive -- payload --> {}", JSONObject.toJSONString(payload));
}
}
1.2.3 配置
配置RabbitMQ的配置,bootstrap.yml如下:
server:
port: 8083
spring:
application:
#服务名称
name: spring-cloud-strem
cloud:
nacos:
discovery:
#Nacos注册中心地址
server-addr: 192.168.10.3:8848
#固定本机的IP地址,防止注册时,本机使用其他网卡的地址与Nacos服务器进行通讯
ip: 192.168.10.79
config:
#Nacos配置中心的地址
server-addr: 192.168.10.3:8848
#Nacos配置文件的扩展名
file-extension: yml
#指定组
group: dev
inetutils:
#使用本地网卡
use-only-site-local-interfaces: true
rabbitmq:
addresses: 192.168.10.3
port: 5672
username: rabbitadmin
password: 123456
1.2.4 启动主类
启动主类没有其他内容
@EnableDiscoveryClient
@SpringBootApplication(scanBasePackages = {"com.bianjf"})
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class);
}
}
启动起来以后,我们可以通过RabbitMQ的Web界面推送一条消息,查看是否收到。
我们对Spring Boot应用引入了spring-cloud-starter-stream-rabbit依赖,该依赖包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容。从它定义的依赖关系中,它等价于spring-cloud-stream-binder-rabbit
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
SinkReceiver中,核心注解:
- @EnableBinding:该注解用来指定一个或多个定义了@Input或@Output注解的接口,以此实现对消息通道(Channel)的绑定。在上面的例子中,我们通过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义。源码如下:
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
它通过@Input注解绑定了一个名为input的通道。除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口。实际使用时我们也可以通过@Input和@Output注解来自定义绑定消息通道的接口。当需要为@EnableBinding指定多个接口来绑定消息通道的时候,可以这样定义:
@EnableBinding(value = {Sink.class, Source.class})
- @StreamListener:它主要定义在方法上,作用是将被修饰的方式注册为消息中间件上数据流的事件监听器。注解中的属性值对应了监听的消息通道名。在上面的例子中,我们通过@StreamListener(Sink.INPUT)注解将receive方法注册为input消息通道的监听处理器,所以当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会做出对应的响应动作。
1.3 核心概念
那么Spring Cloud Stream中是如何通过定义一些基础概念来对各种不同的消息中间件做抽象的?
如下图所示,Spring Cloud Stream应用模型的结构图。
Spring Cloud Stream构建应用程序与消息中间件之间是通过绑定器Binder相关联的,绑定器对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对应用程序来说是透明的。
所以对应每一个Spring Cloud Stream的应用程序来说,它不需要知晓消息中间件的通信细节,它只需要知道Binder对应用程序提供的抽象概念来使用消息中间件来实现业务逻辑即可。而这个抽象概念就是我们使用的消息通道:Channel。
如上图所示,在应用程序和Binder之间定义了两条输入通道和三条输出通道来传递消息,而绑定器则是作为这些通道和消息中间件之间的桥梁进行通信。
1.3.1 绑定器
Binder绑定器是Spring Cloud Stream非常重要的概念。在没有绑定器这个概念的情况下,Spring Boot应用要直接与消息中间件进行消息交互的时候,由于各消息中间件构建的初衷不同,所以它们在实现细节上会有较大的差异,当中间件有较大的变动升级或更换中间件的时候,我们需要付出非常大的代价来实施。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件或是更换其他消息中间件产品时,我们要做的就是更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑。
1.3.2 发布-订阅模式
Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。
这里的Topic主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,Topic可能对应不同的概念,比如在RabbitMQ中,它对应Exchange,而在Kafka中则对应Topic。
在上面的程序中,我们通过RabbitMQ的Channel发布消息给我们编写的应用程序消费,而实际上Spring Cloud Stream应用启动的时候,在RabbitMQ的Exchange中也创建了一个名为input的Exchange交换器,由于Binder的隔离作用,应用程序并无法感知它的存在,应用程序只知道自己指向Binder的输入或是输出通道。
为了更直观的感受这种模式,我们通过命令模式启动两个不同端口的进程(上述快速开始的程序),此时,RabbitMQ控制页面的channels可以看到连个消息通道,
而此时通过Exchange页面的Publish Message来发布消息,两个启动的应用程序都能输出消息内容。
相对于点对点队列实现的消息通信来说,Spring Cloud Stream采用了发布-订阅模式可以有效降低消息生产者与消费者之间的耦合。
1.3.3 消费组
虽然Spring Cloud Stream通过发布-订阅模式将消息生产者与消费者做了很好的解耦,基于相同主题的消费者可以轻松地进行扩展,但是这些扩展都是针对不同的应用实例而言的。在现实的微服务架构中,我们的每一个微服务应用为了实现高可用和负载均衡,实际上都会部署多个实例。在很多情况下,消息生产者发送消息给某个具体微服务时,只希望被消费一次,按照上面的例子,消息被重复消费两次的情况。为了解决这个问题,在Spring Cloud Stream中提供了消费组的概念。
如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过spring.cloud.stream.bindings.input.group (其中input表示Channel的名称)属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员接收到消息并进行处理。
如下图所示,消息进入主题之后,hdfsWrite和average都会收到消息的副本,但是在两个组中只会有一个实例对其进行消费
1.3.4 消息分区
通过引入消费组的概念,在多实例的情况下,保障每个消息只被组内的一个实例消费,但是消费组无法控制消息具体被哪个实例消费。对于一些业务场景,需要对一些具有相同特征的消息设置每次都被同一个消费实例处理。比如,一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身聚合这些数据,那么消息生产者可以为消息增加一个固有的特征ID来进行分区,使得拥有这些ID的消息每次都能被发送到一个特定的实例上实现累计统计。
消息分区概念的引入就是为了解决这样的问题:当生产者消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。
二 使用详解
2.1 开启绑定功能
使用@EnableBinding注解开启绑定功能,该注解只有一个唯一的属性:value。value是一个Class类型的数组,所以我们可以通过value属性一次性指定多个关于消息通道的配置。
2.2 绑定消息通道
在Spring Cloud Stream中,我们可以在接口中通过@Input和@Output注解来定义消息通道,而用于定义绑定消息通道的接口则可以被@EnableBinding注解的value参数来指定,从而在应用启动的时候实现对定义消息通道的绑定。
在上面的例子中我们使用了Sink接口绑定的消息通道。Sink接口是Spring Cloud Stream提供的一个默认实现,除此之外还有Source和Processor
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
public interface Processor extends Source, Sink {
}
由代码可以看出,Sink和Source中分别通过@Input和@Output注解定义了输入通道和输出通道,而Processor通过集成Source和Sink的方式同时定义了一个输入通道和一个输出通道。
另外,@Input和@Output注解都还有一个value属性,该属性可以用来设置消息通道的名称。
需要注意,当我们定义输出通道的时候,需要返回MessageChannel接口对象,该接口定义了向消息通道发送消息的方法;而定义输入通道时,需要返回SubscribableChannel接口对象,该接口继承自MessageChannel接口,它定义了维护消息通道订阅者的方法。
2.2.1 注入绑定接口
我们通过注入的方式实现一个消息生产者,向output消息通道发送数据。
- 创建Output消息通道作为输出通道的接口,具体如下:
public interface SinkSender {
@Output(Source.OUTPUT)
MessageChannel output();
}
- 对上述工程的代码SinkReceiver做一下修改:在@EnableBinding注解中增加对SinkSender接口的指定,使Spring Cloud Stream能创建出对应的实例:
@Slf4j
@EnableBinding({SinkSender.class})
public class SinkReceiver {
@StreamListener(Source.OUTPUT)
public void receive(Object payload) {
log.info("receive -- payload --> {}", JSONObject.toJSONString(payload));
}
}
这里我将Sink.class去掉了,因为在接下来的场景无需使用该通道了。
- 创建单元测试,如下:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StreamApplication.class)
public class StreamTest {
@Autowired
private SinkSender sinkSender;
@Test
public void contextLoads() {
this.sinkSender.output().send(MessageBuilder.withPayload("Test-测试").build());
}
}
2.3 消息生产与消费
Spring Cloud Stream是基于Spring Integration构建起来的,所以在使用Spring Cloud Stream构建消息驱动服务的时候,完全可以使用Spring Integration的原生注解来实现各种业务需求。
2.3.1 @StreamListener详解
通过@StreamListener注解,Spring Cloud Stream会将其注册为输入消息通道的监听器。当输入消息通道中有消息到达的时候,会立即触发该注解修饰方法的处理逻辑对消息进行消费。
2.3.1.1 消息转换
大部分情况下,我们通过消息来对接服务或系统时,消息生产者都会以结构化的字符串形式来发送,比如JSON或XML。当消息到达的时候,输入通道的监听器需要对该字符串做一定的转换,将JSON或XML转换成具体的对象,然后再做处理。
为了方便演示,我这里创建User对象,如下:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private static final long serialVersionUID = -1L;
/** 姓名 */
private String username;
/** 年龄 */
private Integer age;
}
我们可以直接使用注解@StreamListener,接收对象写成User对象,代码如下:
@Slf4j
@EnableBinding({SinkSender.class})
public class SinkReceiver {
@StreamListener(Source.OUTPUT)
public void receive(User user) {
log.info("receive -- payload --> {}", JSONObject.toJSONString(user));
}
}
此时,需要在配置文件中增加:spring.cloud.stream.bindings.output.context-type=application/json属性设置即可。如下:
server:
port: 8083
spring:
application:
#服务名称
name: spring-cloud-strem
cloud:
nacos:
discovery:
#Nacos注册中心地址
server-addr: 192.168.10.3:8848
#固定本机的IP地址,防止注册时,本机使用其他网卡的地址与Nacos服务器进行通讯
ip: 192.168.10.79
config:
#Nacos配置中心的地址
server-addr: 192.168.10.3:8848
#Nacos配置文件的扩展名
file-extension: yml
#指定组
group: dev
inetutils:
#使用本地网卡
use-only-site-local-interfaces: true
stream:
bindings:
#Channel名称
output:
content-type: application/json
group: mychannel
rabbitmq:
addresses: 192.168.10.3
port: 5672
username: rabbitadmin
password: 123456
@StreamListener注解能够通过配置属性实现JSON字符串到对象的转换,这是因为在Spring Cloud Stream中实现了一套可扩展的消息转换机制。在消息消费逻辑执行之前,消息转换机制会根据消息头信息中声明的类型(content-type)找到对应的消息转换器并实现对消息的自动转换。
2.3.1.2 自定义输入输出流
现在有这样一个场景:我们在App1中监听消息,监听来自userDest的消息;在App2中发送消息,发往的目的地是userDest。这样在两个应用中都接收/发往同一个地方,而不关乎Spring Cloud Stream的Binder的Channel名称了。
两个工程承自上述快速开始的工程。
- 在App1中接收消息
受限声明一个@Input对应方法的接口,返回SubscribableChannel。UserInputMessageSource代码如下:
public interface UserInputMessageSource {
/** 用户输入通道名称 */
String USER_INPUT_CHANNEL = "userInputChannel";
@Input(UserInputMessageSource.USER_INPUT_CHANNEL)
SubscribableChannel input();
}
开启绑定,接收消息,App1代码如下:
@Slf4j
@EnableBinding(value = {UserInputMessageSource.class})
public class App1 {
@StreamListener(UserInputMessageSource.USER_INPUT_CHANNEL)
public void receiveFromInput(User user) {
log.info("receiveFromInput -- user --> {}", JSONObject.toJSONString(user));
}
}
我们将userInputChannel通道监听来自userDest的消息。配置内容如下:
spring:
cloud:
stream:
bindings:
#Channel名称
userInputChannel:
content-type: application/json
#组名称
group: userInputGroup
#该Channel的消息获取目的地是
destination: userDest
我们使用spring.cloud.stream.bindings.[channel_name].destination来指定目的地,当输出消息时,表示发往的目的地;当输入消息时,表示监听的目的器。在RabbitMQ中相当于Exchange的概念。
- 在App2中发送消息
受限声明一个@Output对应方法的接口,返回MessageChannel。UserOutputMessageSource代码如下:
public interface UserOutputMessageSource {
/** 用户输出通道 */
String USER_OUTPUT_CHANNEL = "userOutputChannel";
@Output(UserOutputMessageSource.USER_OUTPUT_CHANNEL)
MessageChannel output();
}
开启Spring Cloud Stream功能,代码如下:
@EnableBinding(UserOutputMessageSource.class)
public class MessageService {
}
此处没有定义任何的方法
我们将userOutputChannel的通道发往的目的地为userDest,这样在App1中就可以直接监听读取消息了
spring:
cloud:
stream:
bindings:
#通道名称
userOutputChannel:
#消息发往的目的地
destination: userDest
同上述。
最后我们可以在App2中测试,然后观察App1中是否有消息
测试代码如下:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StreamBackupApplication.class)
public class AppTest {
@Autowired
private UserOutputMessageSource userOutputMessageSource;
@Test
public void send() {
User user = new User("王五", 1234);
this.userOutputMessageSource.output().send(MessageBuilder.withPayload(user).build());
}
}
在App1上观察如下:
所以我们可以通过spring.cloud.stream.bindings.[channel_name].destination来指定发往/接收消息的目的地
2.4 Spring Cloud Stream利用RabbitMQ实现延时消息
实现延时队列很简单,流程如下:
- RabbitMQ启用延时队列功能。
- 通过Spring Cloud Stream向其头部增加x-delay,单位为毫秒
- 将exchange声明成延时消息队列(队列类型为:x-delayed-message)
启用RabbitMQ支持延时队列的插件可见上述
2.4.1 发送消息增加x-delay头部
我们在发送消息时,向头部增加x-delay头部信息。
代码如下:
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StreamBackupApplication.class)
public class AppTest {
@Autowired
private UserOutputMessageSource userOutputMessageSource;
@Test
public void send() {
User user = new User("王五", 1234);
this.userOutputMessageSource.output().send(MessageBuilder.withPayload(user).setHeader("x-delay", 5000).build());
System.out.println("结束发送时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
}
我这里将消息延迟到5秒后消费。
生产端配置:
spring:
cloud:
stream:
#Spring Cloud Stream Binders配置
bindings:
#通道名称
userOutputChannel:
#消息发往的目的地
destination: userDest
rabbit:
bindings:
#通道名称
userOutputChannel:
#开启该通道的延时消息功能
producer:
delayed-exchange: true
通过spring.cloud.stream.rabbit.bindings.<channelName>.producer.delayped-exchange=true将队列声明成x-delayed-message延时消息队列
消费端代码不变,只是增加时间打印:
@Slf4j
@EnableBinding(value = {UserInputMessageSource.class})
public class App1 {
@StreamListener(UserInputMessageSource.USER_INPUT_CHANNEL)
public void receiveFromInput(User user) {
System.out.println("开始接收时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
log.info("receiveFromInput -- user --> {}", JSONObject.toJSONString(user));
}
}
生产端配置如下:
spring:
cloud:
stream:
#Spring Cloud Stream Binders配置
bindings:
#Channel名称
userInputChannel:
content-type: application/json
#组名称
group: userInputGroup
#该Channel的消息获取目的地是
destination: userDest
#RabbitMQ相关配置
rabbit:
bindings:
#Channel名称
userInputChannel:
#开启该通道消费延时功能
consumer:
delayed-exchange: true
消费端通过spring.cloud.stream.rabbit.bindings.<channelName>.consumer.delayed-exchange=true将exchange声明成x-delayed-message队列
效果如下:
发送消息时间:
消费消息时间:
RabbitMQ的延时队列,延时的最大限度为:4294967295毫秒,即2^31-1毫秒。即不超过50天。
其他相关Spring Cloud Stream配置请看下面文件:
README.zip
评论区