EIP / Channel Purger


Channel Purger

一言要約

ゴミはきちんと後始末しましょう。

Context

第6章で考えたJMS Request-Reply Example。requestorがメッセージをreplierに送信し、replierからの返事を待つというシンプルなものだったが、あれを見ているときに、おもしろい問題に出くわした。

まずreplierを立ち上げて、それからrequestorを立ち上げる。そのとき、奇妙なことが起こった。requestorのコンソールウィンドウに、早くもレスポンスが届いているのだ。そもそもreplier側ではまだリクエストを受け付けてすらいないのに。よくわからないので、replierをシャットダウンしてからrequestorを再起動した。おかしな話だが、まだレスポンスが届いている。奇跡でも起こった?

そんなことはなくて、単に永続化メッセージングの副作用が出ただけのことだ。それ以前の何かの処理が中途半端に終わってしまい、ReplyQueue?に余分なメッセージが残っていたということだ。

requestorが立ち上がると、新しいメッセージをRequestQueue?に入れる。そして、それとはまったく無関係の応答メッセージをすぐに受け取る。これはもともとReplyQueue?に居座っていたものであり、ついさっきRequestQueue?に入れたメッセージへの応答ではない。でも、requestorにはそんなことはわからない。

非同期メッセージングって、こんな単純なシナリオでさえおかしなことが起こってしまうものなんだね……。


Problem

チャネル上に残ったメッセージが、その後のテストや稼働中のシステムを邪魔しないようにできるだろうか?

Force

そもそもMessage Channelは、受け取り側のコンポーネントが動いているかどうかにかかわらずメッセージを確実に配送するように作られている。が、時にはこれは問題を引き起こすこともある。システムのテスト中や、何かのコンポーネントがうまく動いていない(そしてトランザクションも効いていない)ときに、余計なメッセージがチャネル上に残ってしまうことになる。

今回の例のようにシンプルなパターンなら、Correlation Identifierを使えばデバッグがいくらか楽になっただろう。受け取った応答が自分の送信したリクエストに関係ないものであると、気づけるからだ。それがわかりさえずれば、後は何とかなる(古いメッセージは破棄したり、Invalid Message Channelに送ったり、ね)。

それ以外の方法として、テンポラリチャネルを使うという方法がある(JMSには、それ用のメソッドcreateTemporaryQueue?がある)。このチャネルは、アプリケーションとメッセージングシステムとの接続を閉じた時点で、チャネル上のメッセージをすべて消去するというものだ。しかし、シンプルなrequest-reply型の場合にしか使えない。

もうひとつの対策としては、トランザクション管理がある。メッセージを受け取って何かの処理をし、そしてメッセージを送信するまでをひとつのトランザクションにすればいい。メッセージの処理中に異常終了したらロールバックされるし、応答メッセージについても、実際にメッセージの送信をコミットするまでは送信したとは見なされない。しかし、注意すべきことがある。いくらトランザクションといえども、プログラマーのミスによるエラーは防ぎようがないってこと。プログラマーのミスでrequestorがReplyQueue?からのメッセージを読み落としたとしても、トランザクションはそれを救ってくれない。

Solution

Channel Purgerを使い、不要なメッセージをチャネルから除去する。

シンプルなChannel Purgerは、チャネル上のすべてのメッセージを削除する。テスト中ならこれで十分だろう。稼働中のシステムのデバッグに使う場合は、何らかの条件を指定してそれを満たすメッセージだけを削除できるようにもしておきたい。

たいていの場合は、Channel Purgerが削除したメッセージをそのまま破棄してしまって問題ない。もしどこかに保存しておきたい(あとで調べるとか、再送するとかのため)のなら、Message Storeと組み合わせて使えばいい。


Examples


JMSにおけるChannel Purger

Javaによる実装例。すべてのメッセージをチャネルから削除する。

import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;

public class ChannelPurger extends JmsEndpoint
{

    public static void main(String[] args)
    {

        if (args.length != 1) {
            System.out.println("Usage: java ChannelPurger <queue_name>");
            System.exit(1);
        }
        String queueName = new String(args[0]);
        System.out.println("Purging queue " + queueName);

        ChannelPurger purger = new ChannelPurger();

        purger.purgeQueue(queueName);

    }

    private void purgeQueue(String queueName)
    {
        try {
            initialize();
            connection.start();
            Queue queue = (Queue) JndiUtil.getDestination(queueName);

            MessageConsumer consumer = session.createConsumer(queue);

            while (consumer.receiveNoWait() != null)
                System.out.print(".");
            connection.stop();
        } catch (Exception e) {
            System.out.println("Exception occurred: " + e.toString());
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // 無視
                }
            }
        }
    }
}

担当者のつぶやき

みんなの突っ込み

  • すみませんが、Channel Purgerって日本語で何と言うか教えてください! -- きこ? 2016-05-04 (水) 00:31:39