BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

在Andoid中如何使用RxJava 2进行多线程编程?

| 作者 Aritra Roy 关注 0 他的粉丝 ,译者 张卫滨 关注 11 他的粉丝 发布于 2017年12月28日. 估计阅读时间: 34 分钟 | GMTC大前端的下一站,PWA、Web框架、Node等最新最热的大前端话题邀你一起共同探讨。

亲爱的读者:我们最近添加了一些个人消息定制功能,您只需选择感兴趣的技术主题,即可获取重要资讯的邮件和网页通知

本文最初发布于GO-JEK的博客站点,经原作者授权由InfoQ中文站翻译并分享。

如果你还没有接触RxJava或者刚刚开始使用它的话,那么你会发现始终会有很多新东西要学习。我们在GO-JEK的App中需要执行很多的异步操作,而且我们无法在UI的速度和流畅性上妥协

编写大量多线程的Android App是一件很困难和很有挑战的事情,因为在这个过程中有很多的部件需要进行处理。这连同一些其他的原因促使我们在Android App中大量使用了RxJava。

在本文中,我们将会讨论如何使用RxJava真正的多线程功能,它会让复杂的App开发过程再次变得简单、容易和有趣。本文中的所有代码样例都会关注RxJava,但是文中讨论的概念同样适用于其他的反应式扩展(Reactive Extension)

为何要进行反应式编程?

任何一篇关于RxJava的文章都会包含一个“为什么要进行反应式编程”的章节,我们也不打算破坏这个约定。在Android中采用反应式的方式构建App会带来多项收益,接下来我们讨论几项你真正值得关注的好处。

不再会有回调地狱

如果你已经做过一段时间的Android开发的话,你肯定会明白嵌套回调会快速地让所有事情失去控制。

如果你想按顺序执行一系列的异步操作并且下一步操作的行为要依赖于上一步操作的结果,那么你就会遇到这种状况。几乎瞬间,代码就会变得超级丑陋和难以管理。

简单的错误处理

在命令编程的世界中,如果你要执行大量复杂、异步的操作,错误可能会在任意的地方出现,为了处理这些场景,我们就需要将大量补丁式的代码到处放得到处都是,从而形成大量重复和繁琐的代码。

超级简单的多线程

我们都知道(私下也承认)在Java中编写多线程代码有多困难。在后台线程中执行一段代码并在UI线程中获取结果,这听起来似乎很简单,但实际上有很多复杂的情况需要处理。

通过使用RxJava,你可以非常容易地在任意线程中执行复杂的操作,它会维持适当的状态同步,并能够让你无缝地切换线程。

RxJava所带来的好处是无穷无尽的,就此可以谈论几个小时,但是现在我们要更深入地探索一下它为多线程编程所带来的真正威力。

默认情况下,RxJava并不是多线程的

是的,你没有看错。RxJava默认情况下跟多线程一点关系都没有。在官方网站上,RxJava是这样定义的:

借助observable序列,在Java VM上组合异步和基于事件的程序的库。

看到“异步”这个词,很多人就形成了一种误解,认为RxJava默认就是多线程的。的确,它支持多线程并且提供了一些强大的特性以便于我们执行异步操作,但是不能就此断定它的默认行为就是多线程的。

如果你多少了解一些RxJava的话,会知道它的基本构造为:

  • 源Observable
  • 一个或多个操作符(Operator)
  • 目标Subscriber
Observable.just(1, 2, 3, 4, 5)
          .doOnNext(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  println("Emitting item on: " + currentThread().getName());
              }
          })
          .map(new Function<Integer, Integer>() {
              @Override
              public Integer apply(@NonNull Integer integer) throws Exception {
                  println("Processing item on: " + currentThread().getName());
                  return integer * 2;
              }
          })
          .subscribeWith(new DisposableObserver<Integer>() {
              @Override
              public void onNext(@NonNull Integer integer) {
                  println("Consuming item on: " + currentThread().getName());
              }

              @Override
              public void onError(@NonNull Throwable e) {
              }

              @Override
              public void onComplete() {
              }
          });

如果你运行上面的代码片段的话,你会发现所有的执行过程都在应用的主线程中执行(请关注打印出来的日志中的线程名)。这表明在默认情况下,RxJava是阻塞的。所有的过程都是在代码运行所在的线程上执行的。

额外的福利:想知道doOnNext()是什么吗?它只是一个副作用操作符,能够让你从observable链中脱离出来并执行一些不那么纯的操作,你可以阅读本文了解更多的信息。

让我们开始一些简单的多线程操作

如果我们想要在Android中使用RxJava做一些基本的多线程操作,需要做的就是熟悉SchedulersobserveOn/subscribeOn 操作符,这样的话,就可以开始了。

现在,看一个最简单的多线程用例。假设,我们想要通过网络获取一个Book列表,并且要在应用的UI线程中展现这个列表,我们就采用这个简单直接的用例作为开始。

getBooks().subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribeWith(new DisposableObserver<Book>() {
              @Override
              public void onNext(@NonNull Book book) {
                  // You can access your Book objects here
              }
              @Override
              public void onError(@NonNull Throwable e) {
                  // Handler errors here
              }
              @Override
              public void onComplete() {
                  // All your book objects have been fetched. Done!
              }
          });

在这里,有一个getBooks()方法,它会发起网络调用并获取图书列表。网络调用会耗费一定的时间(通常几毫秒到几秒钟的时间),因此,我们使用subscribeOn()并指定Schedulers.io() Scheduler在I/O线程中执行操作。

我们同时还使用了observeOn()操作符和AndroidSchedulers.mainThread() Scheduler,以便于在主线程中消费结果并将图书列表填充到应用的UI之中。这些知识可能你之前已经了解过或使用过了。

不必担心,我们马上就会进入更高级的内容。现在展现的这些内容只是为了确保我们在同一个起跑线上,在深入介绍更深入内容之前能有一个基本的认识。

使用Scheduler

RxJava中的多线程操作主要是由强大的Scheduler集合提供的。在RxJava中,我们无法直接访问或操作线程。如果想要使用线程的话,必须要通过内置的Scheduler来实现。

你可以将Scheduler视为线程或线程池(一个线程的集合),能够执行不同类型的工作

简而言之,如果你需要在特定的线程中执行任务的话,我们就需要此选择恰当的Scheduler,Scheduler接下来会从它的池中获取一个可用的线程,并基于该线程执行任务。

在RxJava框架中有多种类型的Scheduler,但是这里比较有技巧的一点就是为合适的工作选择恰当的Scheduler。如果你没有选择恰当的Scheduler的话,那么任务就无法最优地运行,所以接下来,我们尝试理解每一个Scheduler。

Schedulers.io()

这是由无边界线程池作为支撑的一个Scheduler,它适用于非CPU密集的I/O工作,比如访问文件系统、执行网络调用、访问数据库等等。这个Scheduler是没有限制的,它的线程池可以按需一直增长。

Schedulers.computation()

这个Scheduler用于执行CPU密集的工作,比如处理大规模的数据集、图像处理等等。它由一个有界的线程池作为支撑,线程的最大数量就是可用的处理器数量。

因为这个Scheduler只适用于CPU密集的任务,我们希望限制线程的数量,这样的话,它们不会彼此抢占CPU时间或出现线程饿死的现象。

Schedulers.newThread()

这个Scheduler 每次都会创建一个全新的线程来完成一组工作。它不会从任何线程池中受益,线程的创建和销毁都是很昂贵的,所以你需要非常小心,不要衍生出太多的线程,导致服务器系统变慢或出现内存溢出的错误。

理想情况下,你应该很少使用这个Scheduler,它大多用于在一个完全分离的线程中开始一项长时间运行、隔离的一组任务。

Schedulers.single()

这个Scheduler是RxJava 2新引入的,它的背后只有一个线程作为支撑,只能按照有序的方式执行任务。如果你有一组后台任务要在App的不同地方执行,但是同时只能承受一个任务执行的话,那么这个Scheduler就可以派上用场了。

Schedulers.from(Executor executor)

我们可以使用它创建自定义的Scheduler,它是由我们自己的Executor作为支撑的。在有些场景下,我们希望创建自定义的Scheduler为App执行特定的任务,这些任务可能需要自定义的线程逻辑。

假设,我们想要限制App中并行网络请求的数量,那么我们就可以创建一个自定义的Scheduler,使其具有一个固定线程池大小的Executor:Scheduler.from(Executors.newFixedThreadPool(n)),然后将其应用到代码中所有网络相关的Observable上。

AndroidSchedulers.mainThread()

这是一个特殊的Scheduler,它无法在核心RxJava库中使用,要使用它,必须要借助RxAndroid扩展库。这个Scheduler对Android App特别有用,它能够在应用的主线程中执行基于UI的任务

默认情况下,它会在应用主线程关联的looper中进行任务排队,但是它有一个其他的变种,允许我们以API的形式使用任意的Looper:AndroidSchedulers.from(Looper looper)

注意:在使用无边界线程池支撑的Scheduler时,比如Schedulers.io(),我们要特别小心,因为它有可能会导致线程池无限增长,使系统中出现大量的线程。

理解subscribeOn()和observeOn()

现在,我们对不同类型的Scheduler已经有了一个清晰的理解,并且掌握了何时该使用哪种Scheduler,我们继续前进,接下来会详细介绍subscribeOn()observeOn()操作符。

要理解RxJava真正的多线程功能,我们需要深入理解这两个操作符分别如何运行以及何时要将它们组合起来。

subscribeOn()

简而言之,这个操作符指定源observable要在哪个线程中发出各个条目。读者需要理解这里“源(source)” observable的含义,如果你有一个observable链的话,源observable始终位于根部或者说是链的顶部,也就是产出物(emission)生成的地方。

读者也看到了,如果我们不使用subscribeOn()的话,所有的产出都是直接在代码执行的线程中生成的(在我们的场景中,也就是main线程)。

接下来,我们将所有的产出物在计算线程中生成,这需要使用subscribeOn()Schedulers.computation() Scheduler。如果你运行下面的代码片段,所有产出物是在线程池中某个可用的计算线程中生成的,RxComputationThreadPool-1

简洁起见,我们没有使用完整的DisposableSubscriber,因为在这种简单的场景中,我们不需要每次都处理onError()onComplete(),我们只需要处理onNext(),它只是一个简单的消费者。

Observable.just(1, 2, 3, 4, 5, 6)
          .subscribeOn(Schedulers.computation())
          .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
          .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

在链中,将subscribeOn()放到什么位置其实无关紧要。它只会影响源observable并控制在哪个线程中生成条目。

在上面的样例中,读者可能会注意到map()filter()操作符也会生成其他的observable,subscribeOn()放到了链的底部。但是,如果你运行下面的代码片段的话,就会发现它只会影响源observable。如果将observeOn()同时加入到链中,会更加清晰。即便我们将subscribeOn()放到observeOn()下面,它也只会影响源observable

Observable.just(1, 2, 3, 4, 5, 6)
          .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
          .map(integer -> integer * 3)
          .filter(integer -> integer % 2 == 0)
          .subscribeOn(Schedulers.computation())
          .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

另外需要注意的是,我们不能在链中多次使用subscribeOn()。从技术上讲,你可以这样做,但是它并不会产生额外的作用。在下面的代码片段中,我们将三个不同的Scheduler连接到了一起,你能猜出来,哪个Scheduler会成为源observable吗?

Observable.just(1, 2, 3, 4, 5, 6)
           .subscribeOn(Schedulers.io())
           .subscribeOn(Schedulers.computation())
           .subscribeOn(Schedulers.newThread())
           .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
           .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

如果你的答案是Schedulers.io(),那么恭喜你答对了!

即便我们在链中放置多个subscribeOn()操作符,只有最靠近源observable的那一个会发挥作用。

背后的原理

我们值得花一些时间更深入地理解一下上述的场景。为什么Schedulers.io() Scheduler会发挥作用,而不是其他的Scheduler?正常情况下,你可能会认为Schedulers.newThread()会生效,因为它是在链的最后才添加上去的。

我们必须要理解,在RxJava中,订阅(subscription)必须是基于上游observable实例的。如下的代码与我们前面看到的非常类似,只是更加繁琐一些。

Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0);
Observable<Integer> o3 = o2.map(integer -> integer * 10);
o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

我们从片段的最后一行开始理解这段代码。在这里,目标订阅者(或者说链中最下游的observer)基于observable o3调用subscribe()方法,这样会隐式地对它的直接上游observable o2调用subscribe()方法。o3所提供的observer实现会将生成的数字乘以10。

这个过程会重复进行,o2会隐式地基于o1调用subscribe,所传入的observer实现会将偶数过滤出来并允许其通过。现在,我们到达了根部,源observable o1没有任何上游observable来调用subscribe这实际上就完成了observable链,源observable就能生成其条目了

对于RxJava中订阅是如何实现的,读者应该就会比较清楚了。到目前为止,对observable链如何组成以及事件如何从源observable开始在链中传播应该有了一个基本的了解。

observeOn()

正如我们刚刚看到的,subscribeOn()能够指明源observable要在一个特定的线程中生成其条目,这个线程还会负责将这些条目一直推送到sink Subscriber中。因此,默认情况下,订阅者也会在这个线程中消费这些条目

但是,我们的应用所期望的行为往往并非总是如此。假设,我们想要通过网络获取一些数据并在App的UI中展现。

本质上,我们有两件事情需要完成:

  • 在非阻塞的I/O线程上执行网络调用;
  • 在应用的主(UI)线程上消费得到的结果。

我们需要有一个observable在I/O线程进行网络调用,并将产出传递给目标订阅者。如果你只是使用subscribeOn()Schedulers.io()的话,最终的订阅者将会在I/O线程中进行操作。我们无法在主线程之外的其他线程中访问UI组件,因此我们如果这样做的话,会遇到麻烦。

现在,我们迫切需要切换线程,而这就是observeOn()操作符能够发挥作用的地方。在observable链中,如果遇到observeOn(),那么产出物将会立即切换到所指定的线程中。

getIntegersFromRemoteSource()
         .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
         .subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

在这个稍微有点牵强的样例中,我们有一个observable从网络上获取整数所组成的流。在实际的用例中,这可以换成任意的异步操作,比如读取大文件或从数据库获取数据等等。你可以尝试该代码片段并查看结果,需要关注的是日志中的线程名。

现在,我们看一个稍微复杂一点的样例,这里会使用多个observeOn()操作符,这样在我们的observable链中会多次切换线程。

getIntegersFromRemoteSource()
     .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
     .subscribeOn(Schedulers.io())
     .observeOn(Schedulers.computation())
     .map(integer -> {
         println("Mapping item " + integer + " on: " + currentThread().getName());
         return integer * integer;
     })
     .observeOn(Schedulers.newThread())
     .filter(integer -> {
         println("Filtering item " + integer + " on: " + currentThread().getName());
         return integer % 2 == 0;
     })
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

在上面的代码片段中,源observable在I/O线程生成其条目,因为这里组合使用了subscribeOn()Schedulers.io()。现在,我们想要使用map()操作符来转换每个条目,只不过需要在计算线程中进行。为了实现这一点,我们可以在map()操作符之前组合使用observeOn()Schedulers.computation(),这样的话就能切换线程并将产出物传递到计算线程中了。

接下来,需要过滤条目,但是因为某些原因,我们想在一个全新的线程中进行操作。这样的话,我们可以在filter()操作符前面组合使用observeOn()Schedulers.newThread(),这样的话,就能为每个条目切换至一个新的线程。

最后,我们希望订阅者消费最终处理后的条目并在UI上展现结果,为了实现这一点,我们需要再次切换线程,不过这一次需要切换至主线程,这里需要组合使用observeOn()AndroidSchedulers.mainThread() Scheduler。

但是,如果我们多次地连续使用observeOn()会怎样呢?在下面的代码片段中,最终的订阅者在消费结果的时候,到底使用的是哪个线程呢?是第一个还是最后一个observeOn()能发挥作用呢?

getIntegersFromRemoteSource()
          .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName()))
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .observeOn(Schedulers.single())
          .observeOn(Schedulers.computation())
          .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));

如果你运行这个代码片段的话,你会发现所有的条目都是在RxComputationThreadPool-1线程中消费的,这意味着最后的observeOn()Schedulers.computation()发挥了作用。但是,为什么呢?

背后的原理

你可能已经猜到,为何是最后的observeOn(),而不是其他的observeOn()发挥作用了。我们已经知道,订阅只能针对上游才能发生,另外一方面,产出物只能面对下游才能发挥作用。它们从源observable开始,一路沿着链向下,直至最后的sink subscriber。

observeOn()操作符只能针对下游observable才有效,所以最后的observeOn()Schedulers.computation()覆盖了前面声明的observeOn()操作符。因此,每当你想为特定的observable切换线程时,需要做的就是指定observeOn()操作符。同步、状态不一致性、竞态条件以及其他线程相关的问题都会在幕后自动处理好了。

到目前为止,相信你已经对如何使用RxJava编写多线程的应用并保证App的快速和流畅有了很好的理解。

如果你还没有充分理解和吸收文中的知识的话,其实也不要紧,读者可以多读几遍并亲自尝试一下样例代码,这样的话,会有更深刻的理解。对于一篇文章来说,这些内容确实有些多,你可以在这上面多花一点时间。

感谢覃云对本文的审校。

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

帮助很大 by Rose Funn

不错,对rxjava入门者帮助很大。自己也写过各种demo来测试emit、consume等各环节所处的线程,本文归纳得更为系统。

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

1 讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT