EIP / .NET Request-Reply Example


.NET Request-Reply Example

  • C#によるメッセージングの使い方の単純な例
  • Request-Replyの実装方法を示す
    • 不正なメッセージの扱いも

Request-Reply Example

主に二つの主要クラスからなる

  • Requestor
    • リクエストメッセージを送信し、リプライメッセージの受信を待つMessage Endpoint
  • Replier
    • リクエストメッセージの受信を待ち、リプライメッセージを送信するMessage Endpoint

通信を分散させるのに、リクエスタとリプライヤの.NETプログラムとして分離して動かす

メッセージングシステムは3つのキューを持つ

  1. .\private$\RequestQueue?
  2. .\private$\ReplyQueue?
  3. .\private$\InvalidQueue?
    • リプライヤが解釈できないメッセージを受け取った時に使うMessageQueue

リクエスタがリクエストを送信したときの出力内容

Sent request
	Time: 		09:11:09.165342
	Message ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\149
       Correl. ID:
	Reply to: 	.\private$\ReplyQueue
	Contents: 	Hello world.

リプライヤがリクエストを受信し、リプライを送信したときの出力内容

Received request
	Time:		09:11:09.375644
	Message ID:	8b0fc389-f21f-423b-9eaa-c3a881a34808\149
	Correl. ID: 	<n/a>
	Reply to: 	FORMATNAME:DIRECT=OS:XYZ123\private$\ReplyQueue
	Contents:	Hello world.
Sent reply
	Time:		09:11:09.956480
	Message ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\150
	Correl. ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\149
	Reply to: 	<n/a>
	Contents:	Hello world.

ここまでの出力内容について

  • Sent requestとReceived requestに注目
    • リクエスタが送った後にリプライヤが受け取ってる
      • タイムスタンプに注目
    • Message IDが一致
      • 同じメッセージなので
    • コンテンツが一致
      • 同じメッセージなので
    • Reply toのキューが明記
  • Received requestとSent Replyに注目
    • リクエストを受信するまではリプライを返していない
      • タイムスタンプに注目
    • Message IDが異なる
      • 異なるメッセージなので
    • コンテンツがリクエストのものを元に設定されてる
    • Reply toが未設定
    • Correl. IDがリクエストのMessage IDと一致

リクエスタがリプライを受信したときの出力内容

Received reply 
	Time:		09:11:10.156467
	Message ID:	8b0fc389-f21f-423b-9eaa-c3a881a34808\150
	Correl. ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\149
	Reply to: 	<n/a>
	Contents:	Hello world.

この出力内容について

  • リプライを受け取った時間は送信した時間の後
  • Message IDが一致
    • 同じメッセージなので
  • コンテンツが一致
    • 同じメッセージなので
  • Correl. IDにより、どのリクエストに対するリプライなのかを判断できる

その他の仕様について

  • リクエスタは長時間稼働しない
  • リプライヤは常時稼働してリクエストを待っている
    • エンターキーを押すと終了

Request-Reply Code

リクエスタの実装

using System;
using System.Messaging;

public class Requestor
{
    private MessageQueue requestQueue;
    private MessageQueue replyQueue;
        
    public Requestor(String requestQueueName, String replyQueueName)
    {
        requestQueue = new MessageQueue(requestQueueName);
        replyQueue = new MessageQueue(replyQueueName);

        replyQueue.MessageReadPropertyFilter.SetAll();
        ((XmlMessageFormatter)replyQueue.Formatter).TargetTypeNames = new string[] { "System.String,mscorlib" };
    }

    public void Send()
     {
        Message requestMessage = new Message();
        requestMessage.Body = "Hello world.";
        requestMessage.ResponseQueue = replyQueue;
        requestQueue.Send(requestMessage);

        Console.WriteLine("Sent request");
        Console.WriteLine("\tTime:       {0}", DateTime.Now.ToString("HH:mm:ss.ffffff"));
        Console.WriteLine("\tMessage ID: {0}", requestMessage.Id);
        Console.WriteLine("\tCorrel. ID: {0}", requestMessage.CorrelationId);
        Console.WriteLine("\tReply to:   {0}", requestMessage.ResponseQueue.Path);
        Console.WriteLine("\tContents:   {0}", requestMessage.Body.ToString());
    }

    public void ReceiveSync()
    {
        Message replyMessage = replyQueue.Receive();

        Console.WriteLine("Received reply");
        Console.WriteLine("\tTime:       {0}", DateTime.Now.ToString("HH:mm:ss.ffffff"));
        Console.WriteLine("\tMessage ID: {0}", replyMessage.Id);
        Console.WriteLine("\tCorrel. ID: {0}", replyMessage.CorrelationId);
        Console.WriteLine("\tReply to:   {0}", "<n/a>");
        Console.WriteLine("\tContents:   {0}", replyMessage.Body.ToString());
    }
}

ソースについて

  • コンストラクタに注目
    • リクエスト用とリプライ用それぞれのキューのパス名を受け取る
      • MSMQのリソースに対するパス名
    • メッセージのプロパティすべてを取得するようフィルタを設定
    • 文字列として解釈されるべくTargetTypeNames?を設定
  • Sendに注目
    • コンテンツに"Hello world."を設定
    • メッセージのResponseQueue?プロパティにreplyQueueを設定
    • キューにメッセージを送信
    • 送信したメッセージの詳細を出力
      • メッセージIDは実際に送られてから割り振られる
  • ReceiveSync?に注目
    • リプライキューで同期的に受信
    • 受信したメッセージの詳細を出力

リプライヤの実装

using System;
using System.Messaging;

public class Replier
{
    private MessageQueue invalidQueue;

    public Replier(String requestQueueName, String invalidQueueName)
    {
        MessageQueue requestQueue = new MessageQueue(requestQueueName);
        invalidQueue = new MessageQueue(invalidQueueName);

        requestQueue.MessageReadPropertyFilter.SetAll();
        ((XmlMessageFormatter)requestQueue.Formatter).TargetTypeNames = new string[] { "System.String,mscorlib" };
        requestQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(OnReceiveCompleted);
        requestQueue.BeginReceive();
    }

    public void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
    {
        MessageQueue requestQueue = (MessageQueue) source;
        Message requestMessage = requestQueue.EndReceive(asyncResult.AsyncResult);

        try
        {
            Console.WriteLine("Received request");
            Console.WriteLine("\tTime:       {0}", DateTime.Now.ToString("HH:mm:ss.ffffff"));
            Console.WriteLine("\tMessage ID: {0}", requestMessage.Id);
            Console.WriteLine("\tCorrel. ID: {0}", "<n/a>");
            Console.WriteLine("\tReply to:   {0}", requestMessage.ResponseQueue.Path);
            Console.WriteLine("\tContents:   {0}", requestMessage.Body.ToString());

            string contents = requestMessage.Body.ToString();
            MessageQueue replyQueue = requestMessage.ResponseQueue;
            Message replyMessage = new Message();
            replyMessage.Body = contents;
            replyMessage.CorrelationId = requestMessage.Id;
            replyQueue.Send(replyMessage);

            Console.WriteLine("Sent reply");
            Console.WriteLine("\tTime:       {0}", DateTime.Now.ToString("HH:mm:ss.ffffff"));
            Console.WriteLine("\tMessage ID: {0}", replyMessage.Id);
            Console.WriteLine("\tCorrel. ID: {0}", replyMessage.CorrelationId);
            Console.WriteLine("\tReply to:   {0}", "<n/a>");
            Console.WriteLine("\tContents:   {0}", replyMessage.Body.ToString());

        }
        catch (Exception)
        {
            Console.WriteLine("Invalid message detected");
            Console.WriteLine("\tType:       {0}", requestMessage.BodyType);
            Console.WriteLine("\tTime:       {0}", DateTime.Now.ToString("HH:mm:ss.ffffff"));
            Console.WriteLine("\tMessage ID: {0}", requestMessage.Id);
            Console.WriteLine("\tCorrel. ID: {0}", "<n/a>");
            Console.WriteLine("\tReply to:   {0}", requestMessage.ResponseQueue.Path);

            requestMessage.CorrelationId = requestMessage.Id;
            invalidQueue.Send(requestMessage);

            Console.WriteLine("Sent to invalid message queue");
            Console.WriteLine("\tType:       {0}", requestMessage.BodyType);
            Console.WriteLine("\tTime:       {0}", DateTime.Now.ToString("HH:mm:ss.ffffff"));
            Console.WriteLine("\tMessage ID: {0}", requestMessage.Id);
            Console.WriteLine("\tCorrel. ID: {0}", requestMessage.CorrelationId);
            Console.WriteLine("\tReply to:   {0}", requestMessage.ResponseQueue.Path);
        }

        requestQueue.BeginReceive();
    }
}

ソースについて

  • コンストラクタに注目
    • リクエスト用と不正なメッセージ用それぞれのキューのパス名を受け取る
      • リプライ用のキューはReply toに設定されてるので不要
      • 必ずリプライする必要があるとも限らないし
    • Event-Driven Consumerの例になってる
      • ReceiveCompletedEventHandler?によりOnReceiveCompleted?を実行
    • リクエストキューの非同期のリスナ
      • リクエスタと違いメッセージを受信するまで何もしない
  • OnReceiveCompleted?に注目
    • リクエストキューでの受信をいったん完了する
    • Return AddressとしてリクエストメッセージからResponseQueue?を取得
      • 特定のリプライキューのパス名をハードコードする等して使用していな
    • Correl. IDにリクエストメッセージのIDを使用
    • リプライメッセージを送信
    • 送信したリプライメッセージの詳細を出力
    • 処理が失敗したら例外をスローして不正メッセージキューに送信
      • Correl. IDにリクエストメッセージのIDを使用
    • 処理が終ればまた受信を再開

Invalid Message Example

Invalid Message Channelの例を示すのに、Invalid Messangerクラスを設計する

  • リクエストチャネルに不正なフォーマットのメッセージが届いた場合に使用される
  • リクエストチャネルはDatatype Channelになっている
    • 規定のフォーマットのリクエストを想定したもの
  • 不正なメッセンジャーは不正なメッセージを送信するためのもの
    • これを受信したリプライヤが不正メッセージキューにメッセージを移動
  • リプライヤとは別のウィンドウで不正なメッセージ送信を実行する

不正なメッセンジャーがリクエストを送信したときの出力内容

Sent request
	Type:		768
	Time: 		09:39:44.223729
	Message ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\168
	Correl. ID:	00000000-0000-0000-0000-000000000000\0
	Reply to: 	.\private$\ReplyQueue

この出力内容について

  • タイプ768はバイナリのメッセージとする
    • リプライヤはtext/XMLのコンテンツを想定

リプライヤがリクエストを受信し、不正メッセージキューに再送信したときの出力内容

Invalid message detected
	Type:		768
	Time:		09:39:44.233744
	Message ID:	8b0fc389-f21f-423b-9eaa-c3a881a34808\168
	Correl. ID: 	<n/a>
	Reply to: 	<n/a>
Sent to invalid message queue
	Type:		768
	Time:		09:39:44.233744
	Message ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\169
	Correl. ID: 	8b0fc389-f21f-423b-9eaa-c3a881a34808\168
	Reply to: 	FORMATNAME:DIRECT=OS:XYZ123\private$\ReplyQueue

この出力内容について

  • 不正メッセージキューに再送信されるので、新たにMesssageIDが振られてしまう
    • このため、Correlation Identifierを使ってる
    • この辺りの処理はReplierクラスのOnReceiveCompleted?メソッドにある

Conclusions

関連するパターン