1. Observables & Reactive
先来一个简单直观的例子:
const { Observable } = require('rxjs');
const source$ = Observable.of([1, 2, 3]);
source$.subscribe(x => console.log(x));
过滤器节点:subscribe
2. Declarative Transformation( 声明式转换 )
如果我们想要平时开发的数据转换功能,可以使用一些类似管道的工具来做。
Observable.of(1, 2, 3)
.map(n => n * 2)
.subscribe(x => console.log(x));
过滤器节点:map、subscribe。我所理解的就像一条溪流流动的水,然后 map 这类 API 就像水桶,将水装入进行加工,当然,其他 map 的特性先不用细致了解。
3. Lazy Transformation( 懒执行透明特性 )
Observable 能够在流动的过程中进行选择,所谓的懒特性就是不会像 Promise 一样,给了一个数据承认,就一定会让你接受,你可以选择不接受或者现在不接受,先这样子理解。
Observable.range(1, 100)
// 转换管道
.map(n => n * 2)
// 拦截节点
.filter(n => n > 4)
// 懒获取
.take(2)
.subscribe(x => {
console.log(x);
});
4. DOM Event
Rx 其实是可以独立与任何框架使用了,也可以按需加载想用的方法,对于 Rx,其实和 Zone 一样,有了自己的保护圈和玩法,如果你要玩它,就需要进入它的圈子,大多数的相关 API 就是 from
的特性,对于 Rx 5.0 模块化做的更好,以及渲染 Timeline 上更可控。
<button id="btn">加</button>
<h1 id="out">0</h1>
const btn = document.getElementById('btn');
const out = document.getElementById('out');
/* eslint-disable no-undef */
const { Observable } = Rx;
Observable.fromEvent(btn, 'click')
// mapTo 能映射一个常量,map 需要一个函数
.mapTo(1)
.scan((acc, cur) => acc + cur, 0)
.subscribe(count => {
out.innerHTML = count;
});
其中主要依赖了:
rxjs/Observable.js
rxjs/observable/fromEvent.js
rxjs/operator/mapTo.js
rxjs/operator/scan.js
如果是打包的方式可以按需加载,如果是 script 可以直接引入 rxjs/bundles/Rx.js
,不过会比较大,所以最好是按需提取到公用的 external 文件中,也能使用 lite 的版本,是常用的精简版。
如果有两个事件怎么办呢?利用流的合并特性可以做:
// 加法流,然后转换成数据 1 流入给下一个输入
const { Observable } = Rx;
const inSource$ = Observable.fromEvent(inBtn, 'click').mapTo(1);
// 减法流,然后转换成数据 -1 流入给下一个输入
const outSource$ = Observable.fromEvent(outBtn, 'click').mapTo(-1);
// 合并成最后需要操作的流
const source$ = Observable.merge(inSource$, outSource$);
// 操作流并且订阅流的 next 函数
source$.scan((acc, cur) => {
if(acc + cur < 0) {
return 0;
}
return acc + cur;
}, 0).subscribe(count => {
print.innerHTML = count;
});
5. Async HTTP
前端主要除了这种简单的数据操作,更多的是要看异步场景下的复杂度,下面用 Promise
、async/await
、Obserable
三者的写法来对比。
(1) Promise方案
const data = fetchOrders().then(res => {
if(res.status === 200) {
return res.data;
}
});
data.then(orders => orders.filter(order => order.text === 'Bob'))
.then(orders => orders.map(order => order.id))
.then(ids => console.log(ids));
function fetchOrders() {
return axios.post('orders.json');
}
(2) async/await
function fetchOrders() {
return axios.post('orders.json');
}
renderOrders();
async function renderOrders() {
const res = await fetchOrders();
if(res.status === 200) {
const data = res.data;
data.filter(order => order.text === 'Bob')
.map(order => order.id)
.forEach(id => console.log(id));
}
}
(3) Observable
const { Observable } = Rx;
function fetchOrders() {
const promise = axios.post('orders.json');
return Observable.fromPromise(promise);
}
const hook =
fetchOrders()
.switchMap(res => {
let data = [];
if(res.status === 200) {
data = res.data;
}
return Observable.from(data);
})
.filter(order => order.text === 'Bob')
.map(order => order.id)
.filter(id => id !== void 0)
.subscribe(id => {
console.log(id);
});
6. Cancellation( 可取消的特性 )
其实这个特性很好用,可以随时取消你订阅的流,这是 Promise
做不到的,一旦发起承认就无法取消。
hook.unsubscribe();
7. Create HTTP Request
上面的 ajax 使用的是 axios 封装的,然后返回的是 Promise 的对象,如果要用 Observable 的生态,那么就需要利用 fromPromise
进行转化,这看上去不是特别好,所以能够直接利用 Observable 生态下的 ajax 进行操作。
与之前方案需要修改的地方有 2 个:
1. fetchOrders 函数请求 ajax 的方式
2. ajax 响应返回的对象不一样,毕竟之前使用的 axios
// 1. 使用 Observable.ajax
function fetchOrders() {
return Observable.ajax.post('orders.json');
}
// 2. 修改一下 ajax 响应基础数据的处理
if(res.status === 200) {
// res.data => res.response, ng2 如果不是 json 需要调用 json() 方法
data = res.response;
}
8. Observable.create 的一些细节
(1) Obserable 的组成
我之前仿造过一个简单 Obserable 的实现:Obserable
这里用简单的结构来说一下:
class Observable {
constructor (fn) {
this.fn = fn;
}
static create(fn) {
return new Observable(fn);
}
subscribe(next, error, complete) {
if (typeof next !== 'function') {
return this.fn(next);
}
return this.fn({
next,
error: error || () => {},
complete: complete || () => {}
});
}
}
从上面代码可以看出来,构造器主要接收一个函数,然后 create
方法主要返回的是一个 Obserable 实例。然后 subscribe
方法接收的是三个参数,如果我们传入的是一个 function 那么就会调用 this.fn
传入一个对象,这个对象有三个属性值:next、error、complete
,默认情况下 next
是必须传入的,其余的是可选,为了安全,自己手动创建的最好调用一下 complete
。
这样看上来其实也挺简单的。下来回到 rxjs 本身来看一下 create
的用法:
function fetchSomeone() {
return Observable.create(observer => {
observer.next('ok');
observer.complete();
});
}
fetchSomeone().subscribe(x => console.log(x));
// 或者如下写法
fetchSomeone().subscribe({
next: (x) => console.log(x),
complete: () => {}
});
这里用法比较简单,就是传入一个常量字符串 ok
,然后在这个 Observable 被订阅的时候传给订阅者。

剩下的一些内容单独来看:
(1) Error Handle( 错误的处理机制 )
(2) Hot and Cold 数据流以及它们之间的转换关系
(3) 流节点之间的转换
(4) 流节点的并发问题
后面可能会比较细节,如果有兴趣可以直接看下面的 PPT,我主要也是按 PPT 的例子和思路来写。
参考的PPT: https://speakerdeck.com/jfairbank/devnexus-2017-the-rise-of-async-javascript