RxJS 中的基本概念
- Observeable
- Observer
- Subscription
- Operators
- Subject
- Schedulers:用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout
或requestAnimationFrame
或其他。
RxJS 的特性
纯净性
使得 RxJS 强大的正是它使用纯函数来产生值的能力。这意味着你的代码更不容易出错。
1
2
3
4var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));流动性
RxJS 提供了一整套操作符来帮助你控制事件如何流经 observables 。
1
2
3
4
5var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
Observable
创建 Observables
Observables 可以使用
create
来创建, 但通常我们使用所谓的创建操作符 像of
、from
、interval
、等等。Rx.Observable.create
是Observable
构造函数的别名,接受一个参数:subscribe
函数1
2
3
4
5var observable = Rx.Observable.create(function subscribe(observer) {
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});订阅 Observables
1
observable.subscribe(x => console.log(x));
subscribe 调用在同一个 Observable 的多个观察者之间是不共享的。
执行 Observables
Observable.create(function subscribe(observer) {...})
中...
的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。Observable 执行可以传递三种类型的值:
- “Next” 通知: 发送一个值,比如数字、字符串、对象,等等。
- “Error” 通知: 发送一个 JavaScript 错误 或 异常。
- “Complete” 通知: 不再发送任何值。
“Next” 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。“Error” 和 “Complete” 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
约束正则:
1
next*(error|complete)?
使用
try/catch
块1
2
3
4
5
6
7
8
9
10var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // 如果捕获到异常会发送一个错误
}
});清理 Observable 执行
1
2
3
4var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
subscription.unsubscribe();当我们使用
create()
方法创建 Observable 时,Observable 必须定义如何清理执行的资源。你可以通过在function subscribe()
中返回一个自定义的unsubscribe
函数。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16var observable = Rx.Observable.create(function subscribe(observer) {
// 追踪 interval 资源
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
// 提供取消和清理 interval 资源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
var unsubscribe = subscribe({next: (x) => console.log(x)});
// 稍后:
unsubscribe(); // 清理资源