上篇博客我们使用RxJS的一些知识实现了一个贪吃蛇的小游戏,对RxJS有了一定的了解。接下来几篇博客会从基础开始,总结一下RxJS的概念、操作符、多播、调度器等知识点。
今天先来看一下Observable的几个相关概念。
Observable可观察对象
概述
什么是Observable?讲道理我也不太清楚到底什么是Observable,硬要一个定义的话,可以说Observable是一个以推送Push方式产生一个或多个值(同步或者异步)的集合。我们以推送方式和产生值的数量来对JavaScript常见的几种概念来个分类:
| 单值SINGLE | 多值MULTIPLE | |
|---|---|---|
| 拉取PULL | Function | Iterator |
| 推送PUSH | Promise | Observable |
Function函数,每个函数只能有一个return出来的值,而且当调用它时,会同步的发出值。生产者foo()的值是由消费者x主动Pull拉取来的,单值。
1 | function foo() { |
Iterator迭代器,迭代器可以产生多个值,需要消费者自己Pull取用。想想async/await。
1 | const arr = [1, 2, 3]; |
Promise承诺(感觉怪怪的),承诺以推送Push的方式发出(或者不发出)单值。承诺在创建的时候会立即计算出结果,而且只执行一次(跟Observable的区别)。
1 | const promise = new Promise((resolve, reject) => { |
Observable可观察对象,今天的重点,以推送Push的方式同步或异步的发出一个或多个值,直接看代码:
1 | import { Observable } from 'rxjs'; |
想要调用observable并且看到值,我们需要订阅subscribe它:
1 | console.log('just before subscribe'); |
Pull && Push
Pull拉取模式,消费者Consumer决定什么时候从生产者Producer处拿值。生产者不知道数据什么时候会发给消费者。如上面的Function和Iterator,只有当函数foo()调用或者变量iterator调用next()方法时,即“拉取Pull”时,才会“消费”值。
Push推送模式,生产者Producer决定什么时候发出值,消费者不知道数据什么时候过来。Promise是最常见的推送模式,promise推送resolved值到回调函数。RxJS的Observable也是推送模式,不过它可以推送多个值(同步或异步)。
Observable & Function & EventEmitters & Promise 异同
Observable有点像无参数的函数Function,但是是可以产生多个值。先看下面:
1 | // Function 形式 |
Function和Observable都是延迟计算lazy computation,如果你不调用函数foo(),那么console.log('hello')就不会运行;同样,如果你不订阅(使用subscribe)Observable,那么console.log('hello')也不会运行。而且不管调用还是订阅都是一个独立的操作:多次调用或者多次订阅的副作用side effects是相互独立,互不影响的。
Observable和EventEmitter有些类似,但Observable适用范围更广,且数据在交付前可以预处理:
1 | // EventEmitter |
这里,EventEmitter更像是使用了Subject的Observable,后面会再讨论Subject。
Observable经常被拿来跟Promise进行比较。有一些关键点的不同:
Observable是声明式的,在被订阅之前,它不会开始执行。Promise是在创建时就立即执行的,具体见上面的例子。Observable可以提供多个值,Promise只提供一个。Observable可以随着时间的推移获取多个值(流)。Observable可以通过管道pipe()串联处理数据,Promise()只有then()语句。Observable可以使得数据在被订阅之前进行处理。Observable的subscribe(observer)会负责处理错误error,Promise会把错误推给它的子Promise去处理。- 最重要一点,
Observable是可以通过unsubscribe()取消的。取消订阅就会移除监听器,不再接收将来的值。
最后简单提一下Observable和数组Array在运算方式上的不同。先看下面:
1 | // 数组链式调用 |
数组的链式调用,每一步都会等所有数据计算完,才继续走下一步。即.map()先返回一个数据[1, 4, 9, 16],然后.filter()过滤后,返回[9, 16];而Observable的管道,每一个值都会处理到底,再继续下一个值。即数字1先.map(),再.filter(),不符合;同样,2不符合;数字3进来,符合,打印出9;最后处理数字4,打印16。
subscriber订阅者函数
当我们手动创建一个Observable的实例时,就需要传入一个订阅者函数subscriber。当有消费者调用subscribe()方法时,这个函数就会执行。
1 | import { Observable } from 'rxjs'; |
不过,我们一般情况下不使用这种形式创建Observable,而是使用creation functions,如of()、fromEvent()、interval()等等。
Observables can be created with
new Observable. Most commonly, observables are created using creation functions, likeof,from,interval, etc.
注意,complete()后,后面的值将不会再发出。自定义subscriber函数时,最好将代码用try/catch包裹,处理错误。
订阅Subscribing
只有当有人订阅 Observable 的实例时,它才会开始发布值。 订阅时要先调用该实例的 subscribe() 方法,并把一个观察者对象observer传给它,用来接收通知。observer定义了收到数据时的处理器handler。同时subscribe()的调用会返回一个Subscription对象,这个对象有一个unsubscribe()方法,用来取消订阅。
1 | import { Observable } from 'rxjs'; |
订阅后,Observable发出值,observer接收值,处理值。注意:多次订阅互不影响。
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
另外,订阅时还可以直接传入回调函数,next 处理器都是必要的,而 error 和 complete 处理器是可选的,注意传入顺序。
1 | observable.subscribe( |
每当调用subscribe()方法时,都会返回一个Subscription对象,这个对象有一个unsubscribe()方法,用来取消订阅。类似于setInterval()和setTimeout()返回id的形式。
1 | const subscription = observable.subscribe(observer); |
observer观察者
观察者对象observer定义了一些回调函数来处理可观察对象Observable可能发来的三种通知:
| 通知类型 | 说明 |
|---|---|
| next | 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。 |
| error | 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。 |
| complete | 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。 |
以上就是Observable的一些基础知识点,简单记忆就是:
1 | const subscription = new Observable(subscriber).subscribe(observer); |
接下来我们会先总结一下Observable的创建函数和操作符,RxJS繁琐就在于creation function和operators特别多,需要费点时间去记忆和理解,但也正因为这样,RxJS在处理流数据的时候才能“因材施教”,得心应手。过完这些知识点后,我们再去总结Subject和Scheduler这两个难搞的点。