EIP / JMS Request-Reply Example


EIP

JMSによるRequest-Reply 例

要約

Request-ReplyのJMS実装例(JMS 1.1 & J2EE 1.4)を示す。requestorから要求を送り、replierは要求を受け取り応答を返し、requestorがそれを受け取る。

RequestReplyExample.gif

Request-Reply の例

この例は2つのクラスで構成している。

Requestor
要求メッセージを送り、応答メッセージを受け取るために待つMessage Endpoint
Replier
要求メッセージを受け取るために待ち、応答メッセージを送るMessage Endpoint

requestorとreplierは別々のJVMで動き、通信を分散可能にする。

この例ではメッセージングシステムは3つのキューを定義している。

jms/RequestQueue?
requestorがreplierに要求メッセージを送るのに使うキュー
jms/ReplyQueue?
replierがrequestorに応答メッセージを送るのに使うキュー
jms/InvalidMessages?
requestorやreplierが解釈できなかったメッセージを移動させるキュー

requestorをコマンドライン画面にて起動すると以下のように出力される。

Sent request
        Time:       1048261736520 ms
        Message ID: ID:_XYZ123_1048261766139_6.2.1.1
        Correl. ID: null
        Reply to:   com.sun.jms.Queue: jms/ReplyQueue
        Contents:   Hello world.

requestorが要求メッセージを送っているが、replierはまだ動いてないので要求を受け取れない。replierを別コマンドライン画面で起動すると以下のように出力される。

Received request
        Time:       1048261766790 ms
        Message ID: ID:_XYZ123_1048261766139_6.2.1.1
        Correl. ID: null
        Reply to:   com.sun.jms.Queue: jms/ReplyQueue
        Contents:   Hello world.
Sent reply
        Time:       1048261766850 ms
        Message ID: ID:_XYZ123_1048261758148_5.2.1.1
        Correl. ID: ID:_XYZ123_1048261766139_6.2.1.1
        Reply to:   null
        Contents:   Hello world.

replierが要求メッセージを受け取り応答メッセージを送ったことが分かる。

上記出力より色々なことが分かる。タイムスタンプにより要求を送って(30270ms経って)から受け取っている。メッセージIDが同じ(=同じメッセージ)。内容が同じ「Hello world」でデータが正常に転送されている(この例はあまりイケてなく基本はDocument Messageで、現実の要求は通常Command Message)。jms/ReplyQueue?キューが応答メッセージの送り先として要求メッセージに指定されている(Return Addressの一例)。

要求を受け取った際の出力と応答を送った出力とを比べてみる。要求を受け取って(60ms経って)応答を送っている。応答のメッセージIDが要求のと違うが、これは要求と応答は別々のメッセージであるからだ。要求の内容を取り出し応答に付与している(この例ではreplierは単なるechoサービスとして振る舞っている)。応答は要らないのでreply-toには何も指定していない(応答はReturn Addressを使わない)。応答の相関IDは要求側のメッセージIDと同じ(応答はCorrelation Identifierを使っている)。

requestor側の画面には以下の応答を受け取っている。

Received reply
        Time:       1048261797060 ms
        Message ID: ID:_XYZ123_1048261758148_5.2.1.1
        Correl. ID: ID:_XYZ123_1048261766139_6.2.1.1
        Reply to:   null
        Contents:   Hello world.

応答は送信してから30210ms後に受け取っている。応答のメッセージIDが同じ(=同じメッセージ)。メッセージ内容が送った際と同じで、相関ID(Correlation Identifier)がrequestorがこの応答を要求していることを示している。

requestorは応答を受け取ったら終了するがreplierは動き続けているので明示的に終了させる必要がある。

Request-Replyのコード

requestorのコードを見てみる。

(コード略)

requestorは初期化のためにメッセージングシステムに接続し、3つのキューのJNDI名を指定している。initializeメソッドではメッセージングシステムに接続するためにコネクションとキュー名を使っている。

  • Session作成にConnectionを使う。アプリケーションはメッセージングシステムに必要なコネクションは1つだけであるが、各コンポーネントは個々に送受信したいので自身のセッションに必要。2つのスレッドは単一セッションは共有できない(それぞれ別セッションを使うべき)
  • DestinationであるキューのルックアップにJNDI識別子を使い、JndiUtil?が担っている
  • メッセージ送信にMessageProducer?、受信にMessageConsumer?を生成し、もう一つのproducerはinvalidメッセージキュー用。

requestorはsendメソッドにて要求メッセージを送信する。

  • TextMessage?を生成し「Hello world」をセット
  • Return Addressとしてreplierに応答を送信してもらうよう、reply-toに応答キューをセット
  • メッセージ送信にrequestProducer(要求キューに接続)を利用
  • メッセージIDは送信時にメッセージングシステムがセットするため、送信後に詳細を出力

receiveSyncメソッドで応答メッセージを受け取る。

  • 応答の受け取りにreplyConsumer(応答キューに接続)を利用。receiveメソッドでメッセージを取得しメッセージがキューに届いて読み込むまで同期ブロックするのでrequestorはPolling Consumer。受け取りが同期なのでreceiveSyncメソッドという
  • メッセージはTextMessage?でなければならない。受け取ったら詳細を出力
  • TextMessage?でない場合は処理できなかったことを表し、破棄せずにinvalidメッセージキューに送り直す。メッセージIDは変更し元メッセージIDは相関IDに格納する(Correlation Identifier参照)

JMSではQueueRequestor?という上記で実装した目的を果たす特別なクラスを提供している。

次にreplierの実装例を示す。

(コード略)

メッセージングシステムに接続し、要求とinvalidメッセージキューのJNDI名を指定(メッセージのReturn Addressで提供されるので応答キューを指定する必要はない)

initializeメソッドはrequestorのと似てるので差分だけ。

  • 応答を常に送るとは限らないため応答キューをルックアップしない
  • replierはEvent-Driven ConsumerMessageListener?をimplementsしている。メッセージが届くとメッセージングシステムは自動的にonMessageを呼び出す

replierはイベント駆動であるので、リスナーとして初期化されるとメッセージが届いてonMessageメソッドが呼ばれるまで何もしない。

  • 応答メッセージはTextMessage?であり応答を送信するキューがあることを想定している。もしそうでなければ(requestorと同じように)invalidメッセージキューに移動する
  • replierはReturn Address部分を実装する。MessageProducer?を生成するために要求メッセージのreply-toプロパティを利用し、ハードコードしてはならない。
  • 応答メッセージを生成。Correlation Identifierを実装するため、応答メッセージのcorrelation-idプロパティに要求メッセージのmessage-idプロパティの値をセット
  • 応答メッセージを送信して詳細を出力

Invalidメッセージ例

Invalid Message Channelの例を見てみる。jms/InvalidMessages?と名づけたキューはJMSクライアント(Message Endpoint)が処理できないメッセージを受け取った際に存在し、特別なチャネルに移動させる。

invalidメッセージの扱いを示すため、不正フォーマットのメッセージを送るInvalidMessenger?クラスを設計してみる。正しいフォーマットを期待する要求チャネル(Datatype Channel)に対して異なるフォーマットのメッセージを送ることで、replierはメッセージフォーマットを認識できずinvalidメッセージキューに移す。

コマンドライン画面にてreplierを、別画面でinvalidメッセンジャーを動かす。invalidメッセンジャーがメッセージを送信すると、以下のように出力される。

Sent invalid message
        Type:       com.sun.jms.ObjectMessageImpl
        Time:       1048288516959 ms
        Message ID: ID:_XYZ123_1048288516639_7.2.1.1
        Correl. ID: null
        Reply to:   com.sun.jms.Queue: jms/ReplyQueue

メッセージがObjectMessage?のインスタンスであり、TextMessage?を期待しているreplierはinvalidメッセージを受け取りinvalidメッセージキューに送り直す。

Invalid message detected
        Type:       com.sun.jms.ObjectMessageImpl
        Time:       1048288517049 ms
        Message ID: ID:_XYZ123_1048288516639_7.2.1.1
        Correl. ID: null
        Reply to:   com.sun.jms.Queue: jms/ReplyQueue
Sent to invalid message queue
        Type:       com.sun.jms.ObjectMessageImpl
        Time:       1048288517140 ms
        Message ID: ID:_XYZ123_1048287020267_6.2.1.2
        Correl. ID: ID:_XYZ123_1048288516639_7.2.1.1
        Reply to:   com.sun.jms.Queue: jms/ReplyQueue

送り直す際、新規にメッセージIDを取得するのでCorrelation Identifier?を適用する。コードはReplierやRequestorに実装済み。

まとめ

RequestorとReplierの2つクラス(Message Endpoints)を実装し、Request-Replyを使ったMessages交換方法を示した。要求メッセージはReturn Addressを使って応答を送信するキューを指定する。requestorは応答を受け取るためにPolling Consumerを、replierは要求を受け取るためにEvent-Driven Consumerを実装する。要求/応答キューはDatatype Channelsであり、consumerが正しい種類でないメッセージを受け取るとInvalid Message Channelに経路を切り替える。

担当者のつぶやき

みんなの突っ込み