Spring-Integration-flow


概述

  • Integration flow 通俗的说就是定义了一个管道流连接两个端点(输入端,与输出端),

  • 输入端和输出端代表着不同的组件,通过这个管道流可以更加简单的实现messages在不同组件之间的传输

  • 在传输中可以轻松的实现对messages的各种处理操作

1. 摘要

  • Integration flow - File Support实例
  • Integration flow概览
  • Integration flow -Email Support实例

Integration flow-File Support实例

对文件系统的支持,将文件数据通过适配器连接到Integration flow

1. Maven依赖

  • Integration依赖

    <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
  • 因为使用到file作为端点,因此额外引入对file端点支持的依赖

    <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-file</artifactId>
    </dependency>

2. 创建一个Integration flow

2.1 创建
/**
 * 创建一个Integration flow
 * 入口端点: textInChannel
 * 出口端点(endpoint): 需要通过配置指定
 * 两个方法: 将参数抽象为Message对象传入flow中
 * org.springframework.messaging.Message
 * public interface Message<T> {
 *    T getPayload();
 *     MessageHeaders getHeaders();
 * }
 */
@MessagingGateway(defaultRequestChannel = "textInChannel")
public interface FileWriterGateway {
    void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
    void writeToFile(@Header(FileHeaders.FILENAME) String filename,byte[] bytes);
}
2.2 配置
  • XML方式

    • xml配置文件: fileWriter-config.xml

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:int="http://www.springframework.org/schema/integration"
             xmlns:int-file="http://www.springframework.org/schema/integration/file"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
              https://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/integration
              https://www.springframework.org/schema/integration/spring-integration.xsd
              http://www.springframework.org/schema/integration/file
              https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
      
          <int:channel id="textInChannel" />
          <int:transformer id="upperCase"
                           input-channel="textInChannel"
                           output-channel="fileWriterChannel"
                           expression="payload instanceof T(String) ? payload.toUpperCase() : payload" />
      
          <int:channel id="fileWriterChannel" />
          <!--./temp/files: 在项目根目录下创建temp/files文件夹-->
          <int-file:outbound-channel-adapter id="writer"
                                             channel="fileWriterChannel"
                                             directory="./temp/files"
                                             mode="APPEND"
                                             append-new-line="true" />
      </beans>
    • 引入xml配置文件

      @Configuration
      @ImportResource("classpath:/xml/fileWriter-config.xml")
      public class IntegrationConfiguration {}
  • Java 方式

    @Configuration
    public class IntegrationConfiguration {
        /***** 方式二: Java Base配置 ******/
        @Bean
        @Transformer(inputChannel = "textInChannel",outputChannel = "fileWriterChannel")
        public GenericTransformer<Object,Object> upperCaseTransformer(){
            return data -> data instanceof String ? ((String) data).toUpperCase() : data;
        }
        @Bean
        @ServiceActivator(inputChannel = "fileWriterChannel")
        public FileWritingMessageHandler fileWriter(){
            FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("./temp/files"));//在项目根目录下创建temp/files文件夹
            handler.setExpectReply(false);//指定不需要回复消息
            handler.setFileExistsMode(FileExistsMode.APPEND);//已经存在则添加
            handler.setAppendNewLine(true);//从新的一行还是追加
            return handler;
        }
    
        /**** 创建MessageChannel可省略,spring会自动创建 *****/
        //@Bean
        public MessageChannel textInChannel(){
            return new DirectChannel();
        }
        //@Bean
        public MessageChannel fileWriterChannel(){
            return new DirectChannel();
        }
    }
  • java DSL方式

    @Configuration
    public class IntegrationConfiguration {
        /*** 方式三: Spring Integration's DSL configuration ***/
        @Bean
        public IntegrationFlow fileWriterFlow(){
            return IntegrationFlows.from(MessageChannels.direct("textInChannel"))
                    .<Object,Object>transform(data -> data instanceof String ? ((String) data).toUpperCase() : data)
                    //.channel(MessageChannels.direct("fileWriterChannel")) 可省略
                    .handle(Files
                            .outboundAdapter(new File("./temp/files"))
                            .fileExistsMode(FileExistsMode.APPEND)
                            .appendNewLine(true))
                    .get();
        }
    }
2.3 使用
    //注入bean
    @Autowired
    private FileWriterGateway fileWriterGateway;
    public void test(){
        //使用: 将数据"1234567890"保存到abc.txt文件中
        fileWriterGateway.writeToFile("abc.txt","1234567890");
    }

Spring Integration flow 概览

1. 概述

组件 描述
Channels 通道: 将messages从一端传到另一端
Filters 过滤: 指定条件,允许特定的messages通过flow
Transformers 转换: 将messages或者其值转换为另一种类型信息
Routers 路由: 将messages根据一定规则分派到一个或者多个channels,通常基于messages headers
Splitters 分割: 将传入的messages分割成一个或者多个messages,分派到不同的channels
Aggregators 聚合: 与分割相反,将来自多个channnels的messages合并为一个messages
Service activators 服务: 将messages交由一些java方法处理,然后将返回值发布到输出channel上
Channel adapters 通道适配器: 连接通道和外部系统或者传输,可以实现通道与外部系统的读或者写
Gateways 入口: 通过接口将数据传入到Integration flow中去

Message

信息: Spring Integration flow中数据传输都是抽象为Message进行传输的

package org.springframework.messaging;
public interface Message<T> {
    T getPayload(); //数据载体
    MessageHeaders getHeaders();//header
}

3. Message channels

3.1 概述
  • 消息通道是在Integration flow中连接两个组件进行数据流通的通道,简单来说就是传输Message的管道
  • Spring Integration提供了几种常用的管道实现
消息通道 描述
PublishSubscribeChannel 发布订阅: 将messages发布给所有订阅者
QueueChannel 队列: 将messages发布到一个FIFO队列中,订阅者从队列中取,一个message只会由一个人获取到
PriorityChannel 优先队列: 将messages发布到一个优先队列(优先级规则根据message priority header)中去,订阅者从其中获取
RendezvousChannel QueueChannel类似,不同在于接收方没收到消息之前发送方会阻塞通道,简单说就是发送方和接收方实现了同步
DirectChannel 默认的消息通道,与PublishSubscribeChannel类似,发送方将message发送给单个消费者,然后在此线程中调用消费者,即同步操作,这允许事务跨越通道
ExecutorChannel DirectChannel类似,不同之处在于,message分发是通过Taskexecutor`进行的与发送方不在同一线程,此通道类型不支持跨通道的事务
FluxMessageChannel A Reactive Streams Publisher message channel based on Project Reactor’s Flux
3.2 PublishSubscribeChannel
  • java Configuration方式简介

    //创建
    @Bean
    public MessageChannel orderChannel() {
     return new PublishSubscribeChannel();
    }
    //如果要交给java方法处理则使用注解@ServiceActivator即可
    //@ServiceActivator(inputChannel="orderChannel")
  • java DSL实例

    • 使用

      /**
       * 创建一个Integration flow
       * 入口端点: textInChannel
       * 出口端点(endpoint): 需要通过配置指定
       * 两个方法: 将data数据放入到入口textInChannel中
       */
      @MessagingGateway(defaultRequestChannel = "textInChannel")
      public interface FileWriterGateway {
          void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
          void writeToFile(@Header(FileHeaders.FILENAME) String filename,byte[] bytes);
      }
    • 配置

          @Bean
          public IntegrationFlow subscribeFlow(){
              //创建textInChannel并根据此生成flow,并发布到不同的订阅者
              return IntegrationFlows.from(MessageChannels.publishSubscribe("textInChannel"))
                      .transform(data -> data instanceof String ? ((String) data).toUpperCase() : data)
                      .publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
                              .subscribe(flow -> flow //发布给订阅者A
                                      .channel(MessageChannels.direct("A")) //使用DirectChannel: A
                                      .handle(Files   //保存到文件夹temp/files/A中
                                              .outboundAdapter(new File("./temp/files/A"))
                                              .fileExistsMode(FileExistsMode.APPEND)
                                              .appendNewLine(true))
                              )
                              .subscribe(flow -> flow //发布给订阅者B
                                      .channel(MessageChannels.direct("B")) //使用DirectChannel: B
                                      .handle(Files   //保存到文件夹temp/files/B中
                                              .outboundAdapter(new File("./temp/files/B"))
                                              .fileExistsMode(FileExistsMode.APPEND)
                                              .appendNewLine(true))
                              )
                      )
                      .get();
          }
    3.3 QueueChannel
    • Java configuration方式简介

      @Bean
      public MessageChannel orderChannel() {
       return new QueueChannel();
      }
      //注意: 交给java方法处理则必须设置消费者轮询时间如下: 1000ms
      //消费者每一秒从orderChannel中查询一次
      //@ServiceActivator(inputChannel="orderChannel",poller=@Poller(fixedRate="1000"))
    • 配置DSL实例

          @Bean
          public IntegrationFlow queueFlow(){
              return IntegrationFlows.from(MessageChannels.queue("textInChannel"))
                      .bridge(bridgeHandlerGenericEndpointSpec -> bridgeHandlerGenericEndpointSpec
                              .poller(pollerFactory -> pollerFactory
                                      .fixedRate(1000)
                                      //.fixedDelay(1000)
                                      .maxMessagesPerPoll(1)
                              )
                      )
                      .transform(data -> data instanceof String ? ((String) data).toUpperCase() : data)
                      .channel(MessageChannels.direct("writer"))
                      .handle(Files   //保存到文件夹temp/files中
                              .outboundAdapter(new File("./temp/files"))
                              .fileExistsMode(FileExistsMode.APPEND)
                              .appendNewLine(true))
                      .get();
          }

4. Filters

过滤: 用于在Integration管道中对message进行过滤,允许指定的message通过或者不通过

  • Java Configuration方式

    @Filter(inputChannel="numberChannel",
     outputChannel="evenNumberChannel")
    public boolean evenNumberFilter(Integer number) {
     return number % 2 == 0;
    }
  • DSL方式

        @Bean
        public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
            return IntegrationFlows
                ...
                .<Integer>filter((p) -> p % 2 == 0) /**/
                ...
                .get();
        }

5. Transformers

转换: 对message进行一些操作,转换成不同的message,同时可能将payload转为其他类型

  • Java Configuration方式

    @Bean
    @Transformer(inputChannel="numberChannel",
     outputChannel="romanNumberChannel")
    public GenericTransformer<Integer, String> romanNumTransformer() {
     return RomanNumbers::toRoman;
    }
  • DSL方式

    //简单的可以直接写
    @Bean
    public IntegrationFlow transformerFlow() {
     return IntegrationFlows
     ...
     .transform(RomanNumbers::toRoman) /**/
     ...
     .get();
    }
    }
    //对于复杂的逻辑可以单独写一个方法通过Bean方式注入
    @Bean
    public RomanNumberTransformer romanNumberTransformer() {
     return new RomanNumberTransformer();
    }
    @Bean
    public IntegrationFlow transformerFlow(
     RomanNumberTransformer romanNumberTransformer) {
     return IntegrationFlows
     ...
     .transform(romanNumberTransformer) //注入bean
     ...
     .get();
    }

6. Routers

路由: 基于一些路由标准,可以将Integration flow路由到不同的channels

  • Java Configuration方式

    @Bean
    @Router(inputChannel="numberChannel")
    public AbstractMessageRouter evenOddRouter() {
     return new AbstractMessageRouter() {
     @Override
     protected Collection<MessageChannel>
     determineTargetChannels(Message<?> message) {
     Integer number = (Integer) message.getPayload();
     if (number % 2 == 0) {
     return Collections.singleton(evenChannel());
     }
     return Collections.singleton(oddChannel());
     }
     };
    }
    @Bean
    public MessageChannel evenChannel() {
     return new DirectChannel();
    }
    @Bean
    public MessageChannel oddChannel() {
     return new DirectChannel();
    }
  • DSL方式

    @Bean
    public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
     return IntegrationFlows
     ...
     .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
     .subFlowMapping("EVEN", sf -> sf
     .<Integer, Integer>transform(n -> n * 10)
     .handle((i,h) -> { ... })
     )
     .subFlowMapping("ODD", sf -> sf
     .transform(RomanNumbers::toRoman)
     .handle((i,h) -> { ... })
     )
     )
     .get();
    }

7. Splitters

分割: 将一个message分割到多个messages中去,以便进行不同的处理

  • Java Configuration方式

    //将消息PurchaseOrder的内容分割取出放入集合
    public class OrderSplitter {
     public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
     ArrayList<Object> parts = new ArrayList<>();
     parts.add(po.getBillingInfo()); // 分割
     parts.add(po.getLineItems()); //分割
     return parts;
     }
    }
    //从poChannel中输入的message会被分割并保存到一个集合中从splitOrderChannel输出
    @Bean
    @Splitter(inputChannel="poChannel",
     outputChannel="splitOrderChannel")
    public OrderSplitter orderSplitter() {
     return new OrderSplitter();
    }
    //使用路由将集合中不同的对象路由到对应的channel中进行处理
    @Bean
    @Router(inputChannel="splitOrderChannel")
    public MessageRouter splitOrderRouter() {
     PayloadTypeRouter router = new PayloadTypeRouter();
     router.setChannelMapping(
     BillingInfo.class.getName(), "billingInfoChannel");
     router.setChannelMapping(
     List.class.getName(), "lineItemsChannel");
     return router;
    }
    //对lineItemsChannel中的多个list对象进行分割直接放入一个list集合中
    @Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
    public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
     return lineItems;
    }
  • DSL方式

    return IntegrationFlows
     ...
     .split(orderSplitter())
     .<Object, String> route(
     p -> {
     if (p.getClass().isAssignableFrom(BillingInfo.class)) {
     return "BILLING_INFO";
     } else {
     return "LINE_ITEMS";
     }
     }, mapping -> mapping
     .subFlowMapping("BILLING_INFO", sf -> sf
     .<BillingInfo> handle((billingInfo, h) -> {
     ...
     }))
     .subFlowMapping("LINE_ITEMS", sf -> sf
     .split()
     .<LineItem> handle((lineItem, h) -> {
     ...
     }))
     )
     .get();

7. Service activators

从input channel中接收Message,然后使用Messagehandler实现种处理

如果不想处理完就结束,而是处理完成后放入其他Channel中去则使用GenericHandler处理

  • Java Configuration方式

    //1. Messagehandler: 从someChannel接收message处理然后结束
    @Bean
    @ServiceActivator(inputChannel="someChannel")
    public MessageHandler sysoutHandler() {
     return message -> {
     System.out.println("Message payload: " + message.getPayload());
     };
    }
    //2. GenericHandler: 从orderChannel接收message处理,然后从completeOrder输出
    @Bean
    @ServiceActivator(inputChannel="orderChannel",
     outputChannel="completeOrder")
    public GenericHandler<Order> orderHandler(
     OrderRepository orderRepo) {
     return (payload, headers) -> {
     return orderRepo.save(payload);
     };
    }
  • DSL方式

    //1. Messagehandler: 从Channel接收message处理然后结束
    public IntegrationFlow someFlow() {
     return IntegrationFlows
     ...
     .handle(msg -> {
     System.out.println("Message payload: " + msg.getPayload());
     })
     .get();
    }
    //2. GenericHandler: 从orderChannel接收message处理,然后从completeOrder输出
    public IntegrationFlow orderFlow(OrderRepository orderRepo) {
     return IntegrationFlows
     ...
     .<Order>handle((payload, headers) -> {
     return orderRepo.save(payload);
     })
     ...
     .get();
    }

8. GateWays

消息网关: 类似于Channel adapters,但是是双向的,是应用程序组件向Integration flow提交messages的接口

  • Java Configuration方式

    @Component
    @MessagingGateway(defaultRequestChannel="inChannel",
     defaultReplyChannel="outChannel")
    public interface UpperCaseGateway {
     String uppercase(String in);//执行此方法字符串in放入inChannel中,然后从outChannel输出
    }
  • DSL方式

    @Bean
    public IntegrationFlow uppercaseFlow() {
     return IntegrationFlows
     .from("inChannel")
     .<String, String> transform(s -> s.toUpperCase())
     .channel("outChannel")
     .get();
    }

9. Channel adapters

通道适配器: 代表Interaction flow的入口点和出口点,数据通过Inbound channel adapter进入Integration flow,然后通过Outbound channel adapter 出去,是单向的通道

  • 适配器用于支持各种外部系统的数据进入通道
  • spring Integration 提供了多种Endpoint modules可以适配不同类型的数据
  • Java Configuration方式

    //支持Integer类型的endpoint modules
    //每隔一秒获取一个AtomicInteger注入到numberChannel通道
    @Bean
    @InboundChannelAdapter(
     poller=@Poller(fixedRate="1000"), channel="numberChannel")
    public MessageSource<Integer> numberSource(AtomicInteger source) {
     return () -> {
     return new GenericMessage<>(source.getAndIncrement());
     };
    }
    //支持File类型的endpoint modules
    @Bean
    @InboundChannelAdapter(channel="file-channel",
     poller=@Poller(fixedDelay="1000"))
    public MessageSource<File> fileReadingMessageSource() {
     FileReadingMessageSource sourceReader = new FileReadingMessageSource();
     sourceReader.setDirectory(new File(INPUT_DIR));
     sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
     return sourceReader;
    }
  • DSL方式

    //支持Integer类型的endpoint modules
    @Bean
    public IntegrationFlow someFlow(AtomicInteger integerSource) {
     return IntegrationFlows
     .from(integerSource, "getAndIncrement",
     c -> c.poller(Pollers.fixedRate(1000)))
     ...
     .get();
    }
    //支持File类型的endpoint modules
    @Bean
    public IntegrationFlow fileReaderFlow() {
     return IntegrationFlows
     .from(Files.inboundAdapter(new File(INPUT_DIR))
     .patternFilter(FILE_PATTERN))
     .get();
    }

10. Endpoint modules

端点模型: spring Integration提供了20多种与外部系统集成的Endpoint modules,用于创建对应的channel adapters

Module Dependency artifact ID
(Group ID: org.springframework.integration)
AMQP spring-integration-amqp
Spring application events spring-integration-event
RSS and Atom spring-integration-feed
Filesystem spring-integration-file
FTP/FTPS spring-integration-ftp
GemFire spring-integration-gemfire
HTTP spring-integration-http
JDBC spring-integration-jdbc
JPA spring-integration-jpa
JMS spring-integration-jms
Email spring-integration-mail
MongoDB spring-integration-mongodb
MQTT spring-integration-mqtt
Redis spring-integration-redis
RMI spring-integration-rmi
SFTP spring-integration-sftp
STOMP spring-integration-stomp
Stream spring-integration-stream
Syslog spring-integration-syslog
TCP/UDP spring-integration-ip
Twitter spring-integration-twitter
Web Services spring-integration-ws
WebFlux spring-integration-webflux
WebSocket spring-integration-websocket
XMPP spring-integration-xmpp
ZooKeeper spring-integration-zookeeper

Integration flow -Email Support实例

对邮件系统的支持,将邮件数据通过适配器连接到Integration flow然后进行处理

  • Maven依赖

    <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-file</artifactId>
    </dependency>
  • domain

    • 邮件

      @Data
      @ConfigurationProperties(prefix="tacocloud.email")//制定需要获取数据的配置文件
      @Component
      public class EmailProperties {
       private String username;
       private String password;
       private String host;
       private String mailbox;
       private long pollRate = 30000;
       public String getImapUrl() {
       return String.format("imaps://%s:%s@%s/%s",
       this.username, this.password, this.host, this.mailbox);
       }
      }
    • 邮件通过Integration flow处理后转换为Order

      @Data
      public class Order {
       private final String email;
       private List<Taco> tacos = new ArrayList<>();
       public void addTaco(Taco taco) {
       this.tacos.add(taco);
       }
      }
  • application.yml: 邮件地址配置

    tacocloud:
     email:
     host: imap.tacocloud.com
     mailbox: INBOX
     username: taco-in-flow
     password: 1L0v3T4c0s
     poll-rate: 10000
  • Integration flow

    @Configuration
    public class TacoOrderEmailIntegrationConfig {
     @Bean
     public IntegrationFlow tacoOrderEmailFlow(
     EmailProperties emailProps,
     EmailToOrderTransformer emailToOrderTransformer,
     OrderSubmitMessageHandler orderSubmitHandler) {
     return IntegrationFlows
     .from(Mail.imapInboundAdapter(emailProps.getImapUrl()),
     e -> e.poller(
     Pollers.fixedDelay(emailProps.getPollRate())))
     .transform(emailToOrderTransformer)
     .handle(orderSubmitHandler)
     .get();
     }
    //Transformer: 转换为Order
    @Component
    public class EmailToOrderTransformer
     extends AbstractMailMessageTransformer<Order> {
     @Override
     protected AbstractIntegrationMessageBuilder<Order>
     doTransform(Message mailMessage) throws Exception {
     Order tacoOrder = processPayload(mailMessage);
     return MessageBuilder.withPayload(tacoOrder);
     }
     ...
    }
  • 将邮件信息读取创建Order并发送

    • 使用RestTemplate发送

      //将转换后的Order通过POST请求发送
      @Component
      public class OrderSubmitMessageHandler implements GenericHandler<Order> {
       private RestTemplate rest;
       private ApiProperties apiProps;
       public OrderSubmitMessageHandler(
       ApiProperties apiProps, RestTemplate rest) {
       this.apiProps = apiProps;
       this.rest = rest;
       }
       @Override
       public Object handle(Order order, Map<String, Object> headers) {
       rest.postForObject(apiProps.getUrl(), order, String.class);
       return null;
       }
      }
    • 避免硬编码方式的URL,使用配置文件配置的URL

      @Data
      @ConfigurationProperties(prefix="tacocloud.api")
      @Component
      public class ApiProperties {
       private String url;
      }
      tacocloud:
       api:
       url: http://api.tacocloud.com
    • 配置

      • RestTemplate需要引入Maven依赖

        <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
      • 引入spring-boot-starter-web依赖后启动时会自动配置Spring MVC autoconfiguration,因为实际上不需要Spring MVC,可以通过配置关闭Spring MVC autoconfiguration

        spring:
         main:
         web-application-type: none

文章作者: Bryson
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bryson !
评论
 上一篇
下一篇 
Spring-Message Spring-Message
概述 异步消息传递是一种将消息从一个应用程序间接发送到另一个应用程序而无需等待响应的方法,这种间接提供了通信应用程序之间更松的耦合和更大的可伸缩性。 1.摘要 基于JMS的ActiveMQ Artemis 基于AMQP的RabbitMQ
2020-06-19
  目录