BT

你的观点很重要! 快来参与InfoQ调研吧!

Reactor实例解析

| 作者 Simon Basle  他的粉丝 ,译者 冬雨 关注 0 他的粉丝 发布于 2016年12月20日. 估计阅读时间: 60 分钟 | ArchSummit社交架构图谱:Facebook、Snapchat、Tumblr等背后的核心技术

要点

  • Reactor是一个运行在Java8之上的响应式流框架,它提供了一组响应式风格的API
  • 除了个别API上的区别,它的原理跟RxJava很相似
  • 它是第四代响应式框架,支持操作融合,类似RxJava 2
  • Spring 5的响应式编程模型主要依赖Reactor

RxJava回顾

Reactor是第四代响应式框架,跟RxJava 2有些相似。Reactor项目由Pivotal启动,以响应式流规范、Java8和ReactiveX术语表为基础。它的设计是Reactor 2(上一个主要版本)和RxJava核心贡献者共同努力的结果。

在之前的同系列文章“RxJava实例解析”和“测试RxJava”里,我们已经了解了响应式编程的基础:数据流的概念、Observable类和它的各种操作以及通过工厂方法创建静态和动态的Observable对象。

Observable是事件的源头,Observer提供了一组简单的接口,并通过订阅事件源来消费Observable的事件。Observable通过onNext向Observer通知事件的到达,后面可能会跟上onError或onComplete来表示事件的结束。

RxJava提供了TestSubscriber来测试Observable,TestSubscriber是一个特别的Observer,可以用它断言流事件。

在这篇文章里,我们将会对Reactor和RxJava进行比较,包括它们的相同点和不同点。

Reactor的类型

Reactor有两种类型,Flux<T>Mono<T>。Flux类似RaxJava的Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。

Mono最多只触发一个事件,它跟RxJava的SingleMaybe类似,所以可以把Mono<Void>用于在异步任务完成时发出通知。

因为这两种类型之间的简单区别,我们可以很容易地区分响应式API的类型:从返回的类型我们就可以知道一个方法会“发射并忘记”或“请求并等待”(Mono),还是在处理一个包含多个数据项的流(Flux)。

Flux和Mono的一些操作利用了这个特点在这两种类型间互相转换。例如,调用Flux<T>的single()方法将返回一个Mono<T>,而使用concatWith()方法把两个Mono串在一起就可以得到一个Flux。类似地,有些操作对Mono来说毫无意义(例如take(n)会得到n>1的结果),而有些操作只有作用在Mono上才有意义(例如or(otherMono))。

Reactor设计的原则之一是要保持API的精简,而对这两种响应式类型的分离,是表现力与API易用性之间的折中。

“使用响应式流,基于Rx构建”

正如“RxJava实例解析”里所说的,从设计概念方面来看,RxJava有点类似Java 8 Steams API。而Reactor看起来有点像RxJava,不过这决不只是个巧合。这样的设计是为了能够给复杂的异步逻辑提供一套原生的具有Rx操作风格的响应式流API。所以说Reactor扎根于响应式流,同时在API方面尽可能地与RxJava靠拢。

响应式类库和响应式流的使用

Reactive Streams(以下简称为RS)是“一种规范,它为基于非阻塞回压的异步流处理提供了标准”。它是一组包含了TCK工具套件和四个简单接口(Publisher、Subscriber、Subscription和Processor)的规范,这些接口将被集成到Java 9.

RS主要跟响应式回压(稍后会详细介绍)以及多个响应式事件源之间的交互操作有关。它并不提供任何操作方法,它只关注流的生命周期。

Reactor不同于其它框架的最关键一点就是RS。Flux和Mono这两者都是RS的Publisher实现,它们都具备了响应式回压的特点。

在RxJava 1里,只有少部分操作支持回压,RxJava 1的Observable并没有实现RS里的任何类型,不过它有一些RS类型的适配器。可以说,RxJava 1实际上比RS规范出现得更早,而且在RS规范设计期间,RxJava 1充当了函数式工作者的角色。

所以,你在使用那些Publisher适配器时,它们并不会为你提供任何操作。为了能做一些有用的操作,你可能需要用回Observable,而这个时候你需要另一个适配器。这种视觉上的混乱会破坏代码的可读性,特别是像Spring 5这样的框架,如果整个框架建立在这样的Publisher之上,那么就更是杂乱不堪。

RS规范不支持null值,所以在从RxJava 1迁移到Reactor或RxJava 2时要注意这点。如果你在代码里把null用作特殊用途,那么就更是要注意了。

RxJava 2是在RS规范之后出现的,所以它直接在Flowable类型里实现了Publisher。不过除了RS类型,RxJava 2还保留了RxJava 1的“遗留”类型(Observable、Completable和Single)并且引入了其它一些可选类型——Maybe。这些类型提供了不同的语义,不过它们并没有实现RS接口,这是它们的不足之处。跟RxJava 1不一样,RxJava 2的Observable不支持RxJava 2的回压协议(只有Flowable具备这个特性)。之所以这样设计是为了能够为一些场景提供一组丰富且流畅的API,比如用户界面发出的事件,在这样的场景里是不需要用到回压的,而且也不可能用到。Completable、Single和Maybe不需要支持回压,不过它们也提供了一组丰富的API,而且在被订阅之前不会做任何事情。

在响应式领域,Reactor变得愈加精益,它的Mono和Flux两种类型都实现了Publisher,并且都支持回压。虽然把Mono作为一个Publisher需要付出一些额外的开销,不过Mono在其它方面的优势弥补了它的缺点。在后续部分我们将看到对Mono来说回压意味着什么。

相比RxJava,API相似但不相同

ReactiveX和RxJava的操作术语表有时候真的难以掌握,因为历史原因,有些操作的名字让人感到困惑。Reactor尽量把API设计得紧凑,在给API取名时尽量选择好一点的名字,不过总的来说,这两套API看起来还是很相像。在最新的RxJava 2迭代版本中,RxJava 2借鉴了Reactor的一些术语,这预示着这两个项目之间可能会有越来越紧密的合作。一些操作和概念总是先出现在其中的一个项目里,然后互相借鉴,最后会同时渗透到两个项目里。

例如,Flux也有常见的just工厂方法(虽然只有两种变形:接受一个参数或变长参数)。不过from方法有很多个变种,最值得一提的是fromIterable。当然,Flux也包含了那些常规的操作:map、merge、concat、flatMap、take,等等。

Reactor把RxJava里令人困惑的amb操作改成了看起来更加中肯的firstEmitting。另外,为了保持API的一致,toList被重新命名为collectList。实际上,所有以collect开头的操作都会把值聚合到一个特定类型的集合里,不过只会为每个集合生成一个Mono。而所有以to开头的操作被保留用于类型转换,转换之后的类型可以用于非响应式编程,例如toFuture()。

在类初始化和资源使用方面,Reactor之所以也能表现得如此精益,要得益于它的融合特性:Reactor可以把多个串行的操作(例如调用concatWith两次)合并成单个操作,这样就可以只对这个操作的内部类做一次初始化(也就是macro-fusion)。这个特性包含了基于数据源的优化,抵消了Mono在实现Publisher时的一些额外开销。它还能在多个相关的操作之间共享资源(也就是micro-fusion),比如内部队列。这些特性让Reactor成为不折不扣的的第四代响应式框架,不过这个超出了这篇文章的讨论范围。

下面让我们来看看几个Reactor的操作。

一些操作示例

(这一小节包含了一些代码片段,我们建议你动手去运行它们,深入体验一下Reactor。所以你需要打开IDE,并创建一个测试项目,把Reactor加入到依赖项里。)

对于Maven,可以把下面的依赖加到pom.xml里:

<dependency>
    <groupId>io.projectreactor</groupId>    
    <artifactId>reactor-core</artifactId>
    <version>3.0.3.RELEASE</version>
</dependency>

对于Gradle,要把Reactor作为依赖项,类似这样:

dependencies {
    compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}

我们来重写前面几篇同系列文章里的例子!

Observable的创建跟在RxJava里有点类似,在Reactor里可以使用just(T...)和fromIterator(Iterable<T>)工厂方法来创建。just方法会把List作为一个整体触发,而fromIterable会逐个触发List里的每个元素:

public class ReactorSnippets {
  private static List<String> words = Arrays.asList(
        "the",
        "quick",
        "brown",
        "fox",
        "jumped",
        "over",
        "the",
        "lazy",
        "dog"
        );

  @Test
  public void simpleCreation() {
     Flux<String> fewWords = Flux.just("Hello", "World");
     Flux<String> manyWords = Flux.fromIterable(words);

     fewWords.subscribe(System.out::println);
     System.out.println();
     manyWords.subscribe(System.out::println);
  }
}

跟在RxJava里一样,上面的代码会打印出:

Hello
World
the
quick
brown
fox
jumped
over
the
lazy
dog

为了打印句子里的每一个字母,我们还需要flatMap方法(跟在RxJava里一样),不过在Reactor里我们使用fromArray来代替from。然后我们会用distinct过滤掉重复的字母,并用sort对它们进行排序。最后,我们使用zipWith和range输出每个字母的次序:

@Test
public void findingMissingLetter() {
  Flux<String> manyLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  manyLetters.subscribe(System.out::println);
}

我们可以很容易地看到s被遗漏了:

1. a
2. b
...
18. r
19. t
20. u
...
25. z

我们可以通过纠正单词数组来修复这个问题,不过也可以使用concat/concatWith和一个Mono来手动往字母Flux里添加“s”:

@Test
public void restoringMissingLetter() {
  Mono<String> missing = Mono.just("s");
  Flux<String> allLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .concatWith(missing)
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  allLetters.subscribe(System.out::println);
}

这样,在去重和排序后,遗漏的s字母就被添加进来了:

1. a
2. b
...
18. r
19. s
20. t
...
26. z

上一篇文章提到了Rx和Streams API之间的相似之处,而实际上,在数据就绪的时候,Reactor也会像Java Steams那样开始简单地推送数据事件(可以参看下面关于回压的内容)。只是在主线程里对事件源进行订阅无法完成更加复杂的异步操作,主要是因为在订阅完成之后,控制权会马上返回到主线程,并退出整个程序。例如:

@Test
public void shortCircuit() {
  Flux<String> helloPauseWorld = 
              Mono.just("Hello")
                  .concatWith(Mono.just("world")
                  .delaySubscriptionMillis(500));

  helloPauseWorld.subscribe(System.out::println);
}

这个单元测试会打印出“Hello”,但无法打印出“world”,因为程序会过早地退出。在做简单测试的时候,如果你只是像这样写一个简单的主类,你通常会掉入陷阱。作为补救,你可以创建一个CountDownLatch对象,并在Subscriber(包括onError和onComplete)里调用countDown方法。不过这样就变得不那么响应式了,不是吗?(万一你忘了调用countDown方法,而刚好发生错误了该怎么办?)

解决这个问题的第二种方法是使用一些操作转换到非响应式模式。toItetable和toStream会生成阻塞实例。我们在例子里使用toStream:

@Test
public void blocks() {
  Flux<String> helloPauseWorld = 
    Mono.just("Hello")
        .concatWith(Mono.just("world")
                        .delaySubscriptionMillis(500));

  helloPauseWorld.toStream()
                 .forEach(System.out::println);
}

正如你所期待的那样,在打印出“Hello”之后有一个短暂的停顿,然后打印出“world”并退出。我们之前也提过,RxJava的amb操作在Reactor里被重命名为firstEmitting(正如它的名字所表达的:选择第一个Flux来触发)。在下面的例子里,我们会创建一个Mono,这个Mono会有450毫秒的延迟,还会创建一个Flux,这个Flux以400毫秒的间隔触发事件。在使用firstEmitting()对它们进行合并时,因为Flux的第一个值先于Mono的值出现,所以最后Flux会被采用:

@Test
public void firstEmitting() {
  Mono<String> a = Mono.just("oops I'm late")
                       .delaySubscriptionMillis(450);
  Flux<String> b = Flux.just("let's get", "the party", "started")
                       .delayMillis(400);

  Flux.firstEmitting(a, b)
      .toIterable()
      .forEach(System.out::println);
}

这个单元测试会打印出句子的所有部分,它们之间有400毫秒的时间间隔。

这个时候你可能会想,如果我写的测试使用的是4000毫秒的间隔而不是400毫秒,那会怎样?你不会想在一个单元测试里等待4秒钟的!在后面的部分,我们会看到Reactor提供了一些测试工具可以很好地解决这个问题。

我们已经通过例子比较了Reactor的一些常用操作,现在我们要回头看看这个框架其它方面的不同点。

基于Java 8

Reactor选择Java 8作为运行基础而不是之前的任何版本,这再一次与它简化API的目标不谋而合:RxJava选择了Java 6,而Java 6里没有java.util.function包,RxJava也就无法利用这个包下面的Functino类和Consumer类,所以它必须创建很多类似Func1、Func2、Action0、Action1这样的类。RxJava 2使用类似Reactor 2的方式把这些类作为java.util.function的镜像,因为它还得支持Java 7。

Reactor API还使用了Java 8里新引入的一些类型。因为大部分基于时间的操作都跟时间段有关系(例如超时、时间间隔、延迟,等等),所以直接就使用了Java 8里的Duration类。

Java 8 Stream API和CompletableFuture跟Flux/Mono之间可以很容易地进行互相转换。那么一般情况下我们是否要把Stream转成Flux?不一定。虽然说Flux或Mono对IO和内存相关操作的封装所产生的开销微不足道,不过Stream本身也并不会带来很大延迟,所以直接使用Stream API是没有问题的。对于上述情况,在RxJava 2里需要使用Observable,因为Observable不支持回压,所以一旦对其进行订阅,它就成为事件推送的来源。Reactor是基于Java 8的,所以在大部分情况下,Stream API已经能够满足需求了。要注意的是,尽管Flux和Mono的工厂模式也支持简单类型,但它们的主要用途还是在于把对象合并到更高层次的流里面。所以一般来说,在现有代码上应用响应式模式时,你不会希望把“long getCount()”这样的方法转成“Mono<Long> getCount()”。

关于回压

回压是RS规范和Reactor主要关注点之一(如果还有其它关注点的话)。回压的原理是说,在一个推送场景里,生产者的生产速度比消费者的消费速度快,消费者会向生产者发出信号说“嘿,慢一点,我处理不过来了。”生产者可以借机控制数据生成的速度,而不是抛弃数据或者冒着产生级联错误的风险继续生成数据。

你也许会想,在Mono里为什么也需要回压:什么样的消费者会被一个单独的触发事件压垮?答案是“应该不会有这样的消费者”。不过,在Mono和CompletableFuture工作原理之间仍然有一个关键的不同点。后者只有推送:如果你持有一个Future的引用,那么说明一个异步任务已经在执行了。另一方面,回压的Flux或Mono会启动延迟的拉取-推送迭代:

  1. 延迟是因为在调用subscribe()方法之前不会发生任何事情
  2. 拉取是因为在订阅和发出请求时,Subscriber会向上游发出信号,准备拉取下一个数据块
  3. 接下来生产者向消费者推送数据,这些数据在消费者的请求范围之内

对Mono来说,subscribe()方法就相当于一个按钮,按下它就等于说“我准备好接收数据了”。Flux也有一个类似的按钮,不过它是request(n)方法,这个方法是subscribe()的一般化用法。

Mono作为一个Publisher,它往往代表着一个耗费资源的任务(在IO、延迟等方面),意识到这点是理解回压的关键:如果不对其进行订阅,你就不需要为之付出任何代价。因为Mono经常跟具有回压的Flux一起被编排到一个响应式链上,来自多个异步数据源的结果有可能被组合到一起,这种按需触发的能力是避免阻塞的关键。

我们可以使用回压来区分Mono的不同使用场景,相比上述的例子,Mono有另外一个常见的使用场景:把Flux的数据异步地聚合到Mono里。reduce和hasElement可以消费Flux里的每一个元素,再把这些数据以某种形式聚合起来(分别是reduce函数的调用结果和一个boolean值),作为一个Mono对外暴露数据。在这种情况下,使用Long.MAX_VALUE向上游发出回压信号,上游会以完全推送的方式工作。

关于回压另一个有意思的话题是它如何对存储在内存里的流的对象数量进行限制。作为一个Publisher,数据源很有可能出现生成数据缓慢的问题,而来自下游的请求超出了可用数据项。在这种情况下,整个流很自然地进入到推送模式,消费者会在有新数据到达时收到通知。当生产高峰来临,或者在生产速度加快的情况下,整个流又回到了拉取模式。在以上两种情况下,最多有N项数据(request()请求的数据量)会被保留在内存里。

你可以对内存的使用情况进行更精确的推算,把N项数据跟每项数据需要消耗的内存W结合起来:这样你就可以推算出最多需要消耗W*N的内存。实际上,Reactor在大多数情况下会根据N来做出优化:根据情况创建内部队列,并应用预取策略,每次自动请求75%的数据量。

Reactor的操作有时候会根据它们所代表的语义和调用者的期望来改变回压信号。例如对于操作buffer(10):下游请求N项数据,而这个操作会向上游请求10N的数据量,这样就可以填满缓冲区,为订阅者提供足够的数据。这通常被称为“主动式回压”,开发人员可以充分利用这种特性,例如在微批次场景里,可以显式地告诉Reactor该如何从一个输入源切换到一个输出地。

跟Spring的关系

Reactor是Spring整个生态系统的基础,特别是Spring 5(通过Spring Web Reactive)和Spring Data “kay”(跟spring-data-commons 2.0相对应的)。

这两个项目的响应式版本是非常有用的,我们因此可以开发出完全响应式的Web应用:异步地处理请求,一直到数据库,最后异步地返回结果。Spring应用因此可以更有效地利用资源,避免为每个请求单独分配一个线程,还要等待I/O阻塞。

Reactor将被用于未来Spring应用的内部响应式核心组件,以及这些Spring组件暴露出来的API。一般情况下,它们可以处理RS Publisher,不过大多数时候它们要面对的是Flux/Mono,需要用到Reactor的丰富特性。当然,你也可以自行选择其它响应式框架,Reactor提供了可以用来适配其它Reactor类型和RxJava类型甚至简单的RS类型的钩子接口。

目前,你可以通过Spring Boot 2.0.0.BUILD-SNAPSHOT和spring-boot-starter-web-reactive依赖项(可以在start.spring.io上生成一个这样的项目)来体验Spring Web Reactive:

<dependency>
  <groupId>org.springframework.boot.experimental</groupId>
  <artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>

你可以像往常那样写你的@Controller,只不过把Spring MVC的底层变成响应式的,把大部分Spring MVC的契约换成了响应式非阻塞的契约。响应式层默认运行在Tomcat 8.5上,你也可以选择使用Undertow或Netty。

另外,虽然Spring API是以Reactor类型为基础的,不过在Spring Web Reactive模块里可以为请求和响应使用各种各样的响应式类型:

  • Mono<T>:作为@RequestBody,请求实体T会被异步反序列化,之后的处理可以跟Mono关联起来。作为返回类型,每次Mono发出了一个值,T就会被异步序列化并发回客户端。你可以把请求Mono作为参数,并把参数化了的关联处理作为结果Mono返回。
  • Flux<T>:在流场景里使用(作为@RequestBody使用的输入流以及包含了Flux返回类型的Server Sent Events)。
  • Single/Observable:分别对应Mono和Flux,不过会切换回RxJava。
  • Mono作为返回类型:在Mono结束时请求的处理也跟着完成。
  • 非响应式返回类型(void和T):这个时候你的@Controller方法是同步的,不过它应该是非阻塞的(短暂的处理)。请求处理在方法执行完毕时结束,返回的T被异步地序列化并发回客户端。

下面是使用Spring Web Reactive的例子:

@Controller
public class ExampleController {

  private final MyReactiveLibrary reactiveLibrary;

  public ExampleController(@Autowired MyReactiveLibrary reactiveLibrary) {
     this.reactiveLibrary = reactiveLibrary;
  }

  @RequestMapping("hello/{who}")
  @ResponseBody
  public Mono<String> hello(@PathVariable String who) {
       return Mono.just(who)
             .map(w -> "Hello " + w + "!");
}

  @RequestMapping(value = "heyMister", method = RequestMethod.POST)
  @ResponseBody
  public Flux<String> hey(@RequestBody Mono<Sir> body) {
     return Mono.just("Hey mister ")
           .concatWith(body
                 .flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
                 .map(String::toUpperCase)
                 .take(1)
           ).concatWith(Mono.just(". how are you?"));
  }
}

第一个端点含有一个路径变量,它被转成Mono,并被映射到一个问候语里返回给客户端。

一个发到/hello/SImon的GET请求会得到“Hello Simon!”的文本响应。

第二个端点相对复杂一些:它异步地接收序列化Sir对象(一个包含了firstName和lastName属性的类)并使用flatMap方法把它映射到一个字母流里,这个字母流包含了lastName的所有字母。然后它选取流里的第一个字母,把它转成大写,并跟问候语串在一起。

所以向/heyMister POST一个JSON对象

{
    "firstName": "Paul",
    "lastName": "tEsT"
}

会返回字符串“Hello mister T. How are you?”。

响应式Spring Data目前也在开发当中,它被作为Kay发布的一部分,代码在spring-data-commons 2.0.x分支上。现在已经有一个里程碑版本可以使用:

<dependencyManagement>
  <dependencies>
     <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-releasetrain</artifactId>
        <version>Kay-M1</version>
        <scope>import</scope>
        <type>pom</type>
     </dependency>
  </dependencies>
</dependencyManagement>

然后简单地添加Spring Data Commons的依赖(它会自动从上面的BOM里获取版本号):

<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-commons</artifactId>
</dependency>

Spring Data对响应式的支持主要表现在新的ReactiveCrudRepository接口,它扩展了Repository。这个接口暴露了CRUD方法,使用的是Reactor类型的输入和返回值。还有一个RxJava 1的版本,叫作RxJava1CrudRepository。要在CrudRepository里通过id获取一个实体,可以调用“T findOne(ID id)”方法,而在ReactiveCrudRepository和RxJava1CrudRepository里要分别调用“Mono<T> findOne(ID id)”和“Observable<T> findOne(ID id)”。还有其它的变种,它们接收Mono/Single作为参数,异步地提供key,并在此基础上组合返回结果。

假设有一个响应式的后端存储(或者mock的ReactiveCrudRepository bean),下面的controller将从前到后都是响应式的:

@Controller
public class DataExampleController {

  private final ReactiveCrudRepository<Sir, String> reactiveRepository;

  public DataExampleController(
                 @Autowired ReactiveCrudRepository<Sir, String> repo) {
     this.reactiveRepository = repo;
  }

  @RequestMapping("data/{who}")
  @ResponseBody
  public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
     return reactiveRepository.findOne(who)
                              .map(ResponseEntity::ok)
                              .defaultIfEmpty(ResponseEntity.status(404)
                                                            .body(null));
  }
}

请注意整个流程:我们异步地获取实体并用map把它包装成ResponseEntity,取得一个可以马上返回的Mono。如果Spring Data repository找不到这个key的数据,会返回一个空的Mono。我们使用defaultIfEmpty显式地返回404。

测试Reactor

“测试RxJava”这篇文章里提到了如何测试Observable。正如我们所看到的,RxJava提供了TestScheduler,我们可以把它跟RxJava的操作一起使用,这些操作接受一个Scheduler参数,TestScheduler会为这些操作启动虚拟的时钟。RxJava还提供了一个TestSubscriber类,可以用它等待Observable执行完毕,也可以用它对每个事件进行断言(onNext的值和它的数量、触发的onError,等等)。在RxJava 2里,TestSubscriber就是RS Subscriber,你可以用它测试Reactor的Flux和Mono!

在Reactor里,上述两个使用广泛的特性被组合到了StepVerifier类里。从reactor-addons仓库的reactor-test模块里可以获取到StepVerifier。在创建Publisher实例时,调用StepVerifier.create方法可以初始化一个StepVerifier。如果要使用虚拟时钟,可以调用StepVerifier.withVirtualTime方法,这个方法接受一个Supplier作为参数。之所以这样设计,是因为它会首先保证创建一个VirtualTimeScheduler对象,并把它作为默认的Scheduler传给旧有的操作。StepVerifier会对在Supplier里创建的Flux/Mono进行配置,把基于时间的操作转为“虚拟时间操作”。接下来你就可以编写各种你所期望的用例:下一个元素应该是什么,是否应该出现错误,是否应该及时向前移动,等等。借助其它方法,比如事件与Predicate的匹配或者对onNext事件的消费,你可以与那些值之间做一些更高级的交互(犹如在使用断言框架)。任何地方抛出的AssertionError都会在最终的结果里反应出来。最后,调用verify()对你的用例进行测试,这个方法会通过StepVerifier.create或StepVerifier.withVirtualTime方法对预定义的事件源进行订阅。

让我们来举一些简单的例子来说明StepVerifier时如何工作的。首先要添加依赖到POM里:

<dependency>
  <groupId>io.projectreactor.addons</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.0.3.RELEASE</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.assertj</groupId>
  <artifactId>assertj-core</artifactId>
  <version>3.5.2</version>
  <scope>test</scope>
</dependency>

假设你有一个叫作MyReactiveLibrary的类,你要对这个类生成的一些Flux进行测试:

@Component
public class MyReactiveLibrary {

  public Flux<String> alphabet5(char from) {
     return Flux.range((int) from, 5)
           .map(i -> "" + (char) i.intValue());
  }

  public Mono<String> withDelay(String value, int delaySeconds) {
     return Mono.just(value)
                .delaySubscription(Duration.ofSeconds(delaySeconds));
  }
}

第一个方法将返回给定字母之后的5个字母。第二个方法返回一个Flux,它会以给定的时间间隔触发给定值,其中的时间间隔以秒为单位。第一个测试是要保证使用x调用alphabet5的输出被限定在x、y、z。使用StepVerifier看起来是这样的:

@Test
public void testAlphabet5LimitsToZ() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
        .expectNext("x", "y", "z")
        .expectComplete()
        .verify();
}

第二个测试要保证alphabet5返回的每个值都是字母。在这里我们使用断言框架AssertJ

@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
              .consumeNextWith(c -> assertThat(c)
                    .as("first is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("second is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("third is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("fourth is alphabetic").matches("[a-z]"))
              .expectComplete()
              .verify();
}

结果这些测试都运行失败。让我们检查一下StepVirifier的输出,看看能不能找出bug:

java.lang.AssertionError: expected: onComplete(); actual: onNext({)

java.lang.AssertionError: [fourth is alphabetic] 
Expecting:
 "{"
to match pattern:
 "[a-z]"

看起来我们的方法并没有在z的时候停住,而是继续发出ASCII字符。我们可以加入.take(Math.min(5,'z'-from+1))来修复这个bug,或者把Math.min作为range的第二个参数。

我们要做的最后一个测试需要用到虚拟时钟:我们使用withVirtualTime构造器来测试方法的延迟,而不需要真的等待指定的时间:

@Test
public void testWithDelay() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  Duration testDuration =
     StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
                 .expectSubscription()
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNoEvent(Duration.ofSeconds(10))
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNext("foo")
                 .expectComplete()
                 .verify();
  System.out.println(testDuration.toMillis() + "ms");
}

这个测试用例测试一个将被延迟30秒的Flux:在订阅之后的30秒内不会发生任何事情,然后发生一个onNext("foo")事件后结束。

System.out会打印出验证所需要的时间,在最近的一次测试中它用掉了8毫秒。

如果调用构造器的create方法,thenAwait和expectNoEvent方法仍然可以使用,不过它们会阻塞指定的时间。

StepVerifier还有其它很多方法可以用于对Publisher进行测试(如果你有其它的想法,欢迎加入或反馈到github仓库)。

自定义动态源

在“RxJava实例解析”一文中提到的动态和静态Observable对Reactor来说也是适用的。

如果你要创建一个自定义的Flux,需要使用Reactor的FluxSink。这个类将会为你考虑所有跟异步有关的情况,你只需要把注意力集中在触发事件上。

使用Flux.create并从回调中获得的FluxSink可以用于后续的触发事件。这个自定义的Flux是静态的,为了把它变成动态的,可以使用publish()和connect()方法。基于上一篇文章中的例子,我们几乎可以把它逐字逐句地翻译成Reactor的版本:

SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux =
     Flux.create(emitter ->
     {
        SomeListener listener = new SomeListener() {
           @Override
           public void priceTick(PriceTick event) {
              emitter.next(event);
              if (event.isLast()) {
                 emitter.complete();
              }
           }

           @Override
           public void error(Throwable e) {
              emitter.error(e);
           }};
        feed.register(listener);
     }, FluxSink.OverflowStrategy.BUFFER);

ConnectableFlux<PriceTick> hot = flux.publish();

在连接到动态Flux之前,可以做两次订阅:一个订阅将打印每个tick的细节,另一个订阅会打印出instrument:

hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick
     .getDate(), priceTick.getInstrument(), priceTick.getPrice()));

hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));

接下来我们连接到动态Flux,并在程序结束前让它运行5秒钟:

hot.connect();
Thread.sleep(5000);

(要注意,如果PriceTick的isLast()方法改变了,那么feed本身也会结束)。

FluxSink通过isCancelled()来检查下游的订阅是否已取消。你还可以通过requestedFromDownstream()来获得请求数,这个在遵循回压策略时很管用。最后,你可以通过setCancellation方法释放所有使用过的资源。

要注意,FluxSink使用了回压,所以你必须提供一个OverflowStrategy来显式地处理回压。这个等价于使用onBackpressureXXX操作(例如,FluxSink.OverflowStrategy.BUFFER等价于.onBackpressureBuffer()),它们会覆盖来自下游的回压信号。

结论

在这篇文章里,我们学习了Reactor,一个运行在Java 8之上并以Rx规范和Reactive Streams规范为基础的第四代响应式框架。我们展示了RxJava中的设计理念是如何被应用在Reactor上的,尽管它们之间有一些API设计上的差别。我们还展示了Reactor如何成为Spring 5的基础,还提供了一些跟测试Publisher、Flux和Mono有关的资源。

如果你想再深入了解Reactor,可以访问github 仓库,本文中的代码也可以在github上找到。研讨会“轻量级Rx API上手”介绍了更多操作和用例。

最后,你可以在Gitter上联系Reactor团队,也可以通过github issues提供反馈(同时欢迎拉取代码)。

关于作者

Simon Basle是一个狂热的软件开发者,特别对响应式编程、软件设计(OOP、设计模式、软件架构)、富客户端、代码以外的东西(持续集成、(D)VCS、最佳实践)都很感兴趣,对CompSci和并发编程也有所涉猎。他目前在Pivotal的Reactor团队工作。

查看英文原文:Reactor by Example

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我
社区评论

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT