概述
Integration flow
通俗的说就是定义了一个管道流连接两个端点(输入端,与输出端),输入端和输出端代表着不同的组件,通过这个管道流可以更加简单的实现messages在不同组件之间的传输
在传输中可以轻松的实现对messages的各种处理操作
1. 摘要
- Integration flow - File Support实例
- Integration flow概览
- Integration flow -Email Support实例
Integration flow-File Support实例
对文件系统的支持,将文件数据通过适配器连接到Integration flow
1. Maven依赖
Integration依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency>
因为使用到file作为端点,因此额外引入对file端点支持的依赖
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency>
2. 创建一个Integration flow
2.1 创建
/**
* 创建一个Integration flow
* 入口端点: textInChannel
* 出口端点(endpoint): 需要通过配置指定
* 两个方法: 将参数抽象为Message对象传入flow中
* org.springframework.messaging.Message
* public interface Message<T> {
* T getPayload();
* MessageHeaders getHeaders();
* }
*/
@MessagingGateway(defaultRequestChannel = "textInChannel")
public interface FileWriterGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
void writeToFile(@Header(FileHeaders.FILENAME) String filename,byte[] bytes);
}
2.2 配置
XML方式
xml配置文件:
fileWriter-config.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file https://www.springframework.org/schema/integration/file/spring-integration-file.xsd"> <int:channel id="textInChannel" /> <int:transformer id="upperCase" input-channel="textInChannel" output-channel="fileWriterChannel" expression="payload instanceof T(String) ? payload.toUpperCase() : payload" /> <int:channel id="fileWriterChannel" /> <!--./temp/files: 在项目根目录下创建temp/files文件夹--> <int-file:outbound-channel-adapter id="writer" channel="fileWriterChannel" directory="./temp/files" mode="APPEND" append-new-line="true" /> </beans>
引入xml配置文件
@Configuration @ImportResource("classpath:/xml/fileWriter-config.xml") public class IntegrationConfiguration {}
Java 方式
@Configuration public class IntegrationConfiguration { /***** 方式二: Java Base配置 ******/ @Bean @Transformer(inputChannel = "textInChannel",outputChannel = "fileWriterChannel") public GenericTransformer<Object,Object> upperCaseTransformer(){ return data -> data instanceof String ? ((String) data).toUpperCase() : data; } @Bean @ServiceActivator(inputChannel = "fileWriterChannel") public FileWritingMessageHandler fileWriter(){ FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("./temp/files"));//在项目根目录下创建temp/files文件夹 handler.setExpectReply(false);//指定不需要回复消息 handler.setFileExistsMode(FileExistsMode.APPEND);//已经存在则添加 handler.setAppendNewLine(true);//从新的一行还是追加 return handler; } /**** 创建MessageChannel可省略,spring会自动创建 *****/ //@Bean public MessageChannel textInChannel(){ return new DirectChannel(); } //@Bean public MessageChannel fileWriterChannel(){ return new DirectChannel(); } }
java DSL方式
@Configuration public class IntegrationConfiguration { /*** 方式三: Spring Integration's DSL configuration ***/ @Bean public IntegrationFlow fileWriterFlow(){ return IntegrationFlows.from(MessageChannels.direct("textInChannel")) .<Object,Object>transform(data -> data instanceof String ? ((String) data).toUpperCase() : data) //.channel(MessageChannels.direct("fileWriterChannel")) 可省略 .handle(Files .outboundAdapter(new File("./temp/files")) .fileExistsMode(FileExistsMode.APPEND) .appendNewLine(true)) .get(); } }
2.3 使用
//注入bean
@Autowired
private FileWriterGateway fileWriterGateway;
public void test(){
//使用: 将数据"1234567890"保存到abc.txt文件中
fileWriterGateway.writeToFile("abc.txt","1234567890");
}
Spring Integration flow 概览
1. 概述
组件 | 描述 |
---|---|
Channels | 通道: 将messages从一端传到另一端 |
Filters | 过滤: 指定条件,允许特定的messages通过flow |
Transformers | 转换: 将messages或者其值转换为另一种类型信息 |
Routers | 路由: 将messages根据一定规则分派到一个或者多个channels,通常基于messages headers |
Splitters | 分割: 将传入的messages分割成一个或者多个messages,分派到不同的channels |
Aggregators | 聚合: 与分割相反,将来自多个channnels的messages合并为一个messages |
Service activators | 服务: 将messages交由一些java方法处理,然后将返回值发布到输出channel上 |
Channel adapters | 通道适配器: 连接通道和外部系统或者传输,可以实现通道与外部系统的读或者写 |
Gateways | 入口: 通过接口将数据传入到Integration flow中去 |
Message
信息: Spring Integration flow中数据传输都是抽象为Message进行传输的
package org.springframework.messaging;
public interface Message<T> {
T getPayload(); //数据载体
MessageHeaders getHeaders();//header
}
3. Message channels
3.1 概述
- 消息通道是在Integration flow中连接两个组件进行数据流通的通道,简单来说就是传输Message的管道
- Spring Integration提供了几种常用的管道实现
消息通道 | 描述 |
---|---|
PublishSubscribeChannel | 发布订阅: 将messages发布给所有订阅者 |
QueueChannel | 队列: 将messages发布到一个FIFO队列中,订阅者从队列中取,一个message只会由一个人获取到 |
PriorityChannel | 优先队列: 将messages发布到一个优先队列(优先级规则根据message priority header)中去,订阅者从其中获取 |
RendezvousChannel | 与QueueChannel 类似,不同在于接收方没收到消息之前发送方会阻塞通道,简单说就是发送方和接收方实现了同步 |
DirectChannel | 默认的消息通道,与PublishSubscribeChannel 类似,发送方将message发送给单个消费者,然后在此线程中调用消费者,即同步操作,这允许事务跨越通道 |
ExecutorChannel | 与DirectChannel 类似,不同之处在于,message分发是通过Taskexecutor`进行的与发送方不在同一线程,此通道类型不支持跨通道的事务 |
FluxMessageChannel | A Reactive Streams Publisher message channel based on Project Reactor’s Flux |
3.2 PublishSubscribeChannel
java Configuration方式简介
//创建 @Bean public MessageChannel orderChannel() { return new PublishSubscribeChannel(); } //如果要交给java方法处理则使用注解@ServiceActivator即可 //@ServiceActivator(inputChannel="orderChannel")
java DSL实例
使用
/** * 创建一个Integration flow * 入口端点: textInChannel * 出口端点(endpoint): 需要通过配置指定 * 两个方法: 将data数据放入到入口textInChannel中 */ @MessagingGateway(defaultRequestChannel = "textInChannel") public interface FileWriterGateway { void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data); void writeToFile(@Header(FileHeaders.FILENAME) String filename,byte[] bytes); }
配置
@Bean public IntegrationFlow subscribeFlow(){ //创建textInChannel并根据此生成flow,并发布到不同的订阅者 return IntegrationFlows.from(MessageChannels.publishSubscribe("textInChannel")) .transform(data -> data instanceof String ? ((String) data).toUpperCase() : data) .publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec .subscribe(flow -> flow //发布给订阅者A .channel(MessageChannels.direct("A")) //使用DirectChannel: A .handle(Files //保存到文件夹temp/files/A中 .outboundAdapter(new File("./temp/files/A")) .fileExistsMode(FileExistsMode.APPEND) .appendNewLine(true)) ) .subscribe(flow -> flow //发布给订阅者B .channel(MessageChannels.direct("B")) //使用DirectChannel: B .handle(Files //保存到文件夹temp/files/B中 .outboundAdapter(new File("./temp/files/B")) .fileExistsMode(FileExistsMode.APPEND) .appendNewLine(true)) ) ) .get(); }
3.3 QueueChannel
Java configuration方式简介
@Bean public MessageChannel orderChannel() { return new QueueChannel(); } //注意: 交给java方法处理则必须设置消费者轮询时间如下: 1000ms //消费者每一秒从orderChannel中查询一次 //@ServiceActivator(inputChannel="orderChannel",poller=@Poller(fixedRate="1000"))
配置DSL实例
@Bean public IntegrationFlow queueFlow(){ return IntegrationFlows.from(MessageChannels.queue("textInChannel")) .bridge(bridgeHandlerGenericEndpointSpec -> bridgeHandlerGenericEndpointSpec .poller(pollerFactory -> pollerFactory .fixedRate(1000) //.fixedDelay(1000) .maxMessagesPerPoll(1) ) ) .transform(data -> data instanceof String ? ((String) data).toUpperCase() : data) .channel(MessageChannels.direct("writer")) .handle(Files //保存到文件夹temp/files中 .outboundAdapter(new File("./temp/files")) .fileExistsMode(FileExistsMode.APPEND) .appendNewLine(true)) .get(); }
4. Filters
过滤: 用于在Integration管道中对message进行过滤,允许指定的message通过或者不通过
Java Configuration方式
@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel") public boolean evenNumberFilter(Integer number) { return number % 2 == 0; }
DSL方式
@Bean public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) { return IntegrationFlows ... .<Integer>filter((p) -> p % 2 == 0) /**/ ... .get(); }
5. Transformers
转换: 对message进行一些操作,转换成不同的message,同时可能将payload转为其他类型
Java Configuration方式
@Bean @Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel") public GenericTransformer<Integer, String> romanNumTransformer() { return RomanNumbers::toRoman; }
DSL方式
//简单的可以直接写 @Bean public IntegrationFlow transformerFlow() { return IntegrationFlows ... .transform(RomanNumbers::toRoman) /**/ ... .get(); } }
//对于复杂的逻辑可以单独写一个方法通过Bean方式注入 @Bean public RomanNumberTransformer romanNumberTransformer() { return new RomanNumberTransformer(); } @Bean public IntegrationFlow transformerFlow( RomanNumberTransformer romanNumberTransformer) { return IntegrationFlows ... .transform(romanNumberTransformer) //注入bean ... .get(); }
6. Routers
路由: 基于一些路由标准,可以将Integration flow路由到不同的channels
Java Configuration方式
@Bean @Router(inputChannel="numberChannel") public AbstractMessageRouter evenOddRouter() { return new AbstractMessageRouter() { @Override protected Collection<MessageChannel> determineTargetChannels(Message<?> message) { Integer number = (Integer) message.getPayload(); if (number % 2 == 0) { return Collections.singleton(evenChannel()); } return Collections.singleton(oddChannel()); } }; } @Bean public MessageChannel evenChannel() { return new DirectChannel(); } @Bean public MessageChannel oddChannel() { return new DirectChannel(); }
DSL方式
@Bean public IntegrationFlow numberRoutingFlow(AtomicInteger source) { return IntegrationFlows ... .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping .subFlowMapping("EVEN", sf -> sf .<Integer, Integer>transform(n -> n * 10) .handle((i,h) -> { ... }) ) .subFlowMapping("ODD", sf -> sf .transform(RomanNumbers::toRoman) .handle((i,h) -> { ... }) ) ) .get(); }
7. Splitters
分割: 将一个message分割到多个messages中去,以便进行不同的处理
Java Configuration方式
//将消息PurchaseOrder的内容分割取出放入集合 public class OrderSplitter { public Collection<Object> splitOrderIntoParts(PurchaseOrder po) { ArrayList<Object> parts = new ArrayList<>(); parts.add(po.getBillingInfo()); // 分割 parts.add(po.getLineItems()); //分割 return parts; } } //从poChannel中输入的message会被分割并保存到一个集合中从splitOrderChannel输出 @Bean @Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel") public OrderSplitter orderSplitter() { return new OrderSplitter(); } //使用路由将集合中不同的对象路由到对应的channel中进行处理 @Bean @Router(inputChannel="splitOrderChannel") public MessageRouter splitOrderRouter() { PayloadTypeRouter router = new PayloadTypeRouter(); router.setChannelMapping( BillingInfo.class.getName(), "billingInfoChannel"); router.setChannelMapping( List.class.getName(), "lineItemsChannel"); return router; } //对lineItemsChannel中的多个list对象进行分割直接放入一个list集合中 @Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel") public List<LineItem> lineItemSplitter(List<LineItem> lineItems) { return lineItems; }
DSL方式
return IntegrationFlows ... .split(orderSplitter()) .<Object, String> route( p -> { if (p.getClass().isAssignableFrom(BillingInfo.class)) { return "BILLING_INFO"; } else { return "LINE_ITEMS"; } }, mapping -> mapping .subFlowMapping("BILLING_INFO", sf -> sf .<BillingInfo> handle((billingInfo, h) -> { ... })) .subFlowMapping("LINE_ITEMS", sf -> sf .split() .<LineItem> handle((lineItem, h) -> { ... })) ) .get();
7. Service activators
从input channel中接收Message,然后使用
Messagehandler
实现种处理如果不想处理完就结束,而是处理完成后放入其他Channel中去则使用
GenericHandler
处理
Java Configuration方式
//1. Messagehandler: 从someChannel接收message处理然后结束 @Bean @ServiceActivator(inputChannel="someChannel") public MessageHandler sysoutHandler() { return message -> { System.out.println("Message payload: " + message.getPayload()); }; } //2. GenericHandler: 从orderChannel接收message处理,然后从completeOrder输出 @Bean @ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder") public GenericHandler<Order> orderHandler( OrderRepository orderRepo) { return (payload, headers) -> { return orderRepo.save(payload); }; }
DSL方式
//1. Messagehandler: 从Channel接收message处理然后结束 public IntegrationFlow someFlow() { return IntegrationFlows ... .handle(msg -> { System.out.println("Message payload: " + msg.getPayload()); }) .get(); } //2. GenericHandler: 从orderChannel接收message处理,然后从completeOrder输出 public IntegrationFlow orderFlow(OrderRepository orderRepo) { return IntegrationFlows ... .<Order>handle((payload, headers) -> { return orderRepo.save(payload); }) ... .get(); }
8. GateWays
消息网关: 类似于
Channel adapters
,但是是双向的,是应用程序组件向Integration flow提交messages的接口
Java Configuration方式
@Component @MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel") public interface UpperCaseGateway { String uppercase(String in);//执行此方法字符串in放入inChannel中,然后从outChannel输出 }
DSL方式
@Bean public IntegrationFlow uppercaseFlow() { return IntegrationFlows .from("inChannel") .<String, String> transform(s -> s.toUpperCase()) .channel("outChannel") .get(); }
9. Channel adapters
通道适配器: 代表Interaction flow的入口点和出口点,数据通过
Inbound channel adapter
进入Integration flow,然后通过Outbound channel adapter
出去,是单向的通道
- 适配器用于支持各种外部系统的数据进入通道
- spring Integration 提供了多种
Endpoint modules
可以适配不同类型的数据
Java Configuration方式
//支持Integer类型的endpoint modules //每隔一秒获取一个AtomicInteger注入到numberChannel通道 @Bean @InboundChannelAdapter( poller=@Poller(fixedRate="1000"), channel="numberChannel") public MessageSource<Integer> numberSource(AtomicInteger source) { return () -> { return new GenericMessage<>(source.getAndIncrement()); }; } //支持File类型的endpoint modules @Bean @InboundChannelAdapter(channel="file-channel", poller=@Poller(fixedDelay="1000")) public MessageSource<File> fileReadingMessageSource() { FileReadingMessageSource sourceReader = new FileReadingMessageSource(); sourceReader.setDirectory(new File(INPUT_DIR)); sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN)); return sourceReader; }
DSL方式
//支持Integer类型的endpoint modules @Bean public IntegrationFlow someFlow(AtomicInteger integerSource) { return IntegrationFlows .from(integerSource, "getAndIncrement", c -> c.poller(Pollers.fixedRate(1000))) ... .get(); } //支持File类型的endpoint modules @Bean public IntegrationFlow fileReaderFlow() { return IntegrationFlows .from(Files.inboundAdapter(new File(INPUT_DIR)) .patternFilter(FILE_PATTERN)) .get(); }
10. Endpoint modules
端点模型: spring Integration提供了20多种与外部系统集成的
Endpoint modules
,用于创建对应的channel adapters
Module | Dependency artifact ID (Group ID: org.springframework.integration) |
---|---|
AMQP | spring-integration-amqp |
Spring application events | spring-integration-event |
RSS and Atom | spring-integration-feed |
Filesystem | spring-integration-file |
FTP/FTPS | spring-integration-ftp |
GemFire | spring-integration-gemfire |
HTTP | spring-integration-http |
JDBC | spring-integration-jdbc |
JPA | spring-integration-jpa |
JMS | spring-integration-jms |
spring-integration-mail | |
MongoDB | spring-integration-mongodb |
MQTT | spring-integration-mqtt |
Redis | spring-integration-redis |
RMI | spring-integration-rmi |
SFTP | spring-integration-sftp |
STOMP | spring-integration-stomp |
Stream | spring-integration-stream |
Syslog | spring-integration-syslog |
TCP/UDP | spring-integration-ip |
spring-integration-twitter | |
Web Services | spring-integration-ws |
WebFlux | spring-integration-webflux |
WebSocket | spring-integration-websocket |
XMPP | spring-integration-xmpp |
ZooKeeper | spring-integration-zookeeper |
Integration flow -Email Support实例
对邮件系统的支持,将邮件数据通过适配器连接到Integration flow然后进行处理
Maven依赖
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> </dependency>
domain
邮件
@Data @ConfigurationProperties(prefix="tacocloud.email")//制定需要获取数据的配置文件 @Component public class EmailProperties { private String username; private String password; private String host; private String mailbox; private long pollRate = 30000; public String getImapUrl() { return String.format("imaps://%s:%s@%s/%s", this.username, this.password, this.host, this.mailbox); } }
邮件通过Integration flow处理后转换为Order
@Data public class Order { private final String email; private List<Taco> tacos = new ArrayList<>(); public void addTaco(Taco taco) { this.tacos.add(taco); } }
application.yml: 邮件地址配置
tacocloud: email: host: imap.tacocloud.com mailbox: INBOX username: taco-in-flow password: 1L0v3T4c0s poll-rate: 10000
Integration flow
@Configuration public class TacoOrderEmailIntegrationConfig { @Bean public IntegrationFlow tacoOrderEmailFlow( EmailProperties emailProps, EmailToOrderTransformer emailToOrderTransformer, OrderSubmitMessageHandler orderSubmitHandler) { return IntegrationFlows .from(Mail.imapInboundAdapter(emailProps.getImapUrl()), e -> e.poller( Pollers.fixedDelay(emailProps.getPollRate()))) .transform(emailToOrderTransformer) .handle(orderSubmitHandler) .get(); }
//Transformer: 转换为Order @Component public class EmailToOrderTransformer extends AbstractMailMessageTransformer<Order> { @Override protected AbstractIntegrationMessageBuilder<Order> doTransform(Message mailMessage) throws Exception { Order tacoOrder = processPayload(mailMessage); return MessageBuilder.withPayload(tacoOrder); } ... }
将邮件信息读取创建Order并发送
使用
RestTemplate
发送//将转换后的Order通过POST请求发送 @Component public class OrderSubmitMessageHandler implements GenericHandler<Order> { private RestTemplate rest; private ApiProperties apiProps; public OrderSubmitMessageHandler( ApiProperties apiProps, RestTemplate rest) { this.apiProps = apiProps; this.rest = rest; } @Override public Object handle(Order order, Map<String, Object> headers) { rest.postForObject(apiProps.getUrl(), order, String.class); return null; } }
避免硬编码方式的URL,使用配置文件配置的URL
@Data @ConfigurationProperties(prefix="tacocloud.api") @Component public class ApiProperties { private String url; }
tacocloud: api: url: http://api.tacocloud.com
配置
RestTemplate需要引入Maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
引入
spring-boot-starter-web
依赖后启动时会自动配置Spring MVC autoconfiguration
,因为实际上不需要Spring MVC,可以通过配置关闭Spring MVC autoconfiguration
spring: main: web-application-type: none