type
Post
status
Published
date
Aug 6, 2022
slug
summary
tags
开发
rxjs
前端
category
技术分享
icon
password
Property
Aug 6, 2022 06:36 AM
rxjs入门
从一个例子看起
let container = document.getElementById('container'); container.onmousedown = function(e) { const { screenX, screenY } = e; const { offsetTop, offsetLeft } = e.target; let diffY = offsetTop-screenY; let diffX = offsetLeft-screenX; document.body.addEventListener('mousemove', move); document.body.onmouseup = () => { document.body.removeEventListener('mousemove', move); } document.body.onmouseleave = () => { document.body.removeEventListener('mousemove', move); } function move(e) { const targetX = e.screenX + diffX; const targetY = e.screenY + diffY; container.style.left = `${targetX}px`; container.style.top = `${targetY}px`; } }
实现一个拖拽功能我们需要
- 监听mousedown事件
- mousedown事件触发时绑定mousemove事件
- mousemove事件触发时更新视图
- mouseup或者mouseLeave事件触发时解绑mousemove事件
那么我们的代码用rxjs实现会是什么样子呢
const container = document.getElementById('container'); const mouseDown$ = rxjs.fromEvent(container, 'mousedown'); const mouseUp$ = rxjs.fromEvent(container, 'mouseup'); const mouseLeave$ = rxjs.fromEvent(container, 'mouseleave'); const mouseMove$ = rxjs.fromEvent(document.body, 'mousemove'); // 合并mouseLeave与mouseUp事件 const stop$ = rxjs.merge(mouseLeave$, mouseUp$); const drag$ = mouseDown$ .pipe( rxjs.operators.mergeMap( startEvent => { const initialLeft = container.offsetLeft; const initialTop = container.offsetTop; return mouseMove$.pipe( // 发出值 直到触发mouseLeave或mouseUp事件 rxjs.operators.takeUntil(stop$), // 转换值 rxjs.operators.map( mouseEvent => ({ x: mouseEvent.x - startEvent.x + initialLeft, y: mouseEvent.y - startEvent.y + initialTop }) ), ); } ) ) drag$.subscribe( event => { container.style.left = event.x + 'px'; container.style.top = event.y + 'px'; } )
可以看到
- 我们的事件定义全部在开始就定义好了
- 事件的绑定不需要手动解绑
- mouseLeave事件与mouseUp事件进行了合并
- 与视图有关的操作只在subscribe中触发,上游只负责数据的生产
rxjs简介
RxJS 是函数响应式编程的库,它使编写异步或基于回调的代码更容易,正如rxjs官网宣称的那样:
Think of RxJS as Lodash for events.
Observable与Observer
要理解RxJS,先要理解两个最重要的概念:Observable和Observer,可 以说RxJS的运⾏就是Observable和Observer之间的互动游戏。
在rxjs中,Obseravble对象就是一个发布者, 通过Observable对象的subscribe函数给观察者推送数据
创建Observable
我们可以使用Observable.create函数构造一个简单的Observable,其中Observable返回的函数为Observable取消时进行的操作(类似React.useEffect)。
const source$ = rxjs.Observable.create(observer => { let count = 1; const interval = setInterval( () => { observer.next(count++); if(count > 5){ observer.complete(); } }, 1000 ); return () => clearInterval(interval); });
Observale对象的subscribe函数接受三个回调函数,分别在Observable推送数据、Observable推送异常、Observable推送结束之后调用。
source$ .subscribe( (data) => { console.log(data); }, (err) => { console.err(err) }, () => { console.log('done') } )
流(stream)、管道与操作符(Operator)
rxjs中的每一个Obseravble对象都可以简称为一个流(stream),对于复杂的流,我们往往需要对这个数据流做一系列处理,这时候就轮到管道与操作符出场了。
我们在对流做处理的过程中,往往会经过多个处理过程,最终observer只需要处理可以走到终点的数据,数据就像是在一个管道中一样,数据处理就靠的是操作符。
创建操作符
实现一个操作符至少需要做下面两件事情
- 返回一个全新的observable对象
- 做好上游的数据订阅以及退订处理
function mapOperator(map) { return function(source) { return new rxjs.Observable( subscriber => { const subscription = source.subscribe({ next(value) { subscriber.next(map(value)); }, error(error) { subscriber.error(error); }, complete() { subscriber.complete(); } }); return () => subscription.unsubscribe() } ) }
使用上面定义的操作符,我们可以实现类似js中array.map的效果
const source$ = rxjs.interval(1000) .pipe( mapOperator(i => `index-${i}`) ); source$.subscribe(console.log); // index-0、index-1、index-2、index-3、index-4.。。。。。
弹珠图
rxjs当中简单的数据流我们可以想象,但是如果数据流过于复杂的话,就有点困难了,这时候就需要使用弹珠图来表示数据流了。
在弹珠图中,每个弹珠的间隔表示数据吐出的时间间隔,
|
符号表示流的结束,对应subscribe的complete函数。符号×代表数据流中的异常,对应于subscribe的error函数。
我们可以在这两个网站上看到大部分操作符对应的弹珠图
- https://rxmarbles.com/#first
- https://rxviz.com/
rxjs特性
流的过滤与转化
过滤类操作符
过滤类操作符可以看作是一个漏斗,只给下游传输需要的数据。
const $inputEle = document.getElementById('input'); const $textContent = document.getElementById('text-content'); const type$ = rxjs.fromEvent($inputEle, 'keyup'); const enter$ = type$ .pipe( rxjs.filter(e => e.keyCode === 13), ); type$ .subscribe(e => { const text = e.target.value; console.log(text); }) enter$ .subscribe(e => { const text = e.target.value; $textContent.innerText = text; })
转换类操作符
Example: 使用mergeMap进行数据请求
function mockHttp(text) { return rxjs.Observable.create( observer => { const timeout = setTimeout( () => { observer.next(`resolve ${text}`); observer.complete(); }, 1000 ); return () => clearTimeout(timeout); } ) } const $inputEle = document.getElementById('input-element'); const $resContent = document.getElementById('res-content'); const type$ = rxjs.fromEvent($inputEle, 'input') .pipe( rxjs.map(e => e.target.value) ); type$ .pipe( rxjs.mergeMap(mockHttp), ) .subscribe( text => $resContent.innerText = text, )
合并流
Example: 使用forkJoin和mergeMap操作符实现带有并发控制的请求。
function mockHttp(num) { return rxjs.Observable.create( observer => { console.log(`fetching${num}`) const timeout = setTimeout( () => { observer.next(`data${num}`); observer.complete(); }, num * 500 ); return () => clearTimeout(timeout); } ) } const source$ = rxjs.of([1,2,3,4,5,6,7,8]); source$ .pipe( rxjs.mergeMap( arr => { const tasks = rxjs.from(arr.map(mockHttp)) .pipe( rxjs.mergeAll(3), ); return rxjs.forkJoin(tasks) } ) ) .subscribe({ complete: () => console.log('complete'), });
切换/中断流
使用switchMap操作符,我们可以优化上面转换数据流例子的数据请求。
function mockHttp(text) { return rxjs.Observable.create( observer => { console.log(`fetching ${text}`); const timeout = setTimeout( () => { observer.next(`resolve ${text}`); observer.complete(); }, 1000 ); return () => { console.log(`fetching abort${text}`) clearTimeout(timeout); }; } ) } const $inputEle = document.getElementById('input-element'); const $resContent = document.getElementById('res-content'); const type$ = rxjs.fromEvent($inputEle, 'input') .pipe( rxjs.map(e => e.target.value) ); type$ .pipe( rxjs.switchMap(mockHttp), ) .subscribe( text => $resContent.innerText = text, )
背压(Backpressure)控制
回压: 在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。
因为上游数据流可能会不定时产生数据,并且数据大小不确定,所以很有可能会出现背压的情况,这时候可以使用throttle、debounce、distinct、bufferCount、bufferWhen等操作符进行背压控制。
Example: 使用bufferTime收集一定时间内的事件
const type$ = rxjs.fromEvent(document.body, 'keyup') .pipe( rxjs.map(i => ({ event: 'keyup', key: i.key, })), rxjs.tap(console.log) ); const click$ = rxjs.fromEvent(document.body, 'click') .pipe( rxjs.map(i => ({ event: 'click', })), rxjs.tap(console.log) ); rxjs.merge(type$, click$) .pipe( rxjs.bufferTime(1000), rxjs.filter(i => i.length), ) .subscribe(console.log)
流的重播
使用重播类的操作符号可以对异常的流进行重试处理
Example: 使用retryWhen操作符实现有时间间隔的重试操作符
function mockErrorResponse() { return new rxjs.Observable(observer => { console.log('fetching') setTimeout(() => observer.error('我报错了'), 200) }) } const retryWithDelay = (maxCount, delayMillSeconds) => source$ => new rxjs.Observable( subscriber => { const subscription = source$ .pipe( rxjs.retryWhen( err$ => err$ .pipe( rxjs.scan( (errCount, err) => { if (errCount > maxCount) { throw err; } return errCount + 1; }, 0 ), rxjs.delayWhen(rate => rxjs.timer(delayMillSeconds * rate)) ) ) ).subscribe(); } ); const err$ = mockErrorResponse(); err$ .pipe( retryWithDelay(5, 300), ) .subscribe({ error: err => console.log(err), })
多播与单播
我们上面的例子大部分都是单个观察者消费数据,那么如果有多个观察者,数据流会怎样推送呢?
以下的代码根据直觉来看,第二个observer是在1.5s之后才订阅的,所以数据流应该会是
0,1 ,1,2,2
const source$ = rxjs.interval(1000) .pipe( rxjs.take(3), ); source$.subscribe(console.log); setTimeout( () => source$.subscribe(console.log), 1500, )
实际上输出的数据流是
0,1,0,2,1,2
为什么呢?
其实是因为上面产生的Observable是cold Observable,每一个Observer订阅的都是一个独立的数据流,并不是来自于同一个数据源。
我们可以使用publish操作符将cold Observable变成hot Observable
const source$ = rxjs.interval(1000) .pipe( rxjs.take(3), ); const shareSource$ = source$ .pipe( rxjs.publish(), rxjs.refCount(), ); shareSource$.subscribe(console.log); setTimeout( () => shareSource$.subscribe(console.log), 1500, );
rxjs接入
react
Vue
Angular
Angular支持直接使用rxjs,Angular的核心代码也大量使用了rxjs。
rxjs的使用场景
- 单一数据流多状态
- 可取消数据流
- 多数据流互相依赖
rxjs的缺点
- 首先,从目前已有的编程习惯转换到rxjs中,相对来会比较困难。
- rxjs当中的操作符也是异常多,本文中提到的操作符仅仅是rxjs中的一小部分
rxjs学习曲线示意图 摘自《深入浅出rxjs》
- rxjs的使用场景比较有限,只有在复杂异步场景下面才能体现出rxjs的价值,一些简单的场景下使用rxjs反而会徒增复杂度
代码仓库
推荐阅读

- Author:double
- URL:https://double2.wiki//article/387eb715-12a2-4147-9499-bc9fcb49416b
- Copyright:All articles in this blog, except for special statements, adopt BY-NC-SA agreement. Please indicate the source!
Relate Posts