这两周拖延症又又又犯了,本来这篇博客是要上周末完结的,结果报了健身房,运动回到家就是懒得开电脑写。这一拖公司开发任务又接踵而来,预感到这个周末是未来一个月最闲的周末了,一大早就赶紧开电脑开始写这篇博客。今天来总结RxJS
中最重要的一个知识点—Subject
。
概念
什么是Subject
?Subject
能做什么?简单来说,Subject
是Observable
和Observer
的混合体,它既可以作为一个Observable
被多个观察者订阅,又可以作为一个观察者(Observer)去订阅一个源Observable
。前面我们说的Observable
作为数据源,每当它被一个Observer
订阅的时候,都会产生一个全新的数据流,多个观察者之间的数据是相互隔离的,互不影响。而Subject
的观察者,或者说订阅者,是共享这一个数据源的,称为”多播”。
举个例子,中央电视台每晚7点播送新闻联播。小明7点准时打开电视机开始收看新闻,而小红7点15分才打开电视机,打开电视机相当于一个订阅subscribe
的动作,那么小红看到的新闻必然是从第15分钟开始的,在这之后小明小红收看的新闻(数据)是一样的。转换为代码,如果我们使用普通的Observable
去实现,那么所有人不管何时打开电视,看到的都会是7点开始的新闻(类似录像),就失去了一个同步性和实时性。这个时候我们就可以用Subject
来实现。
1 | import { Subject } from 'rxjs'; |
这个例子中,new Subject()
返回一个Observable
对象,先被A
订阅,随后调用subject.next(1)
发出值1,接着被B
订阅,然后发出值2。此时B
观察者只会收到它订阅subject
后发出的值。
1 | import { Subject, from } from 'rxjs'; |
这个例子中,subject
作为观察者Observer
,订阅了数据源source
。然后AB两个观察者都收到了数据,这样就通过Subject
实现了多播
。
Hot 和 Cold 数据流
如果你在网上查阅关于Subject
的资料,肯定会注意到两个名词:Hot Observable
和Cold Observable
。其实这两个名词并没有什么高深的含义,Hot Observable
指的是无论多少Observer
来订阅,推给Observer
数据的都是来自一样的数据源;而Cold Observable
则是说每次被订阅的时候都产生一个全新的数据序列数据流。上面的例子就是使用Subject
将一个Cold Observable
(form([1, 2])转为一个Hot Observable
(subject.subscribe())。
不能重复使用
Subject
不像普通的Observable
一样每次订阅都会产生一个新的数据流,所以根据订阅Subject
的时机不同会有不同的效果。总的来说,当一个Subject
调用了complete()
或者error()
方法,后续再有Observer
订阅了这个Subject()
,即使再使用subject.next(val)
发送值,后续的Observer
也收不到了,只会收到complete()
或者error()
消息。
1 | import { Subject, from } from 'rxjs'; |
以上,观察者B在subject
完结之后订阅,只会收到前面subject
完结的通知。同样,当subjcet
调用error()
方法,下面的观察者也只会收到error
消息。
这里顺便提一句,如果Subject
有多个订阅者,那么当任一订阅者产生一个错误异常,这个异常没有被正确处理的话,其他订阅者都会失败。相当于这个错误”阻塞”掉了其他Observer
正常接收数据。所以最稳妥的方式是每一个Observer
都具备处理错误异常的能力。
三个基类 BehaviorSubject、ReplaySubject、AsyncSubject
BehaviorSubject
可以提供一个初始值(默认值),当第一个Observer
订阅它的时候,BehaviorSubject
会立即发出这个初始值,并且接下来BehaviorSubject
发出的值都会不断替换这个初始值,这样之后新的Observer
来订阅时,都会拿到最新的值。
1 | import { BehaviorSubject } from 'rxjs'; |
ReplaySubject
接收一个number
类型参数,可以缓存N
个最新的值同步发送给新的订阅者。好像回放电影一样把指定个数的值发给后续的订阅者。
1 | import { ReplaySubject } from 'rxjs'; |
AsyncSubject
只有当数据源complete()
的时候,才会把最后一个值发送给订阅者。
1 | import { AsyncSubject } from 'rxjs'; |
多播操作符
RxJS
使用当中有很多场景需要将Cold Observable
转为Hot Observable
,如果都使用上面例子的方法,先新建subject
对象,订阅数据源,再用Observer
订阅这个subject
未免有点麻烦。RxJS
提供了几个操作符来简化这个过程。
multicast
multicast
是一个比较底层或者说基础的操作符,一般情况下不会直接使用它。但一些高级的多播操作符都是基于它来实现,还是需要了解一下。multicast()
能够将上游Observable
转换为Hot Observable
:
1 | const hotObservable = coldObservable.pipe(multicast(new Subject())); |
需要注意的是,现在订阅hotObservable
并不会产生值,还需要调用一个connect()
方法才会真正将Hot
和Cold
连接起来。下面看一个完整的例子。
1 | import { from, Subject } from 'rxjs'; |
以上的例子都没有考虑取消订阅的情况,那么如果我们想取消订阅应该怎么办呢?显然,上面这个例子有multicasted
、ObservableA
和ObservableB
三个Observable
。取消订阅时也需要考虑multicasted
。但大部分情况下,我们希望这个Observable
智能一点:当有Observer
订阅它时,multicasted
作为中间人,自动去订阅上方源数据,开始发出数据;当最后一个Observer
退订时,multicasted
也自动退订上方数据源。
显然,我们需要一个计数器,自动跟踪Observer
的数量来进行订阅和退订动作。RxJS
提供了refCount()
这个方法来实现这个功能。
1 | import { from, Subject } from 'rxjs'; |
以上,我们直接在multicast()
后使用了refCount()
方法,下面就不需要再调用connect()
,ObserverA
就可以正常的收到值。但是这个例子又引入了另外一个问题:ObserverB
呢?
首先,ObserverA
可以正常收到数据是没有疑问的,因为源数据是同步发出的,故在ObserverA
订阅multicasted
后,3个值同时发了出来,然后就complete()
了,所以之后ObserverB
的订阅将收不到任何值,只会收到complete()
的消息。其次我们来想一个问题:引入refCount()
的初衷就是为了当有Observer
订阅的时候发出值,Observer
退订的时候自动退订,如果像这个例子一样,ObserverA
订阅了一次数据,过了相当久的一段时间,ObserverB
又来订阅取值,却发现无值可取,这样来看,refCount()
存在的意义是不是太弱了?倘若我们想要ObserverB
也能订阅到值呢?
其实,multicast()
除了可以直接接收一个new Subject()
对象外,还可以接收一个返回Subject
对象的函数作为参数,即multicast(() => new Subject())
。如果接受一个Subject()
工厂函数(就是() => new Subject()
),则可以实现上面说的效果,因为源数据complete()
之后,ObserverB
再来订阅,multicast()
使用工厂函数重新创建了一个新的Subject()
,相当于重新订阅了源数据。
1 | import { from, Subject } from 'rxjs'; |
简单总结:multicast()
接收new Subject()
或者() => new Subject()
工厂函数,区别在于源数据complete()
之后能否重新订阅数据源;refCount()
是一个计数器,使”中间件”Subject
自动订阅或取消订阅上游数据源。一般情况下,如果同时使用.pipe(multicast(subject), refCount())
,则subject
一般会表现为工厂函数形式。
publish
publish()
其实是multicast(subject)
的简写,接收一个可选参数selector
,这个selector
是干什么的,我也没太搞懂,这里提一下这个可选参数的原因在于,它决定了multicast(subject)
中subject
的形式:
publish() --> multicast(new Subject())
publish(selector) --> multicast(() => new Subject())
这里注意,publish()
只是multicast(subject)
的简写,可没有refCount()
。
上面我们讲,Subject()
有三个基类:BehaviorSubject()
、ReplaySubject()
和AsyncSubject()
。于此对应,publish()
也有三个增强方法:
publishBehavior
就是multicast(new BehaviorSubject())
的组合。publishReplay
就是multicast(new ReplaySubject())
的组合。publishLast
就是multicast(new AsyncSubject())
的组合。
share
share()
是multicast(() => new Subject()), refCount()
的简写。
注意,这里multicast
接收的是subject
的工厂函数。而且,最最重要的一点,share()
不是publish(), refCount()
的组合,这是很多教程犯的一个错误。不要将share()
和publish()
联系到一块儿就好。
另外一个需要注意的点是,share()
的内部使用了Subject()
,如果需要BehaviorSubject()
等其他subject
类,则需要使用与之对应的publish
方法。
好了,今天总结了一下Subject
的相关知识点,在实际开发中经常会使用到这些思想和方法。接下来会找时间简单总结一下Scheduler
的知识点,虽说它和Observable
、Subject
并称为三大知识点,但感觉门槛高,用的比较少。