複数のメッセージをとりまとめるときのポイントは、「どれとどれが関連するのかの判断」「処理を進めるのに十分なネタがそろったかの判断」をどう行うかということだ。
複数のメッセージ群をとりまとめてから何らかの処理をしたいことがある。たとえば、複数の業者の入札を見て最安値の入札を取り出したり、ひとつの発注に含まれるすべての商品を引き当ててから請求書を発行したりといったこと。
バラバラになっているけれどもそれぞれ関連しているメッセージ群を組み合わせて、ひとまとめに処理するには?
メッセージングシステムは基本的に非同期で動くので、複数のメッセージにまたがる情報をとりまとめるのは難しい。
Splitterを使ったとしても、分割した各メッセージのレスポンスがリクエストと同じ順番で戻ってくるとは限らないし、各サブメッセージを処理するシステムが違えばその処理速度も変わってくるだろう(この件に関しては、次のResequencerで改めて扱う)。
たいていのメッセージング基盤は、「メッセージが最終的に配送されることは保証するけど、いつ配送されるのかは保証しない」という配送モードになっている。つまり、サブメッセージの一部が届かないときにいったいいつまで待てばいいのかの判断が難しい。
このあたりをうまい具合に処理してくれる専用のコンポーネントを用意して、すべてのサブメッセージがそろったことを確かめてからその後の処理に流すことができるようになれば、ビジネスロジックの実装も楽になりそう。それがAggregator。
ステートフルなフィルターであるAggregatorを使い、各メッセージを収集して保存する。 そして、関連する一連のメッセージがすべて揃うまで待つ。 すべて揃ったら、Aggregatorがそれらを一つのメッセージにまとめて発行する。
Aggregatorは、Pipes and Filtersにおけるフィルタの一種。メッセージ群のストリームを受け取って、関連するメッセージどうしをとりまとめる。関連するメッセージをすべて受け取ったら、その情報を取りまとめて単一のメッセージを出力に送る。
これまでのルーティングパターンとは違って、Aggregatorはステートフルなコンポーネントである。
Aggregatorをつくるときに考慮すべきなのは、次の三点。
Correlationに関しては、入力メッセージの型を調べるなりCorrelation Identifierを使うなりすればいい。残りのふたつに関する選択肢は、Aggregation Strategiesで紹介する。
やってきたメッセージの関連を管理するために、Aggregatorは「アクティブな集合(aggregate)のリスト」を管理する。「アクティブな」とは、それを構成する複数のメッセージのうち一部を既に受信しているということ。
Aggregatorは、新しいメッセージを受け取ったときに、それが「既にアクティブな集合に属するものなのかそうでないのか」を判断する。そして、アクティブな集合にメッセージを追加するなり新たな集合をアクティブにするなりする。
メッセージを追加したら、追加先の集合の完全性を評価する。完全性が満たされた場合は、それらをとりまとめた集約メッセージを作って出力チャネルに送る。そうでない場合は、さらに次のメッセージを待つ。
ここまでの流れの例を示したのが次の図。Correlation Identifierを使って相関を判断し、完全性の条件は「少なくとも三つのメッセージがあること」とした。
この例では、手持ちのアクティブな集合に属さないメッセージを受け取ったら新たな集合を作るという実装にしている。つまり、Aggregatorは自分がこれから作る集合に関する事前知識を要しない。このタイプを「自己開始型のAggregator」と呼ぶ。
集約の方針によっては、いったん「完了」と見なして処理を終えた集合に属するメッセージが後から届いてしまうという可能性もある。そんな場合にまた新たに集合を作ってしまうのはまずいので、処理済みの集合のリストをAggregator側で持っておかないといけない。
Completeness Conditionについては、こんな方針が考えられる。
一方、Aggregation Algorithmに関してはこんなものが一般的だ。
タイムアウト方式を採用したときのタイムアウト時間や、あらかじめ設定しておく最低スコアの閾値などのパラメータは、事前にAggregatorに知らせておくことになる。これらのパラメータの設定を含む制御メッセージを受け取るようにしておくなどの方法があるだろう。制御メッセージには、期待するメッセージ数などを含めることもできる。この方式の例を、次の図に示す。
先ほどの例と違って、アクティブな集合に属さないメッセージを受け取った時点で集合を作るのではなく、最初に制御メッセージを受け取った時点で集合を作り、必要なパラメータを設定する。この方式を、自己開始型のAggregatorに対して「初期化済みのAggregator」と呼ぶ。
第9章のサンプルでは、AggregatorをJavaとC#そしてTIBCOで実装する。
あるシステムで利用する一連のコンポーネントが極めて信頼性に欠けるので、その対策としてAggregatorを使ってみた、という例。 入力メッセージを、二つのパスに並行で渡す。一方は、必須だけれども不安定なコンポーネント群、そして一方はGuaranteed Delivery。最後にAggregatorでこれらをまとめる。
完全性の条件には「タイムアウトとオーバーライド」方式を使う。両方からのメッセージが揃うか、タイムアウトになるかのいずれかで完了。
両方からのメッセージが揃った場合には、単純にそれらをまとめて何も手を加えず次に渡す。タイムアウトに達した場合はどちらかのメッセージが「消えてしまった」ということなので、Aggregatorがエラーメッセージを送出する。
JMSによる実装例。入札のメッセージを受け取り、最低額の入札を採用して次に渡すというもの。相関を表すのはオークションIDで、これがCorrelation Identifierの役割を果たす。集約の完全性の条件は「少なくとも三件のメッセージを受信すること」とする。自己開始型のAggregatorなので、初期化は不要だ。
構成は、次のクラス図のとおり。
このソリューションの肝となるのがAggregatorクラス。Destinationクラスは、QueueやTopicをJMSが抽象化したもの。テスト中はpublish-subscribe型のTopicを使って運用時はQueueに差し替えたり、といったことができる。
public class Aggregator implements MessageListener { static final String PROP_CORRID = "AuctionID"; Map activeAggregates = new HashMap(); Destination inputDest = null; Destination outputDest = null; Session session = null; MessageConsumer in = null; MessageProducer out = null; public Aggregator (Destination inputDest, Destination outputDest, Session session) { this.inputDest = inputDest; this.outputDest = outputDest; this.session = session; } public void run() { try { in = session.createConsumer(inputDest); out = session.createProducer(outputDest); in.setMessageListener(this); } catch (Exception e) { System.out.println("Exception occurred: " + e.toString()); } } public void onMessage(Message msg) { try { String correlationID = msg.getStringProperty(PROP_CORRID); Aggregate aggregate = (Aggregate)activeAggregates.get(correlationID); if (aggregate == null) { aggregate = new AuctionAggregate(session); activeAggregates.put(correlationID, aggregate); } // すでに完了した集合に関しては何もしない if (!aggregate.isComplete()) { aggregate.addMessage(msg); if (aggregate.isComplete()) { MapMessage result = (MapMessage)aggregate.getResultMessage(); out.send(result); } } } catch (JMSException e) { System.out.println("Exception occurred: " + e.toString()); } } }
非常に汎用的なコードになっている。今回のサンプル固有の処理は二行だけ。
だ。これも切り離したければ、Aggregateオブジェクトを返すファクトリーを使い、内部的にAuctionAggregate?のインスタンスを作らせればいい。が、この本はオブジェクト指向を語る本ではないので ^^;、そこまではこだわらずにシンプルに進める。
AuctionAggregate?クラスはAggregateインターフェイスの実装。このインターフェイスは次の三つのメソッドだけを持つシンプルなもの。
public interface Aggregate { public void addMessage(Message message); public boolean isComplete(); public Message getResultMessage(); }
集約の作戦は、AuctionAggregate?クラスではなくAuctionクラスで実装した。JMSのMessageクラスのかわりにBidクラスを使うことで、JMSに依存しない実装になった。
ublic class Auction { ArrayList bids = new ArrayList(); public void addBid(Bid bid) { bids.add(bid); System.out.println(bids.size() + " Bids in auction."); } public boolean isComplete() { return (bids.size() >= 3); } public Bid getBestBid() { Bid bestBid = null; Iterator iter = bids.iterator(); if (iter.hasNext()) bestBid = (Bid) iter.next(); while (iter.hasNext()) { Bid b = (Bid) iter.next(); if (b.getPrice() < bestBid.getPrice()) { bestBid = b; } } return bestBid; } }
AuctionAggregate?クラスは、AggregateインターフェイスとAuctionクラスの間のアダプタ。
public class AuctionAggregate implements Aggregate { static String PROP_AUCTIONID = "AuctionID"; static String ITEMID = "ItemID"; static String VENDOR = "Vendor"; static String PRICE = "Price"; private Session session; private Auction auction; public AuctionAggregate(Session session) { this.session = session; auction = new Auction(); } public void addMessage(Message message) { Bid bid = null; if (message instanceof MapMessage) { try { MapMessage mapmsg = (MapMessage)message; String auctionID = mapmsg.getStringProperty(PROP_AUCTIONID); String itemID = mapmsg.getString(ITEMID); String vendor = mapmsg.getString(VENDOR); double price = mapmsg.getDouble(PRICE); bid = new Bid(auctionID, itemID, vendor, price); auction.addBid(bid); } catch (JMSException e) { System.out.println(e.getMessage()); } } } public boolean isComplete() { return auction.isComplete(); } public Message getResultMessage() { Bid bid = auction.getBestBid(); try { MapMessage msg = session.createMapMessage(); msg.setStringProperty(PROP_AUCTIONID, bid.getCorrelationID()); msg.setString(ITEMID, bid.getItemID()); msg.setString(VENDOR, bid.getVendorName()); msg.setDouble(PRICE, bid.getPrice()); return msg; } catch (JMSException e) { System.out.println("Could not create message: " + e.getMessage()); return null; } } }
これらのクラスが、次のシーケンス図のような流れで動く。