RxJS学习总结-Subject

这两周拖延症又又又犯了,本来这篇博客是要上周末完结的,结果报了健身房,运动回到家就是懒得开电脑写。这一拖公司开发任务又接踵而来,预感到这个周末是未来一个月最闲的周末了,一大早就赶紧开电脑开始写这篇博客。今天来总结RxJS中最重要的一个知识点—Subject

概念

什么是SubjectSubject能做什么?简单来说,SubjectObservableObserver的混合体,它既可以作为一个Observable被多个观察者订阅,又可以作为一个观察者(Observer)去订阅一个源Observable。前面我们说的Observable作为数据源,每当它被一个Observer订阅的时候,都会产生一个全新的数据流,多个观察者之间的数据是相互隔离的,互不影响。而Subject的观察者,或者说订阅者,是共享这一个数据源的,称为”多播”。

举个例子,中央电视台每晚7点播送新闻联播。小明7点准时打开电视机开始收看新闻,而小红7点15分才打开电视机,打开电视机相当于一个订阅subscribe的动作,那么小红看到的新闻必然是从第15分钟开始的,在这之后小明小红收看的新闻(数据)是一样的。转换为代码,如果我们使用普通的Observable去实现,那么所有人不管何时打开电视,看到的都会是7点开始的新闻(类似录像),就失去了一个同步性和实时性。这个时候我们就可以用Subject来实现。

1
2
3
4
5
6
7
8
9
10
11
12
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(val => console.log('ObserverA: ' + val));
subject.next(1);
subject.subscribe(val => console.log('ObserverB: ' + val));
subject.next(2);

// ObserverA: 1
// ObserverA: 2
// ObserverB: 2

这个例子中,new Subject()返回一个Observable对象,先被A订阅,随后调用subject.next(1)发出值1,接着被B订阅,然后发出值2。此时B观察者只会收到它订阅subject后发出的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();
const source = from([1, 2]);

subject.subscribe(val => console.log('ObserverA: ' + val));
subject.subscribe(val => console.log('ObserverB: ' + val));

source.subscribe(subject);
// ObserverA: 1
// ObserverB: 1
// ObserverA: 2
// ObserverB: 2

这个例子中,subject作为观察者Observer,订阅了数据源source。然后AB两个观察者都收到了数据,这样就通过Subject实现了多播

Hot 和 Cold 数据流

如果你在网上查阅关于Subject的资料,肯定会注意到两个名词:Hot ObservableCold Observable。其实这两个名词并没有什么高深的含义,Hot Observable指的是无论多少Observer来订阅,推给Observer数据的都是来自一样的数据源;而Cold Observable则是说每次被订阅的时候都产生一个全新的数据序列数据流。上面的例子就是使用Subject将一个Cold Observable(form([1, 2])转为一个Hot Observable(subject.subscribe())。

不能重复使用

Subject不像普通的Observable一样每次订阅都会产生一个新的数据流,所以根据订阅Subject的时机不同会有不同的效果。总的来说,当一个Subject调用了complete()或者error()方法,后续再有Observer订阅了这个Subject(),即使再使用subject.next(val)发送值,后续的Observer也收不到了,只会收到complete()或者error()消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(
val => console.log('ObserverA: ' + val),
error => console.log(error),
() => console.log('Acomplete')
);

subject.next(1);
subject.next(2);
subject.complete();

subject.subscribe(
val => console.log('ObserverB: ' + val),
error => console.log(error),
() => console.log('Bcomplete')
);
subject.next(3); // 即使不调用,也会打印Bcomplete
// ObserverA: 1
// ObserverA: 2
// Acomplete
// Bcomplete

以上,观察者B在subject完结之后订阅,只会收到前面subject完结的通知。同样,当subjcet调用error()方法,下面的观察者也只会收到error消息。

这里顺便提一句,如果Subject有多个订阅者,那么当任一订阅者产生一个错误异常,这个异常没有被正确处理的话,其他订阅者都会失败。相当于这个错误”阻塞”掉了其他Observer正常接收数据。所以最稳妥的方式是每一个Observer都具备处理错误异常的能力。

三个基类 BehaviorSubject、ReplaySubject、AsyncSubject

BehaviorSubject可以提供一个初始值(默认值),当第一个Observer订阅它的时候,BehaviorSubject会立即发出这个初始值,并且接下来BehaviorSubject发出的值都会不断替换这个初始值,这样之后新的Observer来订阅时,都会拿到最新的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(1);

subject.subscribe(
val => console.log('ObserverA: ' + val)
);

subject.next(2);
subject.next(3);

subject.subscribe(
val => console.log('ObserverB: ' + val)
);
// ObserverA: 1 A一订阅,就拿到了默认值1
// ObserverA: 2
// ObserverA: 3
// ObserverB: 3 B一订阅,就拿到了BehaviorSubject的最新值3

ReplaySubject接收一个number类型参数,可以缓存N个最新的值同步发送给新的订阅者。好像回放电影一样把指定个数的值发给后续的订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(2);

subject.subscribe(
val => console.log('ObserverA: ' + val)
);

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe(
val => console.log('ObserverB: ' + val)
);
subject.subscribe(
val => console.log('ObserverC: ' + val)
);
// ObserverA: 1
// ObserverA: 2
// ObserverA: 3
// ObserverA: 4
// ObserverB: 3 B一订阅,就拿到了ReplaySubject缓存的最新两个值3、4
// ObserverB: 4
// ObserverC: 3 C一订阅,就拿到了ReplaySubject缓存的最新两个值3、4
// ObserverC: 4

AsyncSubject只有当数据源complete()的时候,才会把最后一个值发送给订阅者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject();

subject.subscribe(
val => console.log('ObserverA: ' + val)
);

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();

subject.subscribe(
val => console.log('ObserverB: ' + val)
);

// ObserverA: 3
// ObserverB: 3

多播操作符

RxJS使用当中有很多场景需要将Cold Observable转为Hot Observable,如果都使用上面例子的方法,先新建subject对象,订阅数据源,再用Observer订阅这个subject未免有点麻烦。RxJS提供了几个操作符来简化这个过程。

multicast

multicast是一个比较底层或者说基础的操作符,一般情况下不会直接使用它。但一些高级的多播操作符都是基于它来实现,还是需要了解一下。multicast()能够将上游Observable转换为Hot Observable

1
const hotObservable = coldObservable.pipe(multicast(new Subject()));

需要注意的是,现在订阅hotObservable并不会产生值,还需要调用一个connect()方法才会真正将HotCold连接起来。下面看一个完整的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// 这里相当于 subject.subscribe({...})
let ObservableA = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let ObservableB = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

// 这里相当于 source.subscribe(subject)
multicasted.connect();

以上的例子都没有考虑取消订阅的情况,那么如果我们想取消订阅应该怎么办呢?显然,上面这个例子有multicastedObservableAObservableB三个Observable。取消订阅时也需要考虑multicasted。但大部分情况下,我们希望这个Observable智能一点:当有Observer订阅它时,multicasted作为中间人,自动去订阅上方源数据,开始发出数据;当最后一个Observer退订时,multicasted也自动退订上方数据源。

显然,我们需要一个计数器,自动跟踪Observer的数量来进行订阅和退订动作。RxJS提供了refCount()这个方法来实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { from, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject), refCount());

let ObservableA = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let ObservableB = multicasted.subscribe(
(v) => console.log(`observerB: ${v}`),
error => console.log(error),
() => console.log('complete')
);
// 不再需要调用connect()方法
// ObserverA: 1
// ObserverA: 2
// ObserverA: 3
// complete

以上,我们直接在multicast()后使用了refCount()方法,下面就不需要再调用connect()ObserverA就可以正常的收到值。但是这个例子又引入了另外一个问题:ObserverB呢?

首先,ObserverA可以正常收到数据是没有疑问的,因为源数据是同步发出的,故在ObserverA订阅multicasted后,3个值同时发了出来,然后就complete()了,所以之后ObserverB的订阅将收不到任何值,只会收到complete()的消息。其次我们来想一个问题:引入refCount()的初衷就是为了当有Observer订阅的时候发出值,Observer退订的时候自动退订,如果像这个例子一样,ObserverA订阅了一次数据,过了相当久的一段时间,ObserverB又来订阅取值,却发现无值可取,这样来看,refCount()存在的意义是不是太弱了?倘若我们想要ObserverB也能订阅到值呢?

其实,multicast()除了可以直接接收一个new Subject()对象外,还可以接收一个返回Subject对象的函数作为参数,即multicast(() => new Subject())。如果接受一个Subject()工厂函数(就是() => new Subject()),则可以实现上面说的效果,因为源数据complete()之后,ObserverB再来订阅,multicast()使用工厂函数重新创建了一个新的Subject(),相当于重新订阅了源数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import { from, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = () => new Subject(); // 注意这里
const multicasted = source.pipe(multicast(subject), refCount());

let ObservableA = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let ObservableB = multicasted.subscribe(
v => console.log(`observerB: ${v}`),
err => console.log(err),
() => console.log('complete')
);
// ObserverA: 1
// ObserverA: 2
// ObserverA: 3
// ObserverB: 1
// ObserverB: 2
// ObserverB: 3
// complete

简单总结:multicast()接收new Subject()或者() => new Subject()工厂函数,区别在于源数据complete()之后能否重新订阅数据源;refCount()是一个计数器,使”中间件”Subject自动订阅或取消订阅上游数据源。一般情况下,如果同时使用.pipe(multicast(subject), refCount()),则subject一般会表现为工厂函数形式。

publish

publish()其实是multicast(subject)的简写,接收一个可选参数selector,这个selector是干什么的,我也没太搞懂,这里提一下这个可选参数的原因在于,它决定了multicast(subject)subject的形式:

  • publish() --> multicast(new Subject())

  • publish(selector) --> multicast(() => new Subject())

这里注意,publish()只是multicast(subject)的简写,可没有refCount()

上面我们讲,Subject()有三个基类:BehaviorSubject()ReplaySubject()AsyncSubject()。于此对应,publish()也有三个增强方法:

  1. publishBehavior就是multicast(new BehaviorSubject())的组合。
  2. publishReplay就是multicast(new ReplaySubject())的组合。
  3. publishLast就是multicast(new AsyncSubject())的组合。

share

share()multicast(() => new Subject()), refCount()的简写。

注意,这里multicast接收的是subject的工厂函数。而且,最最重要的一点,share()不是publish(), refCount()的组合,这是很多教程犯的一个错误。不要将share()publish()联系到一块儿就好。

另外一个需要注意的点是,share()的内部使用了Subject(),如果需要BehaviorSubject()等其他subject类,则需要使用与之对应的publish方法。

好了,今天总结了一下Subject的相关知识点,在实际开发中经常会使用到这些思想和方法。接下来会找时间简单总结一下Scheduler的知识点,虽说它和ObservableSubject并称为三大知识点,但感觉门槛高,用的比较少。