rxjs核心原理讲解

lxf2023-05-05 10:31:01

响应式编程是一种面向数据流和变更传播的异步编程范式。RxJS是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单,下面谈对rxjs的理解

核心代码

RxJS是一个使用可观察对象进行响应式编程, 它和Promise有一个很大区别,是懒执行的, Promise 创建时就会执行,不管你后面有没有then, 而RxJS可观察对象只有订阅(subscribe)时才会执行,如下代码

    new Promise((resolve) => {
      console.log('会执行');
      resolve(1);
    });

    new Observable((subscriber) => {
      console.log('不会执行');
      subscriber.next(1);
    });

RxJS可观察对象核心原理代码如下

    interface Observer<T> {
      next: (value: T) => void;
      error: (err: any) => void;
      complete: () => void;
    }
    // 核心就是这个类 9 行代码, 每个Rxjs的东西都是建立在这几行代码的基本上
    class Observable {
      private _subscribe: (subscriber: Observer<any>) => void;
      constructor(subscribe: (subscriber: Observer<any>) => void) {
        this._subscribe = subscribe; // 存订阅方法
      }
      subscribe(observer: Observer<any>) {
        this._subscribe(observer); // 通过订阅方法执行订阅对象
      }
    }
    const nextFn = (res) => console.log(res)
    new Observable((subscriber) => {
      console.log(1);
      subscriber.next('发布值');
    }).subscribe({
      next: nextFn, // 发布值
      error: console.log,
      complete: console.log,
    });

本质上就是 subscriber.next方法里面会调用观察者对象subscribe方法参数里的next的方法,照上面实现subscriber.next是等于nextFn, 但Rxjs里面肯定不是的,Rxjs对nextFn做一层包装处理各种前置条件

极简版的Rxjs实现

功能实现如下:

  1. Observable类,订阅方法、管道方法
  2. 取消订阅

功能实现后达到以下效果

const source = new Observable((subscriber) => {
      subscriber.next(1);
      setTimeout(() => {
        subscriber.next(2);
      }, 500);
      setTimeout(() => {
        subscriber.next(3);
      }, 1000);
    });
    const subscription = source.pipe(map(a => a*2)).subscribe({next: console.log}); 
    setTimeout(() => {
      subscription.unsubscribe();
    }, 800);

结果:打印 2、4; 去掉下面定时器,还会打印6 实现如下

export interface Observer<T> {
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}
// 订阅类
class Subscription<T> {
  private teardowns: Subscription<T>[] = [];
  constructor() {
    this.teardowns = [];
  }
  unsubscribe() {
    this.teardowns.forEach((teardown) => {
      teardown.unsubscribe();
    });
  }
  add(teardown: Subscription<T>) {
    if (teardown) {
      this.teardowns.push(teardown);
    }
  }
}
对传入的订阅对象做一层包装
class Subscriber<T> extends Subscription<T> {
  private isClosed = false;
  private observer;
  constructor(observer: Observer<T>) {
    super();
    this.observer = observer;
  }
  next(val: T) {
    if (this.observer.next && !this.isClosed) {
      this.observer.next(val);
    }
  }
  override unsubscribe() {
    if (!this.isClosed) {
      super.unsubscribe();
      this.isClosed = true;
    }
  }
  error(val: any) {
    if (this.observer.error && !this.isClosed) {
      this.isClosed = true;
      this.observer.error(val);
    }
  }
  complete() {
    if (this.observer.complete && !this.isClosed) {
      this.isClosed = true;
      this.observer.complete();
    }
  }
}
class Observable<T> {
  private _subscribe;
  constructor(subscribe: (subscriber: Observer<T>) => void) {
    this._subscribe = subscribe;
  }
  subscribe(observer?: Partial<Observer<T>>): Subscription<T> {
    const subscription = new Subscription(); // 定义订阅实例
    const subscribe = new Subscriber(observer); // 包装传入的订阅对象
    subscription.add(subscribe); // 添加订阅对象
    this._subscribe(subscribe); // 执行订阅回调
    return subscription; // 返回阅读实例,用于取消订阅
  }
  // 管道, 返回的是最后一个管道操作符返回的可观察实例
  pipe(...operators: any[]): Observable<T> {
    return operators.reduce((prev: any, Fn: (x: any) => any) => Fn(prev), this);
  }
}
// map操作符实现
function map(Fn: any) {
  // 执行形成闭包,使返函数可以访问Fn
  return (source: Observable<any>) => {
  // 管道执行后传入上一个可观察实例
    return new Observable((subscriber) => {
      return source.subscribe({
        next(val) {
          subscriber.next(Fn(val));
        },
        error(err) {
          subscriber.error(err);
        },
        complete() {
          subscriber.complete();
        },
      });
    });
  };
}

管道看起来不好理解,可能原因是这里使用了几次高阶函数,将其类比出来就好理解了,目的是让其按照管道参数顺序依次执行,如下

// 管道使用
const fn = (x: number) => x * 2;
new Observable<number>((subscriber) => {
  subscriber.next(1);
})
  .pipe(map(fn), map(fn))
  .subscribe({
    next(val) {
      console.log(val);
    },
  });
  
// 上面管道就相当于转化成了下面这些代码,将下面代码通过pipe组织了起来
const source1 = new Observable<number>((subscriber) => {
  subscriber.next(1);
});
const source2 = new Observable<number>((subscriber) => {
  source1.subscribe({
    next(val) {
      subscriber.next(fn(val));
    },
  });
});
const source3 = new Observable<number>((subscriber) => {
  source2.subscribe({
    next(val) {
      subscriber.next(fn(val));
    },
  });
}).subscribe({
  next(val) {
    console.log(val);
  },
});

总结

rxjs 原理代码很少,实例化时传入一个函数,参数是观察对象,实例在订阅时,执行实例化时传入的函数,函数中使用了了观察对象的next方法,其实就是执行了订阅的传入函数,这样就可以达到声明时不执行,只有在订阅时才执行的特点