概述
异步消息传递是一种将消息从一个应用程序间接发送到另一个应用程序而无需等待响应的方法,这种间接提供了通信应用程序之间更松的耦合和更大的可伸缩性。
1.摘要
- 基于JMS的ActiveMQ Artemis
- 基于AMQP的RabbitMQ
- Kafka
使用JMS服务
- JMS(Java Message Service)是一种java标准,它定义了一种面向消息中间件的公共API接口
- JMS只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ就是其中之一
- *缺点: * 只能用于Java平台
1. 配置JMS
选用一种JMS实现,ActiveMQ或ActiveMQ Artemis(推荐: 新一代实现)
参考官方文档安装一个安装一个
Artemis
或者ActiveMQ
,如以Artemis
为例简单介绍:- 下载解压后进入bin目录,使用命令创建broker设置用户名密码等
./artemis create $directory
- 进入创建的
broker
目录中的bin目录下启动:./artemis run
,关闭:./artemis stop
- 下载解压后进入bin目录,使用命令创建broker设置用户名密码等
如果不使用独立模式,还可以使用嵌入式模式,以
Artemis
为例:引入嵌入式服务器依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>artemis-jms-server</artifactId> <version>2.13.0</version> </dependency>
配置嵌入式启动
spring.artemis.mode=embedded
引入Maven依赖
ActiveMQ
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
ActiveMQ Artemis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency>
相关配置:
application.yml
ActiveMQ
属性 描述 spring.activemq.broker-url url(应该是tcp://URL) spring.activemq.user 用户名(可选) spring.activemq.password 密码(可选) spring.activemq.in-memory 内存模式(默认为true) ActiveMQ Artemis
属性 描述 spring.artemis.host 主机 spring.artemis.port 端口(默认61616) spring.artemis.user 用户名(可选) spring.artemis.password 密码(可选) spring.artemis.mode 模式(默认自动检测)(可设置embedded或native)
默认情况下会监听localhost:61616,如果在开发环境下则不需要配置,但是如果在生产环境则需要进行配置,告诉Spring如何访问
broker
,如:# ActiveMQ Artemis spring: artemis: host: localhost port: 61616 user: root password: root mode: embedded #使用内嵌服务器,需要额外引入artemis-jms-server # ActiveMQ spring: activemq: broker-url: tcp://activemq.tacocloud.com user: tacoweb password: l3tm31n
3. 使用JmsTemplate发送消息
在引入JMS starter dependency(Artemis 或者 ActiveMQ)后,Spring Boot会自动配置JmsTemplate等,我们可以在代码中直接注入此对象使用,进行发送或者接受消息
3.1 相关API
//对于没有指定Destination的方法会使用application.yml中配置的默认的Destination
//发送创建的消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
//将对象通过转换器转换后在发送
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
//将要发送的消息通过转换器转换后在发送
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
3.2 Destination配置
默认的Destination
spring: jms: template: default-destination: cn.com.queue
自定义Destination
@Configuration public class MessageConfiguration { @Bean public Destination destination(){ return new ActiveMQQueue("popInStockQueue"); } }
3.3 消息转换器
- 消息转换器可以进行非标准化的Message对象与目标Message对象之间的互相转换
- Spring Message提供了对于常见任务场景的消息转换器
消息转换器 | 描述 |
---|---|
MappingJackson2MessageConverter | 使用Jackson库进行message与json之间的转换 |
MarshallingMessageConverter | 使用JAXB进行message与XML之间的转换 |
MessagingMessageConverter | Converts a Message from the messaging abstraction to and from a Message using an underlying MessageConverter for the payload and a JmsHeaderMapper to map the JMS headers to and from standard message headers |
SimpleMessageConverter | 默认转换器,对于对象类型需要实现Serializable接口,进行Strings 与 TextMessage; byte arrays 与 BytesMessage; Maps 与 MapMessage; Serializable objects 与 ObjectMessage之间的转换 |
使用指定的消息转换器
@Configuration public class MessageConfiguration { //注册一个指定的消息转换器,就会使用改指定的消息转换器 //类为:org.springframework.jms.support.converter.MappingJackson2MessageConverter @Bean public MappingJackson2MessageConverter messageConverter(){ MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); /** 默认: 使用完整类名作为标记 **/ //messageConverter.setTypeIdPropertyName("_typeId"); //输出: ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[_typeId=cn.tacos.tacocloud.domain.jpa.PopInStock]] /** 重新命名_typeId的值(仅对指定的类型更改) **/ messageConverter.setTypeIdPropertyName("_typeId"); messageConverter.setTypeIdMappings(Map.of("popInStock", PopInStock.class)); //输出: ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[_typeId=popInStock]] return messageConverter; } }
3.4 发送Message
//发送Message
@Component
public class Producer {
private JmsTemplate jmsTemplate;
@Autowired
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
/** 1.使用默认的消息转换器: SimpleMessageConverter
* 注意: 对象类型必须实现Serializable接口 **/
//使用默认的destination
public void sendPopInStock(PopInStock popInStock) {
jmsTemplate.send(session -> session.createObjectMessage(popInStock));
}
//使用自定义的destination
@Autowired
private Destination popInStockQueue;
public void sendPopInStockByDestination(PopInStock popInStock) {
jmsTemplate.send(popInStockQueue, session -> session.createObjectMessage(popInStock));
}
//直接指定destination
public void sendPopInStockOfDestination(PopInStock popInStock) {
jmsTemplate.send("taco.popInStock.queue", session -> session.createObjectMessage(popInStock));
}
/** 使用指定的消息转换器 **/
//使用MappingJackson2MessageConverter消息转换器: 对象类型无需实现Serializable接口
public void sendPopInStockConvert(PopInStock popInStock){
jmsTemplate.convertAndSend("taco.popInStock.queue", popInStock,this::postProcessMessage);
}
private Message postProcessMessage(Message message) throws JMSException {
//message为使用转换器转换过的消息
/************发送前给message增加额外的信息********/
message.setStringProperty("description", "pop");
System.out.println(message);
return message;
}
}
4.使用JmsTemplate接收消息
Message的接收有两种模式:
pull model
: 请求一个消息直到接收到为止(默认方式)push model
: 当消息到达时,调用消息处理代码,可通过消息监听器实现@JmsListener
4.1 API
/** 以下方法都是pull model模式 **/
//接收默认destination的消息
Message receive() throws JmsException;
//接收指定destination的消息
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
//接收消息并使用消息转换器进行转换
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
4.2 接收Message
pull model
/** * 消费者: 接收消息 */ @Component public class Consumer { private JmsTemplate jmsTemplate; @Autowired public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Autowired private MappingJackson2MessageConverter mappingJackson2MessageConverter; public PopInStock receivePopInStockConvert() throws JMSException { //return (PopInStock) jmsTemplate.receiveAndConvert("taco.popInStock.queue"); //与下等价 Message message = jmsTemplate.receive("taco.popInStock.queue"); return (PopInStock) mappingJackson2MessageConverter.fromMessage(message); } }
push model
: 创建监听器即可@Component public class MessageListener { @JmsListener(destination = "taco.popInStock.queue") public void receive(PopInStock popInStock){ //处理消息 //System.out.println(1); //System.out.println(popInStock); } }
基于AMQP协议的RabbitMQ
AMQP
(Advanced Message Queue Protocol 高级消息队列协议): 是一个网络协议,它支持符合条件的客户端和消息代理中间件(message middleware broker)进行通讯RabbitMQ
是AMQP协议的实现者,所以AMQP中的概念和准则也适用于RabbitMQ。
1. RabbitMQ准备(详情参考官方文档)
1.1 安装
以windows为例: 推荐
Using chocolatey
方式,在PowerShell窗口行运行以下命令安装即可,会提示先安装Erlang
choco install rabbitmq
1.2 创建用户
默认会有一个
guest
用户,密码为guest
,但是只在连接localhost
时有效进入安装目录下的
sbin
文件夹下打开PowerShell窗口, 如C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.4\sbin>
目录下,该目录下有rabbitmqctl.bat
文件使用命令创建用户,(也可在管理界面创建用户
http://localhost:15672/
)# 添加用户root 密码 root rabbitmqctl.bat add_user root root #设置权限 # / 表示virtual host 即用户所属与此vhost,/ 为默认存在的vhost # 第一个 .* 表示读每个entity权限 # 第二个 .* 表示写每个entity权限 # 第三个 .* 表示配置每个entity权限 rabbitmqctl.bat set_permissions -p / root .* .* .* #设置tag 为 "administrator",方便该用户管理UI界面(http://localhost:15672/)和HTTP API 访问 rabbitmqctl.bat set_user_tags root administrator #######其他相关命令参考#### #添加virtual host rabbitmqctl add_vhost qa1 #查看所有用户 rabbitmqctl.bat list_users
1.3 Exchange
RabbitMQ接收到messages之后会交由
Exchange
,然后Exchange
根据自身定义的规则(如binding key
会对应一个或者多个queue),将messages放入到相应的queue中默认的几种
exchange
如下Default
: 默认的exchange,message的routing-key
与queue名字相同时放入该queueDirect
: message的routing-key
与exchange的binding key
(对应queue)相同时进行放入queueTopic
: 将message放入到一个或者多个queue上,massage的routing-key
与binding key
匹配时放入binding key
对应的queue(binding key
可能包含通配符,对应一个或者多个queue)Fanout
: 将messages放入到所有的绑定queue上,无需验证routing-key
Headers
: 与Topic
类似,只是基于messages的header values,不根据routing-key
Dead letter
: 捕获不匹配任何定义的exchange规则的message
2. 添加RabbitMQ到Spring boot
2.1 引入Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 部分相关配置属性
属性 | 描述 |
---|---|
spring.rabbitmq.addresses | RabbitMQ broker 地址列表可用,分割,格式如: spring.rabbitmq.addresses=amqp://admin:secret@localhost |
spring.rabbitmq.host | broker的host地址,默认为localhost |
spring.rabbitmq.port | broker的端口,默认为5672 |
spring.rabbitmq.username | 用户名,默认guest |
spring.rabbitmq.password | 密码,默认guest |
spring.rabbitmq.virtual-host | 用户所属的v-host |
spring.rabbitmq.template.exchange | 代码中不指定exchange时默认使用的exchange,默认为Default |
spring.rabbitmq.template.routing-key | 代码中不指定routing-key时默认使用此值 |
spring.rabbitmq.template.receive-timeout | 接收消息时的等待超时时间 |
- 示例
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: root
template:
#exchange: amq.fanout
#需要先在RabbitMQ中创建queue: popInStock.queue (可通过管理UI界面创建)
routing-key: popInStock.queue
#receive-timeout: 30000
#virtual-host: /
3. 使用RebbitTemplate发送Message
3.1 相关API(类似JmsTemplate)
// Send raw messages
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
// Send messages converted from objects
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
3.2 配置Message Converter
默认情况下使用的是
SimpleMessageConverter
,如果需要更改使用其他的Message Converter则需要配置
- Spring提供了几种Message Converter可供选择
消息转换器 | 描述 |
---|---|
Jackson2JsonMessageConverter | 使用Jackson 2进行对象与JSON之间互转 |
MarshallingMessageConverter | 使用Spring Marshaller 和 Unmarshaller进行转换 |
SerializerMessageConverter | Converts String and native objects of any kind using Spring’s Serializer and Deserializer abstractions |
SimpleMessageConverter | (默认使用)用于转换字符串,字节数组,可序列化类型(实现Serializable接口) |
ContentTypeDelegatingMessageConverter | 根据contentType header委托另一个消息转换器去转换 |
MessagingMessageConverter | Delegates to an underlying MessageConverter for the message conversion and to an AmqpHeaderConverter for the headers |
3.3 发送Message
//发送Message
@Component
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
//使用默认消息转换器
public void sendPopInStock(PopInStock popInStock){
/** 1. 使用默认的routing-key **/
//给message中增加额外信息,无需添加信息时使用空对象即可
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("description","pop");
//使用默认的MessageConverter: SimpleMessageConverter 将对象转换为Message
Message message = rabbitTemplate.getMessageConverter().toMessage(popInStock,messageProperties);
//不指定routing-key进行发送,此时使用配置中的routing-key: popInStock.queue
rabbitTemplate.send(message);
/** 2.使用指定的routing-key发送 **/
//rabbitTemplate.send("popInStock.queue",message);
/** 3. 使用默认的MessageConverter: SimpleMessageConverter 将对象转换为message发送 **/
//rabbitTemplate.convertAndSend(popInStock);
}
/** 使用指定的MessageConverter: Jackson2JsonMessageConverter进行转换 **/
//需要在Configuration配置类中配置Jackson2JsonMessageConverter的bean
public void sendPopInStockConvert(PopInStock popInStock){
rabbitTemplate.convertAndSend(popInStock,this::postProcessMessage);
}
private Message postProcessMessage(Message message) throws AmqpException {
/********** 发送前为Message增加额外信息 ************/
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setHeader("description","pop");
return message;
}
}
4. 使用RebbitTemplate接收Message
4.1 相关API(类似JmsTemplate)
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,ParameterizedTypeReference<T> type) throws AmqpException;
4.2 接收Message
同JMS相同,Message的接收有两种模式:
pull model
: 请求一个消息直到接收到为止(默认方式)push model
: 当消息到达时,调用消息处理代码,可通过消息监听器实现@RabbitListener
pull model
/** * 消费者: 接收消息 */ @Component public class RabbitMQConsumer { @Autowired private RabbitTemplate rabbitTemplate; //接收消息 public PopInStock receivePopInStock(){ //Message message = rabbitTemplate.receive("popInStock.queue"); //return (PopInStock) rabbitTemplate.getMessageConverter().fromMessage(message); ////System.out.println("Jackson2JsonMessageConverter:" + (rabbitTemplate.getMessageConverter() instanceof Jackson2JsonMessageConverter)); //与上等价: 接收并转换消息 return (PopInStock) rabbitTemplate.receiveAndConvert("popInStock.queue"); } //接收消息并设置超时等待时间 public PopInStock receivePopInStockTimeOut(){ Message message = rabbitTemplate.receive("popInStock.queue",3000); if(message != null) return (PopInStock) rabbitTemplate.getMessageConverter().fromMessage(message); else return null; } //接收并转换消息 public PopInStock receivePopInStockConvert(){ //return (PopInStock) rabbitTemplate.receiveAndConvert("popInStock.queue"); //同上传入new ParameterizedTypeReference<PopInStock>(){}无需强转 return rabbitTemplate.receiveAndConvert("popInStock.queue", new ParameterizedTypeReference<>(){}); } }
push model
: 收到相应的Message时会调用此方法@Component public class RabbitMQMessageListener { @RabbitListener(queues = "popInStock.queue") public void receive(PopInStock popInStock){ //从queue: popInStock.queue中接收到PopInStock //...其他操作 System.out.println("push model: " + popInStock); } }
Kafka
Kafka
具有高吞吐量丶低延迟的特点支持多个producer和consumer(多个consumer可以组成几个group,他们共享一个消息流,并保证整个群组对每个给定的消息只处理一次)- 可扩展性: kafka集群支持热扩展
- 持久性丶可靠性: messages被持久化到本地磁盘,并且支持数据备份防止丢失
- 容错性: 允许集群中节点失败
- 高并发: 支持数千个客户端同时读写
1. Kafka准备(详情参考官方文档)
1.1 安装
安装
zookeeper
:Kafka
是基于zookeeper
,因此要先安装并启动zookeeper
才行下载并解压: 在安装目录
conf/
下创建zoo.cfg
配置文件(参考同目录下zoo_sample.cfg
文件)tickTime=2000 # 修改数据保存目录 dataDir=./../tmp/zookeeper clientPort=2181
启动: 进入安装目录下的
bin/
目录下会看到zkServer.sh
文件,开启PowerShell窗口执行命令./zkServer.sh start
即可
安装
kafka
下载并解压: 找到安装目录下的
config/zookeeper.properties
更改其中dataDir
属性自定义数据保存目录启动: 进入
bin
(linux系统)或者bin/windows
下找到kafka-server-start.bat
(linux为.sh),开启PowerShell窗口执行命令#以windows为例: 注意路径正确即可 ./kafka-server-start.bat ./../../config/server.properties
1.2 创建Topic
kafka消息都是基于
Topic
的,是数据写入操作的基本单元,可以包含一个或多个Partion
,创建Topic时可以手动指定Partion
个数,个数与服务器数相当每条消息属于且仅属于一个Topic,Producer(发布)与Consumer(订阅)消息时必须指定具体的Topic
Topic命名不推荐使用’.’或者’_’字符
#此处bin/windows目录下: 创建一个topic: popInStock.topic
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic popInStock.topic
2. 添加Kafka到Spring boot
2.1 引入Maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.1 部分配置属性
spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
#默认topic
default-topic: popInStock.topic
#订阅配置
consumer:
#自定义group-id
group-id: test
#指定发送消息时键值对应序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#属性配置,也可以使用java bese方式
properties:
#指定反序列化时信任的包
spring.json.trusted.packages: "cn.tacos.tacocloud.domain.jpa"
#spring.json.remove.type.headers: false
#spring.json.use.type.headers: false
#消息headers中没有指定类型时使用此默认类型反序列号
spring.json.value.default.type: "cn.tacos.tacocloud.domain.jpa.PopInStock"
#发布配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
#properties:
#消息headers是否增加类型(默认为true)
#spring.json.add.type.headers: false
3. 使用KafkaTemplate发送Message
@Component
public class KafkaProducer {
//引入kafka依赖之后spring boot中就会自动创建此bean,使用时直接注入即可
@Autowired
private KafkaTemplate<String,PopInStock> kafkaTemplate;
public void send(PopInStock popInStock){
MessageConverter converter = kafkaTemplate.getMessageConverter();
//使用默认的topic发送
//kafkaTemplate.sendDefault(popInStock);
kafkaTemplate.send("popInStock.topic",popInStock);
}
}
4. 接收Message
Kafka只有一种通过监听方式接收Message,通过
@KafkaListener
@Component
public class KafkaMessageListener {
@KafkaListener(topics = "popInStock.topic") //监听topic: popInStock.topic
public void receive(ConsumerRecord<String,PopInStock> record){
//...其他操作
System.out.println("Kafka-receive: "+ record);
}
}