本文要点
*响应式编程是一种处理异步数据流的规范
*响应式为数据流的转换和聚合以及数据流的控制管理提供了工具支持
*弹珠交互图(Marble Diagram)以可交互的方式可视化响应式的结构
*响应式编程风格看起来跟Java Streams API有点相似,不过本质上是不一样的
*如何连接到动态流处理异步数据源
本文秉承同一风格,用相同的例子循序渐进地介绍了RxJava2。译者将两篇文章中的不同之处用粗体标识出来,以供读过前篇的读者快速浏览本文。
在高并发编程范式的发展过程中,我们使用过很多工具,比如java.util.concurrent包、Akka
Streams框架、CompletableFuture类以及Netty框架。响应式编程近来大受欢迎,这要得益于它强大的功能和健壮的工具包。
响应式编程是一种处理异步数据流的规范,它为数据流的转换和聚合以及数据流的控制管理提供了工具支持,它让考量程序整体设计的工作变得简单。
但它使用起来并不简单,它的学习曲线也并不平坦。对于我们当中的那些数学家来说,学习响应式就好比当初他们从学习标准代数的无向量过渡到学习线性代数的向量、矩阵和张量,它们实际上是被单元化的数据流。传统的编程模式以对象为基础,而响应式以事件流为基础。事件可能以多种形式出现,比如对象、数据源、鼠标移动信息或者异常。在传统的编程范式里,“异常”这个词描述的是对意外情况的处理,因为在这个背景下,没有按照预想发生的情况都算异常。而在响应式编程范式里,异常却是一等公民。因为数据流一般是异步的,所以抛出异常是没有意义的,任何一个异常都会被当成数据流里的一个事件。
在这篇文章里,我们会探讨响应式编程的基本原理,以一种教与学的方式来强化一些重要的概念。
首先要记住的是,响应式里所有的东西都是流。Observable封装了流,是最基本的单元。流可以包含零个或多个事件,有未完成和已完成两种状态,可以正常结束也可以发生错误。如果一个流正常完成或者发生错误,说明处理结束了,虽然有些工具可以对错误进行重试或者使用不同的流替换发生错误的流。
在运行我们给出的例子之前,需要把RxJava的依赖加入到项目里。可以在Maven里加入这个依赖:
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.0.5</version> </dependency> |
Observable类有几个静态工厂方法和实例方法,它们被用来生成各种新的Observable对象,或者把Observable对象添加到感兴趣的处理流程里。Observable是可变的,所以针对它们的操作总是会生成新的Observable对象。为了更好地理解我们的例子,我们先来温习一下Observable的基本操作,因为在后面的例子里会用到它们。
Observable.just方法生成一个简单对象,然后返回。例如:
Observable.just("Howdy!") |
这行代码生成一个新的Observable对象,在结束之前触发一个单独的事件,生成字符串“Howdy!”。
可以把新生成的Observable对象赋给一个Observable变量:
Observable<String> hello = Observable.just("Howdy!"); |
不过知道这个还远远不够。就像那个著名的哲学问题一样,森林里的一颗树倒下来,如果周围没有人听见,那么就等于说树的倒下是无声无息的。一个Observable对象必须要有一个订阅者来处理它所生成的事件。所幸的是,现在Java支持Lambda表达式,我们就可以使用简洁的声明式风格来表示订阅操作:
Observable<String> howdy = Observable.just("Howdy!"); howdy.subscribe(System.out::println); |
这段代码仍然会生成字符串“Howdy!”。
跟Observable的其它方法一样,just方法可以被重载:
Observable.just("Hello", "World") .subscribe(System.out::println); |
这行代码会输出
just方法可以被重载,最多可以接收10个参数。这里要注意,输出的结果分成两行显示,说明它们是两个独立的事件。
让我们来看看如果使用列表会发生什么情况:
List<String> words = Arrays.asList( "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog" );
Observable.just(words)
.subscribe(System.out::println); |
这段代码输出一个很平常的结果:
[the, quick, brown, fox, jumped, over, the, lazy, dog] |
我们本以为每个单词会是一个单独的事件,但实际上整个列表被当成了一个事件。为了达到我们想要的结果,我们引入fromIterable方法:
Observable.fromIterable(words) .subscribe(System.out::println); |
(注意在rxjava1中,有一个重载过的from方法。该方法已被多种from方法的具体实现所替代,包括fromIterable和fromArray等。)
这行代码把数组或者列表转换成一系列事件,每个元素就是一个事件。
执行这行代码会得到我们想要的多行输出:
the quick brown fox jumped over the lazy dog |
为了能从中获取编号,我们要在Observable上多做一些工作。
不过在写代码之前,我们先来看看另外两个操作,range和zip。range(i,n)会创建一个包含n个数的流,它的第一个数是从i开始的。如果我们有办法把这种区间流跟上面的单词流组合在一起,就可以解决编号的问题。
Observable.range(1, 5).subscribe(System.out::println); |
输出:
如果有一种将区间流与我们的单词流合并的方法,就会解决添加编号的问题。
RX Marbles这个网站对我们学习响应式编程很有帮助。这个网站使用JavaScript渲染大部分响应式操作,而且是可交互的。每个响应式操作使用“弹珠”来描述一个或多个源流(source
stream)以及由操作生成的结果流(result stream)。时间从左到右,事件用弹珠表示。单击或者拖动弹珠,可以看到它们是如何影响结果的。
执行一个zip操作就跟遵照医嘱一样简单。让我们用弹珠交互图来解释一下这个过程:
zip操作通过成对的“zip”映射转换把源流的元素跟另一个给定流的元素组合起来,其中的映射可以使用Lambda表达式来表示。只要其中的一个流完成操作,整个zip操作也跟着停止,另一个未完成的流剩下的事件就会被忽略。zip可以支持最多9个源流的zip操作。zipWith操作可以把一个指定流合并到一个已存在的流里。
现在回到我们的例子上,我们可以使用range和zipWith操作加入编号,并用String.format做映射转换:
Observable.fromIterable(words) .zipWith(Observable.range(1, Integer.MAX_VALUE), (string, count)->String.format("%2d. %s", count, string)) .subscribe(System.out::println); |
这段代码会输出:
1. the 2. quick 3. brown 4. fox 5. jumped 6. over 7. the 8. lazy 9. dog |
看起来很不错!注意一旦任何一个流的操作完成,zip和zipWith操作就停止对所有流的拉取。这就是为什么不受Integer.MAX_VALUE上限限制的原因。
现在假设我们要列出单词里的字母而不是单词本身,这个时候要用到flatMap,flatMap会从Observable里获取事件源(对象、集合或数组),并把这些元素分别映射成Observable,然后把这些Observable扁平化成一个单独的Observable。
对于我们的例子来说,我们会先用split方法把每个单词拆分成一个字母数组,然后用flatMap创建一个新的Observable对象,这个Observable对象包含了组成这些单词的所有字母:
Observable.fromIterable(words) .flatMap(word -> Observable.fromArray(word.split(""))) .zipWith(Observable.range(1, Integer.MAX_VALUE), (string, count) -> String.format("%2d. %s", count, string)) .subscribe(System.out::println); |
这段代码会输出:
1. t 2. h 3. e 4. q 5. u 6. i 7. c 8. k ... 30. l 31. a 32. z 33. y 34. d 35. o 36. g |
所有单词的字母都出现在这里。不过这样太繁琐了,我们希望相同的字母只出现一次:
Observable.fromIterable(words) .flatMap(word -> Observable.fromArray(word.split(""))) .distinct() .zipWith(Observable.range(1, Integer.MAX_VALUE), (string, count) -> String.format("%2d. %s", count, string)) .subscribe(System.out::println); |
这段代码输出:
1. t 2. h 3. e 4. q 5. u 6. i 7. c 8. k 9. b 10. r 11. o 12. w 13. n 14. f 15. x 16. j 17. m 18. p 19. d 20. v 21. l 22. a 23. z 24. y 25. g |
我们从小被告知“quick brown fox”这个全字母短句包含了英语里所有的字母,不过在这里我们只看到25个,而不是26个。现在让我们对这些字母进行排序,找出丢失的那个字母:
.flatMap(word -> Observable.fromIterable(word.split(""))) .distinct() .sorted() .zipWith(Observable.range(1, Integer.MAX_VALUE), (string, count) -> String.format("%2d. %s", count, string)) .subscribe(System.out::println); |
这段代码输出:
1. a 2. b 3. c ... 17. q 18. r 19. t 20. u 21. v 22. w 23. x 24. y 25. z |
看样子是字母“s”丢掉了。为了得到我们期望的结果,需要对数组做一点修改:
List<String> words = Arrays.asList( "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dogs" );
Observable.fromIterable(words)
.flatMap(word -> Observable.fromArray(word.split("")))
.distinct()
.sorted()
.zipWith(Observable.range(1, Integer.MAX_VALUE),
(string, count) -> String.format("%2d.
%s", count, string))
.subscribe(System.out::println); |
修改后的代码输出为:
1. a 2. b 3. c 4. d 5. e 6. f 7. g 8. h 9. i 10. j 11. k 12. l 13. m 14. n 15. o 16. p 17. q 18. r 19. s 20. t 21. u 22. v 23. w 24. x 25. y 26. z |
现在好了!
但是到目前为止,所有的代码都跟Java 8里引入的Streams API很相似,不过这种相似只是一种巧合,因为响应式包含的内容远不止这些。
Java Streams和Lambda表达式为编程语言带来很大的价值,不过归根结底,它们只是提供了一种方式来遍历集合和生成集合。它们的作用很有限,而且缺乏可扩展性和可重用性。尽管Stream的parallel操作可以并行执行任务,但在返回结果前程序无法对整个过程进行干预。相反,响应式引入了执行时间、节流、流量控制等概念,而且它们可以被连接到“永不停止”的处理流程里。响应式产生的结果虽然不是集合,但你可以用任何期望的方式来处理这些结果。
让我们通过弹珠交互图更好地理解这些概念。
merge操作可以把最多9个源流合并到一个结果里,而且可以保留它们的顺序。无需担心这里会出现竞赛条件,因为所有的事件都被“扁平化”到一个单独的线程里,包括异常事件和结束事件。
debounce操作会把在一个时间段内紧挨在一起的几个事件看成一个单独事件,这几个事件里只有最后一个会被触发:
可以看到,上下两个图中的“1”之间有一个指定的时间间隔,而2、3、4、5之间的时间间隔都小于这个间隔,所以它们被看成单个事件。如果把“5”往右挪一点,结果就不一样了:
另一个有趣的操作是amb,它是一种不确定性的操作。对应的数组形式操作是ambArray。
amb操作会从所有的输入流中选择第一个出现的流,然后忽略其它剩下的流。如下图,第二个流是最先出现的,所以amb操作选择了这个流。
如果把第一个流里的“20”往左移动,超过第二个流的第一个元素,那么生成的结果又会不一样:
如果你有一个需要接入到某个数据源的处理流程,比如从消息主题上获取数据,可能是Bloomberg或者Reuters,你并不关心接入的到底是哪一个,只要从中选择一个就可以了。在这种情况下,amb操作就会很有用。
Tick Tock
现在,我们可以使用这些工具基于流生成各种有意义的结果。在接下来的这个例子里,我们有一个数据源,它会每秒钟生成一个事件。不过为了节省CPU,我们让它在周末时每三秒生成一次。我们使用混合型的“节奏器”按照一定的节奏生成数据。
首先,我们要创建一个返回boolean的方法,它会检查当前时间是否是周末,如果是就返回true,否则就返回false:
private static boolean isSlowTickTime() { return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY || LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY; } |
对于边读这篇文章边在IDE里执行这段代码的读者来说,他们可能不想等到下个周末才来验证这个方法是否可行,所以可以使用下面的替代实现,这个实现会在一个15秒钟内返回true,在另一个15秒钟内返回false:
private static long start = System.currentTimeMillis(); public static Boolean isSlowTickTime() { return (System.currentTimeMillis() - start) % 30_000 >= 15_000; } |
接下来我们创建两个Observable对象,fast和slow,然后使用过滤器对它们进行调度,并把它们合并起来。
我们使用Observable.interval操作来安排调度,它会在每个指定的时间间隔内产生一次数据(从0开始计算)。
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS); Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS); |
fast每秒生成一个事件,slow每三秒生成一个事件(我们会忽略事件的值,因为我们只对执行时间感兴趣)。
现在我们把这两个Observable合并到一起,通过使用过滤器让fast流在工作日生成数据(或者在15秒内),slow流在周末生成数据(或者在另一个15秒内)。
Observable<Long> clock = Observable.merge( slow.filter(tick-> isSlowTickTime()), fast.filter(tick-> !isSlowTickTime()) ); |
最后,我们要添加一个打印时间的订阅动作。在执行这些代码时,它会根据我们的调度安排打印出系统时间。
clock.subscribe(tick-> System.out.println(new Date()));
为了防止程序中途退出,需要在方法的末尾添加一行代码(注意要处理InterruptedException异常)。
运行代码的结果:
Fri Sep 16 03:08:18 BST 2016 Fri Sep 16 03:08:19 BST 2016 Fri Sep 16 03:08:20 BST 2016 Fri Sep 16 03:08:21 BST 2016 Fri Sep 16 03:08:22 BST 2016 Fri Sep 16 03:08:23 BST 2016 Fri Sep 16 03:08:24 BST 2016 Fri Sep 16 03:08:25 BST 2016 Fri Sep 16 03:08:26 BST 2016 Fri Sep 16 03:08:27 BST 2016 Fri Sep 16 03:08:28 BST 2016 Fri Sep 16 03:08:29 BST 2016 Fri Sep 16 03:08:30 BST 2016 Fri Sep 16 03:08:31 BST 2016 Fri Sep 16 03:08:32 BST 2016 Fri Sep 16 03:08:35 BST 2016 Fri Sep 16 03:08:38 BST 2016 Fri Sep 16 03:08:41 BST 2016 Fri Sep 16 03:08:44 BST 2016 . . . |
可以看到,前面15个事件之间的时间间隔都是1秒,后面15秒内的事件之间的时间间隔是3秒,就像我们所期望的那样。
连接到已存在的数据源
以上方法用于创建能够生成静态数据的Observable是没有问题的。但如何把Observable连接到已有的数据源上,并享受响应式的流量控制和流操作策略为我们带来的好处呢?
在继续介绍之前,我们应该认识一下RxJava2新引入的一些类。
静态Observable、动态Observable和Flowable
在RxJava的前期版本中,即使对于无需流控制的小型流,Observable也给出了流控制方法。为符合响应式的规范,RxJava2将流控制从Observable类中移除,并引入了新的Flowable类。Flowable可以看作是提供流控制的Observable。
到目前为止我们讨论的都是静态Observable,它们提供静态的数据,尽管我们可以在执行时间上做一些调节,不过这远远
不够。静态Observable只在有订阅者的情况下才会生成事件,而且订阅者收到的是历史数据,不管它们是从何时开始订阅的。相反,动态Observable不管有多少个订阅者都会生成数据,而且只生成最新的数据(除非使用了缓存)。可以通过两个步骤把静态Observable转化成动态Observable:
调用Observable的publish方法,生成一个新的ConnectableObservable
调用ConnectableObservable的connect方法,开始生成数据
这种方式也能工作,但并不支持任何流控制。要连接到长期运行的现有数据源上,除非是提供背压控制,我们通常会选择使用Flowable,使用一种Observable的并行语法。
1a. 调用Flowable的publish方法生成一个新的ConnectableFlowable
2a. 调用ConnectableFlowable的connect方法开始生成数据
要连接到一个已有的数据源上,可以在这个数据源上添加监听器(如果你喜欢这么做),监听器会把事件传播给订阅者,然后在每个事件发生时调用订阅者的onNext方法。在实现监听器的时候要确保每个订阅者仍然处于订阅状态,否则就要停止把事件传播给它,同时要注意回压信号。所幸的是,这些工作可以由Flowabled的create方法来处理。假设我们有一个叫做SomeFeed的数据服务,它会生成报价事件,同时有一个SomeListener监听这些报价事件以及其它生命周期事件。在GitHub上已经有一个实现,如果你想自己动手运行这些代码,可以去下载。
我们的数据源监听器有两个方法:
public void priceTick(PriceTick event); public void error(Throwable throwable); |
PriceTick类包含了date、instrument和price字段,还有一个isLast方法用来判断它是否是最后一个事件:
让我们来看看如何使用AsyncEmitter把Observable连接到一个实时的数据源上:
SomeFeed<PriceTick> feed = new SomeFeed<>(); Flowable<PriceTick> flowable = Flowable.create(emitter -> { SomeListener listener = new SomeListener() { @Override public void priceTick(PriceTick event) { emitter.onNext(event); if (event.isLast()) { emitter.onComplete(); } }
@Override
public void error(Throwable e) {
emitter.onError(e);
}
};
feed.register(listener);
}, BackpressureStrategy.BUFFER);
flowable.subscribe(System.out::println); |
这段代码几乎是逐字逐句地从Flowable类的Javadoc里摘抄出来的。Flowable封装了监听器(第3行)的创建过程,并把它注册到数据源上(第17行)。Flowable直接让订阅者对自己进行了订阅。数据源生成的事件被委托给了listener(第6行)。第18行告诉观察者要缓冲所有的事件通知,直到它们被订阅者消费。除了缓冲,还有其它几种回压策略:
BackpressureMode.MISSING不使用回压。如果流的速度无法保持同步,可能会抛出MissingBackpressureException或IllegalStateException。
BackpressureStrategy.ERROR会在下游跟不上速度时抛出MissingBackpressureException。
BackpressureStrategy.DROP会在下游跟不上速度时把onNext的值丢弃。
BackpressureStrategy.LATEST会一直保留最新的onNext的值,直到被下游消费掉。
这样生成的是静态Flowable。静态Observable在没有订阅者的时候不会生成数据,而且所有订阅者收到的是同样的历史数据,而这不是我们想要的。
为了把它转化成动态Observable,让所有订阅者可以实时地接收事件通知,我们必须调用publish和connect方法,就像之前提到的那样:
ConnectableFlowable<PriceTick> hotObservable = flowable.publish(); hotObservable.connect(); |
最后,我们可以对它进行订阅并显示报价:
hotObservable.subscribe((priceTick) -> System.out.printf("%s %4s %6.2f%n", priceTick.getDate(), priceTick.getInstrument(), priceTick.getPrice())); |
|