这两周拖延症又又又犯了,本来这篇博客是要上周末完结的,结果报了健身房,运动回到家就是懒得开电脑写。这一拖公司开发任务又接踵而来,预感到这个周末是未来一个月最闲的周末了,一大早就赶紧开电脑开始写这篇博客。今天来总结RxJS中最重要的一个知识点—Subject。
概念
什么是Subject?Subject能做什么?简单来说,Subject是Observable和Observer的混合体,它既可以作为一个Observable被多个观察者订阅,又可以作为一个观察者(Observer)去订阅一个源Observable。前面我们说的Observable作为数据源,每当它被一个Observer订阅的时候,都会产生一个全新的数据流,多个观察者之间的数据是相互隔离的,互不影响。而Subject的观察者,或者说订阅者,是共享这一个数据源的,称为”多播”。
举个例子,中央电视台每晚7点播送新闻联播。小明7点准时打开电视机开始收看新闻,而小红7点15分才打开电视机,打开电视机相当于一个订阅subscribe的动作,那么小红看到的新闻必然是从第15分钟开始的,在这之后小明小红收看的新闻(数据)是一样的。转换为代码,如果我们使用普通的Observable去实现,那么所有人不管何时打开电视,看到的都会是7点开始的新闻(类似录像),就失去了一个同步性和实时性。这个时候我们就可以用Subject来实现。
1 | import { Subject } from 'rxjs'; |
这个例子中,new Subject()返回一个Observable对象,先被A订阅,随后调用subject.next(1)发出值1,接着被B订阅,然后发出值2。此时B观察者只会收到它订阅subject后发出的值。
1 | import { Subject, from } from 'rxjs'; |
这个例子中,subject作为观察者Observer,订阅了数据源source。然后AB两个观察者都收到了数据,这样就通过Subject实现了多播。
Hot 和 Cold 数据流
如果你在网上查阅关于Subject的资料,肯定会注意到两个名词:Hot Observable和Cold Observable。其实这两个名词并没有什么高深的含义,Hot Observable指的是无论多少Observer来订阅,推给Observer数据的都是来自一样的数据源;而Cold Observable则是说每次被订阅的时候都产生一个全新的数据序列数据流。上面的例子就是使用Subject将一个Cold Observable(form([1, 2])转为一个Hot Observable(subject.subscribe())。
不能重复使用
Subject不像普通的Observable一样每次订阅都会产生一个新的数据流,所以根据订阅Subject的时机不同会有不同的效果。总的来说,当一个Subject调用了complete()或者error()方法,后续再有Observer订阅了这个Subject(),即使再使用subject.next(val)发送值,后续的Observer也收不到了,只会收到complete()或者error()消息。
1 | import { Subject, from } from 'rxjs'; |
以上,观察者B在subject完结之后订阅,只会收到前面subject完结的通知。同样,当subjcet调用error()方法,下面的观察者也只会收到error消息。
这里顺便提一句,如果Subject有多个订阅者,那么当任一订阅者产生一个错误异常,这个异常没有被正确处理的话,其他订阅者都会失败。相当于这个错误”阻塞”掉了其他Observer正常接收数据。所以最稳妥的方式是每一个Observer都具备处理错误异常的能力。
三个基类 BehaviorSubject、ReplaySubject、AsyncSubject
BehaviorSubject可以提供一个初始值(默认值),当第一个Observer订阅它的时候,BehaviorSubject会立即发出这个初始值,并且接下来BehaviorSubject发出的值都会不断替换这个初始值,这样之后新的Observer来订阅时,都会拿到最新的值。
1 | import { BehaviorSubject } from 'rxjs'; |
ReplaySubject接收一个number类型参数,可以缓存N个最新的值同步发送给新的订阅者。好像回放电影一样把指定个数的值发给后续的订阅者。
1 | import { ReplaySubject } from 'rxjs'; |
AsyncSubject只有当数据源complete()的时候,才会把最后一个值发送给订阅者。
1 | import { AsyncSubject } from 'rxjs'; |
多播操作符
RxJS使用当中有很多场景需要将Cold Observable转为Hot Observable,如果都使用上面例子的方法,先新建subject对象,订阅数据源,再用Observer订阅这个subject未免有点麻烦。RxJS提供了几个操作符来简化这个过程。
multicast
multicast是一个比较底层或者说基础的操作符,一般情况下不会直接使用它。但一些高级的多播操作符都是基于它来实现,还是需要了解一下。multicast()能够将上游Observable转换为Hot Observable:
1 | const hotObservable = coldObservable.pipe(multicast(new Subject())); |
需要注意的是,现在订阅hotObservable并不会产生值,还需要调用一个connect()方法才会真正将Hot和Cold连接起来。下面看一个完整的例子。
1 | import { from, Subject } from 'rxjs'; |
以上的例子都没有考虑取消订阅的情况,那么如果我们想取消订阅应该怎么办呢?显然,上面这个例子有multicasted、ObservableA和ObservableB三个Observable。取消订阅时也需要考虑multicasted。但大部分情况下,我们希望这个Observable智能一点:当有Observer订阅它时,multicasted作为中间人,自动去订阅上方源数据,开始发出数据;当最后一个Observer退订时,multicasted也自动退订上方数据源。
显然,我们需要一个计数器,自动跟踪Observer的数量来进行订阅和退订动作。RxJS提供了refCount()这个方法来实现这个功能。
1 | import { from, Subject } from 'rxjs'; |
以上,我们直接在multicast()后使用了refCount()方法,下面就不需要再调用connect(),ObserverA就可以正常的收到值。但是这个例子又引入了另外一个问题:ObserverB呢?
首先,ObserverA可以正常收到数据是没有疑问的,因为源数据是同步发出的,故在ObserverA订阅multicasted后,3个值同时发了出来,然后就complete()了,所以之后ObserverB的订阅将收不到任何值,只会收到complete()的消息。其次我们来想一个问题:引入refCount()的初衷就是为了当有Observer订阅的时候发出值,Observer退订的时候自动退订,如果像这个例子一样,ObserverA订阅了一次数据,过了相当久的一段时间,ObserverB又来订阅取值,却发现无值可取,这样来看,refCount()存在的意义是不是太弱了?倘若我们想要ObserverB也能订阅到值呢?
其实,multicast()除了可以直接接收一个new Subject()对象外,还可以接收一个返回Subject对象的函数作为参数,即multicast(() => new Subject())。如果接受一个Subject()工厂函数(就是() => new Subject()),则可以实现上面说的效果,因为源数据complete()之后,ObserverB再来订阅,multicast()使用工厂函数重新创建了一个新的Subject(),相当于重新订阅了源数据。
1 | import { from, Subject } from 'rxjs'; |
简单总结:multicast()接收new Subject()或者() => new Subject()工厂函数,区别在于源数据complete()之后能否重新订阅数据源;refCount()是一个计数器,使”中间件”Subject自动订阅或取消订阅上游数据源。一般情况下,如果同时使用.pipe(multicast(subject), refCount()),则subject一般会表现为工厂函数形式。
publish
publish()其实是multicast(subject)的简写,接收一个可选参数selector,这个selector是干什么的,我也没太搞懂,这里提一下这个可选参数的原因在于,它决定了multicast(subject)中subject的形式:
publish() --> multicast(new Subject())publish(selector) --> multicast(() => new Subject())
这里注意,publish()只是multicast(subject)的简写,可没有refCount()。
上面我们讲,Subject()有三个基类:BehaviorSubject()、ReplaySubject()和AsyncSubject()。于此对应,publish()也有三个增强方法:
publishBehavior就是multicast(new BehaviorSubject())的组合。publishReplay就是multicast(new ReplaySubject())的组合。publishLast就是multicast(new AsyncSubject())的组合。
share
share()是multicast(() => new Subject()), refCount()的简写。
注意,这里multicast接收的是subject的工厂函数。而且,最最重要的一点,share()不是publish(), refCount()的组合,这是很多教程犯的一个错误。不要将share()和publish()联系到一块儿就好。
另外一个需要注意的点是,share()的内部使用了Subject(),如果需要BehaviorSubject()等其他subject类,则需要使用与之对应的publish方法。
好了,今天总结了一下Subject的相关知识点,在实际开发中经常会使用到这些思想和方法。接下来会找时间简单总结一下Scheduler的知识点,虽说它和Observable、Subject并称为三大知识点,但感觉门槛高,用的比较少。