上篇博客我们使用RxJS
的一些知识实现了一个贪吃蛇的小游戏,对RxJS
有了一定的了解。接下来几篇博客会从基础开始,总结一下RxJS
的概念、操作符、多播、调度器等知识点。
今天先来看一下Observable
的几个相关概念。
Observable可观察对象
概述
什么是Observable
?讲道理我也不太清楚到底什么是Observable
,硬要一个定义的话,可以说Observable
是一个以推送Push
方式产生一个或多个值(同步或者异步)的集合。我们以推送方式和产生值的数量来对JavaScript
常见的几种概念来个分类:
单值SINGLE | 多值MULTIPLE | |
---|---|---|
拉取PULL | Function | Iterator |
推送PUSH | Promise | Observable |
Function
函数,每个函数只能有一个return
出来的值,而且当调用它时,会同步的发出值。生产者foo()
的值是由消费者x
主动Pull
拉取来的,单值。
1 | function foo() { |
Iterator
迭代器,迭代器可以产生多个值,需要消费者自己Pull
取用。想想async/await
。
1 | const arr = [1, 2, 3]; |
Promise
承诺(感觉怪怪的),承诺以推送Push
的方式发出(或者不发出)单值。承诺在创建的时候会立即计算出结果,而且只执行一次(跟Observable
的区别)。
1 | const promise = new Promise((resolve, reject) => { |
Observable
可观察对象,今天的重点,以推送Push
的方式同步或异步的发出一个或多个值,直接看代码:
1 | import { Observable } from 'rxjs'; |
想要调用observable
并且看到值,我们需要订阅subscribe
它:
1 | console.log('just before subscribe'); |
Pull && Push
Pull
拉取模式,消费者Consumer
决定什么时候从生产者Producer
处拿值。生产者不知道数据什么时候会发给消费者。如上面的Function
和Iterator
,只有当函数foo()
调用或者变量iterator
调用next()
方法时,即“拉取Pull
”时,才会“消费”值。
Push
推送模式,生产者Producer
决定什么时候发出值,消费者不知道数据什么时候过来。Promise
是最常见的推送模式,promise推送resolved
值到回调函数。RxJS
的Observable
也是推送模式,不过它可以推送多个值(同步或异步)。
Observable & Function & EventEmitters & Promise 异同
Observable
有点像无参数的函数Function
,但是是可以产生多个值。先看下面:
1 | // Function 形式 |
Function
和Observable
都是延迟计算lazy computation
,如果你不调用函数foo()
,那么console.log('hello')
就不会运行;同样,如果你不订阅(使用subscribe
)Observable
,那么console.log('hello')
也不会运行。而且不管调用
还是订阅
都是一个独立的操作:多次调用或者多次订阅的副作用side effects
是相互独立,互不影响的。
Observable
和EventEmitter
有些类似,但Observable
适用范围更广,且数据在交付前可以预处理:
1 | // EventEmitter |
这里,EventEmitter
更像是使用了Subject
的Observable
,后面会再讨论Subject
。
Observable
经常被拿来跟Promise
进行比较。有一些关键点的不同:
Observable
是声明式的,在被订阅之前,它不会开始执行。Promise
是在创建时就立即执行的,具体见上面的例子。Observable
可以提供多个值,Promise
只提供一个。Observable
可以随着时间的推移获取多个值(流)。Observable
可以通过管道pipe()
串联处理数据,Promise()
只有then()
语句。Observable
可以使得数据在被订阅之前进行处理。Observable
的subscribe(observer)
会负责处理错误error
,Promise
会把错误推给它的子Promise
去处理。- 最重要一点,
Observable
是可以通过unsubscribe()
取消的。取消订阅就会移除监听器,不再接收将来的值。
最后简单提一下Observable
和数组Array
在运算方式上的不同。先看下面:
1 | // 数组链式调用 |
数组的链式调用,每一步都会等所有数据计算完,才继续走下一步。即.map()
先返回一个数据[1, 4, 9, 16]
,然后.filter()
过滤后,返回[9, 16]
;而Observable
的管道,每一个值都会处理到底,再继续下一个值。即数字1先.map()
,再.filter()
,不符合;同样,2不符合;数字3进来,符合,打印出9;最后处理数字4,打印16。
subscriber订阅者函数
当我们手动创建一个Observable
的实例时,就需要传入一个订阅者函数subscriber
。当有消费者调用subscribe()
方法时,这个函数就会执行。
1 | import { Observable } from 'rxjs'; |
不过,我们一般情况下不使用这种形式创建Observable
,而是使用creation functions
,如of()
、fromEvent()
、interval()
等等。
Observables can be created with
new Observable
. Most commonly, observables are created using creation functions, likeof
,from
,interval
, etc.
注意,complete()
后,后面的值将不会再发出。自定义subscriber
函数时,最好将代码用try/catch
包裹,处理错误。
订阅Subscribing
只有当有人订阅 Observable
的实例时,它才会开始发布值。 订阅时要先调用该实例的 subscribe()
方法,并把一个观察者对象observer
传给它,用来接收通知。observer
定义了收到数据时的处理器handler
。同时subscribe()
的调用会返回一个Subscription
对象,这个对象有一个unsubscribe()
方法,用来取消订阅。
1 | import { Observable } from 'rxjs'; |
订阅后,Observable
发出值,observer
接收值,处理值。注意:多次订阅互不影响。
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
另外,订阅时还可以直接传入回调函数,next
处理器都是必要的,而 error
和 complete
处理器是可选的,注意传入顺序。
1 | observable.subscribe( |
每当调用subscribe()
方法时,都会返回一个Subscription
对象,这个对象有一个unsubscribe()
方法,用来取消订阅。类似于setInterval()
和setTimeout()
返回id
的形式。
1 | const subscription = observable.subscribe(observer); |
observer观察者
观察者对象observer
定义了一些回调函数来处理可观察对象Observable
可能发来的三种通知:
通知类型 | 说明 |
---|---|
next | 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。 |
error | 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。 |
complete | 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。 |
以上就是Observable
的一些基础知识点,简单记忆就是:
1 | const subscription = new Observable(subscriber).subscribe(observer); |
接下来我们会先总结一下Observable
的创建函数和操作符,RxJS
繁琐就在于creation function
和operators
特别多,需要费点时间去记忆和理解,但也正因为这样,RxJS
在处理流数据的时候才能“因材施教”,得心应手。过完这些知识点后,我们再去总结Subject
和Scheduler
这两个难搞的点。