最近几周趁年底年初工作事情少,把RxJS的基础知识学习了一下,算是简单入了个门,近期准备写几篇博客总结一下RxJS的相关知识点,给自己找点事情做。
概述
什么是RxJS?简单来说RxJS是响应式编程技术的JavaScript实现。响应式编程Reactive Extension,也叫ReactiveX,简称Rx,用官网上的一句话解释就是:
An API for asynchronous programming with observable streams
其中,Observable是一种模式,有点像Observer Pattern观察者模式和Iterator Pattern迭代器模式的结合,而streams指的是数据流。Rx最早的实现是Rx.NET,之后又陆陆续续有了RxJava、RxPy等语言的实现。那RxJS当然就是Rx的JavaScript实现了。
不过今天先不细说RxJS的一些具体细节,这两天在网上找到一篇比较有趣的博客:使用RxJS来简单实现一个“贪吃蛇”小游戏,感觉很有趣,就由此作为RxJS入门学习的引子,看看如何用RxJS的思维方式去思考这个游戏的实现。先看一下游戏运行的预览图:

设计&思考
首先可以确定的是整个游戏界面都是使用canvas来进行绘制的,游戏中的“蛇”、“苹果”(红色块儿)和分数(动图未展示)都是canvas用进来的数据流streams来进行绘制的,而这些流的产生及走向就是我们要思考的。在响应式编程中,编程无外乎数据流及输入数据流。从概念上来说,当响应式编程执行时,它会建立一套可观察的管道,可以根据变化采取行动。我们需要理清数据源头,找到它,生成它,进而处理整合它。先来概览一下这个游戏的“流”:

我们反向来进行思考。首先,canvas绘制的数据流来自于scene$流,这个流是由snake$蛇、apples$苹果和score$分数三个流合成的,但其中apples$流又来源于snake$。其次,snake$流受到键盘的操作输入direction$、蛇本身的速度ticks$和蛇长度snakeLength$的影响,这三者构成了snake$流的数据。再看score$分数流,每当蛇的长度改变时,score$就会改变,发出数据;最后,当我们判断到蛇吃到了苹果,apples$流就会发出数据,同时通过appleEaten$使length$执行length$.next(point),让蛇长度snakeLength$进行改变,进而触发一系列的数据发出。
仔细看,里面的源头只有direction$键盘方向事件、snakeLength$和ticks$定时器三个流。
实际操作
canvas游戏布局
这部分主要涉及到使canvas绘制一个游戏区域,细节略过,简单贴一下代码,博客最后有超链接指向源博客,有详细的说明。
1 | // html |
键盘事件direction$流
RxJS提供了一个fromEvent()操作符用来将事件转换为数据流,我们就用它来将玩家按键输入转为流数据,同时使用一些map()、filter()等操作符,来处理一下,确保只输出上下左右按键的值:
1 | import { fromEvent } from 'rxjs'; |
页面中现在什么也没有,随便点按键盘按键,上下左右会输出值,其他键的输入会被忽略:

这里有个问题,我们的蛇是不能进行反方向运动的,所以我们需要忽略掉反向的输入;同时对于连续相同方向的输入,我们也应该忽略掉。这样的话我们就需要记录下键盘最后的输入状态,不过这里最好还是不要引入外部变量,全部交由Observable管道去处理,这里就要用到scan操作符,类似于js中的reduce()方法。同时对于相同方向的输入,我们使用distinctUntilChangede()来处理:
1 | function nextDirection(previous, next) { |
这次,相同方向或者相反方向的输入都已经被处理(忽略)掉了。看一下direction$的Marbles图:

snakeLength$蛇长流
我们再回头看一下上面的snake流图,snakeLength$记录的是蛇当前的长度,发出的是蛇的长度的数字流。这个流基于(或者说来源于)length$流,length$记录的是蛇每次吃到苹果时增加的长度(也是增加的分数),这时候我们需要手动把值emit出去,使用BehaviorSubject()来实现。当length$发出值,即使用length$.next()来给Subject()提供值,这个值就会在snakeLength$上发出。我们需要snakeLength$的值累加上length$的值:
1 | const SNAKE_LENGTH = 5; |
注意,这里使用share()来允许多次订阅Observable,否则每次订阅都会重新创建源Observable。我们的snakeLength$同时被snake$流和score$流订阅,需要确保这两个流的获取的数据是同步的。
snake$流
到目前,direction$和snakeLength$都已确定,还差一个ticks$就可以组成snake$流。ticks$是一个类计时器的东西,限定蛇的移动速度,抽象来讲就是我们需要一个按时间规律发出值的Observable,interval()操作符可以完成这个任务:
1 | const SPEED = 120; |
从ticks$到最终的snake$,我们需要继续思考:每当ticks发出值,蛇继续前进还是增加身长,取决于是否吃到了苹果,我们可以使用scan()操作符去积累蛇身的长度。但接下来如何组合direction$、snakeLength$和ticks$?
withLatestFrom()可以完美实现我们的需求,我这里直接贴上官方文档的英文释义,通俗易懂:
Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
withLatestFrom()组合多个Observables,并且具有主从关系,非常适合这里当蛇移动的时候去组合direction$和snakeLength$:
1 | import { move, generateSnake } from './utils'; |
我们主要的源 Observable 是 ticks$,每当管道上有新值发出,我们就取 direction$ 和 snakeLength$ 的最新值。注意,即使辅助流频繁地发出值(例如,玩家头撞键盘上),也只会在每次定时器发出值时处理数据。
此外,我们给 withLatestFrom 传入了选择器函数,当主要的流产生值时才会调用此函数。此函数是可选的,如果不传,将会生成包含所有元素的列表。来看下Marbles图:

可以看到,使用了withLatestFrom()后,只有当ticks$发出值时,才会去direction$和snakeLength$中取最近的值一起发出,然后经过scan()处理,生成蛇身的数组对象形式的流。
score$流
玩家的比分比较好处理,有了snakeLength$我们直接使用scan()积累一下分数即可,每当snakeLength$发出值,累加一次分数:
1 | const POINTS_PER_APPLE = 1; // 每次一分 |
来看下Marbles图:

这里看似简单,其实有点小坑。可以看到score$中没有出现snakeLength$流中数据5对应的分数,这是因为snakeLength$使用了share()来允许多次订阅它,同时snake$已经先行订阅了snakeLength$,这时候score$的订阅就只会先发出startWith(0)初始化的数据0,只有等到length$.next(POINT)发出值,snakeLength$发出6,score$才会同步发出分数1,仔细体会一下。
apples$流
snake$和score$已经搞定,剩下的就是apples$流。先确定一下apples$的数据格式,我们需要在canvas中同时显示两个苹果,同时,如果蛇吃到了苹果,则会在页面中非蛇身的位置随机产生另外一个苹果。所以我们使用[{x: *, y: *}, {x: **, y: **}]数组对象的格式来表示苹果。每次蛇移动时都检查是否有碰撞。如果有碰撞,我们就生成一个新的苹果并返回一个新的数组。这样的话我们便可以利用 distinctUntilChanged() 来过滤掉完全相同的值。
1 | let apples$ = snake$.pipe( |
每当 apples$ 产生一个新值时,我们就可以假定蛇吞掉了一个苹果。剩下要做的就是增加比分,还要将此事件通知给其他流,比如 snake$,它从 snakeLength$ 中获取最新值,以确定是否将蛇的身体变长。
appleEaten$流
我们在上面的eat方法中检测到了“碰撞”,即蛇吃到了苹果。这时候我们应该调用length$.next(POINTS_PER_APPLE)去增加蛇的长度snakeLength$,但是我们把eat工具方法整合到了utils.ts文件内,无法调用到length$.next(),这个时候我们可以引入一个中间流,让它帮我们通知。
appleEaten$扮演了这个角色,首先,因为appleEaten$订阅的是apples$流,apples$刚开始就有一个初始值发出,我们需要跳过这个初始值,不然一开局比分就会增加,显然不行;其次,appleEaten$只负责扮演通知者的角色,它只负责通知其他的流,而不会有观察者来订阅它。因此,我们需要手动订阅。
1 | let appleEaten$ = apples$.pipe( |
scene$流
回头再看一次上面的snake流图,基本上所有的流都已完成,而且形成了一整套闭环。只差之后的的整合流scene$了。先来看下代码:
1 | let scene$ = combineLatest(snake$, apples$,score$, (snake, apples, score) => ({ snake, apples, score })); |
与 withLatestFrom 不同的是,我们不会限制辅助流,我们关心每个输入 Observable 产生的新值。最后一个参数还是选择器函数,我们将所有数据组合成一个表示游戏状态的对象,并将对象返回。游戏状态包含了 canvas 渲染所需的所有数据。

性能维护
无论是游戏,还是 Web 应用,性能都是我们所追求的。性能的意义重大,但就我们的游戏而言,我们希望每秒重绘整个场景 60 次。所以我们需要另外一个定时器去限制渲染频率:
1 | // interval 接收以毫秒为单位的时间周期,这也就是为什么我们要用 1000 来除以 FPS |
问题是 JavaScript 是单线程的。最糟糕的情况是,我们阻止浏览器执行任何操作,导致其锁定。换句话说,浏览器可能无法快速处理所有这些更新。原因是浏览器正在尝试渲染一帧,然后立即被要求渲染下一帧。作为结果,它会抛下当前帧以维持速度。这时候动画就开始看上去有些不流畅了。
幸运的是,我们可以使用 requestAnimationFrame 来允许浏览器对任务进行排队,并在最合适的时间执行任务。但是,我们如何在 Observable 管道中使用呢?好消息是包括 interval() 在内的众多操作符都接收 Scheduler (调度器) 作为最后的参数。总而言之,Scheduler 是一种调度将来要执行的任务的机制。
虽然 RxJS 提供了多种调度器,但我们关心的是名为 animationFrame 的调度器。此调度器在 window.requestAnimationFrame触发时执行任务。
1 | const game$ = Observable.interval(1000 / FPS, animationFrame) |
现在interval大概每 16ms 发出一次值,从而保持 FPS 在 60 左右。
场景渲染
最后,我们把game$和scene$组合起来。game$作为主要流,每隔1000/FPS毫秒时,根据scene$数据渲染一次页面,所以使用withLatestFrom()组合:
1 | const game$ = interval(1000 / FPS, animationFrame) |
至此,我们就使用RxJS完成了整个游戏的核心部分,完全使用响应式编程,没有依赖任何外部状态。
这里是在线试玩的 demo。
总结&资料引入
在花了两三周时间把RxJS的基础概念和操作符都过了一遍之后,急需一个实例demo来将自己所学的RxJS知识进行转化,只有真正的操作过,使用过,才能更深入的理解Observable和响应式编程的理念。最后在知乎上找到了一篇RxJS 游戏之贪吃蛇的专栏文章,自己跟着做了一遍,反复思考其中流的起源、转换和走向。这篇博客基本上是按照RxJS 游戏之贪吃蛇一文中的思路整理的,其中加入了我自己的思考过程,极力推荐阅读原文。以下是本文的参考资料:
RxJS 游戏之贪吃蛇、RxJS官方api文档、30 天精通 RxJS、RxJS Marbles、学习 RxJS 操作符、贪吃蛇源码
以上的链接也是我最近几周所看所学的来源。接下来会写几篇博客进行知识点的总结,掺杂一些自己的思考,巩固自己所学,加深印象。