実際にいくつかのオペレータを実装してみたらRx
の気持ちがわかるかと思い実践してみました。
簡素化するために以下の方針とします。
unsubscribe
しないerror
ハンドリングしない
実装してみたのは以下です。
of
map
subject
filter
delay
fromPromise
combineLatest
switchMap
Observable
何はともあれ、まずはObservable
を実装します。
class Observable { constructor(producer) { this.subscribe = producer } }
コードはこれだけで、producer
を受け取って、自身のsubscribe
に接続します。
producer
はobserver
を引数にとって、次に、どんなタイミングで、どんな値を流すか決定する関数です。
現時点ではイメージもわかないと思うので次of
を眺めたほうがわかりやすいかと思います。
of
次にofを実装します。of
は引数で受け取った値を順に流していくだけの最もシンプルなOperatorの一つです。
Observable.prototype.of = function (...values) { const producer = observer => { values.forEach(v => observer.next(v)); observer.complete(); } return new Observable(producer) }
observer
を引数にとるproducer
を作成し、引数の値を順にobserver.next
で流し。完了すればobserber.complete
します。
使用例は以下のようになります。1,2,3と値が流れ、completeします。冒頭で述べましたが、observer
は本来error
をハンドリングする関数を含みますが簡素化のため削除しています。
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1,2,3).subscribe(observer);
実際の動作を以下で確認することができます。
observer
がsubscribe
することで初めて、producer
が実行され、値が流れます。
map
もうひとつシンプルかつ、多用するoperator
であるmap
を実装してみます。
Observable.prototype.map = function (f) { const producer = observer => { return this.subscribe({ next(value) { observer.next(f(value)) }, complete() { observer.complete() } }) }; return new Observable(producer); }
関数を引数にとり、producer
の中で受け取った関数を適用した値をobserver.next
で流します。
先程のof
と合わせて以下のように使用します。
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1,2,3).map(x => x * x).subscribe(observer); // 1, 4, 9, complete
1
, 4
, 9
と値が流れcompleteします。
動作は以下で確認できます。
Subject
ここまで来るとあとは応用で、粛々とOperator
を追加していくだけなんですが、先にHot
であるSubject
を実装しておきます。
class Subject extends Observable { constructor() { super(function (observer) { this.observers.push(observer); }); this.observers = []; } next(x) { this.observers.forEach((observer) => observer.next(x)); } complete() { this.observers.forEach((observer) => observer.complete()); } }
Subject
はObservable
を継承しsubscribe
されると配信先を自身のリストに登録されるようにします。
また、登録された配信先に値を流せるようnext
を生やします。next
では登録済の配信先全てに値を流します。
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } const subject$ = new Subject(); subject$.subscribe(observer); subject$.next('hoge'); // hoge subject$.next('fuga'); // fuga subject$.complete(); // complete
良さそう。
filter
後は上記の応用でほとんどのものは実装できます。
filter
は名前の通りなんですが、マーブルダイアグラムがあるとよりわかりやすいですね。
Observable.prototype.filter = function (f) { const producer = observer => { return this.subscribe({ next(value) { if (f(value)) observer.next(value) }, complete() { observer.complete() } }) }; return new Observable(producer); }
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1,2,3).map(x => x * x).filter(x => x % 2 === 0).subscribe(observer); // 4, complete
delay
こちらも名前の通り指定時間出力を遅延させるoperator
です。
Observable.prototype.delay = function (time) { const producer = observer => { return this.subscribe({ next(value) { setTimeout(() => observer.next(value), time) }, complete() { setTimeout(() => observer.complete(), time) } }) }; return new Observable(producer); }
1秒後に1
が流れてきます
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable().of(1).delay(1000).subscribe(observer); // 1 after 1sec
fromPromise
promise
からobservable
に変換するoperator
Observable.prototype.fromPromise = function (promised) { const producer = observer => { return this.subscribe({ next(value) { promised.then((a) => { observer.next(a) }) }, complete() { promised.then(() => { observer.complete() }) } }) }; return new Observable(producer); }
5秒後に1
が流れてきます
const observer = { next: (v) => console.log(v), complete: () => console.log('complete'), } new Observable() .of(null) .fromPromise(new Promise(resolve => { setTimeout(() => { resolve(1)}, 5000) })) .subscribe(observer); // 1 after 5sec
combineLatest
頻出ですが、このあたりから結構複雑ですね。
みんなだいすきcombineLatest
。
本来combineLatest
は最後の引数が値を変換するtransform
関数なんですが、省略も可能になっていて、今回はObservableの結合のみ行うOperator
として実装しています。
* http://rxmarbles.com/#combineLatestより
Observable.prototype.combineLatest = function (...observables) { const length = observables.length + 1; const producer = outObserver => { const values = [...Array(length)].map(_ => undefined); const hasValue = [...Array(length)].map(_ => false); const hasComplete = [...Array(length)].map(_ => false); const next = (x, index) => { values[index] = x; hasValue[index] = true; if (hasValue.every(x => x === true)) outObserver.next(values); }; const complete = (index) => { hasComplete[index] = true; if (hasComplete.every(x => x === true)) outObserver.complete(); }; observables.forEach((observable, index) => { observable.subscribe({ next: (x) => next(x, index + 1), complete: () => complete(index + 1), }); }); this.subscribe({ next: (x) => next(x, 0), complete: () => complete(0), }); }; return new Observable(producer); }
以下使用例。
new Observable() .of(0) .combineLatest( new Observable().of(1, 4), new Observable().of(2), new Observable().of(3).delay(1000), ) .subscribe(observer); // [0, 4, 2, 3] after 1sec
動作を文書で説明するのがなかなか難しいOperator
だと思います。
1
の値が流れていってしまうことに注意してください。初回のみ各streamの値が出揃うまで値は流れません。
次回以降値が流れてくる度に、他のstreamの最新の値と合わせて配列にパッキングされます。
以下のマーブルダイアグラムを動かしてみるのが一番しっくりくるかもしれません。
動作確認は以下。
こっちはRx
版
switchMap
こちらも頻出、みんなだいすきswitchMap
です。
実装も泥臭くて若干怪しいですが、おもちゃレベルでは動いてそうです。
switchMap
は分かりやすいマーブルダイアグラムがないですね。。
Observable.prototype.switchMap = function (f) { const producer = outObserver => { let i = 0; let hasSourceCompleted = false; const completed = []; this.subscribe({ next: (x) => { i++; completed[i] = false; f(x).subscribe({ next: ((index, y) => { if (index === i) { outObserver.next(y) } }).bind(this, i), complete: ((index) => { completed[index] = true; if (hasSourceCompleted && completed.every(x => x)) outObserver.complete(); }).bind(this, i), }); }, complete: (() => { hasSourceCompleted = true; if (hasSourceCompleted && completed.every(x => x)) outObserver.complete(); }), }); }; return new Observable(producer); };
RxのswitchMap
はPromise
も展開してつないでくれますが、今回はObservable
のみ対応しています。
使用例は以下。
const observer = { next(value) { console.log(value) }, complete() { console.log('Done') } } new Observable().of(1, 2).switchMap((v) => { if (v === 1) return new Observable().of(v).delay(400); if (v === 2) return new Observable().of(v).delay(200); }).subscribe(observer); // 2 after 200ms
1
と2
の値が順次流れてきて、1
のときは400ms後に1
が返るObservable
が、2
のときは200ms後に2
が返るObservable
がreturnされます。
200ms後に2
が400ms後に1
が流れてきそうですが、2
がswitchMap
に流れてきた時点でstreamは200ms後に2が返るObservable
にswitch
されるのでobserver
まで1
が流れてくることはありません。
動作確認用
こっちはRx
版
さいごに
業務ではAngular
を使用しているため、Observable
の扱いにはいつも悩んでいて、もう少し仲良くなるために今回は実装してみました。
若干複雑なOperator
もありますが、map
やfilter
はかなりシンプルで、仕組みを知るにはちょうどいい題材ではないかと思います。
また趣味ではredux-observable
を使用していて、わりと気に入っているのでもう少し使いこなせるようになりたいですね。