【硬核拆解】从源码实现一个rxjs(Observable篇)

lxf2023-05-13 01:32:09

项目代码:github.com/blazer233/a…

参考轮子:github.com/ReactiveX/r…

依照 RxJS 的官方定义:

RxJS 是使用 Observables 的响应式编程的库,它使编写异步或基于回调的代码更容易

针对 JavaScript 中的非阻塞行为(non-blocking manner),RxJS in Action(Deniels, P.P etc.)列举了 3 种处理方式,包括回调函数(callback functions)、事件派发器(Event Emitters)以及Promise。

RxJS 则提出了一种新的思维方式:数据流,它通过observable序列来实现基于事件的编程,它使编写异步或基于回调的代码更容易。且代码量会有效减少,可读性的提高。本文通过挖掘精简RxJS源码,力求实现一个tiny版的RxJS,用简单的demo深入浅出理解RxJS的核心思想。

举个例子

通过伪代码,实现一个业务中常见的滚动下拉加载

普通:

【硬核拆解】从源码实现一个rxjs(Observable篇)

上述代码,我们可以看出来几点问题:

  1. 两个异步行为(fetch请求事件、页面滚动事件),两个异步事件的处理显得很冗余,且不容易阅读。
  2. 我们要使用 flag 去判断状态(isRequesting 判断上一个请求是否已经处理完成)。
  3. 如果我们需要判断请求的次数,还需要更多的 flag 在外层来记录。
rxjs:

【硬核拆解】从源码实现一个rxjs(Observable篇)

RxJS配合操作符(operator),提高了每一步操作的可读性,将事件转换成(Observable)通过流的方式进行监听并处理,并且在处理业务逻辑时,可以将每一步拆成更小的函数,通过更多的operator连接,提高业务代码的复用性和可读性,这也是RxJS的精髓所在。

接下来,我们首先来实现 RxJS 框架中核心的核心 —— Observable


Observable:

在我们实现的tony版RxJS中,需要跑通下面的Demo。

import { Observable } from 'rxjs';

// Create a lazy Push System
const observable = new Observable(observer => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    return () => console.log('succes');
});

// Subscribe the lazy Push System
observable.subscribe({
    next: (value) => console.log('we got', value),
    complete: () => console.log('completed'),
});

/**
 * Output:
 * we got 1
 * we got 2
 * we got 3
 * completed
 * 'succes'
 */

RxJSObservable是可以被订阅(subscribe)的一个流对象,而observer是订阅Observable的物件,理解这两者的区别和联系是很重要的。

在上述Demo中observer方法有:

  • observer.next():类似于promise的then,表示接下来的传入或操作。
  • observer.complete():表示观察者对象的流结束,complete()触发后,next将不再起作用。

通常subscribe对象中仅传入一个函数的时候视为next函数执行。

RxJS中,每一个函数都是一个 lazy Pull 系统,只有我们订阅 Observable,我们才可以拿到我们需要的数据,执行订阅的函数。

Observable

通过Demo,我们发现:

  1. Observable是一个入参为函数的构造函数
  2. Observable的入参函数会拿到observer作为参数
  3. Observable原型链上存在subscribe函数,作为订阅的开关

那么可以拆解源码实现如下代码:

export class Observable {
  constructor(subscribe) {
    if (subscribe) this._subscribe = subscribe;
  }

  subscribe(observerOrNext) {
    const subscriber = observerOrNext instanceof Subscriber
      ? observerOrNext
      : new SafeSubscriber(observerOrNext, error, complete);
    subscriber.add(this._subscribe(subscriber));
    return subscriber;
  }
}

点击查看对应源码

在如上代码里 new 实例Observable对象,其传入参数对内部方法_subscribe进行了覆盖,之后在调用subscribe时,将observable.subscribe({...})传入的对象转化为Subscriber的实例对象,之后调用实例对象的add方法,将一开始new Observable时传入的方法执行,如果执行后返回为函数,则挂载到实例对象上,销毁时执行。

此时就有小伙伴纳闷了,SafeSubscriberSubscriberadd方法...这些都是啥???

【硬核拆解】从源码实现一个rxjs(Observable篇)

我们先往下看,订阅时调用subscibe方法,接受三个可选参数,如下图

subscribe(
  observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
  error?: ((error: any) => void) | null,
  complete?: (() => void) | null
): Subscription {...}

点击查看对应源码

errorcomplete参数没有传,暂时无需关注,对于observerOrNext接口中定义了三种类型,分别是Observer<T>对象,函数(value: T) => void和空值null

subscribe方法真正需要的是Subscriber的实例对象,所以一开始就会进行判断,是否为Subscriber的实例对象。如果不是就会通过SafeSubscriber将我们传入的对象(包含 next、error 和 complete 函数)重新进行处理,使得其拥有添加订阅,取消订阅的功能(add、unsubscribe),接着我们看下SafeSubscriber如何实现。

SafeSubscriber

SafeSubscriber继承于 Subscriber,而Subscriber方法也是对入参observerOrNext进行了又一层封装,本质还是继承了Subscription方法,add、unsubscribe等核心方法最终是挂载在Subscription上(后面会提到)。

export class SafeSubscriber extends Subscriber {
  constructor(observerOrNext, error, complete) {
    super();
    this.destination = new Observer(observerOrNext, error, complete, this);
  }
}

点击查看对应源码

Observer

Observer 中对传入的observerOrNextnexterrorcomplete 三个方法做了一层兼容封装,保证即使没有如上三个方法,也会构成出兜底的方法。

export function defaultErrorHandler(err) {
  throw err;
}
export function noop() {
  return function () {};
}

export class Observer {
  static isObserver(obj) {
    return (
      typeof obj.next == "function" ||
      typeof obj.error == "function" ||
      typeof obj.error == "function"
    );
  }
  constructor(observerOrNext, error, complete, context) {
    if (Observer.isObserver(observerOrNext)) {
      error = observerOrNext.error;
      complete = observerOrNext.complete;
      observerOrNext = observerOrNext.next;
    }
    this.next = observerOrNext ? observerOrNext.bind(context) : noop;
    this.error = (error || defaultErrorHandler).bind(context);
    this.complete = complete ? complete.bind(context) : noop;
  }
}

Subscriber

Subscriber上的this.destination,在new SafeSubscriber的时候,被设置了 nexterrorcomplete三个方法属性(即Observer的实例对象)。

class Subscriber extends Subscription {
  isStopped = false;
  destination = null;
  constructor(destination) {
    super();
    this.destination = destination;
    if (destination instanceof Subscription) {
      destination.add(this);
    }
  }
  next(value) {
    if (!this.isStopped) {
     this.destination.next(value);
    }
  }
  unsubscribe() {
    if (!this.closed) {
      this.isStopped = true;
      super.unsubscribe();
      this.destination = null;
    }
  }
  complete() {
    if (!this.isStopped) {
      this.isStopped = true;
      try {
        this.destination.complete();
      } finally {
        this.unsubscribe();
      }
    }
  }
}

点击查看对应源码

通过isStoppedclosed两个flag对执行顺序进行了限制。

  • 确保complete不会重复执行
  • 确保complete执行后不会再执行next函数
  • 确保unsubscribe不会重复执行

接下来,我们继续看Subscription函数。

Subscription

现在就是真正的核心,即RxJS是如何实现 订阅/卸载 (add/unsubscribe)这一功能的。

export class Subscription {
  constructor() {
    this.closed = false; // 是否已经取消订阅
    this._disposeFuncs = []; // 回收函数列表
  }
  /**
   * 取消订阅,释放资源
   */
  unsubscribe() {
    if (!this.closed) {
      this.closed = true;
      this._disposeFuncs.forEach(i => {
        if (typeof i == "function") {
          i();
        } else if (i && typeof i.unsubscribe == "function") {
          i.unsubscribe();
        }
      });
    }
  }

  /**
   * 添加取消订阅处理
   * @param {*} dispose
   */
  add(dispose) {
    if (this.closed) {
      if (typeof dispose == "function") {
        dispose();
      } else if (dispose && typeof dispose.unsubscribe == "function") {
        dispose.unsubscribe();
      }
    } else {
      this._disposeFuncs.push(dispose);
    }
  }
}

点击查看对应源码

上面代码首先通过closed标记是否取消订阅,并提供两个方法。

  • add:如果已经取消订阅,则立刻执行收集到的函数,此时如果收集到的函数是继承于Subscription的实例对象,则进行卸载,如果还没取消订阅,就把函数进行收集,订阅结束时循环执行。

  • unsubscribe:取消订阅状态,将add收集到的函数循环执行。

*此处有点类似于 React.useEffect 最后返回的那个方法。

*源码中可以通过add方法链接其他的subscription,此处还有涉及到父子级的概念,在父subscription调用 unsubscribe方法取消订阅的时候,会调用子 subscription 的 unsubscribe,取消其下所有子孙 subscription 的订阅,此处代码没有提现,有兴趣可以点击源码查看。

以上就是我们代码的全部实现,全部代码抽出来应该在100行左右。有兴趣的可以将以上代码进行拷贝出来应该可以跑通一开始的Demo。

总结

以上就是一个简单Observable的tiny版实现,其实很多也还没有讲,比如

  • pipe是什么?
  • 操作符是如何链接的?
  • Subject是如何实现的?
  • ...

接下来我会分篇章一一实现。

手写一遍之后,可能会有一种感觉 "这不就是个观察者模式吗",如果仅仅想实现功能,也会有更简版的实现,十几行就能完成。

但是当我们深入源码,拆分代码,剥去外壳,去最终一步一步实现功能找到答案,而这个过程才是我们应当所追求的

毕竟 Programming Is Thinking Not Typing

【硬核拆解】从源码实现一个rxjs(Observable篇)


参考:

  • www.youtube.com/watch?v=BA1…
  • github.com/ReactiveX/r…
  • segmentfault.com/a/119000004…
  • /post/699102…
  • /post/703626…

以上就是 npm 包 rxjs 第一部分Observable的源码学习

本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。 在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。 本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。 除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。 在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!