【Rx】ColdなObservableの仕組みを見てみる

はじめに

この記事では ColdObservable がなぜあんな性質を持っているのかを実装を追いながら簡単に解説したいと思います。
分かりやすく追っていったつもりなので、内部実装を理解する手助けになればといいなと思っています。

Observableの性質

まずは2つの性質について軽くおさらいしてみます。

ColdなObservable

  • 購読されるまで何もしない。
  • ストリームを自分のところから枝分かれさせる能力がない。
  • 大抵のオペレータがこっち。

HotなObservable

  • 購読されなくても値を発行する能力を持つ。
  • 複数のObserverを持つことができ、同じ値をそれぞれに発行する。
    (ストリームを自分のところから枝分かれさせることができる。)
  • Publish系のオペレータはこっち。

オペレータの内部実装

ということで内部実装を見ながら挙動について考えてみます。
実装は UniRx のコードを参考にしたいと思います。

大抵のオペレータは Cold な性質なので、今回は .Where<T>(Func<T, bool>) オペレータの実装を見てみます。
このオペレータは色々と最適化がされていますが、今回は動作に影響を及ぼさない範囲で簡略化したコードで考えてみます。

using System;

namespace UniRx.Operators
{
    public abstract class OperatorObservableBase<T> : IObservable<T>
    {
        readonly bool isRequiredSubscribeOnCurrentThread;

        public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
        {
            this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var subscription = new SingleAssignmentDisposable();
            return subscription.Disposable = SubscribeCore(observer, subscription);
        }

        protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel);
    }

    public abstract class OperatorObserverBase<TSource, TResult> : IObserver<TSource>, IDisposable
    {
        protected internal volatile IObserver<TResult> observer;
        IDisposable cancel;

        public OperatorObserverBase(IObserver<TResult> observer, IDisposable cancel)
        {
            this.observer = observer;
            this.cancel = cancel;
        }

        public abstract void OnNext(TSource value);

        public abstract void OnError(Exception error);

        public abstract void OnCompleted();

        public void Dispose()
        {
            observer = UniRx.InternalUtil.EmptyObserver<TResult>.Instance;
            cancel.Dispose();
        }
    }

    internal class WhereObservable<T> : OperatorObservableBase<T>
    {
        readonly IObservable<T> source;
        readonly Func<T, bool> predicate;

        public WhereObservable(IObservable<T> source, Func<T, bool> predicate)
            : base(source.IsRequiredSubscribeOnCurrentThread())
        {
            this.source = source;
            this.predicate = predicate;
        }

        protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
        {
            return source.Subscribe(new Where(this, observer, cancel));
        }

        class Where : OperatorObserverBase<T, T>
        {
            readonly WhereObservable<T> parent;

            public Where(WhereObservable<T> parent, IObserver<T> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                this.parent = parent;
            }

            public override void OnNext(T value)
            {
                var isPassed = false;
                try
                {
                    isPassed = parent.predicate(value);
                }
                catch (Exception ex)
                {
                    try { observer.OnError(ex); } finally { Dispose(); }
                    return;
                }

                if (isPassed)
                {
                    observer.OnNext(value);
                }
            }

            public override void OnError(Exception error)
            {
                try { observer.OnError(error); } finally { Dispose(); }
            }

            public override void OnCompleted()
            {
                try { observer.OnCompleted(); } finally { Dispose(); }
            }
        }
    }
}

購読されるまで何もしないという挙動について

ColdObservable と言ったらこの性質ですね。
これについては SubscribeCore メソッドを見たら分かると思います。

LINQ ではオペレータ用の EnumeratorMoveNext メソッドの中で1つ前の MoveNext を呼びだすことによって実現されています。
Rx でも同じように、オペレータ用の Observer が1つ後ろの Observer に通知することによって実現されています。
その通知の連鎖を実現するため、1つ前の Observable に自身のオペレータ用の Observer を登録する処理が必要になります。

その購読処理を行うのは SubscribeCore メソッドです。
このメソッドは OperatorObservableBaseSubscribe メソッド内で呼び出されています。
(今回の例では省きましたが、購読を CurrentThread で行ってくれるような処理がラップされています。)

WhereObservable.SubscribeCore メソッド内でもちゃんと source.Subscribe(new Where(this, observer, cancel)); というように購読処理がされています。
このとき、sourceSubscribe メソッドを呼び出していますが、もし、sourceOperatorObservableBase の派生であれば、同じように SubscribeCore メソッドが呼ばれ、購読処理の連鎖が行われていることでしょう。

このことから、ColdObservable は購読されていないときは何もしないという性質であることが分かりますね。
だって、SubscribeCore メソッドが呼ばれていないと、オペレータ用の Observer のインスタンスが作られてすらないんだもん。

ストリームの枝分かれ機能がないことについて

ここでいう「ストリームの枝分かれ機能」というのは「複数の Observer を持つことができ、それらに同じ値を通知することができる」という機能とします。

これを実現するためには、後続の Observer をリストで複数保持している必要があります。
しかし、OperatorObserverBase の実装を見てみると、Observer を1つしか持っていません。
これだと、複数の Observer に値を通知することができるはずがありません。

さらに、ごもっともなのですが、複数の Observer を保持するということは、複数回 Subscribe メソッド、それを通して SubscribeCore メソッドが呼ばれるはずです。
SubscribeCore メソッド内で new Where(this, observer, cancel) と書かれていますが、これは購読処理が呼ばれるごとに新しい別のオペレータ用 Observer が作られているということです。
それを sourceSubscribe メソッドを呼び出して複数登録するわけですから、自分のオペレータのところで枝分かれせず、根本から別々のストリームを複数作ってしまいます。

ということから、ColdObservable はストリームの枝分かれ機能がないということが明らかですね。

最後に

今回は ColdObservable の挙動の仕組みを探るべく、実際にオペレータの実装を追いながら解説していきましたが、まぁ実装を追うということは大事ですね。
特に Rx は性質を適切に使い分けないと思った通りの動作をしてくれないですし、そのうえ、性質の見分けがつきづらいですしね。
(LINQ はオペレータが具象型を返すかインターフェース型を返すかである程度判断できたりするが、Rx の場合はそうはいかないことが多い。)

ちゃんと理解して使おうとなれば実際のコードを読みに行くという機会も増えるでしょうし、そのための何らかの理解の助けになれば幸いです。

何かご指摘等があれば気軽にコメントかTwitterで教えてください。

2017/10/22 22:11:32
コメントを投稿する