响应式编程是一种面向数据流和变更传播的异步编程范式。RxJS是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单,下面谈对rxjs的理解
核心代码
RxJS是一个使用可观察对象进行响应式编程, 它和Promise有一个很大区别,是懒执行的, Promise 创建时就会执行,不管你后面有没有then, 而RxJS可观察对象只有订阅(subscribe)时才会执行,如下代码
new Promise((resolve) => {
console.log('会执行');
resolve(1);
});
new Observable((subscriber) => {
console.log('不会执行');
subscriber.next(1);
});
RxJS可观察对象核心原理代码如下
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
// 核心就是这个类 9 行代码, 每个Rxjs的东西都是建立在这几行代码的基本上
class Observable {
private _subscribe: (subscriber: Observer<any>) => void;
constructor(subscribe: (subscriber: Observer<any>) => void) {
this._subscribe = subscribe; // 存订阅方法
}
subscribe(observer: Observer<any>) {
this._subscribe(observer); // 通过订阅方法执行订阅对象
}
}
const nextFn = (res) => console.log(res)
new Observable((subscriber) => {
console.log(1);
subscriber.next('发布值');
}).subscribe({
next: nextFn, // 发布值
error: console.log,
complete: console.log,
});
本质上就是 subscriber.next方法里面会调用观察者对象subscribe方法参数里的next的方法,照上面实现subscriber.next是等于nextFn, 但Rxjs里面肯定不是的,Rxjs对nextFn做一层包装处理各种前置条件
极简版的Rxjs实现
功能实现如下:
- Observable类,订阅方法、管道方法
- 取消订阅
功能实现后达到以下效果
const source = new Observable((subscriber) => {
subscriber.next(1);
setTimeout(() => {
subscriber.next(2);
}, 500);
setTimeout(() => {
subscriber.next(3);
}, 1000);
});
const subscription = source.pipe(map(a => a*2)).subscribe({next: console.log});
setTimeout(() => {
subscription.unsubscribe();
}, 800);
结果:打印 2、4; 去掉下面定时器,还会打印6 实现如下
export interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
// 订阅类
class Subscription<T> {
private teardowns: Subscription<T>[] = [];
constructor() {
this.teardowns = [];
}
unsubscribe() {
this.teardowns.forEach((teardown) => {
teardown.unsubscribe();
});
}
add(teardown: Subscription<T>) {
if (teardown) {
this.teardowns.push(teardown);
}
}
}
对传入的订阅对象做一层包装
class Subscriber<T> extends Subscription<T> {
private isClosed = false;
private observer;
constructor(observer: Observer<T>) {
super();
this.observer = observer;
}
next(val: T) {
if (this.observer.next && !this.isClosed) {
this.observer.next(val);
}
}
override unsubscribe() {
if (!this.isClosed) {
super.unsubscribe();
this.isClosed = true;
}
}
error(val: any) {
if (this.observer.error && !this.isClosed) {
this.isClosed = true;
this.observer.error(val);
}
}
complete() {
if (this.observer.complete && !this.isClosed) {
this.isClosed = true;
this.observer.complete();
}
}
}
class Observable<T> {
private _subscribe;
constructor(subscribe: (subscriber: Observer<T>) => void) {
this._subscribe = subscribe;
}
subscribe(observer?: Partial<Observer<T>>): Subscription<T> {
const subscription = new Subscription(); // 定义订阅实例
const subscribe = new Subscriber(observer); // 包装传入的订阅对象
subscription.add(subscribe); // 添加订阅对象
this._subscribe(subscribe); // 执行订阅回调
return subscription; // 返回阅读实例,用于取消订阅
}
// 管道, 返回的是最后一个管道操作符返回的可观察实例
pipe(...operators: any[]): Observable<T> {
return operators.reduce((prev: any, Fn: (x: any) => any) => Fn(prev), this);
}
}
// map操作符实现
function map(Fn: any) {
// 执行形成闭包,使返函数可以访问Fn
return (source: Observable<any>) => {
// 管道执行后传入上一个可观察实例
return new Observable((subscriber) => {
return source.subscribe({
next(val) {
subscriber.next(Fn(val));
},
error(err) {
subscriber.error(err);
},
complete() {
subscriber.complete();
},
});
});
};
}
管道看起来不好理解,可能原因是这里使用了几次高阶函数,将其类比出来就好理解了,目的是让其按照管道参数顺序依次执行,如下
// 管道使用
const fn = (x: number) => x * 2;
new Observable<number>((subscriber) => {
subscriber.next(1);
})
.pipe(map(fn), map(fn))
.subscribe({
next(val) {
console.log(val);
},
});
// 上面管道就相当于转化成了下面这些代码,将下面代码通过pipe组织了起来
const source1 = new Observable<number>((subscriber) => {
subscriber.next(1);
});
const source2 = new Observable<number>((subscriber) => {
source1.subscribe({
next(val) {
subscriber.next(fn(val));
},
});
});
const source3 = new Observable<number>((subscriber) => {
source2.subscribe({
next(val) {
subscriber.next(fn(val));
},
});
}).subscribe({
next(val) {
console.log(val);
},
});
总结
rxjs 原理代码很少,实例化时传入一个函数,参数是观察对象,实例在订阅时,执行实例化时传入的函数,函数中使用了了观察对象的next方法,其实就是执行了订阅的传入函数,这样就可以达到声明时不执行,只有在订阅时才执行的特点