最近几周趁年底年初工作事情少,把RxJS
的基础知识学习了一下,算是简单入了个门,近期准备写几篇博客总结一下RxJS
的相关知识点,给自己找点事情做。
概述
什么是RxJS
?简单来说RxJS
是响应式编程技术的JavaScript
实现。响应式编程Reactive Extension
,也叫ReactiveX
,简称Rx
,用官网上的一句话解释就是:
An API for asynchronous programming with observable streams
其中,Observable
是一种模式,有点像Observer Pattern观察者模式
和Iterator Pattern迭代器模式
的结合,而streams
指的是数据流。Rx最早的实现是Rx.NET
,之后又陆陆续续有了RxJava
、RxPy
等语言的实现。那RxJS
当然就是Rx的JavaScript
实现了。
不过今天先不细说RxJS
的一些具体细节,这两天在网上找到一篇比较有趣的博客:使用RxJS
来简单实现一个“贪吃蛇”小游戏,感觉很有趣,就由此作为RxJS
入门学习的引子,看看如何用RxJS
的思维方式去思考这个游戏的实现。先看一下游戏运行的预览图:
设计&思考
首先可以确定的是整个游戏界面都是使用canvas
来进行绘制的,游戏中的“蛇”、“苹果”(红色块儿)和分数(动图未展示)都是canvas
用进来的数据流streams
来进行绘制的,而这些流的产生及走向就是我们要思考的。在响应式编程中,编程无外乎数据流及输入数据流。从概念上来说,当响应式编程执行时,它会建立一套可观察的管道,可以根据变化采取行动。我们需要理清数据源头,找到它,生成它,进而处理整合它。先来概览一下这个游戏的“流”:
我们反向来进行思考。首先,canvas绘制的数据流来自于scene$流
,这个流是由snake$蛇
、apples$苹果
和score$分数
三个流合成的,但其中apples$
流又来源于snake$
。其次,snake$
流受到键盘的操作输入direction$
、蛇本身的速度ticks$
和蛇长度snakeLength$
的影响,这三者构成了snake$
流的数据。再看score$
分数流,每当蛇的长度改变时,score$
就会改变,发出数据;最后,当我们判断到蛇吃到了苹果,apples$
流就会发出数据,同时通过appleEaten$
使length$
执行length$.next(point)
,让蛇长度snakeLength$
进行改变,进而触发一系列的数据发出。
仔细看,里面的源头只有direction$键盘方向事件
、snakeLength$
和ticks$定时器
三个流。
实际操作
canvas游戏布局
这部分主要涉及到使canvas
绘制一个游戏区域,细节略过,简单贴一下代码,博客最后有超链接指向源博客,有详细的说明。
1 | // html |
键盘事件direction$流
RxJS
提供了一个fromEvent()
操作符用来将事件转换为数据流,我们就用它来将玩家按键输入转为流数据,同时使用一些map()
、filter()
等操作符,来处理一下,确保只输出上下左右按键的值:
1 | import { fromEvent } from 'rxjs'; |
页面中现在什么也没有,随便点按键盘按键,上下左右会输出值,其他键的输入会被忽略:
这里有个问题,我们的蛇是不能进行反方向运动的,所以我们需要忽略掉反向的输入;同时对于连续相同方向的输入,我们也应该忽略掉。这样的话我们就需要记录下键盘最后的输入状态,不过这里最好还是不要引入外部变量,全部交由Observable管道去处理,这里就要用到scan
操作符,类似于js中的reduce()
方法。同时对于相同方向的输入,我们使用distinctUntilChangede()
来处理:
1 | function nextDirection(previous, next) { |
这次,相同方向或者相反方向的输入都已经被处理(忽略)掉了。看一下direction$
的Marbles
图:
snakeLength$蛇长流
我们再回头看一下上面的snake流图
,snakeLength$
记录的是蛇当前的长度,发出的是蛇的长度的数字流。这个流基于(或者说来源于)length$
流,length$
记录的是蛇每次吃到苹果时增加的长度(也是增加的分数),这时候我们需要手动把值emit出去,使用BehaviorSubject()
来实现。当length$
发出值,即使用length$.next()
来给Subject()
提供值,这个值就会在snakeLength$
上发出。我们需要snakeLength$
的值累加上length$
的值:
1 | const SNAKE_LENGTH = 5; |
注意,这里使用share()
来允许多次订阅Observable
,否则每次订阅都会重新创建源Observable
。我们的snakeLength$
同时被snake$
流和score$
流订阅,需要确保这两个流的获取的数据是同步的。
snake$流
到目前,direction$
和snakeLength$
都已确定,还差一个ticks$
就可以组成snake$
流。ticks$
是一个类计时器的东西,限定蛇的移动速度,抽象来讲就是我们需要一个按时间规律发出值的Observable
,interval()
操作符可以完成这个任务:
1 | const SPEED = 120; |
从ticks$
到最终的snake$
,我们需要继续思考:每当ticks
发出值,蛇继续前进还是增加身长,取决于是否吃到了苹果,我们可以使用scan()
操作符去积累蛇身的长度。但接下来如何组合direction$
、snakeLength$
和ticks$
?
withLatestFrom()
可以完美实现我们的需求,我这里直接贴上官方文档的英文释义,通俗易懂:
Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.
withLatestFrom()
组合多个Observables
,并且具有主从关系,非常适合这里当蛇移动的时候去组合direction$
和snakeLength$
:
1 | import { move, generateSnake } from './utils'; |
我们主要的源 Observable
是 ticks$
,每当管道上有新值发出,我们就取 direction$
和 snakeLength$
的最新值。注意,即使辅助流频繁地发出值(例如,玩家头撞键盘上),也只会在每次定时器发出值时处理数据。
此外,我们给 withLatestFrom
传入了选择器函数,当主要的流产生值时才会调用此函数。此函数是可选的,如果不传,将会生成包含所有元素的列表。来看下Marbles
图:
可以看到,使用了withLatestFrom()
后,只有当ticks$
发出值时,才会去direction$
和snakeLength$
中取最近的值一起发出,然后经过scan()
处理,生成蛇身的数组对象形式的流。
score$流
玩家的比分比较好处理,有了snakeLength$
我们直接使用scan()
积累一下分数即可,每当snakeLength$
发出值,累加一次分数:
1 | const POINTS_PER_APPLE = 1; // 每次一分 |
来看下Marbles
图:
这里看似简单,其实有点小坑。可以看到score$
中没有出现snakeLength$
流中数据5
对应的分数,这是因为snakeLength$
使用了share()
来允许多次订阅它,同时snake$
已经先行订阅了snakeLength$
,这时候score$
的订阅就只会先发出startWith(0)
初始化的数据0
,只有等到length$.next(POINT)
发出值,snakeLength$
发出6
,score$
才会同步发出分数1
,仔细体会一下。
apples$流
snake$
和score$
已经搞定,剩下的就是apples$
流。先确定一下apples$
的数据格式,我们需要在canvas
中同时显示两个苹果,同时,如果蛇吃到了苹果,则会在页面中非蛇身
的位置随机产生另外一个苹果。所以我们使用[{x: *, y: *}, {x: **, y: **}]
数组对象的格式来表示苹果。每次蛇移动时都检查是否有碰撞。如果有碰撞,我们就生成一个新的苹果并返回一个新的数组。这样的话我们便可以利用 distinctUntilChanged()
来过滤掉完全相同的值。
1 | let apples$ = snake$.pipe( |
每当 apples$
产生一个新值时,我们就可以假定蛇吞掉了一个苹果。剩下要做的就是增加比分,还要将此事件通知给其他流,比如 snake$
,它从 snakeLength$
中获取最新值,以确定是否将蛇的身体变长。
appleEaten$流
我们在上面的eat
方法中检测到了“碰撞”,即蛇吃到了苹果。这时候我们应该调用length$.next(POINTS_PER_APPLE)
去增加蛇的长度snakeLength$
,但是我们把eat
工具方法整合到了utils.ts
文件内,无法调用到length$.next()
,这个时候我们可以引入一个中间流,让它帮我们通知。
appleEaten$
扮演了这个角色,首先,因为appleEaten$
订阅的是apples$
流,apples$
刚开始就有一个初始值发出,我们需要跳过这个初始值,不然一开局比分就会增加,显然不行;其次,appleEaten$
只负责扮演通知者的角色,它只负责通知其他的流,而不会有观察者来订阅它。因此,我们需要手动订阅。
1 | let appleEaten$ = apples$.pipe( |
scene$流
回头再看一次上面的snake流图
,基本上所有的流都已完成,而且形成了一整套闭环。只差之后的的整合流scene$
了。先来看下代码:
1 | let scene$ = combineLatest(snake$, apples$,score$, (snake, apples, score) => ({ snake, apples, score })); |
与 withLatestFrom
不同的是,我们不会限制辅助流,我们关心每个输入 Observable
产生的新值。最后一个参数还是选择器函数,我们将所有数据组合成一个表示游戏状态的对象,并将对象返回。游戏状态包含了 canvas 渲染所需的所有数据。
性能维护
无论是游戏,还是 Web 应用,性能都是我们所追求的。性能的意义重大,但就我们的游戏而言,我们希望每秒重绘整个场景 60 次。所以我们需要另外一个定时器去限制渲染频率:
1 | // interval 接收以毫秒为单位的时间周期,这也就是为什么我们要用 1000 来除以 FPS |
问题是 JavaScript 是单线程的。最糟糕的情况是,我们阻止浏览器执行任何操作,导致其锁定。换句话说,浏览器可能无法快速处理所有这些更新。原因是浏览器正在尝试渲染一帧,然后立即被要求渲染下一帧。作为结果,它会抛下当前帧以维持速度。这时候动画就开始看上去有些不流畅了。
幸运的是,我们可以使用 requestAnimationFrame
来允许浏览器对任务进行排队,并在最合适的时间执行任务。但是,我们如何在 Observable 管道中使用呢?好消息是包括 interval()
在内的众多操作符都接收 Scheduler
(调度器) 作为最后的参数。总而言之,Scheduler
是一种调度将来要执行的任务的机制。
虽然 RxJS 提供了多种调度器,但我们关心的是名为 animationFrame
的调度器。此调度器在 window.requestAnimationFrame
触发时执行任务。
1 | const game$ = Observable.interval(1000 / FPS, animationFrame) |
现在interval
大概每 16ms 发出一次值,从而保持 FPS 在 60 左右。
场景渲染
最后,我们把game$
和scene$
组合起来。game$
作为主要流,每隔1000/FPS
毫秒时,根据scene$
数据渲染一次页面,所以使用withLatestFrom()
组合:
1 | const game$ = interval(1000 / FPS, animationFrame) |
至此,我们就使用RxJS
完成了整个游戏的核心部分,完全使用响应式编程,没有依赖任何外部状态。
这里是在线试玩的 demo。
总结&资料引入
在花了两三周时间把RxJS
的基础概念和操作符都过了一遍之后,急需一个实例demo来将自己所学的RxJS
知识进行转化,只有真正的操作过,使用过,才能更深入的理解Observable
和响应式编程的理念。最后在知乎上找到了一篇RxJS 游戏之贪吃蛇
的专栏文章,自己跟着做了一遍,反复思考其中流的起源、转换和走向。这篇博客基本上是按照RxJS 游戏之贪吃蛇一文中的思路整理的,其中加入了我自己的思考过程,极力推荐阅读原文。以下是本文的参考资料:
RxJS 游戏之贪吃蛇、RxJS官方api文档、30 天精通 RxJS、RxJS Marbles、学习 RxJS 操作符、贪吃蛇源码
以上的链接也是我最近几周所看所学的来源。接下来会写几篇博客进行知识点的总结,掺杂一些自己的思考,巩固自己所学,加深印象。