Spring WebFlux高级实战
示例代码:https://gitee.com/ixinglan/spring-webflux-demo.git
一、WebFlux作为核心响应式服务器基础
Spring MVC的缺点:
- 不允许在整个请求声明周期中出现非阻塞操作。没有开箱即用的非阻塞HTTP客户端。
- WebMVC 抽象不能支持非阻塞 Servlet 3.1 的所有功能。
- 对于非 Servlet 服务器,重用 Spring Web 功能或变成模块不灵活。
1.1 响应式web内核
响应式Web内核首先需要使用模拟接口和对请求进行处理的方法替换javax.servlet.Servlet.service
方法。
-
更改相关的类和接口,增强和定制 Servlet API 对客户端请求和服务器响应的交互方式。
/** * 请求的封装。 * 获取请求报文体的类型是Flux,表示具备响应式能力。 * DataBuffer是针对字节缓冲区的抽象,便于对特定服务器实现数据交换。 * 除了请求报文体,还有消息头、请求路径、cookie、查询参数等信息,可以在该接口或子接口中提供。 */ interface ServerHttpRequest { // ... Flux<DataBuffer> getBody(); // ... } /** * 响应的封装。 * writeWith方法接收的参数是Publisher,提供了响应式,并与特定响应式库解耦。 * 返回值是Mono<Void>,表示向网络发送数据是一个异步的过程。 * 即,只有当订阅Mono时才会执行发送数据的过程。 * 接收服务器可以根据传输协议的流控支持背压。 */ interface ServerHttpResponse { // ... Mono<Void> writeWith(Publisher<? extends DataBuffer> body); // ... } /** * HTTP请求-响应的容器。 * 这是高层接口,除了HTTP交互,还可以保存框架相关信息。 * 如请求的已恢复的WebSession信息等。 * * */ interface ServerWebExchange { // ... ServerHttpRequest getRequest(); ServerHttpResponse getResponse(); // ... Mono<WebSession> getSession(); // ... }
-
请求的处理程序和过滤器 API
/** * 对应于WebMVC中的DispatcherServlet * 查找请求的处理程序,使用视图解析器渲染视图,因此handle方法不需要返回任何结果。 * * 返回值Mono<Void>提供了异步处理。 * 如果在指定的时间内没有信号出现,可以取消执行。 */ interface WebHandler { Mono<Void> handle(ServerWebExchange exchange); } /** * 过滤器链 */ interface WebFilterChain { Mono<Void> filter(ServerWebExchange exchange); } /** * 过滤器 */ interface WebFilter { Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain); }
-
还需要为这些接口适配不同的服务器。
即与ServerHttpRequest和ServerHttpResponse进行直接交互的组件。同时负责ServerWebExchange的构建,特定的会话存储、本地化解析器等信息的保存.
public interface HttpHandler { Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response); }
通过该适当的抽象,隐藏了服务器引擎的细节,具体服务器的工作方式对 Spring WebFlux 用户不重要。
1.2 响应式Web和MVC框架
Spring Web MVC 模块的关键特性 基于注解。因此,需要为响应式Web 栈提供相同的概念。重用 WebMVC 的基础设施,用 Flux 、Mono 和 Publisher 等响应式类型替换同步通信。
保留与 Spring Web MVC 相同的 HandlerMapping 和 HandlerAdapter 链,使用基于 Reactor 的响应式交互替换实时命令
interface HandlerMapping {
/*
HandlerExecutionChain getHandler(HttpServletRequest request)
*/
Mono<Object> getHandler(ServerWebExchange exchange);
}
interface HandlerAdapter {
boolean supports(Object handler);
/*
ModelAndView handle(HttpServletRequest request, HttpServletResponse
response, Object handler);
*/
Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
}
响应式 HandlerMapping 中,两个方法整体上类似,不同之处在于响应式返回Mono 类型支持响应式。
响应式HandlerAdapter 接口中,由于 ServerWebExchange 类同时组合了请求和响应,因此handle 方法的响应式版本更简洁。
遵循这些步骤,我们将得到一个响应式交互模型,而不会破坏整个执行层次结构,从而可以保留现有设计并能以最小的更改重用现有代码。
传入请求,由底层服务器引擎处理。服务器引擎列表不限于基于ServletAPI 的服务器。每个服务器引擎都有自己的响应式适配器,将 HTTP 请求和 HTTP 响应的内部表示映射到ServerHttpRequest 和ServerHttpResponse 。
HttpHandler 阶段,该阶段将给定的 ServerHttpRequest 、ServerHttpResponse 、用户Session 和相关信息组合到 ServerWebExchage 实例中。
WebFilterChain 阶段,它将定义的 WebFilter 组合到链中。然后, WebFilterChain 会负责执行此链中每个 WebFilter 实例的 WebFilter#filter 方法,以过滤传入的ServerWebExchange
如果满足所有过滤条件, WebFilterChain 将调用 WebHandler 实例。
查找 HandlerMapping 实例并调用第一个合适的实例。可以是RouterFunctionMapping、也可以是RequestMappingHandlerMapping 和HandlerMapping 资源。RouterFunctionMapping,引入到WebFlux 之中,超越了纯粹的功能请求处理。
RequestMappingHandlerAdapter 阶段,与以前功能相同,使用响应式流来构建响应式流。
在WebFlux 模块中,默认服务器引擎是Netty。
1.3 基于webFlux的纯函数式web
示例代码:demo-3-1、demo-3-2
1.4 基于webClient的非阻塞跨服务通信
从本质上讲, WebClient 是旧 RestTemplate 的响应式替代品。
WebClient 中有一个函数式API,并提供内置的到 Project Reactor 类型(如 Flux 或 Mono )的映射。
示例:
// 创建的时候指定基础URI
WebClient.create("http://localhost/api")
// 指定请求方法:GET
.get()
// 指定相对URI,并对URI变量进行扩展
// 还可以指定消息头、cookie 和请求主体。
.uri("/users/{id}", userId)
// 指定结果的处理方式
.retrieve()
// 将响应体进行反序列化
.bodyToMono(User.class)
// 进行其他操作
.map(...)
// 订阅触发异步执行,发起远程调用。这里只使用了订阅的副作用。
.subscribe();
WebClient 遵循响应式流规范中描述的行为。
只有通过 subscribe 方法, WebClient 才会建立连接并开始发送数据到远程服务器。
示例代码:demo-3-2 test
1.5 响应式模板引擎
Spring 5.x 和 WebFlux 模块已经放弃支持包括Apache Velocity 在内的许多技术。
Spring WebFlux 与 Web MVC 拥有相同的视图渲染技术。
FreeMarker 不支持数据的响应式呈现和非阻塞呈现,必须将所有歌曲收集到列表中并将收集的数据全部放入Model 中。(示例代码:demo-3-3)
Thymeleaf 支持响应式WebFlux,并为异步和流模板渲染提供更多可能性。Thymeleaf 提供与FreeMarker 类似的功能,并允许编写相同的代码来呈现UI。Thymeleaf 能够将响应式类型用作模板内的数据源,并在流中的新元素可用时呈现模板的一部分。(示例代码:demo-3-4)
1.6 响应式web安全
Spring Web的SpringSecurity 模块通过在任何控制器和Web 处理程序调用之前提供Filter 来设置安全的Web应用程序。
为了支持响应式和非阻塞交互,Spring Security的响应式栈,使用WebFilter并依赖Project Reactor上下文。
尽管ReactiveSecurityContextHolder 的API 看起来很熟悉,但它隐藏了许多陷阱。
例如,可能错误地遵循使用SecurityContextHolder 时的惯用法。这样一来,可能盲目地实现以下示例代码中描述的常见交互:
ReactiveSecurityContextHolder .getContext() .map(SecurityContext::getAuthentication) .block();
就像以前从ThreadLocal 中获取SecurityContext 一样,可能尝试使用 ReactiveSecurityContextHolder 执行相同的操作。但是,当调用 getContext 并使用 block 方法订阅流时,我们在流中配置的是一个空的上下文。 因此,一旦 ReactiveSecurityContextHodler 类尝试访问内部Context,就不会在那里找到可用的 SecurityContext。
示例代码:demo-3-5
1.7 与其他响应式库的交互
WebFlux 使用Project Reactor 3 作为核心构建块,同时,WebFlux 也允许使用其他响应式库。
为了实现跨库互操作性,WebFlux 中的大多数操作基于响应式流规范中的接口。
通过这种方式,我们可以轻松地用 RxJava 2 或 Akka Streams 替换Reactor 3 所编写的代码。
示例代码:demo-3-6
二、SpringWebFlux与SpringWeb Mvc对比
具体如何使用,考虑如下:
-
如果现存的项目是基于Spring MVC的并且没有问题,就别更改了。命令式编程开发、阅读、debug都是最简单的。同时可选择的库也很多,只不过大多数都是阻塞式的。
-
如果项目的技术栈是非阻塞的,则使用WebFlux可以使用与环境相同的模型来执行,WebFlux也提供了服务器的选项(Netty、Tomcat、Jetty、Undertow以及Servlet 3.1及以上的容器),提供了编程模型的选项(基于注解的控制器和函数式web端点),以及响应式库的选项(Reactor、RxJava以及其他的)。
-
如果希望发挥java8 lambda或Kotlin的优势,使用轻量级、函数式的web框架,则可以使用Spring WebFlux函数式web端点的编程模型。Spring WebFlux非常适合小型的应用或没有复杂需求的微服务。
-
在微服务架构中,可以同时使用Spring WebFlux和Spring MVC,或者将Spring WebFlux作为函数式端点来使用。由于它们基于相同的注解编程模型,可以很方便的做到在正确的场合使用正确的工具。
-
一个简单的评估应用的方式是检查应用的依赖。如果使用的是阻塞式的持久化API(JPA,JDBC)或者阻塞式的网络API,Spring MVC基本上是最好的选择。技术上Reactor和RxJava也可以使用分立的线程支持阻塞式的操作,但无法发挥非阻塞web技术栈的全部优势。
-
如果Spring MVC应用需要调用远程服务,可以使用响应式的WebClient 。可以让Spring MVC控制器的方法直接返回响应式类型(Reactor、RxJava或其他的)数据。每个远程调用的延迟越大,各个远程调用之间的依赖越大,响应式模型的优势发挥的越明显。当然,Spring MVC的控制器也可以调用其他响应式组件。
-
如果开发团队很大,就要考虑到转向非阻塞、函数式、声明式编程模型的陡峭的学习曲线。最佳实践是先使用响应式WebClient 做部分转向。然后在小的模块中使用并评估响应式模型带来的优势。一般对于整个项目,没必要全部转向响应式模型。如果不确定响应式编程带来的优势,可以先学习一下非阻塞I/O的工作流程(例如单线程Node.js的并发)以及效果。
三、使用SrpingBoot
3.1 Spring Core中的响应式
Spring 生态系统的核心模块是Spring Core 模块。
Spring 5.x 引入对响应式流和响应式库的原生支持,其中,响应式库包含RxJava 1/2 和ProjectReactor 3。
-
响应式类型转换支持
为了支持响应式流规范所进行的最全面的改进之一是引入了 ReactiveAdapter 和ReactiveAdapterRegistry 。
ReactiveAdapter 类为响应式类型转换提供了两种基本方法,用于将任何类型转换为Publisher
并将其转换回 Object ReactiveAdapterRegistry 为简化交互,使我们能将 ReactiveAdapter 的实例保存在一个位置并提供对它们的通用访问。 -
响应式IO
Spring Core 模块在 byte 缓冲区实例上引入了一个称为 DataBuffer 的抽象。
之所以避免使用 java.nio.ByteBuffer ,主要是为了提供一个既可以支持不同字节缓冲区,又不需要在它们之间进行任何额外的转换的抽象。 例如,为了将 io.netty.buffer.ByteBuf 转换为 ByteBuffer ,必须访问所存储的字节,而这些字节可能需要从堆外空间被拉入到堆中。这可能破坏Netty 提供的高效内存使用和缓冲区回收(重用相同的字节缓冲区)。
Spring DataBuffer 提供特定实现的抽象,能以通用方式使用底层实现。
此外,Spring Core 的第五版引入了 DataBufferUtils 类,能以响应式流的形式与I/O 进行交互(与网络、资源、文件等交互)。
最后,与Spring Core 中响应式相关的最后一个意义重大且不可或缺的特性是响应式编解码器(reactive codecs)。响应式编解码器提供了一种将 DataBuffer 实例流 和 对象流 进行相互转换的简便方式。
3.2 响应式web
3.3 响应式Spring Data
Spring Data 主要提供对底层存储区域的同步阻塞访问。
现在,Spring Data 框架提供了 ReactiveCrudRepository 接口,该接口暴露了Project Reactor的响应式类型。
Spring Data 还提供了几个通过扩展 ReactiveCrudRepository 接口而与存储方法集成的模块。
- 基于 Spring Data Mongo 响应式模块的 MongoDB:与NoSQL 数据库之间的完全响应式非阻塞交互,同时也包含背压控制。
- 基于 Spring Data Cassandra 响应式模块的 Cassandra:与Cassandra 数据存储的异步非阻塞交互,支持基于TCP 流控制的背压。
- 基于 Spring Data Redis 响应式模块的 Redis:通过Lettuce Java 客户端实现的与 Redis 之间的响应式集成。
- 基于 Spring Data Couchbase 响应式模块的 Couchbase:通过基于 RxJava 的驱动程序实现的与 Couchbase 数据库之间的响应式Spring Data 集成。
除了NoSQL 数据库,Spring Data 还引入了 Spring Data JDBC ,与 JDBC 轻量级集成,快速提供响应式 JDBC 连接。
其他 Spring 框架模块的大多数改进以 WebFlux 的响应式能力或响应式 Spring Data 模块为基础。
3.4 响应式Spring Session
Spring 框架中与Spring Web 模块相关的另一个重要更新是 Spring Session 模块中的响应式支持。 Spring Session 引入了 ReactiveSessionRepository,可以使用 Reactor 的 Mono 类型对存储的会话进行异步非阻塞访问。
除此之外,作为响应式 Spring Data 的会话存储,Spring Session 还提供与 Redis 的响应式集成。
可以通过包含以下依赖项来实现分布式 WebSession:
<dependency> <groupId>org.springframework.session</groupId> <artifactId>spring-session-data-redis</artifactId> </dependency> <dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
3.4 响应式Spring Security
旧的Spring Security 使用 ThreadLocal 作为 SecurityContext 实例的存储方法。在单个Thread 内执行时,该技术很有效,在任何时候,都可以访问存储在ThreadLocal 中的SecurityContext。 但是,在执行异步通信时,该技术就会出现问题。这时,必须提供额外的工作来将 ThreadLocal 内容传输到另一个Thread,并为Thread 实例之间的每个切换实例执行此操作。 尽管Spring 框架通过使用一个额外的 ThreadLocal 扩展简化了 Threads 之间的 SecurityContext 传输,但在基于 Project Reactor 或类似的响应式库应用响应式编程范例时,仍然会有问题。 新一代 Spring Security 采用了 Reactor 上下文功能,以便在 Flux 或 Mono 流中传输安全上下文。通过这种方式,即使在运作着不同执行线程的复杂响应式流中,我们也可以安全地访问安全上下文。
3.6 响应式Spring Cloud
首先,响应式影响了分布式系统的入口点,即网关(gateway)。 很长一段时间,唯一能够将应用程序作为网关运行的 Spring 模块是 Spring Cloud Netflix Zuul 模块。Netflix Zuul 基于使用阻塞同步请求路由的 Servlet API。使处理请求获得更好性能的唯一方法是调整底层服务器线程池。
Spring Cloud 引入了新的 Spring Cloud Gateway 模块,该模块构建于 Spring WebFlux 之上,并在 Project Reactor 3 的支持下提供异步和非阻塞路由。 除了新的网关模块,Spring Cloud Streams 还获得了Project Reactor 的支持,并且引入了更加细粒度的流模型。 为了简化响应式系统的开发,Spring Cloud 引入了一个名为 Spring Cloud Function 的新模块,该模块旨在为构建我们自己的函数即服务(function as a service ,FaaS)解决方案提供必要的组件。 如果没有适当的附加基础设施,Spring Cloud Function 模块将无法应用在普通开发中。Spring Cloud Data Flow 不仅提供了这种可能性,还包含了Spring Cloud Function 的部分功能。
3.7 响应式Spring Test
Spring 生态系统提供了改进后的 Spring Test 和 Spring Boot Test 模块,它们扩展了一系列用于测试响应式 Spring 应用程序的附加功能。 Spring Test 提供了一个 WebTestClient 来测试基于 WebFlux 的 Web 应用程序,同时,Spring Boot Test 使用普通的注解来处理测试套件的自动配置。 同时,为了测试响应式流的 Publisher,Project Reactor 提供了Reactor-Test 模块,它与 Spring Test 和 Spring Boot Test 模块相结合,可以为使用响应式Spring 实现的业务逻辑编写完整的验证套件。
3.8 响应式监控
基于 Project Reactor 和响应式 Spring 框架构建的面向生产的响应式系统应该暴露所有重要的运维指标。 首先,Project Reactor 本身具有内置指标。它提供 Flux#metrics() 方法,可以跟踪响应式流中的不同事件。 Spring 框架生态系统提供了更新后的 Spring Boot Actuator 模块,该模块支持应用程序监控和故障排除的主要指标。 新一代SpringActuator 提供与 WebFlux 的完全集成,并使用其异步、非阻塞编程模型,以便有效地暴露指标端点。
Spring Cloud Sleuth 模块提供了监控和跟踪应用程序的最终选项。该模块提供开箱即用的分布式跟踪,它的一个显著优点是支持 Project Reactor 的响应式编程,因此应用程序中的所有响应式工作流都可以被正确跟踪。 Spring 生态系统不仅改进了内核框架的响应性,还负责面向生产的功能,而且支持详细的应用程序监控(这种监控甚至包括这些功能的响应式解决方案)。
四、webFlux的应用
4.1 基于微服务的系统
WebFlux 的第一个应用是微服务系统。 微服务系统最显著的特点是大量的I/O 通信。 I/O 的存在,尤其是阻塞式I/O,会降低整体系统延迟和吞吐量。
-
微服务网关
Spring Cloud Gateway 是 Spring 官方基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,Spring Cloud Gateway 旨在为微服务架构提供一种简单而有效的统一的 API 路由管理方式。Spring Cloud Gateway 作为 Spring Cloud 生态系中的网关,目标是替代 Netflix ZUUL,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控/埋点,和限流等。
功能特点:
- 基于 Spring Framework 5,Project Reactor 和 Spring Boot 2.0动态路由
- Predicates 和 Filters 作用于特定路由
- 集成 Hystrix 断路器
- 集成 Spring Cloud DiscoveryClient
- 易于编写的 Predicates 和 Filters
- 限流
- 路径重写
-
大文件上传
示例代码:demo-3-6
-
处理客户端连接速度慢的系统
WebFlux 的第二个应用是构建系统,而这些系统的目标是在缓慢或不稳定网络连接条件下适用于移动设备客户端。要理解为什么WebFlux 在这个领域有用,就要回想一下在处理一个慢速连接时会发生什么。问题在于,将数据从客户端传输到服务器可能花费大量时间,并且相应的响应也可能花费大量时间。在使用单连接单线程模型的情况下,已连接客户端数量越多,系统崩溃的可能性越大。例如,黑客能很容易的通过使用拒绝服务(Denial-of-Service,DoS)攻击使我们的服务器不可用。 相比之下,WebFlux 使我们能在不阻塞工作线程的情况下接受连接。这样,慢速连接不会导致任何问题。在等待传入请求体时,WebFlux 将继续接收其他连接而不会阻塞。响应式流抽象使我们能在需要时消费数据。这意味着服务器可以根据网络的就绪情况控制事件消费。
-
流系统或实时系统
WebFlux 的另一个有用的应用是实时流系统。要了解WebFlux 为什么能在这一点上提供帮助,就要回想实时流系统是什么。 首先,这些系统的特点是低延迟和高吞吐量。在流系统中,大多数数据是从服务器端传出的,因此客户端扮演消费者的角色。通常来自于客户端的事件少于来自于服务器端的事件。但是,在在线游戏等实时系统中,传入数据量等于传出数据量。 使用非阻塞通信可以实现低延迟和高吞吐量。正如前文所述,非阻塞异步通信可以实现高效的资源利用,而基于Netty 或类似框架的系统可以实现最高的吞吐量和最低的延迟。然而,这种响应式框架有其自身的缺点,即使用通道和回调的复杂交互模型。
尽管如此,响应式编程仍然可以巧妙地解决这两个问题。正如第4 章所述,响应式编程,尤其是响应式库(如Reactor 3)可以帮助我们构建一个异步的非阻塞流而只需要很少的开销。这些开销来自基础代码复杂性和可接受的学习曲线。这两种解决方案都包含在WebFlux中。使用Spring框架可以让我们轻松构建这样的系统。
五、Spring WebFlux数据库访问
5.1 响应式持久化库的工作原理
Spring Data 中的响应式存储库通过适配底层数据库驱动来工作。
ReactiveMongoRepository 继承了 ReactiveSortingRepository 和 ReactiveQueryByExampleExecutor 等更多通用接口。
ReactiveQueryByExampleExecutor接口可以使用 QBE 语言执行查询。
ReactiveSortingRepository 接口扩展了 ReactiveCrudRepository 接口;并添加了 findAll 方法,该方法能对请求查询结果进行排序。
ReactiveCrudRepository声明了用于保存、查找和删除实体的方法。
- Mono
save(T entity)方法保存 entity,然后返回所保存的实体。保存操作可能更改整个实体对象。 - Mono
findById(ID id)操作实体的 id 并返回包装在 Mono 中的结果。 - findAllById方法有两个重载方法,其中一个重载方法以 Iterable
集合的形式消费 ID,另一个则采用Publisher 的形式。 - ReactiveCrudRepository 和 CrudRepository 之间唯一值得注意的区别在于ReactiveCrudRepository 没有分页且不能进行事务操作。
-
分页支持
Spring Data 故意省略分页,因为同步存储库中使用的实现方案不适合响应式。
但是,通过将 Pageable 对象传递到存储库,仍然可以获取数据块,如下所示:
pubic interface ReactiveBookRepository extends ReactiveSortingRepository<Book, Long> { Flux<Book> findByAuthor(String author, Pageable pageable); }
Flux<Book> result = reactiveBookRepository.findByAuthor("Andy Weir",PageRequest.of(1, 5));
-
ReactiveMongoRepository实现细节
只有一个实现类SimpleReactiveMongoRepository
5.2 响应式事务
对于同步处理的情况,事务对象通常保存在 ThreadLocal 容器中。 但是 ThreadLocal 不适合用于响应式处理方式,因为用户无法控制线程切换。事务需要将底层资源绑定到物化数据流。 在 Project Reactor 中,可以通过 Reactor 上下文来实现这一目标。
-
基于 MongoDB 4 的响应式事务
MongoDB 从 4.0 版开始支持多文档事务(multi-document transactions)。 Spring Data 没有任何在服务或存储库级别应用响应式事务的功能。但是,可以使用ReactiveMongoOperations 级别(由 ReactiveMongoTemplate 实现)的事务进行操作。 首先,多文档事务是 MongoDB 的一项新功能。它仅适用于使用 WiredTiger 存储引擎的非分片副本集。在 MongoDB 4.0 中,没有其他配置支持多文档事务。 其次,某些 MongoDB 功能在事务中不可用,如,发出元命令和创建集合或索引都是不可能的。同时,隐式创建集合在事务中不起作用。因此,需要设置所需的数据库结构以防止错误。
假设必须实现一种用于在用户账户之间转账的钱包服务。每个用户都有自己的账户,且账户余额非负。用户可以将任意金额转给其他用户,但只有在账户中有足够资金时转账才会成功。 转账可以并行发生,但在转账时,系统中的资金既不能增加,也不能减少。因此,汇款人钱包的取款操作和收款人钱包的存款操作必须同时且原子地进行。此时可以使用多文档事务。
要将一笔款项从账户 A 转账到账户 B,应该执行以下操作:
-
启动新事务。
-
加载账户 A 的钱包。
-
加载账户 B 的钱包。
-
检查账户 A 的钱包中是否有足够的资金。
-
提取转账金额并计算账户 A 的新余额。
-
存入转账金额并计算账户 B 的新余额。
-
保存账户 A 的钱包。
-
保存账户 B 的钱包。
-
提交事务。
由于目前没有mongo节点,示例代码以后再加
-
5.3 Spring Data响应式连接器
Spring Data为 4 个 NoSQL 数据库准备了数据库连接器,即 MongoDB、Cassandra、Couchbase和 Redis。
Spring Data 也可能支持其他数据存储,特别是那些利用 Spring WebFlux WebClient 基于 HTTP 进行通信的数据存储
5.4 响应式关系型数据库连接
响应式关系型数据库连接(Reactive Relational Database Connectivity,R2DBC)是一项探索完全响应式数据库 API 的倡议。 Spring Data 团队领导 R2DBC 倡议,并使用它在响应式应用程序内的响应式数据访问环境中探测和验证想法。 R2DBC 在 Spring OnePlatform 2018 会议上被公开,其目标是定义具有背压支持的响应式数据库访问 API。Spring Data 团队在响应式 NoSQL 持久化方面获得了一些先进经验,因此决定提出对真正响应式语言级数据访问 API的愿景。
R2DBC 项目包括以下部分。
- R2DBC 服务提供程序接口(Service Provider Interface,SPI)定义了实现驱动程序的简约 API,便于彻底减少驱动程序实现者必须遵守的 API。SPI 不适合在应用程序代码中直接使用,需要专用的客户端库。
- R2DBC 客户端提供了人性化的 API 和帮助类,可将用户请求转换为 SPI 级别。R2DBC 客户端对R2DBC SPI 的作用与 Jdbi 库对 JDBC 的作用相同。
- R2DBC PostgreSQL 实现为 PostgreSQL 提供了 R2DBC 驱动程序。使用 Netty 框架通过 PostgreSQL 连接协议进行异步通信。背压既可以通过 TCP 流控制,也可以通过被称为门户(portal)的 PostgreSQL 特性来实现,后者实际上是一个查询内的光标。门户能完美地转换为响应式流。
并非所有关系型数据库都具有正确背压传播所需的连接协议功能。但 TCP 流控制在所有情况下都可用。
5.5 Spring Data R2DBC集成mysql
示例代码:demo-5-1
5.6 Spring Web Flux 集成mongodb
示例代码:demo-5-2
5.7 Spring Web Flux集成 Redis
示例代码:demo-5-3
5.8 rxjava2-jdbc库
用的少,不做示例………………
5.9 将同步CrudRepositorty包装为响应式
@Component
public class RxBookRepository extends ReactiveCrudRepositoryAdapter<Book, Integer, BookJpaRepository> {
...
}
六、测试响应式应用程序
6.1 使用StepVerifier测试响应式流
出于测试目的,Reactor 提供了额外的 reactor-test 模块,该模块提供了 StepVerifier。StepVerifier 提供了一个流式 API,用于为任何 Publisher 构建验证流程。
6.2 测试WebFlux
使用 WebTestClient 测试控制器
@Test
public void verifyRespondWithExpectedPayments() {
PaymentService paymentService = Mockito.mock(PaymentService.class);
PaymentController controller = new PaymentController(paymentService);
prepareMockResponse(paymentService);
WebTestClient
.bindToController(controller)
.build()
.get()
.uri("/payments/")
.exchange()
.expectHeader()
.contentTypeCompatibleWith(APPLICATION_JSON)
.expectStatus()
.is2xxSuccessful()
.returnResult(Payment.class)
.getResponseBody()
.as(StepVerifier::create)
.expectNextCount(5)
.expectComplete()
.verify();
}