undefined

bokuweb.me

実装して学ぶRxJS

実際にいくつかのオペレータを実装してみたらRxの気持ちがわかるかと思い実践してみました。 簡素化するために以下の方針とします。

  • unsubscribeしない
  • errorハンドリングしない

実装してみたのは以下です。

  • of
  • map
  • subject
  • filter
  • delay
  • fromPromise
  • combineLatest
  • switchMap

Observable

何はともあれ、まずはObservableを実装します。

class Observable {
  constructor(producer) {
    this.subscribe = producer
  }
}

コードはこれだけで、producerを受け取って、自身のsubscribeに接続します。 producerobserverを引数にとって、次に、どんなタイミングで、どんな値を流すか決定する関数です。 現時点ではイメージもわかないと思うので次ofを眺めたほうがわかりやすいかと思います。

of

次にofを実装します。ofは引数で受け取った値を順に流していくだけの最もシンプルなOperatorの一つです。

github.com

Observable.prototype.of = function (...values) {
  const producer = observer => {
    values.forEach(v => observer.next(v));
    observer.complete();
  }
  return new Observable(producer)
}

observerを引数にとるproducerを作成し、引数の値を順にobserver.nextで流し。完了すればobserber.completeします。 使用例は以下のようになります。1,2,3と値が流れ、completeします。冒頭で述べましたが、observerは本来errorをハンドリングする関数を含みますが簡素化のため削除しています。

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1,2,3).subscribe(observer);

実際の動作を以下で確認することができます。

runkit.com

observersubscribeすることで初めて、producerが実行され、値が流れます。

map

もうひとつシンプルかつ、多用するoperatorであるmapを実装してみます。

Observable.prototype.map = function (f) {
  const producer = observer => {
    return this.subscribe({
      next(value) { observer.next(f(value)) },
      complete() { observer.complete() }
    })
  };
  return new Observable(producer);
}

関数を引数にとり、producerの中で受け取った関数を適用した値をobserver.nextで流します。 先程のofと合わせて以下のように使用します。

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1,2,3).map(x => x * x).subscribe(observer); // 1, 4, 9, complete

1, 4, 9と値が流れcompleteします。 動作は以下で確認できます。

runkit.com

Subject

ここまで来るとあとは応用で、粛々とOperatorを追加していくだけなんですが、先にHotであるSubjectを実装しておきます。

class Subject extends Observable {
  constructor() {
    super(function (observer) {
      this.observers.push(observer);
    });
    this.observers = [];
  }

  next(x) {
    this.observers.forEach((observer) => observer.next(x));
  }

  complete() {
    this.observers.forEach((observer) => observer.complete());
  }
}

SubjectObservableを継承しsubscribeされると配信先を自身のリストに登録されるようにします。 また、登録された配信先に値を流せるようnextを生やします。nextでは登録済の配信先全てに値を流します。

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

const subject$ = new Subject();
subject$.subscribe(observer);
subject$.next('hoge'); // hoge
subject$.next('fuga'); // fuga
subject$.complete();   // complete

良さそう。

runkit.com

filter

後は上記の応用でほとんどのものは実装できます。 filterは名前の通りなんですが、マーブルダイアグラムがあるとよりわかりやすいですね。

f:id:bokuweb:20170413231104p:plain

Observable.prototype.filter = function (f) {
  const producer = observer => {
    return this.subscribe({
      next(value) {
        if (f(value)) observer.next(value)
      },
      complete() { observer.complete() }
    })
  };
  return new Observable(producer);
}
const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1,2,3).map(x => x * x).filter(x => x % 2 === 0).subscribe(observer); // 4, complete

delay

こちらも名前の通り指定時間出力を遅延させるoperatorです。

Observable.prototype.delay = function (time) {
  const producer = observer => {
    return this.subscribe({
      next(value) {
        setTimeout(() => observer.next(value), time)
      },
      complete() {
        setTimeout(() => observer.complete(), time)
      }
    })
  };
  return new Observable(producer);
}

1秒後に1が流れてきます

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1).delay(1000).subscribe(observer); // 1 after 1sec

fromPromise

promiseからobservableに変換するoperator

Observable.prototype.fromPromise = function (promised) {
  const producer = observer => {
    return this.subscribe({
      next(value) {
        promised.then((a) => {
          observer.next(a)
        })
      },
      complete() {
        promised.then(() => {
          observer.complete()
        })
      }
    })
  };
  return new Observable(producer);
}

5秒後に1が流れてきます

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable()
  .of(null)
  .fromPromise(new Promise(resolve => {
    setTimeout(() => { resolve(1)}, 5000)
  }))
  .subscribe(observer); // 1 after 5sec

combineLatest

頻出ですが、このあたりから結構複雑ですね。 みんなだいすきcombineLatest

本来combineLatestは最後の引数が値を変換するtransform関数なんですが、省略も可能になっていて、今回はObservableの結合のみ行うOperatorとして実装しています。

f:id:bokuweb:20170413224908p:plain * http://rxmarbles.com/#combineLatestより

Observable.prototype.combineLatest = function (...observables) {

  const length = observables.length + 1;
  const producer = outObserver => {
    const values = [...Array(length)].map(_ => undefined);
    const hasValue = [...Array(length)].map(_ => false);
    const hasComplete = [...Array(length)].map(_ => false);

    const next = (x, index) => {
      values[index] = x;
      hasValue[index] = true;
      if (hasValue.every(x => x === true)) outObserver.next(values);
    };

    const complete = (index) => {
      hasComplete[index] = true;
      if (hasComplete.every(x => x === true)) outObserver.complete();
    };

    observables.forEach((observable, index) => {
      observable.subscribe({
        next: (x) => next(x, index + 1),
        complete: () => complete(index + 1),
      });
    });
    this.subscribe({
      next: (x) => next(x, 0),
      complete: () => complete(0),
    });
  };
  return new Observable(producer);
}

以下使用例。

new Observable()
   .of(0)
   .combineLatest(
     new Observable().of(1, 4),
     new Observable().of(2),
     new Observable().of(3).delay(1000),  
   )
   .subscribe(observer);  // [0, 4, 2, 3] after 1sec

動作を文書で説明するのがなかなか難しいOperatorだと思います。 1の値が流れていってしまうことに注意してください。初回のみ各streamの値が出揃うまで値は流れません。 次回以降値が流れてくる度に、他のstreamの最新の値と合わせて配列にパッキングされます。

以下のマーブルダイアグラムを動かしてみるのが一番しっくりくるかもしれません。

rxmarbles.com

動作確認は以下。

runkit.com

こっちはRx

runkit.com

switchMap

こちらも頻出、みんなだいすきswitchMapです。 実装も泥臭くて若干怪しいですが、おもちゃレベルでは動いてそうです。

switchMapは分かりやすいマーブルダイアグラムがないですね。。

Observable.prototype.switchMap = function (f) {
  const producer = outObserver => {
    let i = 0;
    let hasSourceCompleted = false;
    const completed = [];
    this.subscribe({
      next: (x) => {      
        i++;
        completed[i] = false;
        f(x).subscribe({
          next: ((index, y) => {
            if (index === i) {
              outObserver.next(y)
            }
          }).bind(this, i),
          complete: ((index) => {
            completed[index] = true;
            if (hasSourceCompleted && completed.every(x => x)) outObserver.complete();
          }).bind(this, i),
        });
      },
      complete: (() => {
        hasSourceCompleted = true;
        if (hasSourceCompleted && completed.every(x => x)) outObserver.complete();
      }),
    });
  };
  return new Observable(producer);
};

RxのswitchMapPromiseも展開してつないでくれますが、今回はObservableのみ対応しています。 使用例は以下。

const observer = {
  next(value) { console.log(value) },
  complete() { console.log('Done') }
}

new Observable().of(1, 2).switchMap((v) => {
  if (v === 1) return new Observable().of(v).delay(400);
  if (v === 2) return new Observable().of(v).delay(200);
}).subscribe(observer); // 2 after 200ms

12の値が順次流れてきて、1のときは400ms後に1が返るObservableが、2のときは200ms後に2が返るObservableがreturnされます。 200ms後に2が400ms後に1が流れてきそうですが、2switchMapに流れてきた時点でstreamは200ms後に2が返るObservableswitchされるのでobserverまで1が流れてくることはありません。

動作確認用

runkit.com

こっちはRx

runkit.com

さいごに

業務ではAngularを使用しているため、Observableの扱いにはいつも悩んでいて、もう少し仲良くなるために今回は実装してみました。 若干複雑なOperatorもありますが、mapfilterはかなりシンプルで、仕組みを知るにはちょうどいい題材ではないかと思います。 また趣味ではredux-observableを使用していて、わりと気に入っているのでもう少し使いこなせるようになりたいですね。