EIP / JMS Publish-Subscribe Example


JMS Publish-Subscribe Example

一言要約

Publish-Subscribe Channelを使えば、分散環境でも手軽にObserverパターンを実装できる。他の部分で一切メッセージングを使っていないアプリでも、もしお互いに変更を伝え合う必要があるのなら、それだけのためにメッセージングを導入する価値があるだろう。

はじめに

この例ではpublish-subscribeメッセージングの威力を見つつ、それ以外の設計方法についても探ってみる。 それは、複数の購読(subscriber)アプリケーションが、イベントをただの一度だけ発行することにより、その単一のイベント通知を受けることができることを示している。また、そのイベントの詳細をいかに通信するかという代替戦略も考える。

Publish-Subscribe Channelの真の威力を理解するには、複数アプリによる分散環境でのObserverパターンの実装がいかに難しいかを知る必要がある。そこで、まずはObserverパターンのおさらいからはじめよう。

The Observer Pattern(Observerパターン)

Observerパターン[GoF]は、ある変更の依存していもの(dependents)を通知することが出来るあるオブジェクトを通して、設計手法を記述している。依存しているもの(dependents)からは切り離されたそのオブジェクトを保持している間、どんなに多くの依存するもの(dependents)があろうとも、そのオブジェクトがちょうどうまく働くようになっている。もしそれが、まったく何ももっていないとしても。 その参加者は、ひとつのSubject(そのオブジェクトは、その状態の変更を通知する)と、複数のObserver(そのオブジェクトは、そのSubjectの変更通知を受け取ることに興味がある)である。 あるSubjectの状態が変化すると、それその Notify() メソッドを呼び出し、その実装はObserverのリストを知っていて、それらそれぞれの Update() を呼び出す。 いくつかのObserverは、この状態変更には興味が無いかも知れないが、それらのものは、Subjectの GetStat?() を呼び出すことで、何が新しい状態かを知ることが出来る。 そのSubjectは、 興味のある人を登録するためにObserverが使用する Attach(Observer) と Detach(Observer) メソッドを実装しなければならない。

Observerパターンは、SubjectからObjectへ、新しい状態を入手するためのふたつの方法を提供している。PushモデルとPullモデルである。 Pushモデルでは、各Observerを呼び出すUpdateメソッドは、パラメータとして新しい状態を含んでいる。 ゆえに、興味のあるObserverは、GetState?メソッドを呼ばなければいけないということから開放されるが、いっぽう、興味の無いObserverへデータを渡すのは無駄な努力である。 逆のアプローチがPullモデルである。Subjectは基本的な通知を送り、各ObserverはSubjectから新しい状態を要求する。 ゆえに、まったく何も無くても、各Observerはそれが欲している正確な詳細を要求することが出来る。しかし、Subjectはしばしば、同じデータを要求する複数のリクエストを処理しなければならない。 Pushモデルは、Subjectが更新の一部としてあるObserverへのデータを送る、という単一のOne-Wayの通信を要求する。 Pullモデルは、SubjectがあるObserverに通知し、ObserverがSubjectに現在の状態を要求し、Subjectが現在の状態をObserverへ送る、という3つのOne-Way通信を要求する。 われわれがこれから見るように、One-Way通信の数は、設計時の複雑さと実行時の通知性能に影響する。

SubjectのNotifyメソッドを実装する最も簡単な方法は、シングルスレッドでやることだ。しかし、それは、望んでいない性能を暗示している。 シングルスレッドは一度にひとつのObserverをひとつずつ更新していくので、Observerの長いリストの最後のものは、更新されるのに長い間待つ必要があるかもしれない。 また、そのすべてのObserverを更新する長い時間を費やしているSubjectは、まったく何も達成していない。 さらに悪いことに、状態を問い合わせて新しいデータを処理することにより、あるObserverは、その更新に反応するためのその更新スレッドを都合よく使ってしまうかも知れない。そういったObserverは更新スレッド上で動作し、さらに長い時間を更新処理にかけさせてしまう。 したがって、SubjectのNotifyメソッドを実装するより洗練された方法は、それ自身のスレッドでそれぞれのUpdateメソッドを走られることである。 その結果、すべてのObserverは同時に更新され、それぞれのどんな作業がその中で行われるとしても、更新スレッドは他のObserverやSubjectを遅らせることはない。 そのマイナス面は、マルチスレッドを実装することやスレッド操作の問題を取り扱うことは、より複雑になるということである。

Distributed Observer(分散環境でのObserverパターン)

Observerパターンは、subjectとobserverたちが同じアプリケーション内で動いているという前提になりがちだ。でも、このパターンは分散環境にも対応している。つまり、そのObserverは、Subjectとたぶんお互いに、別々のメモリー領域で動作するが分散動作する。 Update()とGetState?()メソッド、同様にAttach()とDetach()メソッドは、リモートからアクセス可能になってなければならない。(Remote Procedure Invocation[50]を参照) なぜなら、そのSubjectは各Observerを呼び出すことが出来る必要があり、逆もまた同様に、各オブジェクトは何かORB(Object Request Broker)環境上で動作しなければならないからである。それは、そのオブジェクトが、それがリモートから呼び出されることが出来るようにする。 その更新の詳細と状態データメモリー空間間で受け渡されるので、そのアプリケーション群はシリアライズ、つまり、受け渡すオブジェクトを整列(marshal)できなければならない。

ゆえに、分散環境内でObserverパターンを実装することは、思ったより複雑になりうる。 実装するのに多少複雑なマルチスレッドなObserverパターンだけでなく、リモートからアクセスできるようにメソッドを作りリモートから呼び出すことも、より複雑度を増す。 いくらかの状態変化に依存するもの(dependents)に通知することは、多くの作業になりうる。

他の問題として、Remote Procedure Invocation(50) は、呼び出し元とターゲットとそれらをつなぐネットーワークがすべて正しく動作するときのみうまくいく、ということである。 もしSubjectが変更を通知するとき、リモートObserverがその通知を処理出来る状態でなかったり、ネットワークから切断されていると、そのObserverはその通知を失う。 Observerが、あるときは、通知が無くてもうまく動作するかもしれないが、別の時には、その失った通知が、ObserverがSubjectとの同期から外れることを招くかもしれない。 まさに、オブザーバーパターン問題は防ぐように作られている。(注:よく意味が分からなかった)

分散処理は、また、PullモデルよりPushモデルを好む。 以前、議論したように、Pushモデルは単一の一方向通信を要求するのに対し、Pullモデルは3つ要求する。 分散処理がRPC(Remote Procedure Call)で実装されるなら、Pushモデルはひとつの呼び出し(Update())を要求するのに対し、Pullモデルは少なくとも2つの呼び出し(Update()とGetState?())を要求する。 RPCには、非分散メソッド呼び出しより多くのオーバーヘッドがある。なので、Pushモデルアプローチにより要求される追加の呼び出しは、すぐに性能を傷つけやすい。


Publish-Subscribe

Publish-Subscribe Channelは、Observerパターンを実装し、分散環境でも使いやすくしたもの。次の三段階の手順で実装する。

  1. メッセージングシステム管理者は Publish-Subscribe Channel(106) を生成する。(これは、 Javaアプリケーションでは、JMSの "Topic" として表現される。)
  2. Subjectとして動作するアプリケーションは、TopicPublisher?(MessageProducer?の一種)を、チャネルにメッセージを送るために、生成する。
  3. Observer(たとえば、依存するもの(dependent))として動作する、それぞれのアプリケーションは、TopicSubscriber?(MessageConsumer?の一種)を、チャネル上のメッセージを受信するために、生成する。(これは、オブザーバーパターンにおいて、Attach(Observer)メソッドを呼び出すことに似ている。)

これは、SubjectとObserver間のコネクションをチャネルを通して確立する。 Subjectに通知するべき変更があるときには、いつも、メッセージを送ることによりそれをする。 チャネルは、各Observerがメッセージのコピーを受信することを保障する。

これは、変更を通知するのに必要な単純なサンプルコードである。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

public class SubjectGateway {

    public static final String UPDATE_TOPIC_NAME = "jms/Update";
    private Connection connection;
    private Session session;
    private MessageProducer updateProducer;

    protected SubjectGateway() {
        super();
    }

    public static SubjectGateway newGateway() throws JMSException, NamingException {
        SubjectGateway gateway = new SubjectGateway();
        gateway.initialize();
        return gateway;
    }

    protected void initialize() throws JMSException, NamingException {
        ConnectionFactory connectionFactory = JndiUtil.getQueueConnectionFactory();
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination updateTopic = JndiUtil.getDestination(UPDATE_TOPIC_NAME);
        updateProducer = session.createProducer(updateTopic);

        connection.start();
    }

    public void notify(String state) throws JMSException {
        TextMessage message = session.createTextMessage(state);
        updateProducer.send(message);
    }

    public void release() throws JMSException {
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }
}

SubjectGateway? はSubject(現れていない)とメッセージングシステム間の message Gateway(468) である。 そのSubjectはゲートウェーを生成し、そして、通知をブロードキャストするためにそれを使用する。 本質的には、Subjectの Notify() メソッドは SubjectGateway?.notify(String) を呼び出すために実装される。 そして、そのゲートウェーは、更新チャネルにメッセージを送ることにより、変更を通知する。

これは、変更通知を受信するために必要な、サンプルコードである。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

public class ObserverGateway implements MessageListener {

    public static final String UPDATE_TOPIC_NAME = "jms/Update";
    private Observer observer;
    private Connection connection;
    private MessageConsumer updateConsumer;

    protected ObserverGateway() {
        super();
    }

    public static ObserverGateway newGateway(Observer observer)
        throws JMSException, NamingException {
        ObserverGateway gateway = new ObserverGateway();
        gateway.initialize(observer);
        return gateway;
    }

    protected void initialize(Observer observer) throws JMSException, NamingException {
        this.observer = observer;

        ConnectionFactory connectionFactory = JndiUtil.getQueueConnectionFactory();
        connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination updateTopic = JndiUtil.getDestination(UPDATE_TOPIC_NAME);
        updateConsumer = session.createConsumer(updateTopic);
        updateConsumer.setMessageListener(this);
    }

    public void onMessage(Message message) {
        try {
            TextMessage textMsg = (TextMessage) message; // assume cast always works
            String newState = textMsg.getText();
            update(newState);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void attach() throws JMSException {
        connection.start();
    }

    public void detach() throws JMSException {
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }

    private void update(String newState) throws JMSException {
        observer.update(newState);
    }
}

ObserverGateway? は、今回は、Observer(現れていない)とメッセージングシステム間の、もうひとつの Messaging Gateway(468)である。 Observerはゲートウェーを生成し、そして、コネクションを開始するために attach() を使用する。 (それは、オブザーバーパターンにおいて Attach(Observer) メソッドを呼び出すのに似ている。) そのゲートウェーは Event Driven Consumer(498) であるので、それは MessageListener? インターフェースを実装し、それは onMessage メソッドを要求する。 この方法では、ある更新が受信されると、そのゲートウェーは、新しい状態を得てそれ自身の update(String) メソッドを呼び出すために、メッセージを処理する。そして、それは、Observer内の同様のメッセージを呼び出す。

その2つのクラスはオブザーバーパターンのPushモデルを実装する。 SubjectGateway?.notify(String)により送られた通知メッセージにおいて、メッセージが存在するということは、Observerに変更が起こった、ということを示している。しかし、それは、Subjectの新しい状態を、そのObserverに示しているメッセージの中身である。 その新しい状態はSubjectからObserverにpushされてつづけている。 後で見るように、Pullモデルを使用してこの機能を実装する他の方法もある。

Comparisons(メッセージングとRPCの比較)

分散アプリケーション間での通知には、 昔ながらの同期方式(RPCなど)でObserverを実装するよりも、 メッセージングなどのPublish-Subscribe Channelのほうが何かと有利だ。なぜなら…

通知を単純化する (Simplifies notification)

SubjectのNotify()の実装は、とても単純になる。そのコードはチャネルにメッセージを送らなければいけないだけだ。 同様に、Observer.Update() はメッセージを受け取らなければいけないだけだ。

追加と削除を単純化する (Simplifies attach/detach)

Subjectに追加や削除するよりむしろ、Observerはチャネルを購読したりや購読をやめたりする必要がある。 そのSubjectはAttach(Observer)やDetach(Observer)を実装する必要は無い。 (けれども、そのObserverは、購読したり購読を止めたりする挙動をカプセル化するためのそれらのメソッドを実装するかもしれないが。)

並列スレッド処理を単純化する (Simplifies concurrent threading)

Subjectは、並行的にすべてのObserverを更新するために、ただひとつのスレッドしか必要としない。 つまり、チャネルは通知メッセージをObserverに並行的に配送する。 そして、各Observerは、自分のスレッドでその更新を取り扱う。 各Subject(Observer)は、他へは影響しない自分のスレッドを使うので、これは、Subjectの実装を単純化する。


遠隔アクセスを単純化する (Simplifies remote access)

SubjectもObserverも、リモートメソッドを実装する必要は無い。そしてまた、ORB内で実行する必要も無い。 それらは、メッセージングシステムにアクセスする必要があるだけで、分散処理ができる。

信頼性を向上させる (Increases reliability)

チャネルはメッセージングを使用するので、Observerがそれらを処理できるまで、通知はキューの中に入れられる。それは、Observerが通知を処理する速度を調整することを可能にする。 もし、Observerが接続されていないときに送られた通知を受信したいならば、それは Durable Subscriber(522) にするべきである。

publish-subscribeアプローチでも変えられないひとつの問題は、直列化(serialization)である。 オブザーバーパターンがRPCやメッセージングで実装されていようとなかろうと、状態データはSubjectのメモリ領域から各Observerのメモリ領域へ分散処理されるには、そのデータは直列化(serialized、つまり、marshaled)されなればならない。 この動作はどんなアプローチであろうと、実装されなければならない。

もし、publish-subscribeアプローチに不利な点があるとしたら、そのアプローチはメッセージングを要求するということである。それは、SubjectとObserverアプリケーションが共有されたメッセージングシステムにアクセスしなければならなず、また、そのメッセージングシステムのクライアントとして実装されなければならない、ということを意味する。 もう、メッセージングクライアントとしてアプリケーションを作るのは、RPCアプローチを使うよりも、もはや難しくは無い。

Push and Pull Models(プッシュモデルとプルモデル)

Publish-Subscribe Channelの弱点になりそうなところがもうひとつある。 プルモデルの実装がプッシュモデルよりも複雑になるということだ。さっき説明したように、Pullモデルは、Pushモデルよりより行きつ戻りつな議論を要求する。 議論が分散アプリケーションにあるとき、外部通信は性能を大いに傷つける。

通信はRPCよりもメッセージングのほうがより複雑だ。 両方の場合(voidを返すRPCか、SubjectからObserverへの単一のEvent Messageのどちらか)において、Update() は一方向通信だ。 より手の込んだ部分は、ObserverがSubjectの状態を問い合わせる必要があるときだ。 GetState?()は、状態要求しそれを返す単一のRPCか、Request-Reply(154)(Command Message(145)が状態を要求し、別のDocument Message(147)がそれを返す)のどてらにしても、二方向通信である。

何がRequest-Reply(154)をより難しくするかというと、単にそれが2つのメッセージを要求するからではない。それは、それがそれらのメッセージを送信するために、2つのチャネルを要求するからである。 あるチャネル(状態要求チャネル)が、ObserverからSubjectに向かって配置され、Observerはそのチャネルで状態要求メッセージを送る。 もうひとつのチャネル(状態返信チャネル)はSubjectからObserverに逆に向かって配置され、Subjectはそのチャネルで状態返信メッセージを送る。 すべてのObserverは同じリクエストチャネルを共有することが出来る。しかし、それらはおそらくそれら自身の返信チャネルをおそらく必要とするだろう。 各Observerはすべての応答を受信する必要は無いが、その特定の要求に対する特定の応答を受信する必要がある。これを保障する最も簡単な方法は、各Observerがそれぞれ個別の応答チャネルを持つことである。 (代替手段は、単一の応答チャネルを持ちいて、また、どの応答がどのObserverに行くべきか判別するために、Correlation Identifiers(163)(関係性ID)を用いることである。しかし、Observerごとに個別のチャネルのほうが、よほど実装するのが簡単である。)

各Observerごとの応答チャネルは、チャネルの爆発を引き起こす。 そんなにたくさんのチャネルは管理可能かもしれないが、それらのチャネルを使わなければいけない多くのObserverが動作時に動的に変更する時に、メッセージングシステム管理者は、どれくらい静的チャネルが生成されているか分からない。 たとえ、すべてのObserverのためのチャネルが十分にあったとしても、どうやって各Observerがどのチャネルを使えばよいか知るのだろうか。

JMSは、特にこの用途のために、ある機能 TemporaryQueue? を持っている。 (Request-Reply[154]の議論もまた参照すること。) Observerは、その占有する排他的な一時キューを生成することができて、そのリクエストの中にReturn Address(159)としてそのキューを指定し、そして、そのキューで応答を待つ。 新しいキューをたびたび生成することは、あなたのメッセージングシステムの実装に依存するが、非効率になりうる。また、一時キューは永続性を持ち得ない。(Guaranteed Delivery[122]と共に使用できない) しかしながら、もしPushモデルを使用したくないなら、一時キューを用いてPullモデルを実装することができる。

次の2つのクラスは、Pullモデルを用いてどうやってゲートウェーを実装するかを示している。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

public class PullSubjectGateway {

    public static final String UPDATE_TOPIC_NAME = "jms/Update";
    private PullSubject subject;
    private Connection connection;
    private Session session;
    private MessageProducer updateProducer;

    protected PullSubjectGateway() {
        super();
    }

    public static PullSubjectGateway newGateway(PullSubject subject)
        throws JMSException, NamingException {
        PullSubjectGateway gateway = new PullSubjectGateway();
        gateway.initialize(subject);
        return gateway;
    }

    protected void initialize(PullSubject subject) throws JMSException, NamingException {
        this.subject = subject;

        ConnectionFactory connectionFactory = JndiUtil.getQueueConnectionFactory();
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination updateTopic = JndiUtil.getDestination(UPDATE_TOPIC_NAME);
        updateProducer = session.createProducer(updateTopic);

        new Thread(new GetStateReplier()).start();

        connection.start();
    }

    public void notifyNoState() throws JMSException {
        TextMessage message = session.createTextMessage();
        updateProducer.send(message);
    }

    public void release() throws JMSException {
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }

    private class GetStateReplier implements Runnable, MessageListener {

        public static final String GET_STATE_QUEUE_NAME = "jms/GetState";
        private Session session;
        private MessageConsumer requestConsumer;

        public void run() {
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination getStateQueue = JndiUtil.getDestination(GET_STATE_QUEUE_NAME);
                requestConsumer = session.createConsumer(getStateQueue);
                requestConsumer.setMessageListener(this);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void onMessage(Message message) {
            try {
                Destination replyQueue = message.getJMSReplyTo();
                MessageProducer replyProducer = session.createProducer(replyQueue);

                Message replyMessage = session.createTextMessage(subject.getState());
                replyProducer.send(replyMessage);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

PullSubjectGateway?SubjectGateway? ととてもよく似ている。 Pullモデル版はそのSubjectへの参照を持っているので、ゲートウェーは、Observerが必要な時に、そのSubjectに状態を問い合わせることができる。 notify(String) は notifyNoState?() になった。なぜなら、Pullモデルは、どんな状態も含むことなく、通知を単純に送るからである。(そして、Javaは既に notify() という名前のメソッドを既に使っているからである。)

Pullモデルのための大きな追加は、それが自分自身のスレッドで動作できるうようにするためのRunnableを実装する内部クラスGetStateReplier?である。 それは、また、MessageListener?であり、それは、それを Event-Driven Cosumer(498)にさせる。 そのonMessageメソッドは、GetState?キューから要求を読み込み、Subjectの状態を含む応答を、要求に指定されたキューに送る。 この方法では、あるObserverが GetState?() 要求を作るときに、ゲートウェーは応答を送る。(Request-Reply[154]を参照すること。)

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueRequestor;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

public class PullObserverGateway implements MessageListener {

    public static final String UPDATE_TOPIC_NAME = "jms/Update";
    public static final String GET_STATE_QUEUE_NAME = "jms/GetState";
    private PullObserver observer;
    private QueueConnection connection;
    private QueueSession session;
    private MessageConsumer updateConsumer;
    private QueueRequestor getStateRequestor;

    protected PullObserverGateway() {
        super();
    }

    public static PullObserverGateway newGateway(PullObserver observer)
        throws JMSException, NamingException {
        PullObserverGateway gateway = new PullObserverGateway();
        gateway.initialize(observer);
        return gateway;
    }

    protected void initialize(PullObserver observer) throws JMSException, NamingException {
        this.observer = observer;
        QueueConnectionFactory connectionFactory = JndiUtil.getQueueConnectionFactory();
        connection = connectionFactory.createQueueConnection();
        session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination updateTopic = JndiUtil.getDestination(UPDATE_TOPIC_NAME);
        updateConsumer = session.createConsumer(updateTopic);
        updateConsumer.setMessageListener(this);

        Queue getStateQueue = (Queue) JndiUtil.getDestination(GET_STATE_QUEUE_NAME);
        getStateRequestor = new QueueRequestor(session, getStateQueue);
    }

    public void onMessage(Message message) {
        try {
            // message's contents are empty
            updateNoState();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void attach() throws JMSException {
        connection.start();
    }

    public void detach() throws JMSException {
        if (connection != null) {
            connection.stop();
            connection.close();
        }
    }

    private void updateNoState() throws JMSException {
        TextMessage getStateRequestMessage = session.createTextMessage();
        Message getStateReplyMessage = getStateRequestor.request(getStateRequestMessage);
        TextMessage textMsg = (TextMessage) getStateReplyMessage; // assume cast always works
        String newState = textMsg.getText();
        observer.update(newState);
    }
}

PullObserverGateway? の場合もやはり、 ObserverGateway? とよく似ているが、Pullモデルを実装するためのいくつかの追加のコードがある。 初期化時に、更新を受信するための updateConsumer だけでなく、GetState?() 要求を送るための getStateRequestor? も設定している。 (GetStateRequestor?QueueRequestor? のサブクラスである。Request-Reply [154] を参照すること。) Pull版において、ゲートウェーの onMessage コードは、そのメッセージは殻なので、そのメッセージの内容を無視する。 そのメッセージの存在は、Subjectがすでに変わっている、ということをObserverに伝えている。しかし、Subjectの新しい状態が何かということは、Observerに伝えていない。 なので、そこでするすべては、updateNoState?()を呼び出すことである。 (単に、notifyNoState?()と名づけられている。)

Observerにとって、PushモデルとPullモデルとの違いは、update(String)に対し、updateNoState?()の実装で明らかになった。 Push版が、新しい状態をパラメータとして取得し、Observerを更新するのに対し、Pull版では、それがObserverを更新できる間に、新しい状態を取りに行かなければならない。 新しい状態を取得するために、それは、要求を送くり応答を受け取る getStateRequestor? を使用する。 その応答はSubjectの新しい状態を含んでいて、それは、Observerを更新するためにゲートウェーが用いたものである。 (この単純な実装で留意すべきは、そのゲートウェーはシングルスレッドなので、それが状態取得要求を送り応答を待っている間、追加の更新を一切処理しない、ということである。 従って、もし、その要求や応答メッセージを転送するのに、本当に長い時間かかるならば、ゲートウェーは待ち状態でスタックしてしまい、発生する追加の更新は単純にキューに入ることになる。)

ご覧のように、Pullモデルは、Pushモデルより複雑である。 それは、より多くのチャネル(各Observerごとの一時的なものも含む)と、より多くのメッセージ(すべてのObserverへのひとつのメッセージの代わりに、更新毎、興味のあるObserverごとの3つのメッセージ)を要求し、SubjectとObserverのクラスは、追加のメッセージング処理のために、より多くのコードを要求し、実行時オブジェクトは、追加のメッセージング処理を実行するために、より多くのスレッドを要求する。 もし、あなたのアプリケーションで、これらすべてが受け入れ可能ならば、Pullモデルは実行可能なアプローチである。 しかしながら、もし疑わしいなら、あなたは、よりシンプルなので、おそらくPullモデルから始めるべきであろう。

Channel Design(チャネルの設計)

ここまでは、ひとつのsubjectがひとつの状態をobserverたちに伝えることを考えてきた。でも、実際のアプリだともっと複雑になるよね?変更を伝えたいsubjectが大量にあるかもしれないし、ひとつのsubjectが伝えたい変更内容もいろいろあるかもしれない。また、ひとつのobserverがいろんなsubjectのいろんな変更を知りたいということも考えられる。さあたいへん。

Observerパターンでは、このあたりの問題は実装の問題だととらえている。また、ObserverパターンとCommandパターンを組み合わせたSASE (Self-Addresses Stamped Envelope)パターンっていうのもある(The Design Patterns Smalltalk Companion)。これは、subjectに変更があったときにどんなcommandを送ってほしいのかを、observer側が指定するというパターン。

Observerパターンにあまり深入りせず、メッセージングの観点から話を進める。実例を用意してみたので、いったいチャネルはいくつ必要になるかを考えてみよう。

ある企業のアプリケーション群の中に、顧客の連絡先情報を扱うものが複数ある。どれか一つで住所の変更をしたら、他のアプリケーションにもその変更を伝えないといけない。また、住所の変更を知りたいアプリケーションは一つだけだとは限らない。

これだけなら簡単。住所変更をアナウンスするためのPublish-Subscribe Channelをひとつ作って、住所変更をするアプリケーションはすべてそこに通知を流せばいい。通知を受け取りたいアプリケーションは、このチャネルを購読する。住所変更のメッセージは、たとえばこんな感じ。

<AddressChange customer_id="12345">
    <OldAddress>
        <Street>123 Wall Street</Street>
        <City>New York</City>
        <State>NY</State>
        <Zip>10005</Zip>
    </OldAddress>
    <NewAddress>
        <Street>321 Sunset Blvd</Street>
        <City>Los Angeles</City>
        <State>CA</State>
        <Zip>90012</Zip>
    </NewAddress>
</AddressChange>

話はこれだけでは終わらない。この企業のアプリケーションの中には、商品の在庫切れをアナウンスしないといけないものがあるかもしれない。一方で、在庫が切れたら再発注をかけるというアプリケーションもあるだろう。これは、要するにさっきの問題と同じ。ちょっと目先を変えただけ。在庫切れの通知用のチャネルを作ればいいだけのことだ。メッセージは、たとえばこんな感じ。

<OutOfProduct>
    <ProductID>12345</ProductID>
    <StoreID>67890</StoreID>
    <QuantityRequested>100</QuantityRequested>
</OutOfProduct>

この二つのアナウンスに同じチャネルを使えるだろうか?たぶん、答えはノーだ。Datatype Channelで見たとおり、ひとつのチャネルを流れるメッセージはすべて同じ型でないといけない。今回の例なら、XMLスキーマが同じでないといけないってこと。さっきの例の<AddressChange?>と<OutOfProduct?>は明らかに別の要素なので、これらは同じチャネルでは送れない。メッセージのフォーマットを工夫すればむりやり同じスキーマにまとめてしまえるだろうが、問題もある。だって、住所変更の通知を受け取りたいアプリが常に在庫切れ情報も知りたいとは限らないでしょ?住所変更を知りたいだけなのに、欲しくもない在庫切れ情報を受け取ることになってしまう。

さて、さらに別の例。顧客のクレジットカードの格付けが変わった場合を考えよう。メッセージはたぶんこんな感じ。

<CreditRatingChange customer_id="12345">
    <OldRating>AAA</OldRating>
    <NewRating>BBB</NewRating>
</CreditRatingChange>

ここでも、また別のチャネルを用意したくなる。そうすれば格付け情報の変更と住所変更を別々に扱えるようになるし。

…という考えかたで進めていくと、チャネルの数がどんどんふくれあがってしまう。顧客の情報って、けっこう大量にあるよね?名前とか連絡先(住所、電話、メール)とか。あ、連絡先はひとつだけじゃない。メール用とか配送用とか請求用とか。これらのどれかひとつが変わるたびに、それを知りたいというアプリケーションが出てくる。項目ごとにそれぞれ別のチャネルを作っていくの?

チャネル数が増えると、メッセージングシステムへの負荷が大きくなる。チャネルが複数あるってことは、送信者や受信者も複数あるってことで、きっと大量のスレッドが大量のチャネルを常時チェックするようになるんだろう。

もうすこしうまいやりかたがある。address-changedメッセージとcredit-ratingchangedメッセージは同チャネルで送ってしまうと言う方法。どっちも顧客がらみの変更だし、どちらか一方の変更が気になるアプリケーションはおそらくもう一方も気になるだろうから。でも、out-of-productメッセージだけは別にしておいたほうがいいだろう。顧客の情報を知りたいアプリケーションが常に商品にも興味を持つとは限らないから。

address-changedメッセージとcredit-rating-changedメッセージを同じチャネルで扱うには、メッセージのフォーマットを揃えないといけない(see: Datatype Channel)。XML的には、ルート要素を同じにしておかないといけないってこと。ルート要素以外の要素については、オプション要素を定義すればいろいろ違うものにできる。たとえば、二つのメッセージを扱う共通フォーマットはこんなふうになる。

まずは、住所変更。

<CustomerChange customer_id="12345">
    <AddressChange>
        <OldAddress>
            <Street>123 Wall Street</Street>
            <City>New York</City>
            <State>NY</State>
            <Zip>10005</Zip>
        </OldAddress>
        <NewAddress>
            <Street>321 Sunset Blvd</Street>
            <City>Los Angeles</City>
            <State>CA</State>
            <Zip>90012</Zip>
        </NewAddress>
    </AddressChange>
</CustomerChange>

そして、クレジット情報の変更。

<CustomerChange customer_id="12345">
    <CreditRatingChange>
        <OldRating>AAA</OldRating>
        <NewRating>BBB</NewRating>
    </CreditRatingChange>
</CustomerChange>

まだ問題がある。たとえば出荷アプリケーションの場合、住所の変更は気になるがクレジットの情報はどうでもいい。逆に請求アプリケーションの場合は、クレジットの情報こそが大事で住所の変更はあまり気にしない。こんなアプリケーションについては、Selective Consumerパターンを使えば、欲しいメッセージだけを受け取れる。Selective Consumerだとちょっと複雑になってしまうし、チャネルを多少増やしてもメッセージングシステム的に余裕がある、というなら最終的にはチャネルを増やす方向のほうがいいかもしれない。

Conclusions(まとめ)

Publish-Subscribe Channelを使えば、分散環境でも手軽にObserverパターンを実装できるという例を示した。Subject.Notify()とかObserver.Update()の実装が、単にメッセージの送受信だけで済んでしまうってわけだ。分散化とか並行性とかの面倒くさいところはメッセージングシステムに任せてしまえばいい。

実際に運用することを考えると、ついチャネルを増やしがちになってしまう。でも、よく似た通知を同じようなobserverたちに伝えるのなら、同じチャネルにまとめてしまうほうが現実的だ。

他の部分で一切メッセージングを使っていないアプリでも、もしお互いに変更を伝え合う必要があるのなら、それだけのためにメッセージングを導入する価値がある。


担当者のつぶやき

「Channel Design」で何が書かれているのかが気になって担当してみたけど、「まあ、そうですよねえ…」ということしか書いていなかったような気がしました。(高木)

高木さんが、各Subsectionとか、Program/Diagram を私の分も埋め込んでくれていたので、手元のテキストファイルからのはめ込みが、とても、楽でした。高木さん、ありがとうございました。(杉野)

みんなの突っ込み