假设你已经阅读了RxJava的相关内容并体验了《RxJava入门:实例解析》中的实例,现在你打算在自己的代码中探索响应式编程。但是,现在卡在如何测试代码库中可能发现的新功能上?下面我们将探讨如何测试RxJava代码。本文要点:RxJava有内置的、测试友好的解决方案。使用TestSubscriber来验证Observable。使用TestScheduler可以实现对时间的严格控制。Awaitility库提供了对测试环境的进一步控制。使用反应式编程,我们必须改变我们对给定问题的推理方式,因为我们将重点放在作为事件流而不是单个数据项的流动数据上。事件通常由不同的线程产生和消费,因此在编写测试时必须清楚地了解并发问题。幸运的是,RxJava为测试Observables和Subscriptions提供了内置支持,并且直接内置到RxJava的核心依赖项中。第一步让我们回顾一下《RxJava入门实例分析》一文给出的词汇示例,看看如何测试示例。让我们从基本的测试工具设置开始。在我们的测试架构中,JUnit被用作测试工具。事实上,如果没有给定的调度器(Scheduler),Subscription会默认运行在调用线程上。所以我们会在***测试中使用native方法。这意味着我们可以实现订阅接口的对象,并在订阅发生后立即断言其状态。请注意,此处使用显式List容器来累积实际订阅者的结果。给定测试的简单性可能会让您认为这种显式累加器方法已经足够好了。但请记住,生产级Observables可能会封装错误或可能会产生意外事件。示例中Subscriber和accumulator的简单组合不足以涵盖这种情况。不过不用担心,RxJava提供的TestSubscriber类型就是用来处理这种情况的。下面我们使用TestSubscriber类型重构上述测试。TestSubscriber不仅取代了用户累加器,还提供了一些额外的行为。例如,它可以给出接收到的消息的大小和与每个事件关联的数据,它还可以断言订阅的状态在Observable消费期间没有错误地完成。虽然当前测试中的Observable并没有产生任何错误,但是回到《RxJava入门实例分析》一文,我们了解到Observable是把异常当成数据事件来处理的。我们可以通过以下方式连接异常事件来模拟错误:在我们给出的有限用例中,所有机制都运行良好。但实际产品代码可能与示例完全不同。所以在下面我们将考虑一些更复杂的产品示例。自定义调度器(Scheduler)在产品代码中,很多用例中的Observable是在特定的线程上执行的,在反应式编程环境中被称为“调度器(Scheduler)”。许多Observable操作将可选的调度程序参数作为附加参数。RxJava定义了一系列随时可用的命名调度器,包括IO调度器(io)、计算调度器(computation,forsharedthreads)和新线程调度器(newThread)。开发人员还可以实现自己的自定义调度程序。让我们通过指定计算调度程序来修改Observable的代码。这段代码运行起来立马发现有问题。Subscriber在测试线程上执行它的断言,但是Observable在后台线程(计算线程)上产生值。这意味着执行Subscriber断言可能会在Observable之前生成所有相关事件,从而导致测试失败。为了让测试顺利进行,有一些策略可以选择:ConvertObservabletoblocking。强制测试等待,直到满足给定条件。将计算调度程序转换为即时(Schedulers.immediate())调度程序。我们将扩展每个策略,但我们将从“将Observables转换为阻塞”开始,因为实施此策略需要最少的技术工作,与使用的调度程序无关。我们假设数据是在后台线程中生成的,这将导致订阅者从同一个后台线程中得到通知。我们要做的是强制生成所有事件,并在执行下一条语句之前完成测试中的Observable。这是通过调用Observable本身的toBlocking()方法来实现的。虽然这种方法适用于我们展示的简单代码,但它可能不适用于实际生产代码。如果生产者需要很长时间才能生成所有数据,会发生什么情况?这将使测试非常缓慢,增加编译时间,并可能产生其他性能问题。这里我推荐一个方便的库,Awaitility(https://github.com/awaitility/awaitility)。简单地说,Awaitility是一种DSL,它以一种精确、易于阅读的方式表达与异步系统相关的期望。可以使用Maven在项目中添加Awaitility依赖。org.awaitilityawaitility2.0.0test或者使用Gradle:testCompile'org.awaitility:awaitility:2.0.0'AwaitilityDSL的入口点是org.awaitility.Awaitility.await()方法(参见下面示例中的第13行和第14行)。您可以使用Awaitility来定义测试继续必须满足的条件,您还可以为条件添加超时或其他时序约束,例如最小值、最大值或连续范围。对于上面的例子,下面的代码展示了如何在结果中使用Awaitility:这个版本的测试没有以任何方式改变Observable的性质,这让你可以在不对生产代码做任何更改的情况下进行测试。这个版本通过检查订阅者状态来测试让Observable执行它的工作,等待时间最长为2秒。如果一切顺利,Subscriber状态将在2秒内释放给所有9个事件。Awaitility与Hamcrest的匹配器、Java8的lambda表达式和方法引用等有很好的配合,从而给出了精确可读的测试条件。Awaitility还为广泛使用的JVM语言(包括Groovy和Scala)提供预构建的扩展。我们将介绍使用RxJava扩展机制的最后一种策略,该机制作为RxJavaAPI的一部分发布。RxJava中定义了一系列扩展点,允许微调几乎所有默认的RxJava行为。这种扩展机制允许我们为特定的RxJava特性提供修改值。使用这种机制,我们可以在测试中注入选定的调度器,而无需关心生成代码中指定的调度器。这正是我们正在寻找的方法,封装在RxJavaHooks类中。假设生产代码依赖于计算调度程序,我们覆盖它的默认值,返回一个调度程序作为被调用的代码来进行事件处理,它是即时调度程序(Schedulers.immediate())。测试代码如下:在测试中,生产代码不知道计算调度程序是即时的。请注意,必须重置挂钩函数,否则立即调度程序的设置可能会泄漏,从而导致到处都是测试中断。使用try/finall块在某种程度上模糊了测试的目的,但幸运的是,我们可以使用JUnit规则重构此行为,使测试更简洁,结果更易读。下面给出了使用上述规则的一种可能的实现代码:另外,我们重写了另外两个调度器的生成方法。这条规则对于以后的其他测试目标更通用。在新的测试用例类中,这个规则的使用很简单,只需定义一个字段并将新类型标记为@Rule。示例代码如下:最后我们可以得到和前面测试一样的行为,但是没有前面测试那样的乱七八糟。这里有一些空间来回顾我们到目前为止所做的事情:订阅者将在同一个线程中处理数据,只要没有使用特定的调度程序。这意味着当Subscriber订阅了Observable之后,我们就可以对Subscriber进行断言了。TestSubscriber可以累积事件并给出关于其自身状态的额外断言。任何Observable都可以转换为阻塞,这允许我们等待同步生成的事件,无论Observable使用什么调度程序。RxJava提供了一种扩展机制,允许开发人员覆盖其默认方法,并以适当的方式将它们注入到生产代码中。可以使用AwaitilityDSL测试并发代码。上面的每一种技术都在不同的上下文中工作,但它们都通过“公共线程”思想)关联工作:测试代码需要等待Observable完成,然后才能对订阅者的状态进行断言。考虑到Observable的行为会产生数据,有没有办法检查这个行为?换句话说,是否可以通过编程方式对Observable进行实时调试?我们稍后会给出这样的技术。以黑盒方式。接下来我们将考虑另一种时间操纵技术,它允许我们在Observable仍处于活动状态时打开引擎盖查看订阅者状态。换句话说,我们将使用TestScheduler类白盒测试技术,使用RxJava,可以说是RxJava又来救场了。这个特殊的调度器精确地设置了内部如何使用时间,比如提前半秒,或者跳跃时间5秒。我们将首先展示如何创建这样一个新的调度程序实例,然后讨论代码的测试。“产品”代码略有变化,因为我们没有生成计数范围,而是使用绑定到调度程序间隔(interval())的方法来生成计数(第6行)。但这有副作用,即计数是从零而不是从一生成的。一旦配置了Observable和测试调度程序,我们立即断言Subscriber被假定为没有值(第15行)并且没有完成或产生任何错误(第16行)。这是一个健全性测试,因为此时调度器还没有移动,所以Observable没有产生任何值,订阅者也没有接收到任何值。下面将时间提前1整秒(第19行),这将导致Observable产生第一个值,这是后续断言集检查的内容(第22-24行)。现在将时间从当前时间调整为9秒。需要注意的是,这里的意思是在scheduler启动后正好9秒调整时间(不是向前1秒再向前9秒,也就是schedulercheck开始后的第10秒)。换句话说,advanceTimeBy()方法调整调度程序相对于当前位置的时间,而advanceTimeTo()以绝对方式调整时间。之后我们进行下一轮断言(第28到20行)以确保所有数据都由Observable生成并由Subscriber消费。另一个需要注意的是,当使用TestScheduler时,realtime是立即调整的,这意味着测试实际上不必等待9秒才能完成。可以看到,这个调度器使用起来非常方便,只需要将调度器提供给被测Observable即可。但是对于使用指定类型调度器的Observables,调度器并不能很好地工作。但是等一下,之前我们看到了如何使用RxJavaHooks切换一个不影响生产代码的调度器,这次提供了一个TestScheduler来代替即时调度器(第13到15行)。我们甚至可以应用相同的自定义JUnit规则的技术,以便以更可重用的方式重写以前的代码。首先,新规则是:接下来是实际的测试代码(在新的测试用例类中),使用我们的测试规则:您已经成功实施了它。使用通过RxJavaHooks注入TestScheduler的方法,可以在不改变原始Observable组成的情况下编写测试代码,而且它提供了一种在observable本身执行期间更改时间并在特定点进行断言的方法。本文中介绍的所有这些技术对于您选择用来测试RxJava的代码应该足够了。FutureRxJava是最早为Java提供响应式编程功能的库之一。即将推出的2.0版本将进行重新设计,使RxJavaAPI更加符合ReactiveStreams规范。针对Java和JavaScript运行时,ReactiveStreams规范提供了使用非阻塞背压进行异步流处理的标准。这意味着在下一个版本的RxJava中会有一些API改进。有关这些改进的详细描述,请参阅RxJavawiki。对于测试,这些核心类型(Observable、Maybe和Single)现在提供了一种方便且易于使用的test()方法来当场创建TestSubscriber实例。也可以在TestSubscriber上链接方法调用,并且对于这种用法也有一些新的断言方法。添加一名作者