EIP / Aggregator


Aggregator(アグリゲーター)

一言要約

複数のメッセージをとりまとめるときのポイントは、「どれとどれが関連するのかの判断」「処理を進めるのに十分なネタがそろったかの判断」をどう行うかということだ。

Context

複数のメッセージ群をとりまとめてから何らかの処理をしたいことがある。たとえば、複数の業者の入札を見て最安値の入札を取り出したり、ひとつの発注に含まれるすべての商品を引き当ててから請求書を発行したりといったこと。


Problem

バラバラになっているけれどもそれぞれ関連しているメッセージ群を組み合わせて、ひとまとめに処理するには?

Force

メッセージングシステムは基本的に非同期で動くので、複数のメッセージにまたがる情報をとりまとめるのは難しい。

Splitterを使ったとしても、分割した各メッセージのレスポンスがリクエストと同じ順番で戻ってくるとは限らないし、各サブメッセージを処理するシステムが違えばその処理速度も変わってくるだろう(この件に関しては、次のResequencerで改めて扱う)。

たいていのメッセージング基盤は、「メッセージが最終的に配送されることは保証するけど、いつ配送されるのかは保証しない」という配送モードになっている。つまり、サブメッセージの一部が届かないときにいったいいつまで待てばいいのかの判断が難しい。

このあたりをうまい具合に処理してくれる専用のコンポーネントを用意して、すべてのサブメッセージがそろったことを確かめてからその後の処理に流すことができるようになれば、ビジネスロジックの実装も楽になりそう。それがAggregator。

Solution

ステートフルなフィルターであるAggregatorを使い、各メッセージを収集して保存する。 そして、関連する一連のメッセージがすべて揃うまで待つ。 すべて揃ったら、Aggregatorがそれらを一つのメッセージにまとめて発行する。

Aggregatorは、Pipes and Filtersにおけるフィルタの一種。メッセージ群のストリームを受け取って、関連するメッセージどうしをとりまとめる。関連するメッセージをすべて受け取ったら、その情報を取りまとめて単一のメッセージを出力に送る。

これまでのルーティングパターンとは違って、Aggregatorはステートフルなコンポーネントである。

Aggregatorをつくるときに考慮すべきなのは、次の三点。

  1. 相関(Correlation):どのメッセージとどのメッセージをひとまとめにして扱うのか?
  2. 完全性(Completeness Condition):次の処理に進む準備が整ったとみなす条件は?
  3. 集約方式(Aggregation Algorithm):受け取ったメッセージを取りまとめる方法は?

Correlationに関しては、入力メッセージの型を調べるなりCorrelation Identifierを使うなりすればいい。残りのふたつに関する選択肢は、Aggregation Strategiesで紹介する。

Implementation Details

やってきたメッセージの関連を管理するために、Aggregatorは「アクティブな集合(aggregate)のリスト」を管理する。「アクティブな」とは、それを構成する複数のメッセージのうち一部を既に受信しているということ。

Aggregatorは、新しいメッセージを受け取ったときに、それが「既にアクティブな集合に属するものなのかそうでないのか」を判断する。そして、アクティブな集合にメッセージを追加するなり新たな集合をアクティブにするなりする。

メッセージを追加したら、追加先の集合の完全性を評価する。完全性が満たされた場合は、それらをとりまとめた集約メッセージを作って出力チャネルに送る。そうでない場合は、さらに次のメッセージを待つ。

ここまでの流れの例を示したのが次の図。Correlation Identifierを使って相関を判断し、完全性の条件は「少なくとも三つのメッセージがあること」とした。

この例では、手持ちのアクティブな集合に属さないメッセージを受け取ったら新たな集合を作るという実装にしている。つまり、Aggregatorは自分がこれから作る集合に関する事前知識を要しない。このタイプを「自己開始型のAggregator」と呼ぶ。

集約の方針によっては、いったん「完了」と見なして処理を終えた集合に属するメッセージが後から届いてしまうという可能性もある。そんな場合にまた新たに集合を作ってしまうのはまずいので、処理済みの集合のリストをAggregator側で持っておかないといけない。


Aggregation Strategies

Completeness Conditionについては、こんな方針が考えられる。

全部待つ(Wait for All)
すべてのレスポンスを受信するまで待つ。一定の時間内にすべてのメッセージが揃わなければ、タイムアウトのエラーを発生させる。わかりやすい方法だけど、一番時間がかかるし不安定だ。しかも、事前にメッセージの数を知っておく必要がある。
タイムアウト(Timeout)
一定の時間だけレスポンスを待ち、その間に届いたメッセージだけを評価して決断を下す。オークションの入札の処理などで使える。
早い者勝ち(First Best)
最初に届いたレスポンスだけを採用し、それ以外は一切無視する。一番高速に処理できるが、多くの情報を無視してしまうことになる。株取引など、レスポンスタイムが重要となる場面で使える。
タイムアウトとオーバーライド(Timeout with Override)
指定した時間が経過するか、あるいはあらかじめ設定した最低スコアを上回るレスポンスが届くまで待つ。満足できるレスポンスがあったらできるだけ早めに終わらせることが狙い。
外部イベント(External Event)
外部のイベント(たとえば金融業界なら、取引終了時刻など)によって完了が決まる。

一方、Aggregation Algorithmに関してはこんなものが一般的だ。

「ベスト」な答えを採用する(Select the "best" answer)
たったひとつの正解(たとえば入札額が一番安いものなど)があるときに、それを採用する。ただ、現実的にはそんなに単純に正解が決まることはない。
データを凝縮する(Condense data)
個々のメッセージの平均値や合計を計算して、それを一つのメッセージにまとめる。メッセージのトラフィックを軽減するなどの効果があるが、数値データについてしか使えない。
評価を先送りする(Collect data for later evaluation)
Aggregatorでは単純に個々のメッセージを寄せ集めるだけにする。その後の判断は、別のコンポーネント(あるいは人間)に任せる。

タイムアウト方式を採用したときのタイムアウト時間や、あらかじめ設定しておく最低スコアの閾値などのパラメータは、事前にAggregatorに知らせておくことになる。これらのパラメータの設定を含む制御メッセージを受け取るようにしておくなどの方法があるだろう。制御メッセージには、期待するメッセージ数などを含めることもできる。この方式の例を、次の図に示す。

先ほどの例と違って、アクティブな集合に属さないメッセージを受け取った時点で集合を作るのではなく、最初に制御メッセージを受け取った時点で集合を作り、必要なパラメータを設定する。この方式を、自己開始型のAggregatorに対して「初期化済みのAggregator」と呼ぶ。

Examples

Loan Broker

第9章のサンプルでは、AggregatorをJavaとC#そしてTIBCOで実装する。

Aggregator as Missing Message Detector

あるシステムで利用する一連のコンポーネントが極めて信頼性に欠けるので、その対策としてAggregatorを使ってみた、という例。 入力メッセージを、二つのパスに並行で渡す。一方は、必須だけれども不安定なコンポーネント群、そして一方はGuaranteed Delivery。最後にAggregatorでこれらをまとめる。

完全性の条件には「タイムアウトとオーバーライド」方式を使う。両方からのメッセージが揃うか、タイムアウトになるかのいずれかで完了。

両方からのメッセージが揃った場合には、単純にそれらをまとめて何も手を加えず次に渡す。タイムアウトに達した場合はどちらかのメッセージが「消えてしまった」ということなので、Aggregatorがエラーメッセージを送出する。


Aggregator in JMS

JMSによる実装例。入札のメッセージを受け取り、最低額の入札を採用して次に渡すというもの。相関を表すのはオークションIDで、これがCorrelation Identifierの役割を果たす。集約の完全性の条件は「少なくとも三件のメッセージを受信すること」とする。自己開始型のAggregatorなので、初期化は不要だ。

構成は、次のクラス図のとおり。

Aggregator
メッセージの受信やその集約、結果の送信などを受け持つ。メッセージの集合とのやりとりにはAggregateインターフェイスを使う。
AuctionAggregate?
Aggregateインターフェイスを実装し、このインターフェイスとAuctionクラスとの間のAdapterとなる。これで、AuctionクラスをJMS非依存にできる。
Auction
これまでに受信した入札を管理するコレクション。集約の戦略をここで実装する。
Bid
単一の入札に関するデータを保持するクラス。

このソリューションの肝となるのがAggregatorクラス。Destinationクラスは、QueueやTopicをJMSが抽象化したもの。テスト中はpublish-subscribe型のTopicを使って運用時はQueueに差し替えたり、といったことができる。

public class Aggregator implements MessageListener
{
    static final String PROP_CORRID = "AuctionID";

    Map activeAggregates = new HashMap();

    Destination inputDest = null;
    Destination outputDest = null;
    Session session = null;

    MessageConsumer in = null;
    MessageProducer out = null;

    public Aggregator (Destination inputDest, Destination outputDest, Session session)
    {
        this.inputDest = inputDest;
        this.outputDest = outputDest;
        this.session = session;
    }

    public void run()
    {
        try {
            in = session.createConsumer(inputDest);
            out = session.createProducer(outputDest);
            in.setMessageListener(this);
        } catch (Exception e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }

    public void onMessage(Message msg)
    {
        try {
            String correlationID = msg.getStringProperty(PROP_CORRID);
            Aggregate aggregate = (Aggregate)activeAggregates.get(correlationID);
            if (aggregate == null) {
                aggregate = new AuctionAggregate(session);
                activeAggregates.put(correlationID, aggregate);
            }
            // すでに完了した集合に関しては何もしない
            if (!aggregate.isComplete()) {
                aggregate.addMessage(msg);
                if (aggregate.isComplete()) {
                    MapMessage result = (MapMessage)aggregate.getResultMessage();
                    out.send(result);
                }
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }
}

非常に汎用的なコードになっている。今回のサンプル固有の処理は二行だけ。

  • メッセージのAuctionIDプロパティで相関を判断する という指定
  • AuctionAggregate?クラスのインスタンスを作っているところ

だ。これも切り離したければ、Aggregateオブジェクトを返すファクトリーを使い、内部的にAuctionAggregate?のインスタンスを作らせればいい。が、この本はオブジェクト指向を語る本ではないので ^^;、そこまではこだわらずにシンプルに進める。

AuctionAggregate?クラスはAggregateインターフェイスの実装。このインターフェイスは次の三つのメソッドだけを持つシンプルなもの。

public interface Aggregate {
    public void addMessage(Message message);
    public boolean isComplete();
    public Message getResultMessage();
}

集約の作戦は、AuctionAggregate?クラスではなくAuctionクラスで実装した。JMSのMessageクラスのかわりにBidクラスを使うことで、JMSに依存しない実装になった。

ublic class Auction
{
    ArrayList bids = new ArrayList();

    public void addBid(Bid bid)
    {
        bids.add(bid);
        System.out.println(bids.size() + " Bids in auction.");
    }

    public boolean isComplete()
    {
        return (bids.size() >= 3);
    }

    public Bid getBestBid()
    {
        Bid bestBid = null;

        Iterator iter = bids.iterator();
        if (iter.hasNext())
            bestBid = (Bid) iter.next();
        while (iter.hasNext()) {
            Bid b = (Bid) iter.next();
            if (b.getPrice() < bestBid.getPrice()) {
                bestBid = b;
            }
        }
        return bestBid;
    }
}

AuctionAggregate?クラスは、AggregateインターフェイスとAuctionクラスの間のアダプタ。

public class AuctionAggregate implements Aggregate {
    static String PROP_AUCTIONID = "AuctionID";
    static String ITEMID = "ItemID";
    static String VENDOR = "Vendor";
    static String PRICE = "Price";

    private Session session;
    private Auction auction;

    public AuctionAggregate(Session session)
    {
        this.session = session;
        auction = new Auction();
    }

    public void addMessage(Message message) {
        Bid bid = null;
        if (message instanceof MapMessage) {
            try {
                MapMessage mapmsg = (MapMessage)message;
                String auctionID = mapmsg.getStringProperty(PROP_AUCTIONID);
                String itemID = mapmsg.getString(ITEMID);
                String vendor = mapmsg.getString(VENDOR);
                double price = mapmsg.getDouble(PRICE);
                bid = new Bid(auctionID, itemID, vendor, price);
                auction.addBid(bid);
            } catch (JMSException e) {
                System.out.println(e.getMessage());
            }
        }
    }

    public boolean isComplete()
    {
        return auction.isComplete();
    }

    public Message getResultMessage() {
        Bid bid = auction.getBestBid();
        try {
            MapMessage msg = session.createMapMessage();
            msg.setStringProperty(PROP_AUCTIONID, bid.getCorrelationID());
            msg.setString(ITEMID, bid.getItemID());
            msg.setString(VENDOR, bid.getVendorName());
            msg.setDouble(PRICE, bid.getPrice());
            return msg;
        } catch (JMSException e) {
            System.out.println("Could not create message: " + e.getMessage());
            return null;
        }
    }
}

これらのクラスが、次のシーケンス図のような流れで動く。

担当者のつぶやき

みんなの突っ込み