Spring-Message


概述

异步消息传递是一种将消息从一个应用程序间接发送到另一个应用程序而无需等待响应的方法,这种间接提供了通信应用程序之间更松的耦合和更大的可伸缩性。

1.摘要

  • 基于JMS的ActiveMQ Artemis
  • 基于AMQP的RabbitMQ
  • Kafka

使用JMS服务

  • JMS(Java Message Service)是一种java标准,它定义了一种面向消息中间件的公共API接口
  • JMS只是接口,不同的提供商或者开源组织对其有不同的实现,ActiveMQ就是其中之一
  • *缺点: * 只能用于Java平台

1. 配置JMS

  • 选用一种JMS实现,ActiveMQ或ActiveMQ Artemis(推荐: 新一代实现)

    • 参考官方文档安装一个安装一个Artemis 或者 ActiveMQ,如以Artemis为例简单介绍:

      • 下载解压后进入bin目录,使用命令创建broker设置用户名密码等 ./artemis create $directory
      • 进入创建的broker目录中的bin目录下启动: ./artemis run ,关闭: ./artemis stop
    • 如果不使用独立模式,还可以使用嵌入式模式,以Artemis为例:

      • 引入嵌入式服务器依赖

                <dependency>
                    <groupId>org.apache.activemq</groupId>
                    <artifactId>artemis-jms-server</artifactId>
                    <version>2.13.0</version>
                </dependency>
      • 配置嵌入式启动

        spring.artemis.mode=embedded
  • 引入Maven依赖

    • ActiveMQ

      <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-activemq</artifactId>
      </dependency>
    • ActiveMQ Artemis

      <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-artemis</artifactId>
      </dependency>
  • 相关配置: application.yml

    • ActiveMQ

      属性 描述
      spring.activemq.broker-url url(应该是tcp://URL)
      spring.activemq.user 用户名(可选)
      spring.activemq.password 密码(可选)
      spring.activemq.in-memory 内存模式(默认为true)
    • ActiveMQ Artemis

      属性 描述
      spring.artemis.host 主机
      spring.artemis.port 端口(默认61616)
      spring.artemis.user 用户名(可选)
      spring.artemis.password 密码(可选)
      spring.artemis.mode 模式(默认自动检测)(可设置embedded或native)
  • 默认情况下会监听localhost:61616,如果在开发环境下则不需要配置,但是如果在生产环境则需要进行配置,告诉Spring如何访问broker,如:

    # ActiveMQ Artemis
    spring:
      artemis:
        host: localhost
        port: 61616
        user: root
        password: root
        mode: embedded #使用内嵌服务器,需要额外引入artemis-jms-server
    
    # ActiveMQ
    spring:
     activemq:
       broker-url: tcp://activemq.tacocloud.com
       user: tacoweb
       password: l3tm31n

3. 使用JmsTemplate发送消息

在引入JMS starter dependency(Artemis 或者 ActiveMQ)后,Spring Boot会自动配置JmsTemplate等,我们可以在代码中直接注入此对象使用,进行发送或者接受消息

3.1 相关API
    //对于没有指定Destination的方法会使用application.yml中配置的默认的Destination
    //发送创建的消息
    void send(MessageCreator messageCreator) throws JmsException;
    void send(Destination destination, MessageCreator messageCreator) throws JmsException;
    void send(String destinationName, MessageCreator messageCreator) throws JmsException;
    //将对象通过转换器转换后在发送
    void convertAndSend(Object message) throws JmsException;
    void convertAndSend(Destination destination, Object message) throws JmsException;
    void convertAndSend(String destinationName, Object message) throws JmsException;
    //将要发送的消息通过转换器转换后在发送
    void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
    void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
    void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
3.2 Destination配置
  • 默认的Destination

    spring:
      jms:
        template:
          default-destination: cn.com.queue
  • 自定义Destination

    @Configuration
    public class MessageConfiguration {
        @Bean
        public Destination destination(){
            return new ActiveMQQueue("popInStockQueue");
        }
    }
3.3 消息转换器
  • 消息转换器可以进行非标准化的Message对象与目标Message对象之间的互相转换
  • Spring Message提供了对于常见任务场景的消息转换器
消息转换器 描述
MappingJackson2MessageConverter 使用Jackson库进行message与json之间的转换
MarshallingMessageConverter 使用JAXB进行message与XML之间的转换
MessagingMessageConverter Converts a Message from the messaging abstraction to and from a Message using an underlying MessageConverter for the payload and a JmsHeaderMapper to map the JMS headers to and from standard message headers
SimpleMessageConverter 默认转换器,对于对象类型需要实现Serializable接口,进行Strings 与 TextMessage;
byte arrays 与 BytesMessage;
Maps 与 MapMessage;
Serializable objects 与 ObjectMessage之间的转换
  • 使用指定的消息转换器

    @Configuration
    public class MessageConfiguration {
        //注册一个指定的消息转换器,就会使用改指定的消息转换器
        //类为:org.springframework.jms.support.converter.MappingJackson2MessageConverter
        @Bean
        public MappingJackson2MessageConverter messageConverter(){
            MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
            /** 默认: 使用完整类名作为标记 **/
            //messageConverter.setTypeIdPropertyName("_typeId");
            //输出: ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[_typeId=cn.tacos.tacocloud.domain.jpa.PopInStock]]
    
            /** 重新命名_typeId的值(仅对指定的类型更改) **/
            messageConverter.setTypeIdPropertyName("_typeId");
            messageConverter.setTypeIdMappings(Map.of("popInStock", PopInStock.class));
            //输出: ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[_typeId=popInStock]]
            return messageConverter;
        }
    }
3.4 发送Message
//发送Message
@Component
public class Producer {
    private JmsTemplate jmsTemplate;
    @Autowired
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /** 1.使用默认的消息转换器: SimpleMessageConverter
     * 注意: 对象类型必须实现Serializable接口  **/
    //使用默认的destination
    public void sendPopInStock(PopInStock popInStock) {
        jmsTemplate.send(session -> session.createObjectMessage(popInStock));
    }
    //使用自定义的destination
    @Autowired
    private Destination popInStockQueue;
    public void sendPopInStockByDestination(PopInStock popInStock) {
        jmsTemplate.send(popInStockQueue, session -> session.createObjectMessage(popInStock));
    }
    //直接指定destination
    public void sendPopInStockOfDestination(PopInStock popInStock) {
        jmsTemplate.send("taco.popInStock.queue", session -> session.createObjectMessage(popInStock));
    }

    /** 使用指定的消息转换器 **/
    //使用MappingJackson2MessageConverter消息转换器: 对象类型无需实现Serializable接口
    public void sendPopInStockConvert(PopInStock popInStock){
        jmsTemplate.convertAndSend("taco.popInStock.queue", popInStock,this::postProcessMessage);
    }
    private Message postProcessMessage(Message message) throws JMSException {
        //message为使用转换器转换过的消息
        /************发送前给message增加额外的信息********/
        message.setStringProperty("description", "pop");
        System.out.println(message);
        return message;
    }
}

4.使用JmsTemplate接收消息

Message的接收有两种模式:

  • pull model : 请求一个消息直到接收到为止(默认方式)
  • push model : 当消息到达时,调用消息处理代码,可通过消息监听器实现@JmsListener
4.1 API
/** 以下方法都是pull model模式 **/
//接收默认destination的消息
Message receive() throws JmsException;
//接收指定destination的消息
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
//接收消息并使用消息转换器进行转换
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
4.2 接收Message
  • pull model

    /**
     * 消费者: 接收消息
     */
    @Component
    public class Consumer {
        private JmsTemplate jmsTemplate;
        @Autowired
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
        @Autowired
        private MappingJackson2MessageConverter mappingJackson2MessageConverter;
        public PopInStock receivePopInStockConvert() throws JMSException {
            //return (PopInStock) jmsTemplate.receiveAndConvert("taco.popInStock.queue");
            //与下等价
            Message message = jmsTemplate.receive("taco.popInStock.queue");
            return (PopInStock) mappingJackson2MessageConverter.fromMessage(message);
        }
    }
  • push model : 创建监听器即可

    @Component
    public class MessageListener {
        @JmsListener(destination = "taco.popInStock.queue")
        public void receive(PopInStock popInStock){
            //处理消息
            //System.out.println(1);
            //System.out.println(popInStock);
        }
    }

基于AMQP协议的RabbitMQ

  • AMQP(Advanced Message Queue Protocol 高级消息队列协议): 是一个网络协议,它支持符合条件的客户端和消息代理中间件(message middleware broker)进行通讯
  • RabbitMQ是AMQP协议的实现者,所以AMQP中的概念和准则也适用于RabbitMQ。

1. RabbitMQ准备(详情参考官方文档)

1.1 安装
  • 以windows为例: 推荐 Using chocolatey方式,在PowerShell窗口行运行以下命令安装即可,会提示先安装Erlang

    choco install rabbitmq
1.2 创建用户
  • 默认会有一个guest用户,密码为guest ,但是只在连接localhost时有效

  • 进入安装目录下的sbin文件夹下打开PowerShell窗口, 如C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.4\sbin>目录下,该目录下有rabbitmqctl.bat文件

  • 使用命令创建用户,(也可在管理界面创建用户http://localhost:15672/)

    # 添加用户root 密码 root
    rabbitmqctl.bat add_user root root
    #设置权限
    # / 表示virtual host 即用户所属与此vhost,/ 为默认存在的vhost
    # 第一个 .* 表示读每个entity权限
    # 第二个 .* 表示写每个entity权限
    # 第三个 .* 表示配置每个entity权限
    rabbitmqctl.bat set_permissions -p / root .* .* .*
    #设置tag 为 "administrator",方便该用户管理UI界面(http://localhost:15672/)和HTTP API 访问
    rabbitmqctl.bat set_user_tags root administrator
    
    #######其他相关命令参考####
    #添加virtual host
    rabbitmqctl add_vhost qa1
    #查看所有用户
    rabbitmqctl.bat list_users
1.3 Exchange
  • RabbitMQ接收到messages之后会交由Exchange ,然后Exchange根据自身定义的规则(如binding key会对应一个或者多个queue),将messages放入到相应的queue中

  • 默认的几种exchange如下

    • Default : 默认的exchange,message的 routing-key与queue名字相同时放入该queue
    • Direct : message的routing-key与exchange的binding key(对应queue)相同时进行放入queue
    • Topic : 将message放入到一个或者多个queue上,massage的routing-keybinding key匹配时放入binding key对应的queue(binding key可能包含通配符,对应一个或者多个queue)
    • Fanout : 将messages放入到所有的绑定queue上,无需验证routing-key
    • Headers : 与Topic类似,只是基于messages的header values,不根据routing-key
    • Dead letter : 捕获不匹配任何定义的exchange规则的message

2. 添加RabbitMQ到Spring boot

2.1 引入Maven依赖
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 部分相关配置属性
属性 描述
spring.rabbitmq.addresses RabbitMQ broker 地址列表可用,分割,格式如:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring.rabbitmq.host broker的host地址,默认为localhost
spring.rabbitmq.port broker的端口,默认为5672
spring.rabbitmq.username 用户名,默认guest
spring.rabbitmq.password 密码,默认guest
spring.rabbitmq.virtual-host 用户所属的v-host
spring.rabbitmq.template.exchange 代码中不指定exchange时默认使用的exchange,默认为Default
spring.rabbitmq.template.routing-key 代码中不指定routing-key时默认使用此值
spring.rabbitmq.template.receive-timeout 接收消息时的等待超时时间
  • 示例
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: root
    password: root
    template:
      #exchange: amq.fanout
      #需要先在RabbitMQ中创建queue: popInStock.queue (可通过管理UI界面创建)
      routing-key: popInStock.queue
      #receive-timeout: 30000
    #virtual-host: /

3. 使用RebbitTemplate发送Message

3.1 相关API(类似JmsTemplate)
    // Send raw messages
    void send(Message message) throws AmqpException;
    void send(String routingKey, Message message) throws AmqpException;
    void send(String exchange, String routingKey, Message message) throws AmqpException;
    // Send messages converted from objects
    void convertAndSend(Object message) throws AmqpException;
    void convertAndSend(String routingKey, Object message) throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
    // Send messages converted from objects with post-processing
    void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
    void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor  messagePostProcessor) throws AmqpException;
3.2 配置Message Converter

默认情况下使用的是SimpleMessageConverter ,如果需要更改使用其他的Message Converter则需要配置

  • Spring提供了几种Message Converter可供选择
消息转换器 描述
Jackson2JsonMessageConverter 使用Jackson 2进行对象与JSON之间互转
MarshallingMessageConverter 使用Spring Marshaller 和 Unmarshaller进行转换
SerializerMessageConverter Converts String and native objects of any kind using Spring’s Serializer and Deserializer abstractions
SimpleMessageConverter (默认使用)用于转换字符串,字节数组,可序列化类型(实现Serializable接口)
ContentTypeDelegatingMessageConverter 根据contentType header委托另一个消息转换器去转换
MessagingMessageConverter Delegates to an underlying MessageConverter for the message conversion and to an AmqpHeaderConverter for the headers
3.3 发送Message
//发送Message
@Component
public class RabbitMQProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //使用默认消息转换器
    public void sendPopInStock(PopInStock popInStock){
        /** 1. 使用默认的routing-key **/
        //给message中增加额外信息,无需添加信息时使用空对象即可
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("description","pop");
        //使用默认的MessageConverter: SimpleMessageConverter 将对象转换为Message
        Message message = rabbitTemplate.getMessageConverter().toMessage(popInStock,messageProperties);
        //不指定routing-key进行发送,此时使用配置中的routing-key: popInStock.queue
        rabbitTemplate.send(message);

        /** 2.使用指定的routing-key发送 **/
        //rabbitTemplate.send("popInStock.queue",message);
        /** 3. 使用默认的MessageConverter: SimpleMessageConverter 将对象转换为message发送 **/
        //rabbitTemplate.convertAndSend(popInStock);
    }

    /** 使用指定的MessageConverter: Jackson2JsonMessageConverter进行转换 **/
    //需要在Configuration配置类中配置Jackson2JsonMessageConverter的bean
    public void sendPopInStockConvert(PopInStock popInStock){
        rabbitTemplate.convertAndSend(popInStock,this::postProcessMessage);
    }
    private Message postProcessMessage(Message message) throws AmqpException {
        /********** 发送前为Message增加额外信息 ************/
        MessageProperties messageProperties = message.getMessageProperties();
        messageProperties.setHeader("description","pop");
        return message;
    }
}

4. 使用RebbitTemplate接收Message

4.1 相关API(类似JmsTemplate)
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,ParameterizedTypeReference<T> type) throws AmqpException;
4.2 接收Message

同JMS相同,Message的接收有两种模式:

  • pull model : 请求一个消息直到接收到为止(默认方式)
  • push model : 当消息到达时,调用消息处理代码,可通过消息监听器实现@RabbitListener
  • pull model

    /**
     * 消费者: 接收消息
     */
    @Component
    public class RabbitMQConsumer {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //接收消息
        public PopInStock receivePopInStock(){
            //Message message = rabbitTemplate.receive("popInStock.queue");
            //return (PopInStock) rabbitTemplate.getMessageConverter().fromMessage(message);
    
            ////System.out.println("Jackson2JsonMessageConverter:" + (rabbitTemplate.getMessageConverter() instanceof Jackson2JsonMessageConverter));
            //与上等价: 接收并转换消息
            return (PopInStock) rabbitTemplate.receiveAndConvert("popInStock.queue");
        }
        //接收消息并设置超时等待时间
        public PopInStock receivePopInStockTimeOut(){
            Message message = rabbitTemplate.receive("popInStock.queue",3000);
            if(message != null) return (PopInStock) rabbitTemplate.getMessageConverter().fromMessage(message);
            else return null;
        }
        //接收并转换消息
        public PopInStock receivePopInStockConvert(){
            //return (PopInStock) rabbitTemplate.receiveAndConvert("popInStock.queue");
            //同上传入new ParameterizedTypeReference<PopInStock>(){}无需强转
            return rabbitTemplate.receiveAndConvert("popInStock.queue", new ParameterizedTypeReference<>(){});
        }
    }
    
  • push model : 收到相应的Message时会调用此方法

    @Component
    public class RabbitMQMessageListener {
        @RabbitListener(queues = "popInStock.queue")
        public void receive(PopInStock popInStock){
            //从queue: popInStock.queue中接收到PopInStock
            //...其他操作
            System.out.println("push model: " + popInStock);
        }
    }

Kafka

  • Kafka 具有高吞吐量丶低延迟的特点支持多个producer和consumer(多个consumer可以组成几个group,他们共享一个消息流,并保证整个群组对每个给定的消息只处理一次)
  • 可扩展性: kafka集群支持热扩展
  • 持久性丶可靠性: messages被持久化到本地磁盘,并且支持数据备份防止丢失
  • 容错性: 允许集群中节点失败
  • 高并发: 支持数千个客户端同时读写

1. Kafka准备(详情参考官方文档)

1.1 安装
  • 安装zookeeper : Kafka是基于zookeeper,因此要先安装并启动zookeeper才行

    • 下载并解压: 在安装目录conf/下创建zoo.cfg配置文件(参考同目录下zoo_sample.cfg文件)

      tickTime=2000
      # 修改数据保存目录
      dataDir=./../tmp/zookeeper
      clientPort=2181
    • 启动: 进入安装目录下的bin/目录下会看到zkServer.sh文件,开启PowerShell窗口执行命令./zkServer.sh start即可

  • 安装kafka

    • 下载并解压: 找到安装目录下的config/zookeeper.properties更改其中dataDir属性自定义数据保存目录

    • 启动: 进入bin(linux系统)或者bin/windows下找到kafka-server-start.bat(linux为.sh),开启PowerShell窗口执行命令

      #以windows为例: 注意路径正确即可
      ./kafka-server-start.bat ./../../config/server.properties
1.2 创建Topic
  • kafka消息都是基于Topic的,是数据写入操作的基本单元,可以包含一个或多个Partion,创建Topic时可以手动指定Partion个数,个数与服务器数相当

  • 每条消息属于且仅属于一个Topic,Producer(发布)与Consumer(订阅)消息时必须指定具体的Topic

  • Topic命名不推荐使用’.’或者’_’字符

#此处bin/windows目录下: 创建一个topic: popInStock.topic
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic popInStock.topic

2. 添加Kafka到Spring boot

2.1 引入Maven依赖
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
2.1 部分配置属性
spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
    template:
      #默认topic
      default-topic: popInStock.topic
    #订阅配置
    consumer:
      #自定义group-id
      group-id: test
      #指定发送消息时键值对应序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      #属性配置,也可以使用java bese方式
      properties:
          #指定反序列化时信任的包
        spring.json.trusted.packages: "cn.tacos.tacocloud.domain.jpa"
        #spring.json.remove.type.headers: false
        #spring.json.use.type.headers: false
        #消息headers中没有指定类型时使用此默认类型反序列号
        spring.json.value.default.type: "cn.tacos.tacocloud.domain.jpa.PopInStock"
    #发布配置
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      #properties:
          #消息headers是否增加类型(默认为true)
        #spring.json.add.type.headers: false

3. 使用KafkaTemplate发送Message

@Component
public class KafkaProducer {
    //引入kafka依赖之后spring boot中就会自动创建此bean,使用时直接注入即可
    @Autowired
    private KafkaTemplate<String,PopInStock> kafkaTemplate;
    public void send(PopInStock popInStock){
        MessageConverter converter = kafkaTemplate.getMessageConverter();
        //使用默认的topic发送
        //kafkaTemplate.sendDefault(popInStock);
        kafkaTemplate.send("popInStock.topic",popInStock);
    }
}

4. 接收Message

Kafka只有一种通过监听方式接收Message,通过@KafkaListener

@Component
public class KafkaMessageListener {
    @KafkaListener(topics = "popInStock.topic") //监听topic: popInStock.topic
    public void receive(ConsumerRecord<String,PopInStock> record){
        //...其他操作
        System.out.println("Kafka-receive: "+ record);
    }
}

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
Spring-Integration-flow Spring-Integration-flow
概述 Integration flow 通俗的说就是定义了一个管道流连接两个端点(输入端,与输出端), 输入端和输出端代表着不同的组件,通过这个管道流可以更加简单的实现messages在不同组件之间的传输 在传输中可以轻松的实现对me
2020-07-06
下一篇 
Spring-使用REST服务 Spring-使用REST服务
概述 对于RESTful API,返回的一般是json字符串,如果后台想调用别人的RESTful API则需要解析json Spring提供了更简单的方式可以自动将数据封装为对象使用 1. 摘要 通过RestTemplate调用REST
2020-06-09
  目录