RxJS学习总结-Observable

上篇博客我们使用RxJS的一些知识实现了一个贪吃蛇的小游戏,对RxJS有了一定的了解。接下来几篇博客会从基础开始,总结一下RxJS的概念、操作符、多播、调度器等知识点。

今天先来看一下Observable的几个相关概念。

Observable可观察对象

概述

什么是Observable?讲道理我也不太清楚到底什么是Observable,硬要一个定义的话,可以说Observable是一个以推送Push方式产生一个或多个值(同步或者异步)的集合。我们以推送方式和产生值的数量来对JavaScript常见的几种概念来个分类:

单值SINGLE 多值MULTIPLE
拉取PULL Function Iterator
推送PUSH Promise Observable

Function函数,每个函数只能有一个return出来的值,而且当调用它时,会同步的发出值。生产者foo()的值是由消费者x主动Pull拉取来的,单值。

1
2
3
4
5
6
7
8
9
function foo() {
console.log('hello');
return 47;
return 48; //被忽略
}
const x = foo.call(); // 或者foo();
console.log(x);
// hello
// 47

Iterator迭代器,迭代器可以产生多个值,需要消费者自己Pull取用。想想async/await

1
2
3
4
5
6
7
8
9
10
11
const arr = [1, 2, 3];
const iterator = arr[Symbol.iterator]();

iterator.next();
// { value: 1, done: false }
iterator.next();
// { value: 2, done: false }
iterator.next();
// { value: 3, done: false }
iterator.next();
// { value: undefined, done: true }

Promise承诺(感觉怪怪的),承诺以推送Push的方式发出(或者不发出)单值。承诺在创建的时候会立即计算出结果,而且只执行一次(跟Observable的区别)。

1
2
3
4
5
6
7
const promise = new Promise((resolve, reject) => {
console.log('hello');
resolve('world');
})
// hello
promise.then(res => console.log(res));
// world

Observable可观察对象,今天的重点,以推送Push的方式同步或异步的发出一个或多个值,直接看代码:

1
2
3
4
5
6
7
8
9
10
11
12
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
console.log('hello'); // 不订阅,不会打印
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3); // 订阅后 1s 发出 3
subscriber.complete();
}, 1000);
})
// 创建后不会有任何值打印出来

想要调用observable并且看到值,我们需要订阅subscribe它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
console.log('just before subscribe');
observable.subscribe({
next(x) { console.log('got value ' + x); },
error(err) { console.error('something wrong occurred: ' + err); },
complete() { console.log('done'); }
});
console.log('just after subscribe');
// just before subscribe
// hello
// got value 1
// got value 2
// just after subscribe
// got value 3
// done

Pull && Push

Pull拉取模式,消费者Consumer决定什么时候从生产者Producer处拿值。生产者不知道数据什么时候会发给消费者。如上面的FunctionIterator,只有当函数foo()调用或者变量iterator调用next()方法时,即“拉取Pull”时,才会“消费”值。

Push推送模式,生产者Producer决定什么时候发出值,消费者不知道数据什么时候过来。Promise是最常见的推送模式,promise推送resolved值到回调函数。RxJSObservable也是推送模式,不过它可以推送多个值(同步或异步)。

Observable & Function & EventEmitters & Promise 异同

Observable有点像无参数的函数Function,但是是可以产生多个值。先看下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Function 形式
function foo() {
console.log('hello');
return 47;
}
const x = foo.call();
console.log(x); // hello 47
const y = foo.call();
console.log(x); // hello 47

// Observable 形式
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('hello');
subscriber.next(47);
});

foo.subscribe(x => {
console.log(x); // hello 47
});
foo.subscribe(y => {
console.log(y); // hello 47
});

FunctionObservable都是延迟计算lazy computation,如果你不调用函数foo(),那么console.log('hello')就不会运行;同样,如果你不订阅(使用subscribeObservable,那么console.log('hello')也不会运行。而且不管调用还是订阅都是一个独立的操作:多次调用或者多次订阅的副作用side effects是相互独立,互不影响的。

ObservableEventEmitter有些类似,但Observable适用范围更广,且数据在交付前可以预处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// EventEmitter
function handler(e) {
// 数据无法被预处理
console.log('Clicked', e);
}
button.addEventListener('click', handler);
button.removeEventListener('click', handler);

// Observable
let clicks$ = fromEvent(buttonEl, 'click');
let subscription = clicks$.pipe(
map(e => e.target.value) // 预处理事件,直接发出value
).subscribe(val => console.log('Clicked', val))
subscription.unsubscribe();
// 除事件处理外,Observable还可以处理其他数据流

这里,EventEmitter更像是使用了SubjectObservable,后面会再讨论Subject

Observable经常被拿来跟Promise进行比较。有一些关键点的不同:

  1. Observable是声明式的,在被订阅之前,它不会开始执行。Promise是在创建时就立即执行的,具体见上面的例子。
  2. Observable可以提供多个值,Promise只提供一个。Observable可以随着时间的推移获取多个值(流)。
  3. Observable可以通过管道pipe()串联处理数据,Promise()只有then()语句。Observable可以使得数据在被订阅之前进行处理。
  4. Observablesubscribe(observer)会负责处理错误errorPromise会把错误推给它的子Promise去处理。
  5. 最重要一点,Observable是可以通过unsubscribe()取消的。取消订阅就会移除监听器,不再接收将来的值。

最后简单提一下Observable和数组Array在运算方式上的不同。先看下面:

1
2
3
4
5
6
7
8
9
10
11
12
// 数组链式调用
let arr = [1, 2, 3, 4];
let result = arr.map(num => num * num).filter(num => num > 5);
console.log(result); // [9, 16]

// Observable管道操作
import { from } from 'rxjs';
import { map, filter } from 'rxjs/operators';
from([1, 2, 3, 4]).pipe(
map(num => num * num),
filter(num => num > 5)
).subscribe(console.log); // 9, 16

数组的链式调用,每一步都会等所有数据计算完,才继续走下一步。即.map()先返回一个数据[1, 4, 9, 16],然后.filter()过滤后,返回[9, 16];而Observable的管道,每一个值都会处理到底,再继续下一个值。即数字1先.map(),再.filter(),不符合;同样,2不符合;数字3进来,符合,打印出9;最后处理数字4,打印16。

subscriber订阅者函数

当我们手动创建一个Observable的实例时,就需要传入一个订阅者函数subscriber。当有消费者调用subscribe()方法时,这个函数就会执行。

1
2
3
4
5
6
7
8
9
10
11
12
import { Observable } from 'rxjs';
const observable = new Observable(function subscriber(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete(); // 完成
subscriber.next(4); // 不会发出
} catch (err) {
subscriber.error(err); // 捕获错误
}
})

不过,我们一般情况下不使用这种形式创建Observable,而是使用creation functions,如of()fromEvent()interval()等等。

Observables can be created with new Observable. Most commonly, observables are created using creation functions, like of, from, interval, etc.

注意,complete()后,后面的值将不会再发出。自定义subscriber函数时,最好将代码用try/catch包裹,处理错误。

订阅Subscribing

只有当有人订阅 Observable 的实例时,它才会开始发布值。 订阅时要先调用该实例的 subscribe() 方法,并把一个观察者对象observer传给它,用来接收通知。observer定义了收到数据时的处理器handler。同时subscribe()的调用会返回一个Subscription对象,这个对象有一个unsubscribe()方法,用来取消订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// 观察者对象
const observer = {
next: x => console.log('next value: ' + x),
error: err => console.error('error: ' + err),
complete: () => console.log('complete'),
}
// 第一次订阅
observable.subscribe(observer);
// next value: 1
// next value: 2
// next value: 3
// complete

// 第二次订阅
observable.subscribe(observer);
// next value: 1
// next value: 2
// next value: 3
// complete

订阅后,Observable发出值,observer接收值,处理值。注意:多次订阅互不影响。

Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.

另外,订阅时还可以直接传入回调函数,next 处理器都是必要的,而 errorcomplete 处理器是可选的,注意传入顺序。

1
2
3
4
5
6
7
8
9
observable.subscribe(
x => console.log('next value: ' + x),
err => console.error('error: ' + err),
() => console.log('complete')
);
// next value: 1
// next value: 2
// next value: 3
// complete

每当调用subscribe()方法时,都会返回一个Subscription对象,这个对象有一个unsubscribe()方法,用来取消订阅。类似于setInterval()setTimeout()返回id的形式。

1
2
3
const subscription = observable.subscribe(observer);
// 之后取消订阅
subscription.unsubscribe();

observer观察者

观察者对象observer定义了一些回调函数来处理可观察对象Observable可能发来的三种通知:

通知类型 说明
next 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。
error 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。
complete 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。

以上就是Observable的一些基础知识点,简单记忆就是:

1
2
const subscription = new Observable(subscriber).subscribe(observer);
subscription.unsubscribe();

接下来我们会先总结一下Observable的创建函数和操作符,RxJS繁琐就在于creation functionoperators特别多,需要费点时间去记忆和理解,但也正因为这样,RxJS在处理流数据的时候才能“因材施教”,得心应手。过完这些知识点后,我们再去总结SubjectScheduler这两个难搞的点。